## Script testing

In [1]:
import os
import math
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import FeatureHasher, Imputer, OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import count, countDistinct, lit, row_number, udf, when
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql.window import Window
from pyspark.ml import Transformer, Pipeline
from pyspark.ml.param.shared import HasInputCols, HasOutputCols, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf, radians, sin, cos, lit, sqrt, pow
from pyspark.sql.types import StructType, StructField, DoubleType, ArrayType

from pyspark.sql.functions import year, month, dayofweek, hour, lit, cos, sin, udf, minute, second
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable

team = 13
warehouse = "project/hive/warehouse"
spark = (
    SparkSession.builder.appName("{} - spark ML".format(team))
    .master("yarn")
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")
    .config("spark.sql.warehouse.dir", warehouse)
    .config("spark.sql.avro.compression.codec", "snappy")
    .enableHiveSupport()
    .getOrCreate()
)


def get_data() -> DataFrame:
    global spark
    df = spark.sql("SELECT * FROM team13_projectdb.accidents_partitioned_bucketed")

    return df


def preprocess_numerical_features(df: DataFrame, numerical_cols: list) -> DataFrame:
    global spark
    window_state = Window.partitionBy("State")

    for column in numerical_cols:
        df = df.withColumn(column, F.coalesce(F.col(column), F.mean(F.col(column)).over(window_state)))

    assembler = VectorAssembler(inputCols=numerical_cols, outputCol="num_features_raw")

    scaler = StandardScaler(
        inputCol=assembler.getOutputCol(), outputCol="scaled_num_features", withMean=True, withStd=True
    )

    pipeline = Pipeline(stages=[assembler, scaler])
    pipeline_model = pipeline.fit(df)
    processed_df = pipeline_model.transform(df)

    array_df = processed_df.withColumn("scaled_array", vector_to_array("scaled_num_features"))

    for i, column in enumerate(numerical_cols):
        array_df = array_df.withColumn(f"scaled_{column}", F.col("scaled_array")[i].cast("double"))

    cols_to_drop = numerical_cols + ["num_features_raw", "scaled_num_features", "scaled_array"]
    final_df = array_df.drop(*cols_to_drop)

    for column in numerical_cols:
        final_df = final_df.withColumnRenamed(f"scaled_{column}", column)

    return final_df

# Helper to convert vector to array
vector_to_array = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

class DropNullTimestampTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, colsToDropNulls=None):
        super(DropNullTimestampTransformer, self).__init__()
        self.colsToDropNulls = colsToDropNulls

    def _transform(self, df):
        if self.colsToDropNulls:
            # Drop rows where any of the specified columns have null values
            return df.na.drop(subset=self.colsToDropNulls)
        else:
            return df


class ExtractTimeFeaturesTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, timestampCols=None):
        super(ExtractTimeFeaturesTransformer, self).__init__()
        self.timestampCols = timestampCols

    def _transform(self, df):
        for ts_col in self.timestampCols:
            df = df.withColumn(f"{ts_col}_year", year(ts_col).cast(DoubleType())) \
                   .withColumn(f"{ts_col}_month", month(ts_col).cast(DoubleType())) \
                   .withColumn(f"{ts_col}_weekday", dayofweek(ts_col).cast(DoubleType())) \
                   .withColumn(f"{ts_col}_hour", hour(ts_col).cast(DoubleType())) \
                   .withColumn(f"{ts_col}_minute", minute(ts_col).cast(DoubleType())) \
                   .withColumn(f"{ts_col}_second", second(ts_col).cast(DoubleType()))
        return df


class CyclicalEncodingTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, timestampCols=None):
        super(CyclicalEncodingTransformer, self).__init__()
        self.timestampCols = timestampCols

    def _transform(self, df):
        def cyclical_encode(col_name, period):
            radians = (2 * lit(3.141592653589793) * col_name) / period
            return cos(radians).cast(DoubleType()), sin(radians).cast(DoubleType())

        for ts_col in self.timestampCols:
            month_sin, month_cos = cyclical_encode(df[f"{ts_col}_month"], 12)
            hour_sin, hour_cos = cyclical_encode(df[f"{ts_col}_hour"], 24)
            minute_sin, minute_cos = cyclical_encode(df[f"{ts_col}_minute"], 60)
            second_sin, second_cos = cyclical_encode(df[f"{ts_col}_second"], 60)

            df = df.withColumn(f"{ts_col}_month_sin", month_sin) \
                   .withColumn(f"{ts_col}_month_cos", month_cos) \
                   .withColumn(f"{ts_col}_hour_sin", hour_sin) \
                   .withColumn(f"{ts_col}_hour_cos", hour_cos) \
                   .withColumn(f"{ts_col}_minute_sin", minute_sin) \
                   .withColumn(f"{ts_col}_minute_cos", minute_cos) \
                   .withColumn(f"{ts_col}_second_sin", second_sin) \
                   .withColumn(f"{ts_col}_second_cos", second_cos)
        return df

def encode_timestamp_features(df: DataFrame) -> DataFrame:
    timestamp_cols = ['start_time', 'end_time', 'weather_timestamp']

    # Step 1: Drop rows with nulls in weather_timestamp
    drop_null_transformer = DropNullTimestampTransformer(colsToDropNulls=['weather_timestamp'])

    # Step 2: Extract time features
    extract_transformer = ExtractTimeFeaturesTransformer(timestampCols=timestamp_cols)

    # Step 3: Apply cyclical encoding
    cyclical_transformer = CyclicalEncodingTransformer(timestampCols=timestamp_cols)

    # Step 4: Define columns to scale
    columns_to_scale = []
    for ts_col in timestamp_cols:
        columns_to_scale.extend([
            f"{ts_col}_year", 
            f"{ts_col}_weekday"
        ])

    # Step 5: Impute and scale
    imputer = Imputer(
        inputCols=columns_to_scale,
        outputCols=[f"{col}_imputed" for col in columns_to_scale],
        strategy="mean"
    )
    assembler = VectorAssembler(
        inputCols=imputer.getOutputCols(),
        outputCol="features_to_scale"
    )
    scaler = StandardScaler(
        inputCol="features_to_scale",
        outputCol="scaled_features",
        withMean=True,
        withStd=True
    )

    # Step 6: Build pipeline
    pipeline = Pipeline(stages=[
        drop_null_transformer,
        extract_transformer,
        cyclical_transformer,
        imputer,
        assembler,
        scaler
    ])

    # Step 7: Transform data
    scaled_df = pipeline.fit(df).transform(df)

    # Step 8: Convert vector to array and extract scaled values
    scaled_df = scaled_df.withColumn("scaled_array", vector_to_array("scaled_features"))

    for i, col in enumerate(columns_to_scale):
        scaled_df = scaled_df.withColumn(f"{col}_scaled", scaled_df["scaled_array"][i].cast(DoubleType()))

    # Step 9: Clean up — now including minute and second
    cols_to_drop = (
        timestamp_cols +
        [f"{ts_col}_month" for ts_col in timestamp_cols] +
        [f"{ts_col}_hour" for ts_col in timestamp_cols] +
        [f"{ts_col}_minute" for ts_col in timestamp_cols] +
        [f"{ts_col}_second" for ts_col in timestamp_cols] +
        columns_to_scale +
        imputer.getOutputCols() +
        ["features_to_scale", "scaled_features", "scaled_array"]
    )

    scaled_df = scaled_df.drop(*cols_to_drop)

    # Step 10: Rename scaled columns back to original names
    for col in columns_to_scale:
        scaled_df = scaled_df.withColumnRenamed(f"{col}_scaled", col)

    return scaled_df


def impute_categorical_features(df: DataFrame, categorical_features: list) -> DataFrame:
    # Identify categorical features with null values and impute them with mode values
    df_imputed = df
    features_with_nulls = []

    # First identify which features have nulls
    for feature in categorical_features:
        missing_count = df.filter(df[feature].isNull()).count()
        if missing_count > 0:
            features_with_nulls.append(feature)
            print(f"Feature {feature} has {missing_count} missing values - will impute with mode")

    # Special handling for City - use mode by State
    if "City" in features_with_nulls:
        print("\nSpecial handling for City: Using mode city within each State")

        # Calculate mode city for each state
        state_city_modes = (
            df.filter(F.col("City").isNotNull())
            .groupBy("State", "City")
            .count()
            .orderBy("State", F.col("count").desc())
        )

        # Get the most common city in each state
        window = Window.partitionBy("State").orderBy(F.col("count").desc())
        city_modes_by_state = (
            state_city_modes.withColumn("row", row_number().over(window))
            .filter(F.col("row") == 1)
            .select("State", "City")
        )

        # Get the global mode for fallback
        global_city_mode = (
            df.filter(F.col("City").isNotNull())
            .groupBy("City")
            .count()
            .orderBy(F.col("count").desc())
            .limit(1)
            .collect()[0]["City"]
        )

        print(f"Global city mode (fallback): {global_city_mode}")

        # Collect the state-city mapping to a dictionary for faster lookup
        state_to_city_dict = {row["State"]: row["City"] for row in city_modes_by_state.collect()}

        # Create a UDF to map state to its mode city
        def get_mode_city_by_state(state):
            return state_to_city_dict.get(state, global_city_mode)

        mode_city_udf = udf(get_mode_city_by_state, StringType())

        # Apply imputation for City based on State
        df_imputed = df_imputed.withColumn(
            "City", when(F.col("City").isNull(), mode_city_udf(F.col("State"))).otherwise(F.col("City"))
        )

        # Remove City from features_with_nulls as it's handled separately
        features_with_nulls.remove("City")

    # Calculate modes and apply imputation for other features with nulls
    for feature in features_with_nulls:
        # Calculate mode value (most frequent non-null value)
        mode_value = (
            df.filter(F.col(feature).isNotNull())
            .groupBy(feature)
            .count()
            .orderBy("count", ascending=False)
            .limit(1)
            .collect()[0][feature]
        )

        print(f"Imputing {feature} with mode value: {mode_value}")

        # Apply imputation
        df_imputed = df_imputed.withColumn(
            feature, when(F.col(feature).isNull(), lit(mode_value)).otherwise(F.col(feature))
        )

    # Set working dataframe to imputed version
    df = df_imputed

    return df


def encode_categorical_features(df: DataFrame) -> DataFrame:
    # 1. Frequency encoding for high-cardinality features (County and City)
    # Calculate frequency counts
    county_freq = df.groupBy("County").count()
    city_freq = df.groupBy("City").count()

    # Calculate total counts for percentage calculation
    total_count = df.count()
    county_freq = county_freq.withColumn("county_frequency", F.col("count") / total_count)
    city_freq = city_freq.withColumn("city_frequency", F.col("count") / total_count)

    # Join frequency information back to the main dataframe
    df = df.join(county_freq.select("County", "county_frequency"), on="County", how="left")
    df = df.join(city_freq.select("City", "city_frequency"), on="City", how="left")

    # 2. For low/medium cardinality features, use StringIndexer + OneHotEncoder
    side_indexer = StringIndexer(inputCol="side", outputCol="SideIndex", handleInvalid="keep")
    side_encoder = OneHotEncoder(inputCol="SideIndex", outputCol="side_vec")

    state_indexer = StringIndexer(inputCol="state", outputCol="StateIndex", handleInvalid="keep")
    state_encoder = OneHotEncoder(inputCol="StateIndex", outputCol="state_vec")

    weather_indexer = StringIndexer(inputCol="Weather_Condition", outputCol="WeatherIndex", handleInvalid="keep")
    weather_encoder = OneHotEncoder(inputCol="WeatherIndex", outputCol="weather_vec")

    # Create pipeline for the encoders (excluding the frequency encoding part)
    pipeline = Pipeline(
        stages=[
            side_indexer,
            side_encoder,
            state_indexer,
            state_encoder,
            weather_indexer,
            weather_encoder,
        ]
    )

    # Apply pipeline transformation
    model = pipeline.fit(df)
    df = model.transform(df)
    
    return df


class GeoToECEFTransformer(Transformer, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    inputInRadians = Param(
        Params._dummy(),
        "inputInRadians",
        "Whether input coordinates are in radians (True) or degrees (False)",
        typeConverter=TypeConverters.toBoolean,
    )

    def __init__(self, inputCols=None, outputCols=None, inputInRadians=False):
        super(GeoToECEFTransformer, self).__init__()
        self._setDefault(inputCols=None, outputCols=None, inputInRadians=False)

        if inputCols and outputCols:
            self.setInputCols(inputCols)
            self.setOutputCols(outputCols)

        self.setInputInRadians(inputInRadians)

    def setInputCols(self, value):
        """
        Sets the input columns.
        Expected order: [latitude, longitude, altitude]
        """
        return self._set(inputCols=value)

    def setOutputCols(self, value):
        """
        Sets the output columns.
        Expected order: [ecef_x, ecef_y, ecef_z]
        """
        return self._set(outputCols=value)

    def setInputInRadians(self, value):
        """
        Sets whether input coordinates are in radians (True) or degrees (False).
        """
        return self._set(inputInRadians=value)

    def getInputInRadians(self):
        """
        Gets whether input coordinates are in radians.
        """
        return self.getOrDefault(self.inputInRadians)

    def _transform(self, dataset):
        """
        Transform the dataset by converting geographic coordinates to ECEF coordinates.
        """
        # Constants from the Java class
        a = 6378137.0  # WGS-84 semi-major axis
        e2 = 6.6943799901377997e-3  # WGS-84 first eccentricity squared

        # Get input column names
        lat_col, lon_col, alt_col = self.getInputCols()

        # Get output column names
        x_col, y_col, z_col = self.getOutputCols()

        # Create a combined UDF for ECEF calculation
        def geo_to_ecef(lat, lon, alt):
            """
            Convert geographic coordinates to ECEF.
            Input: latitude and longitude in radians, altitude in meters
            Output: tuple of (x, y, z) in meters
            """
            if lat is None or lon is None or alt is None:
                return (None, None, None)

            # Calculate the radius of curvature in the prime vertical
            n = a / math.sqrt(1 - e2 * math.sin(lat) * math.sin(lat))

            # Calculate ECEF coordinates
            x = (n + alt) * math.cos(lat) * math.cos(lon)
            y = (n + alt) * math.cos(lat) * math.sin(lon)
            z = (n * (1 - e2) + alt) * math.sin(lat)

            return (x, y, z)

        # Register UDF with appropriate return type
        ecef_schema = StructType(
            [
                StructField("x", DoubleType(), True),
                StructField("y", DoubleType(), True),
                StructField("z", DoubleType(), True),
            ]
        )
        ecef_udf = udf(geo_to_ecef, ecef_schema)

        # If input is in degrees, convert to radians first
        if not self.getInputInRadians():
            lat_expr = radians(F.col(lat_col))
            lon_expr = radians(F.col(lon_col))
        else:
            lat_expr = F.col(lat_col)
            lon_expr = F.col(lon_col)

        # Apply the transformation
        result = dataset.withColumn("ecef_coords", ecef_udf(lat_expr, lon_expr, F.col(alt_col)))
        # Extract individual ECEF components
        result = result.withColumn(x_col, result.ecef_coords.x)
        result = result.withColumn(y_col, result.ecef_coords.y)
        result = result.withColumn(z_col, result.ecef_coords.z)

        # Drop the temporary struct column
        return result.drop("ecef_coords")

    def copy(self, extra=None):
        """
        Creates a copy of this instance.
        """
        if extra is None:
            extra = {}
        return super(GeoToECEFTransformer, self).copy(extra)


def encode_spatial_features(df: DataFrame) -> DataFrame:
    if "Altitude" not in df.columns:
        df = df.withColumn("Altitude", lit(0.0))

    geo_to_ecef_start = GeoToECEFTransformer(
        inputCols=["Start_Lat", "Start_Lng", "Altitude"],
        outputCols=["ecef_start_x", "ecef_start_y", "ecef_start_z"],
        inputInRadians=False,
    )
    geo_to_ecef_end = GeoToECEFTransformer(
        inputCols=["End_Lat", "End_Lng", "Altitude"],
        outputCols=["ecef_end_x", "ecef_end_y", "ecef_end_z"],
        inputInRadians=False,
    )
    pipeline = Pipeline(stages=[geo_to_ecef_start, geo_to_ecef_end])
    df = pipeline.fit(df).transform(df)
    return df


def cleanup_data(df: DataFrame) -> DataFrame:
    df = df.drop(
        "City",
        "county",
        "start_lat",
        "start_lng",
        "end_lat",
        "end_lng",
        "side",
        "Weather_Condition",
        "state",
        "sunrise_sunset",
        "SideIndex",
        "WeatherIndex",
        "StateIndex",
        "Altitude",
    )
    return df

In [None]:
df = get_data()

In [93]:
numerical_features = [
    "distance_mi",
    "temperature_f",
    "wind_chill_f",
    "humidity_percent",
    "pressure_in",
    "visibility_mi",
    "wind_speed_mph",
    "precipitation_in",
]

categorical_features = ["Side", "City", "County", "Weather_Condition", "State"]

encoded_df = preprocess_numerical_features(df, numerical_features)

In [94]:
encoded_df = encode_timestamp_features(encoded_df)

In [95]:
encoded_df = impute_categorical_features(encoded_df, categorical_features)

Feature City has 136 missing values - will impute with mode
Feature Weather_Condition has 19900 missing values - will impute with mode

Special handling for City: Using mode city within each State
Global city mode (fallback): Miami
Imputing Weather_Condition with mode value: Fair


In [96]:
encoded_df = encode_categorical_features(encoded_df)

In [97]:
encoded_df = encode_spatial_features(encoded_df)

In [98]:
encoded_df = cleanup_data(encoded_df)

In [99]:
encoded_df.limit(1).show(vertical=True)

-RECORD 0--------------------------------------------
 id                           | A-828658             
 severity                     | 2                    
 distance_mi                  | -0.4057452258518801  
 temperature_f                | 1.151593692195921    
 wind_chill_f                 | 1.1882032202674286   
 humidity_percent             | -0.5014226742221052  
 pressure_in                  | 0.5661143189473108   
 visibility_mi                | 0.33571104611777897  
 wind_speed_mph               | -0.815321075545157   
 precipitation_in             | -0.08368500835329849 
 start_time_month_sin         | 0.5000000000000001   
 start_time_month_cos         | -0.8660254037844386  
 start_time_hour_sin          | -1.0                 
 start_time_hour_cos          | 1.224646799147353... 
 start_time_minute_sin        | -0.9945218953682734  
 start_time_minute_cos        | -0.10452846326765305 
 start_time_second_sin        | 1.0                  
 start_time_second_cos      

## Split data and save it

In [105]:
import os
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler

def run(command):
    return os.popen(command).read()

def prepare_and_save_data(
    encoded_df: DataFrame,
    test_size: float = 0.3,
    label_col: str = "severity",
    feature_exclude_cols: list = ["id", "severity"],
    hdfs_train_path: str = "/user/team13/project/data/train",
    hdfs_test_path: str = "/user/team13/project/data/test",
    local_train_path: str = "/home/team13/project/BigData-Car-Accident/data/train.json",
    local_test_path: str = "/home/team13/project/BigData-Car-Accident/data/test.json"
):

    feature_cols = [col for col in encoded_df.columns if col not in feature_exclude_cols]
    train_df, test_df = encoded_df.randomSplit([1 - test_size, test_size], seed=42)

    assembler = VectorAssembler(
        inputCols=feature_cols,
        outputCol="features",
        handleInvalid="skip"
    )

    train_assembled = assembler.transform(train_df).select("features", F.col(label_col).alias("label"))
    test_assembled = assembler.transform(test_df).select("features", F.col(label_col).alias("label"))

    train_assembled.select("features", "label") \
        .coalesce(1) \
        .write \
        .mode("overwrite") \
        .format("json") \
        .save(hdfs_train_path)

    test_assembled.select("features", "label") \
        .coalesce(1) \
        .write \
        .mode("overwrite") \
        .format("json") \
        .save(hdfs_test_path)

    run(f"hdfs dfs -cat {hdfs_train_path}/*.json > {local_train_path}")
    run(f"hdfs dfs -cat {hdfs_test_path}/*.json > {local_test_path}")

    print(f"Train data saved to: {local_train_path}")
    print(f"Test data saved to: {local_test_path}")

    return train_assembled, test_assembled

In [106]:
train, test = prepare_and_save_data(encoded_df)

Train data saved to: /home/team13/project/BigData-Car-Accident/data/train.json
Test data saved to: /home/team13/project/BigData-Car-Accident/data/test.json


# First Model

## Train model

In [108]:
from pyspark.sql import DataFrame
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F

def train_classifier(
    train_df,
    test_df,
    modelType="lr"
):
    if modelType == "dt":
        classifier = DecisionTreeClassifier(
            featuresCol="features",
            labelCol="label",
            seed=42
        )
    elif modelType == "lr":
        classifier = LogisticRegression(
            featuresCol="features",
            labelCol="label",
            family="multinomial",
            regParam=0.01,
            elasticNetParam=0.0
        )
        
    print("Training model...")
    model = classifier.fit(train_df)
    
    predictions = model.transform(test_df)
    
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    test_acc = evaluator.evaluate(predictions)
    print(f"Test Accuracy: {test_acc:.4f}")

    evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1")
    test_f1 = evaluator_f1.evaluate(predictions)
    print(f"Test F1 Score: {test_f1:.4f}")
    
    for i in [1, 2, 3, 4]:
        print(f"Severity: {i}")
        evaluator_pbl = MulticlassClassificationEvaluator(metricName="precisionByLabel", metricLabel=i)
        test_pbl = evaluator_pbl.evaluate(predictions)
        print(f"Test PBL: {test_pbl}")

        evaluator_rbl = MulticlassClassificationEvaluator(metricName="recallByLabel", metricLabel=i)
        test_rbl = evaluator_rbl.evaluate(predictions)
        print(f"Test RBL: {test_rbl}")
    
    pass

In [111]:
train_classifier(train, test, "lr")

Training model...
Test Accuracy: 0.8912
Test F1 Score: 0.8588
Severity: 1
Test PBL: 0.6404494382022472
Test RBL: 0.6404494382022472
Severity: 2


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.6/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib64/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [156]:
def save_predictions(
    predictions: DataFrame,
    hdfs_path: str,
    local_path: str
):
    # Select only the necessary columns and coalesce to one partition
    selected_df = predictions.select("label", "prediction").coalesce(1)

    # Write to HDFS as CSV with header
    selected_df.write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(hdfs_path)

    # Copy from HDFS to local filesystem
    run(f"hdfs dfs -cat {hdfs_path}/*.csv > {local_path}")

    print(f"Predictions saved to HDFS at: {hdfs_path}")
    print(f"Local CSV file saved to: {local_path}")

    return local_path

In [154]:
from pyspark.sql import DataFrame

def save_metrics_to_csv(
    metrics_list,
    local_file_path: str,
    hdfs_file_path: str
):

    # Create DataFrame
    global spark
    metrics_df = spark.createDataFrame(metrics_list)
    # Save to CSV
    metrics_df.coalesce(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(hdfs_file_path)

    run(f"hdfs dfs -cat {hdfs_file_path}/*.csv > {local_file_path}")

    print(f"Metrics saved successfully to: {local_file_path}")
    return local_file_path

In [162]:
from pyspark.sql import DataFrame
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F

def train_lr_with_grid_search(
    train_df: DataFrame,
    test_df: DataFrame,
    output_model_path: str = "project/models/model1",
    output_prediction_path: str = "project/output/model1_predictions"
):
    lr = LogisticRegression(
        featuresCol="features",
        labelCol="label",
        family="multinomial"
    )

    # paramGrid = ParamGridBuilder() \
    #     .addGrid(lr.regParam, [0.1]) \ #[0.01, 0.2])
    #     .addGrid(lr.elasticNetParam, [0.0]) \ #[0.0, 0.8]) 
    #     .build()

    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.1])  \
        .addGrid(lr.elasticNetParam, [0.0]) \
        .build()

    # 5. Define evaluator (F1 score used for model selection)
    evaluator = MulticlassClassificationEvaluator(metricName="f1")

    # 6. Set up CrossValidator
    crossval = CrossValidator(
        estimator=lr,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=2,
        # numFolds=3,
        seed=42
    )

    # 7. Train the CrossValidator on the training data
    print("Training cross validator...")
    cv_model = crossval.fit(train_df)
    
    # 8. Extract the best model from cross-validation
    best_model = cv_model.bestModel
    
    # 9. Save the best model to HDFS
    best_model.write().overwrite().save(output_model_path)
    
    # 10. Make predictions on test data
    predictions = best_model.transform(test_df)

    # 11. Evaluate on test data using both accuracy and F1
    accuracy_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    test_acc = accuracy_evaluator.evaluate(predictions)
    print(f"Test Accuracy: {test_acc:.4f}")
    
    test_f1 = evaluator.evaluate(predictions)
    print(f"Test F1 Score: {test_f1:.4f}")
    model_data = {
        "f1": test_f1,
        "acc": test_acc,
    }
    
    for i in [1, 2, 3, 4]:
        try:
            print(f"Severity: {i}")
            evaluator_pbl = MulticlassClassificationEvaluator(metricName="precisionByLabel", metricLabel=i)
            test_pbl = evaluator_pbl.evaluate(predictions)
            print(f"Test PBL: {test_pbl}")

            evaluator_rbl = MulticlassClassificationEvaluator(metricName="recallByLabel", metricLabel=i)
            test_rbl = evaluator_rbl.evaluate(predictions)
            print(f"Test RBL: {test_pbl}")

            model_data[f"recall_{i}"] = test_rbl
            model_data[f"precision_{i}"] = test_pbl
        except Exception:
            model_data[f"recall_{i}"] = -1
            model_data[f"precision_{i}"] = -1

    model_data["name"] = f"lr: regParam:{best_model.getRegParam()}, elasticNetParam: {best_model.getElasticNetParam()}"

    return predictions, model_data

In [163]:
predictions1, model_data1 = train_lr_with_grid_search(train, test)
model_data1

Training cross validator...
Test Accuracy: 1.0000
Test F1 Score: 1.0000
Severity: 1
Severity: 2
Test PBL: 1.0
Test RBL: 1.0
Severity: 3
Severity: 4


{'f1': 1.0,
 'acc': 1.0,
 'recall_1': -1,
 'precision_1': -1,
 'recall_2': 1.0,
 'precision_2': 1.0,
 'recall_3': -1,
 'precision_3': -1,
 'recall_4': -1,
 'precision_4': -1,
 'name': 'lr: regParam:0.1, elasticNetParam: 0.0'}

In [None]:
save_metrics_to_csv(
    metrics_list=[model_data1],
    local_file_path="/home/team13/project/BigData-Car-Accident/output/evaluation.csv",
    hdfs_file_path="/user/team13/project/output/evaluation"
)

In [168]:
save_predictions(
    predictions1,
    hdfs_path="/user/team13/project/output/model1_predictions",
    local_path="/home/team13/project/BigData-Car-Accident/output/model1_predictions.csv"
)

Predictions saved to HDFS at: /user/team13/project/output/model1_predictions
Local CSV file saved to: /home/team13/project/BigData-Car-Accident/output/model1_predictions.csv


'/home/team13/project/BigData-Car-Accident/output/model1_predictions.csv'

# Second Model

## Train model

In [None]:
train_classifier(train, test, "dt")

## Grid search

In [171]:
from pyspark.sql import DataFrame
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F

def train_dt_with_grid_search(
    train_df: DataFrame,
    test_df: DataFrame,
    output_model_path: str = "project/models/model1",
    output_prediction_path: str = "project/output/model1_predictions"
):
    dt = DecisionTreeClassifier(
        featuresCol="features", 
        labelCol="label",
        seed=42
    )

    # paramGrid = ParamGridBuilder() \
    #     .addGrid(dt.maxDepth, [3, 5]) \
    #     .addGrid(dt.impurity, ["gini", "entropy"]) \
    #     .build()
    paramGrid = ParamGridBuilder() \
        .addGrid(dt.maxDepth, [2]) \
        .addGrid(dt.impurity, ["gini"]) \
        .build()

    # 5. Define evaluator (F1 score used for model selection)
    evaluator = MulticlassClassificationEvaluator(metricName="f1")

    # 6. Set up CrossValidator
    crossval = CrossValidator(
        estimator=dt,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=2,
        # numFolds=3,
        seed=42
    )

    # 7. Train the CrossValidator on the training data
    print("Training cross validator...")
    cv_model = crossval.fit(train_df)
    
    # 8. Extract the best model from cross-validation
    best_model = cv_model.bestModel

    # 9. Save the best model to HDFS
    best_model.write().overwrite().save(output_model_path)

    # 10. Make predictions on test data
    predictions = best_model.transform(test_df)

    # 11. Evaluate on test data using both accuracy and F1
    accuracy_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    test_acc = accuracy_evaluator.evaluate(predictions)
    print(f"Test Accuracy: {test_acc:.4f}")

    test_f1 = evaluator.evaluate(predictions)
    print(f"Test F1 Score: {test_f1:.4f}")
    
    model_data = {
        "f1": test_f1,
        "acc": test_acc,
    }
    
    for i in [1, 2, 3, 4]:
        try:
            print(f"Severity: {i}")
            evaluator_pbl = MulticlassClassificationEvaluator(metricName="precisionByLabel", metricLabel=i)
            test_pbl = evaluator_pbl.evaluate(predictions)
            print(f"Test PBL: {test_pbl}")

            evaluator_rbl = MulticlassClassificationEvaluator(metricName="recallByLabel", metricLabel=i)
            test_rbl = evaluator_rbl.evaluate(predictions)
            print(f"Test RBL: {test_pbl}")

            model_data[f"recall_{i}"] = test_rbl
            model_data[f"precision_{i}"] = test_pbl
        except Exception:
            model_data[f"recall_{i}"] = -1
            model_data[f"precision_{i}"] = -1
        
    model_data["name"] = f"dt: maxDepth:{best_model.getMaxDepth()}, impurity: {best_model.getImpurity()}"

    return predictions, model_data

In [173]:
predictions2, model_data2 = train_dt_with_grid_search(train, test)

Training cross validator...
Test Accuracy: 1.0000
Test F1 Score: 1.0000
Severity: 1
Severity: 2
Test PBL: 1.0
Test RBL: 1.0
Severity: 3
Severity: 4


In [174]:
save_metrics_to_csv(
    metrics_list=[model_data1, model_data2],
    local_file_path="/home/team13/project/BigData-Car-Accident/output/evaluation.csv",
    hdfs_file_path="/user/team13/project/output/evaluation"
)

+---+---+--------------------+-----------+-----------+-----------+-----------+--------+--------+--------+--------+
|acc| f1|                name|precision_1|precision_2|precision_3|precision_4|recall_1|recall_2|recall_3|recall_4|
+---+---+--------------------+-----------+-----------+-----------+-----------+--------+--------+--------+--------+
|1.0|1.0|lr: regParam:0.1,...|         -1|        1.0|         -1|         -1|      -1|     1.0|      -1|      -1|
|1.0|1.0|lr: maxDepth:2, i...|         -1|        1.0|         -1|         -1|      -1|     1.0|      -1|      -1|
+---+---+--------------------+-----------+-----------+-----------+-----------+--------+--------+--------+--------+

Metrics saved successfully to: /home/team13/project/BigData-Car-Accident/output/evaluation.csv


'/home/team13/project/BigData-Car-Accident/output/evaluation.csv'

In [175]:
save_predictions(
    predictions2,
    hdfs_path="/user/team13/project/output/model2_predictions",
    local_path="/home/team13/project/BigData-Car-Accident/output/model2_predictions.csv"
)

Predictions saved to HDFS at: /user/team13/project/output/model2_predictions
Local CSV file saved to: /home/team13/project/BigData-Car-Accident/output/model2_predictions.csv


'/home/team13/project/BigData-Car-Accident/output/model2_predictions.csv'