In [None]:
import torch

# Report whether PyTorch can see a CUDA device; when it can, name device 0.
cuda_ready = torch.cuda.is_available()
if not cuda_ready:
    print("GPU is not available.")
else:
    print(f"GPU is available: {torch.cuda.get_device_name(0)}")


In [None]:
from huggingface_hub import login

In [None]:
import torch
import os
import wandb
import pandas as pd
import numpy as np
import seaborn as sns
from pathlib import Path
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, DataCollatorWithPadding, AdamW, get_scheduler, TrainingArguments
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, confusion_matrix

In [None]:
def count_plot(y, title):
    """Draw, save and show a bar chart of how often each class label occurs.

    Args:
        y: Sequence of class labels, one entry per sample.
        title: Name of the data split. It is inserted into the plot title and
            used as the stem of the saved image (``{title}.png``, written to
            the current working directory).
    """
    # Pass the labels by keyword. Since seaborn 0.12 the first positional
    # parameter of countplot is `data` (x/y are keyword-only), so a bare
    # positional list is no longer read as the variable to count.
    sns.countplot(x=y)
    plt.title(f'Class dist in {title} set')
    plt.savefig(f'{title}.png')
    plt.show()

In [None]:
def plot_tsne(X, y, title):
    """Project X to two dimensions with t-SNE and plot the points coloured by y.

    Args:
        X: Feature matrix accepted by ``TSNE.fit_transform``.
        y: Per-sample values passed to ``scatter`` as the colour argument
            (presumably integer class ids - confirm against callers).
        title: Name of the data split. It is inserted into the plot title and
            used as the stem of the saved image (``{title}.png``).
    """
    # NOTE(review): recent scikit-learn releases rename `n_iter` to `max_iter`;
    # switch the keyword if this call starts raising a deprecation/TypeError.
    X_tsne = TSNE(n_components=2, n_iter=2000, init='random').fit_transform(X)

    points = plt.scatter(X_tsne[:, 0], X_tsne[:, 1], c=y)
    plt.title(f'TSNE - {title} data 2D')
    # One legend entry per colour value. The previous `plt.legend(title)`
    # passed the title string as the *labels* sequence, so its characters
    # were used as legend labels instead of the classes.
    plt.legend(*points.legend_elements(), title='Class')
    plt.savefig(f'{title}.png')
    plt.show()


In [None]:
def plot_pca(X, y, title):
    """Scatter-plot the first two principal components of X, coloured by y.

    Args:
        X: Feature matrix accepted by ``PCA.fit_transform``.
        y: Per-sample values passed to ``scatter`` as the colour argument.
        title: Stem of the saved image (``{title}.png``); it is not drawn on
            the figure itself.
    """
    # Fit a two-component PCA and project X in a single step.
    components = PCA(n_components=2).fit_transform(X)
    first_pc, second_pc = components[:, 0], components[:, 1]

    plt.scatter(first_pc, second_pc, c=y)
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.savefig(f'{title}.png')
    plt.show()

In [None]:
class HF_dataset(torch.utils.data.Dataset):
    """Map-style dataset exposing tokenised sequences in the dict layout
    (``input_ids`` / ``attention_mask`` / ``labels``) returned per item.

    Args:
        input_ids: Indexable container of token-id rows (tensor or nested list).
        attention_masks: Indexable container of attention-mask rows, aligned
            with ``input_ids``.
        labels: Indexable container with one label per row; its length
            defines the dataset length.
    """

    def __init__(self, input_ids, attention_masks, labels):
        self.input_ids = input_ids
        self.attention_masks = attention_masks
        self.labels = labels

    def __len__(self):
        """Return the number of samples (the number of labels)."""
        return len(self.labels)

    @staticmethod
    def _to_tensor(value):
        """Return an independent tensor copy of ``value``.

        ``torch.tensor(existing_tensor)`` emits a copy-construct UserWarning
        on every call; ``clone().detach()`` is the recommended equivalent.
        Non-tensor inputs (lists, ints, numpy scalars) still go through
        ``torch.tensor``.
        """
        if isinstance(value, torch.Tensor):
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, index):
        """Return the sample at ``index`` as a dict of tensors."""
        return {
            "input_ids": self._to_tensor(self.input_ids[index]),
            "attention_mask": self._to_tensor(self.attention_masks[index]),
            "labels": self._to_tensor(self.labels[index]),
        }

def val_dataset_generator(
    tokenizer,
    kmer_size,
    val_dir,
    max_len=512,
):
    """Yield one tokenised ``HF_dataset`` per data file found in ``val_dir``.

    Each file is read with ``pd.read_csv`` and must contain a ``SEQ`` column
    and a ``CLASS`` (or ``Class``) column. Rows whose sequence fails
    ``is_dna_sequence`` are dropped; labels are shifted by -1.

    Args:
        tokenizer: Object providing ``batch_encode_plus``.
        kmer_size: K-mer length forwarded to ``return_kmer``.
        val_dir: Directory containing the validation files.
        max_len: Length every encoded sequence is padded/truncated to.

    Yields:
        HF_dataset: The encoded sequences and labels of one file, in sorted
        file-name order.
    """
    # os.listdir returns entries in arbitrary order; sort so that the yielded
    # datasets come out in a reproducible sequence.
    for file in sorted(os.listdir(val_dir)):
        file_path = os.path.join(val_dir, file)
        # Skip sub-directories (e.g. Jupyter's .ipynb_checkpoints), which
        # would otherwise make read_csv fail.
        if not os.path.isfile(file_path):
            continue

        df_test = pd.read_csv(file_path)
        print(file, len(df_test))
        val_kmers, labels_val = [], []

        cls = (
            "CLASS" if "CLASS" in df_test.columns else "Class"
        )  # Ensure to handle both column variations

        for seq, label in zip(df_test["SEQ"], df_test[cls]):
            # Filter invalid sequences
            if not is_dna_sequence(seq):
                continue

            # K-mer generation and label adjustment
            kmer_seq = return_kmer(seq, K=kmer_size)
            # A later cell of this notebook rebinds `return_kmer` to a variant
            # that returns a list of k-mers; accept both forms so the
            # tokenizer always receives one space-separated string.
            if not isinstance(kmer_seq, str):
                kmer_seq = " ".join(kmer_seq)
            val_kmers.append(kmer_seq)
            labels_val.append(label - 1)

        # Tokenization and padding strategy
        val_encodings = tokenizer.batch_encode_plus(
            val_kmers,
            max_length=max_len,
            padding='max_length',  # Ensures all sequences have the same length
            truncation=True,  # Truncate sequences longer than max length
            return_attention_mask=True,
            return_tensors="pt",  # Return PyTorch tensors
        )

        # Create dataset
        val_dataset = HF_dataset(
            val_encodings["input_ids"], val_encodings["attention_mask"], labels_val
        )
        yield val_dataset

def return_kmer(seq, K=4):
    """Return the overlapping K-mers of ``seq`` as one space-separated string.

    A window of width ``K`` slides over ``seq`` one character at a time;
    windows rejected by ``is_dna_sequence`` are left out. When ``seq`` is
    shorter than ``K`` the result is the empty string.
    """
    window_starts = range(len(seq) - K + 1)
    windows = (seq[start:start + K] for start in window_starts)
    return " ".join(window for window in windows if is_dna_sequence(window))

def is_dna_sequence(sequence):
    """Return True when ``sequence`` contains only A, C, G or T.

    The comparison is case-insensitive; an empty string is reported as valid.
    """
    return set(sequence.upper()).issubset("ACGT")



In [None]:
# Hugging Face login
# The access token is read from the HF_TOKEN environment variable so that no
# credential is stored in the notebook. When the variable is unset, `login`
# receives None and falls back to its own prompt/cached-credential handling.
hf_token = os.environ.get("HF_TOKEN")
login(token=hf_token)

# Define model name and K-mer length
model_name = 'Priyasi/Pretrain-virusmodel_4'
KMER = 4

# Load training data
df_training = pd.read_csv("Trainingdata.csv")

# NOTE(review): this rebinds the `return_kmer` defined in an earlier cell.
# Unlike that version it returns a *list* of k-mers and does not drop windows
# containing non-ACGT characters. The validation cell below relies on the
# list-returning form, so it is kept here; consider giving it its own name.
def return_kmer(sequence, K=4):
    """Return the list of overlapping K-mers of ``sequence`` (no validation)."""
    return [sequence[i:i+K] for i in range(len(sequence) - K + 1)]

# Preprocess training data: one space-separated k-mer string per sequence and
# class ids shifted by -1.
train_kmers, labels_train = [], []
for seq, label in zip(df_training["SEQ"], df_training["CLASS"]):
    kmer_seq = return_kmer(seq, K=KMER)
    train_kmers.append(' '.join(kmer_seq))  # Joining K-mers into a single string
    labels_train.append(label - 1)

# Define number of classes
NUM_CLASSES = len(np.unique(labels_train))
# The classification head has NUM_CLASSES outputs, so every label must lie in
# 0..NUM_CLASSES-1; fail early with a readable message otherwise.
assert set(labels_train) == set(range(NUM_CLASSES)), (
    "CLASS values must be the contiguous integers 1..N"
)

# Define model configuration
model_config = {
    "model_path": model_name,  # Use original model name
    "num_classes": NUM_CLASSES,
}


# Load the pre-trained model with a classification head of NUM_CLASSES outputs
model = AutoModelForSequenceClassification.from_pretrained(
    model_config["model_path"],
    num_labels=NUM_CLASSES  # Number of output labels
)


# Load tokenizer
# NOTE(review): `use_auth_token` is the older spelling of this argument; newer
# transformers releases call it `token` - switch when upgrading.
tokenizer = AutoTokenizer.from_pretrained(
    model_config["model_path"], do_lower_case=False, use_auth_token=hf_token
)


# Encode sequences
SEQ_MAX_LEN = 512  # max len of BERT
train_encodings = tokenizer.batch_encode_plus(
    train_kmers,
    max_length=SEQ_MAX_LEN,
    padding=True,
    truncation=True,
    return_attention_mask=True,
    return_tensors="pt"
)

# Wrap the encodings and labels in the HF_dataset class defined in an earlier cell
train_dataset = HF_dataset(
    train_encodings["input_ids"], train_encodings["attention_mask"], labels_train
)


In [None]:
# Read the held-out split that the Trainer evaluates on
df_val = pd.read_csv("Testdata.csv")

KMER=4
# One space-separated k-mer string per sequence; class ids shifted by -1
val_kmers = [' '.join(return_kmer(seq, K=KMER)) for seq in df_val["SEQ"]]
labels_val = [label - 1 for label in df_val["CLASS"]]

count_plot(labels_val, "Validation Class Distribution")

# Tokenise the validation k-mer strings with the same tokenizer and length
# cap as the training data
val_encodings = tokenizer.batch_encode_plus(
    val_kmers,
    return_tensors="pt",  # PyTorch tensors
    return_attention_mask=True,
    truncation=True,  # cut anything longer than max_length
    padding=True,
    max_length=SEQ_MAX_LEN,
)

# Wrap encodings and labels for the Trainer
val_dataset = HF_dataset(
    val_encodings["input_ids"], val_encodings["attention_mask"], labels_val
)

In [None]:
# Create the results directory if it doesn't exist
results_dir = Path("./results/virusidentification-4/")
results_dir.mkdir(parents=True, exist_ok=True)

# Initialize wandb for logging the training process
wandb.init(project="DNA_bert", name=model_config["model_path"])
wandb.config.update(model_config)

# Training configuration
EPOCHS = 10  # Increasing epochs for better convergence
BATCH_SIZE = 4
GRADIENT_ACCUMULATION_STEPS = 8  # To simulate a larger batch size without requiring more memory
LEARNING_RATE = 1e-05  # Try lowering learning rate for fine-tuning
WARMUP_STEPS = 500  # number of warmup steps for learning rate scheduler
WEIGHT_DECAY = 0.02  # L2 regularization to reduce overfitting

# Define training arguments with updates for better accuracy.
# learning_rate / warmup_steps / weight_decay are repeated here so the logged
# run configuration matches the optimizer and scheduler built by hand below
# (the Trainer does not apply them to a caller-supplied optimizer/scheduler).
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` - switch when upgrading.
training_args = TrainingArguments(
    output_dir=str(results_dir / "check-points"),  # output directory for checkpoints
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    learning_rate=LEARNING_RATE,
    warmup_steps=WARMUP_STEPS,
    weight_decay=WEIGHT_DECAY,
    logging_steps=60,  # log metrics every 60 steps
    load_best_model_at_end=True,  # Load the best model at the end of training
    evaluation_strategy="epoch",  # Evaluate at the end of each epoch
    save_strategy="epoch",  # Save model at the end of each epoch
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,  # Simulate larger batch size
    metric_for_best_model="accuracy",  # Track best model based on accuracy
    logging_dir=str(results_dir / "logs"),  # Log directory
)

# Initialize the data collator (helps with dynamic padding)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Initialize the optimizer (AdamW with a lower learning rate for fine-tuning).
# Weight decay has to be given to the optimizer itself: the value in
# TrainingArguments is only used when the Trainer creates the optimizer.
# NOTE(review): transformers' own AdamW is deprecated in favour of
# torch.optim.AdamW - consider switching when upgrading.
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Number of optimizer updates over the whole run. The scheduler is stepped
# once per optimizer update, i.e. once every GRADIENT_ACCUMULATION_STEPS
# batches, so the accumulation factor must be part of the count; without it
# the linear decay is stretched ~8x and the learning rate never reaches zero.
# Assumes a single device - with several GPUs the per-step batch is larger.
batches_per_epoch = -(-len(train_dataset) // BATCH_SIZE)  # ceiling division
updates_per_epoch = max(batches_per_epoch // GRADIENT_ACCUMULATION_STEPS, 1)
num_training_steps = EPOCHS * updates_per_epoch

# Initialize a learning rate scheduler
scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=WARMUP_STEPS,
    num_training_steps=num_training_steps,
)

# Define custom metrics for accuracy, precision, recall, and F1
def compute_metrics(pred):
    """Compute accuracy and weighted precision/recall/F1 for one evaluation.

    Args:
        pred: Object exposing ``label_ids`` (true labels) and ``predictions``
            (per-class scores; the arg-max over the last axis is the
            predicted class).

    Returns:
        dict: Keys ``accuracy``, ``f1``, ``precision`` and ``recall``.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    # zero_division=0 keeps the value scikit-learn already falls back to for a
    # class that is never predicted, without emitting a warning on every
    # evaluation.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average="weighted", zero_division=0
    )
    acc = accuracy_score(labels, preds)
    return {"accuracy": acc, "f1": f1, "precision": precision, "recall": recall}


In [None]:
# Assemble the Trainer from the model, datasets, collator, metric function and
# the hand-built optimizer/scheduler pair
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    optimizers=(optimizer, scheduler),  # custom optimizer + LR schedule
    compute_metrics=compute_metrics,  # accuracy / precision / recall / F1
)

# Run fine-tuning
trainer.train()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, average_precision_score, accuracy_score, roc_curve, precision_recall_curve, matthews_corrcoef
from sklearn.preprocessing import label_binarize

# Run the fine-tuned model over the validation set
predictions = trainer.predict(val_dataset)

# Raw per-class scores and the true labels from the prediction output
logits, labels = predictions.predictions, predictions.label_ids

# Predicted class = index of the highest score in each row
predicted_labels = np.argmax(logits, axis=-1)

# Fraction of validation samples classified correctly
accuracy = accuracy_score(y_true=labels, y_pred=predicted_labels)
print(f"Accuracy: {accuracy}")

In [None]:
# Requires `labels` and `logits` from the prediction cell above.
# Number of classes, taken from the width of the model output so the label
# indicator matrix below always has one column per logit column (a class
# missing from the validation labels would otherwise shrink it).
num_classes = logits.shape[1]

# Binarize the labels for multi-class classification (One-vs-Rest approach)
binarized_labels = label_binarize(labels, classes=list(range(num_classes)))
if binarized_labels.shape[1] == 1:
    # label_binarize returns a single column for a two-class problem (the
    # indicator of class 1); expand it to one column per class so its shape
    # matches the two-column logits.
    binarized_labels = np.hstack([1 - binarized_labels, binarized_labels])

# Calculate AUC-ROC (One-vs-Rest, weighted average)
roc_auc = roc_auc_score(binarized_labels, logits, multi_class="ovr", average="weighted")
print(f"AUC-ROC: {roc_auc}")

# Calculate AUC-PR (One-vs-Rest approach)
auc_pr = average_precision_score(binarized_labels, logits, average="weighted")
print(f"AUC-PR: {auc_pr}")

In [None]:
# Matthews correlation coefficient between true and predicted classes
mcc = matthews_corrcoef(y_true=labels, y_pred=predicted_labels)
print(f"MCC: {mcc}")

In [None]:
from sklearn.metrics import log_loss

# Calculate Log Loss
# log_loss expects class probabilities, not raw logits: apply a numerically
# stable softmax (subtract the row maximum before exponentiating) first.
shifted_logits = logits - logits.max(axis=1, keepdims=True)
probabilities = np.exp(shifted_logits)
probabilities /= probabilities.sum(axis=1, keepdims=True)
# Pass the full label set explicitly so the probability columns are matched to
# class ids 0..C-1 even if a class is absent from the validation labels.
log_loss_value = log_loss(labels, probabilities, labels=list(range(logits.shape[1])))
print(f"Log Loss: {log_loss_value:.2f}")

In [None]:
from sklearn.metrics import cohen_kappa_score

# Cohen's kappa: agreement between true and predicted classes
kappa = cohen_kappa_score(y1=labels, y2=predicted_labels)
print(f"Cohen's Kappa: {kappa:.2f}")

In [None]:
from sklearn.manifold import TSNE
# Embed the logits in two dimensions (fixed seed for a repeatable layout)
tsne = TSNE(n_components=2, random_state=42)
X_embedded = tsne.fit_transform(logits)

# Plot t-SNE embedding: one scatter series per class so each class gets its
# own legend entry
fig, ax = plt.subplots(figsize=(6, 6))
for i in range(num_classes):
    class_mask = labels == i
    ax.scatter(X_embedded[class_mask, 0], X_embedded[class_mask, 1], label=f'Class {i}')
ax.set_title('t-SNE Embedding of Logits')
ax.set_xlabel('t-SNE Feature 1')
ax.set_ylabel('t-SNE Feature 2')
ax.legend()
ax.grid(True)
fig.savefig('tsne_embedding4.png')
plt.show()

In [None]:
# Persist the fine-tuned model and, in a separate directory, the tokenizer
trainer.save_model(output_dir="./virus_identification_4")
tokenizer.save_pretrained(save_directory="./virus_identificationtoken_4")