In [None]:
# Applying data augmentation for deep learning model inputs
from Bio import SeqIO

def has_15_consecutive_common(seq1, seq2):
    """
    Check if two sequences have at least 15 consecutive nucleotides in common.
    """
    for i in range(len(seq1) - 14):  # Check for windows of 15 nucleotides
        if seq1[i:i+15] in seq2:
            return True
    return False

def generate_sequences_with_variable_overlap_and_common_bases(seq, k=40, min_overlap=5, max_overlap=20):
    """
    Generate all possible k-mers (subsequences) of length `k` (40 nucleotides) from the sequence with:
    1. Overlaps of varying lengths (5-20 nucleotides).
    2. Sequences that share at least 15 consecutive nucleotides with a previously extracted sequence.
    """
    valid_sequences = set()  # Use a set to ensure uniqueness

    # 1. Generate subsequences with varying overlaps (5 to 20 nucleotides)
    for overlap in range(min_overlap, max_overlap + 1):
        # Left overlap
        for i in range(0, len(seq) - k + 1, k - overlap):
            sub_seq = str(seq[i:i + k])
            valid_sequences.add(sub_seq)
        # Right overlap
        for i in range(overlap, len(seq) - k + 1, k - overlap):
            sub_seq = str(seq[i:i + k])
            valid_sequences.add(sub_seq)
        # Both sides overlap
        for i in range(overlap // 2, len(seq) - k + 1, k - overlap):
            sub_seq = str(seq[i:i + k])
            valid_sequences.add(sub_seq)

    # 2. Generate subsequences with at least 15 consecutive nucleotides in common
    for i in range(len(seq) - k + 1):
        sub_seq = str(seq[i:i + k])
        if not valid_sequences:  # If no sequences yet, add the first one
            valid_sequences.add(sub_seq)
        else:
            # Check if the current subsequence has 15 consecutive nucleotides in common with any previously added one
            if any(has_15_consecutive_common(existing_seq, sub_seq) for existing_seq in valid_sequences):
                valid_sequences.add(sub_seq)

    return list(valid_sequences)

def process_fasta_for_nn(input_fasta, output_file, k=40, min_overlap=5, max_overlap=20):
    """
    Process each sequence in the input FASTA file to generate subsequences with:
    1. Variable overlaps of 5-20 nucleotides.
    2. Sequences sharing at least 15 consecutive nucleotides with previous sequences.
    Ensure that all subsequences are exactly 40 nucleotides long.
    """
    with open(output_file, 'w') as output_handle:
        for record in SeqIO.parse(input_fasta, "fasta"):
            # Generate subsequences with variable overlap and common bases
            overlap_sequences = generate_sequences_with_variable_overlap_and_common_bases(record.seq, k, min_overlap, max_overlap)

            # Combine all sequences and ensure uniqueness by using a set
            all_sequences = set(overlap_sequences)

            label = record.id.split('_')[0]  # Extract label from the header (e.g., "trnR1" from ">trnR1_seq_1")
            for sub_seq in all_sequences:
                output_handle.write(f"{sub_seq}, {label}\n")

# Example usage
input_fasta = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Triticum aestivum chloroplast_300 bp up.fasta"
output_file = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Triticum aestivum_300N_40N_5to20_overlap.csv"
process_fasta_for_nn(input_fasta, output_file)


In [None]:
# Implementation of CNN-LSTM hybrid model for nucleotide sequences
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import precision_recall_curve

# Load and preprocess the data
def load_data(file):
    df = pd.read_csv(file, header=None)
    sequences = df[0].values
    labels = df[1].values

    # One-hot encode the sequences
    def one_hot_encode_sequence(seq):
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        return np.array([mapping[nuc] for nuc in seq])

    encoded_sequences = np.array([one_hot_encode_sequence(seq) for seq in sequences])
    encoded_sequences = encoded_sequences.reshape(len(sequences), -1, 4)  # Reshape to (num_samples, seq_len, num_channels)

    # Convert labels to categorical
    unique_labels = np.unique(labels)
    label_dict = {label: idx for idx, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_dict[label] for label in labels])

    return encoded_sequences, encoded_labels, unique_labels

# Build the CNN-LSTM hybrid model
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(CNNLSTMModel, self).__init__()

        # CNN layers
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layers
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.3, bidirectional=True)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size * 2, 100)  # Bidirectional LSTM doubles hidden size
        self.fc2 = nn.Linear(100, 80)
        self.output_layer = nn.Linear(80, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        # CNN forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, num_channels, seq_len) for Conv1d
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.pool(x)

        # LSTM forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, seq_len, num_channels) for LSTM
        h0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)  # Bidirectional LSTM
        c0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))  # Output shape: (batch_size, seq_length, hidden_size*2)

        # Take the output from the last time step
        out = out[:, -1, :]

        # Pass through fully connected layers with dropout
        out = torch.relu(self.fc1(out))
        out = self.dropout(out)
        out = torch.relu(self.fc2(out))
        out = self.output_layer(out)

        return out

# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

# Checkpoint functions
def save_checkpoint(state, filename='checkpoint.pth.tar'):
    print("=> Saving checkpoint")
    torch.save(state, filename)

def load_checkpoint(filename, model, optimizer):
    print("=> Loading checkpoint")
    checkpoint = torch.load(filename)
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    return checkpoint['epoch']

# Function to evaluate model and return true labels and predicted probabilities
def evaluate_model_with_scores(model, data_loader):
    model.eval()  # Set model to evaluation mode
    all_true_labels = []
    all_pred_scores = []
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            outputs = model(X_batch)
            predicted_probs = torch.softmax(outputs, dim=1)
            all_true_labels.extend(y_batch.cpu().numpy())
            all_pred_scores.extend(predicted_probs.cpu().numpy())
    return np.array(all_true_labels), np.array(all_pred_scores)

# Load data
input_file = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Arabidopsis thaliana_300N_40N_5to20_overlap.csv"  # Input file in the format prepared earlier
X, y, label_names = load_data(input_file)

# Normalize data
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

# Split into training and testing sets (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Further split training data for validation (90% training, 10% validation)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# Convert data to PyTorch tensors and create DataLoader
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

# Model parameters
input_size = 4  # Since one-hot encoding produces 4 channels for each nucleotide (A, C, G, T)
sequence_length = X.shape[1]
hidden_size = 128
num_layers = 2  # Number of LSTM layers
num_classes = len(label_names)

# Initialize model, loss function, optimizer, scheduler, and early stopping
model = CNNLSTMModel(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)
scheduler = StepLR(optimizer, step_size=5, gamma=0.85)
early_stopping = EarlyStopping(patience=8)

# Function to train model
def train_model(model, train_loader, val_loader, test_loader, num_epochs=50, start_epoch=0):
    for epoch in range(start_epoch, num_epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0

        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)

        train_accuracy = 100 * correct / total
        avg_train_loss = total_loss / len(train_loader)

        # Validation
        model.eval()
        val_loss, correct_val, total_val = 0, 0, 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct_val += (predicted == y_batch).sum().item()
                total_val += y_batch.size(0)

        val_accuracy = 100 * correct_val / total_val
        avg_val_loss = val_loss / len(val_loader)

        print(f"Epoch [{epoch+1}/{num_epochs}], "
              f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
              f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

        early_stopping(avg_val_loss)
        if early_stopping.early_stop:
            print("Early stopping.")
            break

        # Update scheduler
        scheduler.step()

    # Evaluate on test set
    model.eval()
    test_loss, correct_test, total_test = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_test += (predicted == y_batch).sum().item()
            total_test += y_batch.size(0)

    test_accuracy = 100 * correct_test / total_test
    print(f"Test Loss: {test_loss/len(test_loader):.4f}, Test Acc: {test_accuracy:.2f}%")

    return model

trained_model = train_model(model, train_loader, val_loader, test_loader, num_epochs=50)

# Evaluate and generate true labels and predicted scores for precision-recall curve analysis
true_labels, pred_scores = evaluate_model_with_scores(trained_model, test_loader)

In [None]:
#Evaluation and performance of CNN-LSTM hybrid model for nucleotide sequences through the average test accuracy (%), calculated across all datasets over multiple trials (5 trials) and epochs
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR

# Load and preprocess the data
def load_data(file):
    df = pd.read_csv(file, header=None)
    sequences = df[0].values
    labels = df[1].values

    # One-hot encode the sequences
    def one_hot_encode_sequence(seq):
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        return np.array([mapping[nuc] for nuc in seq])

    encoded_sequences = np.array([one_hot_encode_sequence(seq) for seq in sequences])
    encoded_sequences = encoded_sequences.reshape(len(sequences), -1, 4)  # Reshape to (num_samples, seq_len, num_channels)

    # Convert labels to categorical
    unique_labels = np.unique(labels)
    label_dict = {label: idx for idx, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_dict[label] for label in labels])

    return encoded_sequences, encoded_labels, unique_labels

# Build the CNN-LSTM hybrid model
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(CNNLSTMModel, self).__init__()

        # CNN layers
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layers
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.3, bidirectional=True)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size * 2, 100)  # Bidirectional LSTM doubles hidden size
        self.fc2 = nn.Linear(100, 80)
        self.output_layer = nn.Linear(80, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        # CNN forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, num_channels, seq_len) for Conv1d
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.pool(x)

        # LSTM forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, seq_len, num_channels) for LSTM
        h0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)  # Bidirectional LSTM
        c0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))  # Output shape: (batch_size, seq_length, hidden_size*2)

        # Take the output from the last time step
        out = out[:, -1, :]

        # Pass through fully connected layers with dropout
        out = torch.relu(self.fc1(out))
        out = self.dropout(out)
        out = torch.relu(self.fc2(out))
        out = self.output_layer(out)

        return out

# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

# Function to run multiple trials
def run_multiple_trials(trials, num_epochs=50):
    all_test_accuracies = []
    for trial in range(trials):
        print(f"Trial {trial + 1}/{trials}")

        # Load data
        input_file = "/content/drive/MyDrive/Chlamy_ML/Project 4/Upstream_seq_300N_40N_5to20_overlap.csv"
        X, y, label_names = load_data(input_file)

        # Normalize data
        scaler = StandardScaler()
        X = scaler.fit_transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

        # Split into training, validation, and test sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42 + trial)
        X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42 + trial)

        # Convert data to PyTorch tensors and create DataLoader
        train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
        val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
        test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32)
        test_loader = DataLoader(test_dataset, batch_size=32)

        # Model parameters
        input_size = 4
        hidden_size = 128
        num_layers = 2
        num_classes = len(label_names)

        # Initialize model, optimizer, and scheduler
        model = CNNLSTMModel(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.0005)
        scheduler = StepLR(optimizer, step_size=5, gamma=0.85)
        early_stopping = EarlyStopping(patience=8)

        # Train the model
        def train_model(num_epochs=50):
            for epoch in range(num_epochs):
                model.train()
                total_loss, correct, total = 0, 0, 0

                for X_batch, y_batch in train_loader:
                    optimizer.zero_grad()
                    outputs = model(X_batch)
                    loss = criterion(outputs, y_batch)
                    loss.backward()
                    optimizer.step()

                    total_loss += loss.item()
                    _, predicted = torch.max(outputs, 1)
                    correct += (predicted == y_batch).sum().item()
                    total += y_batch.size(0)

                # Validation
                model.eval()
                val_loss, correct_val, total_val = 0, 0, 0
                with torch.no_grad():
                    for X_batch, y_batch in val_loader:
                        outputs = model(X_batch)
                        loss = criterion(outputs, y_batch)
                        val_loss += loss.item()
                        _, predicted = torch.max(outputs, 1)
                        correct_val += (predicted == y_batch).sum().item()
                        total_val += y_batch.size(0)

                val_accuracy = 100 * correct_val / total_val
                print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {total_loss/len(train_loader):.4f}, "
                      f"Val Loss: {val_loss/len(val_loader):.4f}, Val Acc: {val_accuracy:.2f}%")

                scheduler.step()

                early_stopping(val_loss)
                if early_stopping.early_stop:
                    print("Early stopping")
                    break

        train_model()

        # Test the model after training
        model.eval()
        correct_test, total_test = 0, 0
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                outputs = model(X_batch)
                _, predicted = torch.max(outputs, 1)
                correct_test += (predicted == y_batch).sum().item()
                total_test += y_batch.size(0)

        test_accuracy = 100 * correct_test / total_test
        print(f"Trial {trial + 1}/{trials}, Test Accuracy: {test_accuracy:.2f}%")
        all_test_accuracies.append(test_accuracy)

    return all_test_accuracies

# Running multiple trials
results = run_multiple_trials(trials=5)
print("Final Results:", results)


In [None]:
#The CNN-LSTM model’s performance via Pearson correlation coefficient
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR
import os
from scipy.stats import pearsonr
import matplotlib.pyplot as plt

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load and preprocess the data
def load_data(file):
    df = pd.read_csv(file, header=None)
    sequences = df[0].values
    labels = df[1].values

    # One-hot encode the sequences
    def one_hot_encode_sequence(seq):
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        return np.array([mapping[nuc] for nuc in seq])

    encoded_sequences = np.array([one_hot_encode_sequence(seq) for seq in sequences])
    encoded_sequences = encoded_sequences.reshape(len(sequences), -1, 4)

    # Convert labels to categorical
    unique_labels = np.unique(labels)
    label_dict = {label: idx for idx, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_dict[label] for label in labels])

    return encoded_sequences, encoded_labels, unique_labels

# CNN-LSTM Model
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(CNNLSTMModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=num_layers,
                            batch_first=True, dropout=0.3, bidirectional=True)
        self.fc1 = nn.Linear(hidden_size * 2, 100)
        self.fc2 = nn.Linear(100, 80)
        self.output_layer = nn.Linear(80, num_classes)
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        h0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        c0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]
        out = torch.relu(self.fc1(out))
        out = self.dropout(out)
        out = torch.relu(self.fc2(out))
        out = self.output_layer(out)
        return out

# Load data
input_file = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Arabidopsis thaliana_300N_40N_5to20_overlap.csv"
X, y, label_names = load_data(input_file)

# Normalize data
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

# Data splits
X_dev, _, y_dev, _ = train_test_split(X, y, test_size=0.9, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# DataLoader
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

# Model parameters
input_size = 4
sequence_length = X.shape[1]
hidden_size = 128
num_layers = 2
num_classes = len(label_names)

# Model, optimizer, scheduler
model = CNNLSTMModel(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)
scheduler = StepLR(optimizer, step_size=5, gamma=0.85)

# Evaluation and plotting functions
def evaluate_model(model, test_loader):
    model.eval()
    correct_test, total_test, test_loss = 0, 0, 0
    predictions = []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            predictions.extend(predicted.cpu().numpy())
            correct_test += (predicted == y_batch).sum().item()
            total_test += y_batch.size(0)
    test_accuracy = 100 * correct_test / total_test
    avg_test_loss = test_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")
    return predictions, test_accuracy

def calculate_correlation(predictions, true_labels):
    correlation, _ = pearsonr(predictions, true_labels)
    print(f'Pearson correlation coefficient: {correlation:.4f}')
    return correlation

# Function to plot the correlation
def plot_correlation(predictions, true_labels):
    plt.figure(figsize=(8, 6))
    plt.scatter(predictions, [true_labels[i] for i in range(len(true_labels))], alpha=0.6, color='blue', label='Predicted Values')  # Predictions in blue
    plt.scatter([true_labels[i] for i in range(len(true_labels))], true_labels, alpha=0.6, color='red', label='Experimental Values')  # Experimental values in red
    plt.xlabel('Predicted Values', fontsize=14, labelpad=15)  # Adjust font size
    plt.ylabel('Experimental Values', fontsize=14, labelpad=15)  # Adjust font size
    plt.title('Arabidopsis thaliana', fontsize=16)  # Title with font size
    plt.plot([true_labels.min(), true_labels.max()], [true_labels.min(), true_labels.max()], 'k-', lw=2, label='Perfect Correlation')  # Solid black line
    plt.grid(False)
    plt.legend(loc='lower right')  # Positioning the legend in the lower right corner
    plt.show()


# Training function
def train_model(model, train_loader, val_loader, test_loader, num_epochs=50):
    for epoch in range(num_epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)
        train_accuracy = 100 * correct / total
        avg_train_loss = total_loss / len(train_loader)

        model.eval()
        val_loss, correct_val, total_val = 0, 0, 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct_val += (predicted == y_batch).sum().item()
                total_val += y_batch.size(0)
        val_accuracy = 100 * correct_val / total_val
        avg_val_loss = val_loss / len(val_loader)

        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')
        scheduler.step()

    # Final evaluation on test set
    predictions, test_accuracy = evaluate_model(model, test_loader)
    calculate_correlation(predictions, y_test)
    plot_correlation(predictions, y_test)

# Run training
train_model(model, train_loader, val_loader, test_loader, num_epochs=50)


In [None]:
#The CNN-LSTM model’s performance  via precision-recall curve
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import precision_recall_curve, average_precision_score
import matplotlib.pyplot as plt

# Load and preprocess the data
def load_data(file):
    df = pd.read_csv(file, header=None)
    sequences = df[0].values
    labels = df[1].values

    # One-hot encode the sequences
    def one_hot_encode_sequence(seq):
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        return np.array([mapping[nuc] for nuc in seq])

    encoded_sequences = np.array([one_hot_encode_sequence(seq) for seq in sequences])
    encoded_sequences = encoded_sequences.reshape(len(sequences), -1, 4)

    # Convert labels to categorical
    unique_labels = np.unique(labels)
    label_dict = {label: idx for idx, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_dict[label] for label in labels])

    return encoded_sequences, encoded_labels, unique_labels

# Build the CNN-LSTM hybrid model
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(CNNLSTMModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.3, bidirectional=True)
        self.fc1 = nn.Linear(hidden_size * 2, 100)
        self.fc2 = nn.Linear(100, 80)
        self.output_layer = nn.Linear(80, num_classes)
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        h0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        c0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]
        out = torch.relu(self.fc1(out))
        out = self.dropout(out)
        out = torch.relu(self.fc2(out))
        out = self.output_layer(out)
        return out

# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

# Function to evaluate model and return true labels and predicted probabilities
def evaluate_model_with_scores(model, data_loader):
    model.eval()
    all_true_labels = []
    all_pred_scores = []
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            outputs = model(X_batch)
            predicted_probs = torch.softmax(outputs, dim=1)
            all_true_labels.extend(y_batch.cpu().numpy())
            all_pred_scores.extend(predicted_probs.cpu().numpy())
    return np.array(all_true_labels), np.array(all_pred_scores)

# Load data
input_file = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Chlamydomonas reinhardtii_300N_40N_5to20_overlap.csv"
X, y, label_names = load_data(input_file)

# Normalize data
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

# Split into training and testing sets (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# Convert data to PyTorch tensors and create DataLoader
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

# Model parameters
input_size = 4
sequence_length = X.shape[1]
hidden_size = 128
num_layers = 2
num_classes = len(label_names)

# Initialize model, loss function, optimizer, scheduler, and early stopping
model = CNNLSTMModel(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)
scheduler = StepLR(optimizer, step_size=5, gamma=0.85)
early_stopping = EarlyStopping(patience=8)

# Function to train model
def train_model(model, train_loader, val_loader, test_loader, num_epochs=50, start_epoch=0):
    for epoch in range(start_epoch, num_epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)
        train_accuracy = 100 * correct / total
        avg_train_loss = total_loss / len(train_loader)

        # Validation
        model.eval()
        val_loss, correct_val, total_val = 0, 0, 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct_val += (predicted == y_batch).sum().item()
                total_val += y_batch.size(0)
        val_accuracy = 100 * correct_val / total_val
        avg_val_loss = val_loss / len(val_loader)

        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

        early_stopping(avg_val_loss)
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break

        scheduler.step()

    # Test the model
    model.eval()
    test_loss, correct_test, total_test = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_test += (predicted == y_batch).sum().item()
            total_test += y_batch.size(0)

    test_accuracy = 100 * correct_test / total_test
    avg_test_loss = test_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss:.4f}, Test Acc: {test_accuracy:.2f}%")

    # Precision-recall analysis
    true_labels, predicted_probs = evaluate_model_with_scores(model, test_loader)

    # Randomly select 10 classes for precision-recall curve
    selected_classes = np.random.choice(num_classes, size=10, replace=False)

    plt.figure(figsize=(10, 8))
    for i in selected_classes:
        precision, recall, _ = precision_recall_curve(true_labels == i, predicted_probs[:, i])
        avg_precision = average_precision_score(true_labels == i, predicted_probs[:, i])
        auc = np.trapz(precision, recall)  # Calculate the area under the precision-recall curve
        plt.plot(recall, precision, label=f'Class {label_names[i]} (AP={avg_precision:.2f}, AUC={auc:.2f})')
        # Shade the area under the curve
        plt.fill_between(recall, precision, alpha=0.1)

    plt.xlabel('Recall', fontsize=14, labelpad=15)
    plt.ylabel('Precision', fontsize=14, labelpad=15)
    plt.title('Chlamydomonas reinhardtii', fontsize=16)
    plt.legend()
    plt.grid(False)
    plt.show()

# Train the model
train_model(model, train_loader, val_loader, test_loader)


In [None]:
#The CNN-LSTM model’s performance via Average AUC
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import precision_recall_curve, average_precision_score
from scipy.stats import sem  # for standard error

# Load and preprocess the data
def load_data(file):
    df = pd.read_csv(file, header=None)
    sequences = df[0].values
    labels = df[1].values

    # One-hot encode the sequences
    def one_hot_encode_sequence(seq):
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        return np.array([mapping[nuc] for nuc in seq])

    encoded_sequences = np.array([one_hot_encode_sequence(seq) for seq in sequences])
    encoded_sequences = encoded_sequences.reshape(len(sequences), -1, 4)

    # Convert labels to categorical
    unique_labels = np.unique(labels)
    label_dict = {label: idx for idx, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_dict[label] for label in labels])

    return encoded_sequences, encoded_labels, unique_labels

# Build the CNN-LSTM hybrid model
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(CNNLSTMModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.3, bidirectional=True)
        self.fc1 = nn.Linear(hidden_size * 2, 100)
        self.fc2 = nn.Linear(100, 80)
        self.output_layer = nn.Linear(80, num_classes)
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        h0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        c0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]
        out = torch.relu(self.fc1(out))
        out = self.dropout(out)
        out = torch.relu(self.fc2(out))
        out = self.output_layer(out)
        return out

# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

# Function to evaluate model and return true labels and predicted probabilities
def evaluate_model_with_scores(model, data_loader):
    model.eval()
    all_true_labels = []
    all_pred_scores = []
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            outputs = model(X_batch)
            predicted_probs = torch.softmax(outputs, dim=1)
            all_true_labels.extend(y_batch.cpu().numpy())
            all_pred_scores.extend(predicted_probs.cpu().numpy())
    return np.array(all_true_labels), np.array(all_pred_scores)

# Load data
input_file = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Chlorella vulgaris_300N_40N_5to20_overlap.csv"
X, y, label_names = load_data(input_file)

# Normalize data
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

# Split into training and testing sets (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# Convert data to PyTorch tensors and create DataLoader
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

# Model parameters
input_size = 4
sequence_length = X.shape[1]
hidden_size = 128
num_layers = 2
num_classes = len(label_names)

# Initialize model, loss function, optimizer, scheduler, and early stopping
model = CNNLSTMModel(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)
scheduler = StepLR(optimizer, step_size=5, gamma=0.85)
early_stopping = EarlyStopping(patience=8)

# Function to train model
def train_model(model, train_loader, val_loader, test_loader, num_epochs=50, start_epoch=0):
    for epoch in range(start_epoch, num_epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)
        train_accuracy = 100 * correct / total
        avg_train_loss = total_loss / len(train_loader)

        # Validation
        model.eval()
        val_loss, correct_val, total_val = 0, 0, 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct_val += (predicted == y_batch).sum().item()
                total_val += y_batch.size(0)
        val_accuracy = 100 * correct_val / total_val
        avg_val_loss = val_loss / len(val_loader)

        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

        early_stopping(avg_val_loss)
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break

        scheduler.step()

    # Test the model
    model.eval()
    test_loss, correct_test, total_test = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_test += (predicted == y_batch).sum().item()
            total_test += y_batch.size(0)

    test_accuracy = 100 * correct_test / total_test
    avg_test_loss = test_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss:.4f}, Test Acc: {test_accuracy:.2f}%")

    # Precision-recall analysis
    true_labels, predicted_probs = evaluate_model_with_scores(model, test_loader)

    # Calculate and print AUC for all classes
    auc_values = []
    for i in range(num_classes):
        precision, recall, _ = precision_recall_curve(true_labels == i, predicted_probs[:, i])
        auc = average_precision_score(true_labels == i, predicted_probs[:, i])
        auc_values.append(auc)
        print(f"AUC for class {label_names[i]}: {auc:.4f}")

    # Print average AUC and standard error
    average_auc = np.mean(auc_values)
    std_error = sem(auc_values)
    print(f"Average AUC: {average_auc:.4f}, Standard Error: {std_error:.4f}")

# Train and evaluate the model
train_model(model, train_loader, val_loader, test_loader)


In [None]:
#The CNN-LSTM model’s performance via feature importance analysis utilizing SHAP
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR
import shap

# Load and preprocess the data
def load_data(file):
    df = pd.read_csv(file, header=None)
    sequences = df[0].values
    labels = df[1].values

    # One-hot encode the sequences
    def one_hot_encode_sequence(seq):
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        return np.array([mapping[nuc] for nuc in seq])

    encoded_sequences = np.array([one_hot_encode_sequence(seq) for seq in sequences])
    encoded_sequences = encoded_sequences.reshape(len(sequences), -1, 4)  # Reshape to (num_samples, seq_len, num_channels)

    # Convert labels to categorical
    unique_labels = np.unique(labels)
    label_dict = {label: idx for idx, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_dict[label] for label in labels])

    return encoded_sequences, encoded_labels, unique_labels

# Build the CNN-LSTM hybrid model
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(CNNLSTMModel, self).__init__()

        # CNN layers
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layers
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.3, bidirectional=True)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size * 2, 100)  # Bidirectional LSTM doubles hidden size
        self.fc2 = nn.Linear(100, 80)
        self.output_layer = nn.Linear(80, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        # CNN forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, num_channels, seq_len) for Conv1d
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.pool(x)

        # LSTM forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, seq_len, num_channels) for LSTM
        h0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)  # Bidirectional LSTM
        c0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))  # Output shape: (batch_size, seq_length, hidden_size*2)

        # Take the output from the last time step
        out = out[:, -1, :]

        # Pass through fully connected layers with dropout
        out = torch.relu(self.fc1(out))
        out = self.dropout(out)
        out = torch.relu(self.fc2(out))
        out = self.output_layer(out)

        return out

# Define the training function
def train_model(model, train_loader, val_loader, test_loader, num_epochs):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = StepLR(optimizer, step_size=5, gamma=0.85)

    # Loop over epochs
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        scheduler.step()

        train_loss = running_loss / len(train_loader.dataset)
        train_accuracy = correct / total * 100

        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%')

    print('Training complete!')

import matplotlib.pyplot as plt

# Feature Importance with SHAP (Updated Version)
def calculate_shap_importance(model, X_train, X_test):
    # Flatten sequences for SHAP input (samples, features)
    X_train_flat = X_train.reshape(X_train.shape[0], -1)
    X_test_flat = X_test.reshape(X_test.shape[0], -1)

    # SHAP Kernel Explainer requires a model's forward function
    explainer = shap.KernelExplainer(model_forward_fn, X_train_flat[:10])  # Reduced background data for faster computation
    shap_values = explainer.shap_values(X_test_flat[:5])  # Explain fewer samples for speed

    # Plot SHAP values for the first instance
    plt.figure(dpi=300)  # Set high resolution (DPI: 300)
    shap.summary_plot(shap_values, X_test_flat[:5], plot_type="bar")

    # Save the plot
    plt.savefig('SHAP Plot_Chlamydomonas reinhardtii.png', bbox_inches='tight', dpi=300)
    plt.show()



def model_forward_fn(input_data):
    # Reshape the flattened input back to (batch_size, seq_len, num_channels)
    input_data = input_data.reshape(input_data.shape[0], -1, 4)
    input_tensor = torch.tensor(input_data, dtype=torch.float32)

    with torch.no_grad():
        output = model(input_tensor).numpy()

    return output


# Load data
input_file = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Chlamydomonas reinhardtii_300N_40N_5to20_overlap.csv"
X, y, label_names = load_data(input_file)

# Normalize data
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

# Split into training and testing sets (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert data to PyTorch tensors
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

# Model parameters
input_size = 4  # Since one-hot encoding produces 4 channels for each nucleotide (A, C, G, T)
sequence_length = X.shape[1]
hidden_size = 128
num_layers = 2
num_classes = len(label_names)

# Initialize model
model = CNNLSTMModel(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes)

# Train the model
num_epochs = 5
train_model(model, train_loader, None, test_loader, num_epochs)

# Run SHAP feature importance analysis
calculate_shap_importance(model, X_train, X_test)



In [None]:
##The CNN-LSTM model’s performance on the protein datasets via average metrics (Macro/Weighted Precision,Recall, and F1 Score) with standard errors
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
from collections import Counter
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data preprocessing function
def one_hot_encode_sequence(seq, alphabet="ACDEFGHIKLMNPQRSTVWY"):
    encoding = np.zeros((len(seq), len(alphabet)), dtype=np.float32)
    for i, aa in enumerate(seq):
        if aa in alphabet:
            encoding[i, alphabet.index(aa)] = 1.0
    return encoding

# Custom dataset class
class ProteinDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.long)

# Model definition with dropout and an additional dense layer
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, dropout_rate=0.5):
        super(CNNLSTMModel, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv1d(input_size, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layer
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=1, batch_first=True)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout_rate)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size, 64)  # Additional dense layer
        self.fc2 = nn.Linear(64, num_classes)  # Final layer

    def forward(self, x):
        x = x.permute(0, 2, 1)  # Change to (batch, channels, sequence_length)
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))

        x = x.permute(0, 2, 1)  # Change to (batch, sequence_length, channels)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Take the last hidden state

        x = self.dropout(torch.relu(self.fc1(x)))  # Apply dropout before the last layer
        out = self.fc2(x)
        return out

# Load data and preprocess
def load_data(file_path):
    data = pd.read_csv(file_path, header=None, names=["sequence", "label"])
    sequences = [one_hot_encode_sequence(seq) for seq in data["sequence"]]
    labels = data["label"].astype("category").cat.codes
    return np.array(sequences), np.array(labels), data["label"].astype("category").cat.categories

# Calculate class weights
def calculate_class_weights(labels, num_classes):
    label_counts = Counter(labels)
    total_samples = sum(label_counts.values())
    weights = [total_samples / label_counts[i] if i in label_counts else 0.0 for i in range(num_classes)]
    return torch.tensor(weights).to(device)

# Optional Focal Loss definition
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        ce_loss = nn.CrossEntropyLoss()(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss

# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs=50):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        all_preds, all_labels = [], []

        for sequences, labels in train_loader:
            sequences, labels = sequences.to(device), labels.to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        # Calculate weighted and macro precision, recall, and F1 scores
        precision_macro = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        recall_macro = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        f1_macro = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        precision_weighted = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall_weighted = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1_weighted = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")
        print(f"Weighted - Precision: {precision_weighted:.4f}, Recall: {recall_weighted:.4f}, F1 Score: {f1_weighted:.4f}")
        print(f"Macro - Precision: {precision_macro:.4f}, Recall: {recall_macro:.4f}, F1 Score: {f1_macro:.4f}")

# Evaluation function with confusion matrix
def evaluate_model(model, data_loader, criterion):
    model.eval()
    with torch.no_grad():
        all_preds, all_labels = [], []
        total_loss = 0.0
        for sequences, labels in data_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        # Calculate weighted and macro precision, recall, and F1 scores
        precision_macro = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        recall_macro = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        f1_macro = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        precision_weighted = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall_weighted = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1_weighted = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        # Print metrics
        print(f"Loss: {total_loss:.4f}")
        print(f"Weighted - Precision: {precision_weighted:.4f}, Recall: {recall_weighted:.4f}, F1 Score: {f1_weighted:.4f}")
        print(f"Macro - Precision: {precision_macro:.4f}, Recall: {recall_macro:.4f}, F1 Score: {f1_macro:.4f}")

        return all_labels, all_preds

import numpy as np

# Main function with multiple-run Stratified K-Fold Cross-Validation
def main():
    # Load and prepare data
    file_path = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Protein sequnces/Arabidopsis thaliana_Protein_40N_5to20_overlap.csv'
    sequences, labels, unique_labels = load_data(file_path)

    # Parameters for multiple runs
    num_runs = 3  # Define the number of runs with different seeds
    num_folds = 5  # Define the number of folds for each Stratified K-Fold

    # Initialize lists to store performance metrics
    precision_macro_list, recall_macro_list, f1_macro_list = [], [], []
    precision_weighted_list, recall_weighted_list, f1_weighted_list = [], [], []

    for run in range(num_runs):
        print(f"\n--- Run {run + 1} ---")

        # Set a new seed for each run to vary the split
        seed = np.random.randint(10000)
        skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=seed)

        # Store performance metrics for each fold in the current run
        run_precision_macro, run_recall_macro, run_f1_macro = [], [], []
        run_precision_weighted, run_recall_weighted, run_f1_weighted = [], [], []

        for fold, (train_idx, test_idx) in enumerate(skf.split(sequences, labels)):
            print(f"\nFold {fold + 1} (Seed: {seed})")

            # Split data into training and testing sets
            train_sequences, test_sequences = sequences[train_idx], sequences[test_idx]
            train_labels, test_labels = labels[train_idx], labels[test_idx]

            # Stratified Shuffle Split for validation
            sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=seed)
            for train_idx_val, val_idx in sss.split(train_sequences, train_labels):
                train_sequences_split, val_sequences = train_sequences[train_idx_val], train_sequences[val_idx]
                train_labels_split, val_labels = train_labels[train_idx_val], train_labels[val_idx]

            # Create datasets and dataloaders
            train_dataset = ProteinDataset(train_sequences_split, train_labels_split)
            val_dataset = ProteinDataset(val_sequences, val_labels)
            test_dataset = ProteinDataset(test_sequences, test_labels)

            train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
            val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
            test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

            # Create model, define loss and optimizer
            num_classes = len(unique_labels)
            model = CNNLSTMModel(input_size=20, hidden_size=64, num_classes=num_classes).to(device)
            criterion = nn.CrossEntropyLoss(weight=calculate_class_weights(train_labels_split, num_classes))
            optimizer = optim.Adam(model.parameters(), lr=0.001)

            # Train the model
            train_model(model, train_loader, criterion, optimizer, num_epochs=50)

            # Evaluate on validation set
            print("Validation Set:")
            evaluate_model(model, val_loader, criterion)

            # Evaluate on test set and store metrics
            print("Test Set:")
            test_labels, test_preds = evaluate_model(model, test_loader, criterion)

            # Calculate fold metrics
            precision_macro = precision_score(test_labels, test_preds, average='macro', zero_division=0)
            recall_macro = recall_score(test_labels, test_preds, average='macro', zero_division=0)
            f1_macro = f1_score(test_labels, test_preds, average='macro', zero_division=0)

            precision_weighted = precision_score(test_labels, test_preds, average='weighted', zero_division=0)
            recall_weighted = recall_score(test_labels, test_preds, average='weighted', zero_division=0)
            f1_weighted = f1_score(test_labels, test_preds, average='weighted', zero_division=0)

            # Append fold metrics to the current run's list
            run_precision_macro.append(precision_macro)
            run_recall_macro.append(recall_macro)
            run_f1_macro.append(f1_macro)
            run_precision_weighted.append(precision_weighted)
            run_recall_weighted.append(recall_weighted)
            run_f1_weighted.append(f1_weighted)

        # Store average metrics across folds for the current run
        precision_macro_list.append(np.mean(run_precision_macro))
        recall_macro_list.append(np.mean(run_recall_macro))
        f1_macro_list.append(np.mean(run_f1_macro))
        precision_weighted_list.append(np.mean(run_precision_weighted))
        recall_weighted_list.append(np.mean(run_recall_weighted))
        f1_weighted_list.append(np.mean(run_f1_weighted))

    # Calculate final average and standard error across all runs
    def mean_std_err(metrics):
        return np.mean(metrics), np.std(metrics) / np.sqrt(len(metrics))

    # Print average metrics with standard errors
    print("\n--- Overall Results ---")
    print(f"Macro Precision: {mean_std_err(precision_macro_list)}")
    print(f"Macro Recall: {mean_std_err(recall_macro_list)}")
    print(f"Macro F1 Score: {mean_std_err(f1_macro_list)}")
    print(f"Weighted Precision: {mean_std_err(precision_weighted_list)}")
    print(f"Weighted Recall: {mean_std_err(recall_weighted_list)}")
    print(f"Weighted F1 Score: {mean_std_err(f1_weighted_list)}")

if __name__ == "__main__":
    main()



In [None]:
#The CNN-LSTM model’s performance on the protein datasets via confusion_matrix
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
from collections import Counter
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data preprocessing function
def one_hot_encode_sequence(seq, alphabet="ACDEFGHIKLMNPQRSTVWY"):
    encoding = np.zeros((len(seq), len(alphabet)), dtype=np.float32)
    for i, aa in enumerate(seq):
        if aa in alphabet:
            encoding[i, alphabet.index(aa)] = 1.0
    return encoding

# Custom dataset class
class ProteinDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.long)

# Model definition with dropout and an additional dense layer
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, dropout_rate=0.5):
        super(CNNLSTMModel, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv1d(input_size, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layer
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=1, batch_first=True)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout_rate)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size, 64)  # Additional dense layer
        self.fc2 = nn.Linear(64, num_classes)  # Final layer

    def forward(self, x):
        x = x.permute(0, 2, 1)  # Change to (batch, channels, sequence_length)
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))

        x = x.permute(0, 2, 1)  # Change to (batch, sequence_length, channels)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Take the last hidden state

        x = self.dropout(torch.relu(self.fc1(x)))  # Apply dropout before the last layer
        out = self.fc2(x)
        return out

# Load data and preprocess
def load_data(file_path):
    data = pd.read_csv(file_path, header=None, names=["sequence", "label"])
    sequences = [one_hot_encode_sequence(seq) for seq in data["sequence"]]
    labels = data["label"].astype("category").cat.codes
    return np.array(sequences), np.array(labels), data["label"].astype("category").cat.categories

# Calculate class weights
def calculate_class_weights(labels, num_classes):
    label_counts = Counter(labels)
    total_samples = sum(label_counts.values())
    weights = [total_samples / label_counts[i] if i in label_counts else 0.0 for i in range(num_classes)]
    return torch.tensor(weights).to(device)

# Optional Focal Loss definition
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        ce_loss = nn.CrossEntropyLoss()(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss

# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs=50):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        all_preds, all_labels = [], []

        for sequences, labels in train_loader:
            sequences, labels = sequences.to(device), labels.to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        # Calculate weighted and macro precision, recall, and F1 scores
        precision_macro = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        recall_macro = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        f1_macro = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        precision_weighted = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall_weighted = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1_weighted = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")
        print(f"Weighted - Precision: {precision_weighted:.4f}, Recall: {recall_weighted:.4f}, F1 Score: {f1_weighted:.4f}")
        print(f"Macro - Precision: {precision_macro:.4f}, Recall: {recall_macro:.4f}, F1 Score: {f1_macro:.4f}")

# Evaluation function with confusion matrix
def evaluate_model(model, data_loader, criterion):
    model.eval()
    with torch.no_grad():
        all_preds, all_labels = [], []
        total_loss = 0.0
        for sequences, labels in data_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        # Calculate weighted and macro precision, recall, and F1 scores
        precision_macro = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        recall_macro = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        f1_macro = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        precision_weighted = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall_weighted = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1_weighted = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        # Print metrics
        print(f"Loss: {total_loss:.4f}")
        print(f"Weighted - Precision: {precision_weighted:.4f}, Recall: {recall_weighted:.4f}, F1 Score: {f1_weighted:.4f}")
        print(f"Macro - Precision: {precision_macro:.4f}, Recall: {recall_macro:.4f}, F1 Score: {f1_macro:.4f}")

        return all_labels, all_preds

# Main function to prepare data, create the model, and train/test with Stratified K-Fold
def main():
    # Load and prepare data
    file_path = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Protein sequnces/A. thaliana_Protein_Chloroplast_40N_5to20_overlap.csv'
    sequences, labels, unique_labels = load_data(file_path)

    # Stratified K-Fold Cross-Validation
    skf = StratifiedKFold(n_splits=5)
    overall_cm = np.zeros((len(unique_labels), len(unique_labels)), dtype=int)

    for fold, (train_idx, test_idx) in enumerate(skf.split(sequences, labels)):
        print(f"Fold {fold + 1}")

        # Split data into training and testing sets
        train_sequences, test_sequences = sequences[train_idx], sequences[test_idx]
        train_labels, test_labels = labels[train_idx], labels[test_idx]

        # Stratified Shuffle Split for validation
        sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
        for train_idx_val, val_idx in sss.split(train_sequences, train_labels):
            train_sequences_split, val_sequences = train_sequences[train_idx_val], train_sequences[val_idx]
            train_labels_split, val_labels = train_labels[train_idx_val], train_labels[val_idx]

        # Create datasets and dataloaders
        train_dataset = ProteinDataset(train_sequences_split, train_labels_split)
        val_dataset = ProteinDataset(val_sequences, val_labels)
        test_dataset = ProteinDataset(test_sequences, test_labels)

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

        # Create model, define loss and optimizer
        num_classes = len(unique_labels)
        model = CNNLSTMModel(input_size=20, hidden_size=64, num_classes=num_classes).to(device)
        criterion = nn.CrossEntropyLoss(weight=calculate_class_weights(train_labels_split, num_classes))
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Train the model
        train_model(model, train_loader, criterion, optimizer, num_epochs=50)

        # Evaluate on validation set
        print("Validation Set:")
        val_labels, val_preds = evaluate_model(model, val_loader, criterion)

        # Evaluate on test set and accumulate confusion matrix
        print("Test Set:")
        test_labels, test_preds = evaluate_model(model, test_loader, criterion)
        cm = confusion_matrix(test_labels, test_preds, labels=np.arange(num_classes))
        overall_cm += cm


    # Final confusion matrix heatmap
    plt.figure(figsize=(14, 12))
    # Ensure the confusion matrix includes all labels
    sns.heatmap(overall_cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=unique_labels, yticklabels=unique_labels,
            cbar_kws={'label': 'Number of Predictions'})
    plt.ylabel('True Label', fontsize=16, labelpad=15)
    plt.xlabel('Predicted Label', fontsize=16, labelpad=15)
    plt.title('Arabidopsis thaliana', fontsize=18)
    plt.xticks(rotation=45)
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.savefig('Arabidopsis thaliana_confusion_matrix.png', dpi=300)
    plt.show()


if __name__ == "__main__":
    main()


In [None]:
##The CNN-LSTM model’s performance on the protein datasets via Training and validation performance
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import precision_recall_curve
import matplotlib.pyplot as plt  # Importing matplotlib for plotting

# Load and preprocess the data
def load_data(file):
    df = pd.read_csv(file, header=None)
    sequences = df[0].values
    labels = df[1].values

    # One-hot encode the sequences
    def one_hot_encode_sequence(seq):
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        return np.array([mapping[nuc] for nuc in seq])

    encoded_sequences = np.array([one_hot_encode_sequence(seq) for seq in sequences])
    encoded_sequences = encoded_sequences.reshape(len(sequences), -1, 4)  # Reshape to (num_samples, seq_len, num_channels)

    # Convert labels to categorical
    unique_labels = np.unique(labels)
    label_dict = {label: idx for idx, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_dict[label] for label in labels])

    return encoded_sequences, encoded_labels, unique_labels

# Build the CNN-LSTM hybrid model
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(CNNLSTMModel, self).__init__()

        # CNN layers
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layers
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.3, bidirectional=True)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size * 2, 100)  # Bidirectional LSTM doubles hidden size
        self.fc2 = nn.Linear(100, 80)
        self.output_layer = nn.Linear(80, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        # CNN forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, num_channels, seq_len) for Conv1d
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.pool(x)

        # LSTM forward pass
        x = x.permute(0, 2, 1)  # Change to (batch_size, seq_len, num_channels) for LSTM
        h0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)  # Bidirectional LSTM
        c0 = torch.zeros(2 * self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))  # Output shape: (batch_size, seq_length, hidden_size*2)

        # Take the output from the last time step
        out = out[:, -1, :]

        # Pass through fully connected layers with dropout
        out = torch.relu(self.fc1(out))
        out = self.dropout(out)
        out = torch.relu(self.fc2(out))
        out = self.output_layer(out)

        return out

# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

# Checkpoint functions
def save_checkpoint(state, filename='checkpoint.pth.tar'):
    print("=> Saving checkpoint")
    torch.save(state, filename)

def load_checkpoint(filename, model, optimizer):
    print("=> Loading checkpoint")
    checkpoint = torch.load(filename)
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    return checkpoint['epoch']

# Function to evaluate model and return true labels and predicted probabilities
def evaluate_model_with_scores(model, data_loader):
    model.eval()  # Set model to evaluation mode
    all_true_labels = []
    all_pred_scores = []
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            outputs = model(X_batch)
            predicted_probs = torch.softmax(outputs, dim=1)
            all_true_labels.extend(y_batch.cpu().numpy())
            all_pred_scores.extend(predicted_probs.cpu().numpy())
    return np.array(all_true_labels), np.array(all_pred_scores)

# Load data
input_file = "/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Chlamydomonas reinhardtii_300 bp up.csv"  # Input file in the format prepared earlier
X, y, label_names = load_data(input_file)

# Normalize data
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

# Split into training and testing sets (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Further split training data for validation (90% training, 10% validation)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# Convert data to PyTorch tensors and create DataLoader
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

# Model parameters
input_size = 4  # Since one-hot encoding produces 4 channels for each nucleotide (A, C, G, T)
sequence_length = X.shape[1]
hidden_size = 128
num_layers = 2  # Number of LSTM layers
num_classes = len(label_names)

# Initialize model, loss function, optimizer, scheduler, and early stopping
model = CNNLSTMModel(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)
scheduler = StepLR(optimizer, step_size=5, gamma=0.85)
early_stopping = EarlyStopping(patience=8)

# Function to train model
def train_model(model, train_loader, val_loader, test_loader, num_epochs=50, start_epoch=0):
    # Lists to store losses and accuracies
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(start_epoch, num_epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0

        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)

        train_accuracy = 100 * correct / total
        avg_train_loss = total_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        train_accuracies.append(train_accuracy)

        # Validation
        model.eval()
        val_loss, correct_val, total_val = 0, 0, 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct_val += (predicted == y_batch).sum().item()
                total_val += y_batch.size(0)

        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = 100 * correct_val / total_val
        val_losses.append(avg_val_loss)
        val_accuracies.append(val_accuracy)

        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}')

        # Learning rate scheduler step
        scheduler.step()

        # Early stopping check
        early_stopping(avg_val_loss)
        if early_stopping.early_stop:
            print("Early stopping")
            break

    # After training, evaluate model performance on the test set
    true_labels, pred_scores = evaluate_model_with_scores(model, test_loader)
    return train_losses, val_losses, train_accuracies, val_accuracies

# Train the model
train_losses, val_losses, train_accuracies, val_accuracies = train_model(model, train_loader, val_loader, test_loader)

# Plot training and validation loss and accuracy in a single plot
fig, ax1 = plt.subplots(figsize=(10, 6))

# Plot loss on the primary y-axis (left side)
train_loss_line, = ax1.plot(train_losses, label='Training Loss', color='blue', marker='o')
val_loss_line, = ax1.plot(val_losses, label='Validation Loss', color='orange', marker='x')

# Label for the left y-axis (Loss)
ax1.set_ylabel('Loss', fontsize=16, labelpad=15)
ax1.set_xlabel('Epochs', fontsize=16, labelpad=15)

# Create a second y-axis for accuracy
ax2 = ax1.twinx()
train_accuracy_line, = ax2.plot(train_accuracies, label='Training Accuracy', color='green', marker='s')
val_accuracy_line, = ax2.plot(val_accuracies, label='Validation Accuracy', color='red', marker='d')

# Label for the right y-axis (Accuracy)
ax2.set_ylabel('Accuracy (%)', fontsize=16, labelpad=15)

# Title for the plot
plt.title('Chlamydomonas reinhardtii', fontsize=18)

# Combine legends - avoiding duplicate accuracy entries
lines = [train_loss_line, val_loss_line, train_accuracy_line, val_accuracy_line]
labels = ['Training Loss', 'Validation Loss', 'Training Accuracy', 'Validation Accuracy']
ax1.legend(lines, labels, loc='upper center', bbox_to_anchor=(0.87, 0.6))

# Save the plot in high resolution
plt.tight_layout()
plt.savefig("training_validation_plot_Chlamydomonas reinhardtii_no.png", dpi=300, bbox_inches='tight')  # Save as PNG with 300 DPI

# Display the plot
plt.show()




In [None]:
#The CNN-LSTM model’s performance on the protein datasets via Loss,Recall & F1 Score
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
from collections import Counter
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data preprocessing function
def one_hot_encode_sequence(seq, alphabet="ACDEFGHIKLMNPQRSTVWY"):
    encoding = np.zeros((len(seq), len(alphabet)), dtype=np.float32)
    for i, aa in enumerate(seq):
        if aa in alphabet:
            encoding[i, alphabet.index(aa)] = 1.0
    return encoding

# Custom dataset class
class ProteinDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.long)

# Model definition with dropout and an additional dense layer
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, dropout_rate=0.5):
        super(CNNLSTMModel, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv1d(input_size, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layer
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=1, batch_first=True)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout_rate)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size, 64)  # Additional dense layer
        self.fc2 = nn.Linear(64, num_classes)  # Final layer

    def forward(self, x):
        x = x.permute(0, 2, 1)  # Change to (batch, channels, sequence_length)
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))

        x = x.permute(0, 2, 1)  # Change to (batch, sequence_length, channels)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Take the last hidden state

        x = self.dropout(torch.relu(self.fc1(x)))  # Apply dropout before the last layer
        out = self.fc2(x)
        return out

# Load data and preprocess
def load_data(file_path):
    data = pd.read_csv(file_path, header=None, names=["sequence", "label"])
    sequences = [one_hot_encode_sequence(seq) for seq in data["sequence"]]
    labels = data["label"].astype("category").cat.codes
    return np.array(sequences), np.array(labels), data["label"].astype("category").cat.categories

# Calculate class weights
def calculate_class_weights(labels, num_classes):
    label_counts = Counter(labels)
    total_samples = sum(label_counts.values())
    weights = [total_samples / label_counts[i] if i in label_counts else 0.0 for i in range(num_classes)]
    return torch.tensor(weights).to(device)

# Optional Focal Loss definition
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        ce_loss = nn.CrossEntropyLoss()(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss

# Training function
from sklearn.metrics import f1_score, recall_score

# Initialize lists to store metrics for each epoch across all folds
train_losses_folds = []
val_losses_folds = []
train_f1_scores_folds = []
val_f1_scores_folds = []
train_recalls_folds = []
val_recalls_folds = []

# Training function (modified to return metrics for each fold)
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    fold_train_losses = []
    fold_val_losses = []
    fold_train_f1_scores = []
    fold_val_f1_scores = []
    fold_train_recalls = []
    fold_val_recalls = []

    for epoch in range(num_epochs):
        # Training loop
        model.train()
        total_loss = 0.0
        all_train_preds, all_train_labels = [], []
        for sequences, labels in train_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_train_preds.extend(preds.cpu().numpy())
            all_train_labels.extend(labels.cpu().numpy())

        # Calculate training metrics
        train_loss = total_loss / len(train_loader)
        train_f1 = f1_score(all_train_labels, all_train_preds, average='weighted', zero_division=0)
        train_recall = recall_score(all_train_labels, all_train_preds, average='weighted', zero_division=0)

        fold_train_losses.append(train_loss)
        fold_train_f1_scores.append(train_f1)
        fold_train_recalls.append(train_recall)

        # Validation loop
        model.eval()
        with torch.no_grad():
            val_loss = 0.0
            all_val_preds, all_val_labels = [], []
            for sequences, labels in val_loader:
                sequences, labels = sequences.to(device), labels.to(device)
                outputs = model(sequences)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = torch.argmax(outputs, dim=1)
                all_val_preds.extend(preds.cpu().numpy())
                all_val_labels.extend(labels.cpu().numpy())

            # Calculate validation metrics
            val_loss /= len(val_loader)
            val_f1 = f1_score(all_val_labels, all_val_preds, average='weighted', zero_division=0)
            val_recall = recall_score(all_val_labels, all_val_preds, average='weighted', zero_division=0)

            fold_val_losses.append(val_loss)
            fold_val_f1_scores.append(val_f1)
            fold_val_recalls.append(val_recall)

        # Print metrics for each epoch
        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
        print(f"Train F1 Score: {train_f1:.4f}, Val F1 Score: {val_f1:.4f}")
        print(f"Train Recall: {train_recall:.4f}, Val Recall: {val_recall:.4f}")

    # Return metrics for the current fold
    return fold_train_losses, fold_val_losses, fold_train_f1_scores, fold_val_f1_scores, fold_train_recalls, fold_val_recalls

# Main function to prepare data, create the model, and train/test with Stratified K-Fold
def main():
    # Load and prepare data
    file_path = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Protein sequnces/Arabidopsis thaliana_Protein_40N_5to20_overlap.csv'
    sequences, labels, unique_labels = load_data(file_path)

    # Stratified K-Fold Cross-Validation
    skf = StratifiedKFold(n_splits=5)

    for fold, (train_idx, test_idx) in enumerate(skf.split(sequences, labels)):
        print(f"Fold {fold + 1}")

        # Split data into training and testing sets
        train_sequences, test_sequences = sequences[train_idx], sequences[test_idx]
        train_labels, test_labels = labels[train_idx], labels[test_idx]

        # Stratified Shuffle Split for validation
        sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
        for train_idx_val, val_idx in sss.split(train_sequences, train_labels):
            train_sequences_split, val_sequences = train_sequences[train_idx_val], train_sequences[val_idx]
            train_labels_split, val_labels = train_labels[train_idx_val], train_labels[val_idx]

        # Create datasets and dataloaders
        train_dataset = ProteinDataset(train_sequences_split, train_labels_split)
        val_dataset = ProteinDataset(val_sequences, val_labels)
        test_dataset = ProteinDataset(test_sequences, test_labels)

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

        # Create model, define loss and optimizer
        model = CNNLSTMModel(input_size=20, hidden_size=64, num_classes=len(unique_labels)).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Train model and get metrics
        train_losses, val_losses, train_f1s, val_f1s, train_recalls, val_recalls = train_model(
            model, train_loader, val_loader, criterion, optimizer, num_epochs=50
        )

        # Store metrics for each fold
        train_losses_folds.append(train_losses)
        val_losses_folds.append(val_losses)
        train_f1_scores_folds.append(train_f1s)
        val_f1_scores_folds.append(val_f1s)
        train_recalls_folds.append(train_recalls)
        val_recalls_folds.append(val_recalls)

# Run main function
if __name__ == "__main__":
    main()

# Plotting the metrics
plt.figure(figsize=(14, 7))

# Create first y-axis for loss
ax1 = plt.gca()
ax1.set_xlabel("Epochs", fontsize=14, labelpad=15)
ax1.set_ylabel("Loss", color='tab:red', fontsize=14, labelpad=15)

# Calculate mean and std for train and validation losses
mean_train_losses = np.mean(train_losses_folds, axis=0)
mean_val_losses = np.mean(val_losses_folds, axis=0)
std_train_losses = np.std(train_losses_folds, axis=0)
std_val_losses = np.std(val_losses_folds, axis=0)

# Plot mean loss with shaded areas for std deviation
ax1.plot(mean_train_losses, label='Mean Train Loss', color='tab:red', linestyle='--')
ax1.plot(mean_val_losses, label='Mean Val Loss', color='tab:red')

# Fill between for standard deviation
ax1.fill_between(range(len(mean_train_losses)),
                 mean_train_losses - std_train_losses,
                 mean_train_losses + std_train_losses,
                 color='tab:red', alpha=0.2)
ax1.fill_between(range(len(mean_val_losses)),
                 mean_val_losses - std_val_losses,
                 mean_val_losses + std_val_losses,
                 color='tab:red', alpha=0.2)

ax1.tick_params(axis='y', labelcolor='tab:red')

# Create second y-axis for recall and F1 score
ax2 = ax1.twinx()
ax2.set_ylabel("Recall & F1 Score", color='tab:blue', fontsize=14, labelpad=15)

# Calculate mean and std for F1 scores and recalls
mean_train_f1s = np.mean(train_f1_scores_folds, axis=0)
mean_val_f1s = np.mean(val_f1_scores_folds, axis=0)
mean_train_recalls = np.mean(train_recalls_folds, axis=0)
mean_val_recalls = np.mean(val_recalls_folds, axis=0)

std_train_f1s = np.std(train_f1_scores_folds, axis=0)
std_val_f1s = np.std(val_f1_scores_folds, axis=0)
std_train_recalls = np.std(train_recalls_folds, axis=0)
std_val_recalls = np.std(val_recalls_folds, axis=0)

# Plot mean F1 scores and recalls with shaded areas for std deviation
ax2.plot(mean_train_f1s, label='Mean Train F1 Score', color='tab:blue', linestyle='--')
ax2.plot(mean_val_f1s, label='Mean Val F1 Score', color='tab:blue')
ax2.fill_between(range(len(mean_train_f1s)),
                 mean_train_f1s - std_train_f1s,
                 mean_train_f1s + std_train_f1s,
                 color='tab:blue', alpha=0.2)
ax2.fill_between(range(len(mean_val_f1s)),
                 mean_val_f1s - std_val_f1s,
                 mean_val_f1s + std_val_f1s,
                 color='tab:blue', alpha=0.2)

ax2.plot(mean_train_recalls, label='Mean Train Recall', color='tab:green', linestyle='--')
ax2.plot(mean_val_recalls, label='Mean Val Recall', color='tab:green')
ax2.fill_between(range(len(mean_train_recalls)),
                 mean_train_recalls - std_train_recalls,
                 mean_train_recalls + std_train_recalls,
                 color='tab:green', alpha=0.2)
ax2.fill_between(range(len(mean_val_recalls)),
                 mean_val_recalls - std_val_recalls,
                 mean_val_recalls + std_val_recalls,
                 color='tab:green', alpha=0.2)

ax2.tick_params(axis='y', labelcolor='tab:blue')

# Add title and legend
plt.title("Arabidopsis thaliana", fontsize=16)

# Adjust legend position
ax1.legend(loc='center right', bbox_to_anchor=(1, 0.2), framealpha=0.5)
ax2.legend(loc='center right', bbox_to_anchor=(1, 0.75), framealpha=0.5)

# Save the plot as a high-resolution image
plt.savefig("Arabidopsis thaliana_training_validation_metrics.png", dpi=300, bbox_inches='tight')
# Show plot
plt.show()



In [None]:
#The CNN-LSTM model’s performance on the protein datasets via ROC Curve & Precision-Recallcurves
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, precision_recall_curve, roc_curve, auc
from sklearn.model_selection import StratifiedKFold
from collections import Counter
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import random

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data preprocessing function
def one_hot_encode_sequence(seq, alphabet="ACDEFGHIKLMNPQRSTVWY"):
    encoding = np.zeros((len(seq), len(alphabet)), dtype=np.float32)
    for i, aa in enumerate(seq):
        if aa in alphabet:
            encoding[i, alphabet.index(aa)] = 1.0
    return encoding

# Custom dataset class
class ProteinDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.long)

# Model definition with dropout and an additional dense layer
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, dropout_rate=0.5):
        super(CNNLSTMModel, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv1d(input_size, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layer
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=1, batch_first=True)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout_rate)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size, 64)  # Additional dense layer
        self.fc2 = nn.Linear(64, num_classes)  # Final layer

    def forward(self, x):
        x = x.permute(0, 2, 1)  # Change to (batch, channels, sequence_length)
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))

        x = x.permute(0, 2, 1)  # Change to (batch, sequence_length, channels)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Take the last hidden state

        x = self.dropout(torch.relu(self.fc1(x)))  # Apply dropout before the last layer
        out = self.fc2(x)
        return out

# Load data and preprocess
def load_data(file_path):
    data = pd.read_csv(file_path, header=None, names=["sequence", "label"])
    sequences = [one_hot_encode_sequence(seq) for seq in data["sequence"]]
    labels = data["label"].astype("category").cat.codes
    return np.array(sequences), np.array(labels), data["label"].astype("category").cat.categories


# Calculate class weights
def calculate_class_weights(labels, num_classes):
    label_counts = Counter(labels)
    total_samples = sum(label_counts.values())
    weights = [total_samples / label_counts[i] if i in label_counts else 0.0 for i in range(num_classes)]
    return torch.tensor(weights).to(device)

# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs=50):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        for sequences, labels in train_loader:
            sequences, labels = sequences.to(device), labels.to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")

# Evaluation function with Precision-Recall and ROC curves
def evaluate_model(model, data_loader, criterion):
    model.eval()
    all_preds, all_labels = [], []
    all_probs = []  # Collect predicted probabilities
    total_loss = 0.0
    with torch.no_grad():
        for sequences, labels in data_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(torch.softmax(outputs, dim=1).cpu().numpy())  # Get probabilities

        # Convert lists to numpy arrays
        all_labels = np.array(all_labels)
        all_preds = np.array(all_preds)
        all_probs = np.array(all_probs)

        # Calculate metrics with zero_division parameter
        weighted_precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        weighted_recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        weighted_f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        macro_precision = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        macro_recall = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        macro_f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        print(f'Weighted Precision: {weighted_precision:.4f}, Weighted Recall: {weighted_recall:.4f}, Weighted F1: {weighted_f1:.4f}')
        print(f'Macro Precision: {macro_precision:.4f}, Macro Recall: {macro_recall:.4f}, Macro F1: {macro_f1:.4f}')

        return all_labels, all_preds, np.array(all_probs), total_loss  # Return probabilities for AUC


# Plot Precision-Recall and ROC curves
import matplotlib.pyplot as plt

# Plot Precision-Recall and ROC curves with highlighted AUC
def plot_metrics(y_true, y_scores, num_classes):
    # Calculate and plot average Precision-Recall curve
    plt.figure(figsize=(10, 6))
    average_precision = []
    average_auc_pr = 0
    auc_pr_list = []

    for i in range(num_classes):
        precision, recall, _ = precision_recall_curve(y_true, y_scores[:, i], pos_label=i)
        auc_pr = auc(recall, precision)
        auc_pr_list.append(auc_pr)
        average_auc_pr += auc_pr
        plt.plot(recall, precision, label=f'Class {i} (AUC = {auc_pr:.2f})')

    average_auc_pr /= num_classes
    std_error_pr = np.std(auc_pr_list) / np.sqrt(num_classes)
    print(f"Average AUC for Precision-Recall Curve: {average_auc_pr:.2f} ± {std_error_pr:.2f}")

    plt.title('Precision-Recall Curve - Arabidopsis thaliana', fontsize=16)
    plt.xlabel('Recall', fontsize=14, labelpad=15)
    plt.ylabel('Precision', fontsize=14, labelpad=15)
    plt.grid(False)
    plt.text(0.6, 0.1, f"Avg AUC = {average_auc_pr:.2f} ± {std_error_pr:.2f}",
             transform=plt.gca().transAxes, fontsize=12, color='darkblue')  # AUC annotation
    plt.savefig("Arabidopsis_thaliana_precision_recall_curve.png", dpi=300, bbox_inches='tight')
    plt.show()

    # Calculate and plot average ROC curve
    plt.figure(figsize=(10, 6))
    average_auc_roc = 0
    auc_roc_list = []

    for i in range(num_classes):
        fpr, tpr, _ = roc_curve(y_true, y_scores[:, i], pos_label=i)
        auc_roc = auc(fpr, tpr)
        auc_roc_list.append(auc_roc)
        average_auc_roc += auc_roc
        plt.plot(fpr, tpr, label=f'Class {i} (AUC = {auc_roc:.2f})')

    average_auc_roc /= num_classes
    std_error_roc = np.std(auc_roc_list) / np.sqrt(num_classes)
    print(f"Average AUC for ROC Curve: {average_auc_roc:.2f} ± {std_error_roc:.2f}")

    plt.title('ROC Curve - Arabidopsis thaliana', fontsize=16)
    plt.xlabel('False Positive Rate', fontsize=14, labelpad=15)
    plt.ylabel('True Positive Rate', fontsize=14, labelpad=15)
    plt.grid(False)
    plt.text(0.6, 0.1, f"Avg AUC = {average_auc_roc:.2f} ± {std_error_roc:.2f}",
             transform=plt.gca().transAxes, fontsize=12, color='darkblue')  # AUC annotation
    plt.savefig("Arabidopsis_thaliana_roc_curve.png", dpi=300, bbox_inches='tight')
    plt.show()


# Main execution function
def main(file_path):
    sequences, labels, categories = load_data(file_path)
    num_classes = len(categories)
    class_weights = calculate_class_weights(labels, num_classes)

    dataset = ProteinDataset(sequences, labels)

    # Stratified K-Fold Cross-Validation
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # To store results across all folds
    all_y_true = []
    all_y_probs = []

    for fold_num, (train_idx, test_idx) in enumerate(skf.split(np.zeros(len(dataset)), labels)):
        print(f"\nFold {fold_num + 1}")
        train_subset = torch.utils.data.Subset(dataset, train_idx)
        test_subset = torch.utils.data.Subset(dataset, test_idx)

        train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
        test_loader = DataLoader(test_subset, batch_size=32, shuffle=False)

        model = CNNLSTMModel(input_size=20, hidden_size=64, num_classes=num_classes).to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights)
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        train_model(model, train_loader, criterion, optimizer, num_epochs=50)

        y_true, y_pred, y_probs, loss = evaluate_model(model, test_loader, criterion)

        # Collect all true labels and probabilities for metrics
        all_y_true.extend(y_true)
        all_y_probs.append(y_probs)  # Store probabilities for AUC

    # Convert to numpy arrays
    all_y_true = np.array(all_y_true)
    all_y_probs = np.vstack(all_y_probs)  # Stack probabilities from all folds

    # Plot metrics after all folds
    plot_metrics(all_y_true, all_y_probs, num_classes)

if __name__ == "__main__":
    file_path = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Protein sequnces/Arabidopsis thaliana_Protein_40N_5to20_overlap.csv'
    main(file_path)



In [None]:
##The CNN-LSTM model’s performance on the protein datasets via average metrics (Macro/Weighted Precision,Recall, and F1 Score) with standard errors
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
from collections import Counter
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data preprocessing function
def one_hot_encode_sequence(seq, alphabet="ACDEFGHIKLMNPQRSTVWY"):
    encoding = np.zeros((len(seq), len(alphabet)), dtype=np.float32)
    for i, aa in enumerate(seq):
        if aa in alphabet:
            encoding[i, alphabet.index(aa)] = 1.0
    return encoding

# Custom dataset class
class ProteinDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.long)

# Model definition with dropout and an additional dense layer
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, dropout_rate=0.5):
        super(CNNLSTMModel, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv1d(input_size, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layer
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size, num_layers=1, batch_first=True)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout_rate)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_size, 64)  # Additional dense layer
        self.fc2 = nn.Linear(64, num_classes)  # Final layer

    def forward(self, x):
        x = x.permute(0, 2, 1)  # Change to (batch, channels, sequence_length)
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))

        x = x.permute(0, 2, 1)  # Change to (batch, sequence_length, channels)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Take the last hidden state

        x = self.dropout(torch.relu(self.fc1(x)))  # Apply dropout before the last layer
        out = self.fc2(x)
        return out

# Load data and preprocess
def load_data(file_path):
    data = pd.read_csv(file_path, header=None, names=["sequence", "label"])
    sequences = [one_hot_encode_sequence(seq) for seq in data["sequence"]]
    labels = data["label"].astype("category").cat.codes
    return np.array(sequences), np.array(labels), data["label"].astype("category").cat.categories

# Calculate class weights
def calculate_class_weights(labels, num_classes):
    label_counts = Counter(labels)
    total_samples = sum(label_counts.values())
    weights = [total_samples / label_counts[i] if i in label_counts else 0.0 for i in range(num_classes)]
    return torch.tensor(weights).to(device)

# Optional Focal Loss definition
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        ce_loss = nn.CrossEntropyLoss()(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss

# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs=50):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        all_preds, all_labels = [], []

        for sequences, labels in train_loader:
            sequences, labels = sequences.to(device), labels.to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        # Calculate weighted and macro precision, recall, and F1 scores
        precision_macro = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        recall_macro = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        f1_macro = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        precision_weighted = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall_weighted = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1_weighted = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")
        print(f"Weighted - Precision: {precision_weighted:.4f}, Recall: {recall_weighted:.4f}, F1 Score: {f1_weighted:.4f}")
        print(f"Macro - Precision: {precision_macro:.4f}, Recall: {recall_macro:.4f}, F1 Score: {f1_macro:.4f}")

# Evaluation function with confusion matrix
def evaluate_model(model, data_loader, criterion):
    model.eval()
    with torch.no_grad():
        all_preds, all_labels = [], []
        total_loss = 0.0
        for sequences, labels in data_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        # Calculate weighted and macro precision, recall, and F1 scores
        precision_macro = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        recall_macro = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        f1_macro = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        precision_weighted = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall_weighted = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1_weighted = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        # Print metrics
        print(f"Loss: {total_loss:.4f}")
        print(f"Weighted - Precision: {precision_weighted:.4f}, Recall: {recall_weighted:.4f}, F1 Score: {f1_weighted:.4f}")
        print(f"Macro - Precision: {precision_macro:.4f}, Recall: {recall_macro:.4f}, F1 Score: {f1_macro:.4f}")

        return all_labels, all_preds

import numpy as np

# Main function with multiple-run Stratified K-Fold Cross-Validation
def main():
    # Load and prepare data
    file_path = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Protein sequnces/Arabidopsis thaliana_Protein_40N_5to20_overlap.csv'
    sequences, labels, unique_labels = load_data(file_path)

    # Parameters for multiple runs
    num_runs = 3  # Define the number of runs with different seeds
    num_folds = 5  # Define the number of folds for each Stratified K-Fold

    # Initialize lists to store performance metrics
    precision_macro_list, recall_macro_list, f1_macro_list = [], [], []
    precision_weighted_list, recall_weighted_list, f1_weighted_list = [], [], []

    for run in range(num_runs):
        print(f"\n--- Run {run + 1} ---")

        # Set a new seed for each run to vary the split
        seed = np.random.randint(10000)
        skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=seed)

        # Store performance metrics for each fold in the current run
        run_precision_macro, run_recall_macro, run_f1_macro = [], [], []
        run_precision_weighted, run_recall_weighted, run_f1_weighted = [], [], []

        for fold, (train_idx, test_idx) in enumerate(skf.split(sequences, labels)):
            print(f"\nFold {fold + 1} (Seed: {seed})")

            # Split data into training and testing sets
            train_sequences, test_sequences = sequences[train_idx], sequences[test_idx]
            train_labels, test_labels = labels[train_idx], labels[test_idx]

            # Stratified Shuffle Split for validation
            sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=seed)
            for train_idx_val, val_idx in sss.split(train_sequences, train_labels):
                train_sequences_split, val_sequences = train_sequences[train_idx_val], train_sequences[val_idx]
                train_labels_split, val_labels = train_labels[train_idx_val], train_labels[val_idx]

            # Create datasets and dataloaders
            train_dataset = ProteinDataset(train_sequences_split, train_labels_split)
            val_dataset = ProteinDataset(val_sequences, val_labels)
            test_dataset = ProteinDataset(test_sequences, test_labels)

            train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
            val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
            test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

            # Create model, define loss and optimizer
            num_classes = len(unique_labels)
            model = CNNLSTMModel(input_size=20, hidden_size=64, num_classes=num_classes).to(device)
            criterion = nn.CrossEntropyLoss(weight=calculate_class_weights(train_labels_split, num_classes))
            optimizer = optim.Adam(model.parameters(), lr=0.001)

            # Train the model
            train_model(model, train_loader, criterion, optimizer, num_epochs=50)

            # Evaluate on validation set
            print("Validation Set:")
            evaluate_model(model, val_loader, criterion)

            # Evaluate on test set and store metrics
            print("Test Set:")
            test_labels, test_preds = evaluate_model(model, test_loader, criterion)

            # Calculate fold metrics
            precision_macro = precision_score(test_labels, test_preds, average='macro', zero_division=0)
            recall_macro = recall_score(test_labels, test_preds, average='macro', zero_division=0)
            f1_macro = f1_score(test_labels, test_preds, average='macro', zero_division=0)

            precision_weighted = precision_score(test_labels, test_preds, average='weighted', zero_division=0)
            recall_weighted = recall_score(test_labels, test_preds, average='weighted', zero_division=0)
            f1_weighted = f1_score(test_labels, test_preds, average='weighted', zero_division=0)

            # Append fold metrics to the current run's list
            run_precision_macro.append(precision_macro)
            run_recall_macro.append(recall_macro)
            run_f1_macro.append(f1_macro)
            run_precision_weighted.append(precision_weighted)
            run_recall_weighted.append(recall_weighted)
            run_f1_weighted.append(f1_weighted)

        # Store average metrics across folds for the current run
        precision_macro_list.append(np.mean(run_precision_macro))
        recall_macro_list.append(np.mean(run_recall_macro))
        f1_macro_list.append(np.mean(run_f1_macro))
        precision_weighted_list.append(np.mean(run_precision_weighted))
        recall_weighted_list.append(np.mean(run_recall_weighted))
        f1_weighted_list.append(np.mean(run_f1_weighted))

    # Calculate final average and standard error across all runs
    def mean_std_err(metrics):
        return np.mean(metrics), np.std(metrics) / np.sqrt(len(metrics))

    # Print average metrics with standard errors
    print("\n--- Overall Results ---")
    print(f"Macro Precision: {mean_std_err(precision_macro_list)}")
    print(f"Macro Recall: {mean_std_err(recall_macro_list)}")
    print(f"Macro F1 Score: {mean_std_err(f1_macro_list)}")
    print(f"Weighted Precision: {mean_std_err(precision_weighted_list)}")
    print(f"Weighted Recall: {mean_std_err(recall_weighted_list)}")
    print(f"Weighted F1 Score: {mean_std_err(f1_weighted_list)}")

if __name__ == "__main__":
    main()



In [None]:
#K-mer analysis and identification of common patterns for unsupervised data analysis
import pandas as pd
from Bio import SeqIO
from collections import defaultdict

# Function to extract k-mers from a sequence
def extract_kmers(seq, k):
    return [seq[i:i+k] for i in range(len(seq) - k + 1)]

# Function to find common k-mers between each pair of sequences and save results
def find_common_kmers(fasta_file, min_k, max_k, output_file):
    sequences = []
    sequence_names = []

    # Read sequences and names from the FASTA file
    for record in SeqIO.parse(fasta_file, "fasta"):
        sequences.append(str(record.seq))
        sequence_names.append(record.id)  # Use the record id as the sequence name

    kmer_dict = defaultdict(lambda: defaultdict(int))

    # Collect k-mers for all sequences
    for i, seq in enumerate(sequences):
        for k in range(min_k, max_k + 1):
            kmers = extract_kmers(seq, k)
            for kmer in kmers:
                kmer_dict[i][(k, kmer)] += 1

    # Find common k-mers between each pair of sequences
    results = []
    for i in range(len(sequences)):
        for j in range(i + 1, len(sequences)):
            common_kmers = set(kmer_dict[i].keys()) & set(kmer_dict[j].keys())
            total_common_kmers = sum(min(kmer_dict[i][kmer], kmer_dict[j][kmer]) for kmer in common_kmers)

            # Only store the sequence names and total common k-mers
            results.append({
                'Sequence1': sequence_names[i],  # Use sequence names
                'Sequence2': sequence_names[j],  # Use sequence names
                'Total Common K-mers': total_common_kmers
            })

    # Save results to a CSV file with the selected columns
    df = pd.DataFrame(results)
    df.to_csv(output_file, index=False)
    print(f"Results saved to {output_file}")

# Example usage
fasta_file = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/Oryza sativa cultivar TN1 chloroplast_300 bp up.fasta'  # Update this path as needed
min_k = 5  # Minimum k-mer length
max_k = 15  # Maximum k-mer length
output_file = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/O. sativa_300N_5-12_kmers-total.csv'  # Output file for common k-mers

find_common_kmers(fasta_file, min_k, max_k, output_file)

In [None]:
#Clustering and visualization of sequence similarities
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import seaborn as sns
from adjustText import adjust_text

# Function to load the CSV file with total common k-mers
def load_kmer_data(file_path):
    return pd.read_csv(file_path)

# Function to create a distance matrix
def create_distance_matrix(df, names):
    num_sequences = len(names)
    distance_matrix = np.zeros((num_sequences, num_sequences))

    for i in range(num_sequences):
        for j in range(num_sequences):
            if i != j:
                # Set distance to be the inverse of the total common k-mers (for clustering purposes)
                total_common_kmers = df[(df['Sequence1'] == names[i]) & (df['Sequence2'] == names[j])]['Total Common K-mers']
                if not total_common_kmers.empty:
                    distance_matrix[i, j] = max(0, np.max(total_common_kmers))  # Avoid negative distances
                else:
                    distance_matrix[i, j] = 0  # No common k-mers, set distance to 0

    return distance_matrix

def perform_clustering_and_pca(distance_matrix, names, num_clusters, seed=21):
    # Perform K-means clustering
    kmeans = KMeans(n_clusters=num_clusters, random_state=seed)
    clustering = kmeans.fit_predict(distance_matrix)

    # Perform PCA
    pca = PCA(n_components=2)
    pca_results = pca.fit_transform(distance_matrix)

    # Create a DataFrame for plotting
    df_pca = pd.DataFrame(pca_results, columns=['PC1', 'PC2'])
    df_pca['Cluster'] = clustering
    df_pca['Sequence'] = [name.replace("5UTR_", "") for name in names]  # Remove "5UTR_" from labels

    # Plot PCA results
    plt.figure(figsize=(12, 10))
    scatter = sns.scatterplot(x='PC1', y='PC2', hue='Cluster', palette='tab10', data=df_pca, s=100, edgecolor='k')

    # Add labels to data points with initial offset
    texts = []
    label_space = 0.4  # Initial label space adjustment
    for i, txt in enumerate(df_pca['Sequence']):
        texts.append(plt.text(df_pca['PC1'].iloc[i] + label_space,
                              df_pca['PC2'].iloc[i] + label_space,
                              txt, fontsize=12))

    # Adjust text labels to avoid overlap
    adjust_text(texts,
                expand_points=(1.5, 1.5),  # Expand points to give more space
                expand_text=(1.5, 1.5),    # Expand text to give more space
                arrowprops=dict(arrowstyle='->', color='grey', lw=0.5))

    plt.xlabel('Principal Component 1', fontsize=16, labelpad=15)
    plt.ylabel('Principal Component 2', fontsize=16, labelpad=15)

    # Customize the legend to start from 1
    handles, labels = scatter.get_legend_handles_labels()
    labels = [f'Cluster {int(label) + 1}' for label in labels]
    plt.legend(handles, labels, title='Cluster')

    plt.grid(False)
    plt.savefig("Chlorella vulgaris_Cluster.png", dpi=300, bbox_inches='tight')
    plt.show()

    # Write each cluster and its sequences
    for cluster_num in range(num_clusters):
        sequences_in_cluster = df_pca[df_pca['Cluster'] == cluster_num]['Sequence'].values
        print(f"Cluster {cluster_num + 1}:")
        for sequence in sequences_in_cluster:
            print(f" - {sequence}")
        print()  # Add space between clusters

# Example usage
kmer_data_file = '/content/drive/MyDrive/Chlamy_ML/Project 2/sequnces/C. vulgaris_300N_5-12_kmers-total.csv'  # Output file from Part 1
kmer_data = load_kmer_data(kmer_data_file)

# Get unique sequence names
sequence_names = pd.concat([kmer_data['Sequence1'], kmer_data['Sequence2']]).unique()

# Create distance matrix
distance_matrix = create_distance_matrix(kmer_data, sequence_names)

# Define the number of clusters for K-means
num_clusters = 7

# Perform clustering and PCA
perform_clustering_and_pca(distance_matrix, sequence_names, num_clusters)
