<a href="https://colab.research.google.com/github/MF884/Machine-Learning-Projects/blob/main/BERT_multilaber_classification_with_spacy_rag.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import spacy
import matplotlib.pyplot as plt
from annoy import AnnoyIndex
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import classification_report
from transformers import AutoTokenizer, AutoModelForSequenceClassification, get_linear_schedule_with_warmup
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Prefer the GPU whenever PyTorch reports one; otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Hyperparameters shared by the tokenizer, the data loaders and the training loop.
MAX_LEN = 256  # max_length handed to the tokenizer; longer inputs are truncated
BATCH_SIZE = 16 if device.type == "cuda" else 8  # smaller batches when no GPU is available
EPOCHS = 10  # number of passes over the training loader
LEARNING_RATE = 1e-5  # learning rate given to AdamW
# NOTE(review): THRESHOLD is not referenced in the visible code; presumably it is
# the probability cut-off for turning sigmoid outputs into labels - confirm.
THRESHOLD = 0.5

# spaCy English pipeline; DataProcessor.preprocess_text uses it for lemmas and
# stop-word flags.
nlp = spacy.load("en_core_web_sm")

class DataProcessor:
    """Clean tweet text and derive heuristic multi-label targets from keywords.

    Every text receives exactly one sentiment label ('positive', 'negative'
    or 'neutral') plus zero or more issue labels ('technical', 'service',
    'performance').
    """

    def __init__(self):
        # Sentiment keyword -> weight. A keyword contributes its weight once
        # when it occurs anywhere in the text, however often it appears.
        self.sentiment_keywords = {
            'positive': {'thanks': 2, 'great': 2, 'solved': 3, 'love': 2, '😊': 3},
            'negative': {'😡': 3, 'sucks': 3, 'hate': 3, 'broken': 2, 'painful': 2}
        }
        # Issue label -> keywords; a single hit is enough to assign the label.
        self.issue_keywords = {
            'technical': ['broken', 'crash', 'freezes', 'bug', 'error'],
            'service': ['delay', 'refund', 'callback', 'support'],
            'performance': ['battery', 'slow', 'speed', 'drains']
        }

    def preprocess_text(self, text):
        """Return *text* as space-joined spaCy lemmas with stop words removed."""
        doc = nlp(text)
        return " ".join(token.lemma_ for token in doc if not token.is_stop)

    def load_and_validate_data(self, file_path):
        """Load a CSV with a ``text`` column and return it cleaned and labelled.

        Rows with a missing ``text`` are dropped, URLs (``http...`` up to the
        next whitespace) are removed, and the text is lemmatized via
        :meth:`preprocess_text`. A ``labels`` column holding the output of
        :meth:`generate_labels` is added.

        Args:
            file_path: Anything ``pandas.read_csv`` accepts (path or buffer).

        Returns:
            The DataFrame with ``text`` replaced by its lemmatized form and a
            new ``labels`` column of string lists.

        Raises:
            KeyError: If the CSV has no ``text`` column.
        """
        df = pd.read_csv(file_path)
        # Drop missing rows first: the .str accessor raises on a column that
        # contains nothing but NaN.
        df = df.dropna(subset=['text']).copy()
        surface_text = df['text'].astype(str).str.replace(r'http\S+', '', regex=True)
        df['text'] = surface_text.apply(self.preprocess_text)
        # The keyword lists contain inflected surface forms ('thanks',
        # 'solved', 'freezes', ...) that lemmatization rewrites, so labelling
        # the lemmatized text alone would miss them. Matching is presence
        # based, so scanning "surface + lemmas" means "keyword occurs in
        # either form" without counting any keyword twice.
        df['labels'] = [
            self.generate_labels(f"{surface} {lemmas}")
            for surface, lemmas in zip(surface_text, df['text'])
        ]
        return df

    def generate_labels(self, text):
        """Return the heuristic label list for *text*.

        The first element is the sentiment label, chosen by comparing the
        summed weights of the positive and negative keywords found in the
        lower-cased text (a tie, including no hits at all, is 'neutral').
        Issue labels follow in the order of ``self.issue_keywords``.

        NOTE: matching is plain substring containment, so a keyword also
        fires inside longer words (for example 'hate' inside 'whatever').
        """
        text_lower = text.lower()
        pos_score = sum(
            weight
            for word, weight in self.sentiment_keywords['positive'].items()
            if word in text_lower
        )
        neg_score = sum(
            weight
            for word, weight in self.sentiment_keywords['negative'].items()
            if word in text_lower
        )
        if pos_score > neg_score:
            labels = ['positive']
        elif neg_score > pos_score:
            labels = ['negative']
        else:
            labels = ['neutral']
        labels.extend(
            issue_type
            for issue_type, keywords in self.issue_keywords.items()
            if any(keyword in text_lower for keyword in keywords)
        )
        return labels

class TweetDataset(Dataset):
    """Map-style dataset pairing one tokenized text with its label row.

    Args:
        texts: Iterable of texts (for example a pandas Series or a list).
        labels: Positionally indexable collection of label rows, aligned
            with *texts* (for example a 2-D NumPy array).
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors.
        max_len: Length every encoded sequence is padded or truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        # A plain list is addressed by position, so whatever index a pandas
        # Series carried after splitting is irrelevant.
        self.texts = list(texts)
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the encoded text and float label vector at position *idx*."""
        text = str(self.texts[idx])
        # Call the tokenizer directly: encode_plus is deprecated in favour of
        # __call__, which takes the same keyword arguments.
        encoding = self.tokenizer(
            text,
            max_length=self.max_len,
            truncation=True,
            padding='max_length',
            return_tensors='pt',
        )
        return {
            # return_tensors='pt' adds a leading batch axis; flatten removes it
            # so the DataLoader can stack the items itself.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            # Float targets, as required by BCEWithLogitsLoss.
            'labels': torch.as_tensor(self.labels[idx], dtype=torch.float32),
        }

def build_rag_index(texts, model_name='roberta-base', n_trees=10):
    """Embed every text with a transformer encoder and index the vectors in Annoy.

    Each text is truncated to ``MAX_LEN`` tokens, encoded, and represented by
    the mean of its final-layer token states. Item ids in the index are the
    positions of the texts in *texts*.

    Args:
        texts: Iterable of texts to index.
        model_name: Hugging Face checkpoint used for the encoder and tokenizer.
        n_trees: Number of trees Annoy builds.

    Returns:
        The built ``AnnoyIndex`` using angular distance.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
    # Inference only: switch dropout off so the same text always yields the
    # same embedding.
    model.eval()

    # Size the index from the encoder instead of hard-coding 768.
    vector_dim = model.config.hidden_size
    annoy_index = AnnoyIndex(vector_dim, 'angular')

    with torch.no_grad():
        for i, text in enumerate(texts):
            # One text per forward pass, so no padding is needed and every
            # returned token state belongs to a real token.
            inputs = tokenizer(
                str(text), return_tensors='pt', truncation=True, max_length=MAX_LEN
            ).to(device)
            # The classification logits have only num_labels entries and
            # cannot serve as a hidden_size-dimensional embedding, so request
            # the hidden states and pool the last layer. The classifier head
            # is loaded but its output is ignored.
            outputs = model(**inputs, output_hidden_states=True)
            embedding = outputs.hidden_states[-1].mean(dim=1).squeeze(0)
            annoy_index.add_item(i, embedding.cpu().tolist())

    annoy_index.build(n_trees)
    return annoy_index

def safe_train_test_split(texts, labels, test_size=0.2):
    """Split *texts* and *labels* reproducibly, without stratification.

    Delegates to scikit-learn's ``train_test_split`` with a fixed seed of 42.

    Returns:
        The list returned by ``train_test_split``:
        ``[texts_train, texts_test, labels_train, labels_test]``.
    """
    split_options = {
        "test_size": test_size,
        "random_state": 42,
        "stratify": None,
    }
    return train_test_split(texts, labels, **split_options)

def _evaluate(model, loader, loss_fn):
    """Return ``(mean_loss, predictions, targets)`` for *loader* without training.

    Predictions are 0/1 integer arrays obtained by thresholding the sigmoid
    of the logits at ``THRESHOLD``; targets are the loader's labels as ints.
    """
    model.eval()
    running_loss = 0.0
    predictions, targets = [], []
    with torch.no_grad():
        for batch in loader:
            labels = batch['labels'].to(device)
            logits = model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
            ).logits
            running_loss += loss_fn(logits, labels).item()
            predictions.append((torch.sigmoid(logits) > THRESHOLD).int().cpu())
            targets.append(labels.int().cpu())
    if not predictions:
        # Empty loader: nothing to average or concatenate.
        empty = torch.empty((0, 0), dtype=torch.int).numpy()
        return 0.0, empty, empty
    return (
        running_loss / len(loader),
        torch.cat(predictions).numpy(),
        torch.cat(targets).numpy(),
    )


def main():
    """Train a RoBERTa multi-label classifier on 'sample.csv' and evaluate it.

    Steps: load and label the data, binarize the labels, split into
    train/validation/test, fine-tune for ``EPOCHS`` epochs while reporting the
    training and validation loss, then print a classification report for the
    test split.
    """
    processor = DataProcessor()
    df = processor.load_and_validate_data('sample.csv')
    mlb = MultiLabelBinarizer()
    y = mlb.fit_transform(df['labels'])

    # Hold out 20% for testing, then 25% of the remainder for validation,
    # which gives a 60/20/20 split overall.
    X_train, X_test, y_train, y_test = safe_train_test_split(df['text'], y)
    X_train, X_val, y_train, y_val = safe_train_test_split(X_train, y_train, test_size=0.25)

    tokenizer = AutoTokenizer.from_pretrained('roberta-base')
    train_dataset = TweetDataset(X_train, y_train, tokenizer, MAX_LEN)
    val_dataset = TweetDataset(X_val, y_val, tokenizer, MAX_LEN)
    test_dataset = TweetDataset(X_test, y_test, tokenizer, MAX_LEN)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

    model = AutoModelForSequenceClassification.from_pretrained(
        'roberta-base', num_labels=len(mlb.classes_), problem_type="multi_label_classification"
    ).to(device)
    optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)
    total_steps = len(train_loader) * EPOCHS
    # Linear warm-up over the first 10% of the steps, then linear decay.
    scheduler = get_linear_schedule_with_warmup(
        optimizer, num_warmup_steps=int(0.1 * total_steps), num_training_steps=total_steps
    )
    loss_fn = nn.BCEWithLogitsLoss().to(device)

    # The index covers every row of df; it is built here but not consulted
    # anywhere else in this function.
    rag_index = build_rag_index(df['text'])
    print("RAG Index built successfully")

    for epoch in range(EPOCHS):
        model.train()
        train_loss = 0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}"):
            optimizer.zero_grad()
            labels = batch['labels'].to(device)
            # Labels are not passed to the model: the loss is computed once,
            # below, rather than a second time inside the forward pass.
            outputs = model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
            )
            loss = loss_fn(outputs.logits, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()
            train_loss += loss.item()
        print(f"Epoch {epoch+1} | Train Loss: {train_loss/len(train_loader):.4f}")

        val_loss, _, _ = _evaluate(model, val_loader, loss_fn)
        print(f"Epoch {epoch+1} | Val Loss: {val_loss:.4f}")

    test_loss, y_pred, y_true = _evaluate(model, test_loader, loss_fn)
    print(f"Test Loss: {test_loss:.4f}")
    print(classification_report(
        y_true, y_pred, target_names=list(mlb.classes_), zero_division=0
    ))

# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
