In [None]:
from google.colab import files

# Ask for the dataset and the two helper modules this notebook imports later.
upload_prompt = "Please upload consumer_complaints.csv, config.py, and utils.py:"
print(upload_prompt)
uploaded = files.upload()

# Iterating the returned mapping yields the uploaded file names.
for filename in uploaded:
    print(f"Uploaded: {filename}")

In [None]:
# Import the libraries

import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
from transformers import (
    AutoTokenizer,
    TFAutoModelForSequenceClassification,
    create_optimizer,
)
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

import config
from utils import save_object, load_object
import os

In [None]:
# Report whether TensorFlow can see at least one GPU (informational only).
gpu_devices = tf.config.list_physical_devices('GPU')
if not gpu_devices:
    print("WARNING: GPU not found. Training might be slow on CPU.")
else:
    print("GPU is available for TensorFlow! Training will be faster.")

In [None]:
# Data Loading and Initial Exploration
import csv

def load_and_explore_data(file_path: str, text_col: str, label_col: str):
    """Load the complaints CSV, drop unusable rows, and print a short summary.

    A row is removed when its text is missing, blank after stripping
    whitespace, or equal to 'n/a' (case-insensitive, surrounding whitespace
    ignored). Rows whose label is missing are removed as well, because they
    cannot serve as classification targets.

    Args:
        file_path: Path of the CSV file to read.
        text_col: Name of the column holding the complaint text.
        label_col: Name of the column holding the class label.

    Returns:
        A new DataFrame with only the usable rows. Text values are returned
        as read from the file (they are not stripped here).

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        KeyError: If ``text_col`` or ``label_col`` is not a column of the file.
        ValueError: If fewer than two distinct labels remain after filtering.
    """
    print(f"\nLoading data from {file_path}")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Data file not found at: {file_path}")

    # sep=None lets the python engine sniff the delimiter; malformed lines are skipped.
    raw_df = pd.read_csv(file_path, sep=None, engine='python', encoding='utf-8',
                         on_bad_lines="skip", quotechar='"', quoting=csv.QUOTE_ALL)

    print("\nInitial DataFrame Info:")
    raw_df.info()

    # Fail early with a readable message instead of a bare KeyError further down.
    missing_cols = [col for col in (text_col, label_col) if col not in raw_df.columns]
    if missing_cols:
        raise KeyError(f"Column(s) {missing_cols} not found in {file_path}")

    # Build one mask from a single normalized view of the text column so the
    # blank check and the 'n/a' check agree on whitespace handling, and the
    # frame that was just displayed by .info() is never mutated in place.
    initial_rows = len(raw_df)
    normalized_text = raw_df[text_col].astype(str).str.strip()
    has_text = (
        raw_df[text_col].notna()
        & (normalized_text != '')
        & (normalized_text.str.lower() != 'n/a')
    )
    df = raw_df[has_text]
    rows_after_dropna = len(df)
    print(f"\nDropped {initial_rows - rows_after_dropna} rows with missing or empty '{text_col}'.")

    # A missing label cannot be encoded as a meaningful class, so drop it here.
    has_label = df[label_col].notna()
    missing_label_rows = int((~has_label).sum())
    if missing_label_rows:
        df = df[has_label]
        print(f"Dropped {missing_label_rows} rows with missing '{label_col}'.")

    # Basic data info - focus on relevant columns
    print(f"\nFirst 5 rows (relevant columns):")
    print(df[[text_col, label_col]].head())

    # Check label distribution
    print(f"\nLabel distribution for '{label_col}':")
    label_counts = df[label_col].value_counts()
    print(label_counts)
    print(f"Number of unique labels: {len(label_counts)}")

    if len(label_counts) < 2:
        raise ValueError("Dataset must have at least two unique labels for classification.")

    return df

# Run the loader with the file path and column names taken from config.
df = load_and_explore_data(
    file_path=config.DATA_FILE_PATH,
    text_col=config.TEXT_COLUMN,
    label_col=config.LABEL_COLUMN,
)

In [None]:
# Data Preprocessing
def preprocess_data(df: pd.DataFrame, text_col: str, label_col: str):
    """Encode the labels as integers and lightly clean the text.

    Args:
        df: Frame containing at least ``text_col`` and ``label_col``.
        text_col: Name of the column holding the text.
        label_col: Name of the column holding the class label.

    Returns:
        A tuple ``(texts, labels_encoded, label_encoder)`` where ``texts`` is
        a list of whitespace-stripped strings, ``labels_encoded`` is the
        output of ``LabelEncoder.fit_transform`` on the label column, and
        ``label_encoder`` is the fitted encoder.

    Side effects:
        Hands the fitted encoder to ``save_object`` together with
        ``config.LABEL_ENCODER_PATH``, creating the parent directory first
        when the path has one.
    """
    print("\n--- Preprocessing Data ---")

    # Label Encoding
    label_encoder = LabelEncoder()
    labels_encoded = label_encoder.fit_transform(df[label_col])
    print(f"Encoded labels: {label_encoder.classes_}")

    # os.makedirs('') raises FileNotFoundError, so only create the parent
    # directory when the configured path actually contains one.
    encoder_dir = os.path.dirname(config.LABEL_ENCODER_PATH)
    if encoder_dir:
        os.makedirs(encoder_dir, exist_ok=True)
    save_object(label_encoder, config.LABEL_ENCODER_PATH)

    # Text Preprocessing - force string type, strip whitespace, convert to list
    texts = df[text_col].astype(str).str.strip().tolist()

    return texts, labels_encoded, label_encoder

# Produce the cleaned texts, the integer labels, and the fitted encoder.
texts, labels_encoded, label_encoder = preprocess_data(
    df=df,
    text_col=config.TEXT_COLUMN,
    label_col=config.LABEL_COLUMN,
)

In [None]:
# One class per distinct label the encoder was fitted on.
label_classes = label_encoder.classes_
num_labels = len(label_classes)
print(f"Detected {num_labels} unique labels.")

In [None]:
# Data splitting

def split_data(texts: list, labels: np.ndarray):
    """Split texts and labels into stratified train, validation and test sets.

    The first split holds out ``config.TEST_SIZE`` of the data; that held-out
    part is then halved into validation and test. Both splits are stratified
    on the labels and use ``config.RANDOM_SEED``.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    print("\nSplitting data...")
    seed = config.RANDOM_SEED

    # Stage 1: train vs. everything that is held out.
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        texts, labels, test_size=config.TEST_SIZE, random_state=seed, stratify=labels
    )
    # Stage 2: cut the held-out part in half -> validation / test.
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=0.5, random_state=seed, stratify=y_holdout
    )

    print(f"Train samples: {len(X_train)} (Labels: {len(y_train)})")
    print(f"Validation samples: {len(X_val)} (Labels: {len(y_val)})")
    print(f"Test samples: {len(X_test)} (Labels: {len(y_test)})")

    # Show per-split class counts so the stratification can be checked by eye.
    for header, split_labels in (
        ("\nTrain Label Distribution:", y_train),
        ("\nValidation Label Distribution:", y_val),
        ("\nTest Label Distribution:", y_test),
    ):
        _print_label_distribution(header, split_labels)

    return X_train, X_val, X_test, y_train, y_val, y_test


def _print_label_distribution(header: str, split_labels) -> None:
    """Print ``header`` followed by a {label: count} dict for ``split_labels``."""
    print(header)
    values, counts = np.unique(split_labels, return_counts=True)
    print(dict(zip(values, counts)))

# Create the three splits used by the rest of the notebook.
X_train, X_val, X_test, y_train, y_val, y_test = split_data(
    texts=texts,
    labels=labels_encoded,
)

In [None]:
# Tokenization

def tokenize_data(texts: list, tokenizer, max_len: int):
    """Run ``tokenizer`` over ``texts`` and return whatever it produces.

    Every text is truncated and padded to exactly ``max_len`` tokens, and
    TensorFlow tensors are requested from the tokenizer.
    """
    print(f"\n--- Tokenizing data with {config.MODEL_NAME} (max_length={max_len}) ---")
    # Collect the fixed tokenizer options in one place before the call.
    tokenizer_options = dict(
        truncation=True,
        padding='max_length',
        max_length=max_len,
        return_tensors='tf',
    )
    encodings = tokenizer(texts, **tokenizer_options)
    print("Tokenization complete.")
    return encodings


tokenizer = AutoTokenizer.from_pretrained(config.MODEL_NAME)

# Encode the splits in train -> validation -> test order with identical settings.
train_encodings, val_encodings, test_encodings = [
    tokenize_data(split_texts, tokenizer, config.MAX_SEQUENCE_LENGTH)
    for split_texts in (X_train, X_val, X_test)
]

In [None]:
# Create TensorFlow Datasets
# Convert tokenized data and labels into optimized TensorFlow tf.data.Dataset objects
# Shuffle only the training data for better generalization

def create_tf_dataset(encodings, labels, batch_size, shuffle: bool = False):
    """Build a batched, prefetching ``tf.data.Dataset`` of (features, label) pairs.

    Args:
        encodings: Mapping-like tokenizer output; converted to a plain dict.
        labels: Labels aligned with ``encodings``; ``len(labels)`` is also
            used as the shuffle buffer size.
        batch_size: Number of examples per batch.
        shuffle: When True, shuffle with a buffer covering every example.
    """
    features = dict(encodings)
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))
    if shuffle:
        # A buffer as large as the dataset shuffles across all examples.
        dataset = dataset.shuffle(buffer_size=len(labels))
    batched = dataset.batch(batch_size)
    return batched.prefetch(tf.data.AUTOTUNE)


# Only the training split is shuffled; validation and test use the default (no shuffle).
train_dataset_tf = create_tf_dataset(
    encodings=train_encodings,
    labels=y_train,
    batch_size=config.BATCH_SIZE_TRAIN,
    shuffle=True,
)
val_dataset_tf = create_tf_dataset(
    encodings=val_encodings,
    labels=y_val,
    batch_size=config.BATCH_SIZE_EVAL,
)
test_dataset_tf = create_tf_dataset(
    encodings=test_encodings,
    labels=y_test,
    batch_size=config.BATCH_SIZE_EVAL,
)

In [None]:
# Model Training: evaluation-metric helper (called later by evaluate_model)

def calculate_evaluation_metrics(y_true, y_pred_logits):
    """Compute accuracy and weighted F1 / precision / recall from logits.

    Args:
        y_true: True integer class labels.
        y_pred_logits: Model outputs; the predicted class is the argmax over
            the last axis.

    Returns:
        A dict with keys ``"accuracy"``, ``"f1"``, ``"precision"``,
        ``"recall"`` and ``"predictions_encoded"`` (the argmax predictions).
    """
    predictions = np.argmax(y_pred_logits, axis=-1)

    # zero_division=0 is passed to all three weighted scores so a class that is
    # never predicted contributes 0 silently instead of triggering a warning
    # for F1 only.
    weighted_kwargs = {"average": "weighted", "zero_division": 0}

    return {
        "accuracy": accuracy_score(y_true, predictions),
        "f1": f1_score(y_true, predictions, **weighted_kwargs),
        "precision": precision_score(y_true, predictions, **weighted_kwargs),
        "recall": recall_score(y_true, predictions, **weighted_kwargs),
        "predictions_encoded": predictions  # Return predictions for report/matrix
    }

In [None]:
def train_model(train_dataset_tf, val_dataset_tf, num_labels: int):
    """Load the pretrained classifier, compile it, and fit it with Keras.

    Args:
        train_dataset_tf: Dataset passed to ``model.fit`` for training.
        val_dataset_tf: Dataset passed to ``model.fit`` as validation data.
        num_labels: Number of output classes for the classification head.

    Returns:
        ``(model, history)`` - the fitted model and the object returned by
        ``model.fit``.
    """
    print(f"\nInitializing and Training Model ({config.MODEL_NAME})")

    model = TFAutoModelForSequenceClassification.from_pretrained(
        config.MODEL_NAME,
        num_labels=num_labels,
    )

    # Labels are integer class ids and the model emits raw logits, hence the
    # sparse loss with from_logits=True and the sparse accuracy metric.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=config.LEARNING_RATE),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy('accuracy')],
    )

    print("Starting training with model.fit()...")
    fit_options = {
        "epochs": config.NUM_TRAIN_EPOCHS,
        "validation_data": val_dataset_tf,
    }
    history = model.fit(train_dataset_tf, **fit_options)
    print("--- Model Training Complete ---")

    return model, history


# Fine-tune on the training split, validating on the validation split.
model, history = train_model(
    train_dataset_tf=train_dataset_tf,
    val_dataset_tf=val_dataset_tf,
    num_labels=num_labels,
)

In [None]:
# Model Evaluation
def evaluate_model(model, test_dataset_tf, y_true: np.ndarray, label_encoder):
    """Evaluate the trained model on the test set and report the results.

    Prints the Keras loss/accuracy, the weighted metrics computed by
    ``calculate_evaluation_metrics``, a classification report and the
    confusion matrix, then draws the confusion matrix as a heatmap.

    Args:
        model: Compiled model exposing ``evaluate`` and ``predict``.
        test_dataset_tf: Test dataset, in the same order as ``y_true``.
        y_true: Encoded true labels for the test set.
        label_encoder: Fitted encoder used to decode labels for display.

    Returns:
        ``(predicted_labels_encoded, cm)`` - the argmax predictions and the
        confusion matrix.
    """
    print("\nEvaluating Model on Test Set")
    loss, accuracy = model.evaluate(test_dataset_tf)
    print(f"Test Set Evaluation Results: Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")

    print("\nGenerating Predictions on Test Set")

    predictions_output = model.predict(test_dataset_tf)
    # predict() may return an output object carrying .logits or the raw array.
    logits = predictions_output.logits if hasattr(predictions_output, 'logits') else predictions_output

    # Calculate detailed metrics using our custom function
    metrics_results = calculate_evaluation_metrics(y_true, logits)
    predicted_labels_encoded = metrics_results["predictions_encoded"]

    # These were previously computed and then discarded; surface them.
    print(
        f"Weighted metrics: Accuracy: {metrics_results['accuracy']:.4f}, "
        f"F1: {metrics_results['f1']:.4f}, "
        f"Precision: {metrics_results['precision']:.4f}, "
        f"Recall: {metrics_results['recall']:.4f}"
    )

    # Convert encoded labels back to original string labels for readability
    predicted_labels_decoded = label_encoder.inverse_transform(predicted_labels_encoded)
    true_labels_decoded = label_encoder.inverse_transform(y_true)
    class_names = label_encoder.classes_

    print("\nClassification Report")
    # Pin `labels` to the full class list (as the confusion matrix does) so the
    # report still works when some class appears in neither y_true nor the
    # predictions; otherwise target_names would not match the classes found.
    print(classification_report(true_labels_decoded, predicted_labels_decoded,
                                labels=class_names, target_names=class_names,
                                zero_division=0))

    print("\nConfusion Matrix")
    cm = confusion_matrix(true_labels_decoded, predicted_labels_decoded,
                          labels=class_names)
    print(cm)

    # Visualize Confusion Matrix; the figure grows with the number of classes.
    n_classes = len(class_names)
    plt.figure(figsize=(max(8, n_classes * 0.8), max(6, n_classes * 0.6)))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.show()

    return predicted_labels_encoded, cm

# Score the fine-tuned model on the held-out test split.
predicted_labels_encoded, cm = evaluate_model(
    model=model,
    test_dataset_tf=test_dataset_tf,
    y_true=y_test,
    label_encoder=label_encoder,
)

In [None]:
# Model Saving

print(f"\nSaving Model and Tokenizer to {config.TRAINED_MODEL_PATH}")
output_dir = config.TRAINED_MODEL_PATH
os.makedirs(output_dir, exist_ok=True)  # Ensure directory exists
# Model first, then tokenizer - both are written into the same directory.
for artifact in (model, tokenizer):
    artifact.save_pretrained(output_dir)
print("Model and Tokenizer saved.")

In [None]:
# Example of how the model can be used on new data

print("\nDemonstrating Model Inference")
try:
    # Reload everything from disk so the demo exercises the saved artifacts
    # rather than the in-memory objects used during training.
    loaded_tokenizer = AutoTokenizer.from_pretrained(config.TRAINED_MODEL_PATH)
    loaded_model = TFAutoModelForSequenceClassification.from_pretrained(config.TRAINED_MODEL_PATH)
    loaded_label_encoder = load_object(config.LABEL_ENCODER_PATH)

    if loaded_model is not None and loaded_tokenizer is not None and loaded_label_encoder is not None:
        sample_complaints = [
            "My bank charged me an overdraft fee even though I had money in my account.",
            "I received multiple calls from a debt collector about a debt I already paid off.",
            "My credit report shows an account that I never opened, and it's hurting my score."
        ]

        print("\nSample Complaints for Inference:")
        for i, complaint in enumerate(sample_complaints):
            print(f"{i+1}. {complaint}")

        # Tokenize new input with the same settings used for training data
        inference_encodings = loaded_tokenizer(
            sample_complaints,
            truncation=True,
            padding='max_length',
            max_length=config.MAX_SEQUENCE_LENGTH,
            return_tensors='tf'
        )

        # Make predictions; use the evaluation batch size instead of one
        # example per batch.
        tf_dataset_inference = tf.data.Dataset.from_tensor_slices(
            dict(inference_encodings)
        ).batch(config.BATCH_SIZE_EVAL)
        predictions_output = loaded_model.predict(tf_dataset_inference)
        # Same guard as evaluate_model: accept an output object or a raw array.
        logits = predictions_output.logits if hasattr(predictions_output, 'logits') else predictions_output
        predictions_encoded = np.argmax(logits, axis=-1)

        # Decode predictions back to original product names
        predictions_decoded = loaded_label_encoder.inverse_transform(predictions_encoded)

        print("\nInference Results:")
        for complaint, prediction in zip(sample_complaints, predictions_decoded):
            print(f"Complaint: '{complaint}'\nPredicted Product: '{prediction}'\n")
    else:
        print("Could not load model, tokenizer, or label encoder for inference (one or more are None).")

except Exception as e:
    # Deliberately best-effort: the demo reports the failure with a full
    # traceback but does not abort the notebook.
    print(f"Error during inference demonstration: {e}")
    import traceback
    traceback.print_exc()

print("\n--- Project Finished ---")