In [19]:
import os
import sys
import numpy as np

from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader
import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Fasta File Grabbing from Local Directory Based on Extension

In [2]:
def import_fasta_file(directory):

  fasta_files = []
  if os.path.exists(directory):
    for filename in os.listdir(directory):
      if filename.endswith(".fasta"):
        fasta_files.append(os.path.join(directory, filename))
  return fasta_files[0]

fasta_file_path = import_fasta_file('/content')

if fasta_file_path:
    print(f"Found the following .fasta file:\n{fasta_file_path}") if fasta_file_path else print("No .fasta files found in the specified directory.")


Found the following .fasta file:
/content/ParInva_2_P_mask.fasta


# Class for Fasta File Processing

<b>Key idea:</b> From the initial fasta file we are obtaining `3D numpy arrays` `X_train`, `y_train`, `X_test`, `y_test` which shapes are `number of samples, fragment length, features` for `X` and `number of samples, 1` for `y` with fair class representation and "classic" 80%/20% train/test split.


---


Notes:
 - <b>transform_data()</b> does not explicitly rely on class variables for flexibility in case of "voting system" for contig classification implementation, which will require "higher level variables" processing during eval mode.
 - other variables (like <b>samples</b>) are accessible for the same reason mentioned above and debug.

In [3]:
class Contigs_Processing():
  def __init__(self, file, fragment_size = 1000):
    self.contigs_dict, self.contigs_sequences, self.contigs_labels, self.contigs_sequences_truncated, self.contigs_labels_truncated = self.initial_processing(file, fragment_size)
    self.samples, self.labels = self.transform_data(self.contigs_sequences_truncated, self.contigs_labels_truncated, fragment_size)
    self.X_train, self.y_train, self.X_test, self.y_test = self.split_data()


  def initial_processing(self, file, fragment_size):

    """
    Original fasta file:
      >contig original name
      sequence fr 1
      sequence fr 2
      ...
    to
      {contig original name: full sequence}
    """
    contigs_dict={}
    with open(file, 'r') as f:
      for line in f:
        if line.startswith('>'):
          contig_name = line[1:].strip()
          contigs_dict[contig_name] = ''
        else:
          contigs_dict[contig_name] += line.strip()

    """
    {contig original name: full sequence}
    to
      contigs_sequences -> list of sequences
      contigs_labels -> list of labels: 0 for Para, 1 for Perk

      contigs_sequences_truncated-> list of truncated sequences (each sequence trimmed to N truncated sequences of fragment_size)
      contigs_labels_truncated -> list of labels: 0 for Para, 1 for Perk for expanded version

      for full/truncated: if sequence/fragment < fragment_size => discard
    """

    contigs_sequences = []
    contigs_labels = []

    contigs_sequences_truncated = []
    contigs_labels_truncated = []

    for contig_name, sequence in contigs_dict.items():
      if len(sequence) >= fragment_size:
        if "Para" in contig_name:
          contigs_labels.append(0)
          contigs_sequences.append(sequence)

          for i in range(0, len(sequence) - fragment_size + 1, fragment_size):
            contigs_sequences_truncated.append(sequence[i:i+fragment_size])
            contigs_labels_truncated.append(0)

        elif "Perk" in contig_name:
          contigs_labels.append(1)
          contigs_sequences.append(sequence)

          for i in range(0, len(sequence) - fragment_size + 1, fragment_size):
            contigs_sequences_truncated.append(sequence[i:i+fragment_size])
            contigs_labels_truncated.append(1)

    return contigs_dict, contigs_sequences, contigs_labels, contigs_sequences_truncated, contigs_labels_truncated


  def transform_data(self, sequences, labels, fragment_size):
      """
      Transform a list of truncated DNA sequences and labels into a CNN-ready format.

      Each sequence is converted into a 2D array of shape (frag_length, 5):
        - The first 4 columns are one-hot encoding for nucleotides A, C, G, T.
        - The 5th column is a binary flag for low-complexity regions.

      Parameters:
          sequences (list of str): List of DNA sequences, each of identical length.
          labels (list): Corresponding labels for each sequence.

      Returns:
          X (np.ndarray): 3D array with shape (num_samples, frag_length, 5).
          Y (np.ndarray): Array of labels with shape (num_samples, 1).
      """

      mapping = {'A': 0, 'a':0, 'C': 1, 'c':1, 'G': 2, 'g':2, 'T': 3, 't':3}


      X_list = []
      for seq in sequences:
          feature_arr = np.zeros((fragment_size, 5), dtype=np.float32)

          for j in range(fragment_size):
              char = seq[j]

              # One-hot encoding
              if char in mapping:
                  feature_arr[j, mapping[char]] = 1.0

              # Lowercase => low complexity (1)
              # Uppercase => 0
              if char in 'acgt':
                  feature_arr[j, 4] = 1.0
              else:
                  feature_arr[j, 4] = 0.0

          X_list.append(feature_arr)

      # Individual 2D arrays => 3D tensor: (num_samples, frag_length, 5)
      X = np.array(X_list)

      # (num_samples, ) => (num_samples, 1)
      Y = np.array(labels).reshape(-1, 1)

      return X, Y

  def split_data(self, test_size=0.2, random_state=42):
      """
      Splits self.samples and self.labels into training and testing sets
      while maintaining fair class representation (stratified).

      Parameters:
          test_size (float): Proportion of data to assign to the test set.
          random_state (int): Seed for reproducibility.

      Returns:
          X_train (np.ndarray): Training samples of shape (num_train, frag_length, num_features).
          y_train (np.ndarray): Training labels of shape (num_train, 1).
          X_test (np.ndarray): Testing samples of shape (num_test, frag_length, num_features).
          y_test (np.ndarray): Testing labels of shape (num_test, 1).
      """
      labels_flat = self.labels.flatten()

      X_train, X_test, y_train, y_test = train_test_split(
          self.samples, labels_flat,
          test_size=test_size,
          stratify=labels_flat,
          random_state=random_state
      )

      y_train = y_train.reshape(-1, 1)
      y_test = y_test.reshape(-1, 1)

      return X_train, y_train, X_test, y_test


# DataLoader Dataset

In [4]:
class Fragments_Dataset(Dataset):
    def __init__(self, samples, labels):
        if len(samples) != len(labels):
            raise ValueError("# of samples != # of labels")

        self.samples = torch.tensor(samples, dtype=torch.float)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        # Return the total number of samples
        return len(self.labels)

    def __getitem__(self, idx):
        return self.samples[idx], self.labels[idx]

# Data Processing

In [5]:
dataset = Contigs_Processing(fasta_file_path)

train_dataset = Fragments_Dataset(dataset.X_train, dataset.y_train)
test_dataset = Fragments_Dataset(dataset.X_test, dataset.y_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Model Architecture (Hybrid CNN → LSTM)

Visualization (detailed & simplified) is provided in the end of the notebook. <br>

Conv1d → MaxPool1d → Conv1d → Dropout → LSTM → FC <br>

returns: raw logits (`CrossEntropyLoss` criterion)

In [6]:
class Hybrid_CNN_LSTM_Classifier(nn.Module):
    def __init__(self, num_features = 5, num_classes = 2,
                 input_dim = 64, hidden_dim = 128, num_layers = 1):
        super(Hybrid_CNN_LSTM_Classifier, self).__init__()

        self.conv1 = nn.Conv1d(in_channels=num_features,
                               out_channels=32,
                               kernel_size=5,
                               padding=2
                               )

        self.pool = nn.MaxPool1d(kernel_size=2)

        self.conv2 = nn.Conv1d(in_channels=32,
                               out_channels=64,
                               kernel_size=5,
                               padding=2
                               )

        self.dropout = nn.Dropout(0.3)

        self.lstm = nn.LSTM(input_size=input_dim,
                            hidden_size=hidden_dim,
                            num_layers=num_layers,
                            batch_first=True,
                            bidirectional=True
                            )

        self.fc = nn.Linear(hidden_dim * 2,
                            num_classes
                            )

    def forward(self, x):

        x = x.permute(0, 2, 1) # => (batch_size, num_features, fragment_length)

        # CNN step

        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = self.dropout(x)

        x = x.permute(0, 2, 1) # => (batch_size, pooled_length, out_channels of conv2)

        # LSTM step

        out_lstm, _ = self.lstm(x)
        out_lstm = out_lstm.mean(dim=1)

        # FC layer
        out_fc = self.fc(out_lstm)

        return out_fc

# Training Loop for One Epoch

In [12]:
def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    device: str = "cpu",
    verbose: bool = True,
) -> dict:

    model.train()

    losses = []
    all_preds = []
    all_targets = []

    with tqdm(total=len(loader), desc="training", file=sys.stdout, ncols=100, disable=not verbose) as progress:
        for x_batch, y_true in loader:
            # Move the input and target tensors to the specified device (CPU or GPU)
            x_batch = x_batch.to(device)
            y_true = y_true.squeeze(1).to(device)

            optimizer.zero_grad()

            y_pred = model(x_batch)

            loss = criterion(y_pred, y_true)
            loss.backward()
            losses.append(loss.item())

            preds = y_pred.argmax(dim=1).detach().cpu().numpy()
            labels = y_true.detach().cpu().numpy()
            all_preds.append(preds)
            all_targets.append(labels)

            progress.set_postfix_str(f"loss {losses[-1]:.4f}")

            optimizer.step()
            progress.update(1)

    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)

    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average="macro")
    recall = recall_score(all_targets, all_preds, average="macro")
    f1 = f1_score(all_targets, all_preds, average="macro")

    # Aggregate and return evaluation logs
    logs = {
        "losses": np.array(losses),
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }

    return logs

# Validation/Test Inference

In [13]:
@torch.inference_mode()
def evaluate(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    device: str = "cpu",
    verbose: bool = True,
) -> dict:

    model.eval()

    losses = []
    all_preds = []
    all_targets = []

    for x_batch, y_true in tqdm(loader, desc="evaluation", file=sys.stdout, ncols=100, disable=not verbose):
        # Move the input and target tensors to the specified device (CPU or GPU)
        x_batch = x_batch.to(device)
        y_true = y_true.squeeze(1).to(device)

        y_pred = model(x_batch)

        loss = criterion(y_pred, y_true)
        losses.append(loss.item())

        preds = y_pred.argmax(dim=1).detach().cpu().numpy()
        labels = y_true.detach().cpu().numpy()
        all_preds.append(preds)
        all_targets.append(labels)

    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)

    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average="macro")
    recall = recall_score(all_targets, all_preds, average="macro")
    f1 = f1_score(all_targets, all_preds, average="macro")

    # Aggregate and return evaluation logs
    logs = {
        "losses": np.array(losses),
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }
    return logs

# Model initialization

- <b>Criterion:</b> `CrossEntropyLoss`
- <b>Optimizer:</b> `Adam` (lr = 10<sup>-3</sup>)

In [14]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device - {device}")

torch.manual_seed(42)

model = Hybrid_CNN_LSTM_Classifier()

model = model.to(device)

print(model)
print("Number of trainable parameters -", sum(p.numel() for p in model.parameters() if p.requires_grad))

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

Device - cuda
Hybrid_CNN_LSTM_Classifier(
  (conv1): Conv1d(5, 32, kernel_size=(5,), stride=(1,), padding=(2,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(32, 64, kernel_size=(5,), stride=(1,), padding=(2,))
  (dropout): Dropout(p=0.3, inplace=False)
  (lstm): LSTM(64, 128, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=256, out_features=2, bias=True)
)
Number of trainable parameters - 210306


# Training & Evaluation

In [15]:
n_epochs = 25

train_losses = []
train_accuracies = []
train_precisions = []
train_recalls = []
train_f1s = []

valid_losses = []
valid_accuracies = []
valid_precisions = []
valid_recalls = []
valid_f1s = []

for ep in range(n_epochs):
    print(f"\nEpoch {ep + 1:2d}/{n_epochs:2d}")

    # Train the model for one epoch and collect training logs
    train_logs = train_one_epoch(model, train_loader, criterion, optimizer, device, verbose=True)
    train_loss = np.mean(train_logs["losses"])
    train_accuracy = train_logs["accuracy"]
    train_precision = train_logs["precision"]
    train_recall = train_logs["recall"]
    train_f1 = train_logs["f1"]

    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    train_precisions.append(train_precision)
    train_recalls.append(train_recall)
    train_f1s.append(train_f1)

    print("Train loss      :", train_loss)
    print("Train accuracy  :", train_accuracy)
    print("Train precision :", train_precision)
    print("Train recall    :", train_recall)
    print("Train F1        :", train_f1)

    # Evaluate the model on the validation set and collect evaluation logs
    valid_logs = evaluate(model, valid_loader, criterion, device, verbose=True)
    valid_loss = np.mean(valid_logs["losses"])
    valid_accuracy = valid_logs["accuracy"]
    valid_precision = valid_logs["precision"]
    valid_recall = valid_logs["recall"]
    valid_f1 = valid_logs["f1"]

    valid_losses.append(valid_loss)
    valid_accuracies.append(valid_accuracy)
    valid_precisions.append(valid_precision)
    valid_recalls.append(valid_recall)
    valid_f1s.append(valid_f1)

    print("Valid loss      :", valid_loss)
    print("Valid accuracy  :", valid_accuracy)
    print("Valid precision :", valid_precision)
    print("Valid recall    :", valid_recall)
    print("Valid F1        :", valid_f1)


Epoch  1/25
training: 100%|██████████████████████████████████████| 860/860 [00:14<00:00, 58.71it/s, loss 0.0039]
Train loss      : 0.1452440449169317
Train accuracy  : 0.9447000418250259
Train precision : 0.8901961332908516
Train recall    : 0.8324756198567391
Train F1        : 0.8581571312062961
evaluation: 100%|████████████████████████████████████████████████| 215/215 [00:01<00:00, 165.59it/s]
Valid loss      : 0.040472659240151906
Valid accuracy  : 0.986470759383183
Valid precision : 0.9791317335909646
Valid recall    : 0.9557067498936836
Valid F1        : 0.9670201792351283

Epoch  2/25
training: 100%|██████████████████████████████████████| 860/860 [00:13<00:00, 63.54it/s, loss 0.0002]
Train loss      : 0.0438037070286999
Train accuracy  : 0.9834700223672964
Train precision : 0.9606138949068812
Train recall    : 0.9607958002701924
Train F1        : 0.9607048228356093
evaluation: 100%|████████████████████████████████████████████████| 215/215 [00:01<00:00, 161.47it/s]
Valid loss    

# Results Visualization

Was build with `plotly` because of the resulting interactivity of the graphs.

In [16]:
epochs = np.arange(1, len(train_losses) + 1)

# Subplots for all the calculated metrics

fig = make_subplots(
    rows=2, cols=3,
    subplot_titles=("Loss", "Accuracy", "Precision", "Recall", "F1 Score", ""),
    horizontal_spacing=0.1,
    vertical_spacing=0.15
)

# Loss
fig.add_trace(
    go.Scatter(x=epochs, y=train_losses, mode='lines+markers', name='Train Loss'),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=epochs, y=valid_losses, mode='lines+markers', name='Valid Loss'),
    row=1, col=1
)

# Accuracy
fig.add_trace(
    go.Scatter(x=epochs, y=train_accuracies, mode='lines+markers', name='Train Accuracy'),
    row=1, col=2
)
fig.add_trace(
    go.Scatter(x=epochs, y=valid_accuracies, mode='lines+markers', name='Valid Accuracy'),
    row=1, col=2
)

# Precision
fig.add_trace(
    go.Scatter(x=epochs, y=train_precisions, mode='lines+markers', name='Train Precision'),
    row=1, col=3
)
fig.add_trace(
    go.Scatter(x=epochs, y=valid_precisions, mode='lines+markers', name='Valid Precision'),
    row=1, col=3
)

# Recall
fig.add_trace(
    go.Scatter(x=epochs, y=train_recalls, mode='lines+markers', name='Train Recall'),
    row=2, col=1
)
fig.add_trace(
    go.Scatter(x=epochs, y=valid_recalls, mode='lines+markers', name='Valid Recall'),
    row=2, col=1
)

# F1 Score
fig.add_trace(
    go.Scatter(x=epochs, y=train_f1s, mode='lines+markers', name='Train F1'),
    row=2, col=2
)
fig.add_trace(
    go.Scatter(x=epochs, y=valid_f1s, mode='lines+markers', name='Valid F1'),
    row=2, col=2
)

# Unused subplot - 2, 3 (hide)
fig.update_xaxes(visible=False, row=2, col=3)
fig.update_yaxes(visible=False, row=2, col=3)

fig.update_layout(
    title="Training and Validation Metrics per Epoch",
    height=600,
    width=1200,
    showlegend=True,
    legend=dict(x=1.02, y=1.0)
)

fig.show()

# Architecture Visualization - Full (`torchviz`) and Simplified (`torchview`)

In [None]:
!pip install torchviz
!pip install torchview

In [24]:
import torch
from torchviz import make_dot
from torchview import draw_graph

In [21]:
# This block was generated with the help of Claude 3.7 Sonnet

model = Hybrid_CNN_LSTM_Classifier()
model.eval()

# Create a dummy input tensor.
# The input shape is (batch_size, fragment_length, num_features).
dummy_input = torch.randn(1, 1000, 5)

# Pass the dummy input through the model to produce an output.
# To generate a scalar for the graph <= take the mean of the output.
output = model(dummy_input)
output_mean = output.mean()

# Generate the visualization graph.
# Pass model's named_parameters to see the parameter names in the graph.
graph = make_dot(output_mean, params=dict(model.named_parameters()),
                 show_attrs=True, show_saved=True)

# Render (save) the graph to an image file.
graph.render("Hybrid_CNN_LSTM_Classifier_architecture", format="svg")

'Hybrid_CNN_LSTM_Classifier_architecture.svg'

Hybrid_CNN_LSTM_Classifier_architecture.svg

In [25]:
model = Hybrid_CNN_LSTM_Classifier()

# Generate a simplified graph
# 'device="meta"' means no memory is consumed for visualization
model_graph = draw_graph(
    model,
    input_size=(64, 1000, 5),  # (batch_size, fragment_length, features)
    device='meta',
    expand_nested=False  # Keep nested modules like LSTM collapsed
)

# Save the visualization
model_graph.visual_graph.render("model_architecture_simplified", format="svg")

'model_architecture_simplified.svg'

model_architecture_simplified.svg
