# Spark and mongodb connector setup

In [52]:
# Import of libraries
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Function to initialize Pyspark wiht MongoDB connector
def init_spark():
    mongo_conn = f"mongodb://mongodb:27017/"
    db = "CS4010"
    coll = "accidents"
    conf = SparkConf()

    # Download mongo-spark-connector and its dependencies.
    conf.set("spark.jars.packages",
             "org.mongodb.spark:mongo-spark-connector:10.0.2")

    # Set up read connection :
    conf.set("spark.mongodb.read.connection.uri", mongo_conn)
    conf.set("spark.mongodb.read.database", db)
    conf.set("spark.mongodb.read.collection", coll)

    # Set up write connection
    conf.set("spark.mongodb.write.connection.uri", mongo_conn)
    conf.set("spark.mongodb.write.database", db)
    conf.set("spark.mongodb.write.collection", coll)

    SparkContext(conf=conf)

    return SparkSession \
        .builder \
        .appName('myApp') \
        .getOrCreate()

# Create the spark session    
spark = init_spark()

# Transform and insert accident data

In [53]:
# Read input data and create dataframes
df_accidents = spark.read.option("header", True).csv("./Datasets/accidents.csv", inferSchema=True )
df_accidents = df_accidents.withColumn("timestamp", to_utc_timestamp(col("Timestamp"),"Europe/Oslo"))

# Rename columns
name_mapping = {
    "RoadReference": "road_reference",
    "timestamp": "timestamp",
    "SpeedLimit(KMH)": "speed_limit",
    "Weather": "weather",
    "RoadConditions": "road_conditions",
    "LightingConditions": "lighting_conditions",
    "RoadLights": "road_lights",
    "RoadWidth(M)": "road_width",
    "RoadType": "road_type",
    "LaneType": "lane_type",
    "Lon": "lon",
    "Lat": "lat"
}

# Use the select operation with alias to rename multiple columns
df_accidents = df_accidents.select([df_accidents[column].alias(new_name) for column, new_name in name_mapping.items()])
df_accidents.show(30) # Sanity check

# Write to mongodb
df_accidents.write.format("mongodb").mode("overwrite").option("database", "CS4010").option("collection", "accidents").save()

+--------------------+-------------------+-----------+-----------------+---------------+--------------------+-----------+----------+---------------+----------------+------------------+------------------+
|      road_reference|          timestamp|speed_limit|          weather|road_conditions| lighting_conditions|road_lights|road_width|      road_type|       lane_type|               lon|               lat|
+--------------------+-------------------+-----------+-----------------+---------------+--------------------+-----------+----------+---------------+----------------+------------------+------------------+
|RV162 S1D1 m2472 ...|2020-08-17 12:17:00|         50|God sikt, opphold|  Tørr, bar veg|             Dagslys|         Ja|      15,4|Vanlig veg/gate|          Ukjent| 10.75370462750887|   59.912271465439|
|     RV162 S1D1 m572|2020-08-07 06:51:00|         50|God sikt, opphold|  Tørr, bar veg|             Dagslys|         Ja|      null|Vanlig veg/gate|Vanlig kjørefelt|10.760100208019608|

# Transform, join and insert traffic data (volume & speed)

In [49]:
# Function to load, transform and save traffic volume and speed data to MongoDB
def transformAndLoadVolumeSpeedToMongo(file_volume, file_speed):
    # Setup
    db_name = "CS4010"
    collection_name = "traffic_data"
    timezone = "Europe/Oslo"
    
    # Importing datasets and creating dataframes
    df_volume = spark.read.option("header", True).csv(f'./Datasets/{file_volume}.csv', inferSchema=True )
    df_speed = spark.read.option("header", True).csv(f'./Datasets/{file_speed}.csv', inferSchema=True )
    
    # Timestamp transformation
    df_volume = df_volume.withColumn("timestamp", to_utc_timestamp(col("toTime"),timezone))
    
    # Combine time and date into a common timestamp
    df_speed = df_speed.withColumn("Til tidspunkt", date_format(col("Til tidspunkt"), "HH:mm:ss"))
    df_speed = df_speed.withColumn("Timestamp", concat_ws(' ', col("Dato"), col("Til tidspunkt")).alias("TIMESTAMP"))
    df_speed = df_speed.withColumn("timestamp", to_utc_timestamp(col("Timestamp"),timezone))
    
    # Rename columns
    volume_name_mapping = {
        "trpid": "trpid",
        "timestamp": "timestamp",
        "coverage": "coverage",
        "volume": "volume"
    }
    
    speed_name_mapping = {
        "timestamp": "timestamp",
        "Gjennomsnittshastighet":"average_speed",
        "85-fraktil": "85fractile_speed"
    }
    
    # Use the select operation with alias to rename multiple columns
    df_volume = df_volume.select([df_volume[column].alias(new_name) for column, new_name in volume_name_mapping.items()])
    df_speed = df_speed.select([df_speed[column].alias(new_name) for column, new_name in speed_name_mapping.items()]) 
    
    # Join dataframes (on time)
    df_joined = df_volume.join(df_speed, ["timestamp"])
    df_joined.orderBy(col("timestamp").asc()).show(5) # Sanity check
    
    # Write to mongodb
    df_joined.write.format("mongodb").mode("append").option("database", db_name).option("collection", collection_name).save()

# Loop through all the traffic volume and speed data files, one pair of CSVs per trp
document_arr = [["29403V625517", "VATERLANDTUNNELEN"], ["64557V625518", "MUNKEDAMSVEIEN"], ["73840V2041694", "STRE_TANGENT"], ["71241V2460301", "KONG_HAKON_5.S_GT_NORDGAENDE"], ["29852V2460300", "KONG_HAKON_5.S_GT_SYDGAENDE"]]
for trp in document_arr:
    transformAndLoadVolumeSpeedToMongo(trp[0], trp[1])

+-------------------+------------+--------+------+--------------+----------------+
|          timestamp|       trpid|coverage|volume|avereage_speed|85fractile_speed|
+-------------------+------------+--------+------+--------------+----------------+
|2019-12-31 23:00:00|29403V625517|   100.0|   575|          57.8|            65.2|
|2020-01-01 00:00:00|29403V625517|   100.0|   618|          57.7|            66.4|
|2020-01-01 01:00:00|29403V625517|   100.0|   650|          56.7|            65.6|
|2020-01-01 02:00:00|29403V625517|   100.0|   527|          59.6|            68.4|
|2020-01-01 03:00:00|29403V625517|   100.0|   555|          60.8|            69.2|
+-------------------+------------+--------+------+--------------+----------------+
only showing top 5 rows

+-------------------+------------+--------+------+--------------+----------------+
|          timestamp|       trpid|coverage|volume|avereage_speed|85fractile_speed|
+-------------------+------------+--------+------+------------

# Transform and insert metadata

In [45]:
# DO NOT RERUN, IT WILL APPEND DATA

def transformAndLoadMetadataToMongo(file_volume, file_speed, df_speed_limit):
    # Setup
    db_name = "CS4010"
    collection_name = "traffic_registration_points"
    
    # Importing datasets and creating dataframes
    df_volume = spark.read.option("header", True).csv(f'./Datasets/{file_volume}.csv', inferSchema=True )
    df_speed = spark.read.option("header", True).csv(f'./Datasets/{file_speed}.csv', inferSchema=True )

    # Join dataframes
    df_joined = df_volume.join(df_speed, df_volume.trpid == df_speed.Trafikkregistreringspunkt, "inner")
    
    # Select and rename columns
    metadata_name_mapping = {
        "trpid": "trpid",
        "Vegreferanse": "road_reference",
        "Navn": "name",
        "lon": "lon",
        "lat": "lat"
    }
    df_joined = df_joined.select([df_joined[column].alias(new_name) for column, new_name in metadata_name_mapping.items()]).limit(1)

    # Finding speed limit for TRP and selecting columns
    df_joined = df_joined.withColumn("meter", split("road_reference", "m")[1])
    df_metadata = df_joined.join(df_speed_limit)
    df_metadata = df_metadata.filter(df_metadata.meter.between(df_metadata.Meter_Fra, df_metadata.Meter_Til))

    # Selecting columns and making the name column uppercase
    df_metadata = df_metadata.select("trpid", "road_reference", "name", "lon", "lat", col("Fartsgrense").alias("speed_limit"))
    df_metadata = df_metadata.withColumn("name", upper("name"))
    df_metadata.show(1) # Sanity Check
    
    # Write to MongoDB
    df_metadata.write.format("mongodb").mode("append").option("database", db_name).option("collection", collection_name).save()

# Read the speed_limits file and create a dataframe
df_speed_limit = spark.read.option("header", True).csv("./Datasets/speed-limits.csv", inferSchema=True )

# Loop through all the traffic volume and speed data files, one pair of CSVs per trp. Also send in the speed limit dataframe
document_arr = [["29403V625517", "VATERLANDTUNNELEN"], ["64557V625518", "MUNKEDAMSVEIEN"], ["73840V2041694", "STRE_TANGENT"], ["71241V2460301", "KONG_HAKON_5.S_GT_NORDGAENDE"], ["29852V2460300", "KONG_HAKON_5.S_GT_SYDGAENDE"]]
for trp in document_arr:
    transformAndLoadMetadataToMongo(trp[0], trp[1], df_speed_limit)

+------------+----------------+-----------------+-------+---------+-----------+
|       trpid|  road_reference|             name|    lon|      lat|speed_limit|
+------------+----------------+-----------------+-------+---------+-----------+
|29403V625517|RV162 S1D1 m2975|VATERLANDTUNNELEN|10.7493|59.916004|         50|
+------------+----------------+-----------------+-------+---------+-----------+

+------------+----------------+--------------+---------+---------+-----------+
|       trpid|  road_reference|          name|      lon|      lat|speed_limit|
+------------+----------------+--------------+---------+---------+-----------+
|64557V625518|RV162 S1D1 m4949|MUNKEDAMSVEIEN|10.725753|59.911418|         50|
+------------+----------------+--------------+---------+---------+-----------+

+-------------+----------------+-------------+---------+---------+-----------+
|        trpid|  road_reference|         name|      lon|      lat|speed_limit|
+-------------+----------------+-------------

In [None]:
# Stop the spark context
spark.sparkContext.stop()