In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline

# CSV part files that are read together as a single input dataset.
INPUT_PATHS = [
    "/content/part-00000-4f3bd938-470a-46cb-940f-48b68d9cdb1b-c000.csv",
    "/content/part-00001-4f3bd938-470a-46cb-940f-48b68d9cdb1b-c000.csv",
    "/content/part-00002-4f3bd938-470a-46cb-940f-48b68d9cdb1b-c000.csv",
]

# Columns used as PCA inputs, and the number of principal components to keep.
FEATURE_COLUMNS = ["NO2_Mean", "O3_Mean", "SO2_Mean", "CO_Mean"]
N_COMPONENTS = 3

# Reuse an already-running session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("SAT5165_PCA_Sucharitha")
    .getOrCreate()
)

# The first line of each file is a header row; column types are inferred.
df = spark.read.csv(INPUT_PATHS, header=True, inferSchema=True)

# Cast every feature column to double so the ML stages receive numeric input.
for feature in FEATURE_COLUMNS:
    df = df.withColumn(feature, F.col(feature).cast(DoubleType()))

# Keep rows that have at least one non-null feature. coalesce() yields null
# only when every argument is null, so this equals OR-ing the isNotNull() tests.
non_null_condition = F.coalesce(*[F.col(name) for name in FEATURE_COLUMNS]).isNotNull()
df = df.filter(non_null_condition)

# Approximate median (0.5 quantile, 0.001 relative error) of each feature.
# approxQuantile can return an empty list when a column has no usable values
# (e.g. it is entirely null after the filter above), so guard before indexing;
# otherwise the 0.0 fallback below could never be reached.
medians = {}
for feature in FEATURE_COLUMNS:
    quantiles = df.approxQuantile(feature, [0.5], 0.001)
    medians[feature] = quantiles[0] if quantiles else None

# Impute remaining gaps with the column median, or 0.0 when no median exists.
fill_map = {
    name: (0.0 if median is None else median)
    for name, median in medians.items()
}
df = df.fillna(fill_map)

# Stage 1: pack the individual feature columns into a single vector column.
assembler = VectorAssembler(
    inputCols=FEATURE_COLUMNS,
    outputCol="features_unscaled",
)

# Stage 2: centre each feature and scale it to unit standard deviation.
scaler = StandardScaler(
    inputCol="features_unscaled",
    outputCol="features",
    withMean=True,
    withStd=True,
)

# Stage 3: project the standardized vector onto N_COMPONENTS components.
pca = PCA(
    k=N_COMPONENTS,
    inputCol="features",
    outputCol="pca_features",
)

pipeline = Pipeline(stages=[assembler, scaler, pca])

# Fit all stages on the cleaned data, then apply them to the same data.
model = pipeline.fit(df)
transformed = model.transform(df)

# The fitted PCA model is the last pipeline stage; convert its
# explained-variance vector to a plain Python list for printing.
pca_model = model.stages[-1]
explained_variance = pca_model.explainedVariance.toArray().tolist()
print("Explained variance ratio:", explained_variance)

# Extract PCA component values safely; suitable for use as a UDF body
def extract_pca_value(v, idx):
    """Return element ``idx`` of vector ``v`` as a float, or None if unavailable.

    Args:
        v: An object exposing ``toArray()`` (such as a Spark ML vector), or
            None for a missing value.
        idx: Position of the element to read.

    Returns:
        The selected element converted to ``float``, or ``None`` when ``v`` is
        missing, is not vector-like, ``idx`` is out of range or not a valid
        index, or the element cannot be converted to a float.
    """
    if v is None:
        return None
    try:
        return float(v.toArray()[idx])
    # Only the failures a bad vector or index can cause are treated as
    # "no value"; anything else is a real error and is allowed to surface.
    except (AttributeError, IndexError, TypeError, ValueError):
        return None

# Register the helper directly as a UDF returning a double; no lambda wrapper
# is needed because extract_pca_value already takes (vector, index).
# NOTE(review): on Spark >= 3.0, pyspark.ml.functions.vector_to_array would
# avoid the Python UDF round-trip -- confirm the target Spark version first.
extract_udf = F.udf(extract_pca_value, DoubleType())

try:
    # Add one scalar column per principal component: pc_1 .. pc_N.
    for i in range(N_COMPONENTS):
        transformed = transformed.withColumn(
            f"pc_{i+1}", extract_udf(F.col("pca_features"), F.lit(i))
        )

    # Show the component columns next to the original feature values.
    component_columns = [f"pc_{i+1}" for i in range(N_COMPONENTS)]
    transformed.select(component_columns + FEATURE_COLUMNS).show(10, truncate=False)
finally:
    # Always release the Spark session, even if extraction or display fails.
    spark.stop()


Explained variance ratio: [0.525783182686236, 0.22542532144494884, 0.16650776678532875]
+------------------+-------------------+--------------------+--------+--------+--------+--------+
|pc_1              |pc_2               |pc_3                |NO2_Mean|O3_Mean |SO2_Mean|CO_Mean |
+------------------+-------------------+--------------------+--------+--------+--------+--------+
|1.737732749912841 |0.40158725623237845|0.10359616296396407 |4.541667|0.038458|0.958333|0.025   |
|1.7451441500302567|0.4032987370179101 |0.09608461043974453 |4.541667|0.038458|0.958333|0.020833|
|1.7419062347178151|0.3912312407873393 |0.10764207430090147 |4.541667|0.038458|0.925   |0.025   |
|1.749317634835231 |0.3929427215728709 |0.10013052177668194 |4.541667|0.038458|0.925   |0.020833|
|0.6286862157919038|0.0745221420956521 |-0.6099443236202006 |10.0    |0.025208|1.875   |0.095833|
|0.6286862157919038|0.0745221420956521 |-0.6099443236202006 |10.0    |0.025208|1.875   |0.095833|
|0.6286862157919038|0.07452214