In [0]:
from datetime import datetime
from math import sqrt

import psycopg2
from psycopg2 import connect
from psycopg2.extras import RealDictCursor, RealDictRow
from psycopg2.extensions import connection
import pyspark
from pyspark.sql import Row, DataFrame
from pyspark.sql.functions import col, udf, lit, min, broadcast
from pyspark.sql.types import ArrayType, StringType, IntegerType, FloatType, BooleanType
from pyspark.dbutils import *
import h3



# Accessing Azure Storage via Azure Key Vault

In [0]:
# Pull the name of the secret scope
dbutils.widgets.text("kv_secret_scope_name", "", "Name of the secret scope for Key Vault secrets")
secret_scope_name = dbutils.widgets.get("kv_secret_scope_name")

## Azure Blob Storage

In [0]:
# Pull the name of the Blob Storage account
dbutils.widgets.text("collision_storage_account", "", "Name of the Azure Blob Storage Account containing collision data")
blob_storage_account_name = dbutils.widgets.get("collision_storage_account")

# Pull the name of the Blob container
dbutils.widgets.text("collision_st_blob_container", "","Name of the Blob container in the Blob Storage Account containing collision data")
blob_blob_container_name = dbutils.widgets.get("collision_st_blob_container")

# Pull the name of the key of the account key secret
dbutils.widgets.text("kv_secret_collision_sta_key", "", "Name of the key to access the Blob Storage account key in Key Vault")
kv_blob_storage_account_key = dbutils.widgets.get("kv_secret_collision_sta_key")
blob_account_key = dbutils.secrets.get(scope=secret_scope_name, key=kv_blob_storage_account_key)

In [0]:
spark.conf.set(
    f"fs.azure.account.key.{blob_storage_account_name}.blob.core.windows.net", blob_account_key)

In [0]:
# Pull the name of the file containing collision data
dbutils.widgets.text("collision_file_path", "", "Name of the file containing collision data")
collision_file_path = dbutils.widgets.get("collision_file_path")

collision_file_url = f"wasbs://{blob_blob_container_name}@{blob_storage_account_name}.blob.core.windows.net/{collision_file_path}"

## Azure Data Lake Storage

### Raw Data Blob Container

In [0]:
# Pull the name of the ADLS account
dbutils.widgets.text("dls_storage_account", "", "Name of the Azure Data Lake Storage Account")
dls_storage_account_name = dbutils.widgets.get("dls_storage_account")

# Pull the name of the extract Blob container
dbutils.widgets.text("dls_raw_blob_container", "", "Name of the Blob container containing raw data in the Data Lake Storage Account")
dls_blob_container_name = dbutils.widgets.get("dls_raw_blob_container")

# Pull the name of the key of the ADLS key secret
dbutils.widgets.text("kv_secret_dls_raw_key", "", "Name of the key to access the extract Blob container via the Data Lake Storage account key secret in Key Vault")
kv_dls_extract_storage_account_key = dbutils.widgets.get("kv_secret_dls_raw_key")
dls_extract_account_key = dbutils.secrets.get(
    scope=secret_scope_name,
    key="data-lake-account-key"
)

In [0]:
spark.conf.set(
    f"fs.azure.account.key.{dls_storage_account_name}.blob.core.windows.net", dls_extract_account_key)

In [0]:
dbutils.widgets.text("pcd_local_authorities_file_path", "", "Name of file path to access the local authorities file path in the raw data Blob container")
la_file_path = dbutils.widgets.get("pcd_local_authorities_file_path")

la_file_url = f"wasbs://{dls_blob_container_name}@{dls_storage_account_name}.blob.core.windows.net/{la_file_path}"

path,name,size,modificationTime
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/2011 Census Output Area Classification Names and Codes UK.csv,2011 Census Output Area Classification Names and Codes UK.csv,5527,1729698741000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/2011 Census Output Area Classification Names and Codes UK.xlsx,2011 Census Output Area Classification Names and Codes UK.xlsx,14677,1729698740000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/BUA22_names and codes EW as at 12_22.csv,BUA22_names and codes EW as at 12_22.csv,224265,1729698741000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/BUA22_names and codes EW as at 12_22.xlsx,BUA22_names and codes EW as at 12_22.xlsx,198529,1729698740000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/BUA_names and codes EW as at 12_22.csv,BUA_names and codes EW as at 12_22.csv,224265,1729698740000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/BUA_names and codes EW as at 12_22.xlsx,BUA_names and codes EW as at 12_22.xlsx,198529,1729698740000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/CALNCV names and codes EN as at 04_20.csv,CALNCV names and codes EN as at 04_20.csv,695,1729698741000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/CALNCV names and codes EN as at 04_20.xlsx,CALNCV names and codes EN as at 04_20.xlsx,14204,1729698740000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/Country names and codes UK as at 08_12.csv,Country names and codes UK as at 08_12.csv,241,1729698740000
wasbs://raw-data@sldlssparkuks01.blob.core.windows.net/NSPL_2021_AUG_2024/Documents/Country names and codes UK as at 08_12.xlsx,Country names and codes UK as at 08_12.xlsx,11781,1729698741000


### Raw Modified Data Blob Container

In [0]:
# Pull the name of the raw_modified files Blob container
dbutils.widgets.text("dls_raw_modified_blob_container", "", "Name of the Blob container containing raw modified data in the Data Lake Storage Account")
dls_raw_modified_blob_container_name = dbutils.widgets.get("dls_raw_modified_blob_container")

# Pull the name of the key of the ADLS key secret
dbutils.widgets.text("kv_secret_dls_raw_modified_key", "", "Name of the key to access the raw modified data Blob container via the Data Lake Storage account key secret in Key Vault")
kv_dls_raw_modified_storage_account_key = dbutils.widgets.get("kv_secret_dls_raw_modified_key")
dls_raw_modified_blob_container_sas_token = dbutils.secrets.get(
    scope=secret_scope_name,
    key=kv_dls_raw_modified_storage_account_key
)

In [0]:
spark.conf.set(
    f"fs.azure.account.auth.type.{dls_raw_modified_blob_container_name}.blob.core.windows.net", 
    "SAS"
)

spark.conf.set(
    f"fs.azure.sas.token.provider.type.{dls_raw_modified_blob_container_name}.blob.core.windows.net", 
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
)

spark.conf.set(
    f"fs.azure.sas.{dls_raw_modified_blob_container_name}.{dls_storage_account_name}.blob.core.windows.net",
    dls_raw_modified_blob_container_sas_token
)

In [0]:
dbutils.widgets.text("merged_pcd_csv_file_directory", "", "File path of the merged postcode CSV file in the raw modified data Blob container.")
merged_pcd_file_dir = dbutils.widgets.get("merged_pcd_csv_file_directory")

dbutils.widgets.text("merged_pcd_csv_file_name", "", "File name of he merged postcode CSV file in the raw modified data Blob container.")
merged_pcd_file_name = dbutils.widgets.get("merged_pcd_csv_file_name")

merged_pcd_file_url = f"wasbs://{dls_raw_modified_blob_container_name}@{dls_storage_account_name}.blob.core.windows.net/{merged_pcd_file_dir}/{merged_pcd_file_name}"

path,name,size,modificationTime
wasbs://merged-raw-data@sldlssparkuks01.blob.core.windows.net/howard_postcode/howard_merged_postcode.csv,howard_merged_postcode.csv,13370941,1731081390000


# Dataset Loading

We firstly load the data of all collisions in the last 5 years.

In [0]:
# We assign a job group to this section of the notebook.

sc.setJobGroup("howard-data-loading-cleaning", "Loading data and cleaning it")

In [0]:
collisions_df = spark.read.csv(collision_file_url, header=True, inferSchema=True)

collisions_df.head(2)

[Row(accident_index='2019010128300', accident_year=2019, accident_reference='010128300', location_easting_osgr='528218', location_northing_osgr='180407', longitude='-0.153842', latitude='51.508057', police_force=1, accident_severity=3, number_of_vehicles=2, number_of_casualties=3, date=datetime.date(2019, 2, 18), day_of_week=2, time=datetime.datetime(2024, 11, 5, 17, 50), local_authority_district=1, local_authority_ons_district='E09000033', local_authority_highway='E09000033', first_road_class=3, first_road_number=4202, road_type=1, speed_limit=30, junction_detail=1, junction_control=2, second_road_class=3, second_road_number=4202, pedestrian_crossing_human_control=0, pedestrian_crossing_physical_facilities=5, light_conditions=1, weather_conditions=1, road_surface_conditions=1, special_conditions_at_site=0, carriageway_hazards=0, urban_or_rural_area=1, did_police_officer_attend_scene_of_accident=3, trunk_road_flag=2, lsoa_of_accident_location='E01004762', enhanced_severity_collision=-1

# Data Cleaning

We now review the structure and column types of the collisions DataFrame.

In [0]:
collisions_df.printSchema()

root
 |-- accident_index: string (nullable = true)
 |-- accident_year: integer (nullable = true)
 |-- accident_reference: string (nullable = true)
 |-- location_easting_osgr: string (nullable = true)
 |-- location_northing_osgr: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- police_force: integer (nullable = true)
 |-- accident_severity: integer (nullable = true)
 |-- number_of_vehicles: integer (nullable = true)
 |-- number_of_casualties: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- local_authority_district: integer (nullable = true)
 |-- local_authority_ons_district: string (nullable = true)
 |-- local_authority_highway: string (nullable = true)
 |-- first_road_class: integer (nullable = true)
 |-- first_road_number: integer (nullable = true)
 |-- road_type: integer (nullable = true)
 |-- speed_limit: integer (nullable 

**Initial thoughts:**
- Some `StringType` columns would be more suited to `IntegerType`/`FloatType` (eg. `location_easting_osgr`, `location_northing_osgr` would be better suited as `IntegerType`; `longitude`, `latitude` would be better as `FloatType`).

- _NB: Values in the columns `accident_index`, `accident_reference` can have alphanumeric characters in them,_ and so should be left as `StringType`.

In [0]:
# We change the types of the columns

collisions_df = collisions_df.withColumn('location_easting_osgr', col('location_easting_osgr').cast(IntegerType())) \
    .withColumn('location_northing_osgr', col('location_northing_osgr').cast(IntegerType())) \
    .withColumn('longitude', col('longitude').cast(FloatType())) \
    .withColumn('latitude', col('latitude').cast(FloatType()))

We look at rows with missing/invalid data.

In [0]:
collisions_df.count()

520084

In [0]:
# We identify our columns with null values; location_easting_osgr, location_northing_osgr, longitude, latitude

null_columns = []

for column_name in collisions_df.columns:
    if collisions_df.select(column_name).filter(col(column_name).isNull()).count() > 0:
        null_columns.append(column_name)

null_columns

['location_easting_osgr', 'location_northing_osgr', 'longitude', 'latitude']

In [0]:

collisions_df = collisions_df.dropna(subset=null_columns)

In [0]:
# Values for longitude should be between -180 and 180.
# UK longitude values are within the range -8 to 2.

collisions_df.select("longitude").describe().show()

+-------+-------------------+
|summary|          longitude|
+-------+-------------------+
|  count|             519991|
|   mean|-1.2098557379612207|
| stddev| 1.3653226903535824|
|    min|          -7.525273|
|    max|           1.759829|
+-------+-------------------+



In [0]:
# Values for latitude should be between -90 and 90.
# UK latitude values are within the range 49 to 61.

collisions_df.select("latitude").describe().show()

+-------+------------------+
|summary|          latitude|
+-------+------------------+
|  count|            519991|
|   mean|  52.3651311472486|
| stddev|1.3251424302604045|
|    min|          49.91433|
|    max|         60.541145|
+-------+------------------+



In [0]:
# Clear job group

sc.setJobGroup("", "")

# Data Transformation

## Create new columns

We create new columns to find the year, month, and hour of the occurring collision.

In [0]:
# Create new job group

sc.setJobGroup("howard-collisions-data-transformation", "Adding columns to collisions DataFrame")

In [0]:
collisions_df.select(["date", "time"]).head(5)

[Row(date=datetime.date(2019, 2, 18), time=datetime.datetime(2024, 11, 5, 17, 50)),
 Row(date=datetime.date(2019, 1, 15), time=datetime.datetime(2024, 11, 5, 21, 45)),
 Row(date=datetime.date(2019, 1, 1), time=datetime.datetime(2024, 11, 5, 1, 50)),
 Row(date=datetime.date(2019, 1, 1), time=datetime.datetime(2024, 11, 5, 1, 20)),
 Row(date=datetime.date(2019, 1, 1), time=datetime.datetime(2024, 11, 5, 0, 40))]

In [0]:
# Create user-defined functions to implement into each new column.

get_year = udf(lambda x: x.year)

get_month = udf(lambda x: x.month)

get_hour = udf(lambda x: x.hour)

In [0]:
# We drop the existing accident_year column

collisions_df = collisions_df.drop("accident_year")

In [0]:
# We create new columns, casting them all as integers.

new_cols_collisions_df = collisions_df.withColumn("accident_year", get_year(col('date')).cast(IntegerType())) \
            .withColumn("accident_month", get_month(col("date")).cast(IntegerType())) \
            .withColumn("accident_hour", get_hour(col("time")).cast(IntegerType())) \
            .persist()

new_cols_collisions_df.head(5)

[Row(accident_index='2019010128300', accident_reference='010128300', location_easting_osgr=528218, location_northing_osgr=180407, longitude=-0.1538420021533966, latitude=51.508056640625, police_force=1, accident_severity=3, number_of_vehicles=2, number_of_casualties=3, date=datetime.date(2019, 2, 18), day_of_week=2, time=datetime.datetime(2024, 11, 5, 17, 50), local_authority_district=1, local_authority_ons_district='E09000033', local_authority_highway='E09000033', first_road_class=3, first_road_number=4202, road_type=1, speed_limit=30, junction_detail=1, junction_control=2, second_road_class=3, second_road_number=4202, pedestrian_crossing_human_control=0, pedestrian_crossing_physical_facilities=5, light_conditions=1, weather_conditions=1, road_surface_conditions=1, special_conditions_at_site=0, carriageway_hazards=0, urban_or_rural_area=1, did_police_officer_attend_scene_of_accident=3, trunk_road_flag=2, lsoa_of_accident_location='E01004762', enhanced_severity_collision=-1, accident_y

In [0]:
# Clear job group

sc.setJobGroup("", "")

## Feature engineering

We then load the data of all postcodes.
<p>
For the sake of saving compute resources, we implement a customisable limit for rows to consider for now

In [0]:
# Create new job group

sc.setJobGroup("howard-postcodes-data-transformation", "Transforming postcodes DataFrame")

In [0]:
postcode_lookup_df = spark.read.csv(merged_pcd_file_url, header=True, inferSchema=True)

postcode_lookup_df.head(2)

[Row(pcd='AB1 0AA', pcd2='AB1  0AA', pcds='AB1 0AA', dointr=198001, doterm=199606, usertype=0, oseast1m=385386, osnrth1m=801193, osgrdind=1, oa21='S00137176', cty='S99999999', ced='S99999999', laua='S12000033', ward='S13002843', nhser='S99999999', ctry='S92000003', rgn='S99999999', pcon='S14000061', ttwa='S22000047', itl='S30000026', npark='S99999999', lsoa21=None, msoa21=None, wz11='S34002990', sicbl='S03000012', bua22='S99999999', ru11ind=3, oac11='1C3', lat=57.101474, long=-2.242851, lep1='S99999999', lep2=None, pfa='S23000009', imd=6715, icb='S99999999'),
 Row(pcd='AB1 0AB', pcd2='AB1  0AB', pcds='AB1 0AB', dointr=198001, doterm=199606, usertype=0, oseast1m=385177, osnrth1m=801314, osgrdind=1, oa21='S00137176', cty='S99999999', ced='S99999999', laua='S12000033', ward='S13002843', nhser='S99999999', ctry='S92000003', rgn='S99999999', pcon='S14000061', ttwa='S22000047', itl='S30000026', npark='S99999999', lsoa21=None, msoa21=None, wz11='S34002990', sicbl='S03000012', bua22='S99999999

In [0]:
postcode_lookup_df = postcode_lookup_df.limit(1000)

We attempt to match the latitude and longitude of each collision to a respective local authority.

In [0]:
postcode_lookup_df.printSchema()

root
 |-- pcd: string (nullable = true)
 |-- pcd2: string (nullable = true)
 |-- pcds: string (nullable = true)
 |-- dointr: integer (nullable = true)
 |-- doterm: integer (nullable = true)
 |-- usertype: integer (nullable = true)
 |-- oseast1m: integer (nullable = true)
 |-- osnrth1m: integer (nullable = true)
 |-- osgrdind: integer (nullable = true)
 |-- oa21: string (nullable = true)
 |-- cty: string (nullable = true)
 |-- ced: string (nullable = true)
 |-- laua: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- nhser: string (nullable = true)
 |-- ctry: string (nullable = true)
 |-- rgn: string (nullable = true)
 |-- pcon: string (nullable = true)
 |-- ttwa: string (nullable = true)
 |-- itl: string (nullable = true)
 |-- npark: string (nullable = true)
 |-- lsoa21: string (nullable = true)
 |-- msoa21: string (nullable = true)
 |-- wz11: string (nullable = true)
 |-- sicbl: string (nullable = true)
 |-- bua22: string (nullable = true)
 |-- ru11ind: string (nullable

In [0]:
# We convert the lat and long columns to float type.

lat_long_postcode_lookup_df = postcode_lookup_df.withColumn("latitude", col("lat").cast(FloatType())) \
    .withColumn("longitude", col("long").cast(FloatType())) \
    .withColumnRenamed('pcd', 'postcode') \
    .drop("lat", "long")

lat_long_postcode_lookup_df.head(2)

[Row(postcode='AB1 0AA', pcd2='AB1  0AA', pcds='AB1 0AA', dointr=198001, doterm=199606, usertype=0, oseast1m=385386, osnrth1m=801193, osgrdind=1, oa21='S00137176', cty='S99999999', ced='S99999999', laua='S12000033', ward='S13002843', nhser='S99999999', ctry='S92000003', rgn='S99999999', pcon='S14000061', ttwa='S22000047', itl='S30000026', npark='S99999999', lsoa21=None, msoa21=None, wz11='S34002990', sicbl='S03000012', bua22='S99999999', ru11ind='3', oac11='1C3', lep1='S99999999', lep2=None, pfa='S23000009', imd=6715, icb='S99999999', latitude=57.10147476196289, longitude=-2.2428510189056396),
 Row(postcode='AB1 0AB', pcd2='AB1  0AB', pcds='AB1 0AB', dointr=198001, doterm=199606, usertype=0, oseast1m=385177, osnrth1m=801314, osgrdind=1, oa21='S00137176', cty='S99999999', ced='S99999999', laua='S12000033', ward='S13002843', nhser='S99999999', ctry='S92000003', rgn='S99999999', pcon='S14000061', ttwa='S22000047', itl='S30000026', npark='S99999999', lsoa21=None, msoa21=None, wz11='S340029

## Adding H3 index column to postcode DataFrame

In [0]:
# We additionally add a new column for the h3 index of each postcode.
# This will greatly aid in partitioning the postcode dataframe.

dbutils.widgets.text("h3_resolution", "", "Resolution value for each h3 hexagon to be allocated per postcode.")
resolution = int(dbutils.widgets.get("h3_resolution"))

geo_to_h3_udf = udf(lambda x, y, z: h3.geo_to_h3(x, y, z))

lat_long_postcode_lookup_df = lat_long_postcode_lookup_df.withColumn("h3_index", 
                                        geo_to_h3_udf(col("latitude"), 
                                                    col("longitude"), 
                                                    lit(resolution))
                                        ).persist()

lat_long_postcode_lookup_df.head(2)

[Row(postcode='AB1 0AA', pcd2='AB1  0AA', pcds='AB1 0AA', dointr=198001, doterm=199606, usertype=0, oseast1m=385386, osnrth1m=801193, osgrdind=1, oa21='S00137176', cty='S99999999', ced='S99999999', laua='S12000033', ward='S13002843', nhser='S99999999', ctry='S92000003', rgn='S99999999', pcon='S14000061', ttwa='S22000047', itl='S30000026', npark='S99999999', lsoa21=None, msoa21=None, wz11='S34002990', sicbl='S03000012', bua22='S99999999', ru11ind='3', oac11='1C3', lep1='S99999999', lep2=None, pfa='S23000009', imd=6715, icb='S99999999', latitude=57.10147476196289, longitude=-2.2428510189056396, h3_index='89197616353ffff'),
 Row(postcode='AB1 0AB', pcd2='AB1  0AB', pcds='AB1 0AB', dointr=198001, doterm=199606, usertype=0, oseast1m=385177, osnrth1m=801314, osgrdind=1, oa21='S00137176', cty='S99999999', ced='S99999999', laua='S12000033', ward='S13002843', nhser='S99999999', ctry='S92000003', rgn='S99999999', pcon='S14000061', ttwa='S22000047', itl='S30000026', npark='S99999999', lsoa21=None

In [0]:
# We select only the columns in postcode_lookup_df we're interested in; latitude, longitude, and the LAUA (Local Authority Unitary) column.

lat_long_postcode_lookup_df = lat_long_postcode_lookup_df.select("postcode", "latitude", "longitude", "laua", "h3_index")

In [0]:
# We also consider the LAUA dataset.

laua_df = spark.read.csv(la_file_url, header=True, inferSchema=True)

laua_df.head(2)

[Row(LAD23CD='E06000001', LAD23NM='Hartlepool', LAU121CD='E06000001', LAU121NM='Hartlepool', ITL321CD='TLC11', ITL321NM='Hartlepool and Stockton-on-Tees', ITL221CD='TLC1', ITL221NM='Tees Valley and Durham', ITL121CD='TLC', ITL121NM='North East (England)'),
 Row(LAD23CD='E06000002', LAD23NM='Middlesbrough', LAU121CD='E06000002', LAU121NM='Middlesbrough', ITL321CD='TLC12', ITL321NM='South Teesside', ITL221CD='TLC1', ITL221NM='Tees Valley and Durham', ITL121CD='TLC', ITL121NM='North East (England)')]

In [0]:
# We check all columns are in the correct format.

laua_df.printSchema()

root
 |-- LAD23CD: string (nullable = true)
 |-- LAD23NM: string (nullable = true)
 |-- LAU121CD: string (nullable = true)
 |-- LAU121NM: string (nullable = true)
 |-- ITL321CD: string (nullable = true)
 |-- ITL321NM: string (nullable = true)
 |-- ITL221CD: string (nullable = true)
 |-- ITL221NM: string (nullable = true)
 |-- ITL121CD: string (nullable = true)
 |-- ITL121NM: string (nullable = true)



In [0]:
# We rename the relevant columns.

laua_df = laua_df.select("LAD23CD", "LAD23NM", "ITL321NM", "ITL221NM", "ITL121NM") \
            .withColumnsRenamed({"LAD23CD": "laua",
                                 "LAD23NM": "laua_name",
                                 "ITL321NM": "itl_lvl_3",
                                 "ITL221NM": "itl_lvl_2",
                                 "ITL121NM": "itl_lvl_1"}
                                )

laua_df.head()

Row(laua='E06000001', laua_name='Hartlepool', itl_lvl_3='Hartlepool and Stockton-on-Tees', itl_lvl_2='Tees Valley and Durham', itl_lvl_1='North East (England)')

In [0]:
# To minimise compute resources in the long-run, we join laua_df with lat_long_postcode_lookup_df.

lat_long_postcode_df = laua_df.join(lat_long_postcode_lookup_df, on="laua") \
                            .drop("laua") \
                            .withColumnRenamed("latitude", "postcode_latitude") \
                            .withColumnRenamed("longitude", "postcode_longitude") \
                            .persist()

lat_long_postcode_df.head()

Row(laua_name='Aberdeen City', itl_lvl_3='Aberdeen City and Aberdeenshire', itl_lvl_2='North Eastern Scotland', itl_lvl_1='Scotland', postcode='AB1 0AA', postcode_latitude=57.10147476196289, postcode_longitude=-2.2428510189056396, h3_index='89197616353ffff')

In [0]:
display(lat_long_postcode_df)

laua_name,itl_lvl_3,itl_lvl_2,itl_lvl_1,postcode,postcode_latitude,postcode_longitude,h3_index
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AA,57.101475,-2.242851,89197616353ffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AB,57.102554,-2.246308,8919761635bffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AD,57.100555,-2.248342,8919761634bffff
Aberdeenshire,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AE,57.084442,-2.255708,89197614553ffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AF,57.096657,-2.258102,891976144bbffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AG,57.097084,-2.267513,89197614403ffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AJ,57.09901,-2.252854,89197614487ffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AL,57.101765,-2.254688,89197614497ffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AN,57.097553,-2.245483,8919761634fffff
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 0AP,57.098244,-2.245768,8919761634fffff


In [0]:
# Clear job group

sc.setJobGroup("", "")

### The Cross Join

This will involve:

- Broadcasting the collisions DataFrame.
- Partitioning the postcodes DataFrame by the first _n_ bits of their h3 index.
- Performing a cross-join between partitions of the postcode DataFrame, and the broadcasted postcodes DataFrame.
- Finding the Euclidean distance between each Cartesian product of latitude-longitude values of postcodes and collisions.
- Grouping the resulting cross-joined DataFrame by accident index, with the aggregation function being `min(distance)`.
- Finally performing an inner join on the grouped cross-joined DataFrame, with its ungrouped equivalent, to find the closest postcode of a given accident.
- Left-joining this resultant DataFrame to our initial collisions DataFrame, and cleaning any null values for postcodes, and removing any unnecessary columns (eg. for latitude, longitude, etc.).


In [0]:
# Create new job group

sc.setJobGroup("howard-feature-engineering", "Cross-joining postcode and collision DataFrames")

In [0]:
# We firstly broadcast the collisions DataFrame.

broadcasted_collisions_df = broadcast(new_cols_collisions_df).persist()

In [0]:
# We partition the postcode DataFrame by their h3 index (ie. so that postcodes with the same/similar h3 index remain in the same partition).
# We firstly get ALL unique h3 indices, and assign a key for each unique configuration of the first n bits, defined by `h3_number_of_bits`.
# We will aim for AT LEAST 4 partitions for each run-through of the cross-join; consider increasing `h3_number_of_bits` for more partitions.


dbutils.widgets.text("h3_bits_partition", "", "The number of starting bits per h3 index to partition indices by")
h3_number_of_bits = int(dbutils.widgets.get("h3_bits_partition"))

unique_postcode_h3_indices = lat_long_postcode_df.select("h3_index").distinct().collect()

h3_key_dict = {}

for row in unique_postcode_h3_indices:

    if row["h3_index"][:h3_number_of_bits] not in h3_key_dict.keys():

        n = len(h3_key_dict) + 1

        h3_key_dict[row["h3_index"][:h3_number_of_bits]] = n

h3_key_dict


{'89197616': 1,
 '89197614': 2,
 '89197617': 3,
 '891976bb': 4,
 '8919768c': 5,
 '8919296a': 6,
 '891976b9': 7,
 '8919768d': 8}

In [0]:
def custom_h3_partitioning(h3_index: list[Row], h3_key_config: dict[int: str], h3_input_number_of_bits: int) -> int:
    """Given a list of unique h3 indices, we partition them by the first n bits of the index, where n is specified."""

    if h3_index[:h3_input_number_of_bits] in h3_key_config.keys():

        return h3_key_config[h3_index[:h3_input_number_of_bits]]

    else:

        return len(h3_key_config) + 1

custom_h3_partitioning_udf = udf(lambda h3_index: custom_h3_partitioning(h3_index, h3_key_dict, h3_number_of_bits), IntegerType())

In [0]:
# Function for Euclidean distance

def lat_long_euclid_distance(lat_1: float, long_1: float, lat_2: float, long_2: float) -> float:
    """
    Given two latitude-longitude points, this function returns the Euclidean distance between the two points if the absolute difference between latitudes and longitudes is less than a specified error, otherwise it returns None.
    """
    
    lat_diff = abs(lat_1 - lat_2)
    long_diff = abs(long_1 - long_2)

    return sqrt(lat_diff**2 + long_diff**2)

lat_long_euclid_distance_udf = udf(lat_long_euclid_distance, BooleanType())

In [0]:
# We state the function for finding distances between individual postcodes and collisions.
# This is to be applied into a distance column for the cross-join between the postcode dataframe partition and the broadcasted accident dataframe.

def get_collision_postcode_distance(collision_lat: float, collision_long: float, 
                                    postcode_lat: float, postcode_long: float, error: float) -> float | None:
    """Returns the Euclidean distance between a collision and postcode. To be applied in a distance column for each partition in the postcode dataframe"""

    if collision_lat is None or collision_long is None or postcode_lat is None or postcode_long is None:

        return None

    lat_diff = abs(collision_lat - postcode_lat)
    long_diff = abs(collision_long - postcode_long)

    if lat_diff >= error or long_diff >= error:

        return None

    return lat_long_euclid_distance(collision_lat, collision_long, postcode_lat, postcode_long)

get_collision_postcode_distance_udf = udf(get_collision_postcode_distance, FloatType())

In [0]:
# We get a list of all unique accident indices for filtration purposes for our cross-joined DataFrame.

unique_accident_indices_list_rows = new_cols_collisions_df.select("accident_index").distinct().collect()

unique_accident_indices = [row["accident_index"] for row in unique_accident_indices_list_rows]

In [0]:
# We assemble all of the pieces together now!
# Firstly, we partition the postcode dataframe.

lat_long_postcode_df_w_partitions = lat_long_postcode_df.withColumn("partition", custom_h3_partitioning_udf(col("h3_index")))

lat_long_postcode_df_partitioned = lat_long_postcode_df_w_partitions.repartition(len(h3_key_dict), "partition")

# We remove previous DataFrames out of cache memory to optimise the following heavy computations.

lat_long_postcode_df.unpersist()

lat_long_postcode_lookup_df.unpersist()

new_cols_collisions_df.unpersist()

lat_long_postcode_df_partitioned.show()

+-------------+--------------------+--------------------+---------+--------+-----------------+------------------+---------------+---------+
|    laua_name|           itl_lvl_3|           itl_lvl_2|itl_lvl_1|postcode|postcode_latitude|postcode_longitude|       h3_index|partition|
+-------------+--------------------+--------------------+---------+--------+-----------------+------------------+---------------+---------+
|Aberdeen City|Aberdeen City and...|North Eastern Sco...| Scotland| AB1 1AA|         57.14764|         -2.093853|891976bb66fffff|        2|
|Aberdeen City|Aberdeen City and...|North Eastern Sco...| Scotland| AB1 1AB|        57.148003|         -2.092532|891976bb66fffff|        2|
|Aberdeen City|Aberdeen City and...|North Eastern Sco...| Scotland| AB1 1AD|         57.14827|         -2.091872|891976bb667ffff|        2|
|Aberdeen City|Aberdeen City and...|North Eastern Sco...| Scotland| AB1 1AE|         57.14836|         -2.091872|891976bb667ffff|        2|
|Aberdeen City|Aberd

In [0]:
# We perform a cross-join between collision and postcode DataFrames, dropping all rows with an empty distance value 
# (ie. the latitude-longitude of the accident is not within the given error bounds of the postcode's latitude-longitude values).

dbutils.widgets.text("distance_error", "", "The accuracy to whether a postcode and collision can be deemed sufficiently close to be considered.")
distance_error = float(dbutils.widgets.get("distance_error"))

xjoined_postcode_collision_df = lat_long_postcode_df_partitioned.crossJoin(broadcasted_collisions_df \
                                                                           .select("accident_index", "latitude", "longitude")) \
                                                                            .withColumn("distance",     get_collision_postcode_distance_udf(
                                                                                col("latitude"),
                                                                                col("longitude"),
                                                                                col("postcode_latitude"),
                                                                                col("postcode_longitude"),
                                                                                lit(distance_error))
                                                                            ).dropna(subset=["distance"]) \
                                                                            .persist()

display(xjoined_postcode_collision_df)

laua_name,itl_lvl_3,itl_lvl_2,itl_lvl_1,postcode,postcode_latitude,postcode_longitude,h3_index,partition,accident_index,latitude,longitude,distance
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2021991025811,57.157604,-2.09572,0.010137405
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2021991041896,57.150185,-2.094892,0.0027483753
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2021991119034,57.145424,-2.095574,0.0028060067
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2023991261357,57.15373,-2.099704,0.008444017
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2023991279264,57.149925,-2.094157,0.0023051351
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2023991322911,57.13829,-2.083883,0.013668178
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2023991328151,57.147076,-2.096761,0.0029622896
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2023991343106,57.150925,-2.103316,0.010016854
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2023991350160,57.14902,-2.097708,0.0040948614
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1AA,57.14764,-2.093853,891976bb66fffff,2,2023991357867,57.149406,-2.091065,0.0033004237


In [0]:
# We then filter out all rows for a given accident index, and extract the postcode with least distance from our given accident.

xjoined_postcode_collision_min_dist_df = xjoined_postcode_collision_df.groupBy(["accident_index", 
                                                                                "latitude", 
                                                                                "longitude"]) \
                                                                      .agg(min("distance")
                                                                         .alias("min_distance"))

display(xjoined_postcode_collision_min_dist_df)

accident_index,latitude,longitude,min_distance
2020990922958,57.142876,-2.113876,0.00045910338
2023991344395,57.15394,-2.105357,0.00049192825
2021991049858,57.14066,-2.120115,0.00054626144
2019920898426,57.119884,-2.117571,0.0006829433
2023991351006,57.144386,-2.108899,9.062949e-05
2019921900037,57.131927,-2.136755,0.0004538047
2021991140611,57.165245,-2.128974,0.011374239
2023991299039,57.12853,-2.103584,0.0027639112
2019920882211,57.098347,-2.253797,0.0005075707
2022991195777,57.097866,-2.222791,0.0052152434


In [0]:
# We now have the minimum distances for each accident to a given postcode in our column `min_distance`. We now want to find the postcode, which we accomplish via an inner join onto the initially cross-joined DataFrame. 
# We use Spark SQL to execute this complex join.

xjoined_postcode_collision_df.createOrReplaceTempView("xjoined_df_table")

xjoined_postcode_collision_min_dist_df.createOrReplaceTempView("xjoined_df_min_dist_table")

In [0]:
closest_postcode_df = spark.sql(
    """SELECT xjoined_df_table.*, xjoined_df_min_dist_table.min_distance
          FROM xjoined_df_table
          INNER JOIN xjoined_df_min_dist_table
          ON (xjoined_df_table.accident_index = xjoined_df_min_dist_table.accident_index) AND
          (xjoined_df_table.distance = xjoined_df_min_dist_table.min_distance)"""
    ).persist()

display(closest_postcode_df)

laua_name,itl_lvl_3,itl_lvl_2,itl_lvl_1,postcode,postcode_latitude,postcode_longitude,h3_index,partition,accident_index,latitude,longitude,distance,min_distance
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB101PY,57.142418,-2.113841,891976bb25bffff,2,2020990922958,57.142876,-2.113876,0.00045910338,0.00045910338
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1ER,57.153606,-2.10572,8919768cda3ffff,3,2023991344395,57.15394,-2.105357,0.00049192825,0.00049192825
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB115EG,57.150772,-2.084738,891976bb63bffff,2,2019921900186,57.1585,-2.087424,0.00818203,0.00818203
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 6SR,57.140755,-2.120653,8919768c987ffff,3,2021991049858,57.14066,-2.120115,0.00054626144,0.00054626144
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 5UL,57.120304,-2.11811,891976b9447ffff,2,2019920898426,57.119884,-2.117571,0.0006829433,0.0006829433
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB101SX,57.144447,-2.108966,891976bb2cbffff,2,2023991351006,57.144386,-2.108899,9.062949e-05,9.062949e-05
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB106WQ,57.131523,-2.136961,8919768c977ffff,3,2019921900037,57.131927,-2.136755,0.0004538047,0.0004538047
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB101DY,57.15542,-2.134702,8919768cd43ffff,3,2021991140611,57.165245,-2.128974,0.011374239,0.011374239
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB119PJ,57.138004,-2.083692,891976bb2b3ffff,2,2023991322911,57.13829,-2.083883,0.00034398446,0.00034398446
Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB106WL,57.131523,-2.136961,8919768c977ffff,3,2019921900037,57.131927,-2.136755,0.0004538047,0.0004538047


In [0]:
columns_to_drop = ["latitude", "longitude", "min_distance", "location_easting_osgr", "location_northing_osgr", "distance",
                   "postcode_latitude", "postcode_longitude"]

final_collisions_df = broadcasted_collisions_df.join(
    closest_postcode_df,
    "accident_index",
    "left") \
    .dropna(subset=["postcode"]) \
    .drop(*columns_to_drop) \
    .persist()

display(final_collisions_df)

accident_index,accident_reference,police_force,accident_severity,number_of_vehicles,number_of_casualties,date,day_of_week,time,local_authority_district,local_authority_ons_district,local_authority_highway,first_road_class,first_road_number,road_type,speed_limit,junction_detail,junction_control,second_road_class,second_road_number,pedestrian_crossing_human_control,pedestrian_crossing_physical_facilities,light_conditions,weather_conditions,road_surface_conditions,special_conditions_at_site,carriageway_hazards,urban_or_rural_area,did_police_officer_attend_scene_of_accident,trunk_road_flag,lsoa_of_accident_location,enhanced_severity_collision,accident_year,accident_month,accident_hour,laua_name,itl_lvl_3,itl_lvl_2,itl_lvl_1,postcode,h3_index,partition
2020990922958,990922958,99,2,1,1,2020-01-21,3,2024-11-05T09:40:00Z,910,S12000033,S12000033,6,0,6,20,9,4,3,978,0,4,1,1,2,0,0,1,1,-1,-1,7,2020,1,9,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB101PY,891976bb25bffff,2
2023991344395,991344395,99,2,1,1,2023-03-04,7,2024-11-05T19:21:00Z,-1,S12000033,S12000033,3,944,6,30,0,-1,0,-1,0,0,4,1,1,0,0,1,1,-1,-1,6,2023,3,19,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 1ER,8919768cda3ffff,3
2019921900186,921900186,92,2,2,1,2019-01-05,7,2024-11-05T21:05:00Z,910,S12000033,S12000033,3,96,6,30,3,4,6,0,0,5,4,1,2,0,0,1,1,-1,-1,-1,2019,1,21,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB115EG,891976bb63bffff,2
2021991049858,991049858,99,2,2,2,2021-05-27,5,2024-11-05T16:00:00Z,-1,S12000033,S12000033,6,0,6,30,3,4,6,0,0,0,1,1,1,0,0,1,1,-1,-1,7,2021,5,16,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 6SR,8919768c987ffff,3
2019920898426,920898426,92,3,2,1,2019-11-13,4,2024-11-05T16:45:00Z,910,S12000033,S12000033,3,92,3,40,9,4,6,0,0,0,4,1,1,0,2,1,2,-1,-1,3,2019,11,16,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB1 5UL,891976b9447ffff,2
2023991351006,991351006,99,2,1,1,2023-09-10,1,2024-11-05T01:15:00Z,-1,S12000033,S12000033,6,0,6,30,9,3,6,0,0,0,4,1,1,0,0,1,1,-1,-1,7,2023,9,1,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB101SX,891976bb2cbffff,2
2019921900037,921900037,92,3,2,1,2019-01-16,4,2024-11-05T17:35:00Z,910,S12000033,S12000033,3,93,6,30,3,4,6,0,0,0,4,1,2,0,0,1,2,-1,-1,-1,2019,1,17,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB106WQ,8919768c977ffff,3
2021991140611,991140611,99,1,2,1,2021-12-30,5,2024-11-05T18:25:00Z,-1,S12000033,S12000033,6,0,6,30,6,4,6,0,0,0,4,1,2,0,0,1,1,-1,-1,1,2021,12,18,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB101DY,8919768cd43ffff,3
2023991322911,991322911,99,3,2,1,2023-06-25,1,2024-11-05T19:20:00Z,-1,S12000033,S12000033,6,0,6,30,6,4,6,0,0,0,1,1,2,0,0,1,1,-1,-1,3,2023,6,19,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB119PJ,891976bb2b3ffff,2
2019921900037,921900037,92,3,2,1,2019-01-16,4,2024-11-05T17:35:00Z,910,S12000033,S12000033,3,93,6,30,3,4,6,0,0,0,4,1,2,0,0,1,2,-1,-1,-1,2019,1,17,Aberdeen City,Aberdeen City and Aberdeenshire,North Eastern Scotland,Scotland,AB106WL,8919768c977ffff,3


Possible improvements:

- How to deal with postcodes that have the SAME latitude-longitude values?
- Approach for finding H3 resolution + partitioning method + distance error is quite arbitrary; needs to be standardised for different use cases.
- Implement further optimisation measures (eg. to reduce shuffling between nodes, especially in the cross-join).

In [0]:
# Close job group

sc.setJobGroup("", "")

# Loading Data into Azure Data Lake Storage

### Accessing Cleaned Data Parquet Blob Container

In [0]:
# Create new job group
sc.setJobGroup("howard-loading-parquet-adls", "Loading cleaned data into Azure Data Lake Storage")

In [0]:
# Pull the name of the ADLS Blob container
dbutils.widgets.text("dls_load_parq_blob_container", "", "Name of the Blob container with clean data")
dls_load_blob_container_name = dbutils.widgets.get("dls_load_parq_blob_container")

# Pull the name of the key of the Blob container SAS token secret
dbutils.widgets.text("kv_secret_dls_load_parq_key", "", "Name of the key of the Blob container SAS token secret.")
kv_dls_load_blob_sas_token_key = dbutils.widgets.get("kv_secret_dls_load_parq_key")
dls_load_blob_container_sas_token = dbutils.secrets.get(
    scope=secret_scope_name, 
    key=kv_dls_load_blob_sas_token_key
)

In [0]:
spark.conf.set(
    f"fs.azure.account.auth.type.{dls_storage_account_name}.blob.core.windows.net", 
    "SAS"
)

spark.conf.set(
    f"fs.azure.sas.token.provider.type.{dls_storage_account_name}.blob.core.windows.net", 
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
)

spark.conf.set(
    f"fs.azure.sas.{dls_load_blob_container_name}.{dls_storage_account_name}.blob.core.windows.net",
    dls_load_blob_container_sas_token
)

In [0]:
dbutils.widgets.text("cleaned_data_parq_file_path", "", "File directory to store the cleaned Parquet files.")
parq_file_path = dbutils.widgets.get("cleaned_data_parq_file_path")

dbutils.widgets.text("cleaned_data_parq_filename_suffix", "", "Suffix filename for the Parquet file to take.")
parq_filename_suffix = dbutils.widgets.get("cleaned_data_parq_filename_suffix")

today_date = datetime.today().strftime('%Y-%m-%d')

adls_path = f"wasbs://{dls_blob_container_name}@{dls_storage_account_name}.blob.core.windows.net/{parq_file_path}/{today_date}_{parq_filename_suffix}"

In [0]:
# Save to ADLS Blob container

final_collisions_df.write.partitionBy("partition").mode("overwrite").format("parquet").save(adls_path)

In [0]:
# Clear job group
sc.setJobGroup("", "")

# Data Aggregation and Basic Analysis

In [0]:
# Create new job group
sc.setJobGroup("howard-data-agg-az-sql-db", "Aggregating data and loading into Azure SQL Database")

In [0]:
dbutils.widgets.text("kv_secret_psql_db_pw_key", "", "Name of the key referring to the PostgreSQL database password")
psql_db_password_key = dbutils.widgets.get("kv_secret_psql_db_pw_key")
db_password = dbutils.secrets.get(
    scope=secret_scope_name,
    key=psql_db_password_key
)

In [0]:
dbutils.widgets.text("psql_db_name", "", "Name of PostgreSQL database")
db_name = dbutils.widgets.get("psql_db_name")

dbutils.widgets.text("psql_db_user", "", "Name of PostgreSQL database user")
db_user = dbutils.widgets.get("psql_db_user")

dbutils.widgets.text("psql_db_host", "", "Name of PostgreSQL database host")
db_host = dbutils.widgets.get("psql_db_host")

dbutils.widgets.text("psql_db_server", "", "Name of PostgreSQL database host")
db_server = dbutils.widgets.get("psql_db_server")

dbutils.widgets.text("psql_db_port", "", "Name of PostgreSQL database host")
db_port = dbutils.widgets.get("psql_db_port")

connection_details = {"DB_NAME": db_name,
                      "DB_USER": db_user,
                      "DB_PASSWORD": db_password,
                      "DB_HOST": db_host,
                      "DB_SERVER": db_server,
                      "DB_PORT": db_port}

In [0]:
def get_db_connection(config) -> connection:
    """Returns a connection to the database."""

    return connect(
        database=config["DB_NAME"],
        user=config["DB_USER"],
        password=config["DB_PASSWORD"],
        host=config["DB_HOST"],
        port=config["DB_PORT"],
        cursor_factory=RealDictCursor
    )

We write each `pyspark.sql.DataFrame` object into `.csv` files, stored inside ADLS Blob containers.

In [0]:
# Pull the name of the Blob container
dbutils.widgets.text("dls_load_table_blob_container", "", "Name of the Blob container with clean data for SQL tables")
dls_load_table_blob_container_name = dbutils.widgets.get("dls_load_table_blob_container")


# Pull the name of the key of the Blob container SAS token secret
dbutils.widgets.text("kv_secret_dls_load_table_key", "", "Name of the key of the Blob container SAS token secret.")
kv_dls_load_blob_sas_token = dbutils.widgets.get("kv_secret_dls_load_table_key")
dls_load_table_blob_container_sas_token = dbutils.secrets.get(
    scope=secret_scope_name, 
    key=kv_dls_load_blob_sas_token
)

In [0]:
spark.conf.set(
    f"fs.azure.account.auth.type.{dls_storage_account_name}.blob.core.windows.net", 
    "SAS"
)

spark.conf.set(
    f"fs.azure.sas.token.provider.type.{dls_storage_account_name}.blob.core.windows.net", 
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
)

spark.conf.set(
    f"fs.azure.sas.{dls_load_table_blob_container_name}.{dls_storage_account_name}.blob.core.windows.net",
    dls_load_table_blob_container_sas_token
)

In [0]:
today_date = datetime.today().strftime('%Y-%m-%d')

dbutils.widgets.text("dls_load_table_file_path", "", "File path of tables to be stored in Blob container")
dls_load_sql_table_file_path = dbutils.widgets.get("dls_load_table_file_path")

dls_load_sql_table_blob_container_url = f"wasbs://{dls_load_table_blob_container_name}@{dls_storage_account_name}.blob.core.windows.net/{dls_load_sql_table_file_path}/{today_date}/"

We use Spark SQL to aggregate the data to find high-risk times and high-risk locations.

## Aggregations on Time of Collisions

In [0]:
new_cols_collisions_df.createOrReplaceTempView("all_collisions_table")

We will input into 4 tables for aggregations on time of collisions:
- Collisions occurring by the hour, `howard_collisions_by_hour`;
- Collisions occurring by the month `howard_collisions_by_month`;
- Collisions occurring by the year `howard_collisions_by_year`;
- Collisions grouped by _cube_ grouping sets for hour, month, and year `howard_collisions_cubed`.

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_hour (
                    accident_hour INT UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By Hour" FLOAT NOT NULL,
                    "Average Number of Vehicles By Hour" FLOAT NOT NULL,
                    "Average Number of Casualties By Hour" FLOAT NOT NULL,
                    "Average Speed Limit By Hour" FLOAT NOT NULL,
                    PRIMARY KEY (accident_hour)
                );''')
    
conn.commit()

In [0]:
col_by_hour_df = spark.sql(
    """SELECT `accident_hour`, COUNT(`accident_hour`) AS `Number of Accidents`,
    ROUND((AVG(COUNT(`accident_hour`)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By Hour`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By Hour`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By Hour`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By Hour`
    FROM all_collisions_table
    GROUP BY `accident_hour`
    ORDER BY `Number of Accidents` DESC"""
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_hour_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_hour_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_hour.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_month (
                    accident_month INT UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By Month" FLOAT NOT NULL,
                    "Average Number of Vehicles By Month" FLOAT NOT NULL,
                    "Average Number of Casualties By Month" FLOAT NOT NULL,
                    "Average Speed Limit By Month" FLOAT NOT NULL,
                    PRIMARY KEY (accident_month)
                );''')
    
conn.commit()

In [0]:
col_by_month_df = spark.sql(
    """SELECT `accident_month`, COUNT(`accident_month`) AS `Number of Accidents`,
    ROUND((AVG(COUNT(`accident_month`)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By Month`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By Month`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By Month`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By Month`
    FROM all_collisions_table
    GROUP BY `accident_month`
    ORDER BY `Number of Accidents` DESC"""
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_month_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_month_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_month.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_year (
                    accident_year INT UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By Year" FLOAT NOT NULL,
                    "Average Number of Vehicles By Year" FLOAT NOT NULL,
                    "Average Number of Casualties By Year" FLOAT NOT NULL,
                    "Average Speed Limit By Year" FLOAT NOT NULL,
                    PRIMARY KEY (accident_year)
                );''')
    
conn.commit()

In [0]:
col_by_year_df = spark.sql(
    """SELECT `accident_year`, COUNT(`accident_year`) AS `Number of Accidents`,
    ROUND((AVG(COUNT(`accident_year`)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By Year`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By Year`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By Year`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By Year`
    FROM all_collisions_table
    GROUP BY `accident_year`
    ORDER BY `Number of Accidents` DESC"""
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_year_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_year_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_year.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_cubed (
                    accident_pset_id INT GENERATED ALWAYS AS IDENTITY,
                    accident_year INT,
                    accident_month INT,
                    accident_hour INT,
                    "Number of Accidents" INT NOT NULL,
                    "Average Number of Accidents By Non-Null Columns" FLOAT NOT NULL,
                    PRIMARY KEY (accident_pset_id),
                    UNIQUE (accident_year, accident_month, accident_hour)
                );''')
    
conn.commit()

In [0]:
col_cubed_df = spark.sql('''WITH accident_counts AS (
    SELECT 
        `accident_year`, 
        `accident_month`, 
        `accident_hour`, 
        COUNT(*) AS `Number of Accidents`
    FROM 
        all_collisions_table
    GROUP BY 
        CUBE(`accident_year`, `accident_month`, `accident_hour`)
)

SELECT 
    a.`accident_year`, 
    a.`accident_month`, 
    a.`accident_hour`, 
    a.`Number of Accidents`,
    CASE 
        WHEN a.`accident_year` IS NOT NULL AND a.`accident_month` IS NOT NULL AND a.`accident_hour` IS NOT NULL THEN 
            ROUND(AVG(`Number of Accidents`) OVER(), 2)
        
        WHEN a.`accident_year` IS NOT NULL AND a.`accident_month` IS NOT NULL THEN 
            ROUND(AVG(`Number of Accidents`) OVER(PARTITION BY `accident_year`, `accident_month`), 2)

        WHEN a.`accident_year` IS NOT NULL AND a.`accident_hour` IS NOT NULL THEN 
            ROUND(AVG(`Number of Accidents`) OVER(PARTITION BY `accident_year`, `accident_hour`), 2)
        
        WHEN a.`accident_month` IS NOT NULL AND a.`accident_hour` IS NOT NULL THEN 
            ROUND(AVG(`Number of Accidents`) OVER(PARTITION BY `accident_month`, `accident_hour`), 2)
        
        WHEN a.`accident_year` IS NOT NULL THEN 
            ROUND(AVG(`Number of Accidents`) OVER(PARTITION BY `accident_year`), 2)
        
        WHEN a.`accident_month` IS NOT NULL THEN 
            ROUND(AVG(`Number of Accidents`) OVER(PARTITION BY `accident_month`), 2)
        
        WHEN a.`accident_hour` IS NOT NULL THEN 
            ROUND(AVG(`Number of Accidents`) OVER(PARTITION BY `accident_hour`), 2)
        
        ELSE 
            a.`Number of Accidents`

    END AS `Average Number of Accidents By Non-Null Columns`

FROM accident_counts AS a
ORDER BY `Number of Accidents` DESC;''')

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_cubed_temp"

# Coalesce to a single partition and write to a temporary directory
col_cubed_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_cubed.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

## Aggregations on Location of Collisions

We will input into 5 tables for aggregations on location of collisions,

- Collisions occurring by postcode `howard_collisions_by_postcode`;
- Collisions occurring by local authority `howard_collisions_by_la`;
- Collisions occurring by each of the 3 ITLs (_International Territorial Levels_), with one table for each, in the form `howard_collisions_by_itl_`**X**.

In [0]:
final_collisions_df.createOrReplaceTempView("cleaned_collisions_table")

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_postcode (
                    accident_postcode VARCHAR(8) UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By Postcode" FLOAT NOT NULL,
                    "Average Number of Vehicles By Postcode" FLOAT NOT NULL,
                    "Average Number of Casualties By Postcode" FLOAT NOT NULL,
                    "Average Speed Limit By Postcode" FLOAT NOT NULL,
                    PRIMARY KEY (accident_postcode)
                );''')
    
conn.commit()

In [0]:
col_by_postcode_df = spark.sql('''SELECT postcode AS `accident_postcode`, COUNT(postcode) AS `Number of Accidents`,
    ROUND((AVG(COUNT(postcode)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By Postcode`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By Postcode`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By Postcode`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By Postcode`
FROM cleaned_collisions_table
GROUP BY postcode
ORDER BY `Number of Accidents` DESC'''
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_postcode_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_postcode_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_postcode.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_laua (
                    accident_local_authority VARCHAR(50) UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By Local Authority" FLOAT NOT NULL,
                    "Average Number of Vehicles By Local Authority" FLOAT NOT NULL,
                    "Average Number of Casualties By Local Authority" FLOAT NOT NULL,
                    "Average Speed Limit By Local Authority" FLOAT NOT NULL,
                    PRIMARY KEY (accident_local_authority)
                );''')
    
conn.commit()

In [0]:
col_by_laua_df = spark.sql('''SELECT laua_name AS `accident_local_authority`, COUNT(laua_name) AS `Number of Accidents`,
    ROUND((AVG(COUNT(laua_name)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By Local Authority`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By Local Authority`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By Local Authority`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By Local Authority`
FROM cleaned_collisions_table
GROUP BY laua_name
ORDER BY `Number of Accidents` DESC'''
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_laua_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_laua_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_laua.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_itl_3 (
                    accident_itl_3 VARCHAR(50) UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By ITL3" FLOAT NOT NULL,
                    "Average Number of Vehicles By ITL3" FLOAT NOT NULL,
                    "Average Number of Casualties By ITL3" FLOAT NOT NULL,
                    "Average Speed Limit By ITL3" FLOAT NOT NULL,
                    PRIMARY KEY (accident_itl_3)
                );''')
    
conn.commit()

In [0]:
col_by_itl_3_df = spark.sql('''SELECT itl_lvl_3 AS `accident_itl_3`, COUNT(itl_lvl_3) AS `Number of Accidents`,
    ROUND((AVG(COUNT(itl_lvl_3)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By ITL3`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By ITL3`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By ITL3`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By ITL3`
FROM cleaned_collisions_table
GROUP BY itl_lvl_3
ORDER BY `Number of Accidents` DESC'''
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_itl_3_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_itl_3_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_itl_3.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_itl_2 (
                    accident_itl_2 VARCHAR(50) UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By ITL2" FLOAT NOT NULL,
                    "Average Number of Vehicles By ITL2" FLOAT NOT NULL,
                    "Average Number of Casualties By ITL2" FLOAT NOT NULL,
                    "Average Speed Limit By ITL2" FLOAT NOT NULL,
                    PRIMARY KEY (accident_itl_2)
                );''')
    
conn.commit()

In [0]:
col_by_itl_2_df = spark.sql('''SELECT itl_lvl_2 AS `accident_itl_2`, COUNT(itl_lvl_2) AS `Number of Accidents`,
    ROUND((AVG(COUNT(itl_lvl_2)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By ITL2`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By ITL2`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By ITL2`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By ITL2`
FROM cleaned_collisions_table
GROUP BY itl_lvl_2
ORDER BY `Number of Accidents` DESC'''
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_itl_2_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_itl_2_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_itl_2.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# Set up table in case it hasn't been set up yet

conn = get_db_connection(connection_details)

with conn.cursor() as cur:

    cur.execute('''CREATE TABLE IF NOT EXISTS howard_collisions_by_itl_1 (
                    accident_itl_1 VARCHAR(50) UNIQUE NOT NULL,
                    "Number of Accidents" INT NOT NULL,
                    "Overall Average Number of Accidents" FLOAT NOT NULL,
                    "Average Accident Severity By ITL1" FLOAT NOT NULL,
                    "Average Number of Vehicles By ITL1" FLOAT NOT NULL,
                    "Average Number of Casualties By ITL1" FLOAT NOT NULL,
                    "Average Speed Limit By ITL1" FLOAT NOT NULL,
                    PRIMARY KEY (accident_itl_1)
                );''')
    
conn.commit()

In [0]:
col_by_itl_1_df = spark.sql('''SELECT itl_lvl_1 AS `accident_itl_1`, COUNT(itl_lvl_1) AS `Number of Accidents`,
    ROUND((AVG(COUNT(itl_lvl_1)) OVER()), 2) AS `Overall Average Number of Accidents`,
    ROUND((AVG(`accident_severity`)), 2) AS `Average Accident Severity By ITL1`,
    ROUND((AVG(`number_of_vehicles`)), 2) AS `Average Number of Vehicles By ITL1`,
    ROUND((AVG(`number_of_casualties`)), 2) AS `Average Number of Casualties By ITL1`,
    ROUND((AVG(`speed_limit`)), 2) AS `Average Speed Limit By ITL1`
FROM cleaned_collisions_table
GROUP BY itl_lvl_1
ORDER BY `Number of Accidents` DESC'''
)

# Set the output path
temp_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_itl_1_temp"

# Coalesce to a single partition and write to a temporary directory
col_by_itl_1_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)

# Use dbutils to list files and rename the part file
files = dbutils.fs.ls(temp_output_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]  # Get the single CSV part file

# Define final destination path with desired filename
final_output_path = dls_load_sql_table_blob_container_url + "howard_collisions_by_itl_1.csv"

# Move and rename the CSV file
dbutils.fs.mv(csv_file, final_output_path)

# Optionally, delete the temporary directory
dbutils.fs.rm(temp_output_path, recurse=True)

In [0]:
# We end the job group

sc.setJobGroup("", "")

# Remove all cached objects

In [0]:
# We start a new job group
sc.setJobGroup("howard-remove-cached-objects", "We unpersist any remaining objects cached in memory.")

In [0]:
xjoined_postcode_collision_df.unpersist()

closest_postcode_df.unpersist()

final_collisions_df.unpersist()

broadcasted_collisions_df.unpersist()

DataFrame[accident_index: string, accident_reference: string, location_easting_osgr: int, location_northing_osgr: int, longitude: float, latitude: float, police_force: int, accident_severity: int, number_of_vehicles: int, number_of_casualties: int, date: date, day_of_week: int, time: timestamp, local_authority_district: int, local_authority_ons_district: string, local_authority_highway: string, first_road_class: int, first_road_number: int, road_type: int, speed_limit: int, junction_detail: int, junction_control: int, second_road_class: int, second_road_number: int, pedestrian_crossing_human_control: int, pedestrian_crossing_physical_facilities: int, light_conditions: int, weather_conditions: int, road_surface_conditions: int, special_conditions_at_site: int, carriageway_hazards: int, urban_or_rural_area: int, did_police_officer_attend_scene_of_accident: int, trunk_road_flag: int, lsoa_of_accident_location: string, enhanced_severity_collision: int, accident_year: int, accident_month: i

In [0]:
# We end the job group

sc.setJobGroup("", "")