In [1]:
import os
# Ensure the local event-log directory exists before any Spark session starts.
# NOTE(review): presumably spark.eventLog is enabled in spark-defaults.conf and
# points here (Spark's default spark.eventLog.dir) -- confirm locally.
SPARK_EVENT_DIR = '/tmp/spark-events'
os.makedirs(SPARK_EVENT_DIR, exist_ok=True)

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Stop a session left over from a previous run of this cell, so the builder
# below starts a fresh one with this app name. Only stop what is already
# running: SparkContext.getOrCreate() on a fresh kernel would boot a throwaway
# SparkContext just to stop it again.
try:
    existing_session = SparkSession.getActiveSession()
    if existing_session is not None:
        existing_session.stop()  # also stops the underlying SparkContext
except Exception as exc:  # best effort -- never block creating the new session
    print(f"Could not stop the existing Spark session: {exc!r}")

# Now create a new session
spark = SparkSession.builder.appName("LogisticRegression").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/26 23:05:36 WARN Utils: Your hostname, datnd-Nitro-AN515-57, resolves to a loopback address: 127.0.1.1; using 192.168.101.128 instead (on interface wlp0s20f3)
25/12/26 23:05:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/26 23:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
# Directory holding the Kaggle CSVs. Set SPARK_KAGGLE_DATA_DIR to run this
# notebook on another machine instead of editing the absolute path below.
DATA_DIR = os.environ.get("SPARK_KAGGLE_DATA_DIR", "/home/datnd/Projects/spark-kaggle/data")
df_train = spark.read.csv(os.path.join(DATA_DIR, "train.csv"), header=True, inferSchema=True)

# Column the pipeline below indexes into the "label" column and predicts.
target_col = "accident_risk_level"

def add_features(df, curvature_threshold=0.5, speed_threshold=60):
    """Append hand-crafted flag, interaction and integer-cast columns to ``df``.

    Columns are appended in a fixed order. The interaction columns are built
    from flags added earlier in the same chain.

    Args:
        df: Spark DataFrame containing at least ``lighting``, ``weather``,
            ``curvature``, ``speed_limit``, ``road_signs_present``,
            ``public_road``, ``holiday`` and ``school_season``.
        curvature_threshold: rows with ``curvature`` >= this value get
            ``high_curvature = 1`` (default 0.5, the previously hard-coded value).
        speed_threshold: rows with ``speed_limit`` >= this value get
            ``high_speed = 1`` (default 60, the previously hard-coded value).

    Returns:
        A new DataFrame with the extra columns; ``df`` itself is left unchanged.
        Flags are 1/0, or null when the column they are derived from is null.
    """
    return (
        df
        # Single-condition flags.
        .withColumn("is_night", (F.col("lighting") == "night").cast("int"))
        .withColumn("bad_weather", (F.col("weather").isin("foggy", "rainy")).cast("int"))
        .withColumn("high_curvature", (F.col("curvature") >= curvature_threshold).cast("int"))
        .withColumn("high_speed", (F.col("speed_limit") >= speed_threshold).cast("int"))
        # Pairwise flag interactions (product of two 0/1 flags = logical AND).
        .withColumn("night_high_curvature", (F.col("is_night") * F.col("high_curvature")).cast("int"))
        .withColumn("night_high_speed", (F.col("is_night") * F.col("high_speed")).cast("int"))
        .withColumn("high_curvature_bad_weather", (F.col("high_curvature") * F.col("bad_weather")).cast("int"))
        # Numeric interactions: the raw value at night, 0 otherwise.
        .withColumn("curvature_x_night", F.col("curvature") * F.col("is_night"))
        .withColumn("speed_x_night", F.col("speed_limit") * F.col("is_night"))
        # Integer copies of the remaining indicator columns for the assembler.
        .withColumn("road_signs_present_i", F.col("road_signs_present").cast("int"))
        .withColumn("public_road_i", F.col("public_road").cast("int"))
        .withColumn("holiday_i", F.col("holiday").cast("int"))
        .withColumn("school_season_i", F.col("school_season").cast("int"))
    )

# split -- done before feature engineering, so nothing is fitted on test rows.
train_df, test_df = df_train.randomSplit([0.8, 0.2], seed=42)
# Pipeline.fit makes several passes over the training data (each StringIndexer,
# the StandardScaler and LogisticRegression are fitted separately). Caching the
# engineered split avoids re-reading the CSV and re-running randomSplit and
# add_features on every pass.
train_fe = add_features(train_df).cache()
test_fe  = add_features(test_df)

# String columns: indexed, then one-hot encoded.
categorical_cols = ["road_type", "lighting", "weather", "time_of_day"]

# Continuous columns plus the two numeric interactions: assembled, then std-scaled.
numeric_cols = ["num_lanes", "curvature", "speed_limit", "num_reported_accidents"]
interaction_numeric_cols = ["curvature_x_night", "speed_x_night"]
numeric_to_scale = numeric_cols + interaction_numeric_cols

# 0/1 columns produced by add_features; passed to the model unscaled.
boolean_features = [
    "is_night", "bad_weather", "high_curvature", "high_speed",
    "night_high_curvature", "night_high_speed", "high_curvature_bad_weather",
    "road_signs_present_i", "public_road_i", "holiday_i", "school_season_i"
]

# NOTE(review): handleInvalid="keep" on the *label* maps null/unseen target values
# to an extra index instead of failing -- confirm that is intended for the target.
label_indexer = StringIndexer(inputCol=target_col, outputCol="label", handleInvalid="keep")
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in categorical_cols]
# With handleInvalid="keep" the last index is the extra unknown bucket, which
# dropLast=True removes, so every fitted category keeps its own column.
encoders = [OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_ohe", dropLast=True) for c in categorical_cols]

# "keep" writes NaN for null inputs instead of raising.
num_assembler = VectorAssembler(inputCols=numeric_to_scale, outputCol="num_raw", handleInvalid="keep")

# Scale to unit standard deviation without centering.
scaler = StandardScaler(inputCol="num_raw", outputCol="num_scaled", withMean=False, withStd=True)

bool_assembler = VectorAssembler(inputCols=boolean_features, outputCol="bool_features", handleInvalid="keep")

# Final feature order: scaled numerics, boolean flags, then one OHE block per
# entry of categorical_cols (the feature-importance cell relies on this order).
final_assembler = VectorAssembler(
    inputCols=["num_scaled", "bool_features"] + [f"{c}_ohe" for c in categorical_cols],
    outputCol="features",
    handleInvalid="keep"
)

# Softmax regression with a pure L2 penalty (elasticNetParam=0.0).
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    family="multinomial",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0
)

pipeline = Pipeline(stages=[
    label_indexer,
    *indexers, *encoders,
    num_assembler, scaler,
    bool_assembler,
    final_assembler,
    lr
])

model = pipeline.fit(train_fe)
# Predictions on the training split (for the train metrics) and on the held-out split.
train = model.transform(train_fe)
pred = model.transform(test_fe)


# Multiclass metrics on the "label"/"prediction" columns produced by the pipeline.
# The individual evaluator_* names are kept so later cells can reuse them.
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

# Every evaluate() call is a separate Spark job. Without caching, each of the
# eight calls re-executes the fitted pipeline (and anything upstream of it that
# is not cached). Caching computes each split's predictions only once.
train.cache()
pred.cache()

metric_evaluators = [
    ("Accuracy", evaluator_acc),
    ("Precision", evaluator_precision),
    ("Recall", evaluator_recall),
    ("F1", evaluator_f1),
]
for split_name, split_predictions in [("Train", train), ("Test", pred)]:
    for metric_label, metric_evaluator in metric_evaluators:
        print(f"{split_name} {metric_label}:", metric_evaluator.evaluate(split_predictions))


25/12/26 23:05:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Train Accuracy: 0.7883241261187369


                                                                                

Train Precision: 0.8029936399661157


                                                                                

Train Recall: 0.7883241261187368


                                                                                

Train F1: 0.7922538670149973
Test Accuracy: 0.7879562892350617
Test Precision: 0.8027204909542864
Test Recall: 0.7879562892350616
Test F1: 0.7919204364899317


In [None]:
# ===== Feature Importance Extraction =====
import numpy as np
import pandas as pd
from pyspark.ml.feature import StringIndexerModel

# 1) Extract the LogisticRegressionModel from the Pipeline (it is the last stage).
lr_model = model.stages[-1]

# coefficientMatrix is (numClasses, numFeatures); convert it once and reuse it.
coefs = lr_model.coefficientMatrix.toArray()
print("Coefficients shape:", coefs.shape)

# 2) Reconstruct feature names in the exact order final_assembler concatenates
#    its inputs: num_scaled, bool_features, then one OHE block per entry of
#    categorical_cols.
num_features = numeric_to_scale  # numeric + interaction features
bool_feats = boolean_features

# Look the fitted indexers up by output column and iterate in categorical_cols
# order, so the names follow the assembler's order rather than the stage order.
si_by_output = {
    stage.getOutputCol(): stage
    for stage in model.stages
    if isinstance(stage, StringIndexerModel)
}
si_models = [si_by_output[f"{c}_idx"] for c in categorical_cols]

ohe_features = []
for si in si_models:
    c_name = si.getInputCol()
    # labelsArray replaces the deprecated `labels` property; a single-column
    # indexer holds exactly one label list.
    labels = si.labelsArray[0]
    # handleInvalid="keep" adds an extra bucket for invalid/unknown values at
    # index len(labels), and OneHotEncoder(dropLast=True) drops that last
    # category. So ALL original labels are kept as features.
    ohe_features += [f"{c_name}={lbl}" for lbl in labels]

feature_names = num_features + bool_feats + ohe_features

print(f"Total features constructed: {len(feature_names)}")
print(f"Model features: {lr_model.numFeatures}")

assert len(feature_names) == lr_model.numFeatures, "Feature count mismatch!"

# 3) Importance = largest absolute coefficient across classes for each feature.
# np.abs(...).max(axis=0) also handles a single-row matrix (binomial family).
# Caveat: only the numeric block was scaled to unit std; boolean/OHE columns
# are raw 0/1, so magnitudes are only roughly comparable across blocks.
feat_imp = np.abs(coefs).max(axis=0)

df_importance = pd.DataFrame({
    "feature": feature_names,
    "importance": feat_imp
}).sort_values("importance", ascending=False)

df_importance.head(20)
