In [0]:
!python --version

Python 3.9.5


In [0]:
from pyspark.sql.functions import *

In [0]:
connection_string = "mongodb+srv://group10:1040120623@cluster0.yolwbeu.mongodb.net/KSI.KSI"
collection = spark.read.format("mongo").option("database", "KSI").option("collection", "KSI").option("partitioner", "MongoSinglePartitioner").option("spark.mongodb.input.uri", connection_string).load()

collection.printSchema()

root
 |-- ACCLASS: string (nullable = true)
 |-- ACCLOC: string (nullable = true)
 |-- ACCNUM: double (nullable = true)
 |-- AG_DRIV: string (nullable = true)
 |-- ALCOHOL: string (nullable = true)
 |-- AUTOMOBILE: string (nullable = true)
 |-- CYCACT: string (nullable = true)
 |-- CYCCOND: string (nullable = true)
 |-- CYCLIST: string (nullable = true)
 |-- CYCLISTYPE: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- DISABILITY: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- DRIVACT: string (nullable = true)
 |-- DRIVCOND: string (nullable = true)
 |-- EMERG_VEH: string (nullable = true)
 |-- FATAL_NO: double (nullable = true)
 |-- HOOD_140: string (nullable = true)
 |-- HOOD_158: string (nullable = true)
 |-- IMPACTYPE: string (nullable = true)
 |-- INDEX_: integer (nullable = true)
 |-- INITDIR: string (nullable = true)
 |-- INJURY: string (nullable = true)
 |-- INVAGE: string (nullable = true)
 |-- INVT

In [0]:
collection.createOrReplaceTempView("temp")

df = spark.sql("SELECT LEFT(DATE, 10) Date, INJURY Injury, VISIBILITY Visibility, LIGHT Light, RDSFCOND RoadCondition, IMPACTYPE ImpactType FROM temp")

df = df.withColumn("Date", to_date(col("Date"), 'yyyy/MM/dd'))

In [0]:
# explore number of distinct values

count_ = df.select(countDistinct("Injury").alias("Injury"),
                   countDistinct("Visibility").alias("Visibility"),
                   countDistinct("Light").alias("Light"),
                   countDistinct("RoadCondition").alias("RoadCondition"),
                   countDistinct("ImpactType").alias("ImpactType")
                  )
count_.show()

+------+----------+-----+-------------+----------+
|Injury|Visibility|Light|RoadCondition|ImpactType|
+------+----------+-----+-------------+----------+
|     5|         8|    9|            9|        10|
+------+----------+-----+-------------+----------+



In [0]:
df_groupByDate = df.groupBy('Date').agg(
    count(when(df.Injury=="None", df.Injury)).alias("NoInjury"), # Injury
    count(when(df.Injury=="Fatal", df.Injury)).alias("FatalInjury"),
    count(when(df.Injury=="Minor", df.Injury)).alias("MinorInjury"),
    count(when(df.Injury=="Major", df.Injury)).alias("MajorInjury"),
    count(when(df.Injury=="Minimal", df.Injury)).alias("MinimalInjury"),
    count(when((df.Light=="Daylight") | (df.Light=="DayLight, artificial"), df.Light)).alias("DayLight"), # Light
    count(when((df.Light=="Dark") | (df.Light=="Dark, artificial"), df.Light)).alias("Dark"),
    count(when((df.Light=="Dawn") | (df.Light=="Dawn, artificial"), df.Light)).alias("Dawn"),
    count(when((df.Light=="Dusk") | (df.Light=="Dusk, artificial"), df.Light)).alias("Dusk"),
    count(when(df.Light=="Other", df.Light)).alias("OtherLightCond"),
    count(when(df.RoadCondition=="Dry", df.RoadCondition)).alias("DryRoad"), # RoadCondition
    count(when(df.RoadCondition=="Ice", df.RoadCondition)).alias("IcyRoad"),
    count(when((df.RoadCondition=="Wet") | (df.RoadCondition=="Spilled liquid"), df.RoadCondition)).alias("WetRoad"),
    count(when((df.RoadCondition=="Slush") | (df.RoadCondition=="Loose Snow") | (df.RoadCondition=="Packed Snow"), df.RoadCondition)).alias("SnowyRoad"),
    count(when(df.RoadCondition=="Loose Sand or Gravel", df.RoadCondition)).alias("GravelRoad"),
    count(when(df.RoadCondition=="Other", df.RoadCondition)).alias("OtherRoadCond"),
    count(when(df.Visibility=="Clear", df.Visibility)).alias("ClearV"), # Visibility
    count(when((df.Visibility=="Rain") | (df.Visibility=="Freezing Rain"), df.Visibility)).alias("RainV"),
    count(when((df.Visibility=="Snow") | (df.Visibility=="Drifting Snow"), df.Visibility)).alias("SnowV"),
    count(when(df.Visibility=="Fog, Mist, Smoke, Dust", df.Visibility)).alias("FogV"),
    count(when(df.Visibility=="Strong wind", df.Visibility)).alias("WindV"),
    count(when(df.Visibility=="Other", df.Visibility)).alias("OtherV"),
    count(when(df.ImpactType=="Pedestrian Collisions", df.ImpactType)).alias("PedestrianColision"), # ImpactType
    count(when(df.ImpactType=="Turning Movement", df.ImpactType)).alias("Turning"),
    count(when(df.ImpactType=="Approaching", df.ImpactType)).alias("Approaching"),
    count(when(df.ImpactType=="Cyclist Collisions", df.ImpactType)).alias("CyclistColision"),
    count(when(df.ImpactType=="Angle", df.ImpactType)).alias("Angle"),
    count(when(df.ImpactType.like("%SMV%"), df.ImpactType)).alias("SMV"),
    count(when(df.ImpactType.like("%Rear%"), df.ImpactType)).alias("Rear"),
    count(when(df.ImpactType.like("%Side%"), df.ImpactType)).alias("Sideswipe"),
    count(when(df.ImpactType=="Other", df.ImpactType)).alias("OtherImpact")
)

display(df_groupByDate)

Date,NoInjury,FatalInjury,MinorInjury,MajorInjury,MinimalInjury,DayLight,Dark,Dawn,Dusk,OtherLightCond,DryRoad,IcyRoad,WetRoad,SnowyRoad,GravelRoad,OtherRoadCond,ClearV,RainV,SnowV,FogV,WindV,OtherV,PedestrianColision,Turning,Approaching,CyclistColision,Angle,SMV,Rear,Sideswipe,OtherImpact
2006-05-17,3,1,0,2,0,6,0,0,0,0,4,0,2,0,0,0,4,2,0,0,0,0,4,0,0,2,0,0,0,0,0
2006-05-21,3,0,0,1,0,4,0,0,0,0,0,0,4,0,0,0,0,4,0,0,0,0,0,4,0,0,0,0,0,0,0
2007-04-20,1,0,1,2,0,2,4,0,0,0,6,0,0,0,0,0,6,0,0,0,0,0,4,0,0,0,0,2,0,0,0
2007-11-15,1,0,0,1,0,3,0,0,0,0,3,0,0,0,0,0,3,0,0,0,0,0,3,0,0,0,0,0,0,0,0
2007-11-23,6,2,0,3,0,8,3,0,2,0,2,4,4,0,0,3,9,0,4,0,0,0,6,0,0,0,4,3,0,0,0
2009-07-25,1,0,1,6,0,4,7,0,0,0,9,0,2,0,0,0,9,2,0,0,0,0,0,0,9,0,0,2,0,0,0
2013-05-21,2,0,0,1,0,4,0,0,0,0,4,0,0,0,0,0,4,0,0,0,0,0,0,4,0,0,0,0,0,0,0
2013-09-09,1,0,2,2,0,6,0,0,0,0,6,0,0,0,0,0,6,0,0,0,0,0,0,0,0,3,0,0,0,3,0
2015-05-19,0,1,0,0,1,0,0,0,0,0,2,0,0,0,0,0,2,0,0,0,0,0,0,0,0,2,0,0,0,0,0
2014-09-26,2,0,0,2,0,4,0,0,0,0,4,0,0,0,0,0,4,0,0,0,0,0,2,0,0,2,0,0,0,0,0


In [0]:
hostName = "group10server"
port = 1433
database = "torontoWeatherKSI"
user = "azureuser"
password = "Thien03@n1922"

In [0]:
df_groupByDate.write.jdbc(
     url=f"jdbc:sqlserver://{hostName}.database.windows.net:{port};database={database};user={user};password={password};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;driver=com.microsoft.sqlserver.jdbc.SQLServerDriver",
     table='TorontoKSIDailyStats',
     mode='overwrite')

df.write.jdbc(
     url=f"jdbc:sqlserver://{hostName}.database.windows.net:{port};database={database};user={user};password={password};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;driver=com.microsoft.sqlserver.jdbc.SQLServerDriver",
     table='TorontoKSIEvents',
     mode='overwrite')