In [None]:
# ==============================================================================
# 1. SETUP & IMPORTS: Setting the stage for Big Data Processing
# ==============================================================================

import os
import re
import numpy as np
import pandas as pd
import networkx as nx
from textblob import TextBlob
from datetime import date
import matplotlib.pyplot as plt
import seaborn as sns

# PySpark Core Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, explode, udf, size, lit, coalesce, when, datediff,
    to_date, rand, log,
    #For robust label cleaning
    trim, lower
)
from pyspark.sql.types import (
    StringType, ArrayType, FloatType, DoubleType
)

# PySpark ML Imports (Feature Engineering and Clustering)
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, VectorAssembler, MinMaxScaler
)
from pyspark.ml.clustering import (
    KMeans, GaussianMixture
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.cluster import DBSCAN
from sklearn.metrics import f1_score, precision_score
from sklearn.neighbors import LocalOutlierFactor
from sklearn.decomposition import PCA # For dimensionality reduction for plotting

# --- Initialize Spark Session ---
# NOTE(review): spark.driver.memory only takes effect if no Spark JVM is already
# running (e.g. a fresh kernel); presumably sized for the driver-side toPandas()
# collections used later (graph features, DBSCAN, LOF, plotting) — confirm.
spark = (
    SparkSession.builder
    .appName("ClusteringBasedOutlierDetection")
    .config("spark.driver.memory", "6g")
    # Legacy datetime parsing so to_date() in feature_engineering accepts the
    # Twitter created_at pattern "EEE MMM dd HH:mm:ss Z yyyy".
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

print(
    "PySpark Session Initialized. Ready for scalable processing.",
    "NOTE: Spark legacy time parser policy enabled to handle Twitter date format.",
    sep="\n",
)

# ==============================================================================
# 2. CONFIGURATION & DATA LOADING (UDFs and Data Loading)
# ==============================================================================

# Cluster count shared by K-Means and the Gaussian Mixture Model in the pipeline.
K_CLUSTERS = 2
# Reference "today" for account-age features; captured once when this cell runs.
CURRENT_DATE = date.today()

# --- UDF Definitions ---
def combine_tweets(tweet_list):
    """Join a user's tweets into a single space-separated string.

    ``None`` entries are skipped and every other entry is converted with
    ``str``. A missing (``None``) or empty list yields ``""``.
    """
    if not tweet_list:
        return ""
    kept_tweets = (str(tweet) for tweet in tweet_list if tweet is not None)
    return " ".join(kept_tweets)

def clean_text_func(text):
    """Normalise raw tweet text before tokenisation.

    The text is lower-cased, then three patterns are removed in order:
    URLs (``http...`` / ``www...``), ``@mentions``, and any character that is
    not a lowercase ASCII letter, digit or whitespace. Surrounding whitespace
    is stripped. ``None`` becomes ``""``.
    """
    if text is None:
        return ""
    cleaned = text.lower()
    for pattern in (r"http\S+|www\S+", r"@\w+", r"[^a-z0-9\s]"):
        cleaned = re.sub(pattern, "", cleaned)
    return cleaned.strip()

def get_vocab_richness(tokens):
    """Return the type/token ratio: distinct tokens divided by total tokens.

    A missing (``None``) or empty token list scores ``0.0``.
    """
    total = len(tokens) if tokens else 0
    if total == 0:
        return 0.0
    distinct = len(set(tokens))
    return float(distinct) / total

def get_sentiment(text):
    """Return the TextBlob sentiment polarity of ``text`` as a float.

    Empty or missing text scores ``0.0``. Any error raised by TextBlob is
    treated as neutral sentiment (``0.0``) so that a single bad row cannot
    fail the whole Spark job.
    """
    if not text:
        return 0.0
    try:
        # float() guarantees the value matches the UDF's floating-point return type.
        return float(TextBlob(text).sentiment.polarity)
    except Exception:
        # Deliberate best-effort fallback. Catching Exception (not a bare
        # except) still lets KeyboardInterrupt / SystemExit propagate.
        return 0.0

# Spark UDF wrappers around the helpers above. The numeric features are
# declared DoubleType (not FloatType) so they are not rounded to 32-bit floats
# before feature_engineering casts them to DoubleType.
combine_udf = udf(combine_tweets, StringType())
clean_udf = udf(clean_text_func, StringType())
vocab_udf = udf(get_vocab_richness, DoubleType())
sentiment_udf = udf(get_sentiment, DoubleType())

# --- Data Loading Function ---
def _placeholder_dataframe():
    """Build a tiny in-memory demo DataFrame matching the expected JSON schema."""
    from pyspark.sql.types import StructType, StructField
    schema = StructType([
        StructField("ID", StringType(), True),
        StructField("tweet", ArrayType(StringType()), True),
        StructField("neighbor", StructType([
            StructField("follower", ArrayType(StringType()), True),
            StructField("following", ArrayType(StringType()), True)
        ]), True),
        StructField("profile", StructType([
            StructField("created_at", StringType(), True),
            StructField("statuses_count", StringType(), True)
        ]), True),
        StructField("label", StringType(), True)
    ])
    # Each row: (ID, tweets, (followers, following), (created_at, statuses_count), label).
    # Nested structs are plain tuples, and array fields hold flat lists of strings,
    # so the rows pass createDataFrame's schema verification.
    rows = [
        ("1", ["t1", "t2"], (["2"], ["3"]), ("Mon Jan 01 00:00:00 +0000 2018", "200"), "0"),
        ("2", ["t3"], (["1"], ["4"]), ("Thu Feb 01 00:00:00 +0000 2024", "5"), "1"),
        ("3", ["t4", "t5", "t6"], (["1", "4"], ["2"]), ("Wed Mar 01 00:00:00 +0000 2023", "10000"), "0"),
    ]
    return spark.createDataFrame(rows, schema)


def load_and_preprocess_data(seed=None):
    """Load the JSON splits, shuffle rows and run the text-preprocessing stages.

    Reads whichever of ``train.json``, ``test.json`` and ``dev.json`` exist
    (multi-line JSON) and unions them by column name. If none exist, a 3-row
    placeholder DataFrame is used for demonstration.

    Args:
        seed: Optional seed for the random row shuffle. ``None`` (default)
            keeps the shuffle non-deterministic.

    Returns:
        A cached DataFrame with the original columns plus ``tweet_text``,
        ``clean_text``, ``tokens`` and ``filtered_tokens``.
    """
    print("Loading raw data from JSON files...")
    files_to_load = ["train.json", "test.json", "dev.json"]
    df_list = [spark.read.option("multiLine", True).json(f) for f in files_to_load if os.path.exists(f)]

    if not df_list:
        full_df = _placeholder_dataframe()
    else:
        full_df = df_list[0]
        for df in df_list[1:]:
            full_df = full_df.unionByName(df, allowMissingColumns=True)

    # Shuffle the dataset. Only the final result below is cached; caching the
    # intermediate stages as well kept extra full copies of the data.
    # NOTE(review): orderBy performs its own shuffle, so the resulting partition
    # count follows spark.sql.shuffle.partitions (and AQE), not the 10 requested here.
    full_df = full_df.repartition(10).orderBy(rand(seed))
    print("Dataset shuffled and repartitioned across 10 partitions.")

    # 1. Combine Tweets
    full_df = full_df.withColumn("tweet_text", combine_udf(col("tweet")))

    # 2. Clean Text
    full_df = full_df.withColumn("clean_text", clean_udf(col("tweet_text")))

    # 3. Tokenize & Remove Stopwords
    tokenizer = Tokenizer(inputCol="clean_text", outputCol="tokens")
    full_df = tokenizer.transform(full_df)
    remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
    full_df = remover.transform(full_df).cache()

    # count() also materialises the cache.
    print(f"Total Users Loaded and Preprocessed: {full_df.count()}")
    return full_df

# ==============================================================================
# 3. MULTI-VIEW FEATURE FUSION
# ==============================================================================

def _behavioral_features(df):
    """Add activity/text columns: tweet_count, vocab_richness, sentiment_polarity."""
    print("Calculating Behavioral (Activity/Text) Features...")
    # size() returns -1 (not null) for a null array under Spark's default
    # legacy sizeOfNull behaviour. That would make coalesce() stop at -1.0 and
    # later turn log(tweet_count + 1) into null, so null tweet arrays are
    # mapped to null here and fall through to 0.0.
    tweet_array_size = when(col("tweet").isNotNull(), size(col("tweet")).cast(DoubleType()))
    df = df.withColumn(
        "tweet_count",
        coalesce(col("profile.statuses_count").cast(DoubleType()), tweet_array_size, lit(0.0)),
    )
    df = df.withColumn("vocab_richness", vocab_udf(col("filtered_tokens")).cast(DoubleType()))
    df = df.withColumn("sentiment_polarity", sentiment_udf(col("clean_text")).cast(DoubleType()))
    return df


def _temporal_features(df):
    """Add account age and the log-ratio activity_persistence_score."""
    today_spark_date = lit(CURRENT_DATE.strftime('%Y-%m-%d')).cast("date")
    df = df.withColumn("created_date", to_date(col("profile.created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))
    df = df.withColumn("account_age_days", datediff(today_spark_date, col("created_date")).cast(DoubleType()))
    # Unparseable or future creation dates fall back to a 30-day age.
    df = df.withColumn(
        "account_age_days",
        when(col("account_age_days").isNull() | (col("account_age_days") < 0), lit(30.0))
        .otherwise(col("account_age_days")),
    )
    df = df.withColumn("log_tweet_count", log(col("tweet_count") + 1.0))
    df = df.withColumn("log_age_days", log(col("account_age_days") + 1.0))
    df = df.withColumn(
        "activity_persistence_score",
        when(col("log_age_days") > 0, col("log_tweet_count") / col("log_age_days")).otherwise(0.0),
    )
    return df


def _network_features(df):
    """Add degree_centrality, clustering_coeff and pagerank_score from the follow graph."""
    print("Calculating Network Features (Graph Construction)...")
    # Directed edges follower -> user and user -> followed account.
    edges_in = df.select(explode(col("neighbor.follower")).alias("src"), col("ID").alias("dst"))
    edges_out = df.select(col("ID").alias("src"), explode(col("neighbor.following")).alias("dst"))
    all_edges = edges_in.union(edges_out).distinct().toPandas()

    G = nx.from_pandas_edgelist(all_edges, source="src", target="dst", create_using=nx.DiGraph())
    deg_cent = nx.degree_centrality(G)
    clust_coeff = nx.clustering(G)
    pagerank_scores = nx.pagerank(G)

    # One row per distinct ID: duplicated IDs would otherwise yield duplicate
    # graph rows and multiply rows in the join below.
    user_ids = df.select("ID").distinct().toPandas()["ID"]
    graph_pdf = pd.DataFrame([
        {
            "ID": node,
            "degree_centrality": float(deg_cent.get(node, 0.0)),
            "clustering_coeff": float(clust_coeff.get(node, 0.0)),
            "pagerank_score": float(pagerank_scores.get(node, 0.0)),
        }
        for node in user_ids
    ])
    graph_spark_df = spark.createDataFrame(graph_pdf).cache()

    # fillna(0.0) applies to every numeric column, not just the graph features.
    return df.join(graph_spark_df, on="ID", how="left").fillna(0.0)


def feature_engineering(df):
    """Calculate behavioral, time-aware and network features for every user.

    Args:
        df: Output of load_and_preprocess_data (needs ``ID``, ``tweet``,
            ``profile``, ``neighbor``, ``clean_text``, ``filtered_tokens``).

    Returns:
        ``(features_df, label_df)``. When a ``label`` column exists it is
        removed from ``features_df`` and returned separately as a cached
        ``(ID, label)`` DataFrame. Otherwise ``label_df`` is ``None``.
    """
    df = _behavioral_features(df)
    df = _temporal_features(df)
    df = _network_features(df)

    print("Multi-view feature fusion complete.")

    # Keep the ground truth out of the unsupervised feature set.
    if "label" in df.columns:
        df_for_evaluation = df.select("ID", "label").cache()
        df = df.drop("label")
        print("Ground-truth 'label' column successfully separated and dropped for unsupervised feature engineering.")
        return df, df_for_evaluation

    return df, None

# ==============================================================================
# 4. FEATURE VECTOR ASSEMBLY AND SCALING (UPDATED FEATURES_LIST)
# ==============================================================================

def assemble_and_scale_features(df):
    """Cast the known feature columns to double, pack them into one vector and min-max scale it.

    Only the candidate features actually present on ``df`` are used, so the
    same function serves the first pass (before ``kmeans_distance`` exists)
    and the second pass (after it has been added).

    Returns:
        ``(scaled_df, scaler_model)``. ``scaled_df`` carries ``raw_features``
        and the scaled ``features`` column. Rows with null/NaN feature values
        are dropped by the assembler (``handleInvalid="skip"``).
    """
    candidate_features = (
        "tweet_count",
        "vocab_richness",
        "sentiment_polarity",
        "account_age_days",
        "activity_persistence_score",
        "degree_centrality",
        "clustering_coeff",
        "pagerank_score",
        "kmeans_distance",
    )
    available = set(df.columns)
    selected = [name for name in candidate_features if name in available]

    print(f"Assembling features: {selected}")

    typed_df = df
    for name in selected:
        typed_df = typed_df.withColumn(name, col(name).cast(DoubleType()))

    vector_df = VectorAssembler(
        inputCols=selected, outputCol="raw_features", handleInvalid="skip"
    ).transform(typed_df)

    # Min-max scaling keeps the distance-based models from being dominated by large-range features.
    fitted_scaler = MinMaxScaler(inputCol="raw_features", outputCol="features").fit(vector_df)
    scaled_output = fitted_scaler.transform(vector_df)

    print(f"Features assembled into a vector of size {len(selected)} and scaled.")
    return scaled_output, fitted_scaler

# ==============================================================================
# 5. CLUSTERING ALGORITHMS
# ==============================================================================

def run_kmeans(df, k):
    """Fit Spark ML K-Means on ``features`` and tag each row with its cluster.

    Returns:
        ``(clustered_df, model)``. Rows gain a ``kmeans_cluster`` column, and
        the fitted model is returned so callers can read its centroids.
    """
    estimator = KMeans(k=k, seed=1, featuresCol="features", predictionCol="kmeans_prediction")
    fitted = estimator.fit(df)
    clustered = fitted.transform(df).withColumnRenamed("kmeans_prediction", "kmeans_cluster")
    print(f"K-Means (K={k}) converged in {fitted.summary.numIter} iterations.")
    return clustered, fitted

# ==============================================================================
# 5.3 CLUSTER FEATURE CALCULATION (STRUCTURAL FIX FOR AUC)
# ==============================================================================

def compute_cluster_features(df, kmeans_model):
    """
    Add ``kmeans_distance``: Euclidean distance from each row's ``features``
    vector to the centroid of its assigned ``kmeans_cluster``.

    Assumes ``df`` carries the ``features`` column ``kmeans_model`` was fitted
    on, plus the ``kmeans_cluster`` column produced by run_kmeans. Rows with
    a null vector or cluster get a null distance. The result is cached.
    """
    print("Augmenting features with Cluster Distance (Structural Fix)...")

    # A list of numpy arrays indexed by cluster id, shipped to executors with the UDF closure.
    centroids = kmeans_model.clusterCenters()

    def _distance_to_centroid(features, cluster_id):
        if features is None or cluster_id is None:
            return None
        offset = np.array(features.toArray()) - centroids[cluster_id]
        return float(np.linalg.norm(offset))

    distance_udf = udf(_distance_to_centroid, DoubleType())

    augmented = df.withColumn(
        "kmeans_distance",
        distance_udf(col("features"), col("kmeans_cluster")),
    ).cache()

    print("Distance to assigned centroid calculated as 'kmeans_distance'.")
    # Defensive drop of legacy column names; a no-op when they are absent.
    return augmented.drop("prediction", "distance")


def run_gmm(df, k):
    """Fit a Spark ML Gaussian Mixture with ``k`` components and tag rows with ``gmm_cluster``."""
    mixture_model = GaussianMixture(
        k=k, seed=1, featuresCol="features", predictionCol="gmm_prediction"
    ).fit(df)
    print(f"Gaussian Mixture Model (K={k}) finished training.")
    labelled = mixture_model.transform(df)
    return labelled.withColumnRenamed("gmm_prediction", "gmm_cluster")

def run_dbscan_approximation(df, epsilon=0.5, min_samples=10):
    """Run scikit-learn DBSCAN on the scaled ``features`` vectors (collected to the driver).

    Adds a ``dbscan_cluster`` column joined back on ``ID``. ``-1`` marks DBSCAN
    noise points, and rows that get no label from the join also default to ``-1``.
    """
    print(f"\n--- Running DBSCAN Approximation (Epsilon={epsilon}, MinPts={min_samples}) ---")

    local_pdf = df.select("ID", "features").toPandas()
    matrix = np.array([vec.toArray() for vec in local_pdf["features"]])

    clusterer = DBSCAN(eps=epsilon, min_samples=min_samples)
    local_pdf['dbscan_cluster'] = clusterer.fit_predict(matrix).astype(int)

    labels_sdf = spark.createDataFrame(local_pdf[['ID', 'dbscan_cluster']])
    joined = df.join(labels_sdf, on="ID", how="left").fillna({'dbscan_cluster': -1})

    noise_count = joined.filter(col('dbscan_cluster') == -1).count()
    print(f"DBSCAN finished. Outlier count: {noise_count}")
    return joined

# ==============================================================================
# 6. PERSISTENCE-BASED OUTLIER SCORING
# ==============================================================================

def persistence_outlier_detection(df, contamination=0.5, n_neighbors=20):
    """
    Score every user with scikit-learn's Local Outlier Factor (LOF) on the
    assembled ``features`` vector. As built by assemble_and_scale_features,
    this vector includes the time-aware ``activity_persistence_score``.

    Args:
        df: DataFrame with ``ID`` and a Spark ML vector column ``features``.
            The feature matrix is collected to the driver.
        contamination: Expected outlier fraction passed to LOF. scikit-learn
            only accepts values in (0, 0.5]. Default 0.5.
        n_neighbors: LOF neighbourhood size. Default 20.

    Returns:
        ``df`` left-joined on ``ID`` with ``is_outlier`` (-1 = outlier,
        1 = inlier) and ``final_outlier_score`` (LOF
        ``negative_outlier_factor_``; lower = more anomalous).
    """
    print("\n--- Running Persistence-based Outlier Scoring (via LOF) ---")

    pdf_for_lof = df.select("ID", "features").toPandas()
    X = np.array([x.toArray() for x in pdf_for_lof["features"]])

    # The true bot rate was noted as ~56%. scikit-learn caps contamination at
    # 0.5, so the default is the largest fraction LOF can be asked to flag.
    lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)

    # fit_predict labels points -1 (outlier) / 1 (inlier) using the contamination threshold.
    pdf_for_lof["is_outlier"] = lof.fit_predict(X)
    pdf_for_lof["final_outlier_score"] = lof.negative_outlier_factor_

    lof_spark_results = spark.createDataFrame(pdf_for_lof[["ID", "is_outlier", "final_outlier_score"]])
    final_df = df.join(lof_spark_results, on="ID", how="left")

    print(f"Outlier scoring complete. Contamination set to: {contamination}. Final score: 'final_outlier_score'.")
    return final_df

# ==============================================================================
# 7. MODEL EVALUATION (Final, Definitive Label Fix)
# ==============================================================================

def evaluate_model(df, label_df):
    """
    Compare detection results with the ground-truth label (ROC-AUC, F1-score, Precision@k).

    Args:
        df: Pipeline output with ``ID``, ``final_outlier_score`` (lower = more
            anomalous) and ``is_outlier`` (-1 = outlier).
        label_df: DataFrame with ``ID`` and ``label`` columns, or ``None``.
            Only a label equal to ``"1"`` (after trim/lower-case) counts as the
            positive (bot) class.

    Returns:
        ``(roc_auc, f1, precision_at_k)``, or ``None`` when evaluation is skipped.
    """
    print("\n--- Evaluating Model Performance against Ground Truth ---")

    if label_df is None:
        print("Evaluation skipped: No ground-truth label data provided.")
        return

    # Join the final results with the previously separated label data
    eval_df = df.join(label_df, on="ID", how="inner")

    print("\n[CRITICAL DIAGNOSTIC] Raw Label Column Distinct Values and Counts:")
    eval_df.groupBy("label").count().orderBy(col("count").desc()).show(20, truncate=False)

    eval_df = (
        eval_df.filter(col("final_outlier_score").isNotNull())
        .withColumn("clean_label", lower(trim(col("label"))))
        .withColumn("numeric_label", when(col("clean_label") == "1", 1.0).otherwise(0.0))
        # LOF scores are "lower = more anomalous"; negate so higher = more bot-like.
        .withColumn("raw_prediction_score", col("final_outlier_score") * -1.0)
        .withColumn("prediction_binary", when(col("is_outlier") == -1, 1.0).otherwise(0.0))
        .cache()  # reused by count(), the evaluator and the collect below
    )

    try:
        if eval_df.count() == 0:
            print("Evaluation skipped: Final data set is empty or missing scores.")
            return

        # --- 7.1 ROC-AUC ---
        evaluator = BinaryClassificationEvaluator(
            rawPredictionCol="raw_prediction_score",
            labelCol="numeric_label",
            metricName="areaUnderROC"
        )
        roc_auc = evaluator.evaluate(eval_df)

        # Collect labels, predictions and scores in ONE job. Separate collect()
        # calls over a shuffled join are not guaranteed to return rows in the
        # same order, which would misalign y_true and y_pred.
        eval_pdf = eval_df.select("numeric_label", "prediction_binary", "final_outlier_score").toPandas()
    finally:
        eval_df.unpersist()

    # --- 7.2 F1-Score & Diagnostic Check ---
    is_bot = eval_pdf["numeric_label"] == 1.0
    is_flagged = eval_pdf["prediction_binary"] == 1.0
    total_count = len(eval_pdf)
    true_anomalies_count = int(is_bot.sum())
    predicted_anomalies_count = int(is_flagged.sum())
    true_positives = int((is_bot & is_flagged).sum())

    print(f"\n[DIAGNOSTIC COUNTS] Total Records in Evaluation Set: {total_count}")
    print(f"[DIAGNOSTIC COUNTS] True Anomalies (Ground Truth Bot/1.0): {true_anomalies_count}")
    print(f"[DIAGNOSTIC COUNTS] Predicted Anomalies (Model Outlier/1.0): {predicted_anomalies_count}")
    print(f"[DIAGNOSTIC COUNTS] True Positives (Correctly Predicted Bots): {true_positives}")

    f1_score_result = f1_score(
        eval_pdf["numeric_label"], eval_pdf["prediction_binary"], pos_label=1, zero_division=0
    )

    # --- 7.3 Precision@k ---
    # Ascending score = most anomalous first. K is 1.5x the number of true bots,
    # clamped to [1, number of rows].
    ranked = eval_pdf.sort_values("final_outlier_score", kind="mergesort")
    K = max(1, int(ranked["numeric_label"].sum() * 1.5))
    K = min(K, ranked.shape[0])

    y_true_k = ranked["numeric_label"].head(K).tolist()
    precision_at_k = precision_score(y_true_k, [1] * len(y_true_k), pos_label=1, zero_division=0)

    print("\n--- Final Evaluation Metrics ---")
    print(f"ROC-AUC (Area Under the Curve): {roc_auc:.4f}")
    print(f"F1-Score (using LOF's contamination threshold): {f1_score_result:.4f}")
    print(f"Precision@{K} (Precision on Top {K} Outliers): {precision_at_k:.4f}")

    return roc_auc, f1_score_result, precision_at_k

# ==============================================================================
# 8. VISUALIZATION FUNCTION (NEW)
# ==============================================================================

def visualize_results(df, cluster_col="kmeans_cluster"):
    """
    Save two diagnostic figures for the final pipeline output.

    1. ``<cluster_col>_PCA_Plot.png``: a 2-D PCA projection of ``features``
       coloured by ``cluster_col`` (DBSCAN noise, -1, drawn as grey crosses).
    2. ``Outlier_Score_Distribution.png``: a histogram of ``final_outlier_score``.

    All plotted rows are collected to the driver with toPandas().
    """
    print(f"\n--- Generating Visualizations for Clustering and Outlier Scores ---")

    plot_pdf = df.select("features", cluster_col, "final_outlier_score", "is_outlier").toPandas()
    feature_matrix = np.array([vec.toArray() for vec in plot_pdf["features"]])

    pca = PCA(n_components=2)
    projected = pca.fit_transform(feature_matrix)
    plot_pdf['PCA1'] = projected[:, 0]
    plot_pdf['PCA2'] = projected[:, 1]

    _plot_cluster_projection(plot_pdf, cluster_col, pca.explained_variance_ratio_)
    _plot_score_distribution(plot_pdf["final_outlier_score"])


def _plot_cluster_projection(plot_pdf, cluster_col, variance_ratio):
    """Draw and save the PCA scatter coloured by ``cluster_col``."""
    plt.figure(figsize=(10, 8))
    scatter_kwargs = dict(
        x='PCA1', y='PCA2', hue=cluster_col,
        palette="viridis", legend="full", alpha=0.7, s=50,
    )

    if cluster_col == "dbscan_cluster":
        # DBSCAN noise (-1) is drawn separately so it doesn't take a palette colour.
        is_noise = plot_pdf[cluster_col] == -1
        sns.scatterplot(data=plot_pdf[~is_noise], **scatter_kwargs)
        noise = plot_pdf[is_noise]
        plt.scatter(
            noise['PCA1'], noise['PCA2'],
            color='grey', marker='x', s=50, label='DBSCAN Outliers (-1)',
        )
    else:
        sns.scatterplot(data=plot_pdf, **scatter_kwargs)

    plt.title(f'Multi-View Behavioral Clustering (PCA of Features) - {cluster_col}')
    plt.xlabel(f'Principal Component 1 ({variance_ratio[0]*100:.2f}%)')
    plt.ylabel(f'Principal Component 2 ({variance_ratio[1]*100:.2f}%)')
    plt.legend(title='Cluster ID')
    plt.grid(True)
    plt.savefig(f'{cluster_col}_PCA_Plot.png')
    plt.close()
    print(f"Saved PCA Clustering Plot to {cluster_col}_PCA_Plot.png")


def _plot_score_distribution(scores):
    """Draw and save the LOF score histogram with an illustrative threshold line."""
    threshold = -1.5  # illustrative cut-off only; not used for classification
    plt.figure(figsize=(10, 6))
    sns.histplot(scores, kde=True, bins=50, color='blue', label='LOF Score Distribution')
    plt.axvline(threshold, color='red', linestyle='--', label=f'Example Outlier Threshold ({threshold})')

    plt.title('Distribution of Time-Aware Persistence Outlier Scores (LOF)')
    plt.xlabel('Final Outlier Score (Lower = More Anomalous)')
    plt.ylabel('Frequency (Count)')
    plt.legend()
    plt.grid(axis='y', alpha=0.5)
    plt.savefig('Outlier_Score_Distribution.png')
    plt.close()
    print("Saved Outlier Score Distribution Plot to Outlier_Score_Distribution.png")

def save_results_to_csv(df, label_df=None, output_path="final_bot_detection_summary.csv"):
    """
    Write the cluster assignments and outlier results (plus ``label`` when
    ``label_df`` is given) as CSV with a header into the directory ``output_path``.

    The frame is coalesced to one partition, so Spark writes a single
    ``part-*.csv`` file. An existing directory is overwritten.
    """
    print(f"\n--- Saving Final Results to CSV ---")

    export_cols = ["ID", "kmeans_cluster", "gmm_cluster", "dbscan_cluster",
                   "final_outlier_score", "is_outlier"]

    if label_df is not None:
        # Left join keeps every scored user even if no label is available.
        df = df.join(label_df, on="ID", how="left")
        export_cols = export_cols + ["label"]

    (
        df.select(*export_cols)
        .coalesce(1)
        .write.csv(output_path, mode="overwrite", header=True)
    )

    print(f"Successfully saved records to the directory: '{output_path}'")
    print("NOTE: Look inside the folder for a single CSV file (e.g., part-00000-....csv).")


# ==============================================================================
# 9. MAIN EXECUTION FLOW (FINAL CORRECTED VERSION)
# ==============================================================================

if __name__ == "__main__":
    # End-to-end pipeline: load -> feature fusion -> K-Means (+ centroid distance)
    # -> re-assemble/re-scale -> GMM -> DBSCAN -> LOF scoring -> evaluation
    # -> plots -> CSV export.
    # NOTE(review): these variables are module-level names; other notebook cells
    # may reference them (e.g. final_results), so keep the names stable.

    # Stage 1: Data Preparation and Feature Extraction
    raw_df = load_and_preprocess_data()
    # Returns the feature frame with 'label' dropped plus a separate (ID, label)
    # frame (or None) that is only used for evaluation/export.
    feature_df_no_label, label_df_for_eval = feature_engineering(raw_df)

    # Initial assembly provides the scaled 'features' vector for K-Means; only
    # the fitted scaler model is discarded (bound to '_').
    scaled_df_initial, _ = assemble_and_scale_features(feature_df_no_label)

    # Stage 2: Behavioral Grouping (Clustering) - All unsupervised
    print("\n--- Running Clustering Models ---")

    # 2.1 Run K-Means and capture the model
    # Use the initial scaled_df which contains 'raw_features' and 'features'
    kmeans_transformed_df, kmeans_model = run_kmeans(scaled_df_initial, K_CLUSTERS)

    # 2.2 Augment features with K-Means distance (distance to the assigned centroid)
    feature_augmented_df = compute_cluster_features(kmeans_transformed_df, kmeans_model)

    # 2.3 FIX: Drop the old vector columns before re-assembly
    # to avoid the "raw_features already exists" error.
    feature_augmented_df = feature_augmented_df.drop("raw_features", "features")

    # 2.4 RE-ASSEMBLE: Must re-assemble and scale to include 'kmeans_distance' in the 'features' vector
    # ('kmeans_cluster' is not in the feature list, so it is carried along but not used as a feature.)
    re_assembled_df, _ = assemble_and_scale_features(feature_augmented_df)

    # 2.5 GMM Clustering (Runs on the augmented, re-scaled features)
    gmm_result = run_gmm(re_assembled_df, K_CLUSTERS)

    # 2.6 DBSCAN Approximation (scikit-learn on the driver; -1 = noise)
    dbscan_result = run_dbscan_approximation(gmm_result, epsilon=0.5, min_samples=10)

    # Stage 3: Outlier Detection (Time-Aware Persistence Scoring via LOF)
    final_results = persistence_outlier_detection(dbscan_result)

    # Show up to 10 rows flagged as outliers by LOF.
    print("\nFinal Results Sample (Outliers are indicated by is_outlier == -1):")
    final_results.select(
        "ID", "tweet_count", "account_age_days",
        "activity_persistence_score", "final_outlier_score",
        "is_outlier", "kmeans_cluster", "gmm_cluster", "dbscan_cluster"
    ).filter(col("is_outlier") == -1).show(10, truncate=False)

    # Stage 4: Evaluation (skipped inside evaluate_model when no labels exist)
    evaluate_model(final_results, label_df_for_eval)

    # Stage 5: Visualization (PCA plot coloured by the DBSCAN assignment)
    visualize_results(final_results, cluster_col="dbscan_cluster")

    # Stage 6: Export to a single-part CSV inside the given directory.
    save_results_to_csv(final_results, label_df_for_eval, output_path="final_bot_detection_summary.csv")

    # Stops the shared SparkSession; later notebook cells that use `spark`
    # would need to create a new session.
    spark.stop()
    print("\nPipeline execution finished.")

PySpark Session Initialized. Ready for scalable processing.
NOTE: Spark legacy time parser policy enabled to handle Twitter date format.
Loading raw data from JSON files...
Dataset shuffled and repartitioned across 10 partitions.
Total Users Loaded and Preprocessed: 11826
Calculating Behavioral (Activity/Text) Features...
Calculating Network Features (Graph Construction)...
Multi-view feature fusion complete.
Ground-truth 'label' column successfully separated and dropped for unsupervised feature engineering.
Assembling features: ['tweet_count', 'vocab_richness', 'sentiment_polarity', 'account_age_days', 'activity_persistence_score', 'degree_centrality', 'clustering_coeff', 'pagerank_score']
Features assembled into a vector of size 8 and scaled.

--- Running Clustering Models ---
K-Means (K=2) converged in 13 iterations.
Augmenting features with Cluster Distance (Structural Fix)...
Distance to assigned centroid calculated as 'kmeans_distance'.
Assembling features: ['tweet_count', 'vocab