In [1]:
# !pip install pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import (
    RandomForestClassifier, LogisticRegression, MultilayerPerceptronClassifier,
    OneVsRest, LinearSVC, NaiveBayes
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml import Transformer
from pyspark import keyword_only

import math
import pandas as pd

Setup

In [3]:
TEAM = 29
spark = SparkSession.builder.appName(f"{TEAM} - spark ML").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/06 02:48:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/06 02:48:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/06 02:48:58 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/06 02:48:58 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/05/06 02:48:58 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/05/06 02:48:58 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/05/06 02:48:58 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/05/06 02:48:58 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
25/05/06 02:48:58 WARN Utils: Serv

Load data. TODO: replace with Hive

In [4]:
records_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("records.csv")
)

stations_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("stations.csv")
)

df = records_df.join(
    stations_df.select("station_id", "latitude", "longitude"),
    on="station_id",
    how="left"
)

25/05/06 02:49:00 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
                                                                                

Rename and cast. TODO: pnut' Tmf to fix

In [5]:
if "cmaq_organic_carbon" in df.columns:
    df = df.withColumn("cmaq_oc", F.col("cmaq_organic_carbon")).drop("cmaq_organic_carbon")

float_cols = [
    "airnow_ozone", "cmaq_ozone", "cmaq_no2", "cmaq_co", "cmaq_oc",
    "pressure", "pbl", "temperature", "wind_speed", "wind_direction",
    "radiation"
]

for col in float_cols:
    if col in df.columns:
        df = df.withColumn(col, F.col(col).cast("float"))

Fill missing values with median

In [6]:
for col in float_cols:
    median = df.approxQuantile(col, [0.5], 0.25)[0]
    df = df.fillna({col: median})

                                                                                

Feature engineering

In [7]:
class SinCosTransformer(
    Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable
):
    period = Param(
        Params._dummy(),
        "period",
        "Period for sin/cos",
        typeConverter=TypeConverters.toFloat,
    )

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, period=None):
        super(SinCosTransformer, self).__init__()
        self._setDefault(period=1)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, period=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setPeriod(self, value):
        return self._set(period=float(value))

    def getPeriod(self):
        return self.getOrDefault(self.period)

    def setInputCol(self, value):
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        return self._set(outputCol=value)

    def _transform(self, dataset):
        period = self.getPeriod()
        inputCol = self.getInputCol()
        outputCol = self.getOutputCol()

        value = 2 * math.pi * F.col(inputCol) / period
        dataset = dataset.withColumn(f"{outputCol}_sin", F.sin(value))
        dataset = dataset.withColumn(f"{outputCol}_cos", F.cos(value))

        return dataset

In [8]:
from pyspark.sql import functions as F

def add_ecef_columns(df, lat_col="latitude", lon_col="longitude", height_col=None):
    a = 6378137.0
    e2 = 6.6943799901377997e-3

    df = df.withColumn("lat_rad", F.radians(F.col(lat_col)))
    df = df.withColumn("lon_rad", F.radians(F.col(lon_col)))

    if height_col is not None:
        df = df.withColumn("ecef_height", F.col(height_col))
    else:
        df = df.withColumn("ecef_height", F.lit(0.0))

    df = df.withColumn(
        "ecef_N",
        F.lit(a) / F.sqrt(1 - F.lit(e2) * F.sin(F.col("lat_rad")) * F.sin(F.col("lat_rad")))
    )

    df = df.withColumn(
        "ecef_x",
        (F.col("ecef_N") + F.col("ecef_height")) * F.cos(F.col("lat_rad")) * F.cos(F.col("lon_rad"))
    )
    df = df.withColumn(
        "ecef_y",
        (F.col("ecef_N") + F.col("ecef_height")) * F.cos(F.col("lat_rad")) * F.sin(F.col("lon_rad"))
    )
    df = df.withColumn(
        "ecef_z",
        (F.col("ecef_N") * (1 - F.lit(e2)) + F.col("ecef_height")) * F.sin(F.col("lat_rad"))
    )

    return df

In [9]:
df = add_ecef_columns(df)

Split data

In [10]:
train, test = df.randomSplit([0.7, 0.3], seed=42)

Feature pipeline

In [11]:
feature_cols = [
    "cmaq_ozone", "cmaq_oc", "pressure", "pbl", "temperature",
    "cloud_fraction", "wind_speed", "ecef_x", "ecef_y", "ecef_z",
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="linear_features")
scaler = StandardScaler(inputCol="linear_features", outputCol="scaled_features", withMean=True, withStd=True)

sin_month = SinCosTransformer(inputCol="month", outputCol="month", period=12)
sin_day = SinCosTransformer(inputCol="day", outputCol="day", period=31)
sin_hour = SinCosTransformer(inputCol="hour", outputCol="hour", period=24)

final_assembler = VectorAssembler(
    inputCols=[
        "scaled_features", "month_sin", "month_cos", "day_sin", "day_cos",
        "hour_sin", "hour_cos", "ecef_x", "ecef_y", "ecef_z"
    ],
    outputCol="features"
)

In [12]:
pipeline = Pipeline(stages=[assembler, scaler, sin_month, sin_day, sin_hour, final_assembler])
pipeline = pipeline.fit(train)

train = pipeline.transform(train)
test = pipeline.transform(test)

                                                                                

Multiclass label transformation

In [13]:
categorize_radiation = F.when(F.col("radiation") < 100, 0) \
            .when((F.col("radiation") >= 100) & (F.col("radiation") < 500), 1) \
            .otherwise(2)

train = train.withColumn("label", categorize_radiation)
test = test.withColumn("label", categorize_radiation)

Models and parameter grids (27 combinations each: 3 values x 3 hyperparameters)

In [14]:
num_features = 16
num_classes = 3

In [15]:
mlp = MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", layers=[num_features, 32, num_classes])
svc = LinearSVC(labelCol="label", featuresCol="features")
svc_vs = OneVsRest(classifier=svc)
nb = NaiveBayes(labelCol="label", featuresCol="features", modelType="gaussian")
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=100)
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

models = {
    "MLP": (
        mlp,
        ParamGridBuilder()
            .addGrid(mlp.stepSize, [0.01, 0.05, 0.1])  # Algorithm hyperparameter
            .addGrid(mlp.blockSize, [32, 64, 128])  # Model hyperparameter
            .addGrid(mlp.tol, [1e-4, 1e-3, 1e-2])  # Model hyperparameter
            .build()
    ),
    "NaiveBayes": (
        nb,
        ParamGridBuilder()
            .addGrid(nb.smoothing, [0.5, 1.0, 1.5])  # Algorithm hyperparameter
            .addGrid(nb.thresholds, [
                [1.0, 1.0, 1.0],         # Neutral (no weighting)
                [1.5, 1.0, 0.5],         # Favor class 0
                [0.5, 1.0, 1.5]          # Favor class 2
            ])  # Model hyperparameter
            .build()
    ),
    "LogisticRegression": (
        lr,
        ParamGridBuilder()
            .addGrid(lr.regParam, [0.01, 0.1, 1.0])  # Algorithm hyperparameter
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # Model hyperparameter
            .addGrid(lr.threshold, [0.3, 0.5, 0.7])  # Model hyperparameter
            .build()
    ),
    "RandomForest": (
        rf,
        ParamGridBuilder()
            .addGrid(rf.numTrees, [10, 50, 100])  # Algorithm hyperparameter
            .addGrid(rf.maxDepth, [5, 10, 15])  # Model hyperparameter
            .addGrid(rf.featureSubsetStrategy, ["auto", "sqrt", "log2"])  # Model hyperparameter
            .build()
    ),
    "SVM": (
        svc_vs,
        ParamGridBuilder()
            .addGrid(svc.regParam, [0.01, 0.1, 1.0])  # Algorithm hyperparameter
            .addGrid(svc.tol, [1e-6, 1e-4, 1e-2])  # Model hyperparameter
            .addGrid(svc.standardization, [True, False])  # Model hyperparameter
            .build()
    ),
}


Evaluation

In [16]:
metrics = ["accuracy", "f1"]
evaluators = {
    m: MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=m)
    for m in metrics
}

Train, evaluate and plot training dynamics

In [17]:
test = test.select("features", "label")
train = train.select("features", "label")
train.show(10)

[Stage 42:>                                                         (0 + 1) / 1]

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.5928154197071...|    0|
|[-1.2557137044050...|    0|
|[-0.7585399908816...|    0|
|[-0.1785039917708...|    0|
|[-1.4214382755795...|    0|
|[-0.7585399908816...|    0|
|[0.15294515057813...|    0|
|[-1.5043005611668...|    0|
|[-0.8414022764688...|    0|
|[-0.7585399908816...|    0|
+--------------------+-----+
only showing top 10 rows



                                                                                

In [18]:
train.write.mode("overwrite").json("project/data/train")
test.write.mode("overwrite").json("project/data/test")

                                                                                

In [19]:
summary = []

for name, (model, grid) in models.items():
    print(f"\nTraining and evaluating {name}...")
    cv = CrossValidator(
        estimator=model,
        estimatorParamMaps=grid,
        evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1"),
        numFolds=3
    )

    cv_model = cv.fit(train)
    predictions = cv_model.transform(test)

    print(f"Metrics for {name}:")
    row = {"Model": name}
    for m, e in evaluators.items():
        score = e.evaluate(predictions)
        row[m] = score
        print(f"{m}: {score:.4f}")
    summary.append(row)

    predictions = predictions.select("label", "prediction")
    predictions.coalesce(1).write.mode("overwrite").csv(f"project/output/model_{name}_predictions.csv")
    cv_model.bestModel.write().mode("overwrite").save(f"project/models/model_{name}")



Training and evaluating MLP...


25/05/06 02:51:21 WARN BlockManager: Block rdd_219_6 could not be removed as it was not found on disk or in memory
25/05/06 02:51:25 ERROR Executor: Exception in task 6.0 in stage 48.0 (TID 311)
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.spark.sql.execution.columnar.ColumnBuilder$.ensureFreeSpace(ColumnBuilder.scala:167)
	at org.apache.spark.sql.execution.columnar.BasicColumnBuilder.appendFrom(ColumnBuilder.scala:73)
	at org.apache.spark.sql.execution.columnar.ComplexColumnBuilder.org$apache$spark$sql$execution$columnar$NullableColumnBuilder$$super$appendFrom(ColumnBuilder.scala:93)
	at org.apache.spark.sql.execution.columnar.NullableColumnBuilder.appendFrom(NullableColumnBuilder.scala:61)
	at org.apache.spark.sql.execution.columnar.NullableColumnBuilder.appendFrom$(NullableColumnBuilder.scala:54)
	at org.apache.spark.sql.execution.columnar.ComplexColumnBuild

Py4JError: py4j.reflection does not exist in the JVM

Export results for Superset

In [None]:
pd.DataFrame(summary).to_csv("model_performance.csv", index=False)