In [3]:
!pip install ray

[33mDEPRECATION: graphql-ws 0.3.0 has a non-standard dependency specifier graphql-core>=2.0<3. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of graphql-ws or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063[0m[33m
[0m

In [4]:
!pip install captum

!pip install ax-platform

[33mDEPRECATION: graphql-ws 0.3.0 has a non-standard dependency specifier graphql-core>=2.0<3. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of graphql-ws or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063[0m[33m
Collecting plotly>=5.12.0 (from ax-platform)
  Obtaining dependency information for plotly>=5.12.0 from https://files.pythonhosted.org/packages/00/4e/6258fc3b26f1f7abd1b2e75b1e9e4f12f13584136e2e1549f995ff4c6b7b/plotly-5.20.0-py3-none-any.whl.metadata
  Downloading plotly-5.20.0-py3-none-any.whl.metadata (7.0 kB)
Downloading plotly-5.20.0-py3-none-any.whl (15.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.7/15.7 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h[33mDEPRECATION: graphql-ws 0.3.0 has a non-standard dependency specifier graphql-core>=2.0<3. pip 2

In [7]:
import os
import re
import numpy as np
import pandas as pd
# import ray
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch import nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import KFold
from sklearn.metrics import precision_score

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [9]:
# Defining Constants
DATA_PATH = "./data/"
labels_train_path = DATA_PATH + "labels_train.csv"
sample_path = DATA_PATH + "sample.csv"
seqs_test_path = DATA_PATH + "seqs_test.csv"
seqs_train_path = DATA_PATH + "seqs_train.csv"
train_path = DATA_PATH + "train"
test_path = DATA_PATH + "test"

# Amino acid mapping
amino_acid_mapping = {
    'A': 0, 'C': 1, 'D': 2, 'E': 3, 'F': 4,
    'G': 5, 'H': 6, 'I': 7, 'K': 8, 'L': 9,
    'M': 10, 'N': 11, 'P': 12, 'Q': 13, 'R': 14,
    'S': 15, 'T': 16, 'V': 17, 'W': 18, 'Y': 19,
    'X': 20,  
    'B': 21,  
    'Z': 22,  
    'J': 23, 
    '-': 24,  
}

sec_struct_mapping = {'H': 0, 'E': 1, 'C': 2}

Processes the input data from the csv files and uses one-hot encoding to encode the sequences. The pssm profiles are normalized for uniformity and labels if present, are just loaded into a dataframe.

In [10]:
class DataProcessing(Dataset):
    def __init__(self, csv_file, train_dir, label_file=None, normalize_method='min-max'):

        # Load the sequences
        self.seqs = pd.read_csv(csv_file)

        # Load the data from the directory
        self.protein_data = {}
        for filename in os.listdir(train_dir):
            if filename.endswith(".csv"):  # Check if the file is a CSV
                protein_id = re.split(r'_train|_test', filename)[0]
                self.protein_data[protein_id] = pd.read_csv(os.path.join(train_dir, filename))

        # Load the labels (for training data)
        if label_file:
            self.labels = pd.read_csv(label_file)
        else:
            self.labels = None

        # Perform amino acid mapping
        self.amino_acid_mapping = amino_acid_mapping
        self.normalize_method = normalize_method

    def seq_encode(self, sequence):
        #Perform one-hot encoding
        encoded_sequence = np.zeros((len(sequence), len(self.amino_acid_mapping)), dtype=int)
        for i, amino_acid in enumerate(sequence):
            # Set 'X' for unknown amino acids
            index = self.amino_acid_mapping.get(amino_acid, self.amino_acid_mapping['X'])
            encoded_sequence[i, index] = 1
        return encoded_sequence

    def normalize_pssm(self, pssm):
        numeric_columns = pssm[:, 2:]
        try:
            pssm_numeric = numeric_columns.astype(np.float32)
        except ValueError as e:
            raise ValueError(f"Error converting PSSM to float: {e}")

        if self.normalize_method == 'min-max':
            # Min-Max normalization
            pssm_min = pssm_numeric.min(axis=0)
            pssm_max = pssm_numeric.max(axis=0)
            # Ensuring no zero division error
            pssm_range = np.where(pssm_max - pssm_min == 0, 1, pssm_max - pssm_min)
            normalized_pssm = (pssm_numeric - pssm_min) / pssm_range
        elif self.normalize_method == 'z-score':
            # Z-Score normalization
            pssm_mean = pssm_numeric.mean(axis=0)
            pssm_std = pssm_numeric.std(axis=0)
            # Avoid division by zero
            pssm_std = np.where(pssm_std == 0, 1, pssm_std)
            normalized_pssm = (pssm_numeric - pssm_mean) / pssm_std
        else:
            # If no normalization method provided, return the original PSSM
            normalized_pssm = pssm_numeric

        return normalized_pssm

    def __len__(self):
        return len(self.seqs)

    def __getitem__(self, idx):
        protein_id = self.seqs.iloc[idx]['PDB_ID']
        sequence = self.seqs.iloc[idx]['SEQUENCE']
        encoded_sequence = self.seq_encode(sequence)  # Encoding the sequence
        pssm = self.protein_data[protein_id].values 
        normalized_pssm = self.normalize_pssm(pssm) 

        if self.labels is not None:
            label_seq = self.labels.iloc[idx]['SEC_STRUCT']
            label_numeric = [sec_struct_mapping[char] for char in label_seq]
            label_tensor = torch.tensor(label_numeric, dtype=torch.long)
            return (
                protein_id,
                torch.tensor(encoded_sequence, dtype=torch.float32),
                torch.tensor(normalized_pssm, dtype=torch.float32),
                label_tensor
            )

        return (
            protein_id,
            torch.tensor(encoded_sequence, dtype=torch.float32),
            torch.tensor(normalized_pssm, dtype=torch.float32)
        )

In [11]:
def collate(batch):
    _, sequences, pssms, labels_list = zip(*batch)  # Unzip the batch
    # Pad sequences and PSSMs to avoid shape mismatch
    sequences_padded = pad_sequence([seq.clone().detach() for seq in sequences], batch_first=True)
    pssms_padded = pad_sequence([pssm.clone().detach() for pssm in pssms], batch_first=True)
    if labels_list[0] is not None:  # Check if labels exist
        labels_padded = pad_sequence([label.clone().detach() for label in labels_list], batch_first=True)
    else:
        labels_padded = None
    return sequences_padded, pssms_padded, labels_padded


In [12]:
class CNN(nn.Module):
    def __init__(
            self,
            num_classes,
            input_channels,
            hidden_layers_number,
            dropout_rate
    ):
        super(CNN, self).__init__()
        self.hidden_layers = self.get_hidden_layers_size(hidden_layers_number)
        self.dropout_rate = dropout_rate
        self.convs = nn.ModuleList()
        in_channels = input_channels
        for out_channels in self.hidden_layers:
            self.convs.append(
                nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1)
            )
            in_channels = out_channels 
        # Dropout layer
        self.dropout = nn.Dropout(dropout_rate)
        self.final_conv = nn.Conv1d(in_channels=self.hidden_layers[-1], out_channels=num_classes, kernel_size=1)

    def forward(self, x):
        for conv in self.convs:
            x = F.relu(conv(x))
            x = self.dropout(x)  
        x = self.final_conv(x)
        x = x.transpose(1, 2)
        return x

    def get_hidden_layers_size(self, number):
        hidden_layers_configs = {
            1: [64],  
            2: [64, 128],  
            3: [64, 128, 256],  
            4: [64, 128, 256, 512],  
            5: [64, 128, 256, 512, 1024]  
        }
        return hidden_layers_configs[number]

The train, test and validate model functions are being declared below.

In [13]:
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score

def train_model(model, criterion, optimizer, train_dataloader, num_epochs, input_type, log_precision=False):
    losses = []
    for epoch in range(num_epochs):
        model.train()  
        epoch_losses = []
        print("Compiler in the train_model")
        for sequences, pssms, labels in train_dataloader:
            if input_type == "pssms":
                inputs = pssms.permute(0, 2, 1) 
            else:
                inputs = sequences.permute(0, 2, 1)  

            inputs = inputs
            labels = labels
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs.transpose(1, 2), labels)
            loss.backward()
            optimizer.step()
            
            epoch_losses.append(loss.item())

        losses.append(epoch_losses)
        
        predictions, labels = [], []
        model.eval() 
        with torch.no_grad():
            for sequences, pssms, labels in train_dataloader:
                inputs = pssms.permute(0, 2, 1)
                labels = labels
                outputs = model(inputs)
                print("predicting...")
                _, predicted = torch.max(outputs.data, 2)
                #predictions.extend(predicted.view(-1).cpu().numpy())
                predicted_list = predicted.view(-1).cpu().numpy().tolist()
                labels_list = labels.view(-1).cpu().numpy().tolist()
                #predictions.extend(predicted_list)
                #labels.extend(labels_list)
        
        # Calculate precision
        epoch_precision = precision_score(labels, predictions, average='macro')
    
        print(f'Epoch {epoch+1}/{num_epochs}, Precision: {epoch_precision:.4f}')
    
    return losses

def plot_loss_curve(losses):
    for fold, loss in enumerate(losses):
        plt.plot(loss, label=f'Epoch {fold + 1}')
    plt.title('Training Loss Curve')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()


In [14]:
def validate_model(model, criterion, val_dataloader, input_type):
    model.eval()  
    predictions, labels = [], []


    with torch.no_grad():
        for sequences, pssms, labels in val_dataloader:
            if input_type == "pssms":
                inputs = pssms.permute(0, 2, 1)  
            else:
                inputs = sequences.permute(0, 2, 1)  

            inputs = inputs
            labels = labels
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 2)
            predictions.extend(predicted.view(-1).cpu().numpy())
            labels.extend(labels.view(-1).cpu().numpy())

    precision = precision_score(labels, predictions, average='macro')
    return precision

In [15]:
def test_model(model, test_dataset, output_file, input_type):
    model.eval()  
    predictions = []

    with torch.no_grad():
        for i in range(len(test_dataset)):  
            pdb_id, sequence, pssm = test_dataset[i] 
            if input_type == "pssms":
                input = pssm.unsqueeze(0).permute(0, 2, 1)  # Adjust dimensions to [1, features, seq_len]
            else:
                input = sequence.unsqueeze(0).permute(0, 2, 1)  # Adjust dimensions to [1, features, seq_len]
            
            # Make a prediction
            outputs = model(input)
            _, predicted = torch.max(outputs, 2)  # Get the index of max log-probability
            # Process the predictions
            seq_len = pssm.shape[0]  
            for j in range(seq_len):
                residue_id = f"{pdb_id}_{j + 1}"  
                structure_label = ['H', 'E', 'C'][predicted[0, j].item()] 
                predictions.append([residue_id, structure_label])

    # Write predictions to a CSV for submission
    pd.DataFrame(predictions, columns=['ID', 'STRUCTURE']).to_csv(output_file, index=False)
    print(f'Submission file saved to {output_file}')

The best optimizer is given as input accordingly and the optimizer function defines the "optimizer" accordingly to avoid hassle.

In [16]:
def get_optimizer(optimizer_type, model, lr, weight_decay):
    if optimizer_type == "adam":
        optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    elif optimizer_type == "sgd":
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
    elif optimizer_type == "rmsprop":
        optimizer = torch.optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:
        raise ValueError("Optimizer not known")
    return optimizer

HYPER-PARAMETER TUNING


The AX-hyper parameter tuning is used here to improve the accuracy of the model. Ry-tune couldnt be used due to some package version mis-match issues.


The below function is to train the model after extracting the best parameters.

In [17]:
import torch
import matplotlib.pyplot as plt

def train_and_val(
        num_folds,
        input_type,
        lr,
        batch_size,
        hidden_layers,
        dropout_rate,
        weight_decay,
        optimizer_type,
        normalization,
        num_epochs,
):
    train_dataset = DataProcessing(csv_file=seqs_train_path, train_dir=train_path, label_file=labels_train_path,
                                   normalize_method=normalization)

    kf = KFold(n_splits=num_folds)   # Initialize KFold
    full_dataset_list = list(range(len(train_dataset)))  # Converting dataset to a list
    fold_precisions = []     # Cross-validation loop
    
    # List to store loss values during training

    for fold, (train_index, val_index) in enumerate(kf.split(full_dataset_list)):
        print(f"Fold {fold + 1}/{num_folds}")
        # Split dataset to training and evaluation set
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_index)
        val_subsampler = torch.utils.data.SubsetRandomSampler(val_index)
        # Create data-loaders 
        train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_subsampler, collate_fn=collate)
        val_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=val_subsampler, collate_fn=collate)
        # Model Initialization
        if input_type == "pssms":
            input_channels = 20
        else:
            input_channels = 25
        model = CNN(
            num_classes=3,
            input_channels=input_channels,
            hidden_layers_number=hidden_layers,
            dropout_rate=dropout_rate
        )
        
        # Loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = get_optimizer(optimizer_type, model, lr, weight_decay)

        # Train and validate the model
        print("Training the model...")
        train_model(model, criterion, optimizer, train_loader, num_epochs, input_type)
        
        print("Validating the model...")
        fold_precision = validate_model(model, criterion, val_loader, input_type)
        fold_precisions.append(fold_precision)

    # Calculate average loss and accuracy across all folds created
    avg_precisions = sum(fold_precisions) / num_folds
    print(f"Average Precisions: {avg_precisions:.4f}")

    
    return avg_precisions





The train_test_entire trains and tests the entire data after the model is completely ready with thye best parameters

In [18]:
def train_test_entire(
        input_type,
        lr,
        batch_size,
        hidden_layers,
        dropout_rate,
        weight_decay,
        optimizer_type,
        normalization,
        num_epochs,
        output_file
):
    train_dataset = DataProcessing(csv_file=seqs_train_path, train_dir=train_path, label_file=labels_train_path,
                                   normalize_method=normalization)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate)
    test_dataset = DataProcessing(csv_file=seqs_test_path, train_dir=test_path, normalize_method=normalization)
    if input_type == "pssms":
        input_channels = 20
    else:
        input_channels = 25
    model = CNN(
        num_classes=3,
        input_channels=input_channels,
        hidden_layers_number=hidden_layers,
        dropout_rate=dropout_rate
    )
    criterion = nn.CrossEntropyLoss()
    optimizer = get_optimizer(optimizer_type, model, lr, weight_decay)
    print("Training model...")
    train_model(model, criterion, optimizer, train_dataloader, num_epochs, input_type)
    print("Testing model...")
    test_model(model, test_dataset, output_file, input_type)

In [19]:
from ax.service.ax_client import AxClient
from ax.service.utils.instantiation import ObjectiveProperties
import os
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torch._tensor import Tensor
from ax.service.ax_client import AxClient, ObjectiveProperties
from ax.service.utils.report_utils import exp_to_df
from ax.utils.notebook.plotting import init_notebook_plotting, render
from ax.utils.tutorials.cnn_utils import evaluate, load_mnist, train

In [20]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") #Use GPU to speed-up model training , validation and testing

The train_hyper function is used to train the model after getting the best parameters.

In [21]:
def train_hyper(parameterization):
    avg_precision, epoch = train_and_val(
                                num_folds=3,
                                input_type="pssms",  # Can give "sequence" or "pssms" as inputs
                                lr=parameterization["lr"],
                                batch_size=parameterization["batch_size"],
                                hidden_layers=parameterization.get("hidden_layers", 3),
                                dropout_rate=parameterization.get("dropout_rate", 0.5),
                                weight_decay=parameterization.get("weight_decay", 0),
                                optimizer_type=parameterization["optimizer"],
                                normalization=parameterization["normalization"],
                                num_epochs=parameterization.get("epochs", 1),
                            )
    print(f"Accuracy: {avg_precision}, Epoch: {epoch}")
    return {"precision": avg_precision}


Here, the AX experiment is being set-up to run repetitive trials for finding the best parameters. The parameters needed for the model optimization are passed to the training function and the model is trained every-time giving a different value for eachparam every time. Finally after training, the best parameter is chosen and the model can be trained with the best parameter to yield better results while testing.

In [22]:
# Setting up an Ax experiment
ax_client = AxClient(enforce_sequential_optimization=False)
ax_client.create_experiment(
    name="protein_model_experiment",
    parameters=[
        {"name": "lr", "type": "range", "bounds": [0.0001, 0.01], "log_scale": True},
        {"name": "batch_size", "type": "fixed", "value": 4},
        {"name": "hidden_layers", "type": "choice", "values": [4, 5]},
        {"name": "dropout_rate", "type": "range", "bounds": [0.0, 0.7]},
        {"name": "weight_decay", "type": "range", "bounds": [0.0, 0.1]},
        {"name": "epochs", "type": "fixed", "value": 10},
        {"name": "optimizer", "type": "fixed", "value": "rmsprop"},  
        {"name": "normalization", "type": "fixed", "value": "min-max"},

    ],
    objectives={"precision": ObjectiveProperties(minimize=False)}
)

# Running the trials
for i in range(1):  
    params, trial_index = ax_client.get_next_trial()
    metrics = train_hyper(params)
    ax_client.complete_trial(trial_index=trial_index, raw_data=metrics)

# Fetching the best parameters
best_parameters, metrics = ax_client.get_best_parameters()
print(f'Best Parameters: {best_parameters}')
print(metrics)
best_metrics = metrics['objectives']
print(f'Best Precision: {best_metrics}')

[INFO 03-18 11:56:24] ax.service.ax_client: Starting optimization with verbose logging. To disable logging, set the `verbose_logging` argument to `False`. Note that float values in the logs are rounded to 6 decimal points.
[INFO 03-18 11:56:24] ax.service.utils.instantiation: Inferred value type of ParameterType.FLOAT for parameter lr. If that is not the expected value type, you can explicitly specify 'value_type' ('int', 'float', 'bool' or 'str') in parameter dict.
[INFO 03-18 11:56:24] ax.service.utils.instantiation: Inferred value type of ParameterType.INT for parameter hidden_layers. If that is not the expected value type, you can explicitly specify 'value_type' ('int', 'float', 'bool' or 'str') in parameter dict.




[INFO 03-18 11:56:24] ax.service.utils.instantiation: Inferred value type of ParameterType.FLOAT for parameter dropout_rate. If that is not the expected value type, you can explicitly specify 'value_type' ('int', 'float', 'bool' or 'str') in parameter dict.
[INFO 03-1

Fold 1/3
Training the model...
Compiler in the train_model
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
predicting...
pre

ValueError: Found input variables with inconsistent numbers of samples: [2, 0]

Get the best parameters and train-validate (by calling the train_hyper function) the model again to increase the accuracy.

In [None]:
train_and_val(
    num_folds=3,
    input_type="pssms", 
    lr=0.001,
    batch_size=4,
    hidden_layers=3,
    dropout_rate=0.233246,
    weight_decay=0.0,
    optimizer_type='rmsprop',
    normalization='min-max',
    num_epochs=10,
)

After training and validating the model using the best parameters, test the test-data to predict the secondary protein structures of the given input.

In [None]:
train_test_entire(
    input_type="pssms",  # "sequence" or "pssms"
    lr=0.001,
    batch_size=4,
    hidden_layers=3,
    dropout_rate=0.5,
    weight_decay=0.0001,
    optimizer_type='sgd',
    normalization='min-max',
    num_epochs=10,
    output_file='./prediction.csv'
 )