In [0]:
# Ordered list of the numeric columns fed to the clustering pipeline.
# The order here is the order of the components in the assembled vector.
feature_cols = [
    # Measurement columns.
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    # Presumably derived upstream from the measurements above -- TODO confirm.
    "sepal_area",
    "petal_area",
    "petal_ratio",
]

In [0]:
# Load the source table as a Spark DataFrame.
# NOTE(review): `spark` is not defined in this notebook; presumably it is the
# session object injected by the notebook runtime -- confirm.
df_gold = spark.table("workspace.iris_gold.df_iris_gold")

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# Keep only the model inputs (cast to double so every feature is numeric)
# plus the `species` label, which is carried along for inspection only and
# is never passed to any pipeline stage.
df_gold = df_gold.select(
    *[F.col(c).cast("double").alias(c) for c in feature_cols],
    F.col("species")
)

# Stage 1: pack the feature columns into a single vector column.
# handleInvalid="skip" drops rows whose features contain invalid values
# instead of failing the job.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw",
    handleInvalid="skip"
)

# Stage 2: standardise each feature (centre on the mean, scale by std).
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features_scaled",
    withMean=True,
    withStd=True
)

# Stage 3: project the standardised features onto 2 principal components.
pca = PCA(
    k=2,
    inputCol="features_scaled",
    outputCol="pca_features"
)

# Stage 4: cluster the 2-D projection into 3 groups; fixed seed for
# reproducible initialisation.
kmeans = KMeans(
    k=3,
    seed=42,
    featuresCol="pca_features",
    predictionCol="cluster"
)

pipeline = Pipeline(stages=[assembler, scaler, pca, kmeans])

# Fit and apply the pipeline exactly once. The previous version fitted the
# same pipeline on the same data twice, doubling the Spark work for an
# equivalent model.
model = pipeline.fit(df_gold)
df_km = model.transform(df_gold)

# Backward-compatible aliases for the names the duplicate fit used to define.
# Note that `pca_model` is the full fitted pipeline (including KMeans), not
# just the PCA stage, and `df_pca` already carries the `cluster` column.
pca_model = model
df_pca = df_km

display(df_km.select("pca_features", "cluster", "species"))

In [0]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Score the clustering with the silhouette metric, measured on the same
# PCA projection the clusters were fitted on, using squared Euclidean distance.
evaluator = ClusteringEvaluator(
    predictionCol="cluster",
    featuresCol="pca_features",
    distanceMeasure="squaredEuclidean",
    metricName="silhouette",
)

# Evaluate the clustered DataFrame and report the resulting score.
sil = evaluator.evaluate(df_km)
print("Silhouette:", sil)

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Bring the columns needed for plotting to the driver as a pandas DataFrame.
pdf = df_km.select("pca_features", "cluster", "species").toPandas()

# Stack the per-row PCA vectors into one 2-D NumPy array (one row per sample;
# column 0 is plotted as PC1 and column 1 as PC2 below).
X = np.vstack([vec.toArray() for vec in pdf["pca_features"]])

# One scatter series per cluster id, drawn in ascending id order so the
# legend entries are sorted.
plt.figure()
cluster_ids = sorted(pdf["cluster"].unique())
for cluster_id in cluster_ids:
    in_cluster = (pdf["cluster"] == cluster_id).to_numpy()
    members = X[in_cluster]
    plt.scatter(members[:, 0], members[:, 1], label=f"cluster {cluster_id}")

plt.title("KMeans sobre PCA (Iris)")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.legend()
plt.show()