In [None]:
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, BucketedRandomProjectionLSH
from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from collections import defaultdict
from operator import add
import math
import time
import os
import sys

# Configure the process environment for PySpark in one step: the Java, Spark
# and Hadoop install locations, the driver options, and the PYSPARK_* interpreter
# variables (both set to the interpreter running this notebook).
os.environ.update({
    "JAVA_HOME": "C:\\Program Files\\Java\\jre1.8.0_441",
    "SPARK_HOME": "C:\\Spark\\spark-3.5.5-bin-hadoop3",
    "HADOOP_HOME": "C:\\hadoop",
    "PYSPARK_DRIVER_PYTHON_OPTS": "notebook",
    "PYSPARK_DRIVER_PYTHON": sys.executable,
    "PYSPARK_PYTHON": sys.executable,
})

In [None]:
# CSV read by the benchmark loop; the last column is used as the label there.
data_path = "outlier_dataset_200_200.csv"

# Local core counts to benchmark: successive powers of two, 1 through 64.
nodes_list = [2 ** exponent for exponent in range(7)]

# p     -> bins per normalised feature (bin width is 1.0 / p)
# m_max -> upper bound on the number of features selected
# k     -> neighbours kept per point for the LOF computation
p, m_max, k = 2, 5, 10

In [None]:
def _mutual_information(joint_counts, n_total):
    """Return the mutual information (in nats) of two discrete variables.

    Parameters
    ----------
    joint_counts : dict
        Maps ``(x, y)`` value pairs to their co-occurrence count.
    n_total : int
        Total number of observations (the sum of all counts).

    Both marginals are derived from ``joint_counts`` itself, so each of them
    sums to ``n_total``. Returns 0.0 when there are no observations.
    """
    if n_total <= 0:
        return 0.0
    x_counts, y_counts = defaultdict(int), defaultdict(int)
    for (x, y), cnt in joint_counts.items():
        x_counts[x] += cnt
        y_counts[y] += cnt
    # (cnt/N) * log( (cnt/N) / ((n_x/N) * (n_y/N)) ) with the N's simplified.
    return sum(
        (cnt / n_total) * math.log(cnt * n_total / (x_counts[x] * y_counts[y]))
        for (x, y), cnt in joint_counts.items()
        if cnt > 0
    )


# Loop over different simulated node counts.
# Each iteration runs the full pipeline (normalise -> grid density -> mRMRD
# feature selection -> LSH-based LOF -> AUC) and prints wall time and AUC.
for cores in nodes_list:
    t0 = time.time()

    # Start Spark session with given number of local cores.
    # NOTE(review): driver memory is normally fixed when the JVM is launched;
    # confirm it still applies to sessions created after the first iteration.
    spark = (SparkSession.builder
             .appName(f"LOF_Eval_{cores}cores")
             .master(f"local[{cores}]")
             .config("spark.driver.memory", "4g")
             .config("spark.executor.memory", "4g")
             .getOrCreate())

    # try/finally guarantees the session is stopped even when a stage fails;
    # otherwise the next getOrCreate() would hand back this session and its
    # core count instead of building a new one.
    try:
        # 1) Load data and cache; the last column is treated as the label.
        df = spark.read.csv(data_path, header=True, inferSchema=True).cache()
        cols = df.columns
        label_col = cols[-1]
        feature_cols = cols[:-1]

        # 2) Normalize features to [0,1]
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
        df_vec = assembler.transform(df).cache()
        scaler = MinMaxScaler(inputCol="features_vec", outputCol="scaled_vec")
        scaler_model = scaler.fit(df_vec)
        df_scaled = scaler_model.transform(df_vec).cache()
        df_scaled = df_scaled.withColumn("scaled_arr", vector_to_array("scaled_vec"))
        scaled_exprs = [F.col("scaled_arr")[i].alias(feature_cols[i]) for i in range(len(feature_cols))]
        df_norm = df_scaled.select(*scaled_exprs, F.col(label_col)).cache()

        # 3) Grid-based density via binning: each feature is cut into p
        #    equal-width bins numbered 1..p; a "cube" is the list of bin ids.
        width = 1.0 / p

        def to_binned_row(row):
            """Map a normalised row to its per-feature bin ids plus the cube."""
            # ceil() sends 0.0 to bin 0, so clamp into the valid range 1..p.
            bins = [min(max(int(math.ceil(row[f] / width)), 1), p) for f in feature_cols]
            rec = {f"{feature_cols[i]}_bin": bins[i] for i in range(len(feature_cols))}
            rec["cube"] = bins  # list of ints
            return Row(**rec)

        binned_rdd = df_norm.select(*feature_cols).rdd.map(to_binned_row)

        # Explicit schema for binned DataFrame
        bin_fields = [StructField(f"{feature_cols[i]}_bin", IntegerType(), False) for i in range(len(feature_cols))]
        schema = StructType(bin_fields + [StructField("cube", ArrayType(IntegerType(), containsNull=False), False)])
        df_binned = spark.createDataFrame(binned_rdd, schema).cache()

        # Count per cube (lists are unhashable, so use a tuple as the key)
        cube_count = df_binned.rdd.map(lambda r: (tuple(r["cube"]), 1)).reduceByKey(add)
        # Build density grid DataFrame
        dg_schema = StructType([
            StructField("cube", ArrayType(IntegerType(), containsNull=False), False),
            StructField("density", IntegerType(), False)
        ])
        dg = (cube_count
              .map(lambda x: (list(x[0]), x[1]))
              .toDF(schema=dg_schema)
              .cache())

        # 4) mRMRD feature selection
        bin_cols = [f"{c}_bin" for c in feature_cols]
        df_dens = df_binned.join(dg, on="cube").select(*bin_cols, "density").cache()
        N = df_dens.count()

        # Relevance: MI(feature bin; cube density), one contingency table per feature.
        def rel_map(row):
            y = row["density"]
            for f in feature_cols:
                yield ((f, row[f"{f}_bin"], y), 1)

        cont_rel = df_dens.rdd.flatMap(rel_map).reduceByKey(add).collect()
        # Keep the tables separate per feature: accumulating the density
        # marginal across all features at once would inflate it by the number
        # of features and shift every MI value by -log(#features).
        joint_rel = defaultdict(dict)
        for (f, b, y), cnt in cont_rel:
            joint_rel[f][(b, y)] = cnt
        mi_fd = {f: _mutual_information(joint_rel[f], N) for f in feature_cols}
        # Stop threshold: the mean relevance over all features.
        Th = sum(mi_fd.values()) / len(mi_fd)

        selected, candidates, mi_red = [], feature_cols.copy(), {}
        while candidates and len(selected) < m_max:
            if not selected:
                # First pick: the most relevant feature.
                best = max(candidates, key=lambda x: mi_fd[x])
            else:
                # Only redundancy against the most recently selected feature is
                # new; pairs involving earlier selections are already in mi_red.
                s = selected[-1]

                def red_map(row):
                    bs = row[f"{s}_bin"]
                    for j in candidates:
                        yield ((j, row[f"{j}_bin"], bs), 1)

                cont_rs = df_binned.rdd.flatMap(red_map).reduceByKey(add).collect()
                joint_rs = defaultdict(dict)
                for (j, b_j, b_s), cnt in cont_rs:
                    joint_rs[j][(b_j, b_s)] = cnt
                for j in candidates:
                    if (j, s) not in mi_red:
                        mi_red[(j, s)] = mi_red[(s, j)] = _mutual_information(joint_rs[j], N)

                # Score = relevance minus mean redundancy with the selected set.
                best, best_score = None, float('-inf')
                for j in candidates:
                    red_avg = sum(mi_red[(j, t)] for t in selected) / len(selected)
                    score = mi_fd[j] - red_avg
                    if score > best_score:
                        best, best_score = j, score
            if mi_fd[best] < Th:
                break
            selected.append(best)
            candidates.remove(best)
        print(f"Final mRMRD subspace: {selected}")

        # 5) LOF computation on the selected subspace
        df_proj = df_norm.select(*selected, label_col).withColumn("id", F.monotonically_increasing_id()).cache()
        assembler2 = VectorAssembler(inputCols=selected, outputCol="features_vec")
        df_vec2 = assembler2.transform(df_proj).select("id", "features_vec").cache()
        # Fixed seed so the random projections, and therefore the AUC, are
        # identical for every core count being compared.
        lsh = BucketedRandomProjectionLSH(inputCol="features_vec", outputCol="hashes",
                                          bucketLength=math.sqrt(len(selected)) / 2, seed=42)
        model = lsh.fit(df_vec2)
        # sqrt(d) is the diagonal of the unit hypercube, i.e. the largest
        # distance possible between two normalised points.
        max_dist = math.sqrt(len(selected))
        pairs = (model.approxSimilarityJoin(df_vec2, df_vec2, max_dist, distCol="dist")
                 .select(F.col("datasetA.id").alias("pid"), F.col("datasetB.id").alias("oid"), "dist")
                 .filter("pid < oid"))
        # Keep each unordered pair once above, then mirror it so every point
        # sees all of its candidate neighbours.
        pairs = pairs.unionByName(pairs.selectExpr("oid as pid", "pid as oid", "dist"))
        # Secondary sort on oid makes the choice among equal distances deterministic.
        w = F.row_number().over(Window.partitionBy("pid").orderBy("dist", "oid"))
        knn = pairs.withColumn("rn", w).filter(F.col("rn") <= k).select("pid", "oid", "dist").cache()
        # k-distance(o) is the distance from o to ITS OWN farthest kept
        # neighbour, so aggregate the rows where o is the query point (pid) and
        # expose the result keyed by oid for the join below.
        kdist = (knn.groupBy("pid").agg(F.max("dist").alias("kdist"))
                 .withColumnRenamed("pid", "oid").cache())
        rd = knn.join(kdist, on="oid").withColumn("reach_dist", F.greatest("dist", "kdist")).cache()
        # lrd = |N_k(p)| / sum(reach_dist); the approximate join may return
        # fewer than k neighbours, so use the actual neighbour count.
        lrd = rd.groupBy("pid").agg((F.count("reach_dist") / F.sum("reach_dist")).alias("lrd")).cache()
        lof = (rd.join(lrd.select(F.col("pid").alias("oid"), "lrd"), on="oid")
               .groupBy("pid").agg(F.avg("lrd").alias("avg_lrd_o"))
               .join(lrd, on="pid")
               .withColumn("LOF", F.col("avg_lrd_o") / F.col("lrd")).cache())

        # 6) AUC evaluation (points with no LSH neighbours are absent from lof
        #    and therefore dropped by the inner join).
        score_label = (df_proj.join(lof.withColumnRenamed("pid", "id").select("id", "LOF"), on="id")
                       .select("LOF", label_col)
                       # A zero reach-distance sum yields a null LOF; skip those
                       # rows instead of failing in float(None).
                       .na.drop(subset=["LOF"])
                       .rdd.map(lambda r: (float(r[0]), float(r[1]))))
        metrics = BinaryClassificationMetrics(score_label)
        auc = metrics.areaUnderROC

        # Print results
        t1 = time.time()
        print(f"Cores={cores:2d}  Time={(t1-t0):.1f}s  AUC={auc:.4f}")
    finally:
        # Stop Spark session (also releases every cached DataFrame)
        spark.stop()