<a href="https://colab.research.google.com/github/cammylexi/CS2341-Assignment-3/blob/main/McPhaul_Llanes_7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparation



Dataset: https://www.kaggle.com/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews

In [None]:
def clear_memory():
    """Release Keras global state and reclaim unreachable Python objects.

    Called between heavy steps (tokenization batches, training runs) to lower
    the risk of out-of-memory errors.
    """
    # Drop Keras' global state first so whatever it was holding becomes
    # unreachable, then let the garbage collector actually reclaim it.
    tf.keras.backend.clear_session()
    gc.collect()

In [None]:
import kagglehub

# Fetch the latest published version of the IMDB 50k movie-review dataset
path = kagglehub.dataset_download(
    "lakshmi25npathi/imdb-dataset-of-50k-movie-reviews"
)

print(f"Path to dataset files: {path}")

Path to dataset files: /kaggle/input/imdb-dataset-of-50k-movie-reviews


In [None]:
import pandas as pd

# Read the reviews CSV from the downloaded dataset directory
df = pd.read_csv(f"{path}/IMDB Dataset.csv")

# Preview the first rows
df.head()

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,positive
1,A wonderful little production. <br /><br />The...,positive
2,I thought this was a wonderful way to spend ti...,positive
3,Basically there's a family where a little boy ...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive


## Class Variables
Binary Classification Task


*   Sentiment
 *  Positive
 *  Negative

In [None]:
# Change 'positive' to 1 and 'negative' to 0 in the 'sentiment' column
SENTIMENT_TO_LABEL = {'positive': 1, 'negative': 0}

# Only map while the column still holds the raw strings: re-running this cell
# on an already-encoded column would otherwise turn every 0/1 into NaN.
if not df['sentiment'].isin(list(SENTIMENT_TO_LABEL.values())).all():
    df['sentiment'] = df['sentiment'].map(SENTIMENT_TO_LABEL)

# Values outside the mapping become NaN; fail loudly instead of training on them.
assert df['sentiment'].notna().all(), "Unexpected value(s) in 'sentiment' column"

df.head()

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,1
1,A wonderful little production. <br /><br />The...,1
2,I thought this was a wonderful way to spend ti...,1
3,Basically there's a family where a little boy ...,0
4,"Petter Mattei's ""Love in the Time of Money"" is...",1


In [None]:
# Summarise the dataset: size, label balance and a few example reviews
class_balance = df['sentiment'].value_counts(normalize=True)
review_preview = df['review'].head()
print(f"Dataset shape: {df.shape}")
print(f"Class distribution: {class_balance}")
print(f"Sample reviews:\n{review_preview}")

# Count null entries per column
null_counts = df.isnull().sum()
print(f"Missing values:\n{null_counts}")

Dataset shape: (50000, 2)
Class distribution: sentiment
1    0.5
0    0.5
Name: proportion, dtype: float64
Sample reviews:
0    One of the other reviewers has mentioned that ...
1    A wonderful little production. <br /><br />The...
2    I thought this was a wonderful way to spend ti...
3    Basically there's a family where a little boy ...
4    Petter Mattei's "Love in the Time of Money" is...
Name: review, dtype: object
Missing values:
review       0
sentiment    0
dtype: int64


# Tokenization and Sequence Preparation

For this task, I've chosen to use the BERT tokenizer which employs WordPiece tokenization. This method breaks words into subword units, which helps handle out-of-vocabulary words effectively. The maximum sequence length is set to 256 tokens, which balances:

* **Coverage:** Most movie reviews fit within this length
* **Information Preservation:** Key sentiment indicators are typically distributed throughout the text
* **Computational Efficiency:** Keeps memory requirements manageable

In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import BertTokenizer, TFBertModel, TFAutoModelForSequenceClassification
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score
import gc

# Shared tokenizer for the pretrained 'bert-base-uncased' checkpoint; the
# preprocessing and length-analysis functions read it as a module-level global.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Process data in batches to avoid memory issues
def preprocess_in_batches(texts, batch_size=1000, max_length=256):
    """Tokenize `texts` with the module-level `tokenizer`, one batch at a time.

    Every text is padded or truncated to exactly `max_length` tokens.

    Args:
        texts: Sliceable collection of strings (pandas Series, list, tuple or
            NumPy array).
        batch_size: Number of texts passed to the tokenizer per call.
        max_length: Fixed sequence length of every encoded row.

    Returns:
        dict with the keys 'input_ids' and 'attention_mask', each a tensor
        with one row per input text and `max_length` columns.

    Raises:
        ValueError: If `batch_size` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    total = len(texts)
    if total == 0:
        # tf.concat cannot join an empty list; return correctly shaped empties.
        empty = tf.zeros((0, max_length), dtype=tf.int32)
        return {'input_ids': empty, 'attention_mask': empty}

    all_input_ids = []
    all_attention_masks = []

    # Process in batches
    for batch_number, start in enumerate(range(0, total, batch_size), start=1):
        stop = min(start + batch_size, total)
        # list() works for Series, lists, tuples and arrays alike, whereas
        # .tolist() only exists on pandas/NumPy containers.
        batch_texts = list(texts[start:stop])

        # Tokenize batch
        encoded = tokenizer(
            batch_texts,
            padding='max_length',
            truncation=True,
            max_length=max_length,
            return_tensors='tf',
            return_attention_mask=True,
            return_token_type_ids=False
        )

        # Append results
        all_input_ids.append(encoded['input_ids'])
        all_attention_masks.append(encoded['attention_mask'])

        # Every fifth batch (and after the last one): free memory and report
        # how many samples have actually been processed so far.
        if batch_number % 5 == 0 or stop == total:
            clear_memory()
            print(f"Processed {stop}/{total} samples")

    # Concatenate all batches
    input_ids = tf.concat(all_input_ids, axis=0)
    attention_masks = tf.concat(all_attention_masks, axis=0)

    return {'input_ids': input_ids, 'attention_mask': attention_masks}

# Preprocess the texts using batching
# (defaults: batch_size=1000, max_length=256; the result is a dict holding the
# 'input_ids' and 'attention_mask' tensors for every review)
encoded_inputs = preprocess_in_batches(df['review'])

# Visualization of sequence lengths before truncation (to justify max_length choice)
def analyze_sequence_lengths(max_length=256, sample_size=5000):
    """Plot and summarise tokenized review lengths against the truncation limit.

    Lengths are counted the way the encoder sees them: the tokens produced by
    `tokenizer.tokenize` plus the special tokens the tokenizer adds to a single
    sequence. A review is therefore reported as truncated exactly when its
    encoded form would exceed `max_length`.

    Args:
        max_length: Truncation limit drawn on the histogram and used for the
            truncation percentage.
        sample_size: Maximum number of reviews to tokenize, taken from the
            start of the global `df` to keep the analysis cheap.

    Returns:
        None. Shows a histogram and prints summary statistics.
    """
    sample_size = min(sample_size, len(df))  # Use a sample to save memory
    if sample_size <= 0:
        # Nothing to analyse; avoids NaN statistics and a division by zero.
        print("No reviews available to analyze")
        return

    # Special tokens occupy slots in the encoded sequence, so they must be
    # counted for a fair comparison with max_length.
    n_special = tokenizer.num_special_tokens_to_add(pair=False)
    lengths = [
        len(tokenizer.tokenize(text)) + n_special
        for text in df['review'].iloc[:sample_size]
    ]

    plt.figure(figsize=(10, 6))
    plt.hist(lengths, bins=50)
    plt.axvline(x=max_length, color='r', linestyle='--', label=f'Max length ({max_length})')
    plt.xlabel('Sequence Length')
    plt.ylabel('Count')
    plt.title('Distribution of Review Lengths (in tokens)')
    plt.legend()
    plt.show()

    truncated_share = sum(length > max_length for length in lengths) / len(lengths) * 100

    print(f"Mean sequence length: {np.mean(lengths):.2f}")
    print(f"Median sequence length: {np.median(lengths):.2f}")
    print(f"95th percentile length: {np.percentile(lengths, 95):.2f}")
    print(f"Percentage of sequences truncated: {truncated_share:.2f}%")

# Run the length analysis (draws the histogram and prints the summary statistics)
analyze_sequence_lengths()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Processed 0/50000 samples
Processed 5000/50000 samples
Processed 10000/50000 samples


## Evaluation  Metrics


For this sentiment analysis task, we have chosen the following evaluation metrics to assess the model's performance:

F1-Score

ROC-AUC Score

Confusion Matrix Visualization

In [None]:
# F1 score metric for TensorFlow
def f1_score_metric(y_true, y_pred):
    """Batch-wise F1 score for a binary classifier that outputs raw logits.

    Args:
        y_true: Ground-truth 0/1 labels; any numeric dtype, any shape holding
            one label per prediction.
        y_pred: Raw (pre-sigmoid) logits, one per sample.

    Returns:
        Scalar float32 tensor: F1 of the positive class on this batch.

    Note:
        When used as a Keras metric the reported value is the mean of the
        per-batch scores, which is not identical to F1 over the whole epoch.
    """
    # Flatten and cast both inputs so that integer labels of shape (batch,)
    # and float logits of shape (batch, 1) line up element-wise; without this
    # the product below either fails on dtype or broadcasts to (batch, batch).
    labels = tf.reshape(tf.cast(y_true, tf.float32), [-1])
    logits = tf.reshape(tf.cast(y_pred, tf.float32), [-1])

    # Convert logits to hard 0/1 predictions
    predicted = tf.cast(tf.math.sigmoid(logits) > 0.5, tf.float32)

    # Calculate precision and recall
    true_positives = tf.reduce_sum(labels * predicted)
    predicted_positives = tf.reduce_sum(predicted)
    actual_positives = tf.reduce_sum(labels)

    # epsilon keeps the ratios finite when a batch has no positives
    eps = tf.keras.backend.epsilon()
    precision = true_positives / (predicted_positives + eps)
    recall = true_positives / (actual_positives + eps)

    # Calculate F1 score
    f1 = 2 * (precision * recall) / (precision + recall + eps)
    return f1

# Function to evaluate and visualize metrics
def evaluate_model(model, test_dataset, y_test):
    """Evaluate a binary classifier with ROC-AUC, F1 and a confusion matrix.

    Works with both kinds of model output:
      * a single logit per sample, shape (n,) or (n, 1), as produced by the
        Dense(1) heads built in this notebook (sigmoid is applied);
      * two-class logits, shape (n, 2), optionally wrapped in an object or
        dict exposing `logits` (softmax is applied, class 1 is "positive").

    Args:
        model: Trained model exposing `predict`.
        test_dataset: Dataset passed to `model.predict`.
        y_test: Ground-truth 0/1 labels in the same order as `test_dataset`.

    Returns:
        dict with 'auc', 'f1', 'y_pred' (hard predictions) and
        'y_pred_proba' (positive-class probabilities).
    """
    # Get predictions
    raw_output = model.predict(test_dataset)

    # Unwrap the logits if the model returns a container instead of an array
    if isinstance(raw_output, dict):
        logits = raw_output['logits']
    else:
        logits = getattr(raw_output, 'logits', raw_output)
    logits = tf.convert_to_tensor(logits)

    if logits.shape.rank == 1 or logits.shape[-1] == 1:
        # Single-logit head: the sigmoid gives the positive-class probability
        y_pred_proba = tf.math.sigmoid(tf.reshape(logits, [-1]))
        y_pred = tf.cast(y_pred_proba > 0.5, tf.int64)
    else:
        # Two-class head: column 1 is the positive class
        y_pred_proba = tf.nn.softmax(logits, axis=1)[:, 1]
        y_pred = tf.math.argmax(logits, axis=1)

    # scikit-learn works on NumPy arrays
    proba_np = y_pred_proba.numpy()
    pred_np = y_pred.numpy()

    # Calculate metrics
    auc = roc_auc_score(y_test, proba_np)
    f1 = f1_score(y_test, pred_np)

    # Generate classification report
    report = classification_report(y_test, pred_np)

    # Create confusion matrix
    cm = confusion_matrix(y_test, pred_np)

    # Display results
    print(f"ROC-AUC Score: {auc:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Classification Report:\n{report}")

    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Negative', 'Positive'],
                yticklabels=['Negative', 'Positive'])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    return {
        'auc': auc,
        'f1': f1,
        'y_pred': y_pred,
        'y_pred_proba': y_pred_proba
    }

## Stratified Ten Fold Cross Validation

In [None]:
from sklearn.model_selection import StratifiedKFold
import numpy as np

# Define the stratified k-fold cross-validation
n_splits = 10
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

# Prepare data arrays
X = encoded_inputs['input_ids'].numpy()  # Tokenized input IDs
attention_masks = encoded_inputs['attention_mask'].numpy()  # Attention masks
y = df['sentiment'].values  # Target labels

# Lists to store metrics for each fold
fold_metrics = []

# Per-fold datasets, stored in the structure train_all_models() reads
# ('fold', 'train_dataset', 'test_dataset', 'y_test'). Previously each
# iteration overwrote the datasets and nothing was kept after the loop.
# NOTE: every entry keeps its own copy of the fold's arrays in memory.
fold_results = []

# Implement cross-validation
for fold, (train_idx, test_idx) in enumerate(skf.split(X, y)):
    print(f"\n--- Fold {fold+1}/{n_splits} ---")

    # Split data
    X_train, X_test = X[train_idx], X[test_idx]
    mask_train, mask_test = attention_masks[train_idx], attention_masks[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    # Verify stratification (should be close to original distribution)
    print(f"Training set class distribution: {np.bincount(y_train) / len(y_train)}")
    print(f"Testing set class distribution: {np.bincount(y_test) / len(y_test)}")

    # Build the batched tf.data pipelines for this fold
    train_dataset = tf.data.Dataset.from_tensor_slices((
        {'input_ids': X_train, 'attention_mask': mask_train},
        y_train
    )).shuffle(10000).batch(32)

    test_dataset = tf.data.Dataset.from_tensor_slices((
        {'input_ids': X_test, 'attention_mask': mask_test},
        y_test
    )).batch(32)

    # Keep the fold so the training code can consume it later
    fold_results.append({
        'fold': fold + 1,  # 1-based, matching the fold number printed above
        'train_dataset': train_dataset,
        'test_dataset': test_dataset,
        'y_test': y_test
    })



# Modeling

## Transformer-based Model (BERT)




In [None]:
# Create Transformer-based model (BERT)
def create_transformer_model(learning_rate=2e-5, dropout_rate=0.1, max_length=256):
    """Create a BERT-based binary classifier that outputs one raw logit.

    Args:
        learning_rate: Adam learning rate.
        dropout_rate: Dropout applied to the [CLS] embedding before the head.
        max_length: Length of the tokenized input sequences; must match the
            `max_length` used during tokenization.

    Returns:
        Compiled `tf.keras.Model` taking `input_ids` and `attention_mask` and
        returning logits of shape (batch, 1).
    """
    def accuracy(y_true, y_pred):
        """Per-sample correctness computed on logits (decision boundary at 0)."""
        labels = tf.reshape(tf.cast(y_true, tf.float32), [-1])
        predicted = tf.cast(tf.reshape(y_pred, [-1]) > 0.0, tf.float32)
        return tf.cast(tf.equal(labels, predicted), tf.float32)

    # Load base BERT model
    base_model = TFBertModel.from_pretrained('bert-base-uncased')

    # Input layers
    input_ids = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int32, name='input_ids')
    attention_mask = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int32, name='attention_mask')

    # Get BERT embeddings (first output: per-token hidden states)
    embeddings = base_model(input_ids, attention_mask=attention_mask)[0]

    # Use CLS token embedding for classification
    cls_token = embeddings[:, 0, :]

    # Add dropout for regularization
    x = tf.keras.layers.Dropout(dropout_rate)(cls_token)

    # Output layer: no activation, so the model emits logits
    outputs = tf.keras.layers.Dense(1, activation=None)(x)

    # Create and compile model
    model = tf.keras.Model(inputs=[input_ids, attention_mask], outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        # The model outputs logits, so every metric must interpret them as
        # such: the plain 'accuracy' string thresholds at 0.5 and the default
        # AUC() expects probabilities in [0, 1]. The local `accuracy` function
        # keeps the 'accuracy' / 'val_accuracy' history keys.
        metrics=[
            accuracy,
            tf.keras.metrics.AUC(name='auc', from_logits=True),
            f1_score_metric
        ]
    )

    return model

# Create CNN-based model
def create_cnn_model(learning_rate=1e-3, dropout_rate=0.2, max_length=256):
    """Create a multi-kernel 1D-CNN classifier on frozen BERT word embeddings.

    Args:
        learning_rate: Adam learning rate.
        dropout_rate: Dropout applied after the pooled convolutions and after
            the hidden dense layer.
        max_length: Length of the tokenized input sequences; must match the
            `max_length` used during tokenization.

    Returns:
        Compiled `tf.keras.Model` taking `input_ids` and `attention_mask` and
        returning logits of shape (batch, 1).
    """
    def accuracy(y_true, y_pred):
        """Per-sample correctness computed on logits (decision boundary at 0)."""
        labels = tf.reshape(tf.cast(y_true, tf.float32), [-1])
        predicted = tf.cast(tf.reshape(y_pred, [-1]) > 0.0, tf.float32)
        return tf.cast(tf.equal(labels, predicted), tf.float32)

    # Input layers
    input_ids = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int32, name='input_ids')
    attention_mask = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int32, name='attention_mask')

    # Load pre-trained embeddings from BERT
    bert_model = TFBertModel.from_pretrained('bert-base-uncased')

    # Copy the word-embedding matrix into a frozen Keras Embedding layer.
    # The embedding block is not a direct child layer of TFBertModel, so
    # get_layer('embeddings') cannot find it; get_input_embeddings() is the
    # public accessor.
    # NOTE(review): `.weight` is the word-embedding variable name in
    # Transformers 4.x TF BERT -- confirm against the installed version.
    embedding_matrix = bert_model.get_input_embeddings().weight.numpy()
    vocab_size, hidden_size = embedding_matrix.shape
    word_embeddings = tf.keras.layers.Embedding(
        input_dim=vocab_size,
        output_dim=hidden_size,
        embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
        trainable=False,  # freezing the embedding layer
        name='bert_word_embeddings'
    )
    # Only the embedding matrix is needed; let the rest of BERT be freed.
    del bert_model

    # Apply embedding layer
    x = word_embeddings(input_ids)

    # Apply mask: zero out the embeddings of padding positions
    mask = tf.expand_dims(tf.cast(attention_mask, tf.float32), axis=-1)
    x = x * mask

    # 1D Convolutional layers with different filter sizes
    conv_blocks = []
    for filter_size in [3, 4, 5]:
        conv = tf.keras.layers.Conv1D(
            filters=128,
            kernel_size=filter_size,
            padding='valid',
            activation='relu',
            strides=1
        )(x)
        conv = tf.keras.layers.GlobalMaxPooling1D()(conv)
        conv_blocks.append(conv)

    # Concatenate convolutional outputs
    concatenated = tf.keras.layers.Concatenate()(conv_blocks)

    # Add dropout
    x = tf.keras.layers.Dropout(dropout_rate)(concatenated)

    # Dense layers
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    x = tf.keras.layers.Dropout(dropout_rate)(x)

    # Output layer: no activation, so the model emits logits
    outputs = tf.keras.layers.Dense(1, activation=None)(x)

    # Create and compile model
    model = tf.keras.Model(inputs=[input_ids, attention_mask], outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        # The model outputs logits, so every metric must interpret them as
        # such: the plain 'accuracy' string thresholds at 0.5 and the default
        # AUC() expects probabilities in [0, 1]. The local `accuracy` function
        # keeps the 'accuracy' / 'val_accuracy' history keys.
        metrics=[
            accuracy,
            tf.keras.metrics.AUC(name='auc', from_logits=True),
            f1_score_metric
        ]
    )

    return model

# Function to train model with early stopping and learning rate reduction
def train_model(model, train_dataset, validation_dataset, model_name, fold):
    """Fit `model`, plot its loss/accuracy curves and return the Keras History.

    Training runs for at most 10 epochs. Validation loss drives three
    callbacks: early stopping (restoring the best weights), learning-rate
    halving on plateau, and a best-only checkpoint written to
    '<model_name>_fold<fold>.h5'.
    """
    monitored = 'val_loss'

    callbacks = [
        # Stop after 2 epochs without improvement and roll back to the best weights
        tf.keras.callbacks.EarlyStopping(
            monitor=monitored, patience=2, restore_best_weights=True
        ),
        # Halve the learning rate after 1 stagnant epoch, never below 1e-6
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor=monitored, factor=0.5, patience=1, min_lr=1e-6
        ),
        # Persist the best model seen so far
        tf.keras.callbacks.ModelCheckpoint(
            f'{model_name}_fold{fold}.h5', monitor=monitored, save_best_only=True
        ),
    ]

    history = model.fit(
        train_dataset,
        validation_data=validation_dataset,
        epochs=10,  # Reduced for demonstration
        callbacks=callbacks,
    )
    curves = history.history

    # One panel per tracked quantity: (history key, training label,
    # validation label, title, y-axis label)
    panels = (
        ('loss', 'Training Loss', 'Validation Loss', f'{model_name} - Loss', 'Loss'),
        ('accuracy', 'Training Accuracy', 'Validation Accuracy', f'{model_name} - Accuracy', 'Accuracy'),
    )

    _, axes = plt.subplots(1, 2, figsize=(12, 4))
    for ax, (key, train_label, val_label, title, y_label) in zip(axes, panels):
        ax.plot(curves[key], label=train_label)
        ax.plot(curves[f'val_{key}'], label=val_label)
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend()

    plt.tight_layout()
    plt.show()

    # Free whatever can be freed before the caller moves on
    clear_memory()

    return history

# Hyperparameter grids for the two architectures: every combination of two
# learning rates and two dropout rates (learning rate varies fastest).
transformer_configs = [
    {'learning_rate': lr, 'dropout_rate': dropout}
    for dropout in (0.1, 0.2)
    for lr in (2e-5, 5e-5)
]

cnn_configs = [
    {'learning_rate': lr, 'dropout_rate': dropout}
    for dropout in (0.2, 0.5)
    for lr in (1e-3, 5e-4)
]

# Training loop (training multiple models across folds)
def train_all_models(fold_results):
    """Train and evaluate every transformer and CNN configuration on one fold.

    Only the first entry of `fold_results` is used (demonstration setting).

    Args:
        fold_results: Non-empty sequence of dicts, each providing
            'train_dataset', 'test_dataset', 'y_test' and 'fold'.

    Returns:
        dict with the keys 'transformer' and 'cnn'; each maps to a list with
        one entry per configuration holding 'config', 'history', 'metrics'
        and 'model_name'.

    Raises:
        ValueError: If `fold_results` is empty.
    """
    if not fold_results:
        raise ValueError("fold_results is empty; build the cross-validation folds first")

    # Limited to first fold for demonstration
    fold_data = fold_results[0]
    train_dataset = fold_data['train_dataset']
    test_dataset = fold_data['test_dataset']
    y_test = fold_data['y_test']
    fold = fold_data['fold']

    # (result key / file prefix, display name, configurations, model factory)
    model_families = (
        ('transformer', 'Transformer', transformer_configs, create_transformer_model),
        ('cnn', 'CNN', cnn_configs, create_cnn_model),
    )
    all_results = {key: [] for key, _, _, _ in model_families}

    for key, display_name, configs, build_model in model_families:
        for i, config in enumerate(configs):
            print(f"\nTraining {display_name} Model {i+1} (Fold {fold})")
            print(f"Parameters: {config}")

            model = build_model(**config)
            model_name = f"{key}_{config['learning_rate']}_{config['dropout_rate']}"

            # NOTE(review): the test split doubles as the validation split, so
            # early stopping and checkpoint selection see the test data and
            # the reported metrics are optimistic. Consider a separate
            # validation split.
            history = train_model(model, train_dataset, test_dataset, model_name, fold)
            results = evaluate_model(model, test_dataset, y_test)

            all_results[key].append({
                'config': config,
                'history': history.history,
                'metrics': results,
                'model_name': model_name
            })

            # Clear memory before next model. The references must be dropped
            # first; otherwise the finished model stays alive while the next
            # one is being built.
            del model, history
            tf.keras.backend.clear_session()
            gc.collect()

    return all_results

# Exceptional Work