In [19]:
!pip -q install pyspark==3.5.1 pyarrow==16.1.0 pandas==2.2.2 scikit-learn==1.5.1 datasets==2.20.0

import os, json, time, random
import pandas as pd

from google.colab import drive
drive.mount("/content/drive")

PROJECT_DIR = "/content/drive/MyDrive/7006SCN_project"
DATA_DIR    = os.path.join(PROJECT_DIR, "data")
OUT_DIR     = os.path.join(PROJECT_DIR, "outputs")
MODEL_DIR   = os.path.join(PROJECT_DIR, "models")
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(OUT_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)

PARQUET_PATH = os.path.join(DATA_DIR, "reddit_movie_sample_50k.parquet")
CSV_PATH     = os.path.join(DATA_DIR, "reddit_movie_sample_50k.csv")

print("Project:", PROJECT_DIR)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Project: /content/drive/MyDrive/7006SCN_project


In [20]:
# ============================================================
# A) Ensure sample exists (streaming extraction if missing)
# ============================================================
from datasets import load_dataset

def build_sample_if_missing(n_sample=50_000, seed=42):
    if os.path.exists(PARQUET_PATH) and os.path.exists(CSV_PATH):
        print("Sample already exists.")
        return

    print("Sample not found -> creating via streaming (no full download)...")
    random.seed(seed)

    ds = load_dataset("ZhankuiHe/reddit_movie_large_v1", split="train", streaming=True)

    rows = []
    for ex in ds:
        # pick text
        t = ex.get("processed")
        if t is None or str(t).strip()=="":
            t = ex.get("raw","")
        if t is None:
            continue
        t = str(t)
        if len(t) < 3:
            continue

        # strict label
        lab = ex.get("is_seeker", None)
        if lab is None:
            continue

        rows.append({
            "conv_id": ex.get("conv_id"),
            "turn_id": ex.get("turn_id"),
            "turn_order": ex.get("turn_order"),
            "utc_time": ex.get("utc_time"),
            "upvotes": ex.get("upvotes"),
            "text": t,
            "is_seeker": bool(lab)
        })

        if len(rows) >= n_sample:
            break

    pdf = pd.DataFrame(rows)
    pdf["text"] = pdf["text"].astype(str)
    pdf["is_seeker"] = pdf["is_seeker"].astype(bool)
    pdf["upvotes"] = pd.to_numeric(pdf["upvotes"], errors="coerce").fillna(0.0)

    pdf.to_parquet(PARQUET_PATH, index=False)
    pdf.to_csv(CSV_PATH, index=False)

    print("Saved sample:")
    print("PARQUET:", PARQUET_PATH)
    print("CSV:", CSV_PATH)

build_sample_if_missing()

Sample already exists.


In [21]:
# ============================================================
# B) Load sample into Spark + strict binary label (0/1)
# ============================================================
from pyspark.sql.functions import col, when, length

df = spark.read.parquet(PARQUET_PATH)

# Clean + enforce binary numeric label
df = (
    df
    .withColumn("text", col("text").cast("string"))
    .withColumn("is_seeker", col("is_seeker").cast("boolean"))
    .filter(col("is_seeker").isNotNull())
    .withColumn("label", when(col("is_seeker") == True, 1.0).otherwise(0.0))  # STRICT 0/1
    .withColumn("upvotes", when(col("upvotes").isNull(), 0.0).otherwise(col("upvotes").cast("double")))
    .filter(length(col("text")) >= 3)
)

# Sanity check: label cardinality MUST be 2
print("Distinct label values:", [r["label"] for r in df.select("label").distinct().collect()])

# Partition + cache
if "conv_id" in df.columns:
    df = df.repartition(32, "conv_id")
else:
    df = df.repartition(32)

df = df.persist()
print("Rows:", df.count(), "Cols:", len(df.columns))

Distinct label values: [1.0, 0.0]
Rows: 50000 Cols: 8


In [22]:
# ============================================================
# C) Train/test split
# ============================================================
SEED = 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=SEED)
train_df = train_df.persist()
test_df  = test_df.persist()
print("Train:", train_df.count(), "Test:", test_df.count())




Train: 40076 Test: 9924


In [23]:
# ============================================================
# D) Feature pipeline (text -> features)
# ============================================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF

tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+", minTokenLength=2)
stop      = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
tf        = HashingTF(inputCol="tokens_clean", outputCol="tf", numFeatures=1<<18)
idf       = IDF(inputCol="tf", outputCol="features")

In [24]:
# ============================================================
# D) Feature pipeline (text -> features)
# ============================================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF

tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+", minTokenLength=2)
stop      = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
tf        = HashingTF(inputCol="tokens_clean", outputCol="tf", numFeatures=1<<18)
idf       = IDF(inputCol="tf", outputCol="features")


In [26]:
# ============================================================
# E) Models (4) + evaluation
# ============================================================
from pyspark.ml.classification import LogisticRegression, NaiveBayes, LinearSVC, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

evaluator_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

def compute_metrics(pred_df):
    auc = evaluator_auc.evaluate(pred_df)

    pred_and_labels = pred_df.select(
        col("prediction").cast("double"),
        col("label").cast("double")
    ).rdd.map(tuple)

    mm = MulticlassMetrics(pred_and_labels)
    return {
        "auc": float(auc),
        "accuracy": float(mm.accuracy),
        "precision": float(mm.weightedPrecision),
        "recall": float(mm.weightedRecall),
        "f1": float(mm.weightedFMeasure())
    }

def fit_and_eval(model, model_name):
    pipeline = Pipeline(stages=[tokenizer, stop, tf, idf, model])

    t0 = time.time()
    fitted = pipeline.fit(train_df)
    fit_seconds = time.time() - t0

    select_cols = ["label", "prediction", "rawPrediction"]
    # LinearSVC does not produce a 'probability' column
    if model_name != "LinearSVC":
        select_cols.append("probability")

    preds = fitted.transform(test_df).select(*select_cols)
    metrics = compute_metrics(preds)
    metrics["model"] = model_name
    metrics["fit_seconds"] = float(fit_seconds)
    return metrics, fitted

# Fast settings for Colab
lr  = LogisticRegression(maxIter=30, regParam=0.05, elasticNetParam=0.0, featuresCol="features", labelCol="label")
nb  = NaiveBayes(smoothing=1.0, modelType="multinomial", featuresCol="features", labelCol="label")
svm = LinearSVC(maxIter=30, regParam=0.1, featuresCol="features", labelCol="label")
rf  = RandomForestClassifier(numTrees=80, maxDepth=10, featureSubsetStrategy="sqrt",
                             featuresCol="features", labelCol="label", seed=SEED)

results = []
fitted_models = {}

for name, mdl in [
    ("LogisticRegression", lr),
    ("NaiveBayes", nb),
    ("LinearSVC", svm),
    ("RandomForest", rf),
]:
    print(f"\n--- Training {name} ---")
    m, fitted = fit_and_eval(mdl, name)
    results.append(m)
    fitted_models[name] = fitted
    print({k: round(v, 4) if isinstance(v, float) else v for k, v in m.items()})

results_df = pd.DataFrame(results).sort_values(by="f1", ascending=False)
print("\n=== PySpark Model Comparison (sorted by F1) ===")
print(results_df)

METRICS_CSV  = os.path.join(OUT_DIR, "pyspark_model_metrics.csv")
METRICS_JSON = os.path.join(OUT_DIR, "pyspark_model_metrics.json")
results_df.to_csv(METRICS_CSV, index=False)
with open(METRICS_JSON, "w") as f:
    json.dump(results, f, indent=2)

print("\nSaved PySpark metrics:", METRICS_CSV, METRICS_JSON)



--- Training LogisticRegression ---




{'auc': 0.9996, 'accuracy': 0.9947, 'precision': 0.9947, 'recall': 0.9947, 'f1': 0.9947, 'model': 'LogisticRegression', 'fit_seconds': 51.2862}

--- Training NaiveBayes ---
{'auc': 0.3208, 'accuracy': 0.9185, 'precision': 0.918, 'recall': 0.9185, 'f1': 0.9181, 'model': 'NaiveBayes', 'fit_seconds': 6.0598}

--- Training LinearSVC ---
{'auc': 1.0, 'accuracy': 0.9998, 'precision': 0.9998, 'recall': 0.9998, 'f1': 0.9998, 'model': 'LinearSVC', 'fit_seconds': 59.0462}

--- Training RandomForest ---
{'auc': 0.9787, 'accuracy': 0.72, 'precision': 0.8035, 'recall': 0.72, 'f1': 0.6473, 'model': 'RandomForest', 'fit_seconds': 1075.0993}

=== PySpark Model Comparison (sorted by F1) ===
        auc  accuracy  precision    recall        f1               model  \
2  1.000000  0.999798   0.999799  0.999798  0.999798           LinearSVC   
0  0.999608  0.994659   0.994691  0.994659  0.994650  LogisticRegression   
1  0.320770  0.918480   0.918050  0.918480  0.918114          NaiveBayes   
3  0.978748  

In [27]:
# ============================================================
# F) CrossValidator tuning (small grid) — safe on Colab
# ============================================================
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

tune_lr = LogisticRegression(maxIter=30, featuresCol="features", labelCol="label")
tune_pipeline = Pipeline(stages=[tokenizer, stop, tf, idf, tune_lr])

param_grid = (ParamGridBuilder()
              .addGrid(tune_lr.regParam, [0.1, 0.03])
              .addGrid(tune_lr.elasticNetParam, [0.0, 0.5])
              .build())

crossval = CrossValidator(
    estimator=tune_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator_auc,
    numFolds=3,
    parallelism=2
)

print("\n--- Running CrossValidator (LR small grid) ---")
t0 = time.time()
cv_model = crossval.fit(train_df)
cv_seconds = time.time() - t0

cv_preds = cv_model.transform(test_df).select("label", "prediction", "rawPrediction", "probability")
cv_metrics = compute_metrics(cv_preds)
cv_metrics.update({"model": "LogisticRegression_CV", "fit_seconds": float(cv_seconds)})
print("CV metrics:", {k: round(v, 4) if isinstance(v, float) else v for k, v in cv_metrics.items()})

results.append(cv_metrics)
results_df = pd.DataFrame(results).sort_values(by="f1", ascending=False)
results_df.to_csv(METRICS_CSV, index=False)
with open(METRICS_JSON, "w") as f:
    json.dump(results, f, indent=2)


--- Running CrossValidator (LR small grid) ---




CV metrics: {'auc': 0.9998, 'accuracy': 0.999, 'precision': 0.999, 'recall': 0.999, 'f1': 0.999, 'model': 'LogisticRegression_CV', 'fit_seconds': 525.1831}


In [29]:
# ============================================================
# G) Save best model + predictions
# ============================================================
best_row = results_df.iloc[0].to_dict()
best_name = best_row["model"]
print("\nBest model by F1:", best_name)

BEST_MODEL_PATH = os.path.join(MODEL_DIR, f"best_model_{best_name}")
best_obj = cv_model if best_name == "LogisticRegression_CV" else fitted_models[best_name]
best_obj.write().overwrite().save(BEST_MODEL_PATH)
print("Saved best Spark model to:", BEST_MODEL_PATH)

# Predictions for Tableau/report
select_cols = [
    "conv_id", "turn_id", "turn_order", "utc_time", "upvotes",
    col("text").alias("text"),
    col("label").alias("label"),
    col("prediction").alias("prediction")
]

# Only add 'probability' if the model produces it (e.g., not LinearSVC)
if best_name != "LinearSVC":
    select_cols.append("probability")

pred_export = best_obj.transform(test_df).select(*select_cols)

PRED_PARQUET = os.path.join(OUT_DIR, "test_predictions.parquet")
PRED_CSV     = os.path.join(OUT_DIR, "test_predictions_sample.csv")

pred_export.write.mode("overwrite").parquet(PRED_PARQUET)
pred_export.limit(5000).toPandas().to_csv(PRED_CSV, index=False)

print("Saved predictions parquet:", PRED_PARQUET)
print("Saved predictions CSV sample:", PRED_CSV)



Best model by F1: LinearSVC
Saved best Spark model to: /content/drive/MyDrive/7006SCN_project/models/best_model_LinearSVC
Saved predictions parquet: /content/drive/MyDrive/7006SCN_project/outputs/test_predictions.parquet
Saved predictions CSV sample: /content/drive/MyDrive/7006SCN_project/outputs/test_predictions_sample.csv


In [30]:
# ============================================================
# H) sklearn baseline (single node) — TFIDF + LogisticRegression
# ============================================================
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression as SkLR
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score

pdf = pd.read_parquet(PARQUET_PATH)
X = pdf["text"].astype(str).values
y = pdf["is_seeker"].astype(bool).astype(int).values

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=SEED, stratify=y
)

tfidf = TfidfVectorizer(max_features=200_000, ngram_range=(1,2))
Xtr = tfidf.fit_transform(X_train)
Xte = tfidf.transform(X_test)

sk = SkLR(max_iter=200, n_jobs=-1)
t0 = time.time()
sk.fit(Xtr, y_train)
sk_seconds = time.time() - t0

proba = sk.predict_proba(Xte)[:, 1]
pred = (proba >= 0.5).astype(int)

acc = accuracy_score(y_test, pred)
prec, rec, f1, _ = precision_recall_fscore_support(y_test, pred, average="weighted")
auc = roc_auc_score(y_test, proba)

sk_metrics = {
    "model": "sklearn_TFIDF_LogReg",
    "auc": float(auc),
    "accuracy": float(acc),
    "precision": float(prec),
    "recall": float(rec),
    "f1": float(f1),
    "fit_seconds": float(sk_seconds)
}

SK_JSON = os.path.join(OUT_DIR, "sklearn_baseline_metrics.json")
with open(SK_JSON, "w") as f:
    json.dump(sk_metrics, f, indent=2)

print("\n=== sklearn baseline ===")
print({k: round(v, 4) if isinstance(v, float) else v for k, v in sk_metrics.items()})
print("Saved sklearn baseline metrics:", SK_JSON)

# Append sklearn baseline to main metrics too
results.append(sk_metrics)
final_df = pd.DataFrame(results).sort_values(by="f1", ascending=False)
final_df.to_csv(METRICS_CSV, index=False)
with open(METRICS_JSON, "w") as f:
    json.dump(results, f, indent=2)

print("\nFINAL OUTPUTS saved to Drive:")
print("- Sample Parquet:", PARQUET_PATH)
print("- Sample CSV:", CSV_PATH)
print("- Metrics CSV/JSON:", METRICS_CSV, METRICS_JSON)
print("- Best model folder:", BEST_MODEL_PATH)
print("- Predictions:", PRED_PARQUET, PRED_CSV)

# ---------------------------
# Unpersist (good practice)
# ---------------------------
train_df.unpersist()
test_df.unpersist()
df.unpersist()

print("\nDone.")


=== sklearn baseline ===
{'model': 'sklearn_TFIDF_LogReg', 'auc': 1.0, 'accuracy': 0.9966, 'precision': 0.9966, 'recall': 0.9966, 'f1': 0.9966, 'fit_seconds': 3.5407}
Saved sklearn baseline metrics: /content/drive/MyDrive/7006SCN_project/outputs/sklearn_baseline_metrics.json

FINAL OUTPUTS saved to Drive:
- Sample Parquet: /content/drive/MyDrive/7006SCN_project/data/reddit_movie_sample_50k.parquet
- Sample CSV: /content/drive/MyDrive/7006SCN_project/data/reddit_movie_sample_50k.csv
- Metrics CSV/JSON: /content/drive/MyDrive/7006SCN_project/outputs/pyspark_model_metrics.csv /content/drive/MyDrive/7006SCN_project/outputs/pyspark_model_metrics.json
- Best model folder: /content/drive/MyDrive/7006SCN_project/models/best_model_LinearSVC
- Predictions: /content/drive/MyDrive/7006SCN_project/outputs/test_predictions.parquet /content/drive/MyDrive/7006SCN_project/outputs/test_predictions_sample.csv

Done.
