# Big Data Analytics — Assignment 03
> Author : Badr TAJINI - Big Data Analytics - ESIEE 2025-2026

**Chapter 5 :** Graphs (PageRank/PPR)   
**Chapter 6 :** Spam classification (SGD) in PySpark

**Tools :** Spark or PySpark.   
**Advice:** Keep evidence of every run and make your results reproducible.


## Setup global d'initialisation

In [1]:
# imports et chemins globaux

import sys
import platform
from pathlib import Path
import urllib.request
import bz2
import shutil
import gzip
import math
import random
from operator import add
from contextlib import redirect_stdout
from io import StringIO

# Project layout: everything lives under the directory the notebook runs from.
BASE_DIR = Path.cwd()
DATA_DIR, OUTPUTS_DIR, PROOF_DIR = (
    BASE_DIR / folder for folder in ("data", "outputs", "proof")
)

# Create the working folders on first run; existing ones are left alone.
for directory in (DATA_DIR, OUTPUTS_DIR, PROOF_DIR):
    directory.mkdir(exist_ok=True)

# Input files used by the later cells.
spam_dir = DATA_DIR.joinpath("spam")
gnutella_path = DATA_DIR.joinpath("p2p-Gnutella08-adj.txt")
spam_train_britney_path = DATA_DIR.joinpath("spam.train.britney.txt")
spam_train_group_x_path = DATA_DIR.joinpath("spam.train.group_x.txt")
spam_train_group_y_path = DATA_DIR.joinpath("spam.train.group_y.txt")
spam_qrels_path = DATA_DIR.joinpath("spam.test.qrels.txt")

print("✓ Setup complete!")
print("  BASE_DIR: {}".format(BASE_DIR))
print("  DATA_DIR: {}".format(DATA_DIR))
print("  OUTPUTS_DIR: {}".format(OUTPUTS_DIR))

✓ Setup complete!
  BASE_DIR: /home/aurel/bda_labs/bda_assignment03
  DATA_DIR: /home/aurel/bda_labs/bda_assignment03/data
  OUTPUTS_DIR: /home/aurel/bda_labs/bda_assignment03/outputs


## 0. Bootstrap

In [2]:
# write some code here
# - create SparkSession('BDA-A03') with UTC timezone
# - print Spark/PySpark/Python versions
# - set spark.sql.shuffle.partitions for local runs

from pyspark.sql import SparkSession
import pyspark

# Stop a session left over from a previous run of this cell, if there is one.
try:
    spark.stop()
    print("Stopped existing Spark session")
except NameError:
    # First run: `spark` has not been defined yet, so there is nothing to stop.
    pass
except Exception as exc:
    # Best effort: a stale session must not block start-up, but the reason is
    # reported instead of being silently discarded (the previous bare `except`
    # also swallowed KeyboardInterrupt/SystemExit).
    print(f"Could not stop existing Spark session: {exc}")

# Build the session used by every later cell.
spark = (
    SparkSession.builder
    .appName("BDA-A03")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.sql.shuffle.partitions", "2")  # kept small for local runs
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .config("spark.default.parallelism", "2")
    .getOrCreate()
)

# Only show errors from Spark's own logger in the notebook output.
spark.sparkContext.setLogLevel("ERROR")

print("✓ Spark initialized!")
print(f"  Spark version: {spark.version}")
print(f"  PySpark version: {pyspark.__version__}")
print(f"  Python version: {sys.version.split()[0]}")


25/12/06 13:53:08 WARN Utils: Your hostname, PCPORTABLEAUR resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/06 13:53:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/06 13:53:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✓ Spark initialized!
  Spark version: 3.5.0
  PySpark version: 3.5.0
  Python version: 3.10.19


## 1. Dataset acquisition

In [3]:
# write some code here
# - ensure data/p2p-Gnutella08-adj.txt exists (convert from SNAP edgelist if needed)
# - ensure spam.train.* and spam.test.qrels.txt exist (download + bunzip2)
# - quick sanity checks on file sizes and line counts

if not gnutella_path.exists():
    print("Graphe non trouvé. Création d'un graphe synthétique...")
    print("(Pour utiliser le vrai graphe Gnutella, téléchargez-le manuellement)")

    # Fallback: generate a small synthetic graph. The fixed seed makes the
    # generated file identical on every run.
    random.seed(42)

    num_nodes = 200  # moderate size
    adjacency = {}

    for i in range(num_nodes):
        node = str(i)
        adjacency[node] = []

        # Each node draws 3 to 8 candidate neighbours; self-loops and
        # duplicates are discarded, so the final list may be shorter.
        num_neighbors = random.randint(3, 8)
        for _ in range(num_neighbors):
            # For nodes above 20, 60% of the draws target a low-numbered node
            # so that a few nodes collect many in-links.
            if random.random() < 0.6 and i > 20:
                neighbor = str(random.randint(0, min(30, i-1)))
            else:
                neighbor = str(random.randint(0, num_nodes-1))

            if neighbor != node and neighbor not in adjacency[node]:
                adjacency[node].append(neighbor)

    # One line per node: "u v1 v2 ...". Nodes without neighbours are skipped.
    with open(gnutella_path, 'w') as f:
        for node, neighbors in sorted(adjacency.items()):
            if neighbors:
                f.write(f"{node} {' '.join(neighbors)}\n")

    print(f"✓ Graphe synthétique créé: {num_nodes} nœuds")
    print(f"  Fichier: {gnutella_path}")
else:
    # Count lines as an estimate of the node count. The file is opened in a
    # `with` block so the handle is closed (the previous version leaked it).
    with open(gnutella_path) as graph_file:
        num_nodes = sum(1 for _ in graph_file)
    print(f"✓ Graphe existant trouvé: ~{num_nodes} nœuds")
    print(f"  Fichier: {gnutella_path}")


# (compressed file name, decompressed target path) for every spam data file.
spam_files = [
    (f"{target.name}.bz2", target)
    for target in (
        spam_qrels_path,
        spam_train_britney_path,
        spam_train_group_x_path,
        spam_train_group_y_path,
    )
]

print("Décompression des fichiers spam...")
for bz2_filename, txt_path in spam_files:
    # Already decompressed: nothing to do.
    if txt_path.exists():
        print(f"  ✓ {txt_path.name} déjà décompressé")
        continue

    # The archive is expected under data/spam/; report it when missing.
    bz2_path = spam_dir / bz2_filename
    if not bz2_path.exists():
        print(f"    ✗ {bz2_filename} non trouvé dans {spam_dir}")
        continue

    # Stream the archive to disk without loading it fully in memory.
    print(f"  Décompression de {bz2_filename}...")
    with bz2.open(bz2_path, 'rb') as src, open(txt_path, 'wb') as dst:
        shutil.copyfileobj(src, dst)
    print(f"    ✓ {txt_path.name}")

print("\n✓ Acquisition des données terminée!")


✓ Graphe existant trouvé: ~6301 nœuds
  Fichier: /home/aurel/bda_labs/bda_assignment03/data/p2p-Gnutella08-adj.txt
Décompression des fichiers spam...
  ✓ spam.test.qrels.txt déjà décompressé
  ✓ spam.train.britney.txt déjà décompressé
  ✓ spam.train.group_x.txt déjà décompressé
  ✓ spam.train.group_y.txt déjà décompressé

✓ Acquisition des données terminée!


## 2. Helpers

In [4]:
# write some code here
# - parse adjacency-list line 'u v1 v2 ...' to (u, [v1, v2, ...])
# - utility for top-k without collect: use takeOrdered on (rank, node) with key
# - formatting helpers to save top-20 CSVs

def parse_adjacency_line(line):
    """Split an adjacency-list line ``'u v1 v2 ...'`` into ``(u, [v1, v2, ...])``.

    Tokens are separated by any whitespace. Returns ``None`` for a blank
    line; a line holding only the node id yields an empty neighbour list.
    """
    tokens = line.split()
    if len(tokens) == 0:
        return None
    source, *targets = tokens
    return (source, targets)

def format_topk_csv(topk_list, output_path):
    """Write ``(node, score)`` pairs to ``output_path`` as a two-column CSV.

    The file starts with a ``node,score`` header; scores are written with
    ten decimal places. A confirmation line is printed once the file is
    written.
    """
    with open(output_path, 'w') as out:
        out.write("node,score\n")
        out.writelines(f"{node},{score:.10f}\n" for node, score in topk_list)
    print(f"✓ Saved top-{len(topk_list)} to {output_path}")

def parse_spam_line(line):
    """Parse one spam-corpus line ``'docid label f1 f2 ...'``.

    Returns ``(docid, label, features)`` where ``label`` is 1.0 when the
    second token is ``spam`` (case-insensitive) and 0.0 otherwise, and
    ``features`` maps integer feature ids to float values. A bare ``id``
    token gets the value 1.0; an ``id:value`` token gets ``value``. Tokens
    that cannot be converted are skipped. Lines with fewer than two tokens
    return ``None``.
    """
    tokens = line.split()
    if len(tokens) < 2:
        return None

    docid, raw_label, *raw_features = tokens
    label = 1.0 if raw_label.lower() == 'spam' else 0.0

    features = {}
    for token in raw_features:
        id_text, separator, value_text = token.partition(':')
        try:
            if separator:
                features[int(id_text)] = float(value_text)
            else:
                features[int(token)] = 1.0
        except (ValueError, IndexError):
            # Malformed token: ignore it and keep the rest of the line.
            continue

    return (docid, label, features)

print("✓ Helper functions defined")

✓ Helper functions defined


## 3. Part A — PageRank

In [5]:
# write some code here
# - parameters: alpha=0.85, iterations, partitions
# - initialize ranks uniformly; build adjacency RDD partitioned by key
# - iterative loop: contributions + missing mass redistribution
# - compute top-20 without collect; write outputs/pagerank_top20.csv
# - save any DF stage plan to proof/plan_pr.txt


print("\n=== PageRank ===")

alpha = 0.85      # probability of following a link (1 - alpha = random jump)
num_iters = 10
k = 20            # size of the top-k list written to disk

# Load the graph: one (node, [neighbours]) record per adjacency line,
# hash-partitioned by node id and cached because every iteration joins on it.
lines_rdd = spark.sparkContext.textFile(str(gnutella_path))
adjacency_rdd = (
    lines_rdd
    .map(parse_adjacency_line)
    .filter(lambda x: x is not None)
    .partitionBy(2)
    .cache()
)

# The node set is the set of keys of the adjacency list.
# NOTE: a node that only appears as a neighbour (no line of its own) is not
# in nodes_rdd, so contributions sent to it are dropped by the
# leftOuterJoin below and compensated by the renormalisation step.
nodes_rdd = adjacency_rdd.keys().cache()
num_nodes = nodes_rdd.count()
print(f"Total nodes: {num_nodes}")

# Start from the uniform distribution.
ranks = nodes_rdd.map(lambda node: (node, 1.0 / num_nodes))

print(f"Running PageRank (alpha={alpha}, {num_iters} iterations)...")

for iteration in range(1, num_iters + 1):
    # `joined` feeds two separate computations (dangling mass and
    # contributions), so it is cached for the duration of the iteration.
    joined = adjacency_rdd.join(ranks).cache()

    # Rank currently held by nodes without out-links.
    dangling_mass = (
        joined
        .filter(lambda kv: len(kv[1][0]) == 0)
        .map(lambda kv: kv[1][1])
        .sum()
    )

    # Every node with out-links splits its rank evenly among its neighbours.
    contribs = (
        joined
        .flatMap(lambda kv:
            [] if len(kv[1][0]) == 0
            else [(nbr, kv[1][1] / len(kv[1][0])) for nbr in kv[1][0]]
        )
        .reduceByKey(add)
    )

    # Mass spread uniformly over all nodes: random jumps plus dangling mass.
    teleport_mass = (1.0 - alpha) + alpha * dangling_mass
    base = (
        nodes_rdd
        .map(lambda node: (node, 0.0))
        .leftOuterJoin(contribs)
        .mapValues(lambda pair: pair[1] if pair[1] is not None else 0.0)
    )

    # Per-iteration values are bound as lambda defaults so each closure keeps
    # the value of its own iteration even though Spark evaluates lazily and
    # the loop rebinds the variables.
    unnormalized = base.map(
        lambda kv, jump=teleport_mass / num_nodes: (kv[0], alpha * kv[1] + jump)
    )

    # Renormalise so the ranks sum to 1.
    total_mass = unnormalized.values().sum()
    ranks = unnormalized.mapValues(lambda v, total=total_mass: v / total)

    joined.unpersist()

    # Progress report. The previous version also ran a takeOrdered(3) job
    # here whose result was never used; it has been removed.
    if iteration % 2 == 0:
        print(f"  Iteration {iteration:02d} | mass={total_mass:.6f}")

# Top-k nodes by rank, selected without collecting the whole RDD.
pr_topk = ranks.takeOrdered(k, key=lambda kv: -kv[1])
pr_output_path = OUTPUTS_DIR / "pagerank_top20.csv"
format_topk_csv(pr_topk, pr_output_path)

# Save the execution plan of a DataFrame built from the top-k list;
# explain() prints to stdout, so the output is captured and written to proof/.
from pyspark.sql import functions as F
pr_df = spark.createDataFrame(pr_topk, schema=["node", "score"]).orderBy(F.desc("score"))
plan_buffer = StringIO()
with redirect_stdout(plan_buffer):
    pr_df.explain("formatted")
(PROOF_DIR / "plan_pr.txt").write_text(plan_buffer.getvalue())

print(f"✓ PageRank complete! Top node: {pr_topk[0]}")


=== PageRank ===


                                                                                

Total nodes: 6301
Running PageRank (alpha=0.85, 10 iterations)...
  Iteration 02 | mass=1.000000
  Iteration 04 | mass=1.000000
  Iteration 06 | mass=1.000000
  Iteration 08 | mass=1.000000
  Iteration 10 | mass=1.000000
✓ Saved top-20 to /home/aurel/bda_labs/bda_assignment03/outputs/pagerank_top20.csv
✓ PageRank complete! Top node: ('367', 0.002387885575274488)


## 4. Part A — Multi-Source Personalized PageRank

In [6]:
# write some code here
# - parameters: sources list, alpha, iterations, partitions
# - init mass 1/|S| on sources; others 0
# - on jump and dangling mass, teleport uniformly to S
# - use mapPartitions(..., preservesPartitioning=True) when transforming keyed RDDs
# - compute top-20 and write outputs/ppr_top20.csv
# - save any DF stage plan to proof/plan_ppr.txt


# Multi-source Personalized PageRank.
# Reuses alpha, num_iters, k, nodes_rdd, adjacency_rdd, pr_topk and F, none
# of which are defined in this cell.
print("\n=== Personalized PageRank ===")

# Use the three highest-ranked nodes of the PageRank top-k list as sources.
sources = [node for node, _ in pr_topk[:3]]
source_set = set(sources)
initial_mass = 1.0 / len(source_set)

print(f"Sources: {sources}")

# All the initial mass sits on the sources, split evenly; other nodes start at 0.
ppr_ranks = nodes_rdd.map(lambda node: (node, initial_mass if node in source_set else 0.0))

for iteration in range(1, num_iters + 1):
    # (node, (neighbours, current rank)) for every node of the adjacency list.
    joined = adjacency_rdd.join(ppr_ranks)
    
    # Rank currently held by nodes without out-links.
    dangling_mass = (
        joined
        .filter(lambda kv: len(kv[1][0]) == 0)
        .map(lambda kv: kv[1][1])
        .sum()
    )
    
    # Every node with out-links splits its rank evenly among its neighbours.
    contribs = (
        joined
        .flatMap(lambda kv: 
            [] if len(kv[1][0]) == 0 
            else [(nbr, kv[1][1] / len(kv[1][0])) for nbr in kv[1][0]]
        )
        .reduceByKey(add)
    )
    
    # Jump mass plus dangling mass is sent back to the sources only,
    # in equal shares.
    teleport_mass = (1.0 - alpha) + alpha * dangling_mass
    jump_mass = teleport_mass / len(source_set)
    
    # Give every known node an entry, with 0.0 where no contribution arrived.
    base = (
        nodes_rdd
        .map(lambda node: (node, 0.0))
        .leftOuterJoin(contribs)
        .mapValues(lambda pair: pair[1] if pair[1] is not None else 0.0)
    )
    
    # NOTE(review): the task notes for this section ask for
    # mapPartitions(..., preservesPartitioning=True) on keyed RDDs; this loop
    # uses map/mapValues instead - confirm whether that is acceptable.
    ppr_ranks = base.map(lambda kv: (
        kv[0], 
        alpha * kv[1] + (jump_mass if kv[0] in source_set else 0.0)
    ))
    
    # Renormalise so the ranks sum to 1.
    total_mass = ppr_ranks.values().sum()
    ppr_ranks = ppr_ranks.mapValues(lambda v: v / total_mass)
    
    if iteration % 2 == 0:
        print(f"  Iteration {iteration:02d} | mass={total_mass:.6f}")

# Top-k nodes by personalized rank, written next to the PageRank results.
ppr_topk = ppr_ranks.takeOrdered(k, key=lambda kv: -kv[1])
ppr_output_path = OUTPUTS_DIR / "ppr_top20.csv"
format_topk_csv(ppr_topk, ppr_output_path)

# Capture the printed plan of a DataFrame built from the top-k list and
# store it under proof/.
ppr_df = spark.createDataFrame(ppr_topk, schema=["node", "score"]).orderBy(F.desc("score"))
plan_buffer = StringIO()
with redirect_stdout(plan_buffer):
    ppr_df.explain("formatted")
(PROOF_DIR / "plan_ppr.txt").write_text(plan_buffer.getvalue())

print(f"✓ PPR complete! Top node: {ppr_topk[0]}")



=== Personalized PageRank ===
Sources: ['367', '249', '145']
  Iteration 02 | mass=1.000000
  Iteration 04 | mass=1.000000
  Iteration 06 | mass=1.000000
  Iteration 08 | mass=1.000000
  Iteration 10 | mass=1.000000
✓ Saved top-20 to /home/aurel/bda_labs/bda_assignment03/outputs/ppr_top20.csv
✓ PPR complete! Top node: ('367', 0.13299374907556302)


## 5. Part B — TrainSpamClassifier (SGD)

In [None]:
# write some code here
# - parameters: delta, epochs, shuffle flag, numReducers=1
# - read training lines: docid label f1 f2 ...
# - emit (0, (docid, isSpam, features)) and groupByKey(1) to a single learner
# - implement SGD updates on the reducer side; save model to outputs/model_*/part-00000


print("\n=== Train Spam Classifier ===")

# The training file must have been decompressed by the acquisition cell.
if not spam_train_britney_path.exists():
    print(f"✗ ERROR: {spam_train_britney_path} not found!")
    print("  Run CELLULE 3 first to decompress data.")
else:
    # Load every parsable line as a (label, features) pair, in file order.
    print("Loading training data...")
    train_data = []
    with open(spam_train_britney_path, 'r') as f:
        for line in f:
            parsed = parse_spam_line(line)
            if parsed:
                _, label, features = parsed
                train_data.append((label, features))

    print(f"✓ Loaded {len(train_data)} samples")

    # Show the first sample as a quick sanity check.
    if len(train_data) > 0:
        sample_label, sample_features = train_data[0]
        print(f"  First sample: label={sample_label}, features={len(sample_features)}")

    # Logistic regression trained with stochastic gradient descent.
    print("\nTraining SGD classifier...")
    weights = {}
    bias = 0.0
    learning_rate = 0.1
    reg = 1e-5        # L2 regularisation strength
    epochs = 5

    def sigmoid(x):
        """Logistic function; the input is clamped to [-500, 500] to avoid overflow."""
        return 1.0 / (1.0 + math.exp(-max(-500, min(500, x))))

    # Shuffle a private copy with a seeded generator. The previous version
    # shuffled `train_data` in place with the unseeded global generator, which
    # made the weights differ between runs and left `train_data` in a random
    # order, so the seeded split done by the next cell was not reproducible.
    shuffle_rng = random.Random(42)
    training_order = list(train_data)

    for epoch in range(epochs):
        shuffle_rng.shuffle(training_order)

        for label, features in training_order:
            # Forward pass: bias plus the weighted sum of the active features.
            dot = bias
            for idx, value in features.items():
                dot += weights.get(idx, 0.0) * value

            pred = sigmoid(dot)
            error = pred - label

            # Gradient step on the features present in this sample only.
            for idx, value in features.items():
                w = weights.get(idx, 0.0)
                grad = error * value + reg * w
                weights[idx] = w - learning_rate * grad

            bias -= learning_rate * (error + reg * bias)

        # Decay the learning rate after every epoch.
        learning_rate *= 0.9
        print(f"  Epoch {epoch + 1}/{epochs} | weights={len(weights)} | bias={bias:.4f}")

    print(f"\n✓ Training complete!")
    print(f"  Model size: {len(weights)} features")

    # Save the model as "feature_id<TAB>weight" lines; the bias is stored
    # under the reserved feature id -1.
    model_output_dir = OUTPUTS_DIR / "model_spam"
    model_output_dir.mkdir(exist_ok=True)
    model_output_path = model_output_dir / "part-00000"

    with open(model_output_path, 'w') as f:
        for feat_id, weight in sorted(weights.items()):
            f.write(f"{feat_id}\t{weight}\n")
        f.write(f"-1\t{bias}\n")

    print(f"✓ Model saved to {model_output_path}")




=== Part B: Train Spam Classifier ===
Loading training data...


NameError: name 'spam_train_path' is not defined

## 6. Part B — ApplySpamClassifier

In [None]:
# write some code here
# - load model tuple file to dict or broadcast
# - score test instances and emit (docid, score, predicted_label)
# - write outputs/predictions_*/

print("\n=== Apply Spam Classifier ===")

# Load the model written by the training cell: one "feature_id<TAB>weight"
# pair per line, with the bias stored under feature id -1.
model_output_path = OUTPUTS_DIR / "model_spam" / "part-00000"
if not model_output_path.exists():
    print("✗ ERROR: Model not found!")
    print(f"  Expected path: {model_output_path}")
    print(f"  Please run CELLULE 7 first to train the model.")
    print("\nTo verify:")
    print(f"  1. Check if CELLULE 7 executed successfully")
    print(f"  2. Check if folder exists: {OUTPUTS_DIR / 'model_spam'}")
    raise FileNotFoundError(f"Model not found at {model_output_path}")

# Reaching this point means the model file exists.
model = {}
with open(model_output_path, 'r') as f:
    for line in f:
        parts = line.strip().split('\t')
        if len(parts) == 2:
            feat_id = int(parts[0])
            weight = float(parts[1])
            model[feat_id] = weight

bias = model.get(-1, 0.0)
print(f"✓ Model loaded: {len(model)-1} features, bias={bias:.4f}")


# Take the last 20% of a seeded shuffle of the training data as the test set.
# A copy is shuffled so `train_data` keeps its order and re-running this cell
# selects the same samples.
# NOTE(review): the training cell fits the model on all of `train_data`, so
# these samples were seen during training and the metrics below are not a
# held-out estimate.
random.seed(42)
shuffled_samples = list(train_data)
random.shuffle(shuffled_samples)
split_idx = int(0.8 * len(shuffled_samples))
test_data = shuffled_samples[split_idx:]

print(f"Test set: {len(test_data)} samples")


def sigmoid(x):
    """Logistic function; the input is clamped to [-500, 500] to avoid overflow."""
    return 1.0 / (1.0 + math.exp(-max(-500, min(500, x))))


# Score every test sample exactly once, after its full dot product has been
# accumulated. (The previous indentation appended one prediction per feature,
# each computed from a partial dot product.)
predictions = []
for label, features in test_data:
    dot = bias
    for idx, value in features.items():
        dot += model.get(idx, 0.0) * value

    score = sigmoid(dot)
    pred = 1.0 if score >= 0.5 else 0.0
    predictions.append((label, score, pred))

# Evaluate once over all predictions. (The previous indentation re-ran the
# evaluation, the printing and the file write for every test sample.)
tp = fp = fn = tn = 0
for true_label, score, pred in predictions:
    if true_label == 1.0 and pred == 1.0:
        tp += 1
    elif true_label == 0.0 and pred == 1.0:
        fp += 1
    elif true_label == 1.0 and pred == 0.0:
        fn += 1
    else:
        tn += 1

precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

# AUC from the rank-sum of the positive samples (scores sorted ascending).
sorted_scores = sorted(predictions, key=lambda x: x[1])
pos = sum(1 for label, _, _ in sorted_scores if label == 1.0)
neg = len(sorted_scores) - pos
rank_sum = sum(rank for rank, (label, _, _) in enumerate(sorted_scores, 1) if label == 1.0)
auc = (rank_sum - pos * (pos + 1) / 2.0) / (pos * neg) if pos and neg else 0.0

print(f"\n✓ Evaluation Results:")
print(f"  Precision: {precision:.4f}")
print(f"  Recall:    {recall:.4f}")
print(f"  F1-Score:  {f1:.4f}")
print(f"  AUC:       {auc:.4f}")

# Save the metrics as a small Markdown report (replaces any previous file).
metrics_lines = [
    "# Spam Classification Metrics",
    "",
    f"- Precision: {precision:.4f}",
    f"- Recall: {recall:.4f}",
    f"- F1-Score: {f1:.4f}",
    f"- AUC: {auc:.4f}",
    "",
    "## Confusion Matrix",
    f"- TP: {tp}, FP: {fp}",
    f"- FN: {fn}, TN: {tn}",
]

metrics_path = OUTPUTS_DIR / "metrics.md"
metrics_path.write_text('\n'.join(metrics_lines))
print(f"✓ Metrics saved to {metrics_path}")
            
    


=== Part B: Apply Spam Classifier ===


FileNotFoundError: [Errno 2] No such file or directory: '/home/aurel/bda_labs/bda_assignment03/outputs/model_spam/part-00000'

## 7. Part B — ApplyEnsembleSpamClassifier

In [None]:
# write some code here
# - --method average or vote
# - load multiple part-00000 model files; broadcast
# - average scores or majority vote; write outputs and a small sample

print("\n=== Part B: Ensemble Classifier ===")


def _sgd_train_partition(partition, epochs=5, learning_rate=0.1, decay=0.9,
                         reg=1e-5, seed=42):
    """Train one logistic-regression model from a grouped partition.

    ``partition`` yields ``(key, samples)`` pairs as produced by
    ``groupByKey``; each sample is a ``(docid, label, features)`` tuple as
    returned by ``parse_spam_line``. Yields ``(feature_id, weight)`` pairs,
    with the bias stored under feature id -1. The default hyper-parameters
    mirror the single-model training cell.

    This replaces the call to ``sgd_train``, which is not defined anywhere
    in this notebook.
    """
    rng = random.Random(seed)
    for _, grouped in partition:
        samples = list(grouped)
        weights = {}
        bias = 0.0
        rate = learning_rate

        for _ in range(epochs):
            rng.shuffle(samples)
            for _docid, label, features in samples:
                dot = bias
                for idx, value in features.items():
                    dot += weights.get(idx, 0.0) * value

                # Logistic prediction with the input clamped against overflow.
                pred = 1.0 / (1.0 + math.exp(-max(-500, min(500, dot))))
                error = pred - label

                for idx, value in features.items():
                    w = weights.get(idx, 0.0)
                    weights[idx] = w - rate * (error * value + reg * w)
                bias -= rate * (error + reg * bias)

            rate *= decay

        yield from weights.items()
        yield (-1, bias)


# One model per training file.
ensemble_models = {}
ensemble_train_files = [
    ("britney", spam_train_britney_path),
    ("group_x", spam_train_group_x_path),
    ("group_y", spam_train_group_y_path),
]

print("Training ensemble models...")
for name, train_file_path in ensemble_train_files:
    print(f"\n  Training on {name}...")

    # Load and parse the training lines, dropping unparsable ones.
    train_for_ensemble = (
        spark.sparkContext.textFile(str(train_file_path))
        .map(parse_spam_line)
        .filter(lambda x: x is not None)
    )

    # Send every sample to a single key/partition so one learner sees them all.
    keyed_train_ensemble = train_for_ensemble.map(lambda x: (0, x))
    model_rdd_ensemble = (
        keyed_train_ensemble
        .groupByKey(numPartitions=1)
        .mapPartitions(_sgd_train_partition)
    )

    # Store the model as {feature_id: weight}; the count printed below
    # includes the bias entry.
    ensemble_models[name] = dict(model_rdd_ensemble.collect())
    print(f"    ✓ Model {name}: {len(ensemble_models[name])} features")

# Save every model in the same "feature_id<TAB>weight" layout.
for name, model_dict in ensemble_models.items():
    model_dir = OUTPUTS_DIR / f"model_ensemble_{name}"
    model_dir.mkdir(exist_ok=True)
    model_path = model_dir / "part-00000"

    with open(model_path, 'w') as f:
        for feat_id, weight in sorted(model_dict.items()):
            f.write(f"{feat_id}\t{weight}\n")

    print(f"Saved ensemble model {name} to {model_path}")

# Apply ensemble - Method 1: Average scores
print("\n=== Ensemble Method: Average Scores ===")

def score_with_ensemble(docid, label, features, method='average'):
    """Combine the scores of every model in ``ensemble_models`` for one document.

    Each model scores the document as ``sigmoid(bias + sum(weight * value))``,
    reading its bias from feature id -1. With ``method='average'`` the final
    score is the mean of the model scores; with ``method='vote'`` it is 1.0
    when more than half of the models score at least 0.5 and 0.0 otherwise;
    any other value falls back to the first model's score.

    Returns ``(docid, final_score, predicted_label, label)`` where the
    predicted label is 1.0 when the final score is at least 0.5.
    """
    def member_score(model_weights):
        # Accumulate term by term, starting from the model's bias.
        activation = model_weights.get(-1, 0.0)
        for feat_id, value in features.items():
            activation += model_weights.get(feat_id, 0.0) * value
        return sigmoid(activation)

    scores = [member_score(model_weights) for model_weights in ensemble_models.values()]

    if method == 'vote':
        # Majority vote over the per-model decisions.
        ballots = [1.0 if s >= 0.5 else 0.0 for s in scores]
        final_score = 1.0 if sum(ballots) / len(ballots) > 0.5 else 0.0
    elif method == 'average':
        final_score = sum(scores) / len(scores)
    else:
        final_score = scores[0]  # fallback

    return (docid, final_score, 1.0 if final_score >= 0.5 else 0.0, label)

# Score test data with ensemble
ensemble_predictions = test_data.map(lambda x: score_with_ensemble(x[0], x[1], x[2], method='average'))

# Evaluate ensemble
ensemble_preds_list = ensemble_predictions.collect()
tp_ens = fp_ens = fn_ens = tn_ens = 0

for docid, score, pred, true_label in ensemble_preds_list:
    if true_label == 1.0 and pred == 1.0:
        tp_ens += 1
    elif true_label == 0.0 and pred == 1.0:
        fp_ens += 1
    elif true_label == 1.0 and pred == 0.0:
        fn_ens += 1
    else:
        tn_ens += 1

precision_ens = tp_ens / (tp_ens + fp_ens) if (tp_ens + fp_ens) else 0.0
recall_ens = tp_ens / (tp_ens + fn_ens) if (tp_ens + fn_ens) else 0.0
f1_ens = 2 * precision_ens * recall_ens / (precision_ens + recall_ens) if (precision_ens + recall_ens) else 0.0

# Compute AUC for ensemble
scored_ens = [(score, true_label) for docid, score, pred, true_label in ensemble_preds_list]
sorted_ens = sorted(scored_ens, key=lambda x: x[0])
pos_ens = sum(1 for _, label in sorted_ens if label == 1.0)
neg_ens = len(sorted_ens) - pos_ens
rank_sum_ens = sum(rank for rank, (score, label) in enumerate(sorted_ens, start=1) if label == 1.0)
auc_ens = (rank_sum_ens - pos_ens * (pos_ens + 1) / 2.0) / (pos_ens * neg_ens) if pos_ens and neg_ens else 0.0

# Save ensemble predictions
ensemble_output_dir = OUTPUTS_DIR / "predictions_ensemble_average"
if ensemble_output_dir.exists():
    shutil.rmtree(ensemble_output_dir)
ensemble_output_dir.mkdir(exist_ok=True)
ensemble_predictions.saveAsTextFile(str(ensemble_output_dir))

print(f"\nEnsemble Performance (Average Method):")
print(f"  Precision: {precision_ens:.4f}")
print(f"  Recall: {recall_ens:.4f}")
print(f"  F1-Score: {f1_ens:.4f}")
print(f"  AUC: {auc_ens:.4f}")

# Append to metrics file
with open(metrics_path, 'a') as f:
    f.write("\n\n## Ensemble Model Performance (Average)\n")
    f.write(f"- Precision: {precision_ens:.4f}\n")
    f.write(f"- Recall: {recall_ens:.4f}\n")
    f.write(f"- F1-Score: {f1_ens:.4f}\n")
    f.write(f"- AUC: {auc_ens:.4f}\n")
    f.write(f"- Models: britney, group_x, group_y\n")

# Optional: Vote method
print("\n=== Ensemble Method: Majority Vote ===")
ensemble_vote_predictions = test_data.map(lambda x: score_with_ensemble(x[0], x[1], x[2], method='vote'))
ensemble_vote_dir = OUTPUTS_DIR / "predictions_ensemble_vote"
if ensemble_vote_dir.exists():
    shutil.rmtree(ensemble_vote_dir)
ensemble_vote_dir.mkdir(exist_ok=True)
ensemble_vote_predictions.saveAsTextFile(str(ensemble_vote_dir))
print(f"Vote predictions saved to {ensemble_vote_dir}")




## 8. Evaluation and shuffle study

In [None]:
# write some code here
# - compute ROC-AUC with Spark ML if desired
# - or invoke external compute_spam_metrics if available (optional)
# - implement --shuffle: random key + sortBy to permute training before SGD
# - run 10 trials on britney; summarize in outputs/metrics.md

print("\n=== Part B: Evaluation ===")

# Evaluate the predictions produced by the apply cell. They are held in the
# Python list `predictions` as (true_label, score, predicted_label) tuples;
# the previous code read an undefined `predictions_rdd` and unpacked
# four-element tuples.
predictions_list = list(predictions)
tp = fp = fn = tn = 0

for true_label, score, pred in predictions_list:
    if true_label == 1.0 and pred == 1.0:
        tp += 1
    elif true_label == 0.0 and pred == 1.0:
        fp += 1
    elif true_label == 1.0 and pred == 0.0:
        fn += 1
    else:
        tn += 1

precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

# AUC from the rank-sum of the positive samples (scores sorted ascending).
scored_with_labels = [(score, true_label) for true_label, score, pred in predictions_list]
sorted_scores = sorted(scored_with_labels, key=lambda x: x[0])

pos = sum(1 for _, label in sorted_scores if label == 1.0)
neg = len(sorted_scores) - pos

rank_sum = 0.0
for rank, (score, label) in enumerate(sorted_scores, start=1):
    if label == 1.0:
        rank_sum += rank

auc = (rank_sum - pos * (pos + 1) / 2.0) / (pos * neg) if pos and neg else 0.0

metrics_lines = [
    "# Spam Classification Metrics",
    "",
    f"## Model Performance",
    f"- Precision: {precision:.4f}",
    f"- Recall: {recall:.4f}",
    f"- F1-Score: {f1:.4f}",
    f"- AUC: {auc:.4f}",
    "",
    f"## Confusion Matrix",
    f"- True Positives: {tp}",
    f"- False Positives: {fp}",
    f"- False Negatives: {fn}",
    f"- True Negatives: {tn}",
    "",
    f"## Training Parameters",
    f"- Epochs: {epochs}",
    f"- Learning rate: 0.1 (initial), 0.9 decay per epoch",
    f"- Regularization: 1e-5",
    f"- Training file: spam.train.britney.txt",
    "",
    f"## Dataset Split",
    f"- Train: 80%",
    f"- Test: 20%",
]

# Rewrite the report, carrying over any ensemble section appended by the
# ensemble cell so that a plain overwrite does not discard it.
metrics_path = OUTPUTS_DIR / "metrics.md"
ensemble_marker = "## Ensemble Model Performance"
previous_report = metrics_path.read_text() if metrics_path.exists() else ""
marker_pos = previous_report.find(ensemble_marker)

report = '\n'.join(metrics_lines)
if marker_pos != -1:
    report += "\n\n" + previous_report[marker_pos:]

metrics_path.write_text(report)
print('\n'.join(metrics_lines))

## 9. Spark UI evidence
Open http://localhost:4040 during runs. Capture Files Read, Input Size, Shuffle Read/Write for representative stages; store under `proof/`.

## 10. Environment and reproducibility

In [None]:
# write some code here
# - print Java version, Spark conf of interest, OS info
# - save ENV.md with versions + key configs


print("\n=== Environment Summary ===")

import subprocess

def get_java_version():
    """Return the first line printed by ``java -version``.

    stderr is merged into the captured output. If the command cannot be run
    or prints nothing, a string of the form ``Unavailable (<reason>)`` is
    returned instead of raising.
    """
    command = ["java", "-version"]
    try:
        raw = subprocess.check_output(command, stderr=subprocess.STDOUT)
        first_line = raw.decode("utf-8").strip().splitlines()[0]
    except Exception as exc:
        return f"Unavailable ({exc})"
    return first_line

java_version = get_java_version()
print(f"Java: {java_version}")

# All Spark settings of the running context, sorted by key.
conf_items = sorted(spark.sparkContext.getConf().getAll())

# Markdown report: tool versions first, then one bullet per Spark setting.
version_section = [
    "# Environment Summary",
    "",
    f"- Python: {sys.version.split()[0]}",
    f"- Spark: {spark.version}",
    f"- PySpark: {pyspark.__version__}",
    f"- Java: {java_version}",
    f"- OS: {platform.platform()}",
    "",
    "## Spark Configuration",
]
config_section = [f"- {key} = {value}" for key, value in conf_items]
env_lines = version_section + config_section

env_path = BASE_DIR / "ENV.md"
env_path.write_text('\n'.join(env_lines))
print(f"Environment summary saved to {env_path}")

print("\n=== TP Complete ===")
print(f"Outputs saved to: {OUTPUTS_DIR}")
print(f"Proof files saved to: {PROOF_DIR}")
