This notebook creates the lookup DataFrames/tables that hold the label for each attribute code. They serve as a reference guide for the main data-ingestion notebook.

The DataFrame for each dimension is built with the Spark DataFrame API and then written to DBFS (the Databricks File System), so that it can be accessed from other notebooks on the same cluster.

Accessing the data guide from the Azure SQL database

In [0]:
# JDBC connection settings for the Azure SQL database that stores the
# Road Safety data guide (the code -> label lookup table).
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

database_host = "sql-g4-server.database.windows.net"
database_port = "1433" # update if you use a non-default port
database_name = "sql-g4-db"
table = "[dbo].[Road-Safety-Open-Dataset-Data-Guide]"
user = "server-admin"
# NOTE(review): the credentials are hard-coded in the notebook source. Prefer
# loading them from a secret store (e.g. a Databricks secret scope via
# dbutils.secrets.get) and rotating this password. Left in place here so the
# notebook keeps running without additional workspace setup.
password = "adm1np@wd_s3rv7r"

# The URL logs in as "<user>@<server>", where <server> is the first label of
# the host name.
server_name = database_host.split(".", 1)[0]

# Build the URL from the settings above instead of repeating each value, so
# that changing a setting cannot leave the URL out of sync.
url = (
    f"jdbc:sqlserver://{database_host}:{database_port};"
    f"database={database_name};"
    f"user={user}@{server_name};"
    f"password={password};"
    "encrypt=true;"
    "trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;"
    "loginTimeout=30;"
)

In [0]:
# Load the whole data-guide table over JDBC into a Spark DataFrame.
# `spark` and `display` are supplied by the notebook runtime; the connection
# values come from the settings cell.
jdbc_options = {
    "driver": driver,
    "url": url,
    "dbtable": table,
    "user": user,
    "password": password,
}

jdbc_reader = spark.read.format("jdbc").options(**jdbc_options)
code_lookups = jdbc_reader.load()

# Render the lookup table in the notebook output.
display(code_lookups)

schema,dimName,code,label
Accident,accident_index,,
Accident,accident_year,,
Accident,accident_reference,,
Accident,location_easting_osgr,,
Accident,location_northing_osgr,,
Accident,longitude,,
Accident,Latitude,,
Accident,police_force,1,Metropolitan Police
Accident,police_force,3,Cumbria
Accident,police_force,4,Lancashire


Create a specific table for each dimension, then write every dimension DataFrame to DBFS so that it can be accessed from a different notebook.

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import col

# DBFS directory that receives one CSV folder per dimension.
DIM_TABLES_ROOT = "/FileStore/tables"


def _build_dimension(dim_name, prefix, table_name, *, add_unknown=False, name_alias=None):
    """Build one dimension lookup from ``code_lookups`` and write it as CSV.

    Args:
        dim_name: Value of the ``dimName`` column that selects the rows of
            this dimension.
        prefix: Prefix of the output columns; ``code`` becomes
            ``<prefix>_code`` and ``label`` becomes ``<prefix>_label``.
        table_name: Folder name under ``DIM_TABLES_ROOT`` to write to.
        add_unknown: When true, append a row with code ``'-1'`` and label
            ``'Unknown'`` to the dimension.
        name_alias: Output name of the ``dimName`` column; defaults to
            ``prefix``.

    Returns:
        The renamed dimension DataFrame that was written.
    """
    # Select the three columns explicitly: `union` below matches columns by
    # position, so the order must be exactly (dimName, code, label).
    dim = code_lookups.filter(col("dimName") == dim_name).select("dimName", "code", "label")

    if add_unknown:
        unknown_row = Row("dimName", "code", "label")(dim_name, "-1", "Unknown")
        dim = dim.union(spark.createDataFrame([unknown_row]))

    dim = dim.select(
        col("dimName").alias(name_alias or prefix),
        col("code").alias(f"{prefix}_code"),
        col("label").alias(f"{prefix}_label"),
    )

    # Overwrite any previous export so re-running the notebook is safe.
    (
        dim.write
        .option("header", "true")
        .option("sep", ",")
        .mode("overwrite")
        .csv(f"{DIM_TABLES_ROOT}/{table_name}")
    )
    return dim


# One call per dimension, in the same order as before. Each DataFrame stays
# bound to its module-level name so other cells can keep using it.
police_force = _build_dimension("police_force", "police", "dimPoliceForce", add_unknown=True)

accident_severity = _build_dimension("accident_severity", "severity", "dimAccidentSeverity")

day_week = _build_dimension("day_of_week", "day_week", "dimDayOfWeek")

# NOTE(review): the name-column alias 'ons_district_week' looks like a
# copy-paste slip from the day_week dimension. It is kept unchanged so the CSV
# header seen by downstream readers does not change; confirm before renaming.
ons_district = _build_dimension(
    "local_authority_ons_district",
    "ons_district",
    "dimLocalAuthorityOnsDistrict",
    add_unknown=True,
    name_alias="ons_district_week",
)

highway = _build_dimension("local_authority_highway", "highway", "dimLocalAuthorityHighway", add_unknown=True)

first_road = _build_dimension("first_road_class", "first_road", "dimFirstRoadClass", add_unknown=True)

road_type = _build_dimension("road_type", "road_type", "dimRoadType", add_unknown=True)

junction_detail = _build_dimension("junction_detail", "junction_detail", "dimJunctionDetail")

junction_control = _build_dimension("junction_control", "junction_control", "dimJunctionControl")

second_road = _build_dimension("second_road_class", "second_road", "dimSecondRoadClass", add_unknown=True)

pedestrian_human = _build_dimension(
    "pedestrian_crossing_human_control",
    "pedestrian_human",
    "dimPedestrianCrossingHumanControl",
)

pedestrian_facilities = _build_dimension(
    "pedestrian_crossing_physical_facilities",
    "pedestrian_facilities",
    "dimPedestrianCrossingPhysicalFacilities",
)

light_conditions = _build_dimension("light_conditions", "light_conditions", "dimLightConditions")

weather_conditions = _build_dimension("weather_conditions", "weather_conditions", "dimWeatherConditions")

road_surface_conditions = _build_dimension(
    "road_surface_conditions",
    "road_surface_conditions",
    "dimRoadSurfaceConditions",
)

special_conditions = _build_dimension("special_conditions_at_site", "special_conditions", "dimSpecialConditionsAtSite")

carriageway_hazards = _build_dimension("carriageway_hazards", "carriageway_hazards", "dimCarriagewayHazards")

urban_or_rural_area = _build_dimension("urban_or_rural_area", "urban_or_rural_area", "dimUrbanOrRuralArea")

police_attendance = _build_dimension(
    "did_police_officer_attend_scene_of_accident",
    "police_attendance",
    "dimPoliceAttendance",
)

trunk_road = _build_dimension("trunk_road_flag", "trunk_road", "dimTrunkRoadFlag")



In [0]:
%fs
ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/dimAccidentSeverity/,dimAccidentSeverity/,0,1674738753000
dbfs:/FileStore/tables/dimCarriagewayHazards/,dimCarriagewayHazards/,0,1674738764000
dbfs:/FileStore/tables/dimDayOfWeek/,dimDayOfWeek/,0,1674738753000
dbfs:/FileStore/tables/dimFirstRoadClass/,dimFirstRoadClass/,0,1674738755000
dbfs:/FileStore/tables/dimJunctionControl/,dimJunctionControl/,0,1674738758000
dbfs:/FileStore/tables/dimJunctionDetail/,dimJunctionDetail/,0,1674738757000
dbfs:/FileStore/tables/dimLightConditions/,dimLightConditions/,0,1674738761000
dbfs:/FileStore/tables/dimLocalAuthorityHighway/,dimLocalAuthorityHighway/,0,1674738755000
dbfs:/FileStore/tables/dimLocalAuthorityOnsDistrict/,dimLocalAuthorityOnsDistrict/,0,1674738754000
dbfs:/FileStore/tables/dimPedestrianCrossingHumanControl/,dimPedestrianCrossingHumanControl/,0,1674738759000
