## Text Cleaning

In [None]:
import string, re, nltk
import spacy
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, RegexpTokenizer
!python -m spacy download en_core_web_sm

# Converting to lowercase
def convert_to_lowercase(text):
    """Return `text` with all cased characters converted to lowercase."""
    lowered = text.lower()
    return lowered

# Removing leading/trailing whitespace
def remove_whitespace(text):
    """Strip whitespace from both ends of `text`; interior whitespace is left as is."""
    trimmed = text.strip()
    return trimmed

# Remove Punctuation
def remove_punctuation(text):
    """Delete every ASCII punctuation character from `text` except the apostrophe.

    Args:
        text: Input string.

    Returns:
        A copy of `text` without the characters of `string.punctuation`
        (apostrophes are kept, e.g. "don't" stays "don't").
    """
    # string.punctuation is the predefined ASCII punctuation set; keep "'" out of it
    punct_str = string.punctuation.replace("'", "")
    # A deletion-only translation table removes all listed characters in one pass
    return text.translate(str.maketrans("", "", punct_str))

# Remove stopwords
def remove_stopwords(text):
    """Drop NLTK English stopwords from `text`.

    The text is split into tokens made of word characters and apostrophes;
    tokens that exactly match an entry of NLTK's English stopword list are
    discarded and the remaining tokens are joined with single spaces.

    Args:
        text: Input string.

    Returns:
        The filtered text as a single space-separated string.
    """
    # Raw string: "\w" is a regex class, not a valid Python string escape
    regexp = RegexpTokenizer(r"[\w']+")

    # A set gives O(1) membership tests instead of scanning the list per token
    stops = set(stopwords.words("english"))
    return " ".join(word for word in regexp.tokenize(text) if word not in stops)

# Lemmatization
# The spaCy pipeline is loaded once at module level with the parser and NER components disabled.
spacy_lemmatizer = spacy.load("en_core_web_sm", disable=['parser', 'ner'])

def text_lemmatizer(text):
    """Run `text` through the spaCy pipeline and return its token lemmas joined by single spaces."""
    doc = spacy_lemmatizer(text)
    lemmas = [token.lemma_ for token in doc]
    return " ".join(lemmas)

def text_normalizer(text):
    """Apply the full cleaning pipeline to `text` and return the result.

    Steps, in order: lowercase, strip outer whitespace, remove punctuation,
    remove stopwords, lemmatize.
    """
    pipeline = (
        convert_to_lowercase,
        remove_whitespace,
        remove_punctuation,
        remove_stopwords,
        text_lemmatizer,
    )
    for step in pipeline:
        text = step(text)
    return text

## TFIDF

In [None]:
from sklearn.ensemble import RandomForestClassifier  # needed here: this cell instantiates the classifier
from sklearn.feature_extraction.text import TfidfVectorizer

# Fit the TF-IDF vocabulary on the training split only, then reuse it for validation and test
TfidfVec = TfidfVectorizer()
X_train_tfidf = TfidfVec.fit_transform(X_train["description"])
X_val_tfidf = TfidfVec.transform(X_val["description"])
X_test_tfidf = TfidfVec.transform(X_test["description"])

# random_state makes the forest reproducible across runs
model = RandomForestClassifier(random_state=42)
model.fit(X_train_tfidf, y_train)
y_val_pred = model.predict(X_val_tfidf)
y_test_pred = model.predict(X_test_tfidf)

## LSTM

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras import layers

# Names used further down in this cell (argmax and the evaluation metrics)
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, f1_score

# Hyperparameters
vocab_size = 5000  # Maximum vocabulary size
max_length = 100  # Maximum sentence length
embedding_dim = 128  # Word embedding dimensions

# Tokenization
# Builds the vocabulary based on X_train. Assigns an index (number) to each word
# NOTE(review): assumes X_train / X_val / X_test are iterables of raw text strings — confirm
tokenizer = Tokenizer(num_words=vocab_size, oov_token="<OOV>")  # tensorflow tokeniser
tokenizer.fit_on_texts(X_train)

# Convert text to sequences
# This replaces words with their assigned indices (numbers), e.g. (illustrative indices)
# X_train = ["I love this movie", "This movie is amazing"]
# -> X_train_seq = [[3, 4, 1, 2], [1, 2, 5, 6]]
X_train_seq = tokenizer.texts_to_sequences(X_train)
X_val_seq = tokenizer.texts_to_sequences(X_val)
X_test_seq = tokenizer.texts_to_sequences(X_test)

# Pad (or truncate) at the end of each sequence to ensure uniform input size
X_train_pad = pad_sequences(X_train_seq, maxlen=max_length, padding='post', truncating='post')
X_val_pad = pad_sequences(X_val_seq, maxlen=max_length, padding='post', truncating='post')
X_test_pad = pad_sequences(X_test_seq, maxlen=max_length, padding='post', truncating='post')

# Define LSTM Model
inputs = layers.Input(shape=(max_length,))
# The sequence length is already fixed by the Input layer, so the deprecated
# `input_length` argument of Embedding is not passed.
embedding = layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim)(inputs)
x = layers.Bidirectional(layers.LSTM(64, return_sequences=True))(embedding)
x = layers.Dropout(0.5)(x)
x = layers.Bidirectional(layers.LSTM(32, return_sequences=False))(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(4, activation='softmax')(x)  # 4 output classes

# Compile Model
model = Model(inputs=inputs, outputs=outputs)
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model
early_stopping = EarlyStopping(monitor='val_loss',  # Monitor validation loss
                               patience=3,          # Stop after 3 epochs of no improvement
                               restore_best_weights=True,  # Restore best weights
                               verbose=1)
history = model.fit(X_train_pad, y_train, epochs=20, batch_size=4, validation_data=(X_val_pad, y_val),
                    callbacks=[early_stopping])

# Since softmax is used, the output is a probability distribution across the 4 classes.
y_test_pred = model.predict(X_test_pad)
# To get the predicted class, take the argmax (index of the highest probability):
y_test_pred = np.argmax(y_test_pred, axis=1)

# Evaluation on the test split
f1_macro = f1_score(y_test, y_test_pred, average='macro')
print(classification_report(y_test, y_test_pred))
conf_matrix = confusion_matrix(y_test, y_test_pred)

## BERT Finetuning

In [None]:
from datasets import Dataset
# Convert to Hugging Face Dataset format
# X_train = list of descriptions, y_train = list of labels
train_dataset = Dataset.from_dict(
    {
        "text": X_train,
        "label": y_train,
    }
)
# NOTE(review): the evaluation dataset is built from the *test* split — confirm this is intended
val_dataset = Dataset.from_dict(
    {
        "text": X_test,
        "label": y_test,
    }
)

from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

def preprocess_function(examples):
    """Tokenize the "text" field of a batch, truncating and padding every example to 512 tokens."""
    encoded = tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=512,
    )
    return encoded

# Tokenize datasets
tokenized_train = train_dataset.map(preprocess_function, batched=True)
tokenized_val = val_dataset.map(preprocess_function, batched=True)

from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=4)

# Freeze the DistilBERT backbone; only parameters outside `model.distilbert` stay trainable
for param in model.distilbert.parameters():
    param.requires_grad = False

# Training arguments
# load_best_model_at_end=True requires the save strategy to match the evaluation
# strategy, so a checkpoint is written at the end of every epoch as well.
training_args = TrainingArguments(output_dir="./results", evaluation_strategy="epoch", save_strategy="epoch",
                                  learning_rate=2e-5,
                                  per_device_train_batch_size=16, per_device_eval_batch_size=16,
                                  num_train_epochs=1, load_best_model_at_end=True)
# Trainer
trainer = Trainer(model=model, args=training_args,
                  train_dataset=tokenized_train, eval_dataset=tokenized_val,
                  tokenizer=tokenizer)
# Train model
trainer.train()

# Save the trained model to a fixed directory: a "checkpoint-<step>" folder name
# depends on dataset size and batch size, so it cannot be hard-coded safely.
best_model_dir = "./results/best_model"
trainer.save_model(best_model_dir)

# Prediction
import torch
device = 'cpu'
model = AutoModelForSequenceClassification.from_pretrained(best_model_dir).to(device).eval()

def predict(texts):
    """Return the predicted class index for each entry of `texts` as a NumPy array.

    Uses the module-level `tokenizer`, `model` and `device`.
    """
    encoded = tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt")
    encoded = encoded.to(device)
    with torch.no_grad():  # inference only, no gradient tracking
        logits = model(**encoded).logits
        predicted_ids = torch.argmax(logits, dim=-1)
    return predicted_ids.cpu().numpy()

## Finetune T5 LLM for Text Classification (LoRA - PEFT)

In [None]:
from datasets import Dataset
from transformers import AutoTokenizer, T5Tokenizer, T5ForConditionalGeneration, Trainer, TrainingArguments
from peft import LoraConfig, TaskType, get_peft_model

# Convert to Hugging Face Dataset format
train_dataset = Dataset.from_dict(
    {"text": X_train, "label": y_train}
)
# NOTE(review): the evaluation dataset is built from the *test* split — confirm this is intended
val_dataset = Dataset.from_dict(
    {"text": X_test, "label": y_test}
)

# Load tokenizer
tokenizer = T5Tokenizer.from_pretrained("t5-small")

# Define a preprocessing function for T5 format
def preprocess_function(examples):
    """Convert a batch of {"text", "label"} examples into T5 inputs and labels.

    Args:
        examples: Batch mapping with a "text" list and a "label" list.

    Returns:
        The tokenizer output for the prefixed texts (padded/truncated to 128
        tokens) with an added "labels" entry: the token ids of the stringified
        labels (padded/truncated to 8 tokens), where padding positions are
        replaced by -100 so that the loss ignores them.
    """
    # Format input texts with the task prefix
    inputs = ["classify category: " + text for text in examples["text"]]

    # T5 is text-to-text: the targets must be plain strings
    targets = [str(label) for label in examples["label"]]

    # Tokenize inputs
    model_inputs = tokenizer(inputs, max_length=128, padding="max_length", truncation=True)

    # Tokenize targets (`text_target` replaces the deprecated `as_target_tokenizer()` context)
    labels = tokenizer(text_target=targets, max_length=8, padding="max_length", truncation=True)

    # Mask padding in the labels: -100 is the index the cross-entropy loss ignores.
    # Without this the model is trained to predict <pad> at most target positions.
    pad_id = tokenizer.pad_token_id
    model_inputs["labels"] = [
        [token_id if token_id != pad_id else -100 for token_id in label_ids]
        for label_ids in labels["input_ids"]
    ]

    return model_inputs

# Apply preprocessing and tokenization in one step; the raw columns are dropped afterwards
tokenized_train = train_dataset.map(
    preprocess_function,
    batched=True,
    remove_columns=train_dataset.column_names,
)
tokenized_val = val_dataset.map(
    preprocess_function,
    batched=True,
    remove_columns=val_dataset.column_names,
)

# Have both datasets return torch tensors
tokenized_train.set_format("torch")
tokenized_val.set_format("torch")

# Load the base model
model = T5ForConditionalGeneration.from_pretrained("t5-small")

# Define LoRA configuration
lora_config = LoraConfig(
    task_type=TaskType.SEQ_2_SEQ_LM,
    inference_mode=False,
    r=8,
    lora_alpha=16,
    lora_dropout=0.1,
)

# Wrap model with LoRA and report how many parameters remain trainable
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

training_args = TrainingArguments(
    output_dir="./t5-sentiment-lora",
    evaluation_strategy="epoch",
    learning_rate=5e-4,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=5,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
)

trainer.train()

import torch  # used by predict_sentiment below

def predict_sentiment(text):
    """Generate the model's predicted label text for a single input string.

    Args:
        text: Raw input text; the "classify category: " prefix is added here.

    Returns:
        The decoded generation output with special tokens removed.
    """
    input_text = "classify category: " + text
    inputs = tokenizer(input_text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    # The model may live on an accelerator after training; inputs must be on the same device
    inputs = inputs.to(next(model.parameters()).device)
    model.eval()  # make sure dropout is disabled during generation
    with torch.no_grad():
        output = model.generate(**inputs)
    return tokenizer.decode(output[0], skip_special_tokens=True)

# Test the model
print(predict_sentiment("This movie was amazing!"))
print(predict_sentiment("The food was terrible."))

## Train Test Split

In [None]:
from sklearn.model_selection import train_test_split

# Train-test split: 80/20, stratified on the target column
y = df['target']
X = df.drop(columns=['target'])
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# Shapes of the feature splits, then of the target splits
print(*(part.shape for part in (X_train, X_test)))
print(*(part.shape for part in (y_train, y_test)))

## Data Pre Processing

In [None]:
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer

# Impute missing values: median for numeric columns, most frequent value for categorical ones
num_imputer = SimpleImputer(strategy='median')
cat_imputer = SimpleImputer(strategy='most_frequent')

# One-Hot Encoding; categories below the 1% frequency threshold are handled as infrequent
ohe = OneHotEncoder(
    handle_unknown='infrequent_if_exist',
    min_frequency=0.01,
)

# Standard Scaling
scaler = StandardScaler()

# TF-IDF Vectorization, capped at 100 features
tfidf = TfidfVectorizer(max_features=100)

# --------

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer

# Column Transformer
# Each transformation is defined as a tuple: ('name', transformer, columns)
preprocessor = ColumnTransformer(
    transformers=[
        (
            'numerical_transformation',
            Pipeline(steps=[('imputer', num_imputer), ('scaler', scaler)]),
            ['num_1', 'num_2', 'num_3'],
        ),
        (
            'categorical_transformation',
            Pipeline(steps=[('imputer', cat_imputer), ('ohe', ohe)]),
            ['category'],
        ),
        ('text_transformation', tfidf, 'text'),
    ]
)

# Fit on the training split only, then apply the fitted transformers to the test split
X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

from imblearn.over_sampling import SMOTE
# Handle class imbalance using SMOTE (training data only)
smote = SMOTE(random_state=42)
X_train_smote, y_train_smote = smote.fit_resample(X_train_transformed, y_train)

## Models

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, classification_report

# Train on the SMOTE-resampled training data, predict on the transformed test split
model = RandomForestClassifier(random_state=42).fit(X_train_smote, y_train_smote)
pred = model.predict(X_test_transformed)

# Test-set metrics
f1_macro = f1_score(y_test, pred, average='macro')
report = classification_report(y_test, pred)

## Neural Network

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Input, Model
from tensorflow.keras.callbacks import EarlyStopping

import numpy as np
from sklearn.preprocessing import LabelEncoder

# sparse_categorical_crossentropy expects integer class ids 0..n_classes-1, so the
# resampled labels are encoded explicitly. This defines `y_train_smote_ids`,
# which the training call below uses.
label_encoder = LabelEncoder()
y_train_smote_ids = label_encoder.fit_transform(y_train_smote)

input_shape = X_train_smote.shape[1]
output_shape = len(np.unique(y_train))

# NN Model
inputs = Input(shape=(input_shape,))

x = layers.Dense(64)(inputs)  # No activation yet
x = layers.BatchNormalization()(x)  # Normalize before activation
x = layers.ReLU()(x)  # Apply activation
x = layers.Dropout(0.2)(x)  # Dropout AFTER activation

# The activation can also be passed directly to Dense; BatchNormalization then runs after it
x = layers.Dense(32, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.2)(x)

outputs = layers.Dense(output_shape, activation='softmax')(x)
model = Model(inputs=inputs, outputs=outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# model.compile(optimizer='adam', loss='mse', metrics=['mae'])  # For regression

# Train model
# NOTE: with epochs=3 a patience of 10 can never trigger an early stop
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
history = model.fit(X_train_smote, y_train_smote_ids, epochs=3, batch_size=16, verbose=1, validation_split=0.2,
                    callbacks=[early_stopping])

# Predictions: argmax over the softmax output gives encoded class ids;
# label_encoder.inverse_transform(y_pred) maps them back to the original labels.
y_pred = model.predict(X_test_transformed)
y_pred = np.argmax(y_pred, axis=1)

## Mini Batch GD

In [None]:
import numpy as np

def MBGD(X, y, gradient_function, num_epochs, lr, batch_size, tol=None, patience=5, seed=None):
    """Mini-batch gradient descent.

    The data is shuffled once, then swept `num_epochs` times in consecutive
    batches of `batch_size` rows; after every batch
    `theta <- theta - lr * gradient_function(X_batch, y_batch, theta)`.

    Args:
        X: 2-D array-like of shape (num_rows, num_cols).
        y: Array-like with one target per row; it is flattened to 1-D.
        gradient_function: Callable `(X_batch, y_batch, theta) -> gradient`.
        num_epochs: Number of passes over the data.
        lr: Learning rate.
        batch_size: Rows per batch (the last batch may be smaller).
        tol: Optional gradient-norm threshold for early stopping. `None`
            (default) disables early stopping.
        patience: Number of consecutive batch updates whose gradient L2 norm
            must be below `tol` before stopping early.
        seed: Optional seed for the initial `theta` and the shuffle. `None`
            (default) uses NumPy's global random state.

    Returns:
        The parameter vector `theta` of shape (num_cols,).

    Raises:
        ValueError: If `batch_size` is not positive or `X` and `y` disagree
            on the number of rows.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    X = np.asarray(X)
    y = np.asarray(y).reshape(-1)
    num_rows, num_cols = X.shape[0], X.shape[1]
    if y.shape[0] != num_rows:
        raise ValueError("X and y must have the same number of rows")

    # np.random (global state) and RandomState expose the same randn/permutation API
    rng = np.random if seed is None else np.random.RandomState(seed)
    theta = rng.randn(num_cols)

    # Shuffle rows once so that batches are not tied to the input order
    indices = rng.permutation(num_rows)
    X, y = X[indices], y[indices]

    small_grad_steps = 0  # consecutive updates with a gradient norm below `tol`

    for e in range(num_epochs):
        for b in range(0, num_rows, batch_size):
            X_batch = X[b:b+batch_size]
            y_batch = y[b:b+batch_size]
            grad = gradient_function(X_batch, y_batch, theta)
            theta = theta - (lr * grad)

            if tol is None:
                continue

            # Stopping criterion: gradient magnitude stays small for `patience` updates
            if np.linalg.norm(grad) < tol:
                small_grad_steps += 1
                if small_grad_steps >= patience:
                    print(f"Stopping early at epoch {e+1} due to small gradient updates.")
                    return theta
            else:
                small_grad_steps = 0  # Reset if gradient is significant

    return theta

| Loss  | Loss Function (Matrix Form) | Gradient |
|-------|-----------------------------|------------------------------------------------------|
| **MSE** | $$\mathcal{L}_{MSE} = \frac{1}{N} \| Y - X\theta \|_2^2$$ | $$\frac{\partial \mathcal{L}_{MSE}}{\partial \theta} = -\frac{2}{N} X^T (Y - X\theta)$$ |
| **BCE** | $$\mathcal{L}_{BCE} = -\frac{1}{N} \left[ Y^T \log \sigma(X\theta) + (1 - Y)^T \log (1 - \sigma(X\theta)) \right]$$ | $$\frac{\partial \mathcal{L}_{BCE}}{\partial \theta} = -\frac{1}{N} X^T (Y - \sigma(X\theta))$$ |

In [None]:
def mse_grad(X, y, theta):
    """Gradient of the MSE loss (1/n) * ||y - X @ theta||^2 with respect to `theta`."""
    residual = y - (X @ theta)
    scale = -2 / len(y)
    return scale * X.T @ residual

# Synthetic linear data: y = 4 + 3*x + Gaussian noise (std 0.2)
X = np.random.randn(10000, 1)
noise = 0.2 * np.random.randn(10000, 1)  # Add noise
y = 3*X + 4 + noise
X = np.hstack([np.ones((10000, 1)), X])  # prepend a column of ones for the intercept

MBGD(X, y, mse_grad, num_epochs=1000, lr=0.01, batch_size=8)

In [None]:
def sigmoid(z):
    """Logistic function 1 / (1 + exp(-z)), evaluated in a numerically stable way.

    Args:
        z: Scalar or array-like of real values.

    Returns:
        The element-wise sigmoid of `z` (NumPy scalar or array).
    """
    # 1 / (1 + e^{-z}) == exp(-log(1 + e^{-z})); logaddexp computes the log term
    # without overflowing, unlike np.exp(-z) for large negative z.
    return np.exp(-np.logaddexp(0, -z))

def bce_grad(X, y, theta):
    """Gradient of the mean binary cross-entropy loss: -(1/n) * X^T (y - sigmoid(X @ theta))."""
    error = y - sigmoid(X @ theta)
    scale = -1 / len(y)
    return scale * X.T @ error

# Synthetic data with soft targets in (0, 1): y = sigmoid(4 + 3*x + noise)
X = np.random.randn(10000, 1)
noise = 0.2 * np.random.randn(10000, 1)  # Add noise
y = sigmoid(3*X + 4 + noise)
# Uncomment to train on hard 0/1 labels instead of probabilities:
# y = (y > 0.5).astype(int)
X = np.hstack([np.ones((10000, 1)), X])  # prepend a column of ones for the intercept

MBGD(X, y, bce_grad, num_epochs=50, lr=0.01, batch_size=32)

# Clustering

In [None]:
import numpy as np
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.metrics import silhouette_score

# Apply a clustering method and report its silhouette score
def apply_clustering(model, X, name):
    """Fit `model` on `X`, print its silhouette score and return the cluster labels.

    Points labelled -1 (DBSCAN's noise label) are excluded from the score
    because noise is not a cluster. The score is reported as -1 when the
    remaining points do not form a valid clustering for `silhouette_score`
    (fewer than 2 clusters, or as many clusters as points).

    Args:
        model: Clustering estimator exposing `fit_predict`.
        X: Feature matrix passed to `fit_predict` and `silhouette_score`.
        name: Label used in the printed message.

    Returns:
        The labels returned by `model.fit_predict(X)`, unchanged.
    """
    clusters = model.fit_predict(X)
    labels = np.asarray(clusters)

    keep = labels != -1  # mask of non-noise points
    n_kept = int(keep.sum())
    n_clusters = len(set(labels[keep].tolist()))

    # silhouette_score requires 2 <= n_clusters <= n_samples - 1
    if 2 <= n_clusters <= n_kept - 1:
        # Only slice X when there is noise to drop, so other inputs are passed through untouched
        X_scored = X if keep.all() else X[keep]
        score = silhouette_score(X_scored, labels[keep])
    else:
        score = -1
    print(f"{name}: Silhouette Score = {score:.4f}")
    return clusters

# Clustering models to compare, keyed by the name shown in the printed report
models = {}
models["K-Means (k=3)"] = KMeans(n_clusters=3, random_state=42)
models["DBSCAN"] = DBSCAN(eps=1, min_samples=5)
models["Agglomerative"] = AgglomerativeClustering(n_clusters=3)

# Fit every model on X_processed and keep its cluster labels by name
cluster_results = {
    name: apply_clustering(model, X_processed, name)
    for name, model in models.items()
}


# Elbow method, step 1: record the K-Means inertia for each candidate k
k_values = range(1, 11)  # Testing k from 1 to 10
inertia_values = []

for k in k_values:
    kmeans = KMeans(n_clusters=k, random_state=42).fit(X_processed)
    inertia_values.append(kmeans.inertia_)

# Elbow method, step 2: plot inertia against k
import matplotlib.pyplot as plt  # `plt` is used below and is not imported in any visible cell

plt.figure(figsize=(8, 5))
plt.plot(k_values, inertia_values, marker='o', linestyle='-')