In [1]:
import os
os.environ['PYSPARK_PYTHON'] = '/home/thanhtk/kafka_airflow_spark/airflow/myenv/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/thanhtk/kafka_airflow_spark/airflow/myenv/bin/python'



from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, sum
from pyspark.ml.feature import StringIndexer
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType


spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .master("spark://spark-master:7077") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Verify which python is being used
import sys
print(sys.executable)

/opt/conda/bin/python


In [2]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType,TimestampType,DoubleType

schema = StructType([ 
    StructField("crash_date", TimestampType(), True),
    StructField("crash_time", StringType(), True), 
    StructField("on_street_name", StringType(), True), 
    StructField("off_street_name", StringType(), True), 
    StructField("cross_street_name", StringType(), True), 
    StructField("number_of_persons_injured", StringType(), True), 
    StructField("number_of_persons_killed", StringType(), True), 
    StructField("number_of_pedestrians_injured", StringType(), True), 
    StructField("number_of_pedestrians_killed", StringType(), True), 
    StructField("number_of_cyclist_injured", StringType(), True), 
    StructField("number_of_cyclist_killed", StringType(), True), 
    StructField("number_of_motorist_injured", StringType(), True), 
    StructField("number_of_motorist_killed", StringType(), True), 
    StructField("contributing_factor_vehicle_1", StringType(), True), 
    StructField("contributing_factor_vehicle_2", StringType(), True),
    StructField("contributing_factor_vehicle_3", StringType(), True), 
    StructField("contributing_factor_vehicle_4", StringType(), True), 
    StructField("contributing_factor_vehicle_5", StringType(), True), 
    StructField("collision_id", StringType(), True), 
    StructField("vehicle_type_code1", StringType(), True), 
    StructField("vehicle_type_code2", StringType(), True), 
    StructField("borough", StringType(), True), 
    StructField("zip_code", StringType(), True), 
    StructField("latitude", StringType(), True), 
    StructField("longitude", StringType(), True), 
    StructField("location", StructType([ 
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True), 
        StructField("human_address", StringType(), True) ]), True) ])

# Load the JSON file with the schema
df = spark.read.schema(schema).option("multiline", "true").json("hdfs://namenode:9000/raw_data/h9gi-nx95.json")

# Show the first few rows
df.show()



+-------------------+----------+--------------------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+--------------------+---------+--------+---------+----------+--------------------+
|         crash_date|crash_time|      on_street_name|     off_street_name|   cross_street_name|number_of_persons_injured|number_of_persons_killed|number_of_pedestrians_injured|number_of_pedestrians_killed|number_of_cyclist_injured|number_of_cyclist_killed|number_of_motorist_injured|number_of_motorist_killed|contributing_factor_vehicle_1|contributing_factor_vehicle_2|contributing_factor_vehicle_3|contributing_factor_vehic

In [8]:
import os
print("PYSPARK_PYTHON: ", os.environ.get('PYSPARK_PYTHON'))


PYSPARK_PYTHON:  /home/thanhtk/kafka_airflow_spark/airflow/myenv/bin/python


In [3]:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, Imputer, VectorAssembler
from pyspark.ml import Pipeline

# Step 1: Convert relevant columns to appropriate data types
df = df.withColumn("number_of_persons_injured", df["number_of_persons_injured"].cast("int"))

# Calculate total number of injured people
total_injured = df.agg({"number_of_persons_injured": "sum"}).collect()[0][0]
print("Total number of persons injured:", total_injured)

# Step 2: Add the `is_injured` column (1 if injury or fatality, otherwise 0)
df = df.withColumn('is_injured',
                   F.when((F.col('number_of_persons_injured') + F.col('number_of_persons_killed')) > 0, 1)
                    .otherwise(0))

# Step 3: Drop irrelevant columns
cols_to_drop = [
    'latitude', 'longitude', 'on_street_name', 'off_street_name', 'number_of_persons_injured',
    'number_of_pedestrians_killed', 'number_of_cyclist_injured', 'number_of_motorist_injured',
    'number_of_motorist_killed', 'vehicle_type_code3', 'vehicle_type_code4', 'vehicle_type_code5',
    'crash_date', 'crash_time', 'number_of_persons_killed', 'number_of_pedestrians_injured',
    'number_of_cyclist_killed', 'location', 'zip_code'
]
df_dropped_cols = df.drop(*cols_to_drop)

# Step 4: Handle missing values for categorical columns using mode
categorical_cols = ['contributing_factor_vehicle_1', 'contributing_factor_vehicle_2',
                    'vehicle_type_code1', 'vehicle_type_code2', 'borough']

# Fill missing values with the mode of each categorical column
for col_name in categorical_cols:
    mode_row = df_dropped_cols.groupBy(col_name).count().orderBy('count', ascending=False).first()
    
    # Handle missing mode value (if the mode is None, use 0)
    if mode_row is not None and mode_row[0] is not None:
        mode_value = mode_row[0]
    else:
        mode_value = 0

    df_dropped_cols = df_dropped_cols.fillna({col_name: mode_value})

# Step 5: Encode categorical columns using StringIndexer
def encode_categorical_cols(df, categorical_cols):
    for col_name in categorical_cols:
        indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_encoded", handleInvalid='skip')
        df = indexer.fit(df).transform(df)
    return df

df_encoded = encode_categorical_cols(df_dropped_cols, categorical_cols)

# Step 6: Drop the original categorical columns
df_final = df_encoded.drop(*categorical_cols)

# Step 7: Check for any remaining missing values
missing_counts = df_final.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_final.columns])
missing_counts.show()

# Step 8: Handle missing values for numerical columns (e.g., zip_code) using Imputer
numerical_cols = ['contributing_factor_vehicle_1_encoded', 'contributing_factor_vehicle_2_encoded', 
                  'vehicle_type_code1_encoded', 'vehicle_type_code2_encoded', 'borough_encoded']

imputer = Imputer(inputCols=numerical_cols, outputCols=[col + "_imputed" for col in numerical_cols])
df_imputed = imputer.fit(df_final).transform(df_final)

# Step 9: Assemble features into a single vector column
assembler = VectorAssembler(inputCols=[col + "_imputed" for col in numerical_cols], outputCol="features")
df_final_assembled = assembler.transform(df_imputed)

# Show the final dataset with 'features' and 'is_injured' column
df_final_assembled.select("features", "is_injured").show(5)

# Now you can safely access the 'features' column as shown below in RDD
test_rdd = df_final_assembled.select("features", "is_injured").rdd
test_rdd = test_rdd.map(lambda row: (row["features"], row["is_injured"]))


Total number of persons injured: 454
+-----------------+-----------------------------+-----------------------------+-----------------------------+------------+----------+-------------------------------------+-------------------------------------+--------------------------+--------------------------+---------------+
|cross_street_name|contributing_factor_vehicle_3|contributing_factor_vehicle_4|contributing_factor_vehicle_5|collision_id|is_injured|contributing_factor_vehicle_1_encoded|contributing_factor_vehicle_2_encoded|vehicle_type_code1_encoded|vehicle_type_code2_encoded|borough_encoded|
+-----------------+-----------------------------+-----------------------------+-----------------------------+------------+----------+-------------------------------------+-------------------------------------+--------------------------+--------------------------+---------------+
|              742|                          909|                          973|                          994|           0| 

In [4]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
import math

# Feature and label columns
x = df_final_assembled.drop("is_injured")
y = df_final_assembled.select("is_injured")

# Split the data into train and test sets
train, test = df_final_assembled.randomSplit([0.8, 0.2], seed=42)

# Shard the training data into three parts
train_shards = train.randomSplit([1/3, 1/3, 1/3], seed=42)

# Convert test DataFrame to RDD with features and labels
test_rdd = test.select("features", "is_injured").rdd
test_rdd = test_rdd.map(lambda row: (row[0], row[1]))  # Row[0] -> 'features', Row[1] -> 'is_injured'

# Convert train shards to RDDs with index
train_shards_rdd = [
    shard.select("features", "is_injured").rdd.zipWithIndex().map(lambda row: (row[1], row[0])) 
    for shard in train_shards
]

In [5]:
import sys
print(sys.executable)


/opt/conda/bin/python


In [6]:
import numpy
print(numpy.__file__)


/opt/conda/lib/python3.11/site-packages/numpy/__init__.py


In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math
import numpy  

# Step 1: Define the Euclidean distance UDF
def calculate_distance_udf(test_features, train_features):
    """
    Calculate the Euclidean distance between two feature vectors.
    """
    test_features = test_features.toArray()  # Convert sparse vector to dense array
    train_features = train_features.toArray()  # Convert sparse vector to dense array
    
    squared_differences = [(t - r) ** 2 for t, r in zip(test_features, train_features)]
    distance = math.sqrt(sum(squared_differences))
    return distance

# Register the UDF with PySpark
distance_udf = udf(calculate_distance_udf, DoubleType())

# Step 2: Apply the UDF to calculate distances between test and train rows
# Assuming 'test_rdd' and 'train_rdd' are already loaded into DataFrames with 'features' columns

# Join test data with train data to calculate distances
df_test = test.select("features", "is_injured").alias("test_data")
df_train = train.select("features", "is_injured").alias("train_data")

# Add a new column "distance" using the UDF
df_with_distance = df_test.crossJoin(df_train) \
    .withColumn("distance", distance_udf("test_data.features", "train_data.features"))

df_with_distance.show(5)


Py4JJavaError: An error occurred while calling o582.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage 44.0 (TID 34) (172.19.0.8 executor 0): java.io.IOException: Cannot run program "/home/thanhtk/kafka_airflow_spark/airflow/myenv/bin/python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1143)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:222)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:314)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:244)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110)
	... 28 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Cannot run program "/home/thanhtk/kafka_airflow_spark/airflow/myenv/bin/python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1143)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:222)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:314)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:244)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110)
	... 28 more


In [None]:
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
import heapq
import math

# Euclidean distance function
def calculate_distance(test_row, train_row):
    """
    Calculate the Euclidean distance between a test row and a train row.
    """
    test_features = test_row.features.toArray()  # Convert sparse vector to dense array
    train_features = train_row.features.toArray()  # Convert sparse vector to dense array

    squared_differences = [(t - r) ** 2 for t, r in zip(test_features, train_features)]
    distance = math.sqrt(sum(squared_differences))
    return distance

# Mapper function for each test partition
def mapper(test_data_partition, train_rdd_broadcast, k):
    """
    Map function to find k nearest neighbors for each test row from the broadcasted train data.
    """
    results = []
    # Get the broadcasted train data
    train_data = train_rdd_broadcast.value
    for test_row in test_data_partition:
        k_neighbors = []
        for train_row in train_data:
            distance = calculate_distance(test_row, train_row)
            if len(k_neighbors) < k:
                heapq.heappush(k_neighbors, (-distance, train_row.is_injured))
            else:
                heapq.heappushpop(k_neighbors, (-distance, train_row.is_injured))
        results.append((test_row.is_injured, k_neighbors))
    return results

# Reducer function to merge results
def reducer(mapped_results_rdd, k):
    """
    Merge k-nearest neighbors from all test partitions.
    """
    def merge_neighbors(a, b):
        combined = sorted(a + b, key=lambda x: x[0], reverse=True)
        return combined[:k]

    reduced_results = (
        mapped_results_rdd
        .flatMap(lambda x: [(x[0], x[1])])  # Expand into (test_id, neighbors)
        .reduceByKey(merge_neighbors)  # Merge neighbors for each test_id
    )

    return reduced_results

# Majority voting for final prediction
def majority_vote(neighbors):
    """
    Perform majority voting on neighbors to determine the predicted class.
    """
    class_votes = {}
    for _, label in neighbors:
        class_votes[label] = class_votes.get(label, 0) + 1
    return max(class_votes, key=class_votes.get)  # Class with the most votes

# KNN Prediction without broadcasting
def knn_predict(test_rdd, train_shards_rdd, k=5):
    """
    Predict using KNN with broadcasting to avoid collecting training data.
    """
    mapped_results_rdd = None

    # Broadcast the training data for each partition
    for train_rdd in train_shards_rdd:
        train_data = train_rdd.collect()  # Collect the training data for each shard
        train_rdd_broadcast = test_rdd.context.broadcast(train_data)  # Broadcast it to all workers

        # Process each shard against test data
        shard_results = test_rdd.mapPartitions(
            lambda test_partition: mapper(test_partition, train_rdd_broadcast, k)
        )
        if mapped_results_rdd is None:
            mapped_results_rdd = shard_results
        else:
            mapped_results_rdd = mapped_results_rdd.union(shard_results)

    # Reduce phase: Merge k-nearest neighbors from all shards
    reduced_results_rdd = reducer(mapped_results_rdd, k)

    # Perform majority voting
    predictions_rdd = reduced_results_rdd.map(lambda x: (x[0], majority_vote(x[1])))

    return predictions_rdd

# Example usage: Predicting with KNN
k = 5
# Assuming train_rdd and test_rdd are already defined
predictions_rdd = knn_predict(test_rdd, train_shards_rdd, k)

# Show results (you can now take the first 5 predictions)
print(predictions_rdd.take(5))


In [None]:
from pyspark import SparkContext
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Assuming predictions_rdd is the RDD containing the predicted results (test_id, predicted_label)
# and the true labels from the test dataset (test_id, actual_label).

# Convert predictions to a DataFrame for evaluation (this is easier with Spark ML evaluation methods)
predictions_df = predictions_rdd.toDF(["test_id", "predicted_label"])

# Join with the actual labels
actual_labels_df = test.select("is_injured").withColumnRenamed("is_injured", "actual_label")
predictions_with_labels_df = predictions_df.join(actual_labels_df, predictions_df.test_id == actual_labels_df.test_id, "inner")

# Now calculate confusion matrix components (TP, TN, FP, FN)
tp = predictions_with_labels_df.filter("predicted_label = 1 AND actual_label = 1").count()
tn = predictions_with_labels_df.filter("predicted_label = 0 AND actual_label = 0").count()
fp = predictions_with_labels_df.filter("predicted_label = 1 AND actual_label = 0").count()
fn = predictions_with_labels_df.filter("predicted_label = 0 AND actual_label = 1").count()

# Calculate Accuracy, Recall, Precision, F1 Score
accuracy = (tp + tn) / (tp + tn + fp + fn)
recall = tp / (tp + fn) if tp + fn > 0 else 0
precision = tp / (tp + fp) if tp + fp > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0

# Display the metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Recall: {recall:.4f}")
print(f"Precision: {precision:.4f}")
print(f"F1 Score: {f1_score:.4f}")

# Confusion Matrix Output
print(f"Confusion Matrix:\nTP: {tp}, TN: {tn}, FP: {fp}, FN: {fn}")

# You can also use Spark's MulticlassClassificationEvaluator for automated metrics
evaluator = MulticlassClassificationEvaluator(labelCol="actual_label", predictionCol="predicted_label")
accuracy_spark = evaluator.evaluate(predictions_with_labels_df)
print(f"Accuracy (via Spark Evaluator): {accuracy_spark:.4f}")


In [None]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import time

# Start timer
start_time = time.time()

# Ensure predictions_rdd is in the correct format: [(test_id, predicted_label)]
# Example: predictions_rdd = [(1, 0), (2, 1), (3, 0), ...]

# Define a custom schema for the DataFrame
schema = StructType([
    StructField("test_id", IntegerType(), True),
    StructField("prediction", IntegerType(), True)
])

# Convert predictions RDD to DataFrame with explicit schema
predictions_df = spark.createDataFrame(predictions_rdd, schema)

# Add true labels from the test dataset
# Assuming test DataFrame has the true labels with schema: [test_id, true_label]
test_labels_df = test.select(col("collision_id").alias("test_id"), col("is_injured").alias("true_label"))

# Join predictions with true labels
predictions_with_labels = predictions_df.join(test_labels_df, on="test_id")

# Calculate Accuracy
total_count = predictions_with_labels.count()
correct_count = predictions_with_labels.filter(col("prediction") == col("true_label")).count()
accuracy = correct_count / total_count

# Create Confusion Matrix
confusion_matrix = (
    predictions_with_labels.groupBy("true_label", "prediction")
    .count()
    .orderBy("true_label", "prediction")
)

# Stop timer
end_time = time.time()
execution_time = end_time - start_time

# Show Results
print(f"Accuracy: {accuracy}")
print(f"Time of execution: {execution_time:.2f}s")

# Show Confusion Matrix
confusion_matrix.show()

# Optional: Visualize the confusion matrix (if using matplotlib and seaborn)
try:
    import pandas as pd
    import seaborn as sns
    import matplotlib.pyplot as plt

    # Convert Spark DataFrame to Pandas for visualization
    cm_pd = confusion_matrix.toPandas()

    # Create a pivot table for the heatmap
    cm_matrix = cm_pd.pivot(index="true_label", columns="prediction", values="count").fillna(0)

    # Plot the heatmap
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm_matrix, annot=True, fmt="d", cmap="Blues", cbar=False)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()
except ImportError:
    print("Seaborn or Matplotlib not installed, skipping visualization.")
