# Check for GPU
Checks if a GPU is present as it makes processing data much faster.

In [1]:
import torch

def check_gpu():
    if torch.cuda.is_available():
        print("CUDA is available. PyTorch can use the GPU.")
        print(f"Number of GPUs: {torch.cuda.device_count()}")
        print(f"GPU Name: {torch.cuda.get_device_name(0)}")
    else:
        print("CUDA is not available. PyTorch will use the CPU.")

if __name__ == "__main__":
    check_gpu()

CUDA is available. PyTorch can use the GPU.
Number of GPUs: 1
GPU Name: NVIDIA GeForce RTX 4060 Laptop GPU


# Import libraries
Importing relevant libraries for the Large Language Model.

In [3]:
import torch
from transformers import (
    BertTokenizer, 
    BertForSequenceClassification, 
    TrainingArguments, 
    Trainer
)
from Bio import SeqIO
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, 
    roc_auc_score, 
    matthews_corrcoef, 
    confusion_matrix, 
    precision_score, 
    recall_score
)
import random
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


# Colab mounting (if necessary)

In [4]:
# # allow GoogleColab to access GoogleDrive
# from google.colab import drive
# drive.mount('/content/drive')

# Initialize tokenizer (pretrained model)

In [5]:
tokenizer = BertTokenizer.from_pretrained("Rostlab/prot_bert", do_lower_case=False)
model = BertForSequenceClassification.from_pretrained(
    "Rostlab/prot_bert", 
    num_labels=2
)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at Rostlab/prot_bert and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# Load datasets

In [6]:
def parse_fasta(fasta_file):
    sequences = []
    labels = []
    for record in SeqIO.parse(fasta_file, "fasta"):
        sequence = str(record.seq)
        # Extract the label from the sequence ID (assuming it's at the end and separated by '_')
        label = int(record.id.split('_')[-1])  # Expected to be 0 or 1
        sequences.append(sequence)
        labels.append(label)
    return sequences, labels
train_sequences, train_labels = parse_fasta("Train.fasta")
test_sequences, test_labels = parse_fasta("Test.fasta")

# Splitting Training, Evaluation, and Test Data

In [7]:
train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_sequences,
    train_labels,
    test_size=0.2,
    stratify=train_labels,  # Maintains class distribution
    random_state=42  # Ensures reproducibility
)

# Tokenizing Sequences using Tokenizer

In [8]:
train_encodings = tokenizer(
    train_texts, 
    padding=True, 
    truncation=True, 
    max_length=1200,
    return_tensors='pt'
)
val_encodings = tokenizer(
    val_texts, 
    padding=True, 
    truncation=True, 
    max_length=1200,
    return_tensors='pt'
)
test_encodings = tokenizer(
    test_sequences, 
    padding=True, 
    truncation=True, 
    max_length=1200,
    return_tensors='pt'
)

train_labels = torch.tensor(train_labels)
val_labels = torch.tensor(val_labels)
test_labels = torch.tensor(test_labels)


# `ProteinDataset` class

This code defines a custom ProteinDataset class to create a PyTorch dataset for protein sequence data, and then uses it to create training and validation datasets (train_dataset and val_dataset).

In [9]:
class ProteinDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

train_dataset = ProteinDataset(train_encodings, train_labels)
val_dataset = ProteinDataset(val_encodings, val_labels)
test_dataset = ProteinDataset(test_encodings, test_labels)


# `WeightedTrainer` class
This code defines a custom WeightedTrainer class that inherits from Hugging Face’s Trainer class. Its purpose is to override the default loss computation in the Trainer class so that class weights can be applied when calculating the loss, helping to handle imbalanced data.

In [10]:
class WeightedTrainer(Trainer):
    def __init__(self, class_weights, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights.to(self.model.device)  # Move weights to the correct device

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        
        # Define loss function with class weights
        loss_fct = torch.nn.CrossEntropyLoss(weight=self.class_weights)
        loss = loss_fct(logits, labels)
        
        return (loss, outputs) if return_outputs else loss

We will apply class weights on trainining datasets to deal with class imbalance.

In [11]:
train_lab = train_labels.tolist()
class_counts = [len(train_lab) - sum(train_lab), sum(train_lab)]  # [class 0, class 1]
print("Class Counts:", class_counts)

# Compute class weights inversely proportional to class frequencies
total_samples = sum(class_counts)
class_weights = [total_samples / count for count in class_counts]
print("Class Weights:", class_weights)

# Convert class_weights to a tensor
class_weights = torch.tensor(class_weights, dtype=torch.float)

Class Counts: [28342, 14286]
Class Weights: [1.504057582386564, 2.98390032199356]


# Initialise model and Training

In [12]:
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=10,               # Number of epochs
    per_device_train_batch_size=8,
    gradient_accumulation_steps=2,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    learning_rate=2e-5,
    logging_dir='./logs',
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model='accuracy',  # Metric to determine the best model
    greater_is_better=True,            # Whether the metric should be maximized
    seed=42,                           # Ensures reproducibility
    # Additional arguments can be added here
)



# Use GPU if available

In [13]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30, 1024, padding_idx=0)
      (position_embeddings): Embedding(40000, 1024)
      (token_type_embeddings): Embedding(2, 1024)
      (LayerNorm): LayerNorm((1024,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-29): 30 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=1024, out_features=1024, bias=True)
              (key): Linear(in_features=1024, out_features=1024, bias=True)
              (value): Linear(in_features=1024, out_features=1024, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=1024, out_features=1024, bias=True)
              (LayerNorm): LayerNorm((10

In [14]:
for param in model.parameters():
    param.data = param.data.contiguous()

# Computation of Evaluation Metrics

In [17]:
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    probs = torch.softmax(torch.tensor(logits), dim=-1).numpy()
    
    accuracy = accuracy_score(labels, predictions)
    auc_roc = roc_auc_score(labels, probs[:, 1])
    mcc = matthews_corrcoef(labels, predictions)
    precision = precision_score(labels, predictions)
    recall = recall_score(labels, predictions)
    
    # Compute confusion matrix
    tn, fp, fn, tp = confusion_matrix(labels, predictions).ravel()
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    
    return {
        'accuracy': accuracy,
        'roc_auc': auc_roc,
        'mcc': mcc,
        'precision': precision,
        'recall': recall,
        'specificity': specificity
    }

In [18]:
trainer = WeightedTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    class_weights=class_weights,      # Pass class_weights here
    compute_metrics=compute_metrics   # Add compute_metrics here
)

# Start Model Training

In [19]:
# Start training
trainer.train()


  0%|          | 2/26640 [00:06<22:29:02,  3.04s/it]

KeyboardInterrupt: 

# Perform Classification

In [None]:
# Create test dataset
test_dataset = ProteinDataset(test_encodings, test_labels)

# Evaluate the model
eval_results = trainer.evaluate(eval_dataset=test_dataset)
print(eval_results)


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
100%|██████████| 1682/1682 [01:05<00:00, 25.53it/s]

{'eval_loss': 0.6811649799346924, 'eval_accuracy': 0.6673357121617604, 'eval_roc_auc': 0.5, 'eval_mcc': 0.0, 'eval_precision': 0.0, 'eval_recall': 0.0, 'eval_specificity': 1.0, 'eval_runtime': 66.1246, 'eval_samples_per_second': 203.434, 'eval_steps_per_second': 25.437, 'epoch': 9.9981234753237}





In [None]:
# Get predictions on the test dataset
predictions = trainer.predict(test_dataset)
pred_labels = predictions.predictions.argmax(-1)  # Convert logits to predicted labels
true_labels = test_labels.numpy()  # Ground truth labels

# Calculate evaluation metrics
accuracy = accuracy_score(true_labels, pred_labels)
auc_roc = roc_auc_score(true_labels, predictions.predictions[:, 1])  # Use positive class probabilities for AUC
mcc = matthews_corrcoef(true_labels, pred_labels)
sensitivity = recall_score(true_labels, pred_labels)  # Sensitivity is the recall for the positive class
tn, fp, fn, tp = confusion_matrix(true_labels, pred_labels).ravel()
specificity = tn / (tn + fp)

# Print metrics
print(f"Accuracy: {accuracy:.2f}")
print(f"AUC-ROC: {auc_roc:.2f}")
print(f"MCC: {mcc:.2f}")
print(f"Sensitivity (Recall): {sensitivity:.2f}")
print(f"Specificity: {specificity:.2f}")

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
100%|██████████| 1682/1682 [01:07<00:00, 24.94it/s]

Accuracy: 0.67
AUC-ROC: 0.50
MCC: 0.00
Sensitivity (Recall): 0.00
Specificity: 1.00





# Save model

In [None]:
model_save_path = "./trained_model"
tokenizer_save_path = "./trained_tokenizer"

# Create directories if they don't exist
os.makedirs(model_save_path, exist_ok=True)
os.makedirs(tokenizer_save_path, exist_ok=True)

# Save the model
trainer.save_model(model_save_path)
print(f"Model saved to {model_save_path}")

# Save the tokenizer
tokenizer.save_pretrained(tokenizer_save_path)
print(f"Tokenizer saved to {tokenizer_save_path}")

Model saved to ./trained_model
Tokenizer saved to ./trained_tokenizer
