In [3]:
import os
# Set JAVA_HOME to Java 17 which is already installed.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["JAVA_HOME"], "bin")

#Install the required libraries
!pip install pyspark
# Initialize a Spark session
from pyspark.sql import SparkSession

# Stop any existing Spark session to ensure new configurations take effect
if 'spark' in locals() and spark is not None:
    spark.stop()

spark = (
    SparkSession.builder
    .appName("04_clustering_and_anomaly_pyspark")
    .master("local[*]")
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "6g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")




In [4]:
import os
import argparse
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F

# Script-style options. parse_args(args=[]) ignores the kernel's own sys.argv, so in
# the notebook every option simply takes its default value.
# NOTE(review): the default path is spelled "paraquet". The load cell succeeded with it,
# so the upstream file apparently carries that spelling — confirm before "fixing" it.
_cli_options = [
    ("--featured_parquet", {"default": "/content/data/featured.paraquet"}),
    ("--out_dir", {"default": "/content/results"}),
    ("--k", {"type": int, "default": 5}),
    ("--sample_fraction_for_anomaly", {"type": float, "default": 0.05}),
]

parser = argparse.ArgumentParser()
for _flag, _opts in _cli_options:
    parser.add_argument(_flag, **_opts)

args = parser.parse_args(args=[])

In [5]:
# Load the engineered feature vectors, keep only rows that have one, and cache the
# result because many later cells read `df` again.
df = (
    spark.read.parquet(args.featured_parquet)
    .where(F.col("features").isNotNull())
    .cache()
)
print("Loaded features count:", df.count())

Loaded features count: 2499784


In [11]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F

# Keep only rows whose feature vector is entirely finite. NaN and +/-Infinity are both
# unusable for distance-based clustering, so filtering NaN alone is not enough.
# Spark casts the string "Infinity" to Double.PositiveInfinity.
_pos_inf = F.lit("Infinity").cast("double")

df_clean = (
    df
    .withColumn("features_arr", vector_to_array("features"))
    .filter(
        ~F.exists("features_arr", lambda x: F.isnan(x) | (F.abs(x) == _pos_inf))
    )
    .drop("features_arr")
)


In [12]:
from pyspark.ml.functions import array_to_vector

# Replace every non-finite entry (NaN or +/-Infinity) of the feature vector with 0.0.
# Replacing NaN alone leaves infinities in place, and those still break KMeans.
# NOTE(review): zero-imputation can bias clusters; dropping the affected slots is the
# alternative approach.
_pos_inf = F.lit("Infinity").cast("double")  # Spark casts "Infinity" to +inf

df_fixed = df.withColumn(
    "features",
    array_to_vector(
        F.transform(
            vector_to_array("features"),
            lambda x: F.when(F.isnan(x) | (F.abs(x) == _pos_inf), F.lit(0.0)).otherwise(x),
        )
    ),
)


In [13]:
# Fit KMeans on the current `df_fixed` vectors (fixed seed for reproducibility).
kmeans_params = {"k": args.k, "featuresCol": "features", "seed": 42, "maxIter": 20}
kmeans = KMeans(**kmeans_params)

km_model = kmeans.fit(df_fixed)


In [16]:
from pyspark.sql.functions import col
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
import math

def has_nan_or_inf(v):
    """Return True as soon as any element of ``v`` is NaN or +/-infinity, else False."""
    for value in v:
        if math.isnan(value) or math.isinf(value):
            return True
    return False

has_bad = udf(has_nan_or_inf, "boolean")

# Count rows whose raw feature vector contains at least one NaN/inf value.
bad_flag_counts = (
    df.select(has_bad(col("features")).alias("bad"))
    .groupBy("bad")
    .count()
)
bad_flag_counts.show()


+-----+-------+
|  bad|  count|
+-----+-------+
| true|2499758|
|false|     26|
+-----+-------+



In [17]:
import math
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, IntegerType

def bad_indices(v):
    """Return the positions (ascending) of NaN or +/-infinity entries in ``v``."""
    positions = []
    for pos, value in enumerate(v):
        if math.isnan(value) or math.isinf(value):
            positions.append(pos)
    return positions

bad_idx_udf = udf(bad_indices, ArrayType(IntegerType()))

# Show a few rows that have at least one non-finite slot, listing the offending positions.
(
    df.select(bad_idx_udf(col("features")).alias("bad_idx"))
    .where(F.size("bad_idx") > 0)
    .limit(5)
    .show(truncate=False)
)


+--------+
|bad_idx |
+--------+
|[14, 15]|
|[14, 15]|
|[14, 15]|
|[14, 15]|
|[14, 15]|
+--------+



In [26]:
# Slot indices of the `features` vector. The width is read from the data rather than
# hard-coded (an earlier KMeans SparkRuntimeException reported 78 dimensions).
# Assumes every row's vector has the same width, which KMeans requires anyway.
_first_row = df.select("features").first()
if _first_row is None:
    raise ValueError("df has no rows; cannot infer the feature-vector width")
feature_cols = list(range(len(_first_row["features"])))

# Slots 14 and 15 are the ones the bad-index diagnostics flagged as NaN/inf.
feature_cols_fixed = [c for c in feature_cols if c not in (14, 15)]

In [28]:
# Print the (position, entry) pairs of feature_cols at the flagged positions 14 and 15.
flagged = [(pos, entry) for pos, entry in enumerate(feature_cols) if pos in (14, 15)]
for pos, entry in flagged:
    print(pos, entry)


14 14
15 15


In [32]:
# Summary statistics for vector slots 14 and 15 (element_at is 1-based, hence 15/16).
features_arr = vector_to_array("features")
(
    df.select(
        F.element_at(features_arr, 15).alias("feature_14"),
        F.element_at(features_arr, 16).alias("feature_15"),
    )
    .summary()
    .show()
)

+-------+----------+----------+
|summary|feature_14|feature_15|
+-------+----------+----------+
|  count|   2499784|   2499784|
|   mean|       NaN|       NaN|
| stddev|       NaN|       NaN|
|    min|       0.0|       0.0|
|    25%|       NaN|       NaN|
|    50%|       NaN|       NaN|
|    75%|       NaN|       NaN|
|    max|       NaN|       NaN|
+-------+----------+----------+



In [33]:
bad_cols = ["feature_14", "feature_15"]

# The NaN/inf values live in slots 14 and 15 of the `features` vector, not in top-level
# columns. "feature_14"/"feature_15" only existed on the temporary frame built for the
# summary, so `df.drop(*bad_cols)` would silently drop nothing. Remove the slots from the
# vector instead; the filter lambda's index `i` is 0-based.
bad_slots = [int(name.rsplit("_", 1)[1]) for name in bad_cols]

df_fixed = df.withColumn(
    "features",
    array_to_vector(
        F.filter(vector_to_array("features"), lambda x, i: ~i.isin(bad_slots))
    ),
)


In [34]:
# feature_cols holds integer slot indices, while bad_cols holds names like "feature_14".
# Comparing them directly never matches, so map the names to indices first.
bad_slot_idx = {int(name.rsplit("_", 1)[1]) for name in bad_cols}
feature_cols_fixed = [c for c in feature_cols if c not in bad_slot_idx]


In [40]:
# Every vector slot except the NaN/inf slots 14 and 15. Derived from feature_cols so the
# vector width is defined in exactly one place instead of re-hard-coding 78.
feature_cols_fixed = [i for i in feature_cols if i not in (14, 15)]

In [41]:
from pyspark.ml.functions import vector_to_array, array_to_vector
from pyspark.sql.functions import array, col, lit

# Stage the vector as an array column so individual slots can be picked out.
df_with_array = df.withColumn("features_array", vector_to_array(col("features")))

# Keep only the slots listed in feature_cols_fixed (0-based; element_at is 1-based).
kept_slots = [F.element_at(col("features_array"), lit(slot + 1)) for slot in feature_cols_fixed]

df_fixed = (
    df_with_array
    .withColumn("features", array_to_vector(array(*kept_slots)))
    .drop("features_array")
)

In [43]:
import math
from pyspark.sql.functions import col, udf
from pyspark.ml.linalg import VectorUDT

def has_nan_or_inf(v):
    """True if ``v`` contains any non-finite value (NaN, +inf or -inf)."""
    return not all(math.isfinite(value) for value in v)

has_bad = udf(has_nan_or_inf, "boolean")

# Re-run the non-finite check on the rebuilt vectors; expect only `false` rows now.
(
    df_fixed
    .select(has_bad(col("features")).alias("bad"))
    .groupBy("bad")
    .count()
    .show()
)


+-----+-------+
|  bad|  count|
+-----+-------+
|false|2499784|
+-----+-------+



In [44]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Fit KMeans on the cleaned vectors and label every row with its cluster id.
# NOTE(review): features are not standardized here; if upstream did not scale them,
# large-magnitude slots will dominate the distances — confirm.
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=args.k, seed=42)
km_model = kmeans.fit(df_fixed)
df_k = km_model.transform(df_fixed)

# Silhouette lies in [-1, 1]; higher means tighter, better-separated clusters.
evaluator = ClusteringEvaluator(
    metricName="silhouette",
    distanceMeasure="squaredEuclidean",
    featuresCol="features",
    predictionCol="cluster",
)
sil = evaluator.evaluate(df_k)
print("KMeans silhouette score:", sil)


KMeans silhouette score: 0.1257080671813052


In [45]:
# Per-cluster row counts (at most k rows) collected to pandas and written as CSV.
# The output directory may not exist on a fresh runtime, so create it first.
os.makedirs(args.out_dir, exist_ok=True)
cluster_counts = df_k.groupBy("cluster").count().orderBy("cluster").toPandas()
cluster_counts.to_csv(os.path.join(args.out_dir, "kmeans_cluster_counts.csv"), index=False)
print("Saved kmeans_cluster_counts.csv")

Saved kmeans_cluster_counts.csv


In [46]:
# Anomaly detection. Prefer the distributed spark-iforest implementation. If it cannot be
# imported or fails, fall back to scikit-learn's IsolationForest on a random sample.
# Both paths score `df_fixed` rather than `df`: the raw vectors in `df` contain NaN/inf in
# slots 14/15, which is why those slots were removed before clustering.
os.makedirs(args.out_dir, exist_ok=True)
try:
    try:
        # spark-iforest's Python package is distributed as `pyspark_iforest`.
        from pyspark_iforest.ml.iforest import IForest
    except ImportError:
        from pyspark.ml.iforest import IForest
    print("Using spark-iforest IForest for anomaly detection")
    iso = IForest(contamination=0.01, featuresCol="features", predictionCol="anomaly", scoreCol="anomaly_score")
    iso_model = iso.fit(df_fixed)
    df_iso = iso_model.transform(df_fixed)
    df_iso.select("anomaly", "anomaly_score").limit(10).show()
    df_iso.select("anomaly", "anomaly_score").write.mode("overwrite").parquet(
        os.path.join(args.out_dir, "iforest_scores.parquet")
    )
    print("Saved iforest anomaly scores parquet.")
except Exception as e:
    # Deliberate best-effort fallback: a missing package or a failed fit both land here.
    print("spark-iforest not available or failed:", e)
    print("Falling back to sklearn IsolationForest on a sample (converted to pandas) — only recommended on a sample.")
    import numpy as np
    from sklearn.ensemble import IsolationForest

    # Collect the sample once and attach the scores to that same frame. This keeps each
    # score aligned with its row and avoids a second collect of every column.
    sample = df_fixed.sample(withReplacement=False, fraction=args.sample_fraction_for_anomaly, seed=42)
    pdf = sample.select("features").toPandas()
    if pdf.empty:
        print("Anomaly sample is empty; increase --sample_fraction_for_anomaly. Nothing saved.")
    else:
        X = np.vstack([v.toArray() for v in pdf["features"]])
        iso_sklearn = IsolationForest(n_estimators=200, contamination=0.01, random_state=42)
        iso_sklearn.fit(X)
        # decision_function is lower (negative) for outliers; negate so larger = more anomalous.
        scores = -iso_sklearn.decision_function(X)
        pdf["anomaly_score"] = scores
        out_csv = os.path.join(args.out_dir, "isolation_forest_sample_scores.csv")
        pdf[["anomaly_score"]].to_csv(out_csv, index=False)
        print("Saved sample anomaly scores to", out_csv)

spark-iforest not available or failed: No module named 'pyspark.ml.iforest'
Falling back to sklearn IsolationForest on a sample (converted to pandas) — only recommended on a sample.
Saved sample anomaly scores to results/


In [None]:
# Shut down the Spark session created in the setup cell at the end of the run.
spark.stop()