In [None]:
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, get_scheduler

import os
import optuna
import logging
from peft import get_peft_model, LoraConfig, TaskType  # LoRA integration
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix


In [None]:
# Configure logging once for the whole notebook: records at INFO level and above
# are emitted, each formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [None]:
# Module-level TensorBoard writer. train_model() logs the per-step training loss
# to it, and the main cell closes it at the end of the run.
writer = SummaryWriter()


In [None]:
# Hugging Face checkpoint identifier of the pre-trained sentiment model.
# Nothing is loaded here; the weights and tokenizer are fetched later by
# load_model_and_tokenizer(MODEL_NAME).
MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english"


In [None]:
# Define hyperparameters
# NOTE(review): BATCH_SIZE and LEARNING_RATE are not referenced by the code
# visible in this notebook (Optuna supplies both values); confirm whether they
# are still needed. NUM_EPOCHS drives train_model() and the scheduler step count.
BATCH_SIZE = 16
LEARNING_RATE = 5e-5
NUM_EPOCHS = 3


In [None]:
# Directory where train_model() writes one state-dict checkpoint per epoch.
# It is created up front; an already existing directory is not an error.
CHECKPOINT_DIR = "model_checkpoints"
os.makedirs(CHECKPOINT_DIR, exist_ok=True)


In [None]:
# Function for hyperparameter tuning using Optuna
def objective(trial):
    """Optuna objective: fine-tune a fresh LoRA model and return its validation F1.

    For each trial a learning rate (log-uniform in [1e-6, 1e-4]) and a batch
    size (8 to 32 in steps of 8) are sampled, a new model is fine-tuned on the
    first 2,000 reviews of the shuffled IMDB training split, and the binary F1
    score on a held-out validation slice is returned.

    The validation slice is rows 2000-2499 of the *same* shuffled training
    split, so it is disjoint from the training rows. Hyperparameters must not
    be selected on the test split: the main cell reports its final metrics on
    the test split, and tuning on it would make those metrics optimistically
    biased.

    Args:
        trial: The ``optuna`` trial used to sample hyperparameters.

    Returns:
        Binary F1 score of the fine-tuned model on the validation slice.
    """
    learning_rate = trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True)
    batch_size = trial.suggest_int("batch_size", 8, 32, step=8)

    tokenizer, model = load_model_and_tokenizer(MODEL_NAME)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # One deterministic shuffle feeds both the training and validation slices,
    # which guarantees the two index ranges below never overlap.
    train_shuffled = load_dataset("imdb", split="train").shuffle(seed=42)

    train_dataset = train_shuffled.select(range(2000))
    train_dataset = preprocess_data(tokenizer, train_dataset)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    optimizer = AdamW(model.parameters(), lr=learning_rate)
    num_training_steps = NUM_EPOCHS * len(train_loader)
    lr_scheduler = get_scheduler("linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps)

    train_model(model, train_loader, optimizer, lr_scheduler, device)

    # Held-out validation data: 500 training-split reviews never seen in training.
    val_dataset = train_shuffled.select(range(2000, 2500))
    val_dataset = preprocess_data(tokenizer, val_dataset)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    predictions, labels = evaluate_model(model, val_loader, device)
    return f1_score(labels, predictions, average='binary')


In [None]:
# Load model and tokenizer
def load_model_and_tokenizer(model_name):
    """Load a tokenizer and a LoRA-wrapped sequence classifier.

    Args:
        model_name: Checkpoint identifier passed to ``from_pretrained``.

    Returns:
        A ``(tokenizer, model)`` tuple, where ``model`` is the classifier
        returned by ``get_peft_model`` with the LoRA configuration below.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    base_model = AutoModelForSequenceClassification.from_pretrained(model_name)

    lora_settings = LoraConfig(
        task_type=TaskType.SEQ_CLS,  # sequence classification
        inference_mode=False,        # adapters are created for training
        r=16,                        # LoRA rank
        lora_alpha=32,               # scaling factor
        lora_dropout=0.05,           # dropout applied inside the adapters
        # Linear layers inside DistilBERT's attention blocks. The original
        # notebook also noted ["pre_classifier", "classifier"] as an alternative.
        target_modules=["q_lin", "k_lin", "v_lin", "out_lin"],
    )

    lora_model = get_peft_model(base_model, lora_settings)
    logging.info("LoRA model initialized.")
    return tokenizer, lora_model


In [None]:
# Preprocess data
def preprocess_data(tokenizer, dataset, max_length=128):
    """Tokenize the ``text`` column and expose the dataset as torch tensors.

    Every example is truncated and padded to exactly ``max_length`` tokens.
    Padding to a fixed length (rather than ``padding=True``, which pads only to
    the longest sequence of each ``map`` batch) guarantees that all rows have
    the same length, so a ``DataLoader`` can always stack them into a batch
    regardless of which ``map`` batch they came from.

    Args:
        tokenizer: Callable tokenizer accepting a list of strings plus the
            ``padding``, ``truncation`` and ``max_length`` keyword arguments.
        dataset: Dataset with ``text`` and ``label`` columns that provides
            ``map`` and ``set_format``.
        max_length: Fixed sequence length after truncation and padding.

    Returns:
        The tokenized dataset, formatted to yield ``input_ids``,
        ``attention_mask`` and ``label`` as torch tensors.
    """
    def tokenize(batch):
        return tokenizer(
            batch['text'],
            padding="max_length",
            truncation=True,
            max_length=max_length,
        )

    dataset = dataset.map(tokenize, batched=True)
    dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
    return dataset


In [None]:
# Train model
def train_model(model, train_loader, optimizer, lr_scheduler, device):
    """Fine-tune ``model`` for ``NUM_EPOCHS`` epochs, checkpointing after each one.

    Each batch is moved to ``device``, the loss returned by the model is
    back-propagated with gradient-norm clipping at 1.0, and the optimizer and
    learning-rate scheduler are stepped. The per-step loss is written to the
    module-level TensorBoard ``writer`` under ``Loss/train``. After every epoch
    the model's state dict is saved to ``CHECKPOINT_DIR``.

    A failing batch is logged and skipped (best effort). The end-of-epoch
    summary reports the loss of the last *successful* step of that epoch and
    the number of skipped batches; it no longer relies on a ``loss`` variable
    that is unbound when the loader is empty or every batch failed.

    Args:
        model: Model returning an object with a ``loss`` attribute when called
            with ``input_ids``, ``attention_mask`` and ``labels``.
        train_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``label`` tensors.
        optimizer: Optimizer over the model parameters.
        lr_scheduler: Scheduler stepped once per optimizer step.
        device: Device the batch tensors are moved to.
    """
    model.train()
    for epoch in range(NUM_EPOCHS):
        logging.info(f"Starting epoch {epoch + 1}/{NUM_EPOCHS}")
        last_loss = None      # loss of the most recent successful step this epoch
        failed_batches = 0    # batches skipped because of an exception

        for i, batch in enumerate(train_loader):
            try:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)

                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss

                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                lr_scheduler.step()

                last_loss = loss.item()

                # Log training loss to TensorBoard
                writer.add_scalar("Loss/train", last_loss, epoch * len(train_loader) + i)

            except Exception as e:
                failed_batches += 1
                logging.error(f"Error during training: {e}")

        # Save checkpoint
        checkpoint_path = os.path.join(CHECKPOINT_DIR, f"model_epoch_{epoch + 1}.pt")
        torch.save(model.state_dict(), checkpoint_path)
        logging.info(f"Checkpoint saved at {checkpoint_path}")

        if failed_batches:
            logging.warning(f"Epoch {epoch + 1}: {failed_batches} batch(es) failed and were skipped.")

        if last_loss is None:
            logging.warning(f"Epoch {epoch + 1} completed without a successful training step.")
        else:
            logging.info(f"Epoch {epoch + 1} completed. Loss: {last_loss:.4f}")


In [None]:
# Evaluate model
def evaluate_model(model, data_loader, device):
    """Run inference over ``data_loader`` and collect predictions with labels.

    The model is put in eval mode and called without gradient tracking. For
    each batch the predicted class is the argmax over the last dimension of
    the returned logits.

    A failing batch is logged and skipped as a whole: its labels are recorded
    only after its predictions were computed, so the two returned lists always
    have the same length and stay aligned element by element.

    Args:
        model: Model returning an object with a ``logits`` attribute when
            called with ``input_ids`` and ``attention_mask``.
        data_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``label`` tensors.
        device: Device the input tensors are moved to.

    Returns:
        A ``(predictions, labels)`` tuple of equally long Python lists.
    """
    model.eval()
    predictions, labels = [], []
    with torch.no_grad():
        for batch in data_loader:
            try:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                batch_labels = batch['label'].tolist()

                outputs = model(input_ids, attention_mask=attention_mask)
                batch_predictions = torch.argmax(outputs.logits, dim=-1).tolist()

            except Exception as e:
                logging.error(f"Error during evaluation: {e}")
                continue

            # Commit both halves together so a failed batch cannot leave
            # labels without matching predictions.
            predictions.extend(batch_predictions)
            labels.extend(batch_labels)

    return predictions, labels


In [None]:
# Classify text
def classify_text(model, tokenizer, text, device):
    """Classify one piece of text and return its class index and probabilities.

    Args:
        model: Model returning an object with a ``logits`` attribute when
            called with the tokenizer output as keyword arguments.
        tokenizer: Tokenizer producing a mapping that supports ``.to(device)``.
        text: Text to classify.
        device: Device the encoded inputs are moved to.

    Returns:
        ``(predicted_class, probabilities)``: the argmax class index and the
        softmax probabilities as a list. ``(None, None)`` is returned when
        inference raises; the error is logged.
    """
    encoded = tokenizer(
        text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=128,
    )
    encoded = encoded.to(device)

    model.eval()
    try:
        with torch.no_grad():
            scores = model(**encoded).logits
            label_id = scores.argmax(dim=-1).item()
            class_probs = scores.softmax(dim=-1).squeeze().tolist()
    except Exception as e:
        logging.error(f"Error during text classification: {e}")
        return None, None

    return label_id, class_probs


In [None]:
if __name__ == "__main__":
    # End-to-end run: tune hyperparameters with Optuna, fine-tune the model
    # with the best ones, report test metrics, then classify user-entered text.
    tokenizer, model = load_model_and_tokenizer(MODEL_NAME)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # The TensorBoard writer is closed in the `finally` below so that it is
    # released even when tuning, training or the interactive loop fails.
    try:
        # Hyperparameter tuning
        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=10)

        logging.info("Best hyperparameters:")
        logging.info(study.best_params)

        # Load best hyperparameters
        best_learning_rate = study.best_params["learning_rate"]
        best_batch_size = study.best_params["batch_size"]

        train_full = load_dataset("imdb", split="train")
        train_shuffled = train_full.shuffle(seed=42)
        train_dataset = train_shuffled.select(range(2000))
        train_dataset = preprocess_data(tokenizer, train_dataset)
        train_loader = DataLoader(train_dataset, batch_size=best_batch_size, shuffle=True)

        test_full = load_dataset("imdb", split="test")
        test_shuffled = test_full.shuffle(seed=42)
        test_dataset = test_shuffled.select(range(500))
        test_dataset = preprocess_data(tokenizer, test_dataset)
        test_loader = DataLoader(test_dataset, batch_size=best_batch_size)

        optimizer = AdamW(model.parameters(), lr=best_learning_rate)
        num_training_steps = NUM_EPOCHS * len(train_loader)
        lr_scheduler = get_scheduler("linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps)

        logging.info("Starting fine-tuning the model.")
        train_model(model, train_loader, optimizer, lr_scheduler, device)

        logging.info("Evaluating the model.")
        predictions, labels = evaluate_model(model, test_loader, device)

        accuracy = accuracy_score(labels, predictions)
        precision = precision_score(labels, predictions, average='binary')
        recall = recall_score(labels, predictions, average='binary')
        f1 = f1_score(labels, predictions, average='binary')

        logging.info(f"Accuracy: {accuracy * 100:.2f}%")
        logging.info(f"Precision: {precision * 100:.2f}%")
        logging.info(f"Recall: {recall * 100:.2f}%")

        logging.info(f"F1 Score: {f1 * 100:.2f}%")
        logging.info("\nConfusion Matrix:")
        logging.info(confusion_matrix(labels, predictions))

        while True:
            try:
                text = input("\nEnter text for sentiment analysis (or type 'exit' to quit): ")
            except (EOFError, KeyboardInterrupt):
                # No interactive input available, or the user interrupted:
                # leave the loop cleanly instead of failing the cell.
                break

            text = text.strip()
            if text.lower() == "exit":
                break
            if not text:
                # Nothing to classify; prompt again.
                continue

            predicted_class, probabilities = classify_text(model, tokenizer, text, device)
            if predicted_class is None:
                # classify_text signals failure with (None, None); do not
                # report that as a "Negative" prediction.
                print("Could not classify the text; see the log for details.")
                continue

            sentiment = "Positive" if predicted_class == 1 else "Negative"

            print(f"Sentiment: {sentiment}")
            print(f"Probabilities: {probabilities}")
    finally:
        writer.close()
