First we will import all the necessary libraries

In [37]:
# Basic Libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import itertools

#Biopython for Fasta file processing
from Bio import SeqIO

#Tensorflow and Keras for deep learning
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, Dropout, LSTM, Embedding, BatchNormalization, GlobalMaxPooling1D
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Scikit-learn for preprocessing and evaluation
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import MinMaxScaler
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras import Input

# Utility Libraries
import os
import random

Following is the code for k-mer encoding which comes under preprocessing of data obtained from the fasta files.

In [68]:
NUCLEOTIDES = {'A', 'C', 'G', 'T'}

def clean_sequence(seq):
    """Remove invalid characters from the DNA sequence."""
    return ''.join(nuc for nuc in seq if nuc in NUCLEOTIDES)

def kmer_count(sequence, k):
    """Count k-mers in a given DNA sequence and return a NumPy array of counts."""
    sequence = clean_sequence(sequence)
    n = len(sequence)
    kmer_counts = np.zeros((4 ** k,), dtype=int)

    # Generate k-mer indices using itertools.product only once
    kmer_index = {''.join(p): idx for idx, p in enumerate(itertools.product('ACGT', repeat=k))}
    
    # Count k-mers
    for i in range(n - k + 1):
        kmer = sequence[i:i + k]
        if kmer in kmer_index:
            kmer_counts[kmer_index[kmer]] += 1
            
    return kmer_counts

def encode_top_kmers(fasta_file, k, top_n=2):
    """Encode only the top N sequences from a FASTA file into k-mer counts."""
    kmer_features = []

    for i, seq_record in enumerate(SeqIO.parse(fasta_file, "fasta")):
        if i >= top_n:  
            break
        sequence = str(seq_record.seq)
        kmer_counts = kmer_count(sequence, k)
        kmer_features.append(kmer_counts)

    return np.array(kmer_features)  # Return only kmer features

def vectorize_kmers(kmer_features, all_possible_kmers):
    """
    Convert k-mer feature counts into a DataFrame with all possible k-mers as columns.
    Handles cases where kmer_features is a list of dictionaries or sparse input.
    """
    # Convert kmer_features into a DataFrame
    if isinstance(kmer_features, list) and isinstance(kmer_features[0], dict):
        # Assume sparse input, convert to DataFrame
        kmer_df = pd.DataFrame(kmer_features).fillna(0)
        # Reorder columns to match all_possible_kmers
        kmer_df = kmer_df.reindex(columns=all_possible_kmers, fill_value=0)
    else:
        # Assume dense input (2D array-like)
        kmer_df = pd.DataFrame(kmer_features, columns=all_possible_kmers)
    
    return kmer_df

# Function to append sequences 21-30 to the existing CSV file
def append_kmer_features_to_csv(fasta_file, k, start_seq=21, end_seq=30, output_dir="kmer_features_csv"):
    # Generate all possible k-mers of length k
    all_possible_kmers = [''.join(p) for p in itertools.product('ACGT', repeat=k)]
    
    # Process sequences 21-30 from the FASTA file
    kmer_features = []
    for i, seq_record in enumerate(SeqIO.parse(fasta_file, "fasta")):
        if i < start_seq - 1:
            continue
        if i >= end_seq:
            break
        sequence = str(seq_record.seq)
        kmer_counts = kmer_count(sequence, k)
        kmer_features.append(kmer_counts)
    
    # Convert to DataFrame
    new_kmer_df = vectorize_kmers(kmer_features, all_possible_kmers)
    
    # Load existing CSV file
    base_name = os.path.splitext(os.path.basename(fasta_file))[0]
    output_csv = os.path.join(output_dir, f"{base_name}_kmer_features.csv")
    if os.path.exists(output_csv):
        existing_df = pd.read_csv(output_csv)
    else:
        raise FileNotFoundError(f"CSV file {output_csv} does not exist.")
    
    # Append new data
    updated_df = pd.concat([existing_df, new_kmer_df], ignore_index=True)
    
    # Save back to the CSV
    updated_df.to_csv(output_csv, index=False)
    print(f"Appended sequences {start_seq}-{end_seq} for {fasta_file} to {output_csv}")


# Parameters
k = 4
fasta_files = ["Chimpanzee.fa"]
output_dir = "kmer_features_csv"

# Process each FASTA file and append sequences 21-30
for fasta_file in fasta_files:
    try:
        append_kmer_features_to_csv(fasta_file, k, start_seq=50001, end_seq=10000000, output_dir=output_dir)
    except Exception as e:
        print(f"Error processing {fasta_file}: {e}")


# Generate all possible k-mers of length k
# k = 4
# all_possible_kmers = [''.join(p) for p in itertools.product('ACGT', repeat=k)]

# Example usage with increased top_n
# fasta_files = ["Human.fa", "Chimpanzee.fa", "Mouse.fa", "FruitFly.fa", "Fungus.fa"]
# top_n = 20

# # Directory to store the output CSV files
# output_dir = "kmer_features_csv"
# os.makedirs(output_dir, exist_ok=True)  # Create the directory if it doesn't exist

# # Process each FASTA file and save the features to CSV
# for fasta_file in fasta_files:
#     try:
#         kmer_features = encode_top_kmers(fasta_file, k, top_n)
#         vectorized_features = vectorize_kmers(kmer_features, all_possible_kmers)
        
#         # Construct output file name
#         base_name = os.path.splitext(os.path.basename(fasta_file))[0]
#         output_csv = os.path.join(output_dir, f"{base_name}_kmer_features.csv")
        
#         # Save the DataFrame to a CSV file
#         vectorized_features.to_csv(output_csv, index=False)
        
#         print(f"Vectorized features for {fasta_file} saved to {output_csv}")
#     except Exception as e:
#         print(f"Error processing {fasta_file}: {e}")


Error processing Chimpanzee.fa: list index out of range


Now, we will convert the kmer features into a consistent format, that is vectorize the kmers

The next step is to spilt the data for testing and training the model. The following code performs this function

In [69]:
# Species-to-label mapping
species_labels = {
    'Homo sapiens': 0,
    'Pan troglodytes': 1,
    'Mus musculus': 2,
    'Drosophila melanogaster': 3,
    'Saccharomyces cerevisiae': 4  
}

# Paths to precomputed CSV files
csv_files = {
    'Homo sapiens': "kmer_features_csv/Human_kmer_features.csv",
    'Pan troglodytes': "kmer_features_csv/Chimpanzee_kmer_features.csv",
    'Mus musculus': "kmer_features_csv/Mouse_kmer_features.csv",
    'Drosophila melanogaster': "kmer_features_csv/FruitFly_kmer_features.csv",
    'Saccharomyces cerevisiae': "kmer_features_csv/Fungus_kmer_features.csv"
}

# Combine features and labels
vectorized_features = []
labels = []

for species, file_path in csv_files.items():
    # Load k-mer features from CSV
    species_features = pd.read_csv(file_path)
    print(f"{species}: Loaded {species_features.shape[0]} rows from {file_path}")
    
    # Add features to the combined list
    vectorized_features.append(species_features)
    
    # Generate corresponding labels
    labels.extend([species_labels[species]] * len(species_features))

# Concatenate all features into a single DataFrame
vectorized_features = pd.concat(vectorized_features, ignore_index=True)

# Sanity check: Features and labels must match
print("Shape of vectorized_features:", vectorized_features.shape)
print("Length of labels:", len(labels))

if vectorized_features.shape[0] != len(labels):
    raise ValueError("Mismatch between the number of features and labels!")

Homo sapiens: Loaded 706 rows from kmer_features_csv/Human_kmer_features.csv
Pan troglodytes: Loaded 44449 rows from kmer_features_csv/Chimpanzee_kmer_features.csv
Mus musculus: Loaded 61 rows from kmer_features_csv/Mouse_kmer_features.csv
Drosophila melanogaster: Loaded 1870 rows from kmer_features_csv/FruitFly_kmer_features.csv
Saccharomyces cerevisiae: Loaded 17 rows from kmer_features_csv/Fungus_kmer_features.csv
Shape of vectorized_features: (47103, 256)
Length of labels: 47103


Truncating the sequence 

In [7]:
# def clean_and_truncate_sequences(fasta_file, max_length=10000):
#     """Cleans the sequence by removing 'N' and truncates it to a fixed maximum length."""
#     cleaned_sequences = []
#     sequence_ids = []

#     for seq_record in SeqIO.parse(fasta_file, "fasta"):
#         sequence = str(seq_record.seq)
#         # Clean the sequence by removing 'N'
#         cleaned_seq = ''.join([nuc for nuc in sequence if nuc != 'N'])
#         # Truncate the cleaned sequence
#         truncated_seq = cleaned_seq[:max_length]
#         cleaned_sequences.append(truncated_seq)
#         sequence_ids.append(seq_record.id)

#     return sequence_ids, cleaned_sequences

# # Example usage
# fasta_file = "Human.fa"  # Replace with the actual path
# max_length = 10000  # Set truncation length
# sequence_ids, cleaned_sequences = clean_and_truncate_sequences(fasta_file, max_length)

CNN model code

In [70]:
# Scale the data
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(vectorized_features)

# Train-test split with stratification
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, labels, test_size=0.2, random_state=42, stratify=labels
)

# Reshape to fit CNN (samples, time_steps, features)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

# One-hot encode the labels
y_train = np.array(y_train)
y_test = np.array(y_test)
num_classes = len(np.unique(labels))  # Dynamically calculate number of classes
y_train = to_categorical(y_train, num_classes=num_classes)
y_test = to_categorical(y_test, num_classes=num_classes)

# Outputs for sanity check
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Training labels shape: {y_train.shape}")
print(f"Test labels shape: {y_test.shape}")
print(f"Training labels distribution:\n{pd.Series(y_train.argmax(axis=1)).value_counts()}")
print(f"Test labels distribution:\n{pd.Series(y_test.argmax(axis=1)).value_counts()}")

#Define CNN model
def create_cnn_model(input_shape, num_classes):
    model = Sequential([
        Input(shape=input_shape),  # Explicitly define the input shape
        Conv1D(32, 4, activation='relu'),
        MaxPooling1D(2),
        Dropout(0.2),
        Conv1D(64, 4, activation='relu'),
        MaxPooling1D(2),
        Dropout(0.3),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax')  # Output layer
    ])
    model.compile(optimizer=Adam(learning_rate=1e-4), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

input_shape = (X_train.shape[1], 1)
cnn_model = create_cnn_model(input_shape, num_classes)
# cnn_model.summary()
# Add early stopping and checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_cnn_model.keras', save_best_only=True, monitor='val_accuracy')

# Compute class weights
from sklearn.utils.class_weight import compute_class_weight
class_weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
class_weights_dict = dict(enumerate(class_weights))

# Train the model
cnn_model.fit(
    X_train, y_train,
    epochs=50, batch_size=32,
    validation_data=(X_test, y_test),
    callbacks=[early_stopping, checkpoint],
    class_weight=class_weights_dict  # Include class weights
)

# Evaluate the model
test_loss, test_accuracy = cnn_model.evaluate(X_test, y_test)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


# def create_optimized_cnn_model(input_shape, num_classes):
#     model = Sequential([
#         Input(shape=input_shape),  # Explicitly define the input shape
        
#         # First Conv Layer
#         Conv1D(32, 4, activation='relu', kernel_regularizer=regularizers.l2(0.001)), 
#         MaxPooling1D(2),
#         Dropout(0.1),
        
#         # Second Conv Layer
#         Conv1D(64, 4, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
#         MaxPooling1D(2),
#         Dropout(0.2),
        
#         # Flatten and Dense Layer
#         Flatten(),
#         Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
#         Dropout(0.4),
        
#         # Output Layer
#         Dense(num_classes, activation='softmax')  
#     ])
    
#     model.compile(
#         optimizer=Adam(learning_rate=1e-4),  # Start with a smaller learning rate
#         loss='categorical_crossentropy',
#         metrics=['accuracy']
#     )
#     return model

# input_shape = (X_train.shape[1], 1)
# cnn_model = create_optimized_cnn_model(input_shape, num_classes)
# cnn_model.summary()

# # Callbacks for early stopping and best model saving
# early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# checkpoint = ModelCheckpoint('best_cnn_model.keras', save_best_only=True, monitor='val_accuracy')

# # Learning rate scheduler
# lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

# # Compute class weights
# from sklearn.utils.class_weight import compute_class_weight
# class_weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
# class_weights_dict = dict(enumerate(class_weights))

# # Train the model
# cnn_model.fit(
#     X_train, y_train,
#     epochs=50, batch_size=32,  # You can try increasing epochs if needed
#     validation_data=(X_test, y_test),
#     callbacks=[early_stopping, checkpoint, lr_scheduler],
#     class_weight=class_weights_dict  # Include class weights
# )

# # Evaluate the model
# test_loss, test_accuracy = cnn_model.evaluate(X_test, y_test)
# print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Training data shape: (37682, 256, 1)
Test data shape: (9421, 256, 1)
Training labels shape: (37682, 5)
Test labels shape: (9421, 5)
Training labels distribution:
1    35559
3     1496
0      565
2       49
4       13
Name: count, dtype: int64
Test labels distribution:
1    8890
3     374
0     141
2      12
4       4
Name: count, dtype: int64
Epoch 1/50
[1m1178/1178[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 5ms/step - accuracy: 0.2354 - loss: 1.6781 - val_accuracy: 0.9436 - val_loss: 1.5242
Epoch 2/50
[1m1178/1178[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 5ms/step - accuracy: 0.5084 - loss: 1.4546 - val_accuracy: 0.0404 - val_loss: 1.4736
Epoch 3/50
[1m1178/1178[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 5ms/step - accuracy: 0.4657 - loss: 1.4564 - val_accuracy: 0.0405 - val_loss: 1.5121
Epoch 4/50
[1m1178/1178[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 6ms/step - accuracy: 0.3498 - loss: 1.5212 - val_accuracy: 0.0404 - val_loss: 1.3315