# In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pyspark.ml.evaluation import ClusteringEvaluator

# Start (or reuse) the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("TrafficDataML")
    .getOrCreate()
)

# Turn off Arrow-based pandas conversion so toPandas() takes the plain
# row-based path; the author reports this avoids VectorUDT conversion issues.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

# Load the cleaned CSV: column names come from the header row and column
# types are inferred by Spark.
input_path = "dbfs:/user/mehak/processed/berlin_clean.csv"
csv_reader = spark.read.option("header", True).option("inferSchema", True)
spark_df = csv_reader.csv(input_path)

# Quick look at the first rows and at the inferred schema
spark_df.show(5)
spark_df.printSchema()

# Column groups by encoding strategy (adjust to match the dataset):
#   categorical_low  -> StringIndexer + OneHotEncoder (one-hot vector)
#   categorical_high -> StringIndexer only (integer label encoding)
#   numeric_cols     -> fed to the assembler as-is
categorical_low = ["spatial_type"]
categorical_high = ["name", "berlin_bez"]
numeric_cols = ["zahl_tvz", "vz_typ_no", "lor_prg"]

# String -> index for every categorical column. handleInvalid="keep" puts
# unseen/null labels into an extra index instead of failing (per Spark docs).
indexers_high = [
    StringIndexer(inputCol=column, outputCol=f"{column}_index", handleInvalid="keep")
    for column in categorical_high
]
indexers_low = [
    StringIndexer(inputCol=column, outputCol=f"{column}_index", handleInvalid="keep")
    for column in categorical_low
]
# Only the low group is one-hot encoded on top of its index
encoders_low = [
    OneHotEncoder(inputCol=f"{column}_index", outputCol=f"{column}_vec")
    for column in categorical_low
]

# Feature vector order: numerics, then label-encoded indices, then one-hot vectors
assembler_inputs = [
    *numeric_cols,
    *(f"{column}_index" for column in categorical_high),
    *(f"{column}_vec" for column in categorical_low),
]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Scale each feature to unit standard deviation without mean-centering
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)

# KMeans clusters on the scaled vectors; fixed seed for reproducibility
kmeans = KMeans(featuresCol="scaledFeatures", predictionCol="cluster", k=5, seed=42)

# Full pipeline: encode -> assemble -> scale -> cluster (KMeans is the last stage)
stages = [*indexers_high, *indexers_low, *encoders_low, assembler, scaler, kmeans]
pipeline = Pipeline(stages=stages)

# Fit every stage on the data, then attach cluster assignments
model = pipeline.fit(spark_df)
clustered_df = model.transform(spark_df)

# Preview the first 10 cluster assignments
preview_cols = ["name", "spatial_type", "cluster"]
clustered_df.select(*preview_cols).show(10)

# -------------------------------
# ✅ Step 1: Clustering Metrics & Centers
# -------------------------------
# The fitted KMeansModel is the final stage of the fitted pipeline
kmeans_model = model.stages[-1]

# trainingCost is the Within Set Sum of Squared Errors (WSSSE) on the fit data
training_summary = kmeans_model.summary
wssse = training_summary.trainingCost
print(f"Within Set Sum of Squared Errors (WSSSE): {wssse}")

# Centers are expressed in the scaledFeatures space, since KMeans was fit on it
centers = kmeans_model.clusterCenters()
print("Cluster Centers:")
for idx, center in zip(range(len(centers)), centers):
    print(f"Cluster {idx}: {center}")

# -------------------------------
# ✅ Step 2: Silhouette Score for Clustering Quality
# -------------------------------
# Evaluate clustering by computing the Silhouette score. The evaluator must
# read the same vectors KMeans was trained on ("scaledFeatures"). Its default
# featuresCol is "features", which holds the unscaled assembler output, so
# that default would score the clusters in a different space from the one
# they were formed in.
evaluator = ClusteringEvaluator(featuresCol="scaledFeatures", predictionCol="cluster")
silhouette = evaluator.evaluate(clustered_df)
print(f"Silhouette Score: {silhouette}")

# -------------------------------
# ✅ Step 3: PCA for Visualization (Dimensionality Reduction to 2D)
# -------------------------------
# Project the scaled feature vectors (the space KMeans was fit in) onto their
# first two principal components. PCA consumes the vector column directly,
# so no array conversion is needed beforehand.
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
pca_model = pca.fit(clustered_df)
pca_df = pca_model.transform(clustered_df)

# Collect only the two columns needed for plotting. Arrow is disabled above,
# so toPandas() uses the row-based path and the vector column arrives as
# pyspark.ml.linalg vector objects, which support indexing.
plot_df = (
    pca_df.select("pcaFeatures", "cluster")
    .withColumnRenamed("pcaFeatures", "pcaArray")
    .toPandas()
)

# Extract x, y from the 2-component PCA vector
plot_df["x"] = plot_df["pcaArray"].apply(lambda v: float(v[0]))
plot_df["y"] = plot_df["pcaArray"].apply(lambda v: float(v[1]))

# Plot the clusters; iterate ids in sorted order so the legend order is stable
plt.figure(figsize=(8, 6))
for cluster_id in sorted(plot_df["cluster"].unique()):
    subset = plot_df[plot_df["cluster"] == cluster_id]
    plt.scatter(subset["x"], subset["y"], label=f"Cluster {cluster_id}", alpha=0.6)
plt.title("KMeans Clusters (PCA 2D Projection)")
plt.xlabel("PCA Feature 1")
plt.ylabel("PCA Feature 2")
plt.legend()
plt.show()

# -------------------------------
# ✅ Step 4: Elbow Method for Optimal k (Selecting k based on WSSSE)
# -------------------------------
# The preprocessing stages (indexers, encoder, assembler, scaler) do not depend
# on k, so reuse the already-fitted ones (every stage of `model` except the
# final KMeans) and refit only KMeans for each candidate k.
# Each candidate uses a *copy* of `kmeans`, so neither the shared estimator nor
# the k=5 `model` / `clustered_df` analysed in the other steps is overwritten.
prepared_df = spark_df
for fitted_stage in model.stages[:-1]:
    prepared_df = fitted_stage.transform(prepared_df)
prepared_df = prepared_df.cache()  # reused by every KMeans fit below

wssse_list = []
k_range = range(2, 11)  # Testing k from 2 to 10 clusters
try:
    for k_val in k_range:
        candidate_kmeans = kmeans.copy({kmeans.k: k_val})
        candidate_model = candidate_kmeans.fit(prepared_df)
        wssse_list.append(candidate_model.summary.trainingCost)  # WSSSE for this k
finally:
    # Release the cached preprocessing output even if a fit fails
    prepared_df.unpersist()

# Plot Elbow Method to visualize the best k
plt.figure(figsize=(8, 6))
plt.plot(k_range, wssse_list, marker='o')
plt.title("Elbow Method for Optimal k")
plt.xlabel("Number of Clusters (k)")
plt.ylabel("WSSSE")
plt.xticks(k_range)
plt.grid(True)
plt.show()

# -------------------------------
# ✅ Step 5: Cluster Analysis (Mean Values of Features by Cluster)
# -------------------------------
# Mean of each numeric input feature per cluster. The column list is reused
# from numeric_cols so this table stays in sync if that list is adjusted.
# Rows are ordered by cluster id, because groupBy output order is not
# guaranteed.
cluster_summary = (
    clustered_df.groupBy("cluster")
    .mean(*numeric_cols)
    .orderBy("cluster")
)
cluster_summary.show()

# -------------------------------
# ✅ Step 6: Visualize Cluster Characteristics (Traffic Volume and Vehicle Type Distribution)
# -------------------------------
# Collect just the columns the box plots need, once. The previous version
# called toPandas() on the full DataFrame (vector columns included) for every
# plot.
boxplot_pdf = clustered_df.select("cluster", "zahl_tvz", "vz_typ_no").toPandas()

# Plot the distribution of traffic volume by cluster
plt.figure(figsize=(8, 6))
sns.boxplot(x="cluster", y="zahl_tvz", data=boxplot_pdf)
plt.title("Traffic Volume Distribution by Cluster")
plt.show()

# Visualize the distribution of vehicle type by cluster
plt.figure(figsize=(8, 6))
sns.boxplot(x="cluster", y="vz_typ_no", data=boxplot_pdf)
plt.title("Vehicle Type Distribution by Cluster")
plt.show()