# Swiss Model

### Import libraries and load dependencies

In [None]:
import time  # Import time library for sleep functionality
import pandas as pd  # Import pandas library for data manipulation
import requests  # Import requests library for making HTTP requests
import gzip  # Import gzip library for handling gzip files
import os  # Import os library for file and directory operations

from pandarallel import pandarallel  # Import pandarallel for parallel processing with pandas
# Set up pandarallel before any DataFrame.parallel_apply call is made;
# progress_bar=True asks it to display progress while the parallel work runs.
pandarallel.initialize(progress_bar=True)

### Define global variables

In [None]:
# Directory holding the variant CSV file. File names are appended directly,
# so the trailing slash is required.
CSV_PATH = "../../data/csv/"
# Directory where the downloaded PDB models are stored (trailing slash required).
PDB_PATH = "../../data/pdb/swiss_model/"
ONE_MINUTE = 60  # One minute, in seconds
SIX_HOURS = 6 * 60 * 60  # Six hours, in seconds
# Authorization token for the SWISS-MODEL API. Set the SWISSMODEL_TOKEN
# environment variable to supply it without editing this file; the literal is
# kept only as a fallback so existing runs behave exactly as before.
# NOTE(review): a token committed to source control should be treated as
# leaked -- rotate it and drop the fallback once the environment variable is
# in use.
TOKEN = os.environ.get("SWISSMODEL_TOKEN", "e76e1cfea901a4497d7b6007a379939340126b4a")

### Getting models

In [None]:
# Load the semicolon-separated variant table that drives the modeling run
variant_df = pd.read_csv(CSV_PATH + 'fasta_variant.csv', sep=';')
# Preview the first five rows (the default for head) as the cell output
variant_df.head(5)

Check and update status:

In [None]:
def check_and_update_status(row):
    """Report whether the PDB file for a variant row already exists.

    The file is looked up in PDB_PATH under the row's "variant" value with
    '_p.' replaced by '_' and a '.pdb' extension appended.

    Returns:
        'concluded' if that file exists, 'not_concluded' otherwise.
    """
    pdb_name = row["variant"].replace('_p.', '_') + ".pdb"
    pdb_exists = os.path.isfile(PDB_PATH + pdb_name)
    return 'concluded' if pdb_exists else 'not_concluded'

def update_swiss_model_status(df):
    """Refresh the 'swiss_model' column from the PDB files on disk and save.

    The column is created with 'not_concluded' when absent, then recomputed
    row by row with check_and_update_status. The DataFrame is modified in
    place and written to fasta_variant.csv under CSV_PATH (';' separated,
    without the index).
    """
    status_column = 'swiss_model'
    if status_column not in df.columns:
        # Make sure the column exists before the per-row pass
        df[status_column] = 'not_concluded'

    row_statuses = df.apply(check_and_update_status, axis=1)
    df[status_column] = row_statuses

    output_file = f'{CSV_PATH}fasta_variant.csv'
    df.to_csv(output_file, index=False, sep=';')
    print("Status updated based on existing files")

Functions for requesting, polling, and downloading models:

In [None]:
def start_modeling(title, sequence):
    """Submit an automodel job to the SWISS-MODEL API.

    Args:
        title: Project title, sent as "project_title".
        sequence: Target sequence(s), sent as "target_sequences".

    Returns:
        The "project_id" from the JSON response, or None when the server
        rejected the request or the response carried no project id.

    Raises:
        requests.RequestException: On connection errors or when the server
            does not answer within ONE_MINUTE seconds.
    """
    response = requests.post(
        "https://swissmodel.expasy.org/automodel",  # URL for the Swiss Model API
        headers={"Authorization": f"Token {TOKEN}"},  # Authorization header with the token
        json={
            "target_sequences": sequence,
            "project_title": title,
        },
        # Without a timeout a stalled connection would block the worker forever
        timeout=ONE_MINUTE,
    )
    if not response.ok:
        # Do not try to parse an error page as a successful submission
        print(f"Failed to start modeling for {title}. Status code: {response.status_code}")
        return None
    project_id = response.json().get("project_id")
    if project_id is None:
        print(f"Failed to start modeling for {title}. No project_id in response")
    return project_id

def wait_modeling(project_id, poll_interval=10, max_wait=None):
    """Poll a SWISS-MODEL project until it finishes and return its first model.

    Args:
        project_id: Project identifier returned by the automodel request.
            None (no project was created) is reported as a failure without
            contacting the server.
        poll_interval: Seconds to sleep between status checks.
        max_wait: Maximum number of seconds to keep polling; defaults to
            SIX_HOURS when None.

    Returns:
        The first entry of the "models" list when the status is "COMPLETED",
        or None when the project failed, timed out, or produced no models.

    Raises:
        requests.RequestException: On connection errors or when a status
            request is not answered within ONE_MINUTE seconds.
    """
    if project_id is None:
        # Polling ".../project/None/..." would never reach a final status
        print("Modeling failed\n")
        return None

    if max_wait is None:
        max_wait = SIX_HOURS
    deadline = time.monotonic() + max_wait
    summary_url = f"https://swissmodel.expasy.org/project/{project_id}/models/summary/"
    headers = {"Authorization": f"Token {TOKEN}"}

    while True:
        response = requests.get(summary_url, headers=headers, timeout=ONE_MINUTE)
        try:
            summary = response.json()
        except ValueError:
            # A non-JSON body (e.g. a gateway error page) counts as "no status yet"
            summary = {}
        status = summary.get("status", "UNKNOWN")
        if status in ("COMPLETED", "FAILED"):
            break
        if time.monotonic() >= deadline:
            print(f"Modeling timed out after {max_wait} seconds\n")
            return None
        time.sleep(poll_interval)

    if status == "COMPLETED":
        models = summary.get("models", [])
        if models:
            return models[0]
        # A completed project with an empty model list has nothing to download
        print("Modeling completed without models\n")
        return None

    print("Modeling failed\n")
    return None

def download_pdb(title, model):
    """Download a model's gzip-compressed coordinates and store them as a PDB file.

    The file is written to PDB_PATH as the title with '_p.' replaced by '_'
    and a '.pdb' extension.

    Args:
        title: Variant title used to build the output file name.
        model: Model dictionary containing "coordinates_url". None or a
            model without that URL is reported and skipped.

    Returns:
        None.

    Raises:
        requests.RequestException: On connection errors or timeout.
        OSError: If the payload is not valid gzip data or the file cannot be
            written. An invalid payload is detected before any file is
            created.
    """
    if model is None:
        print("Failed to download file. No model available")
        return
    url = model.get("coordinates_url")
    if not url:
        print("Failed to download file. Model has no coordinates_url")
        return

    filename = title.replace('_p.', '_')  # Same naming rule as the status check
    response = requests.get(url, timeout=ONE_MINUTE)
    if response.status_code != 200:
        print(f"Failed to download file. Status code: {response.status_code}")
        return

    # Decompress in memory first: a corrupt download must not leave an empty
    # or partial .pdb behind, because an existing file counts as 'concluded'.
    pdb_bytes = gzip.decompress(response.content)

    pdb_path = f'{PDB_PATH}{filename}.pdb'
    partial_path = f'{pdb_path}.part'
    with open(partial_path, 'wb') as partial_file:
        partial_file.write(pdb_bytes)
    # Move into place only once the content is fully written
    os.replace(partial_path, pdb_path)

def get_pdb(title, sequence):
    """Start a modeling project and wait for its resulting model.

    Args:
        title: Project title passed to start_modeling.
        sequence: Target sequence(s) passed to start_modeling.

    Returns:
        The model returned by wait_modeling, or None when no project could
        be started or the modeling did not produce a model.
    """
    project_id = start_modeling(title, sequence)
    if project_id is None:
        # Without a project id there is nothing to poll
        print(f"Could not start modeling for {title}\n")
        return None
    return wait_modeling(project_id)

def process_row(row):
    """Model one variant row and download its PDB file.

    Args:
        row: Row providing "variant" (used as the project title and file
            name) and "fasta" (the target sequence).

    Returns:
        True when the PDB file for the variant exists after processing,
        False when modeling or the download failed.
    """
    title = row["variant"]
    sequence = [row["fasta"]]  # The API payload takes the sequence wrapped in a list
    try:
        model = get_pdb(title, sequence)
        if model is None:
            return False
        download_pdb(title, model)
    except requests.RequestException as error:
        # One failing request should not abort the other rows of the parallel run
        print(f"Request failed for {title}: {error}")
        return False
    # The file on disk is the source of truth for success
    return check_and_update_status(row) == 'concluded'

Run the modeling:

In [None]:
# Recompute statuses from the PDB files on disk so finished variants are skipped
update_swiss_model_status(variant_df)
not_concluded_df = variant_df[variant_df['swiss_model'] == 'not_concluded']

if not_concluded_df.empty:
    # Nothing to submit; avoids running parallel_apply on an empty frame
    print("Nothing to model: every variant already has a PDB file")
else:
    print("Starting modeling...")
    results = not_concluded_df.parallel_apply(process_row, axis=1)  # One modeling job per pending row
    # Re-derive every status from the files that actually exist and persist
    # the table, rather than assuming each processed row succeeded.
    update_swiss_model_status(variant_df)
    pending_statuses = variant_df.loc[not_concluded_df.index, 'swiss_model']
    modeled_count = int((pending_statuses == 'concluded').sum())
    print(f"Successfully modeled {modeled_count} of {len(not_concluded_df)} variants")