In [None]:
import os
import pickle
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
import torch
import tensorflow as tf
import joblib
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Input
from tensorflow.keras.layers import Conv1D, LSTM, MaxPooling1D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from transformers import AutoTokenizer, AutoModel

In [None]:
# Set the TF_GPU_ALLOCATOR environment variable to "cuda_malloc_async".
os.environ.update(TF_GPU_ALLOCATOR="cuda_malloc_async")

# PyTorch device: CUDA when torch reports it as available, otherwise CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

In [None]:
def load_therapeutic_categories(data_path):
    """Load peptide sequences and integer category labels from per-category CSV files.

    Every ``*.csv`` file in ``<data_path>/raw/therapeutic`` is read and its first
    column is taken as the list of sequences. The category is derived from the
    file name: the first key of the category mapping that occurs as a substring
    of the lower-cased file stem wins; files matching no key are labelled with
    the extra "other" category.

    Files are processed in sorted name order so the returned sequence order is
    reproducible across machines and file systems (``os.listdir`` makes no
    ordering guarantee).

    Args:
        data_path: Root data directory containing ``raw/therapeutic``.

    Returns:
        A tuple ``(sequences, categories, category_names)`` where ``sequences``
        is a list of first-column values, ``categories`` is a parallel list of
        integer labels, and ``category_names`` lists the label names indexed by
        label id, with ``'other'`` last.

    Raises:
        FileNotFoundError: If the ``raw/therapeutic`` directory does not exist.
    """
    therapeutic_path = os.path.join(data_path, "raw", "therapeutic")
    sequences, categories = [], []

    category_mapping = {
        'anticancer': 0,
        'antimicrobial': 1,
        'antiviral': 2,
        'immunomodulatory': 3,
        'neuropeptide': 4
    }
    # Label id used for files whose name matches none of the known categories.
    other_category = len(category_mapping)

    for filename in sorted(os.listdir(therapeutic_path)):
        stem, extension = os.path.splitext(filename)
        if extension.lower() != '.csv':
            continue

        category_name = stem.lower()
        # First matching key (in mapping order) decides the label.
        category = next(
            (label for key, label in category_mapping.items() if key in category_name),
            other_category,
        )

        df = pd.read_csv(os.path.join(therapeutic_path, filename))
        sequences.extend(df.iloc[:, 0].tolist())
        categories.extend([category] * len(df))

    return sequences, categories, list(category_mapping.keys()) + ['other']

In [None]:
def create_multiclass_model(input_dim, num_classes):
    """Build and compile a feed-forward softmax classifier.

    The network is four Dense(relu) + Dropout stages of shrinking width
    (512 -> 256 -> 128 -> 64, dropout 0.4 -> 0.3 -> 0.2 -> 0.1) followed by a
    softmax layer with ``num_classes`` units.

    Args:
        input_dim: Length of each input feature vector.
        num_classes: Number of output classes.

    Returns:
        A compiled Keras ``Model`` using Adam (learning rate 0.001),
        sparse categorical cross-entropy loss and the accuracy metric.
    """
    # (units, dropout rate) for each hidden stage, in order.
    hidden_stages = ((512, 0.4), (256, 0.3), (128, 0.2), (64, 0.1))

    input_layer = Input(shape=(input_dim,))

    hidden = input_layer
    for units, drop_rate in hidden_stages:
        hidden = Dense(units, activation='relu')(hidden)
        hidden = Dropout(drop_rate)(hidden)

    probabilities = Dense(num_classes, activation='softmax')(hidden)

    classifier = Model(inputs=input_layer, outputs=probabilities)
    classifier.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

In [None]:
data_path = "../data"
sequences, categories, category_names = load_therapeutic_categories(data_path)

# Fail early with a readable message; otherwise np.bincount below rejects an
# empty label list with an unrelated-looking dtype error.
if len(sequences) == 0:
    raise ValueError("No sequences were loaded. Please check your data files.")

print(f"Total therapeutic sequences: {len(sequences)}")
print(f"Number of categories: {len(category_names)}")
print(f"Categories: {category_names}")

# minlength pads the counts so every category name is reported, including
# categories that have no samples.
category_counts = np.bincount(categories, minlength=len(category_names))
for name, count in zip(category_names, category_counts):
    print(f"{name}: {count} sequences")

# --- Load features -----------------------------------------------------------
# Refuse to continue without real embeddings: training on random numbers would
# still write a scaler and a model checkpoint into ../backend/models.
embeddings_path = "../data/processed/protbert_embeddings.npy"
if not os.path.exists(embeddings_path):
    raise FileNotFoundError(
        "ProtBERT embeddings not found. Please run 02_cnn_lstm_protbert.ipynb first."
    )

print("Loading cached ProtBERT embeddings...")
features = np.load(embeddings_path)
if len(features) < len(sequences):
    raise ValueError(
        f"Cached embeddings contain {len(features)} rows but "
        f"{len(sequences)} sequences were loaded."
    )
# NOTE(review): this assumes the first len(sequences) cached rows correspond,
# in order, to `sequences` — confirm against the notebook that wrote the cache.
features = features[:len(sequences)]

# --- Split, then scale -------------------------------------------------------
# Labels are converted to an array because train_test_split returns lists for
# list input, and Keras fit/evaluate expect array-like targets.
labels_array = np.asarray(categories)

X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    features, labels_array, test_size=0.2, random_state=42, stratify=labels_array
)

# Fit the scaler on training rows only so test statistics do not leak into
# preprocessing; the same fitted scaler is applied everywhere else.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)
features_scaled = scaler.transform(features)

os.makedirs("../backend/models", exist_ok=True)
with open("../backend/models/multiclass_scaler.pkl", "wb") as f:
    pickle.dump(scaler, f)

# --- Train -------------------------------------------------------------------
num_classes = len(category_names)
model = create_multiclass_model(X_train.shape[1], num_classes)

callbacks = [
    EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True),
    ModelCheckpoint('../backend/models/multiclass_classifier.h5', save_best_only=True, monitor='val_loss')
]

# Early stopping and checkpointing monitor a validation slice carved out of the
# training data (validation_split), so the test set stays untouched until the
# final evaluation.
history = model.fit(
    X_train, y_train,
    validation_split=0.2,
    epochs=150,
    batch_size=32,
    callbacks=callbacks,
    verbose=1
)

# Keras expects array-like targets; y_test may still be a plain Python list.
y_true = np.asarray(y_test)

test_loss, test_acc = model.evaluate(X_test, y_true, verbose=0)
print(f"Test Accuracy: {test_acc:.4f}")

y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)

# Pass the full label set explicitly so the report and the matrix always have
# one row per entry in category_names, even when a category does not occur in
# the test split or in the predictions.
all_labels = np.arange(len(category_names))

print("Classification Report:")
print(classification_report(
    y_true, y_pred_classes,
    labels=all_labels, target_names=category_names, zero_division=0
))

cm = confusion_matrix(y_true, y_pred_classes, labels=all_labels)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=category_names, yticklabels=category_names)
plt.title('Confusion Matrix - Therapeutic Category Classification')
plt.ylabel('True Category')
plt.xlabel('Predicted Category')
plt.xticks(rotation=45)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Training curves: one panel per metric, train vs. validation per epoch.
# Each entry: (train history key, validation history key, panel title, y label).
curve_panels = [
    ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model Loss', 'Loss'),
]

plt.figure(figsize=(12, 4))
for panel_no, (train_key, val_key, panel_title, y_label) in enumerate(curve_panels, start=1):
    plt.subplot(1, 2, panel_no)
    plt.plot(history.history[train_key], label='Train')
    plt.plot(history.history[val_key], label='Validation')
    plt.title(panel_title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# ✅ Divide & Conquer: Parallel Feature Extraction
def extract_protbert_features(sequences):
    """Embed each sequence with ProtBERT and return the embeddings as one array.

    Sequences are mapped over a thread pool. Each result is stored in
    ``embedding_cache`` under ``hash_sequence(seq)`` and reused when the same
    key is seen again.

    Args:
        sequences: Sized iterable of sequences (``len`` is used for the
            progress bar).

    Returns:
        ``np.float32`` array built from the per-sequence embedding vectors.
    """
    def embed_one(raw_seq):
        cache_key = hash_sequence(raw_seq)
        if cache_key in embedding_cache:
            return embedding_cache[cache_key]

        # Separate the characters with spaces before tokenizing.
        spaced_seq = ' '.join(raw_seq)
        tokens = protbert_tokenizer.batch_encode_plus(
            [spaced_seq],
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt",
        ).to(device)

        with torch.no_grad():
            hidden_states = protbert_model(**tokens).last_hidden_state
            # Average over dim=1, then move to CPU as a flat numpy vector.
            vector = hidden_states.mean(dim=1).cpu().numpy().flatten()

        embedding_cache[cache_key] = vector
        return vector

    with ThreadPoolExecutor() as pool:
        pending = pool.map(embed_one, sequences)
        embeddings = list(
            tqdm(pending, total=len(sequences), desc="Extracting ProtBERT Features")
        )

    return np.array(embeddings, dtype=np.float32)

In [None]:
# ✅ Load & Preprocess Data
# Folder passed to load_data. It defaults to the original author's local path;
# set the THERAPEUTIC_DATA_DIR environment variable to point the notebook at
# the data on another machine without editing this cell.
base_folder = os.environ.get(
    "THERAPEUTIC_DATA_DIR",
    r"C:\\Users\\SURYA HA\\OneDrive\\Documents\\Prediction of Therapeutic Peptide using Deep Learning and DAA\\data\\Therapeutic Category Classification",
)
sequences, labels = load_data(base_folder)

# Check if we have data
if len(sequences) == 0:
    raise ValueError("No sequences were loaded. Please check your data files.")

print(f"Total sequences loaded: {len(sequences)}")
print(f"Unique categories: {len(set(labels))}")

# Extract features, standardize them, and persist the fitted scaler.
protbert_features = extract_protbert_features(sequences)
scaler = StandardScaler()
X = scaler.fit_transform(protbert_features)
joblib.dump(scaler, "scaler 2.pkl")

# Encode the category labels as integers and persist the fitted encoder so
# predictions can be mapped back to category names later.
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(labels)
joblib.dump(label_encoder, "label_encoder.pkl")

In [None]:
# ✅ Feature Selection using PCA
# Split data first so the PCA projection is learned from training rows only.
X_train_std, X_test_std, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# n_components must not exceed the number of features, and is kept below the
# number of rows PCA is fit on.
n_components = min(50, X_train_std.shape[0] - 1, X_train_std.shape[1])
pca = PCA(n_components=n_components)
X_train = np.expand_dims(pca.fit_transform(X_train_std), axis=-1)
X_test = np.expand_dims(pca.transform(X_test_std), axis=-1)
joblib.dump(pca, "pca_model.pkl")

# Full projected matrix, kept under its original name for any cell that uses it.
X_pca = pca.transform(X)

In [None]:
# ✅ CNN-LSTM Model (Efficient)
num_classes = len(set(labels))
input_shape = (X_train.shape[1], 1)

with tf.device('/GPU:0'):
    # Conv1D -> MaxPooling1D -> LSTM -> Dropout -> softmax classifier.
    cnn_lstm_layers = [
        Conv1D(32, 3, activation='relu', input_shape=input_shape),
        MaxPooling1D(2),
        LSTM(64),
        Dropout(0.3),
        Dense(num_classes, activation='softmax'),
    ]
    model = Sequential(cnn_lstm_layers)
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=Adam(learning_rate=0.001),
        metrics=['accuracy'],
    )

    # Class weights for imbalanced data:
    # weight = total / (number of classes * samples in that class).
    class_counts = np.bincount(y_train)
    total = len(y_train)
    class_weights = {
        class_idx: total / (len(class_counts) * n_samples)
        for class_idx, n_samples in enumerate(class_counts)
    }

    # Train with class weights
    history = model.fit(
        X_train,
        y_train,
        epochs=40,
        batch_size=32,
        validation_data=(X_test, y_test),
        class_weight=class_weights,
    )

model.save("model 2.h5")

In [None]:
# ✅ Accuracy Evaluation
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

# Generate classification report.
# The full set of encoded labels is passed explicitly so the report has one row
# per class name even when a class is missing from both y_test and y_pred;
# zero_division=0 reports 0 for such undefined precision/recall values.
y_pred = np.argmax(model.predict(X_test), axis=1)
class_ids = np.arange(len(label_encoder.classes_))
print("\nClassification Report:")
report = classification_report(
    y_test, y_pred,
    labels=class_ids, target_names=label_encoder.classes_, zero_division=0
)
print(report)

In [None]:
# ✅ Plot confusion matrix
# Build the matrix over every encoded class so its rows/columns always line up
# with the tick labels taken from label_encoder.classes_.
class_ids = np.arange(len(label_encoder.classes_))
cm = confusion_matrix(y_test, y_pred, labels=class_ids)

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.savefig('confusion_matrix.png')
plt.show()

In [None]:
# ✅ Modified Confidence Adjustment with 0.3 threshold
def confidence_adjustment(prediction, threshold=0.3):
    """Map a probability vector to a category name, or flag it as unknown.

    Args:
        prediction: Array-like of class probabilities for one sample.
        threshold: Minimum top probability required to accept the prediction.
            Defaults to 0.3.

    Returns:
        ``"Unknown Category"`` when the highest probability is below
        ``threshold``; otherwise the label that ``label_encoder`` maps the
        arg-max index back to.

    Raises:
        ValueError: If ``prediction`` is empty.
    """
    probabilities = np.asarray(prediction)
    if probabilities.size == 0:
        raise ValueError("prediction must contain at least one probability")

    if np.max(probabilities) < threshold:
        return "Unknown Category"
    return label_encoder.inverse_transform([np.argmax(probabilities)])[0]