# Model Evaluation

In [46]:
print("📦 Installing required packages...")
!pip -q install shap transformers peft torch scikit-learn matplotlib seaborn

from __future__ import annotations
import os
import json
import math
import warnings
import pickle
from pathlib import Path
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel
from peft import LoraConfig, get_peft_model, TaskType
from tqdm import tqdm

import matplotlib
matplotlib.use("agg")  # Non-interactive backend for safe file saving
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import classification_report, confusion_matrix
from sklearn.inspection import permutation_importance

# Other utilities
from google.colab import drive

# Mount Google Drive at /content/drive; CONFIG['base_path'] below points inside this mount.
drive.mount('/content/drive')

# SHAP is treated as optional: record whether the import worked instead of failing the cell.
# NOTE(review): _HAS_SHAP is not read anywhere in the cells visible here — confirm it is still needed.
try:
    import shap  # noqa: F401
    _HAS_SHAP = True
except Exception:
    _HAS_SHAP = False

# Plot defaults applied to every figure created later in the notebook
plt.style.use('default')
sns.set_palette("husl")

print("✅ All packages imported successfully!")

📦 Installing required packages...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ All packages imported successfully!


In [47]:
# ============================
# Configuration
# ============================
print("⚙️ Setting up configuration...")

# Notebook-wide settings. In the cells visible here, 'base_path', 'device' and
# 'batch_size' are read directly; the model wrapper takes its tokenizer
# max_length from the checkpoint's own config, not from 'max_length' below.
CONFIG = {
    'base_path': '/content/drive/MyDrive/Tiktok_Hackaton',  # Adjust this path
    'device': torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
    'batch_size': 32,
    'max_length': 512,
    'random_state': 42
}

# Seed NumPy and PyTorch (plus CUDA when present) from the same value for reproducibility
np.random.seed(CONFIG['random_state'])
torch.manual_seed(CONFIG['random_state'])
if torch.cuda.is_available():
    torch.cuda.manual_seed(CONFIG['random_state'])

print(f"✅ Configuration set. Using device: {CONFIG['device']}")

⚙️ Setting up configuration...
✅ Configuration set. Using device: cuda


In [48]:
# ============================
# Model Classes (from training code)
# ============================
print("🏗️ Setting up model classes...")

class HybridClassificationModel(nn.Module):
    """Three-branch classifier over text, categorical and continuous features.

    Branches:
      * text: ``distilbert-base-uncased`` wrapped with LoRA adapters on the
        ``q_lin``/``k_lin``/``v_lin`` projections;
      * categorical: one ``nn.Embedding`` per categorical column, concatenated
        and projected to the DistilBERT hidden size;
      * continuous: a two-layer MLP projecting to the DistilBERT hidden size.

    The three branch outputs are concatenated (3 x hidden size) and fed to an
    MLP head that emits ``num_labels`` logits.

    Args:
        num_labels: Number of output classes.
        categorical_feature_dim: Accepted for interface compatibility; this
            class does not read it (the number of categorical columns is
            implied by ``categorical_vocab_sizes``).
        continuous_feature_dim: Width of the continuous feature vector.
        categorical_vocab_sizes: One vocabulary size per categorical column.
            Defaults to ``[5, 7, 24, 10]`` when omitted.
        dropout_rate: Dropout probability used in both feature branches and
            in the classification head.
        class_weights: Optional tensor of per-class weights for the loss. It
            is registered as a buffer so it follows the module across devices.
    """

    def __init__(self, num_labels, categorical_feature_dim, continuous_feature_dim,
                 categorical_vocab_sizes=None, dropout_rate=0.1, class_weights=None):
        super().__init__()

        # register_buffer accepts None, so a single call covers both cases
        self.register_buffer('class_weights', class_weights)

        # Text branch (DistilBERT with LoRA)
        self.distilbert = AutoModel.from_pretrained('distilbert-base-uncased')

        lora_config = LoraConfig(
            r=8,
            lora_alpha=16,
            target_modules=["q_lin", "k_lin", "v_lin"],
            lora_dropout=0.05,
            bias="none",
            task_type=TaskType.FEATURE_EXTRACTION
        )
        self.distilbert = get_peft_model(self.distilbert, lora_config)

        distilbert_hidden_size = self.distilbert.config.hidden_size

        # Dynamic categorical embeddings
        if categorical_vocab_sizes is None:
            categorical_vocab_sizes = [5, 7, 24, 10]

        # Embedding width grows with the vocabulary but is capped at 50
        self.categorical_embeddings = nn.ModuleList([
            nn.Embedding(vocab_size, min(50, vocab_size // 2 + 10))
            for vocab_size in categorical_vocab_sizes
        ])

        total_cat_embed_dim = sum(emb.embedding_dim for emb in self.categorical_embeddings)

        self.categorical_ffn = nn.Sequential(
            nn.Linear(total_cat_embed_dim, distilbert_hidden_size),
            nn.BatchNorm1d(distilbert_hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )

        self.continuous_ffn = nn.Sequential(
            nn.Linear(continuous_feature_dim, distilbert_hidden_size // 2),
            nn.BatchNorm1d(distilbert_hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(distilbert_hidden_size // 2, distilbert_hidden_size),
            nn.BatchNorm1d(distilbert_hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )

        # Text + categorical + continuous, each distilbert_hidden_size wide
        combined_size = distilbert_hidden_size * 3
        self.classifier = nn.Sequential(
            nn.Linear(combined_size, distilbert_hidden_size),
            nn.BatchNorm1d(distilbert_hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(distilbert_hidden_size, distilbert_hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(distilbert_hidden_size // 2, num_labels)
        )

        self._init_weights()

    def _init_weights(self):
        """Xavier-initialise the task-specific layers only.

        Only the categorical embeddings, the two feature MLPs and the
        classifier head are touched. Walking ``self.modules()`` instead would
        also overwrite the pretrained DistilBERT weights and the adapter
        layers inserted by PEFT, which must keep their own initialisation.
        """
        task_specific = (
            self.categorical_embeddings,
            self.categorical_ffn,
            self.continuous_ffn,
            self.classifier,
        )
        for head in task_specific:
            for module in head.modules():
                if isinstance(module, nn.Linear):
                    nn.init.xavier_uniform_(module.weight)
                    if module.bias is not None:
                        nn.init.zeros_(module.bias)
                elif isinstance(module, nn.Embedding):
                    nn.init.xavier_uniform_(module.weight)

    def forward(self, input_ids, attention_mask, categorical_features, continuous_features, labels=None):
        """Compute logits and, when ``labels`` is given, the training loss.

        Args:
            input_ids: Token ids for the text branch.
            attention_mask: Attention mask matching ``input_ids``.
            categorical_features: Integer tensor indexed as ``[:, i]`` for the
                i-th categorical column (one column per embedding table).
            continuous_features: Float tensor fed to the continuous MLP.
            labels: Optional class indices; enables the loss computation.

        Returns:
            Dict with ``"logits"`` and ``"loss"`` (``None`` without labels).
        """
        # Text branch: hidden state of the first token position
        # (presumably the [CLS] token — confirm against the tokenizer used)
        text_outputs = self.distilbert(input_ids=input_ids, attention_mask=attention_mask)
        text_hidden = text_outputs.last_hidden_state[:, 0, :]

        # Categorical branch: embed each column with its own table
        cat_embeddings = [
            embedding_layer(categorical_features[:, i])
            for i, embedding_layer in enumerate(self.categorical_embeddings)
        ]
        cat_combined = torch.cat(cat_embeddings, dim=-1)
        cat_processed = self.categorical_ffn(cat_combined)

        # Continuous branch
        cont_processed = self.continuous_ffn(continuous_features)

        # Combine all branches
        combined_features = torch.cat((text_hidden, cat_processed, cont_processed), dim=-1)
        logits = self.classifier(combined_features)

        loss = None
        if labels is not None:
            # Class-weighted cross entropy with fixed label smoothing of 0.1
            loss_fct = nn.CrossEntropyLoss(
                weight=self.class_weights,
                label_smoothing=0.1
            )
            loss = loss_fct(logits, labels)

        return {"loss": loss, "logits": logits}

🏗️ Setting up model classes...


In [49]:
# ============================
# Model Wrapper
# ============================

class HybridModelWrapper:
    """Load a trained ``HybridClassificationModel`` checkpoint and run inference.

    The wrapper reads ``hybrid_model.bin`` (and, if present,
    ``training_metadata.json``) plus the tokenizer files from ``model_path``,
    rebuilds the model from the stored configuration and exposes
    :meth:`predict` for batched inference.

    Args:
        model_path: Directory containing the saved model artifacts.
        device: Torch device to run on. Defaults to CUDA when available,
            otherwise CPU.

    Raises:
        FileNotFoundError: If ``hybrid_model.bin`` is missing from ``model_path``.
    """

    def __init__(self, model_path, device=None):
        self.model_path = model_path

        if device is None:
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.device = device

        # Load model artifacts
        self._load_model_artifacts()

        # Initialize and load the model
        self._initialize_model()

        print(f"✅ Hybrid model loaded successfully on device: {self.device}")

    def _load_model_artifacts(self):
        """Read the checkpoint, optional training metadata and tokenizer from disk."""
        hybrid_data_path = os.path.join(self.model_path, 'hybrid_model.bin')
        if not os.path.exists(hybrid_data_path):
            raise FileNotFoundError(f"Model file not found: {hybrid_data_path}")

        # NOTE(security): torch.load unpickles the file, which can execute
        # arbitrary code. Only load checkpoints from a trusted source.
        self.hybrid_data = torch.load(hybrid_data_path, map_location='cpu')

        # Training metadata is optional; fall back to an empty dict
        metadata_path = os.path.join(self.model_path, 'training_metadata.json')
        if os.path.exists(metadata_path):
            with open(metadata_path, 'r') as f:
                self.training_metadata = json.load(f)
        else:
            self.training_metadata = {}

        # Extract configuration and metadata
        self.config = self.hybrid_data['config']
        self.feature_metadata = self.hybrid_data['feature_metadata']
        self.categorical_vocab_sizes = self.hybrid_data['categorical_vocab_sizes']

        # Load class weights if available. as_tensor accepts both a plain
        # sequence and an already-built tensor.
        self.class_weights = None
        stored_weights = self.hybrid_data.get('class_weights')
        if stored_weights is not None:
            self.class_weights = torch.as_tensor(stored_weights, dtype=torch.float32)

        # Load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def _initialize_model(self):
        """Rebuild the model from the stored config and load its weights.

        Weights are loaded with ``strict=False``; any key mismatch between
        the checkpoint and the rebuilt model is reported with a warning so
        that a partially loaded model is not evaluated unnoticed.
        """
        # Get dimensions from metadata
        categorical_dim = self.feature_metadata['categorical_features']['feature_dim']
        continuous_dim = self.feature_metadata['continuous_features']['feature_dim']

        # Create model instance
        self.model = HybridClassificationModel(
            num_labels=self.config['num_labels'],
            categorical_feature_dim=categorical_dim,
            continuous_feature_dim=continuous_dim,
            categorical_vocab_sizes=self.categorical_vocab_sizes,
            dropout_rate=self.config.get('regularization', {}).get('dropout_rate', 0.1),
            class_weights=self.class_weights
        )

        # Work on a shallow copy so the loaded checkpoint dict stays intact
        model_state_dict = self.hybrid_data['full_model_state_dict'].copy()

        # class_weights is supplied through the constructor, not the state dict
        model_state_dict.pop('class_weights', None)

        load_result = self.model.load_state_dict(model_state_dict, strict=False)

        # 'class_weights' is expected to be missing because it was removed above
        missing = [k for k in load_result.missing_keys if k != 'class_weights']
        unexpected = list(load_result.unexpected_keys)
        if missing or unexpected:
            warnings.warn(
                "Checkpoint does not fully match the model: "
                f"{len(missing)} missing key(s) (e.g. {missing[:3]}), "
                f"{len(unexpected)} unexpected key(s) (e.g. {unexpected[:3]}). "
                "Layers without loaded weights keep their initial values."
            )

        # Move to device and set to eval mode
        self.model.to(self.device)
        self.model.eval()

    def prepare_features(self, texts, all_features_df):
        """Build categorical and continuous feature tensors for ``texts``.

        Each text is matched against the ``review_text`` column of
        ``all_features_df``; the first matching row supplies the features.
        Texts without a match get all-zero features, and a warning reports
        how many were affected.

        Args:
            texts: A single string or a sequence of strings.
            all_features_df: DataFrame holding ``review_text`` plus the
                feature columns named in the checkpoint's feature metadata.

        Returns:
            Tuple ``(categorical_tensor, continuous_tensor)`` on
            ``self.device``, with dtypes ``torch.long`` and ``torch.float32``.
        """
        texts = [texts] if isinstance(texts, str) else list(texts)

        # Get feature names
        cat_feature_names = self.feature_metadata['categorical_features']['feature_names']
        cont_feature_names = self.feature_metadata['continuous_features']['feature_names']

        # Map each text to the position of its first row, built in one pass
        # instead of scanning the whole column once per text.
        first_position = {}
        for position, key in enumerate(all_features_df['review_text'].tolist()):
            first_position.setdefault(key, position)

        available_columns = set(all_features_df.columns)

        categorical_data = []
        continuous_data = []
        unmatched = 0

        for text in texts:
            position = first_position.get(text)
            if position is None:
                # No matching row found - use default values
                unmatched += 1
                categorical_data.append([0] * len(cat_feature_names))
                continuous_data.append([0.0] * len(cont_feature_names))
                continue

            row = all_features_df.iloc[position]

            # Categorical codes: missing column or NaN -> 0, then clip to vocab
            cat_features = []
            for i, col in enumerate(cat_feature_names):
                raw = row[col] if col in available_columns else None
                code = 0 if raw is None or pd.isna(raw) else int(raw)
                upper = self.categorical_vocab_sizes[i] - 1
                cat_features.append(max(0, min(code, upper)))

            # Continuous values: missing column or NaN -> 0.0
            cont_features = []
            for col in cont_feature_names:
                raw = row[col] if col in available_columns else None
                cont_features.append(0.0 if raw is None or pd.isna(raw) else float(raw))

            categorical_data.append(cat_features)
            continuous_data.append(cont_features)

        if unmatched:
            warnings.warn(
                f"{unmatched} of {len(texts)} text(s) had no matching 'review_text' row; "
                "default (all-zero) features were used for them."
            )

        # Convert to tensors
        categorical_tensor = torch.tensor(categorical_data, dtype=torch.long).to(self.device)
        continuous_tensor = torch.tensor(continuous_data, dtype=torch.float32).to(self.device)

        return categorical_tensor, continuous_tensor

    def predict(self, texts, all_features_df, batch_size=32):
        """Predict class indices and class probabilities for ``texts``.

        Args:
            texts: A single string or a sequence of strings (any sequence is
                converted to a list so it can be sliced and tokenized).
            all_features_df: Feature table passed to :meth:`prepare_features`.
            batch_size: Number of texts per forward pass.

        Returns:
            Tuple ``(predictions, probabilities)`` of NumPy arrays: the argmax
            class index per text and the softmax probabilities per text.
        """
        texts = [texts] if isinstance(texts, str) else list(texts)

        all_predictions = []
        all_probabilities = []

        # Process in batches
        for i in tqdm(range(0, len(texts), batch_size), desc="Making predictions"):
            batch_texts = texts[i:i + batch_size]

            # Tokenize texts
            encoded = self.tokenizer(
                batch_texts,
                padding=True,
                truncation=True,
                max_length=self.config.get('max_length', 512),
                return_tensors='pt'
            ).to(self.device)

            # Prepare features
            categorical_features, continuous_features = self.prepare_features(batch_texts, all_features_df)

            # Forward pass and post-processing without gradient tracking
            with torch.no_grad():
                outputs = self.model(
                    input_ids=encoded['input_ids'],
                    attention_mask=encoded['attention_mask'],
                    categorical_features=categorical_features,
                    continuous_features=continuous_features
                )
                probabilities = torch.softmax(outputs['logits'], dim=-1)
                predictions = torch.argmax(probabilities, dim=-1)

            all_predictions.extend(predictions.cpu().numpy())
            all_probabilities.extend(probabilities.cpu().numpy())

        return np.array(all_predictions), np.array(all_probabilities)

    def get_label_names(self):
        """Return label names ordered by class index.

        Uses ``label_mapping`` (name -> index) from the feature metadata when
        present; otherwise falls back to ``Class_0 .. Class_{n-1}``.
        """
        label_mapping = self.feature_metadata.get('label_mapping', {})
        if label_mapping:
            # Reverse mapping (value -> key)
            return [k for k, v in sorted(label_mapping.items(), key=lambda x: x[1])]
        else:
            return [f"Class_{i}" for i in range(self.config['num_labels'])]

In [50]:
# ============================
# Evaluation Utilities
# ============================

def _ensure_dir(path: str) -> None:
    """Create directory ``path`` (including missing parents); no error if it already exists."""
    os.makedirs(path, exist_ok=True)

def _safe_proba_to_confidence(y_proba: Optional[np.ndarray], y_pred: np.ndarray) -> np.ndarray:
    """Return confidence for predicted class or NaN if not available."""
    if y_proba is None:
        return np.full(shape=(len(y_pred),), fill_value=np.nan, dtype=float)
    if y_proba.ndim == 1:  # binary shortcut (prob of class 1)
        return np.where(y_pred == 1, y_proba, 1.0 - y_proba)
    idx = np.arange(y_proba.shape[0])
    return y_proba[idx, y_pred]

def _one_vs_rest_confusion(y_true: np.ndarray, y_pred: np.ndarray, positive_label: int) -> np.ndarray:
    """Collapse a multiclass problem to ``positive_label`` vs. everything else.

    Returns the 2x2 matrix ``[[TN, FP], [FN, TP]]`` where "positive" means
    equal to ``positive_label``.
    """
    # 1 where the entry equals the positive label, 0 elsewhere
    truth_flags, pred_flags = (
        (values == positive_label).astype(int) for values in (y_true, y_pred)
    )
    return confusion_matrix(truth_flags, pred_flags, labels=[0, 1])

def per_class_metrics(y_true: np.ndarray, y_pred: np.ndarray, labels: List[str]) -> Dict:
    """Return sklearn's ``classification_report`` as a dict.

    Class ids ``0 .. len(labels) - 1`` are passed explicitly so that
    ``labels`` (used as display names) always lines up with the report rows,
    even when a class is absent from both ``y_true`` and ``y_pred``. Without
    the explicit ids sklearn raises because the number of observed classes
    no longer matches ``target_names``.

    Args:
        y_true: Encoded ground-truth class indices.
        y_pred: Encoded predicted class indices.
        labels: Display names, ordered by class index.

    Returns:
        The report dictionary. Undefined precision/recall values are
        reported as 0 (``zero_division=0``).
        NOTE(review): when a class is absent, some scikit-learn versions may
        emit a 'micro avg' entry instead of 'accuracy' — confirm for the
        installed version.
    """
    return classification_report(
        y_true,
        y_pred,
        labels=list(range(len(labels))),
        target_names=labels,
        output_dict=True,
        zero_division=0
    )

def plot_confusion_matrix_per_policy(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    labels: List[str],
    out_dir: str
) -> Dict[str, str]:
    """Render the multiclass confusion matrix and one one-vs-rest matrix per label.

    Every figure is written as a PNG into ``out_dir`` (created if needed).

    Returns:
        Mapping from ``"overall"`` and from each label name to the path of
        the corresponding image.
    """
    _ensure_dir(out_dir)

    def _annotate_cells(axis, matrix):
        # Print each count in its cell; switch to white text on dark cells
        for (r, c), count in np.ndenumerate(matrix):
            axis.text(c, r, str(count), ha="center", va="center", fontsize=10,
                      color="white" if count > matrix.max()/2 else "black")

    def _finish(figure, axis, image, target):
        # Add the colour bar, save the current figure and release it
        figure.colorbar(image, ax=axis, fraction=0.046, pad=0.04)
        plt.tight_layout()
        plt.savefig(target, dpi=300, bbox_inches='tight')
        plt.close(figure)
        return target

    saved = {}
    n_classes = len(labels)

    # --- overall multiclass matrix ---
    full_cm = confusion_matrix(y_true, y_pred, labels=list(range(n_classes)))
    fig, ax = plt.subplots(figsize=(8, 6))
    heat = ax.imshow(full_cm, interpolation="nearest", cmap='Blues')
    ax.set_title("Confusion Matrix - Overall", fontsize=14, fontweight='bold')
    ax.set_xlabel("Predicted", fontsize=12)
    ax.set_ylabel("True", fontsize=12)
    ax.set_xticks(range(n_classes))
    ax.set_yticks(range(n_classes))
    ax.set_xticklabels(labels, rotation=45, ha="right")
    ax.set_yticklabels(labels)
    _annotate_cells(ax, full_cm)
    saved["overall"] = _finish(fig, ax, heat, os.path.join(out_dir, "confusion_overall.png"))

    # --- one-vs-rest matrix for every class ---
    for class_id, class_label in enumerate(labels):
        binary_cm = _one_vs_rest_confusion(y_true, y_pred, positive_label=class_id)
        fig, ax = plt.subplots(figsize=(5, 4))
        heat = ax.imshow(binary_cm, interpolation="nearest", cmap='Blues')
        ax.set_title(f"One-vs-Rest: {class_label}", fontsize=12, fontweight='bold')
        ax.set_xticks([0, 1])
        ax.set_yticks([0, 1])
        ax.set_xticklabels(["Other", class_label], rotation=0)
        ax.set_yticklabels(["Other", class_label])
        _annotate_cells(ax, binary_cm)
        # '/' in a label would otherwise be read as a directory separator
        file_name = f"confusion_{class_label.replace('/', '_')}.png"
        saved[class_label] = _finish(fig, ax, heat, os.path.join(out_dir, file_name))

    return saved

In [51]:
def analyse_feature_contributions(
    model: HybridModelWrapper,
    texts: List[str],
    features_df: pd.DataFrame,
    feature_names: List[str],
    out_dir: str,
    random_state: int = 7,
    max_features: int = 30,
    use_shap: bool = False,
) -> Dict:
    """
    Summarise the model's predicted-probability statistics per class.

    Despite the name, no per-feature attribution is computed: the function
    predicts on all ``texts``, draws up to 100 of them at random and records,
    for every class, the variance and the mean of the predicted probability
    over that sample. The top ``max_features`` rows (by value) are written to
    ``hybrid_model_analysis.csv`` and plotted to ``hybrid_model_analysis.png``
    inside ``out_dir``.

    ``feature_names`` and ``use_shap`` are accepted but not used.

    Returns:
        ``{"summary_table": [...], "plots": {"bar": path}}``. If anything
        fails, a warning is issued and the partially filled default
        (``summary_table`` None, empty ``plots``) is returned.
    """
    _ensure_dir(out_dir)
    outcome = {"summary_table": None, "plots": {}}

    try:
        # Predictions over the full input; only the probabilities are used below
        y_pred, y_proba = model.predict(texts, features_df)

        # Random subset (without replacement) for the statistics
        subset_size = min(100, len(texts))
        sampler = np.random.RandomState(random_state)
        chosen = sampler.choice(len(texts), subset_size, replace=False)
        # Not consumed further down; kept so the lookups still run as before
        chosen_texts = [texts[i] for i in chosen]
        chosen_proba = y_proba[chosen]

        # Per-class spread and average of the predicted probability
        proba_var = np.var(chosen_proba, axis=0)
        proba_mean = np.mean(chosen_proba, axis=0)

        # Two rows per class: variance first, then mean
        rows = [
            entry
            for i, class_name in enumerate(model.get_label_names())
            for entry in (
                {'feature': f'Class_{i}_{class_name}_variance', 'importance': proba_var[i]},
                {'feature': f'Class_{i}_{class_name}_mean_prob', 'importance': proba_mean[i]},
            )
        ]

        # Keep the largest values and persist them
        keep = min(max_features, len(rows))
        ranked = pd.DataFrame(rows).sort_values("importance", ascending=False)
        fi_df = ranked.head(keep).reset_index(drop=True)

        csv_path = os.path.join(out_dir, "hybrid_model_analysis.csv")
        fi_df.to_csv(csv_path, index=False)

        # Horizontal bar chart, largest value at the top
        fig, ax = plt.subplots(figsize=(10, max(6, 0.4 * len(fi_df))))
        drawn = ax.barh(fi_df["feature"][::-1], fi_df["importance"][::-1],
                        color='skyblue', alpha=0.7)
        ax.set_title("Hybrid Model Analysis (Prediction Statistics)",
                     fontsize=14, fontweight='bold')
        ax.set_xlabel("Value", fontsize=12)
        ax.grid(axis='x', alpha=0.3)

        # Numeric label just past the end of every bar
        for rect in drawn:
            width = rect.get_width()
            ax.text(width + width*0.01, rect.get_y() + rect.get_height()/2,
                    f'{width:.3f}', ha='left', va='center', fontsize=8)

        plt.tight_layout()
        png_path = os.path.join(out_dir, "hybrid_model_analysis.png")
        plt.savefig(png_path, dpi=300, bbox_inches='tight')
        plt.close(fig)

        outcome["summary_table"] = fi_df.to_dict(orient="records")
        outcome["plots"]["bar"] = png_path

    except Exception as e:
        warnings.warn(f"Hybrid model analysis failed: {e}")

    return outcome

def examine_misclassified_samples(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    labels: List[str],
    y_proba: Optional[np.ndarray] = None,
    texts: Optional[List[str]] = None,
    top_n: int = 50
) -> pd.DataFrame:
    """List the misclassified samples, most confident mistakes first.

    Columns: ``index`` (position in the input), ``true``/``pred`` (label
    names), ``pred_conf`` (probability of the predicted class), ``margin``
    (predicted-class probability minus the best other class; only for 2-D
    ``y_proba``) and ``text``. Values that cannot be computed are ``None``.
    At most ``top_n`` rows are returned; with no mistakes the frame is empty
    but still carries these columns.
    """
    wrong = np.where(y_true != y_pred)[0]
    if wrong.size == 0:
        return pd.DataFrame(columns=["index", "true", "pred", "pred_conf", "margin", "text"])

    confidences = _safe_proba_to_confidence(y_proba, y_pred)[wrong]

    # Default: margin unknown for every mistake
    margins = np.full_like(confidences, fill_value=np.nan, dtype=float)
    if y_proba is not None and y_proba.ndim == 2:
        proba_wrong = y_proba[wrong]
        pred_wrong = y_pred[wrong]
        row_ids = np.arange(len(wrong))
        winner = proba_wrong[row_ids, pred_wrong]
        # Mask out the winner so the max picks the runner-up class
        without_winner = proba_wrong.copy()
        without_winner[row_ids, pred_wrong] = -np.inf
        margins = winner - np.max(without_winner, axis=1)

    records = [
        {
            "index": int(pos),
            "true": labels[int(y_true[pos])],
            "pred": labels[int(y_pred[pos])],
            "pred_conf": None if np.isnan(conf) else float(conf),
            "margin": None if np.isnan(gap) else float(gap),
            "text": (texts[pos] if texts is not None else None)
        }
        for pos, conf, gap in zip(wrong, confidences, margins)
    ]
    table = pd.DataFrame(records)
    table = table.sort_values(["pred_conf"], ascending=[False], na_position="last")
    return table.head(top_n)

In [52]:
def calculate_false_positive_costs(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    cost_matrix: pd.DataFrame,
    labels: List[str]
) -> Dict:
    """
    Price every cell of the confusion matrix with a unit-cost table.

    ``cost_matrix.loc[true_label, predicted_label]`` is the unit cost of that
    outcome; its index and columns must equal ``labels`` in the same order.
    All cells are priced, the diagonal included, so correct predictions
    contribute whatever cost the table assigns them.

    Returns:
        ``{"total_cost": float, "breakdown": [...]}`` where ``breakdown``
        has one entry per (true, pred) pair that occurred at least once.

    Raises:
        ValueError: If the cost matrix axes do not match ``labels``.
    """
    axes_match = list(cost_matrix.index) == labels and list(cost_matrix.columns) == labels
    if not axes_match:
        raise ValueError("Cost matrix index/columns must exactly match labels order.")

    counts = confusion_matrix(y_true, y_pred, labels=list(range(len(labels))))

    running_total = 0.0
    entries = []
    for row_id, true_name in enumerate(labels):
        for col_id, pred_name in enumerate(labels):
            occurrences = int(counts[row_id, col_id])
            price = float(cost_matrix.loc[true_name, pred_name])
            cell_cost = occurrences * price
            running_total += cell_cost
            if occurrences <= 0:
                continue
            entries.append({
                "true": true_name,
                "pred": pred_name,
                "count": occurrences,
                "unit_cost": price,
                "cost": cell_cost
            })

    return {"total_cost": running_total, "breakdown": entries}

In [53]:
def evaluate_hybrid_model(
    model_path: str,
    y_true: np.ndarray,
    texts: List[str],
    features_df: pd.DataFrame,
    labels: List[str] = None,
    cost_matrix: Optional[pd.DataFrame] = None,
    out_dir: str = "/content/reports"
) -> Dict:
    """
    Load the model from ``model_path``, evaluate it and write all reports.

    Steps, in order: predictions, per-class metrics
    (``classification_report.json``), confusion-matrix images, the
    prediction-statistics analysis, the misclassification table
    (``misclassified_top.csv``) and, when ``cost_matrix`` is given, the cost
    breakdown (``business_impact.json``). Device and batch size come from the
    module-level ``CONFIG``. A summary is printed at the end.

    Returns:
        Dict holding the loaded model, predictions, probabilities, the
        metrics report, paths to the written artifacts and ``out_dir``.
    """
    _ensure_dir(out_dir)

    def _dump_json(payload, file_name):
        # Write one JSON report into the output directory
        with open(os.path.join(out_dir, file_name), "w") as f:
            json.dump(payload, f, indent=2)

    # Load the hybrid model
    print("🤖 Loading hybrid model...")
    model = HybridModelWrapper(model_path, device=CONFIG['device'])

    # Fall back to the label names stored with the model
    if labels is None:
        labels = model.get_label_names()
    print(f"🏷️  Using labels: {labels}")

    # Predictions
    print("🔮 Making predictions...")
    y_pred, y_proba = model.predict(texts, features_df, batch_size=CONFIG['batch_size'])

    # 1) Per-class metrics
    print("📊 Computing metrics...")
    report = per_class_metrics(y_true, y_pred, labels)
    _dump_json(report, "classification_report.json")

    # 2) Confusion matrices
    print("📈 Creating confusion matrices...")
    cm_paths = plot_confusion_matrix_per_policy(
        y_true, y_pred, labels, out_dir=os.path.join(out_dir, "confusion_matrices")
    )

    # 3) Prediction-statistics analysis; a failure here must not stop the run
    print("🔍 Analyzing feature contributions...")
    fi_result = None
    try:
        fi_result = analyse_feature_contributions(
            model, texts, features_df, labels,
            out_dir=os.path.join(out_dir, "feature_importance")
        )
    except Exception as e:
        warnings.warn(f"Feature importance analysis failed: {e}")

    # 4) Error analysis
    print("🔬 Examining misclassified samples...")
    err_path = os.path.join(out_dir, "misclassified_top.csv")
    examine_misclassified_samples(
        y_true=y_true, y_pred=y_pred, labels=labels, y_proba=y_proba, texts=texts, top_n=100
    ).to_csv(err_path, index=False)

    # 5) Business impact (only with a cost matrix)
    business = None
    if cost_matrix is not None:
        print("💰 Calculating business impact...")
        business = calculate_false_positive_costs(y_true, y_pred, cost_matrix, labels=labels)
        _dump_json(business, "business_impact.json")

    # Console summary
    divider = "="*60
    print("\n" + divider)
    print("📋 EVALUATION SUMMARY")
    print(divider)
    print(f"✅ Accuracy: {report['accuracy']:.4f}")
    print(f"📊 Macro F1: {report['macro avg']['f1-score']:.4f}")
    print(f"⚖️  Weighted F1: {report['weighted avg']['f1-score']:.4f}")

    print("\n📈 Per-Class Performance:")
    for label in labels:
        if label not in report:
            continue
        scores = report[label]
        print(f"   {label}:")
        print(f"     Precision: {scores['precision']:.4f}")
        print(f"     Recall:    {scores['recall']:.4f}")
        print(f"     F1-Score:  {scores['f1-score']:.4f}")

    return {
        "model": model,
        "y_pred": y_pred,
        "y_proba": y_proba,
        "per_class_metrics": report,
        "confusion_matrices": cm_paths,
        "feature_importance": fi_result,
        "error_analysis_csv": err_path,
        "business_impact_analysis": business,
        "out_dir": out_dir
    }

In [57]:
print("📦 Loading packaged test data...")

# Define paths (adjust these to match your actual paths)
BASE_DIR = os.path.join(CONFIG['base_path'], 'preprocessed_data')
PROCESSED_TEST_FILE = os.path.join(BASE_DIR, "processed_test.pkl")
METADATA_FILE = os.path.join(BASE_DIR, "feature_metadata.json")
ENCODERS_FILE = os.path.join(BASE_DIR, "encoders.pkl")

try:
    # NOTE(security): pickle.load can execute arbitrary code; only load
    # files produced by your own preprocessing pipeline.

    # Load test dataset
    with open(PROCESSED_TEST_FILE, "rb") as f:
        test_data = pickle.load(f)

    # Load metadata (feature names and class names)
    with open(METADATA_FILE, "r") as f:
        metadata = json.load(f)

    # Load encoders
    with open(ENCODERS_FILE, "rb") as f:
        encoders_dict = pickle.load(f)

    print(f"✅ Loaded test data: {len(test_data['labels'])} samples")

    # Extract test components
    test_texts = test_data['review_text']
    test_labels = np.array(test_data['labels'])  # Already encoded labels

    # Create features DataFrame for the hybrid model
    # The hybrid model expects a DataFrame with both categorical and continuous features
    test_features_df = pd.DataFrame()

    # Add review text for matching (needed by HybridModelWrapper.prepare_features)
    test_features_df['review_text'] = test_texts

    # Add categorical features, one column per feature name
    categorical_features = np.array(test_data['categorical_features'])
    for i, col_name in enumerate(metadata['categorical_features']['feature_names']):
        test_features_df[col_name] = categorical_features[:, i]

    # Add continuous features, one column per feature name
    continuous_features = np.array(test_data['continuous_features'])
    for i, col_name in enumerate(metadata['continuous_features']['feature_names']):
        test_features_df[col_name] = continuous_features[:, i]

    # Get class names from metadata
    class_names = metadata['labels']['class_names']

    print(f"📊 Test dataset shape: {len(test_texts)} samples")
    print(f"🏷️  Classes: {class_names}")
    print(f"📈 Label distribution:")
    unique, counts = np.unique(test_labels, return_counts=True)
    for label_idx, count in zip(unique, counts):
        print(f"   {class_names[label_idx]}: {count} samples ({count/len(test_labels)*100:.1f}%)")

    print("✅ Test data loaded successfully!")

except Exception as e:
    print(f"❌ Error loading test data: {e}")
    # Re-raise so the notebook stops here; continuing would only fail later
    # with a NameError on test_texts / test_labels / class_names.
    raise


📦 Loading packaged test data...
✅ Loaded test data: 502 samples
📊 Test dataset shape: 502 samples
🏷️  Classes: ['Spam/Advertisement', 'Irrelevant Content', 'Rant/Complaint (without visit)', 'Relevant and Quality']
📈 Label distribution:
   Spam/Advertisement: 1 samples (0.2%)
   Irrelevant Content: 123 samples (24.5%)
   Rant/Complaint (without visit): 2 samples (0.4%)
   Relevant and Quality: 376 samples (74.9%)
✅ Test data loaded successfully!


In [58]:
print("🤖 Loading trained hybrid model...")
# Directory holding hybrid_model.bin and the tokenizer files read by HybridModelWrapper
TRAINED_MODEL_PATH = os.path.join(CONFIG['base_path'], 'final_model_enhanced')

try:
    # Load once up front so a missing or broken checkpoint is reported here.
    # Note: evaluate_hybrid_model() builds its own wrapper from the model path,
    # so `classifier` is not passed into it.
    classifier = HybridModelWrapper(TRAINED_MODEL_PATH, device=CONFIG['device'])
    print("✅ Successfully loaded hybrid model with custom wrapper")
except Exception as e:
    # Report and re-raise: later cells cannot run without a model
    print(f"❌ Error loading hybrid model: {e}")
    raise

🤖 Loading trained hybrid model...
✅ Hybrid model loaded successfully on device: cuda
✅ Successfully loaded hybrid model with custom wrapper


In [60]:
# Optional: Create a cost matrix for business impact analysis
print("💰 Setting up cost matrix for business impact analysis...")

# Unit cost of each (true class, predicted class) outcome; adjust the values to
# your business needs. Higher values = more costly errors; the diagonal
# (correct predictions) is 0.
# The 4x4 layout is hard-coded and is labelled with `class_names` below, so the
# row/column order here must follow the order of `class_names`.
cost_data = [
    #     spam_ad  irrelevant  rant_without_visit  relevant_and_quality
    [0.0,      2.0,        1.5,               5.0],  # true: spam_ad
    [1.0,      0.0,        1.0,               3.0],  # true: irrelevant
    [2.0,      1.5,        0.0,               4.0],  # true: rant_without_visit
    [8.0,      5.0,        6.0,               0.0]   # true: relevant_and_quality
]

# Rows are the true class, columns the predicted class
cost_matrix = pd.DataFrame(
    cost_data,
    index=class_names,
    columns=class_names
)

print("💸 Cost Matrix (rows=true, cols=predicted):")
print(cost_matrix)
print("\nNote: Higher values indicate more costly misclassification errors")

💰 Setting up cost matrix for business impact analysis...
💸 Cost Matrix (rows=true, cols=predicted):
                                Spam/Advertisement  Irrelevant Content  \
Spam/Advertisement                             0.0                 2.0   
Irrelevant Content                             1.0                 0.0   
Rant/Complaint (without visit)                 2.0                 1.5   
Relevant and Quality                           8.0                 5.0   

                                Rant/Complaint (without visit)  \
Spam/Advertisement                                         1.5   
Irrelevant Content                                         1.0   
Rant/Complaint (without visit)                             0.0   
Relevant and Quality                                       6.0   

                                Relevant and Quality  
Spam/Advertisement                               5.0  
Irrelevant Content                               3.0  
Rant/Complaint (without visit)   

In [61]:
# Run the comprehensive evaluation, then print a console digest of the results:
# the confusion matrix, a few misclassified samples, and the costliest error types.
print("\n🚀 Starting comprehensive model evaluation...")
print("="*60)

try:
    # Timestamped directory name, so every run gets a distinct output path.
    from datetime import datetime
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(CONFIG['base_path'], f"evaluation_reports_{timestamp}")

    result = evaluate_hybrid_model(
        model_path=TRAINED_MODEL_PATH,
        y_true=test_labels,
        texts=test_texts,
        features_df=test_features_df,
        labels=class_names,
        cost_matrix=cost_matrix,  # Comment this out if you don't want business impact analysis
        out_dir=output_dir
    )

    print("\n🎉 Evaluation completed successfully!")
    print(f"📁 All reports saved to: {result['out_dir']}")

    # Additional detailed analysis
    print("\n" + "="*60)
    print("📊 DETAILED RESULTS ANALYSIS")
    print("="*60)

    # Show confusion matrix in text format
    from sklearn.metrics import confusion_matrix
    # Pin the label set so the matrix is always len(class_names) x len(class_names).
    # Without `labels=`, a class that appears in neither the true nor the
    # predicted labels is dropped and the row printing below would index out
    # of bounds.
    # NOTE(review): assumes labels are integer ids 0..len(class_names)-1 — confirm.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(test_labels, result['y_pred'], labels=label_ids)

    print("\n📈 Confusion Matrix:")
    print(f"{'':>15} " + " ".join(f"{name[:8]:>8}" for name in class_names))
    for true_label, cm_row in zip(class_names, cm):
        cells = " ".join(f"{cell:>8}" for cell in cm_row)
        print(f"{true_label[:15]:>15} " + cells)

    # Show top misclassified examples
    if os.path.exists(result['error_analysis_csv']):
        error_df = pd.read_csv(result['error_analysis_csv'])
        if not error_df.empty:
            print("\n🔍 Top 5 Misclassified Examples:")
            # Number the examples by position rather than by DataFrame index,
            # so the labels are 1..5 regardless of how the CSV is indexed.
            for rank, (_, row) in enumerate(error_df.head(5).iterrows(), start=1):
                print(f"\n  Example {rank}:")
                print(f"    True: {row['true']} | Predicted: {row['pred']}")
                print(f"    Confidence: {row['pred_conf']:.3f}")
                if pd.notna(row['text']) and len(str(row['text'])) > 0:
                    text = str(row['text'])
                    # Keep the console readable: show at most 100 characters.
                    text_preview = text[:100] + "..." if len(text) > 100 else text
                    print(f"    Text: {text_preview}")

    # Business impact summary
    if result['business_impact_analysis']:
        total_cost = result['business_impact_analysis']['total_cost']
        print("\n💰 Business Impact Analysis:")
        print(f"    Total misclassification cost: {total_cost:.2f}")

        # Show top 3 most costly error types
        breakdown = result['business_impact_analysis']['breakdown']
        if breakdown:
            sorted_breakdown = sorted(breakdown, key=lambda x: x['cost'], reverse=True)
            print("    Top 3 most costly error types:")
            for i, error in enumerate(sorted_breakdown[:3], 1):
                print(f"      {i}. {error['true']} → {error['pred']}: "
                      f"{error['count']} cases, cost = {error['cost']:.2f}")

except Exception as e:
    # Later cells depend on `result`, so report the failure and re-raise
    # instead of continuing with a missing or stale value.
    print(f"❌ Error during evaluation: {e}")
    import traceback
    traceback.print_exc()
    raise


🚀 Starting comprehensive model evaluation...
🤖 Loading hybrid model...
✅ Hybrid model loaded successfully on device: cuda
🏷️  Using labels: ['Spam/Advertisement', 'Irrelevant Content', 'Rant/Complaint (without visit)', 'Relevant and Quality']
🔮 Making predictions...


Making predictions: 100%|██████████| 16/16 [00:02<00:00,  7.83it/s]


📊 Computing metrics...
📈 Creating confusion matrices...
🔍 Analyzing feature contributions...


Making predictions: 100%|██████████| 16/16 [00:01<00:00, 14.16it/s]


🔬 Examining misclassified samples...
💰 Calculating business impact...

📋 EVALUATION SUMMARY
✅ Accuracy: 0.5558
📊 Macro F1: 0.3410
⚖️  Weighted F1: 0.6720

📈 Per-Class Performance:
   Spam/Advertisement:
     Precision: 0.0000
     Recall:    0.0000
     F1-Score:  0.0000
   Irrelevant Content:
     Precision: 0.7711
     Recall:    0.5203
     F1-Score:  0.6214
   Rant/Complaint (without visit):
     Precision: 0.0256
     Recall:    0.5000
     F1-Score:  0.0488
   Relevant and Quality:
     Precision: 0.8880
     Recall:    0.5691
     F1-Score:  0.6937

🎉 Evaluation completed successfully!
📁 All reports saved to: /content/drive/MyDrive/Tiktok_Hackaton/evaluation_reports_20250831_002909

📊 DETAILED RESULTS ANALYSIS

📈 Confusion Matrix:
                Spam/Adv Irreleva Rant/Com Relevant
Spam/Advertisem        0        0        0        1
Irrelevant Cont       29       64        4       26
Rant/Complaint         1        0        1        0
Relevant and Qu      109       19       34  

In [62]:
# Create additional visualizations (label distributions, confidence histogram,
# per-class radar chart) and save them as PNG files under result['out_dir'].
print("\n📊 Creating additional visualizations...")

try:
    n_classes = len(class_names)
    class_positions = range(n_classes)

    # 1. Class distribution comparison (True vs Predicted)
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # `minlength` pads the counts with zeros up to the number of classes.
    # Without it, np.bincount stops at the largest label actually present, so a
    # missing trailing class would make the bar heights shorter than the x
    # positions and the plot would fail.
    true_counts = np.bincount(test_labels, minlength=n_classes)
    pred_counts = np.bincount(result['y_pred'], minlength=n_classes)

    panels = (
        (ax1, true_counts, 'True Label Distribution', 'lightblue'),
        (ax2, pred_counts, 'Predicted Label Distribution', 'lightcoral'),
    )
    for axis, counts, title, color in panels:
        axis.bar(class_positions, counts, color=color, alpha=0.7)
        axis.set_title(title, fontsize=14, fontweight='bold')
        axis.set_xlabel('Classes')
        axis.set_ylabel('Count')
        axis.set_xticks(class_positions)
        axis.set_xticklabels(class_names, rotation=45, ha='right')

        # Add count labels slightly above each bar
        for i, count in enumerate(counts):
            axis.text(i, count + count*0.01, str(count), ha='center', va='bottom')

    plt.tight_layout()
    dist_plot_path = os.path.join(result['out_dir'], 'label_distributions.png')
    plt.savefig(dist_plot_path, dpi=300, bbox_inches='tight')
    plt.show()
    plt.close()

    # 2. Prediction confidence distribution
    fig, ax = plt.subplots(figsize=(10, 6))

    # Confidence of a prediction = highest class probability in its row
    confidence_scores = np.max(result['y_proba'], axis=1)
    mean_conf = np.mean(confidence_scores)
    median_conf = np.median(confidence_scores)

    ax.hist(confidence_scores, bins=30, alpha=0.7, color='skyblue', edgecolor='black')
    ax.set_title('Prediction Confidence Distribution', fontsize=14, fontweight='bold')
    ax.set_xlabel('Confidence Score')
    ax.set_ylabel('Frequency')
    ax.axvline(mean_conf, color='red', linestyle='--',
               label=f'Mean: {mean_conf:.3f}')
    ax.axvline(median_conf, color='green', linestyle='--',
               label=f'Median: {median_conf:.3f}')
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    conf_plot_path = os.path.join(result['out_dir'], 'confidence_distribution.png')
    plt.savefig(conf_plot_path, dpi=300, bbox_inches='tight')
    plt.show()
    plt.close()

    # 3. Per-class performance radar chart
    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(projection='polar'))

    metrics = ['precision', 'recall', 'f1-score']
    angles = np.linspace(0, 2*np.pi, n_classes, endpoint=False)
    # Repeat the first angle at the end so each polygon closes on itself.
    angles_plot = np.concatenate([angles, [angles[0]]])

    colors = ['red', 'blue', 'green']
    for metric, color in zip(metrics, colors):
        values = [result['per_class_metrics'][class_name][metric] for class_name in class_names]
        values += values[:1]  # Complete the circle

        ax.plot(angles_plot, values, 'o-', linewidth=2, label=metric.capitalize(), color=color)
        ax.fill(angles_plot, values, alpha=0.1, color=color)

    ax.set_xticks(angles)
    ax.set_xticklabels(class_names)
    ax.set_ylim(0, 1)
    ax.set_yticks([0.2, 0.4, 0.6, 0.8, 1.0])
    ax.set_title('Per-Class Performance Metrics', fontsize=14, fontweight='bold', pad=20)
    ax.legend(loc='upper right', bbox_to_anchor=(1.2, 1.0))
    ax.grid(True)

    plt.tight_layout()
    radar_plot_path = os.path.join(result['out_dir'], 'performance_radar.png')
    plt.savefig(radar_plot_path, dpi=300, bbox_inches='tight')
    plt.show()
    plt.close()

    print("✅ Additional visualizations saved:")
    print(f"   📊 Label distributions: {os.path.basename(dist_plot_path)}")
    print(f"   📈 Confidence distribution: {os.path.basename(conf_plot_path)}")
    print(f"   🎯 Performance radar: {os.path.basename(radar_plot_path)}")

except Exception as e:
    # The plots are optional extras, so a failure here is reported but not
    # re-raised. Close whatever figure was left open when the failure happened.
    plt.close('all')
    print(f"⚠️  Warning: Could not create additional visualizations: {e}")


📊 Creating additional visualizations...
✅ Additional visualizations saved:
   📊 Label distributions: label_distributions.png
   📈 Confidence distribution: confidence_distribution.png
   🎯 Performance radar: performance_radar.png


In [63]:
# Generate a comprehensive plain-text summary report (evaluation_summary.txt)
# inside result['out_dir'].
print("\n📝 Generating comprehensive summary report...")

try:
    # Imported locally so this cell does not rely on an import made inside
    # another cell.
    from datetime import datetime

    summary_path = os.path.join(result['out_dir'], 'evaluation_summary.txt')
    report_metrics = result['per_class_metrics']
    section_rule = "-" * 40 + "\n"

    # The report is assembled in memory first and written in one go, so a
    # failure while formatting does not leave a half-written file behind.
    lines = []

    lines.append("="*80 + "\n")
    lines.append("TIKTOK CONTENT CLASSIFICATION - MODEL EVALUATION REPORT\n")
    lines.append("="*80 + "\n\n")

    lines.append(f"Evaluation Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    lines.append(f"Model Path: {TRAINED_MODEL_PATH}\n")
    lines.append(f"Test Samples: {len(test_labels)}\n")
    lines.append(f"Classes: {', '.join(class_names)}\n\n")

    lines.append("OVERALL PERFORMANCE METRICS\n")
    lines.append(section_rule)
    lines.append(f"Accuracy:     {report_metrics['accuracy']:.4f}\n")
    lines.append(f"Macro F1:     {report_metrics['macro avg']['f1-score']:.4f}\n")
    lines.append(f"Weighted F1:  {report_metrics['weighted avg']['f1-score']:.4f}\n")
    lines.append(f"Macro Precision: {report_metrics['macro avg']['precision']:.4f}\n")
    lines.append(f"Macro Recall:    {report_metrics['macro avg']['recall']:.4f}\n\n")

    # Size the first column to the longest class name (never narrower than the
    # previous fixed width of 20) so the numeric columns stay aligned.
    name_width = max([20] + [len(name) for name in class_names])

    lines.append("PER-CLASS DETAILED METRICS\n")
    lines.append(section_rule)
    lines.append(f"{'Class':<{name_width}} {'Precision':<10} {'Recall':<10} "
                 f"{'F1-Score':<10} {'Support':<10}\n")
    lines.append("-" * (name_width + 40) + "\n")

    for class_name in class_names:
        if class_name in report_metrics:
            class_metrics = report_metrics[class_name]
            lines.append(f"{class_name:<{name_width}} {class_metrics['precision']:<10.4f} "
                         f"{class_metrics['recall']:<10.4f} {class_metrics['f1-score']:<10.4f} "
                         f"{int(class_metrics['support']):<10}\n")

    lines.append("\nCONFUSION MATRIX\n")
    lines.append(section_rule)
    # Pin the label set so the matrix always has one row/column per class, even
    # if a class occurs in neither the true nor the predicted labels.
    # NOTE(review): assumes labels are integer ids 0..len(class_names)-1 — confirm.
    cm = confusion_matrix(test_labels, result['y_pred'],
                          labels=list(range(len(class_names))))
    lines.append(f"{'':>15} " + " ".join(f"{name[:8]:>8}" for name in class_names) + "\n")
    for true_label, cm_row in zip(class_names, cm):
        cells = " ".join(f"{cell:>8}" for cell in cm_row)
        lines.append(f"{true_label[:15]:>15} " + cells + "\n")

    lines.append("\nMODEL CONFIDENCE ANALYSIS\n")
    lines.append(section_rule)
    # Confidence of a prediction = highest class probability in its row
    confidence_scores = np.max(result['y_proba'], axis=1)
    lines.append(f"Mean Confidence: {np.mean(confidence_scores):.4f}\n")
    lines.append(f"Median Confidence: {np.median(confidence_scores):.4f}\n")
    lines.append(f"Std Confidence: {np.std(confidence_scores):.4f}\n")
    lines.append(f"Min Confidence: {np.min(confidence_scores):.4f}\n")
    lines.append(f"Max Confidence: {np.max(confidence_scores):.4f}\n")

    # Low confidence predictions
    low_conf_threshold = 0.7
    low_conf_mask = confidence_scores < low_conf_threshold
    low_conf_count = np.sum(low_conf_mask)
    lines.append(f"\nLow Confidence Predictions (< {low_conf_threshold}):\n")
    lines.append(f"Count: {low_conf_count} ({low_conf_count/len(confidence_scores)*100:.1f}%)\n")

    impact = result['business_impact_analysis']
    if impact:
        lines.append("\nBUSINESS IMPACT ANALYSIS\n")
        lines.append(section_rule)
        lines.append(f"Total Misclassification Cost: {impact['total_cost']:.2f}\n")
        lines.append(f"Average Cost per Sample: {impact['total_cost']/len(test_labels):.4f}\n\n")

        lines.append("Cost Breakdown by Error Type:\n")
        breakdown = impact['breakdown']
        sorted_breakdown = sorted(breakdown, key=lambda x: x['cost'], reverse=True)
        for error in sorted_breakdown[:10]:  # Top 10 most costly
            lines.append(f"  {error['true']} → {error['pred']}: {error['count']} cases, "
                         f"cost = {error['cost']:.2f}\n")

    lines.append(f"\n\nREPORT GENERATED: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    lines.append("="*80 + "\n")

    # Explicit UTF-8: the report contains non-ASCII characters ("→"), which a
    # platform-default encoding may be unable to represent.
    with open(summary_path, 'w', encoding='utf-8') as f:
        f.writelines(lines)

    print(f"📄 Summary report saved: {os.path.basename(summary_path)}")

except Exception as e:
    # The text report is a convenience artifact; report the failure and continue.
    print(f"⚠️  Warning: Could not generate summary report: {e}")


📝 Generating comprehensive summary report...
📄 Summary report saved: evaluation_summary.txt


In [64]:
# Closing banner: headline metrics, where the artifacts live, and follow-ups.
banner = "="*80

print("\n" + banner)
print("🎉 MODEL EVALUATION COMPLETED SUCCESSFULLY!")
print(banner)

print("\n📊 FINAL RESULTS SUMMARY:")
overall = result['per_class_metrics']
print(f"   🎯 Accuracy: {overall['accuracy']:.4f}")
print(f"   📈 Macro F1: {overall['macro avg']['f1-score']:.4f}")
print(f"   ⚖️  Weighted F1: {overall['weighted avg']['f1-score']:.4f}")

# The cost line is only shown when a business impact analysis was produced.
impact = result['business_impact_analysis']
if impact:
    print(f"   💰 Total Cost: {impact['total_cost']:.2f}")

print("\n📁 All evaluation artifacts saved to:")
report_dir = result['out_dir']
print(f"   {report_dir}")

print("\n📋 Generated Files:")
files_created = [
    "classification_report.json",
    "confusion_matrices/",
    "feature_importance/",
    "misclassified_top.csv",
    "evaluation_summary.txt",
]
if impact:
    files_created += ["business_impact.json"]

for artifact in files_created:
    print(f"   ✅ {artifact}")

print("\n🔗 Key file paths:")
classification_report_path = os.path.join(report_dir, 'classification_report.json')
print(f"   📊 Classification Report: {classification_report_path}")
confusion_matrices_path = os.path.join(report_dir, 'confusion_matrices')
print(f"   📈 Confusion Matrices: {confusion_matrices_path}")
print(f"   🔍 Error Analysis: {result['error_analysis_csv']}")
summary_report_path = os.path.join(report_dir, 'evaluation_summary.txt')
print(f"   📄 Summary Report: {summary_report_path}")

print("\n💡 Next Steps:")
next_steps = (
    "Review the confusion matrices to understand error patterns",
    "Analyze misclassified samples for model improvement insights",
    "Consider the business impact analysis for deployment decisions",
    "Use the feature importance analysis to understand model behavior",
)
for step_number, step in enumerate(next_steps, start=1):
    print(f"   {step_number}. {step}")

print("\n🚀 Evaluation notebook completed successfully!")
print(banner)


🎉 MODEL EVALUATION COMPLETED SUCCESSFULLY!

📊 FINAL RESULTS SUMMARY:
   🎯 Accuracy: 0.5558
   📈 Macro F1: 0.3410
   ⚖️  Weighted F1: 0.6720
   💰 Total Cost: 1289.00

📁 All evaluation artifacts saved to:
   /content/drive/MyDrive/Tiktok_Hackaton/evaluation_reports_20250831_002909

📋 Generated Files:
   ✅ classification_report.json
   ✅ confusion_matrices/
   ✅ feature_importance/
   ✅ misclassified_top.csv
   ✅ evaluation_summary.txt
   ✅ business_impact.json

🔗 Key file paths:
   📊 Classification Report: /content/drive/MyDrive/Tiktok_Hackaton/evaluation_reports_20250831_002909/classification_report.json
   📈 Confusion Matrices: /content/drive/MyDrive/Tiktok_Hackaton/evaluation_reports_20250831_002909/confusion_matrices
   🔍 Error Analysis: /content/drive/MyDrive/Tiktok_Hackaton/evaluation_reports_20250831_002909/misclassified_top.csv
   📄 Summary Report: /content/drive/MyDrive/Tiktok_Hackaton/evaluation_reports_20250831_002909/evaluation_summary.txt

💡 Next Steps:
   1. Review the con