# Model Evaluation and Comparison

This notebook provides comprehensive evaluation of all trained models:
- Baseline models (Logistic Regression, Random Forest, Isolation Forest, LOF)
- Sequential models (LSTM, TCN, Autoencoder)

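It includes:
- ROC curves comparison (with precision-recall curves alongside)
- Score distributions (per model and by attack type)
- Comprehensive metrics table, summary, heatmap and bar-chart comparison
- Temporal views: scores against a threshold, predicted vs. true anomalies, and anomaly segmentation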

In [None]:
%matplotlib inlineimport sysfrom pathlib import Path# add project root to pathproject_root = Path().resolve().parentsys.path.insert(0, str(project_root))import pandas as pdimport numpy as npimport matplotlib.pyplot as pltimport seaborn as snsimport torchfrom torch.utils.data import DataLoaderfrom sklearn.metrics import precision_recall_curve, aucimport warningswarnings.filterwarnings('ignore')# set plotting styletry:    plt.style.use('seaborn-v0_8-darkgrid')except OSError:    try:        plt.style.use('seaborn-darkgrid')    except OSError:        plt.style.use('default')sns.set_palette('husl')plt.rcParams['figure.figsize'] = (14, 8)plt.rcParams['font.size'] = 10plt.rcParams['axes.labelsize'] = 12plt.rcParams['axes.titlesize'] = 14plt.rcParams['xtick.labelsize'] = 10plt.rcParams['ytick.labelsize'] = 10plt.rcParams['legend.fontsize'] = 10# create output directory for saved plotsoutput_dir = project_root / 'outputs' / 'figures'output_dir.mkdir(parents=True, exist_ok=True)# import project modulesfrom src.data.load_data import load_datafrom src.data.sequence_preparation import prepare_sequences_for_trainingfrom src.data.dataset import create_dataloaders_from_dictfrom src.features.temporal_features import extract_temporal_featuresfrom src.models.baselines import load_baseline_modelfrom src.models.lstm import LSTMModelfrom src.models.tcn import TCNModelfrom src.models.autoencoder import AutoencoderModelfrom src.training.evaluate import (    evaluate_sequential_model,    compare_all_models,    plot_roc_curve,    plot_score_distributions,    compute_metrics,)from src.utils.config import load_configfrom src.visualization.plots import (    plot_score_with_threshold,    plot_series_with_anomalies,    plot_temporal_segmentation)# import IPython display for showing saved imagestry:    from IPython.display import Image, display    HAS_IPYTHON = Trueexcept ImportError:    HAS_IPYTHON = False

## 1. Load Data and Models

Load test data and all trained models for evaluation.

In [None]:
# Configuration and raw dataset.
config = load_config(project_root / "config" / "config.yaml")
data_path = project_root / "data" / "raw" / "engagement.parquet"
df = load_data(data_path)

# Adapt column names if needed: later cells expect `id` and `label`.
present_columns = set(df.columns)
if 'id' not in present_columns and 'user_id' in present_columns:
    df['id'] = df['user_id']
if 'label' not in present_columns and 'is_fake_series' in present_columns:
    df['label'] = df['is_fake_series'].map({True: 'fake', False: 'normal'})

# Device used for the sequential models.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Aggregated per-id features feed the baseline models.
features_df = extract_temporal_features(df, aggregate_per_id=True)

# Normalised fixed-length sequences feed the sequential models.
sequences_dict = prepare_sequences_for_training(
    df,
    seq_len=config["data"]["seq_len"],
    normalize=True,
)

# Train/val/test loaders; only the test loader is used for evaluation here.
loader_options = dict(
    batch_size=config["training"]["batch_size"],
    train_ratio=0.7,
    val_ratio=0.15,
    test_ratio=0.15,
    random_seed=42,
)
dataloaders = create_dataloaders_from_dict(sequences_dict, **loader_options)
test_loader = dataloaders["test"]

print("Data prepared")

In [None]:
# ---------------------------------------------------------------------------
# Baseline models
# ---------------------------------------------------------------------------
# Each entry maps model name -> (model, X_test, y_test, y_pred, y_proba).
baseline_results = {}
baseline_dir = project_root / "models" / "baselines"

# The feature split does not depend on the model, so it is computed lazily
# once and reused instead of being recomputed for every baseline.
baseline_split = None

for model_type in ["logistic_regression", "random_forest", "isolation_forest"]:
    model_path = baseline_dir / f"{model_type}.pkl"
    if not model_path.exists():
        print(f"Skipping {model_type}: {model_path} not found")
        continue
    try:
        # Imports stay inside the try so that an import failure is reported
        # per model rather than aborting the cell.
        from src.models.baselines import create_baseline_model
        from src.training.train import prepare_data

        model = create_baseline_model(model_type)
        model.load(str(model_path))

        if baseline_split is None:
            baseline_split = prepare_data(features_df, test_size=0.2, random_state=42)
        X_train, X_test, y_train, y_test, _ = baseline_split

        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)
        baseline_results[model_type] = (model, X_test, y_test, y_pred, y_proba)
        print(f"Loaded {model_type}")
    except Exception as e:
        # Best-effort: one broken model must not prevent evaluating the others.
        print(f"Error loading {model_type}: {e}")

# ---------------------------------------------------------------------------
# Sequential models
# ---------------------------------------------------------------------------
# Each entry maps model name -> (model, test_loader, device, model_type).
sequential_results = {}
sequential_dir = project_root / "models" / "sequential"

for model_type in ["lstm", "tcn", "autoencoder"]:
    model_path = sequential_dir / f"{model_type}_best.pth"
    if not model_path.exists():
        print(f"Skipping {model_type}: {model_path} not found")
        continue
    try:
        # SECURITY: weights_only=False lets torch.load unpickle arbitrary
        # objects. Only load checkpoints produced by this project.
        checkpoint = torch.load(str(model_path), map_location=device, weights_only=False)

        if model_type == "lstm":
            model = LSTMModel(**config["models"]["lstm"])
        elif model_type == "tcn":
            model = TCNModel(**config["models"]["tcn"])
        elif model_type == "autoencoder":
            model = AutoencoderModel(
                **config["models"]["autoencoder"],
                seq_len=config["data"]["seq_len"],
            )

        model.load_state_dict(checkpoint["model_state_dict"])
        model.to(device)
        model.eval()  # inference mode for evaluation
        sequential_results[model_type] = (model, test_loader, device, model_type)
        print(f"Loaded {model_type}")
    except Exception as e:
        print(f"Error loading {model_type}: {e}")

print(f"\nLoaded {len(baseline_results)} baseline models and {len(sequential_results)} sequential models")

## 2. Comprehensive Metrics Table

Create comparison table of all models.


In [None]:
# Compute metrics for every loaded model: model name -> metrics dict.
all_metrics = {}

# Initialise the result containers if the loading cell has not been run.
if 'baseline_results' not in locals():
    baseline_results = {}
if 'sequential_results' not in locals():
    sequential_results = {}

# Baseline models: predictions were already computed when the models were loaded.
for model_name, (model, X_test, y_test, y_pred, y_proba) in baseline_results.items():
    metrics = compute_metrics(y_test, y_pred, y_proba)
    metrics["model_type"] = "baseline"
    all_metrics[model_name] = metrics

# Sequential models: run inference on the test loader.
for model_name, (model, dataloader, device, model_type) in sequential_results.items():
    y_true, y_pred, y_proba = evaluate_sequential_model(model, dataloader, device, model_type)
    metrics = compute_metrics(y_true, y_pred, y_proba)
    metrics["model_type"] = "sequential"
    all_metrics[model_name] = metrics

if len(all_metrics) > 0:
    metrics_df = pd.DataFrame(all_metrics).T
    display_metrics = ["auc", "precision", "recall", "f1", "false_positive_rate"]

    # Each metrics dict mixes floats with the "model_type" string, so the
    # transposed frame is object dtype. Coerce the metric columns to numeric
    # so that rounding, sorting, averaging and heatmap plotting work.
    metrics_display = metrics_df[display_metrics].apply(pd.to_numeric, errors="coerce")
    metrics_display["model_type"] = metrics_df["model_type"]
    metrics_display = metrics_display.sort_values("auc", ascending=False)

    print(" = " * 80)
    print("COMPREHENSIVE MODEL COMPARISON")
    print(" = " * 80)
    print("\nMetrics Table:")
    print(metrics_display.round(4))

    # Best model = highest AUC (first row after the descending sort).
    best_model_name = metrics_display.index[0]
    best_metrics = metrics_display.loc[best_model_name]
    print("\n" + " = " * 80)
    print(f"BEST MODEL: {best_model_name.upper()}")
    print(" = " * 80)
    print(f" AUC: {best_metrics['auc']:.4f}")
    print(f" Precision: {best_metrics['precision']:.4f}")
    print(f" Recall: {best_metrics['recall']:.4f}")
    print(f" F1 - Score: {best_metrics['f1']:.4f}")
else:
    print("No models loaded. Please load models first.")
    metrics_display = pd.DataFrame()
    best_model_name = None
    best_metrics = None

## 6. Summary and Conclusions

Final summary of evaluation results.


In [None]:
banner = " = " * 80
print(banner)
print("EVALUATION SUMMARY")
print(banner)

# Make sure the containers exist even if earlier cells were skipped.
for container_name in ('baseline_results', 'sequential_results', 'all_metrics'):
    if container_name not in locals():
        locals()[container_name] = {}

if len(all_metrics) == 0:
    print("\nNo models evaluated yet. Please run the metrics computation cell first.")
else:
    print(f"\nTotal models evaluated: {len(all_metrics)}")
    print(f" - Baseline models: {len(baseline_results)}")
    print(f" - Sequential models: {len(sequential_results)}")

have_best_model = (
    'all_metrics' in locals()
    and len(all_metrics) > 0
    and 'best_model_name' in locals()
    and 'best_metrics' in locals()
)
if have_best_model:
    print(f"\nBest performing model: {best_model_name}")
    best_metric_rows = (
        ("AUC", 'auc'),
        ("Precision", 'precision'),
        ("Recall", 'recall'),
        ("F1 - Score", 'f1'),
        ("False Positive Rate", 'false_positive_rate'),
    )
    for row_label, metric_key in best_metric_rows:
        print(f" - {row_label}: {best_metrics[metric_key]:.4f}")

    # Model type comparison: average each metric over the baseline and the
    # sequential models; reported only when both groups are non-empty.
    if 'metrics_display' in locals() and 'display_metrics' in locals():
        is_baseline = metrics_display['model_type'] == 'baseline'
        is_sequential = metrics_display['model_type'] == 'sequential'
        baseline_models = metrics_display[is_baseline]
        sequential_models = metrics_display[is_sequential]

        if len(baseline_models) > 0 and len(sequential_models) > 0:
            baseline_avg = baseline_models[display_metrics].mean()
            sequential_avg = sequential_models[display_metrics].mean()

            print(f"\nAverage Performance by Model Type:")
            for group_header, group_avg in (
                (" Baseline models:", baseline_avg),
                (" Sequential models:", sequential_avg),
            ):
                print(group_header)
                for metric in display_metrics:
                    print(f" {metric}: {group_avg[metric]:.4f}")

            # Absolute AUC gain, plus the gain relative to the baseline average.
            improvement = sequential_avg['auc'] - baseline_avg['auc']
            print(f"\n Improvement(Sequential vs Baseline):")
            print(f" AUC: {improvement:.4f} ({improvement / baseline_avg['auc'] * 100:.1f}%)")

print("\n" + banner)

In [None]:
# Reload the sequential models into a fresh `sequential_results` mapping
# (model name -> (model, test_loader, device, model_type)).
sequential_results = {}
sequential_dir = project_root / "models" / "sequential"

# One zero-argument constructor per architecture, built from the config.
sequential_builders = {
    "lstm": lambda: LSTMModel(**config["models"]["lstm"]),
    "tcn": lambda: TCNModel(**config["models"]["tcn"]),
    "autoencoder": lambda: AutoencoderModel(
        **config["models"]["autoencoder"],
        seq_len=config["data"]["seq_len"],
    ),
}

for model_type in ["lstm", "tcn", "autoencoder"]:
    model_path = sequential_dir / f"{model_type}_best.pth"
    if not model_path.exists():
        continue
    try:
        checkpoint = torch.load(str(model_path), map_location=device, weights_only=False)
        model = sequential_builders[model_type]()
        model.load_state_dict(checkpoint["model_state_dict"])
        model.to(device)
        model.eval()
    except Exception as e:
        print(f"Error loading {model_type}: {e}")
    else:
        sequential_results[model_type] = (model, test_loader, device, model_type)
        print(f"Loaded {model_type}")

## 2. ROC Curves

Plot ROC curves and precision-recall curves, side by side, for all models.

In [None]:
def _positive_scores(y_proba):
    """Return a 1-D positive-class score vector from a model's score output.

    A 2-D array with more than one column yields column 1; a single-column
    2-D array is flattened; a 1-D array is returned unchanged.
    """
    if y_proba.ndim > 1:
        return y_proba[:, 1] if y_proba.shape[1] > 1 else y_proba.flatten()
    return y_proba


# Collect (name, y_true, y_proba) once per model. Sequential models are run
# through inference a single time and the result is reused for both panels.
curve_inputs = []
for model_name, (model, X_test, y_test, y_pred, y_proba) in baseline_results.items():
    curve_inputs.append((model_name, y_test, y_proba))
for model_name, (model, dataloader, device, model_type) in sequential_results.items():
    y_true, y_pred, y_proba = evaluate_sequential_model(model, dataloader, device, model_type)
    curve_inputs.append((model_name, y_true, y_proba))

fig, axes = plt.subplots(1, 2, figsize=(16, 8))

# Left panel: ROC curves.
ax = axes[0]
for model_name, y_true, y_proba in curve_inputs:
    plot_roc_curve(y_true, y_proba, model_name=model_name, ax=ax)
ax.set_title("ROC Curves - All Models", fontsize=16, fontweight="bold")
ax.legend(loc="lower right")
ax.grid(True, alpha=0.3)

# Right panel: precision-recall curves, with area under the PR curve in the legend.
ax = axes[1]
for model_name, y_true, y_proba in curve_inputs:
    precision, recall, _ = precision_recall_curve(y_true, _positive_scores(y_proba))
    pr_auc = auc(recall, precision)
    ax.plot(recall, precision, label=f'{model_name} (AUC = {pr_auc:.3f})', linewidth=2)
ax.set_xlabel('Recall', fontsize=12)
ax.set_ylabel('Precision', fontsize=12)
ax.set_title("Precision - Recall Curves - All Models", fontsize=16, fontweight="bold")
ax.legend(loc="lower left")
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig(output_dir / "04_roc_pr_curves.png", dpi=150, bbox_inches='tight')
plt.show()
if HAS_IPYTHON and (output_dir / "04_roc_pr_curves.png").exists():
    display(Image(str(output_dir / "04_roc_pr_curves.png")))

## 3. Score Distributions

Visualize score distributions for normal vs fake classes.

In [None]:
def _positive_scores(y_proba):
    """Return a 1-D positive-class score vector from a model's score output.

    A 2-D array with more than one column yields column 1; a single-column
    2-D array is flattened; a 1-D array is returned unchanged.
    """
    if y_proba.ndim > 1:
        return y_proba[:, 1] if y_proba.shape[1] > 1 else y_proba.flatten()
    return y_proba


# Collect predictions: model name -> (true labels, positive-class scores).
all_predictions = {}
for model_name, (model, X_test, y_test, y_pred, y_proba) in baseline_results.items():
    all_predictions[model_name] = (y_test, _positive_scores(y_proba))
for model_name, (model, dataloader, device, model_type) in sequential_results.items():
    y_true, y_pred, y_proba = evaluate_sequential_model(model, dataloader, device, model_type)
    all_predictions[model_name] = (y_true, _positive_scores(y_proba))

n_models = len(all_predictions)
if n_models == 0:
    # plt.subplots(0, 2) would raise, so bail out with the usual message.
    print("No models loaded. Please load models first.")
else:
    n_rows = (n_models + 1) // 2
    # squeeze=False always yields a 2-D axes array, so flatten() gives a flat
    # sequence of Axes whatever the model count (including a single model).
    fig, axes = plt.subplots(n_rows, 2, figsize=(16, 4 * n_rows), squeeze=False)
    axes = axes.flatten()

    for idx, (model_name, (y_true, y_proba)) in enumerate(all_predictions.items()):
        ax = axes[idx]
        labels_array = np.asarray(y_true)
        normal_scores = y_proba[labels_array == 0]
        fake_scores = y_proba[labels_array == 1]

        ax.hist(normal_scores, bins=50, alpha=0.6, label="Normal", color="blue",
                density=True, histtype="step", linewidth=2)
        # "--" is the dashed line style; the padded " -- " is rejected by matplotlib.
        ax.hist(fake_scores, bins=50, alpha=0.6, label="Fake", color="red",
                density=True, histtype="step", linewidth=2, linestyle="--")

        ax.set_xlabel("Prediction Score", fontsize=12)
        ax.set_ylabel("Density", fontsize=12)
        ax.set_title(f"Score Distribution - {model_name.upper()}", fontsize=14, fontweight="bold")
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.axvline(x=0.5, color="gray", linestyle=":", linewidth=1)  # decision threshold marker

    # Hide the unused panel when the number of models is odd.
    for idx in range(n_models, len(axes)):
        axes[idx].axis("off")

    plt.tight_layout()
    plt.savefig(output_dir / "04_score_distributions.png", dpi=150, bbox_inches='tight')
    plt.show()
    if HAS_IPYTHON and (output_dir / "04_score_distributions.png").exists():
        display(Image(str(output_dir / "04_score_distributions.png")))

## 5. Metrics Heatmap Comparison

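Create a comprehensive heatmap comparing all models across all metrics. (The heatmap and bar-chart code cell appears further below, after the temporal segmentation section.)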

## 4. Score Distributions by Attack Type

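Compare score distributions across different attack types. Note: the cell below does not map test sequences back to their attack type; it splits the fake-class scores into equal consecutive slices, one per attack type, so the per-type plots are only a placeholder.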


In [None]:
# Score distributions by attack type.
if 'attack_type' in df.columns and len(sequential_results) > 0:
    # Pick the sequential model with the highest known AUC. The previous
    # key, len() of the result tuple, was identical for every model and the
    # result overwrote the global `best_model_name`. When no metrics are
    # available all keys tie and max() returns the first model, as before.
    known_metrics = globals().get('all_metrics') or {}

    def _known_auc(name):
        """AUC recorded for `name` in `all_metrics`, or -inf when unavailable."""
        try:
            value = float(known_metrics.get(name, {}).get('auc'))
        except (TypeError, ValueError, AttributeError):
            return float('-inf')
        return value if value == value else float('-inf')  # NaN -> -inf

    attack_model_name = max(sequential_results, key=_known_auc)
    model, dataloader, device, model_type = sequential_results[attack_model_name]
    y_true, y_pred, y_proba = evaluate_sequential_model(model, dataloader, device, model_type)
    if y_proba.ndim > 1:
        y_proba_positive = y_proba[:, 1] if y_proba.shape[1] > 1 else y_proba.flatten()
    else:
        y_proba_positive = y_proba

    # Rows belonging to fake series: prefer the boolean flag, fall back to the label.
    if 'is_fake_series' in df.columns:
        fake_row_mask = df['is_fake_series'].fillna(False).astype(bool)
    elif 'label' in df.columns:
        fake_row_mask = df['label'] == 'fake'
    else:
        fake_row_mask = pd.Series(False, index=df.index)
    fake_df = df[fake_row_mask]
    attack_types = fake_df['attack_type'].unique()

    if len(attack_types) > 0:
        fake_mask = np.asarray(y_true) == 1
        fake_scores = y_proba_positive[fake_mask]

        # NOTE(review): simplified mapping. Test sequences are NOT matched to
        # their real attack type here; the fake scores are cut into equal
        # consecutive slices, one per attack type (the last slice takes the
        # remainder). The per-type plots are therefore only illustrative.
        n_fake = len(fake_scores)
        n_types = len(attack_types)
        samples_per_type = n_fake // n_types
        score_data = []
        labels = []
        for idx, attack_type in enumerate(attack_types):
            start_idx = idx * samples_per_type
            end_idx = start_idx + samples_per_type if idx < n_types - 1 else n_fake
            score_data.append(fake_scores[start_idx:end_idx])
            labels.append(attack_type)

        fig, axes = plt.subplots(1, 2, figsize=(16, 6))

        # Left: histogram per attack type.
        for attack_type, type_scores in zip(labels, score_data):
            axes[0].hist(type_scores, bins=30, alpha=0.6, label=attack_type, density=True)
        axes[0].set_xlabel('Prediction Score', fontsize=12)
        axes[0].set_ylabel('Density', fontsize=12)
        axes[0].set_title('Score Distribution by Attack Type', fontsize=14, fontweight='bold')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        axes[0].axvline(x=0.5, color='gray', linestyle=':', linewidth=1)

        # Right: box plot per attack type. Tick labels are set explicitly
        # because the `labels=` keyword of boxplot is deprecated in newer
        # matplotlib releases.
        axes[1].boxplot(score_data)
        axes[1].set_xticklabels(labels)
        axes[1].set_ylabel('Prediction Score', fontsize=12)
        axes[1].set_title('Score Distribution by Attack Type(Box Plot)', fontsize=14, fontweight='bold')
        axes[1].tick_params(axis='x', rotation=45)
        axes[1].grid(True, alpha=0.3, axis='y')
        axes[1].axhline(y=0.5, color='gray', linestyle=':', linewidth=1)

        plt.tight_layout()
        plt.savefig(output_dir / "04_scores_by_attack_type.png", dpi=150, bbox_inches='tight')
        plt.show()
        if HAS_IPYTHON and (output_dir / "04_scores_by_attack_type.png").exists():
            display(Image(str(output_dir / "04_scores_by_attack_type.png")))
    else:
        print("No attack types found in dataset.")
else:
    print("Attack type column not found or no sequential models available.")

In [None]:
# Visualization by attack type (2x2 panel): one example series per attack type.
if 'attack_type' in df.columns:
    # Rows belonging to fake series: prefer the boolean flag, fall back to the label.
    if 'is_fake_series' in df.columns:
        fake_row_mask = df['is_fake_series'].fillna(False).astype(bool)
    elif 'label' in df.columns:
        fake_row_mask = df['label'] == 'fake'
    else:
        fake_row_mask = pd.Series(False, index=df.index)
    fake_df = df[fake_row_mask]

    # First four attack types. Missing values are dropped first: a NaN type
    # never matches `== attack_type` and would leave its panel empty.
    attack_types = fake_df['attack_type'].dropna().unique()[:4]

    if len(attack_types) >= 4:
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        axes = axes.flatten()

        for idx, attack_type in enumerate(attack_types):
            ax = axes[idx]
            # Use the first series that carries this attack type as the example.
            attack_series = fake_df[fake_df['attack_type'] == attack_type]
            if len(attack_series) > 0:
                sample_id = attack_series['id'].iloc[0]
                sample_series = df[df['id'] == sample_id].sort_values('timestamp')

                # Boolean mask of anomalous timesteps (all False when the column is absent).
                if 'is_anomaly_window' in sample_series.columns:
                    anomaly_mask = sample_series['is_anomaly_window'].fillna(False).astype(bool).values
                else:
                    anomaly_mask = np.zeros(len(sample_series), dtype=bool)

                ax.plot(sample_series['timestamp'], sample_series['views'],
                        label='Views', linewidth=2, color='blue', alpha=0.7)
                if anomaly_mask.any():
                    ax.scatter(sample_series['timestamp'][anomaly_mask],
                               sample_series['views'].values[anomaly_mask],
                               marker='o', s=50, color='red', label='Anomaly', zorder=5)

                ax.set_xlabel('Timestamp', fontsize=10)
                ax.set_ylabel('Views', fontsize=10)
                ax.set_title(f'Attack Type: {attack_type}', fontsize=12, fontweight='bold')
                ax.legend(fontsize=8)
                ax.grid(True, alpha=0.3)
                ax.tick_params(axis='x', rotation=45)

        plt.suptitle('Visualization by Attack Type(2x2 Panel)', fontsize=16, fontweight='bold', y=0.995)
        plt.tight_layout()
        plt.savefig(output_dir / "04_visualization_by_attack_type_2x2.png", dpi=150, bbox_inches='tight')
        plt.show()
        if HAS_IPYTHON and (output_dir / "04_visualization_by_attack_type_2x2.png").exists():
            display(Image(str(output_dir / "04_visualization_by_attack_type_2x2.png")))
    else:
        print(f"Only {len(attack_types)} attack types found. Need at least 4 for 2x2 panel.")
else:
    print("Attack type column not found in dataset.")

## 5. Temporal Scores with Thresholds

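Visualize prediction scores over time with threshold lines. Note: the cell below currently plots randomly generated placeholder scores, not per-timestep model output; it demonstrates the plot layout only.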


In [None]:
# Temporal scores with thresholds.
if len(sequential_results) > 0:
    # Use the first user id in the dataset as the example series.
    sample_user_id = df['id'].unique()[0]
    sample_series = df[df['id'] == sample_user_id].sort_values('timestamp')

    # NOTE(review): no per-timestep model scores are computed here (that would
    # need a sequence -> timestep mapping). The scores below are SIMULATED.
    # A seeded generator keeps the figure reproducible across runs. This cell
    # deliberately does not touch `best_model_name`, so the best model chosen
    # by the metrics cell is not overwritten.
    placeholder_rng = np.random.default_rng(42)
    n_timesteps = len(sample_series)
    dummy_scores = placeholder_rng.uniform(0.3, 0.7, n_timesteps)

    # Add a few high-score spikes when the example series is flagged as fake.
    series_is_fake = (
        'is_fake_series' in sample_series.columns
        and bool(sample_series['is_fake_series'].any())
    )
    if series_is_fake:
        n_spikes = min(5, n_timesteps // 10)
        spike_indices = placeholder_rng.choice(n_timesteps, size=n_spikes, replace=False)
        dummy_scores[spike_indices] = placeholder_rng.uniform(0.7, 0.95, len(spike_indices))

    threshold = 0.5
    fig, ax = plot_score_with_threshold(
        sample_series['timestamp'],
        dummy_scores,
        threshold,
        # The title states that the scores are simulated so the saved figure
        # cannot be mistaken for model output.
        title=f"Temporal Anomaly Scores (simulated) - User: {sample_user_id}",
    )
    plt.savefig(output_dir / "04_temporal_scores_threshold.png", dpi=150, bbox_inches='tight')
    plt.show()
    if HAS_IPYTHON and (output_dir / "04_temporal_scores_threshold.png").exists():
        display(Image(str(output_dir / "04_temporal_scores_threshold.png")))
else:
    print("No sequential models available for temporal score visualization.")

## 6. Predicted vs True Anomalies

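Compare predicted anomalies with true anomaly windows. Note: the "predicted" anomalies in the cell below come from a simple heuristic (views more than two standard deviations from the series mean), not from a trained model.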


In [None]:
# Predicted vs true anomalies.
if 'is_anomaly_window' in df.columns and len(sequential_results) > 0:
    # Rows belonging to fake series: prefer the boolean flag, fall back to the label.
    if 'is_fake_series' in df.columns:
        fake_row_mask = df['is_fake_series'].fillna(False).astype(bool)
    elif 'label' in df.columns:
        fake_row_mask = df['label'] == 'fake'
    else:
        fake_row_mask = pd.Series(False, index=df.index)
    fake_users = df[fake_row_mask]['id'].unique()

    if len(fake_users) > 0:
        sample_user_id = fake_users[0]
        sample_series = df[df['id'] == sample_user_id].sort_values('timestamp')

        # Ground truth, forced to a boolean array so it can be used for
        # masking and for the &/~ operations below.
        true_anomaly_mask = sample_series['is_anomaly_window'].fillna(False).astype(bool).values

        # NOTE(review): simplified "prediction". No model is queried; a
        # timestep is flagged when its view count lies more than two standard
        # deviations away from the series mean.
        views = sample_series['views'].values
        views_mean = views.mean()
        views_std = views.std()
        predicted_anomaly_mask = (views > views_mean + 2 * views_std) | (views < views_mean - 2 * views_std)

        fig, axes = plt.subplots(2, 1, figsize=(16, 10), sharex=True)

        # Top: original series with true anomalies.
        ax1 = axes[0]
        ax1.plot(sample_series['timestamp'], sample_series['views'],
                 label='Views', linewidth=2, color='blue', alpha=0.7)
        if true_anomaly_mask.any():
            ax1.scatter(sample_series['timestamp'][true_anomaly_mask],
                        sample_series['views'].values[true_anomaly_mask],
                        marker='o', s=50, color='red', label='True Anomaly', zorder=5)
        ax1.set_ylabel('Views', fontsize=12)
        ax1.set_title(f'True Anomalies - User: {sample_user_id}', fontsize=14, fontweight='bold')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        ax1.tick_params(axis='x', rotation=45)

        # Bottom: original series with predicted anomalies.
        ax2 = axes[1]
        ax2.plot(sample_series['timestamp'], sample_series['views'],
                 label='Views', linewidth=2, color='blue', alpha=0.7)
        if predicted_anomaly_mask.any():
            ax2.scatter(sample_series['timestamp'][predicted_anomaly_mask],
                        sample_series['views'].values[predicted_anomaly_mask],
                        marker='s', s=50, color='orange', label='Predicted Anomaly', zorder=5)
        if true_anomaly_mask.any():
            # Highlight timesteps that are both truly anomalous and predicted.
            overlap_mask = true_anomaly_mask & predicted_anomaly_mask
            if overlap_mask.any():
                # '*' is the star marker; the padded ' * ' is not a valid marker.
                ax2.scatter(sample_series['timestamp'][overlap_mask],
                            sample_series['views'].values[overlap_mask],
                            marker='*', s=100, color='green', label='Correctly Predicted', zorder=6)
        ax2.set_xlabel('Timestamp', fontsize=12)
        ax2.set_ylabel('Views', fontsize=12)
        ax2.set_title(f'Predicted Anomalies - User: {sample_user_id}', fontsize=14, fontweight='bold')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        ax2.tick_params(axis='x', rotation=45)

        plt.tight_layout()
        plt.savefig(output_dir / "04_predicted_vs_true_anomalies.png", dpi=150, bbox_inches='tight')
        plt.show()
        if HAS_IPYTHON and (output_dir / "04_predicted_vs_true_anomalies.png").exists():
            display(Image(str(output_dir / "04_predicted_vs_true_anomalies.png")))

        # Per-timestep detection statistics for the example user.
        if true_anomaly_mask.any():
            tp = (true_anomaly_mask & predicted_anomaly_mask).sum()
            fp = (~true_anomaly_mask & predicted_anomaly_mask).sum()
            fn = (true_anomaly_mask & ~predicted_anomaly_mask).sum()
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
            print(f"\nAnomaly Detection Statistics for User {sample_user_id}:")
            print(f" True Positives: {tp}")
            print(f" False Positives: {fp}")
            print(f" False Negatives: {fn}")
            print(f" Precision: {precision:.4f}")
            print(f" Recall: {recall:.4f}")
    else:
        print("No fake users found in dataset.")
else:
    print("Required columns(is_anomaly_window) not found or no sequential models available.")

## 7. Temporal Segmentation - Risk ML Pipeline Style

Segment time series into normal and anomaly periods (red zones) for interpretability.


In [None]:
def _segment_anomaly_mask(mask):
    """Split a boolean mask into maximal runs of equal values.

    Returns a tuple ``(segments, segment_labels, anomaly_segments)``:
    - ``segments``: list of inclusive ``(start, end)`` index pairs, in order;
    - ``segment_labels``: "Anomaly Segment k" / "Normal Segment k", numbered
      separately per kind in order of appearance;
    - ``anomaly_segments``: positions in ``segments`` of the anomalous runs.
    An empty mask yields three empty lists.
    """
    mask = np.asarray(mask, dtype=bool)
    if mask.size == 0:
        return [], [], []

    # Indices where the value changes mark the first element of a new run.
    change_points = np.flatnonzero(mask[1:] != mask[:-1]) + 1
    run_starts = np.concatenate(([0], change_points))
    run_ends = np.concatenate((change_points, [mask.size])) - 1

    segments, segment_labels, anomaly_segments = [], [], []
    for segment_idx, (start, end) in enumerate(zip(run_starts, run_ends)):
        segments.append((int(start), int(end)))
        if mask[start]:
            segment_labels.append(f"Anomaly Segment {len(anomaly_segments) + 1}")
            anomaly_segments.append(segment_idx)
        else:
            # Normal runs so far = all runs so far minus the anomalous ones.
            segment_labels.append(f"Normal Segment {segment_idx - len(anomaly_segments) + 1}")
    return segments, segment_labels, anomaly_segments


# Temporal segmentation with red anomaly segments.
if 'is_anomaly_window' in df.columns:
    # Rows belonging to fake series: prefer the boolean flag, fall back to the label.
    if 'is_fake_series' in df.columns:
        fake_row_mask = df['is_fake_series'].fillna(False).astype(bool)
    elif 'label' in df.columns:
        fake_row_mask = df['label'] == 'fake'
    else:
        fake_row_mask = pd.Series(False, index=df.index)
    fake_users = df[fake_row_mask]['id'].unique()

    if len(fake_users) > 0:
        sample_user_id = fake_users[0]
        sample_series = df[df['id'] == sample_user_id].sort_values('timestamp')

        # Segment the series by its anomaly-window flag.
        anomaly_mask = sample_series['is_anomaly_window'].fillna(False).astype(bool).values
        segments, segment_labels, anomaly_segments = _segment_anomaly_mask(anomaly_mask)

        fig, ax = plot_temporal_segmentation(
            sample_series['timestamp'],
            sample_series['views'],
            segments,
            segment_labels=segment_labels,
            title=f"Temporal Segmentation - User {sample_user_id} (Red = Anomaly Zones)",
            anomaly_segments=anomaly_segments,
        )
        plt.savefig(output_dir / "04_spectacular_temporal_segmentation.png", dpi=150, bbox_inches='tight')
        plt.show()
        if HAS_IPYTHON and (output_dir / "04_spectacular_temporal_segmentation.png").exists():
            display(Image(str(output_dir / "04_spectacular_temporal_segmentation.png")))
    else:
        print("No fake users found.")
else:
    print("is_anomaly_window column not found.")

In [None]:
# Comprehensive metrics heatmap and grouped bar chart.
if ('all_metrics' in locals()
        and 'metrics_display' in locals()
        and 'display_metrics' in locals()
        and len(all_metrics) > 0
        and len(metrics_display) > 0):
    # Metrics on rows, models on columns. The values are coerced to numeric
    # because the metrics table may be object dtype (it is built from dicts
    # that also hold a string), which seaborn cannot render.
    heatmap_data = metrics_display[display_metrics].apply(pd.to_numeric, errors='coerce').T

    fig, ax = plt.subplots(1, 1, figsize=(max(10, len(all_metrics) * 1.5), 6))
    sns.heatmap(heatmap_data, annot=True, fmt='.3f', cmap='YlOrRd',
                cbar_kws={'label': 'Score'}, ax=ax, linewidths=0.5, linecolor='gray')
    ax.set_title('Model Performance Heatmap - All Metrics', fontsize=16, fontweight='bold')
    ax.set_xlabel('Model', fontsize=12)
    ax.set_ylabel('Metric', fontsize=12)
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45)

    # Save the heatmap before showing it. Previously the file was never
    # written, yet the cell tried (twice) to display it from disk.
    plt.tight_layout()
    plt.savefig(output_dir / "04_metrics_heatmap.png", dpi=150, bbox_inches='tight')
    plt.show()
    if HAS_IPYTHON and (output_dir / "04_metrics_heatmap.png").exists():
        display(Image(str(output_dir / "04_metrics_heatmap.png")))

    # Grouped bar chart: one group per metric, one bar per model, coloured
    # by model family.
    fig, ax = plt.subplots(1, 1, figsize=(14, 8))
    x = np.arange(len(display_metrics))
    width = 0.8 / len(metrics_display)
    colors_map = {'baseline': 'blue', 'sequential': 'red'}

    for idx, (model_name, row) in enumerate(metrics_display.iterrows()):
        values = [row[m] for m in display_metrics]
        color = colors_map.get(row['model_type'], 'gray')
        ax.bar(x + idx * width, values, width, label=model_name, alpha=0.7,
               color=color, edgecolor='black', linewidth=0.5)

    ax.set_xlabel('Metric', fontsize=12)
    ax.set_ylabel('Score', fontsize=12)
    ax.set_title('Model Performance Comparison - All Metrics', fontsize=16, fontweight='bold')
    # Centre each tick under its group of bars.
    ax.set_xticks(x + width * (len(metrics_display) - 1) / 2)
    ax.set_xticklabels(display_metrics)
    ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=10)
    ax.grid(True, alpha=0.3, axis='y')
    ax.set_ylim([0, 1.1])

    plt.tight_layout()
    plt.savefig(output_dir / "04_metrics_comparison.png", dpi=150, bbox_inches='tight')
    plt.show()
    if HAS_IPYTHON and (output_dir / "04_metrics_comparison.png").exists():
        display(Image(str(output_dir / "04_metrics_comparison.png")))
else:
    print("No models loaded. Please load models first.")