<!-- # Accident Data Processor using Spark Streaming

This notebook processes accident data from a Kafka stream, transforming it into structured data and loading it into a MySQL database. 

--- -->

<!-- HTML structure for header -->
<div style="background-color: #4CAF50; padding: 15px; border-radius: 5px;">
    <h2 style="color: white; text-align: center;">Accident Data Processor using Spark Streaming</h2>
</div>

# Extract Phase

## 1. Initialize Spark Session

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the Spark session used by every cell below.
# Extra Maven packages requested for the session: the Kafka source for
# Structured Streaming and the MySQL JDBC driver.
spark_packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,mysql:mysql-connector-java:8.0.33"

session_builder = SparkSession.builder.appName("AccidentDataProcessor")
session_builder = session_builder.config("spark.jars.packages", spark_packages)
spark = session_builder.getOrCreate()

In [3]:
# Kafka connection settings used when the stream is opened.
# host:port handed to the Kafka source as `kafka.bootstrap.servers`.
kafka_bootstrap_servers = "ed-kafka:29092"
# Topic the accident events are read from.
kafka_topic = "accident-data"

## 2. Read Stream from Kafka Topic

In [4]:
# Open a streaming DataFrame on the Kafka topic.
# startingOffsets="earliest" asks the source to begin from the oldest
# available records rather than only newly arriving ones.
kafka_stream_reader = spark.readStream.format("kafka")
kafka_stream_reader = kafka_stream_reader.option("kafka.bootstrap.servers", kafka_bootstrap_servers)
kafka_stream_reader = kafka_stream_reader.option("subscribe", kafka_topic)
kafka_stream_reader = kafka_stream_reader.option("startingOffsets", "earliest")
df_kafka = kafka_stream_reader.load()

## 3. Define the Schema for Incoming JSON Data

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Schema of the JSON document carried in each Kafka message value.
# Date and Time stay as strings here; they are converted when the date and
# time dimensions are derived further down.
field_types = [
    ("eventId", StringType),
    ("eventOffset", IntegerType),
    ("Date", StringType),
    ("Time", StringType),
    ("Easting", IntegerType),
    ("Northing", IntegerType),
    ("Location", StringType),
    ("Local_Auth", StringType),
    ("Severity", StringType),
    ("Road_cond", StringType),
    ("Weather", StringType),
    ("Visibility", StringType),
    ("Speed_Lim", IntegerType),
    ("Cross_fac", StringType),
    ("Cross_ctrl", StringType),
    ("Police_ref", StringType),
    ("Casualties", IntegerType),
    ("OAPs", IntegerType),
    ("Children", IntegerType),
    ("Cycles", IntegerType),
    ("P2W", IntegerType),
    ("Pedestrian", IntegerType),
    ("Vehicles", IntegerType),
]

# Every field is declared nullable (third StructField argument is True).
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in field_types
])

<!-- HTML structure for parsing data --> <div style="background-color: #F44336; padding: 15px; border-radius: 5px; color: white;"> <h3 style="text-align: center;">Parsing the Kafka Stream</h3> </div>

# Transformation Phase
## 4. Parse the Incoming Data

In [6]:
from pyspark.sql.functions import from_json, col

# Turn each Kafka record into typed columns in three steps:
#   1. cast the Kafka `value` column to a string,
#   2. parse that string as JSON using `schema` into a struct named `data`,
#   3. flatten the struct so each JSON field becomes a top-level column.
json_strings = df_kafka.selectExpr("CAST(value AS STRING)")
parsed_struct = json_strings.select(from_json(col("value"), schema).alias("data"))
df_parsed = parsed_struct.select("data.*")

In [7]:
# Sanity check: the flattened columns and types should mirror `schema`.
df_parsed.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- eventOffset: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Easting: integer (nullable = true)
 |-- Northing: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Local_Auth: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Road_cond: string (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Visibility: string (nullable = true)
 |-- Speed_Lim: integer (nullable = true)
 |-- Cross_fac: string (nullable = true)
 |-- Cross_ctrl: string (nullable = true)
 |-- Police_ref: string (nullable = true)
 |-- Casualties: integer (nullable = true)
 |-- OAPs: integer (nullable = true)
 |-- Children: integer (nullable = true)
 |-- Cycles: integer (nullable = true)
 |-- P2W: integer (nullable = true)
 |-- Pedestrian: integer (nullable = true)
 |-- Vehicles: integer (nullable = true)




## 5. Extract the Dimension and Fact Table Columns

### Date Dimension


In [8]:
from pyspark.sql.functions import  dayofmonth, month, quarter, year, date_format

# Date dimension: one output row per parsed event, keyed by the event offset.
# The calendar parts are computed from the raw `Date` string column.
# NOTE(review): to_date(Date) is called without a format string; confirm the
# producer sends dates in a layout Spark parses by default, otherwise these
# columns come out null.
date_exprs = [
    "CAST(eventOffset AS BIGINT) as Date_ID",
    "to_date(Date) as Date",
    "dayofmonth(Date) as Day",
    "date_format(Date, 'EEEE') as Day_Name",
    "month(Date) as Month",
    "date_format(Date, 'MMMM') as Month_Name",
    "quarter(Date) as Quarter",
    "year(Date) as Year",
]

# Explicit column order for the frame handed to the loader.
date_column_order = ["Date_ID", "Date", "Day", "Day_Name", "Month", "Month_Name", "Quarter", "Year"]

df_date = df_parsed.selectExpr(*date_exprs).select(*date_column_order)


The cell above derives the date dimension from `df_parsed`: it converts the `Date` string to a date and extracts the day, day name, month, month name, quarter, and year, using the event offset (cast to `BIGINT`) as `Date_ID`. The resulting `df_date` DataFrame provides a structured representation of these date attributes for analysis.


In [9]:

# Check the derived column names and types of the date dimension.
df_date.printSchema()

root
 |-- Date_ID: long (nullable = true)
 |-- Date: date (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Day_Name: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Month_Name: string (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Year: integer (nullable = true)



### Time Dimension

In [10]:
from pyspark.sql.functions import hour, minute, to_timestamp

# Time dimension: one output row per parsed event, keyed by the event offset.
# `Time` arrives as a string; it is parsed with to_timestamp, and the hour and
# minute are taken from that parsed value.
# NOTE(review): to_timestamp(Time) has no format string; confirm the incoming
# time layout is one Spark parses by default.
time_exprs = [
    "CAST(eventOffset AS BIGINT) as Time_ID",
    "to_timestamp(Time) as Time",
    "hour(to_timestamp(Time)) as Hour",
    "minute(to_timestamp(Time)) as Minute",
]

# Explicit column order for the frame handed to the loader.
time_column_order = ["Time_ID", "Time", "Hour", "Minute"]

df_time = df_parsed.selectExpr(*time_exprs).select(*time_column_order)

The cell above derives the time dimension from `df_parsed`: it parses the `Time` string into a timestamp and extracts the hour and minute. The resulting `df_time` DataFrame includes a `Time_ID` (the event offset cast to `BIGINT`), the full timestamp, the hour, and the minute for easier time-based analysis.


In [11]:
# Check the derived column names and types of the time dimension.
df_time.printSchema()

root
 |-- Time_ID: long (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)



### Location Dimension

In [12]:
# Location dimension: grid coordinates, free-text location and local authority,
# keyed by the event offset. `Local_Auth` is renamed to `Local_Authority`.
location_exprs = [
    "CAST(eventOffset AS BIGINT) as Location_ID",
    "Easting as Easting",
    "Northing as Northing",
    "Location as Location",
    "Local_Auth as Local_Authority",
]

# Explicit column order for the frame handed to the loader.
location_column_order = ["Location_ID", "Easting", "Northing", "Location", "Local_Authority"]

df_location = df_parsed.selectExpr(*location_exprs).select(*location_column_order)

In [13]:
# Check the derived column names and types of the location dimension.
df_location.printSchema()

root
 |-- Location_ID: long (nullable = true)
 |-- Easting: integer (nullable = true)
 |-- Northing: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Local_Authority: string (nullable = true)



### Severity Dimension

In [14]:
# Severity dimension: the severity label keyed by the event offset.
severity_exprs = [
    "CAST(eventOffset AS BIGINT) as Severity_ID",
    "Severity as Severity",
]

# Explicit column order for the frame handed to the loader.
severity_column_order = ["Severity_ID", "Severity"]

df_severity = df_parsed.selectExpr(*severity_exprs).select(*severity_column_order)

In [15]:
# Check the derived column names and types of the severity dimension.
df_severity.printSchema()

root
 |-- Severity_ID: long (nullable = true)
 |-- Severity: string (nullable = true)



### Road Condition Dimension

In [16]:
# Road-condition dimension: surface condition, weather, visibility and speed
# limit, keyed by the event offset. The abbreviated source names (`Road_cond`,
# `Speed_Lim`) are expanded to `Road_Condition` and `Speed_Limit`.
road_condition_exprs = [
    "CAST(eventOffset AS BIGINT) as Road_Condition_ID",
    "Road_cond as Road_Condition",
    "Weather as Weather",
    "Visibility as Visibility",
    "Speed_Lim as Speed_Limit",
]

# Explicit column order for the frame handed to the loader.
road_condition_column_order = [
    "Road_Condition_ID",
    "Road_Condition",
    "Weather",
    "Visibility",
    "Speed_Limit",
]

df_road_condition = (
    df_parsed
    .selectExpr(*road_condition_exprs)
    .select(*road_condition_column_order)
)


In [17]:
# Check the derived column names and types of the road-condition dimension.
df_road_condition.printSchema()

root
 |-- Road_Condition_ID: long (nullable = true)
 |-- Road_Condition: string (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Visibility: string (nullable = true)
 |-- Speed_Limit: integer (nullable = true)



### Crossing Dimension

In [18]:
# Crossing dimension: crossing facility and crossing control type, keyed by
# the event offset. `Cross_fac` and `Cross_ctrl` are renamed to
# `Crossing_Facility` and `Crossing_Control_Type`.
crossing_exprs = [
    "CAST(eventOffset AS BIGINT) as Crossing_ID",
    "Cross_fac as Crossing_Facility",
    "Cross_ctrl as Crossing_Control_Type",
]

# Explicit column order for the frame handed to the loader.
crossing_column_order = ["Crossing_ID", "Crossing_Facility", "Crossing_Control_Type"]

df_crossing = df_parsed.selectExpr(*crossing_exprs).select(*crossing_column_order)

In [19]:
# Check the derived column names and types of the crossing dimension.
df_crossing.printSchema()

root
 |-- Crossing_ID: long (nullable = true)
 |-- Crossing_Facility: string (nullable = true)
 |-- Crossing_Control_Type: string (nullable = true)



### Fact Table

In [20]:
# Fact table: one row per parsed event.
# All six dimension keys are the same value (the event offset cast to BIGINT),
# which is how each dimension frame above derives its own ID, so a fact row
# and its dimension rows share one key.
fact_key_exprs = [
    "CAST(eventOffset AS BIGINT) as Date_ID",
    "CAST(eventOffset AS BIGINT) as Time_ID",
    "CAST(eventOffset AS BIGINT) as Severity_ID",
    "CAST(eventOffset AS BIGINT) as Location_ID",
    "CAST(eventOffset AS BIGINT) as Road_Condition_ID",
    "CAST(eventOffset AS BIGINT) as Crossing_ID",
]

# Measures: the per-incident counts, renamed to descriptive column names.
fact_measure_exprs = [
    "Casualties as Number_Of_Casualties",
    "OAPs as Number_Of_OAPs",
    "Children as Number_Of_Children",
    "Cycles as Cycles_Involved",
    "P2W as P2W_Involved",
    "Pedestrian as Pedestrians_Involved",
    "Vehicles as Motor_Vehicles_Involved",
]

df_fact = df_parsed.selectExpr(
    "Police_ref as Police_Ref",
    *fact_key_exprs,
    *fact_measure_exprs,
)


In [21]:
# Check the derived column names and types of the fact frame.
df_fact.printSchema()

root
 |-- Police_Ref: string (nullable = true)
 |-- Date_ID: long (nullable = true)
 |-- Time_ID: long (nullable = true)
 |-- Severity_ID: long (nullable = true)
 |-- Location_ID: long (nullable = true)
 |-- Road_Condition_ID: long (nullable = true)
 |-- Crossing_ID: long (nullable = true)
 |-- Number_Of_Casualties: integer (nullable = true)
 |-- Number_Of_OAPs: integer (nullable = true)
 |-- Number_Of_Children: integer (nullable = true)
 |-- Cycles_Involved: integer (nullable = true)
 |-- P2W_Involved: integer (nullable = true)
 |-- Pedestrians_Involved: integer (nullable = true)
 |-- Motor_Vehicles_Involved: integer (nullable = true)



In [22]:
# Final projection for the fact table: the reference, the six dimension keys,
# then the seven measures, in this exact order.
fact_column_order = [
    "Police_Ref",
    "Date_ID",
    "Time_ID",
    "Severity_ID",
    "Location_ID",
    "Road_Condition_ID",
    "Crossing_ID",
    "Number_Of_Casualties",
    "Number_Of_OAPs",
    "Number_Of_Children",
    "Cycles_Involved",
    "P2W_Involved",
    "Pedestrians_Involved",
    "Motor_Vehicles_Involved",
]

fact_incidents = df_fact.select(*fact_column_order)

In [23]:
# Check the final column names and types before the frame is streamed to MySQL.
fact_incidents.printSchema()

root
 |-- Police_Ref: string (nullable = true)
 |-- Date_ID: long (nullable = true)
 |-- Time_ID: long (nullable = true)
 |-- Severity_ID: long (nullable = true)
 |-- Location_ID: long (nullable = true)
 |-- Road_Condition_ID: long (nullable = true)
 |-- Crossing_ID: long (nullable = true)
 |-- Number_Of_Casualties: integer (nullable = true)
 |-- Number_Of_OAPs: integer (nullable = true)
 |-- Number_Of_Children: integer (nullable = true)
 |-- Cycles_Involved: integer (nullable = true)
 |-- P2W_Involved: integer (nullable = true)
 |-- Pedestrians_Involved: integer (nullable = true)
 |-- Motor_Vehicles_Involved: integer (nullable = true)



# Load Phase

## 6. Load Function 

In [24]:
# Function to load DataFrame to MySQL
def load_to_MySQL(batch_df, batch_id, table_name, raise_on_error=False):
    """Append one micro-batch to a MySQL table over JDBC.

    Connection settings are read from environment variables so credentials do
    not have to live in the notebook. Each variable falls back to the value
    this notebook previously hard-coded when it is unset:

    - ``MYSQL_JDBC_URL``  (fallback ``jdbc:mysql://172.18.0.5:3306/incidents_DW``)
    - ``MYSQL_USER``      (fallback ``root``)
    - ``MYSQL_PASSWORD``  (fallback ``123456``)

    Args:
        batch_df: The micro-batch DataFrame to write.
        batch_id: Identifier of the micro-batch; only used in the log line.
        table_name: Name of the destination table (JDBC ``dbtable`` option).
        raise_on_error: When False (the default, matching the previous
            behaviour) a failed write is printed and swallowed. When True the
            exception is re-raised after being printed, so the caller sees the
            failure instead of silently losing the batch.

    Raises:
        Exception: Whatever the JDBC write raised, only if ``raise_on_error``
            is True.
    """
    import os  # local import keeps this cell runnable on its own

    # TODO: drop the literal fallbacks once the environment is configured.
    jdbc_url = os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://172.18.0.5:3306/incidents_DW")
    jdbc_user = os.environ.get("MYSQL_USER", "root")
    jdbc_password = os.environ.get("MYSQL_PASSWORD", "123456")

    print(f"Batch id: {batch_id} for table {table_name}")
    try:
        (
            batch_df.write
            .mode("append")
            .format("jdbc")
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .option("url", jdbc_url)
            .option("dbtable", table_name)
            .option("user", jdbc_user)
            .option("password", jdbc_password)
            .save()
        )
    except Exception as e:
        print(f"Error writing to MySQL: {e}")
        if raise_on_error:
            # Surface the failure to the caller instead of dropping the batch.
            raise

The `load_to_MySQL` function appends a micro-batch DataFrame (`batch_df`) to the MySQL table named by `table_name` over JDBC, printing the batch ID and table name first. Any exception raised during the write is caught and printed rather than re-raised, so a failed batch does not stop the streaming query.


## 7. Automated Batch Loading of Dimension and Fact Tables

In [25]:
# Load the dim tables
def _start_mysql_stream(stream_df, table_name, checkpoint_dir, interval="10 seconds"):
    """Start a streaming query that appends each micro-batch to a MySQL table.

    Args:
        stream_df: Streaming DataFrame to write.
        table_name: Destination table passed to ``load_to_MySQL``.
        checkpoint_dir: Checkpoint location for this query; each query gets
            its own directory.
        interval: Processing-time trigger interval.

    Returns:
        The started streaming query handle.
    """
    def _write_batch(batch_df, batch_id):
        # table_name is bound per helper call, so each query writes to its own table.
        load_to_MySQL(batch_df, batch_id, table_name)

    return (
        stream_df.writeStream
        .foreachBatch(_write_batch)
        .trigger(processingTime=interval)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )


# One independent query per dimension; the variable names are kept because the
# final cell waits on them.
query_date = _start_mysql_stream(df_date, "dim_date", "checkpoint_dir_kafka_date")
query_time = _start_mysql_stream(df_time, "dim_time", "checkpoint_dir_kafka_time")
query_severity = _start_mysql_stream(df_severity, "dim_severity", "checkpoint_dir_kafka_severity")
query_location = _start_mysql_stream(df_location, "dim_location", "checkpoint_dir_kafka_location")
query_road_condition = _start_mysql_stream(
    df_road_condition, "dim_road_condition", "checkpoint_dir_kafka_road_condition"
)
query_crossing = _start_mysql_stream(df_crossing, "dim_crossing", "checkpoint_dir_kafka_crossing")



In [26]:
# Load the fact table
# NOTE(review): this query runs independently of the dimension queries, so a
# fact row can reach MySQL before its dimension rows; confirm the target
# schema tolerates that ordering.
def _write_fact_batch(batch_df, batch_id):
    """foreachBatch callback: append the micro-batch to the fact_incidents table."""
    load_to_MySQL(batch_df, batch_id, "fact_incidents")


fact_stream_writer = fact_incidents.writeStream.foreachBatch(_write_fact_batch)
fact_stream_writer = fact_stream_writer.trigger(processingTime='10 seconds')
fact_stream_writer = fact_stream_writer.option("checkpointLocation", "checkpoint_dir_kafka_fact")
query_fact = fact_stream_writer.start()

In [27]:
# Wait for termination
# Block on each query in turn: the six dimension queries first, then the fact
# query. The cell keeps running until every query has terminated or the kernel
# is interrupted.
for running_query in (
    query_date,
    query_time,
    query_severity,
    query_location,
    query_road_condition,
    query_crossing,
    query_fact,
):
    running_query.awaitTermination()

Batch id: 0 for table dim_crossing
Batch id: 0 for table dim_severity
Batch id: 0 for table dim_location
Batch id: 0 for table dim_time
Batch id: 0 for table dim_date
Batch id: 0 for table dim_road_condition
Batch id: 0 for table fact_incidents
Error writing to MySQL: An error occurred while calling o207.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (e8f14ff18b62 executor driver): java.sql.BatchUpdateException: Duplicate entry '10015' for key 'dim_crossing.PRIMARY'
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:816)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:418)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
	at org.apache.spark.sql.execution.datasources

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 