## <span style="color:#0b486b">Part 1: MongoDB Data Model</span>

### Task 1.1 Collection Design

<div style="text-align: center"> 
    <img src="erddiagram.png"></img>
    <p style="text-align: center">Figure 1 – Data Model Design </p>
</div>

This ERD illustrates the design of our MongoDB database. It details the structure of the three main collections (Violation, Vehicle, and Camera) and the supporting dropped_pairs collection, and it indicates the primary keys, foreign keys, index keys, and shard keys. The relationships between the collections are shown, except for the dropped_pairs collection, which has no relationship with the others. The embedded documents of the Violation collection are also specified, with each violation record treated as a single embedded object.

#### Collection 1: Vehicle

* This collection stores static metadata about registered vehicles, enabling quick identification of vehicles and linking them to traffic violations.

* Schema and sample for this collection. The field _id is used as an alias for the car_plate column in the provided CSV file.
    * schema
        ```json
        {
          "_id": String,
          "owner_name": String,
          "owner_addr": String,
          "vehicle_type": String,
          "registration_date": Date
        }
        ```
    * sample
        ```json
        {
          "_id": "FZP 98",
          "owner_name": "Priya a/l Anita",
          "owner_addr": "24 Jalan Bukit Jelutong, Kuala Lumpur",
          "vehicle_type": "Hatchback",
          "registration_date": 1991-04-08T21:23:30.000+00:00
        } 
        ```

* Indexes
    * The chosen field is _id, which serves as the default indexing key in MongoDB and is sorted in ascending order.
    * The index type is a single field index.
    * The purpose of this index is to ensure that each document in this collection has a unique _id (car_plate) value and to support efficient queries on the _id (car_plate) field.
    
* Data retention policy
    * Vehicle registration data are stored permanently.
    * If a car_plate is deregistered, its document is removed from the collection.
    * Each _id (car_plate) is assigned to only one individual at any given time, so only the latest registration of each car_plate is stored and duplicate registrations are avoided.

#### Collection 2: Camera

* This collection is used to hold static information about traffic cameras used for monitoring and capturing violations.

* Schema and sample for this collection. The field _id is used as an alias for the camera_id column in the provided CSV file.
    * schema
        ```json
        {
          "_id": Int,
          "latitude": Double,
          "longitude": Double,
          "position": Double,
          "speed_limit": Int
        }
        ```
    * sample
        ```json
        {
          "_id": 2,
          "latitude": 2.162418757,
          "longitude": 102.6524549,
          "position": 153.5,
          "speed_limit": 110
        }
        ```

* Indexes
    * The chosen field is _id, which serves as the default indexing key in MongoDB and is sorted in ascending order.
    * The index type is a single field index.
    * The purpose of this index is to ensure that each document in this collection has a unique _id (camera_id) value and to support efficient queries on the _id (camera_id) field.
    
* Data retention policy
    * Traffic camera data are stored permanently.

#### Collection 3: Violation

* This collection stores dynamic records of violations flagged by the cameras, including the vehicle involved and the timestamps. Historic violations are also stored in this collection.

* Schema and sample for this collection. The field _id is an ObjectId generated by MongoDB; each document holds all violations of one car_plate on one date.
    * schema
        ```json
        {
          "_id": ObjectId,
          "car_plate": String,
          "date": Date,
          "updatedAt": Date,
          "violations": Array[instant_violation | average_violation]
        }

        instant_violation = {
            "camera_id_end": Int,
            "timestamp_end": Date,
            "speed_reading": Double,
            "type": String,
            "updatedAt": Date
        }

        average_violation = {
            "camera_id_start": Int,
            "camera_id_end": Int,
            "timestamp_start": Date,
            "timestamp_end": Date,
            "speed_reading": Double,
            "type": String,
            "updatedAt": Date
        }
        ```
    * sample
        ```json
        {
          "_id": ObjectId("6831d6e89505ee6a199c06ec"),
          "car_plate": "UTT 229",
          "date": 2024-01-01T00:00:00.000Z,
          "updatedAt": 2025-05-24T14:26:00.351Z,
          "violations": [
            {
              "camera_id_end": 1,
              "timestamp_end": 2024-01-01T08:00:00.000Z,
              "speed_reading": 140.9,
              "type": "instant",
              "updatedAt": 2025-05-24T14:25:44.161Z
            },
            {
              "camera_id_end": 2,
              "timestamp_end": 2024-01-01T08:00:26.887Z,
              "speed_reading": 134.1,
              "type": "instant",
              "updatedAt": 2025-05-24T14:25:44.161Z
            },
            {
              "camera_id_end": 3,
              "timestamp_end": 2024-01-01T08:00:54.958Z,
              "speed_reading": 130.8,
              "type": "instant",
              "updatedAt": 2025-05-24T14:25:44.161Z
            },
            {
              "camera_id_start": 1,
              "camera_id_end": 2,
              "timestamp_start": 2024-01-01T08:00:00.000Z,
              "timestamp_end": 2024-01-01T08:00:26.887Z,
              "speed_reading": 133.89285632420857,
              "type": "average",
              "updatedAt": 2025-05-24T14:26:00.351Z
            },
            {
              "camera_id_start": 2,
              "camera_id_end": 3,
              "timestamp_start": 2024-01-01T08:00:26.887Z,
              "timestamp_end": 2024-01-01T08:00:54.958Z,
              "speed_reading": 128.24658958816858,
              "type": "average",
              "updatedAt": 2025-05-24T14:26:00.351Z
            }
          ]
        }
        ```

* Indexes
    * _id
        * The chosen field is _id, which serves as the default index key in MongoDB and is sorted in ascending order.
        * The index type is a single field index.
        * The purpose of this index is to ensure that each document in this collection has a unique _id (an ObjectId) and to support efficient lookups by _id.
    
    * car_plate, date
        * The chosen fields are car_plate and date ({car_plate: 1, date: 1}), both sorted in ascending order.
        * The index type is a unique compound index.
        * The purpose of this index is to speed up queries that filter on both fields. For example, the streaming application queries on car_plate and date to decide whether to update an existing document or insert a new one.

    * updatedAt
        * The chosen field is updatedAt, sorted in ascending order.
        * The index type is a single field index.
        * The purpose of this index is to read the most recently written records efficiently during real-time analytics.

* Shard key strategy
    * The chosen shard key is the compound key {car_plate, date}.
    * The sharding type is range-based sharding on a compound shard key.
    * This collection is sharded because it grows continuously as the cameras capture new violations, and sharding distributes that data across multiple shards. car_plate and date are chosen as the shard key because they match the query pattern of the streaming application, which gives efficient (targeted) query routing and a balanced data distribution across shards.

* Data retention policy
    * All violation records are stored permanently in the collection.
    * An updatedAt timestamp is included in each document to record when the document was last written to the collection.
    * This timestamp supports real-time analytics by making recently written records easy to retrieve.
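Range-based sharding assigns contiguous ranges of the shard key to chunks, and mongos routes a write or query that carries the full shard key to the single shard owning that range. The toy sketch below models that routing in plain Python; the chunk boundaries and shard names are hypothetical, and it is not how mongos is implemented internally.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical chunk ranges on the compound shard key (car_plate, date).
# Each entry is the inclusive lower bound of a chunk; in a real cluster the
# balancer chooses the boundaries, these ones are invented for illustration.
CHUNK_LOWER_BOUNDS = [
    ("", datetime.min),   # chunk 0: from the lowest possible key
    ("G", datetime.min),  # chunk 1: car plates from "G" onwards
    ("P", datetime.min),  # chunk 2: car plates from "P" onwards
]
CHUNK_SHARDS = ["shard0", "shard1", "shard2"]  # hypothetical shard names


def route(car_plate: str, day: datetime) -> str:
    """Return the shard whose chunk range contains the key (car_plate, day)."""
    # Python tuples compare element by element, like a compound range key.
    chunk = bisect_right(CHUNK_LOWER_BOUNDS, (car_plate, day)) - 1
    return CHUNK_SHARDS[chunk]
```

Because every upsert in the streaming application filters on both car_plate and date, each write can be targeted to one shard rather than broadcast to all of them.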

#### Collection 4: dropped_pairs

* This collection logs records that were dropped by the stream-stream join, i.e., camera events that could not be matched with an event from the adjacent camera, for example because the matching data arrived too late. It is useful for auditing and tracking purposes.

* Schema and sample for this collection. The field _id is a generated unique identifier (a UUID string); the car_plate is stored in its own field.
    * schema
        ```json
        {
          "_id": String,
          "car_plate": String,
          "camera_id": Int,
          "timestamp": Date
        }
        ```
    * sample
        ```json
        {
          "_id": "d6079f0a-8926-40e0-b791-2bc8a3002b13",
          "car_plate": "OIE 79",
          "camera_id": 1,
          "timestamp": 2024-01-01T13:39:48,134.3
        }
        ```

* Indexes
    * The chosen field is _id, which serves as the default indexing key in MongoDB and is sorted in ascending order.
    * The index type is a single field index.
    
* Data retention policy
    * The data are retained for a maximum of one year from the time of ingestion.
    * A one-year retention window balances the need for auditing and debugging with storage optimization and system performance, ensuring manageable database growth and efficient query response times.
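One way to enforce this one-year retention is a MongoDB TTL index, which deletes a document once a date field is older than `expireAfterSeconds`. The sketch below assumes a hypothetical `ingestedAt` field holding the insertion time, because the existing `timestamp` field records the event time rather than the ingestion time. The `collection` argument is expected to be a pymongo `Collection`; `ensure_retention_index` is a hypothetical helper.

```python
from typing import Any

# One year in seconds (365 days; leap years ignored for simplicity).
ONE_YEAR_SECONDS = 365 * 24 * 60 * 60


def ensure_retention_index(collection: Any, field: str = "ingestedAt") -> str:
    """Create a TTL index so documents expire about one year after `field`.

    `field` (hypothetical name) must hold a BSON date set at ingestion time.
    Returns the index name reported by create_index.
    """
    return collection.create_index([(field, 1)], expireAfterSeconds=ONE_YEAR_SECONDS)
```

For example, `ensure_retention_index(client[database]["dropped_pairs"])`. MongoDB's TTL monitor runs about once a minute, so expired documents are removed shortly after, not exactly at, the one-year mark.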

### Task 1.2 Collection Relationship

#### Vehicle and Violation
* There is a one-to-many relationship between the `Vehicle` collection and the `Violation` collection: one vehicle can have multiple violation documents in the `Violation` collection, but at most one document per day.
* The relationship is modelled by referencing. The `Vehicle` collection uses _id (car_plate) as its primary key, and the `Violation` collection has a car_plate field that serves as a foreign key referencing the `Vehicle` collection.
* Referencing is chosen because violation records are frequently inserted or updated by the streaming application, and the only vehicle information the cameras capture is the car_plate. Vehicle metadata is therefore rarely needed by these queries. Because vehicle metadata is also relatively static, referencing avoids duplicating it in every violation document. This design introduces a join cost when vehicle information is queried alongside violations, but the cost is manageable and justified by avoiding data duplication and saving storage space. Storing vehicle metadata separately also keeps vehicle information consistent across all related records.

#### Camera and Violation
* There is a many-to-many relationship between the `Camera` collection and the `Violation` collection: one camera can be associated with multiple violation records in the `Violation` collection, and a single violation document can involve multiple cameras.
* The relationship is modelled by referencing. The `Camera` collection uses _id (camera_id) as its primary key, and the camera_id_start and camera_id_end fields of the entries in the violations array serve as foreign keys referencing the `Camera` collection.
* Referencing is chosen because violation records are frequently inserted or updated by the streaming application, while camera data is static and rarely changes. Only the camera ID is recorded in each violation entry, which prevents duplicating camera information within violation records. The join cost is more acceptable than the storage cost of duplicated data. Referencing also keeps camera details consistent: if a camera's information changes, only the `Camera` collection needs to be updated.

#### Violation
* In the `Violation` collection, the violation details are embedded in the document instead of being stored in a separate collection and referenced.
* Because each car can have only one document per day in the `Violation` collection, the violations array is bounded to the violations of that car on that date.
* Each entry in the violations array is unique and tightly coupled with the car_plate and the date. The embedded model is chosen because the violation details are essential for data analysis; embedding avoids join costs during queries and gives better read performance.
* The combination of car_plate and date is unique, and the entries within each violations array are unique, which keeps the data in this collection consistent.
* Both types of violations (instant and average) are stored in the same violations array, making it easier and more efficient to query and analyze all violations together.

### Task 1.3 Discussion

* Consistency and Idempotency
    * A unique compound index is applied to the combination of the car_plate and date fields so that each vehicle has at most one violation document per day. This uniqueness ensures that writing another violation for the same car on the same day does not create a duplicate document. Each violation is embedded in the violations array field, and the `$addToSet` operator ensures that the entries in the array are unique.
    * An upsert pattern is used when inserting or updating violation records in the streaming application. When a new violation is detected, the streaming application uses car_plate and date as the query key. If a document with that car_plate and date exists, the new violation detail is appended to its violations array. Otherwise, a new document with that car_plate and date is created, and the violation detail is added to it.

* Scalability and Fault-Tolerance
    * The data model is designed to support high ingest rates. Embedding the violation details in a single document reduces the number of write operations compared with a referencing model, which would write to multiple collections. In addition, the `Violation` collection is sharded on car_plate and date, which spreads the data and the write load across shards and allows writes to be processed in parallel.
    * The model supports low-latency lookups, especially when retrieving all violation records of a car on a specific date. The compound index on car_plate and date lets such queries avoid scanning every document in the collection, and the embedded structure avoids the join cost that storing the violation details in another collection would incur.

* Trade-offs made in the design
    * Choosing the embedded model for the `Violation` collection avoids expensive joins. Since the data analysis focuses on the violation details, the embedded model provides better performance than the referencing model. The updatedAt timestamp is indexed to improve read performance on the latest data during real-time analysis. This design increases the size of each document in the `Violation` collection. However, since violations are grouped by car_plate and date, the violations array only stores the violations that happened on that specific date, which reduces the risk of exceeding MongoDB's 16 MB document size limit.
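The consistency and idempotency argument above can be checked with a small in-memory model. This sketch does not use pymongo: a Python dict plays the role of the `Violation` collection keyed by (car_plate, date), and `upsert_violation` (a hypothetical helper) mimics `UpdateOne(..., upsert=True)` with `$addToSet`. Replaying the same violation, as a re-run micro-batch might, leaves a single document with a single entry.

```python
from datetime import datetime, time

# In-memory stand-in for the Violation collection, keyed by the unique
# (car_plate, date) pair that the compound index enforces.
Store = dict[tuple[str, datetime], dict]


def upsert_violation(store: Store, car_plate: str, event_time: datetime, detail: dict) -> None:
    """Mimic an upsert with $addToSet on the document for one car and one day."""
    # Normalise to midnight, as the historic loader does for the date field.
    day = datetime.combine(event_time.date(), time.min)
    # Upsert: reuse the existing document or create it on first write.
    doc = store.setdefault(
        (car_plate, day), {"car_plate": car_plate, "date": day, "violations": []}
    )
    # $addToSet: append only if an identical sub-document is not present.
    # (MongoDB also compares field order; Python dict equality does not.)
    if detail not in doc["violations"]:
        doc["violations"].append(detail)
```

Note that `$addToSet` only deduplicates exact matches, so a per-write value inside the sub-document (for example a processing-time updatedAt) would make a replayed entry look new.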


### Required imports
Ensure that the three utility Python files are downloaded and placed in a `utils` folder in the same directory as this Jupyter notebook before importing them.
- read_stream_utils.py
- online_join_utils.py
- mongo_sink_utils.py

```python
# Required libraries
import os
from pathlib import Path
from datetime import datetime as dt

import pandas as pd
from pymongo import MongoClient, UpdateOne
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    TimestampType,
    DoubleType,
)
from utils.read_stream_utils import read_stream, get_kafka_header, get_kafka_message
from utils.online_join_utils import (
    calculate_time_boundary,
    stream_stream_join_event,
    get_matched_stream,
    get_violated_average_stream,
)
from utils.mongo_sink_utils import write_dropped_pairs, write_violations_to_mongo
```

## <span style="color:#0b486b">Additional: MongoDB Data Model Implementation</span>

This additional part sets up the MongoDB collections using pymongo.

#### Connect to MongoClient

```python
# Basic configuration parameters
database = "fit3182_a2"
violation = "violation"
camera = "camera"
vehicle = "vehicle"
HOST_IP = "192.168.64.1"
PORT = 27017

# Establish the connection to mongos (the router of the sharded cluster)
client = MongoClient(HOST_IP, PORT)

# Drop the database so that the setup starts from a clean state
client.drop_database(database)

# Enable sharding on the fit3182_a2 database
client.admin.command("enableSharding", database)
```

#### Declare variables for the collections in the database

```python
violation_collection = client[database][violation]
vehicle_collection = client[database][vehicle]
camera_collection = client[database][camera]
```

#### Setup for the Violation collection

Refer to [Indexes](https://www.mongodb.com/docs/manual/indexes/)

Refer to [Shard Keys](https://www.mongodb.com/docs/manual/core/sharding-shard-key/)

```python
# Create a unique compound index on the 'violation' collection for (car_plate, date)
violation_collection.create_index(
    [("car_plate", 1), ("date", 1)],
    unique=True
)

# Create an index on the 'violation' collection for the updatedAt field
violation_collection.create_index([("updatedAt", 1)])

# Shard the 'violation' collection on the (car_plate, date) compound key
client.admin.command(
    "shardCollection",
    f"{database}.{violation}",
    key={"car_plate": 1, "date": 1}
)
```


#### Load the CSV files to be written into the corresponding collections

```python
# Define the root directory for data files
root_dir = Path("data")

# Load the CSV files using pandas
# Note: this overwrites the collection-name strings defined earlier;
# the collection handles created above are unaffected.
camera = pd.read_csv(root_dir / "camera.csv")
vehicle = pd.read_csv(root_dir / "vehicle.csv")
violation = pd.read_csv(root_dir / "camera_event_historic.csv")
```

#### Build the Camera collection in the database

```python
# Convert the camera dataframe to a list of dictionaries
camera_records = camera.to_dict(orient="records")

# Rename the column camera_id to _id
for r in camera_records:
    r["_id"] = r.pop("camera_id")

# Insert camera data into the Camera collection
camera_collection.insert_many(camera_records)
```

InsertManyResult([1, 2, 3], acknowledged=True)

#### Build the Vehicle collection in the database

```python
# Remove duplicate car_plates, keeping only the latest registration of each plate.
# drop_duplicates keeps whole rows, whereas groupby().last() could mix values
# from different rows when some fields are null.
vehicle["registration_date"] = pd.to_datetime(vehicle["registration_date"])
vehicle_unique = (
    vehicle.sort_values("registration_date")
    .drop_duplicates(subset="car_plate", keep="last")
)
vehicle_unique_records = vehicle_unique.to_dict(orient="records")

# Rename the column car_plate to _id
for r in vehicle_unique_records:
    r["_id"] = r.pop("car_plate")

# Insert vehicle data into the Vehicle collection
vehicle_collection.insert_many(vehicle_unique_records)
```


#### Build the Violation collection in the database (historic data)

```python
# Preprocess the violation dataframe
violation["timestamp_start"] = pd.to_datetime(violation["timestamp_start"], format="mixed")
violation["timestamp_end"] = pd.to_datetime(violation["timestamp_end"], format="mixed")
violation["date"] = violation["timestamp_start"].dt.date

# Collect one upsert operation per historic violation
operations = []

for _, row in violation.iterrows():
    car_plate = row["car_plate"]
    # Normalise the date to a midnight datetime, since BSON has no date-only type
    date = dt.combine(row["date"], dt.min.time())
    updated_at = row["timestamp_end"]

    # One document per (car_plate, date), matching the unique compound index
    search_condition = {
        "car_plate": car_plate,
        "date": date
    }

    violation_payload = {
        "camera_id_start": row["camera_id_start"],
        "camera_id_end": row["camera_id_end"],
        "timestamp_start": row["timestamp_start"],
        "timestamp_end": row["timestamp_end"],
        "speed_reading": row["speed_reading"],
        "type": "average",
        "updatedAt": updated_at
    }

    # Upsert: set updatedAt and add the violation only if it is not already in the array
    update = {
        "$set": {"updatedAt": updated_at},
        "$addToSet": {"violations": violation_payload}
    }

    operations.append(UpdateOne(search_condition, update, upsert=True))

# Bulk write the violation records to the Violation collection
if operations:
    violation_collection.bulk_write(operations)
else:
    print("No operations were created.")
```

## <span style="color:#0b486b">Part 2: Streaming Application</span>

### Task 2.1 Data Stream Processing (Streaming Application)

<div style="text-align: center"> 
    <img src="application_part1.png"></img>
    <p style="text-align: center">Figure 2.1 – Overview of Streaming Application (Part 1) </p>
</div>

This diagram illustrates the first part of our streaming application: data streams are read from the Kafka topics, parsed in PySpark, and watermarked to handle late-arriving data.

<div style="text-align: center"> 
    <img src="application_part2.png"></img>
    <p style="text-align: center">Figure 2.2 – Overview of Streaming Application (Part 2) </p>
</div>

This diagram illustrates the second part of our streaming application: the data streams are joined, matched records are filtered for violations and stored in MongoDB, and unmatched records are sunk into the dropped_pairs collection.

#### PySpark runtime environment to support Kafka integration
PySpark 3.5.5 is used here. The Kafka dependencies are added through the `PYSPARK_SUBMIT_ARGS` environment variable, which passes the `--packages` option to spark-submit. Refer to [Structured Streaming Kafka Integration - Deploying](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying)

```python
# Configure PySpark to include Kafka support via the spark-streaming and spark-sql Kafka packages.
# This must be set before the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.5,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5 pyspark-shell'
```

#### Basic configuration for the streaming application
This section defines the constants for the streaming application. HOST_IP is defined in the MongoDB setup cell above; update it there if required.

```python
# Configuration parameters
TOPIC_A = "camera_event_a"
TOPIC_B = "camera_event_b"
TOPIC_C = "camera_event_c"
TOLERANCE_RATE = "30 minutes"
METADATA = {
    "producer_id": "int",
    "speed_limit": "int",
    "dist_next": "int",
    "processing_timestamp": "timestamp",
}

# Schema to parse the message into json
SCHEMA = StructType(
    [
        StructField("event_id", StringType(), True),
        StructField("batch_id", IntegerType(), True),
        StructField("car_plate", StringType(), True),
        StructField("camera_id", IntegerType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("speed_reading", DoubleType(), True),
    ]
)

# Initialize Spark session
spark = (
    SparkSession.builder.master("local[*]")
    .appName("Streaming Parking Data")
    .getOrCreate()
)
```

#### Read message from Kafka

```python
# Read from corresponding topic
event_a = read_stream(spark, TOPIC_A, HOST_IP)
event_b = read_stream(spark, TOPIC_B, HOST_IP)
event_c = read_stream(spark, TOPIC_C, HOST_IP)
```

#### Extract metadata from header

```python
header_a = get_kafka_header(event_a, METADATA)
header_b = get_kafka_header(event_b, METADATA)
header_c = get_kafka_header(event_c, METADATA)
```

#### Extract message
The Kafka message is extracted and converted into rows along with the Kafka headers.

```python
stream_a = get_kafka_message(header_a, SCHEMA)
stream_b = get_kafka_message(header_b, SCHEMA)
stream_c = get_kafka_message(header_c, SCHEMA)
```

#### Adding watermark
Watermarking is a technique used in PySpark to handle late-arriving data in streaming applications. Each stream is assigned a watermark based on the maximum event time seen so far: the watermark is that maximum event time minus `TOLERANCE_RATE`. Data whose event time falls behind the watermark is considered too late, and PySpark may ignore it. Watermarking also lets PySpark clear outdated state.

The `TOLERANCE_RATE` is set to 30 minutes in our case, which means PySpark keeps messages whose event time is within 30 minutes of the latest event time observed in each stream. This bounds the state and prevents unbounded growth of intermediate results by discarding outdated data. However, when performing online joins across multiple streams, PySpark uses a single global watermark, which by default is the minimum of the watermarks of the input streams, as described in the references below.

The example below uses a watermark of 20 minutes.

Event timestamps:
- 10:00AM
- 10:15AM (latest timestamp)
- 10:08AM (processed, within 20 minutes of the latest event)
- 09:50AM (ignored, older than the 20-minute threshold)

In this case, any event earlier than 09:55 AM would be dropped, given that the latest event occurred at 10:15 AM and the watermark is set to 20 minutes.
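The watermark arithmetic in this example can be reproduced in plain Python. This is a simplified sketch with hypothetical helper names: Spark advances the watermark between micro-batches rather than per event, and it only guarantees that data within the delay is kept, so data behind the watermark "may" be dropped rather than always being dropped.

```python
from datetime import datetime, timedelta


def watermark(max_event_time: datetime, delay: timedelta) -> datetime:
    """Watermark = latest event time seen so far minus the allowed delay."""
    return max_event_time - delay


def is_too_late(event_time: datetime, current_watermark: datetime) -> bool:
    """Events older than the watermark are eligible to be dropped."""
    return event_time < current_watermark


def global_watermark(per_stream: list[datetime]) -> datetime:
    """Spark's default multiple-watermark policy ("min") follows the slowest stream."""
    return min(per_stream)
```

With the example above, the watermark is 10:15 minus 20 minutes, i.e. 09:55, so the 10:08 event is kept and the 09:50 event is eligible to be dropped. When three streams are joined, the global watermark is the earliest of their individual watermarks.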

Refer to [Handling Late Data and Watermarking](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking)

Refer to [Policy for handling multiple watermarks](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#policy-for-handling-multiple-watermarks)

```python
marked_a = stream_a.withWatermark("timestamp", TOLERANCE_RATE)
marked_b = stream_b.withWatermark("timestamp", TOLERANCE_RATE)
marked_c = stream_c.withWatermark("timestamp", TOLERANCE_RATE)
```

#### Instant violation Detection
Instant violations are detected on each watermarked stream independently, so that every instant violation is recorded. If instant violations were detected together with the average speed violations in the section below, some would be lost, because readings that are not matched in the online join are dropped from that path.

```python
# Keep rows whose speed reading reaches the camera's speed limit
violated_a = marked_a.filter(col("speed_reading") >= col("speed_limit"))
violated_b = marked_b.filter(col("speed_reading") >= col("speed_limit"))
violated_c = marked_c.filter(col("speed_reading") >= col("speed_limit"))

# Union to perform the sink action at once
instant_violation = violated_a.union(violated_b).union(violated_c)
```

#### Online Join
The implementation details are in online_join_utils.py. A full outer join is used because dropped (unmatched) data needs to be recorded. The join window is the maximum travel time between two cameras for which the average speed could still reach the speed limit; its derivation is shown below. Because the event-time window is set using whole seconds, it only approximates this bound, so each joined pair must still be validated against the exact average speed.

$$\frac{\text{distance (km)}}{\text{duration (hours)}} \geq \text{speed limit (km/h)}$$

$$\text{duration (hours)} \leq \frac{\text{distance (km)}}{\text{speed limit (km/h)}}$$

$$\text{duration (seconds)} \leq \frac{\text{distance (km)} \times 3600}{\text{speed limit (km/h)}}$$

Using a watermark in the stream-stream join ensures that rows arriving within `TOLERANCE_RATE` are kept in state for joining. In the full outer join, a row without a matching pair is emitted with NULLs on the other side only once it is outdated according to the watermark, i.e., once no match can arrive anymore.

Refer to [Stream-stream joins](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins)
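The derivation above can be turned into a window size in a few lines. `max_violation_window_seconds` below is a hypothetical stand-in for `calculate_time_boundary` (whose body lives in online_join_utils.py and is not shown here); it assumes the distance is in kilometres, the limit is in km/h, and the bound is rounded up to whole seconds.

```python
import math


def max_violation_window_seconds(distance_km: float, speed_limit_kmh: float) -> int:
    """Longest travel time (whole seconds, rounded up) that can still be a violation.

    From duration (s) <= distance (km) * 3600 / speed limit (km/h).
    Rounding up keeps every candidate pair inside the join window.
    """
    return math.ceil(distance_km * 3600 / speed_limit_kmh)
```

With 1 km and 110 km/h the window is 33 s, but a pair observed exactly 33 s apart averages 3600 / 33, about 109.1 km/h, which is below the limit. Rounding up keeps every candidate pair at the cost of a few false candidates, which is why each joined pair is rechecked against the exact average speed.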

```python
# Calculate the window size
window_size_ab = calculate_time_boundary(1, 110)
window_size_bc = calculate_time_boundary(1, 90)

# Stream-stream full outer join
stream_ab = stream_stream_join_event(marked_a, marked_b, window_size_ab)
stream_bc = stream_stream_join_event(marked_b, marked_c, window_size_bc)
```

#### Average Violation
A duration column (timestamp_end - timestamp_start) is added to each matched pair, and a filter keeps only the pairs whose average speed, computed with the formula above, reached the speed limit of the ending camera. An average speed column is also added so that the average speed can be recorded in the database.
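The duration and average-speed step can be sketched in plain Python. `average_speed_kmh` and `is_average_violation` are hypothetical helpers, not the functions in online_join_utils.py; they assume the camera distance is in kilometres and use the same `>=` comparison as the instant-violation filter.

```python
from datetime import datetime


def average_speed_kmh(distance_km: float, start: datetime, end: datetime) -> float:
    """Average speed between two cameras: distance divided by elapsed hours."""
    seconds = (end - start).total_seconds()
    if seconds <= 0:
        raise ValueError("end must be after start")
    return distance_km * 3600 / seconds


def is_average_violation(
    distance_km: float, start: datetime, end: datetime, end_camera_limit_kmh: float
) -> bool:
    """Keep the pair only if its average speed reaches the ending camera's limit."""
    return average_speed_kmh(distance_km, start, end) >= end_camera_limit_kmh
```

For example, covering 1 km in 30 s gives 120 km/h, a violation at a 110 km/h limit, while covering it in 40 s gives 90 km/h, which is not.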

```python
# Filter match pairs
match_ab = get_matched_stream(stream_ab)
match_bc = get_matched_stream(stream_bc)

# Detection and calculation
average_violated_ab = get_violated_average_stream(match_ab)
average_violated_bc = get_violated_average_stream(match_bc)

# Union to perform sink action at once
average_violation = average_violated_ab.union(average_violated_bc)
```

#### Log Dropped Pairs

```python
# Keep unmatched rows (NULL on one side of the full outer join)
drop_ab = stream_ab.filter(
    col("timestamp_start").isNull() | col("timestamp_end").isNull()
)
drop_bc = stream_bc.filter(
    col("timestamp_start").isNull() | col("timestamp_end").isNull()
)

# Union to perform the sink action at once
drop = drop_ab.union(drop_bc)
```

#### Violation and Drop Pairs Written to MongoDB
The implementation can be found in mongo_sink_utils.py. Bulk write is used to reduce the number of round trips to the database. UpdateOne with upsert is used to update or insert a document in the collection, and the `$addToSet` operator ensures that the violations array does not contain duplicate data. To enable bulk writes, `foreachBatch` is used so that PySpark performs the sink action on each micro-batch DataFrame, instead of row by row with `foreach`.

Refer to [addToSet](https://www.mongodb.com/docs/manual/reference/operator/update/addToSet/)

Refer to [BulkWrite](https://www.mongodb.com/docs/manual/reference/method/db.collection.bulkWrite/)

Refer to [foreachbatch](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreachBatch.html)
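Inside a `foreachBatch` function, each micro-batch can be turned into upsert operations before one bulk write. The sketch below builds the (filter, update) pairs for instant violations from plain dicts; `build_instant_updates` is hypothetical and is not the author's `write_violations_to_mongo`. In a real sink one could obtain the dicts with `[r.asDict() for r in batch_df.collect()]` (which pulls the batch to the driver), wrap each pair as `UpdateOne(filt, update, upsert=True)`, and pass the list to `collection.bulk_write(ops, ordered=False)`.

```python
from datetime import datetime, time


def build_instant_updates(rows: list[dict]) -> list[tuple[dict, dict]]:
    """Turn batch rows into (filter, update) pairs for UpdateOne(..., upsert=True)."""
    ops = []
    for row in rows:
        # One document per car and day: normalise the event time to midnight.
        day = datetime.combine(row["timestamp"].date(), time.min)
        # Sub-document shaped like the instant_violation schema (without updatedAt).
        detail = {
            "camera_id_end": row["camera_id"],
            "timestamp_end": row["timestamp"],
            "speed_reading": row["speed_reading"],
            "type": "instant",
        }
        ops.append(
            ({"car_plate": row["car_plate"], "date": day},
             {"$addToSet": {"violations": detail}})
        )
    return ops
```

Using `ordered=False` lets MongoDB continue with the remaining operations if one of them fails, instead of stopping at the first error.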

```python
# foreachBatch defines the sink, so no format() is needed on these writers.

# Writer of average violations
average_violation_writer = (
    average_violation.writeStream
    .option("checkpointLocation", "./average_checkpoints")
    .outputMode("append")
    .foreachBatch(
        lambda batch_df, batch_id: write_violations_to_mongo(
            batch_df, batch_id, HOST_IP, "average"
        )
    )
)

# Writer of instant violations
instant_violation_writer = (
    instant_violation.writeStream
    .option("checkpointLocation", "./instant_checkpoints")
    .outputMode("append")
    .foreachBatch(
        lambda batch_df, batch_id: write_violations_to_mongo(
            batch_df, batch_id, HOST_IP, "instant"
        )
    )
)

# Writer of dropped pairs
drop_violation_writer = (
    drop.writeStream
    .option("checkpointLocation", "./drop_checkpoints")
    .outputMode("append")
    .foreachBatch(
        lambda batch_df, batch_id: write_dropped_pairs(batch_df, batch_id, HOST_IP)
    )
)
```

#### Start Streaming

```python
queries = []
try:
    queries.append(average_violation_writer.start())
    queries.append(instant_violation_writer.start())
    queries.append(drop_violation_writer.start())

    # Block until any of the queries terminates (or fails)
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    print("Interrupted by CTRL-C. Stopping queries.")
finally:
    # Stop only the queries that were actually started
    for query in queries:
        query.stop()
```