In [0]:
# Register the notebook parameters as text widgets. Each entry is
# (widget name, default value); the widget label is the same as its name.
for widget_name, default_value in (
    ("catalog", "test_drive"),
    ("schema", "road_crash"),
    ("volume", "source"),
):
    dbutils.widgets.text(widget_name, default_value, widget_name)

In [0]:
# Read the current widget values. Later cells interpolate these three names
# into SQL object names and /Volumes paths.
catalog, schema, volume = (
    dbutils.widgets.get(widget_key)
    for widget_key in ("catalog", "schema", "volume")
)

In [0]:
# Make sure the target catalog, schema and volume exist.
# The statements run in this exact order: the schema and volume names are
# unqualified, so they rely on the USE statements issued just before them.
setup_statements = (
    f"CREATE CATALOG IF NOT EXISTS {catalog}",
    f"USE CATALOG {catalog}",
    f"CREATE SCHEMA IF NOT EXISTS {schema}",
    f"USE SCHEMA {schema}",
    f"CREATE VOLUME IF NOT EXISTS {volume}",
)
for setup_statement in setup_statements:
    spark.sql(setup_statement)

## SA Road Crash Dataset (2021)
Source: [Road Crash Data – data.sa.gov.au](https://data.sa.gov.au/data/dataset/road-crash-data)



In [0]:
import requests
import zipfile
import os

# Source archive on data.sa.gov.au, the path where it is staged inside the
# volume, and the directory it is extracted into.
url = "https://data.sa.gov.au/data/dataset/21386a53-56a1-4edf-bd0b-61ed15f10acf/resource/1057e9ae-4672-4123-9c1d-1877483da401/download/2021_data_sa_as_at_20220706.zip"
local_zip_path = f"/Volumes/{catalog}/{schema}/{volume}/2021_data_sa_as_at_20220706.zip"
unzip_dir = f"/Volumes/{catalog}/{schema}/{volume}/"

# Download the archive.
# - timeout: requests waits indefinitely by default, so an unresponsive
#   server would hang the cell forever.
# - raise_for_status(): fail here on an HTTP error instead of saving the
#   error page to disk and hitting a confusing BadZipFile further down.
response = requests.get(url, timeout=60)
response.raise_for_status()
with open(local_zip_path, 'wb') as file:
    file.write(response.content)

# Extract everything next to the archive, then delete the archive. The
# removal sits in `finally` so a failed extraction does not leave the staged
# zip behind in the volume.
try:
    with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
        zip_ref.extractall(unzip_dir)
finally:
    os.remove(local_zip_path)

In [0]:
# List the files currently present in the volume.
volume_root = f"/Volumes/{catalog}/{schema}/{volume}/"
volume_files = dbutils.fs.ls(volume_root)
display(volume_files)

In [0]:
# Load the crash CSV from the volume. The first row is treated as the header
# and column types are inferred from the data.
crash_csv_path = f"/Volumes/{catalog}/{schema}/{volume}/2021_DATA_SA_Crash.csv"
df_crash = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(crash_csv_path)
)
display(df_crash)

In [0]:
from pyspark.sql.functions import expr

# Build a point from accloc_x / accloc_y, tag it with SRID 3107, transform it
# to SRID 4326 and serialise the result as WKT text.
accloc_wkt_expr = expr("st_astext(st_transform(st_setsrid(st_point(accloc_x, accloc_y), 3107), 4326))")

# Parse the WKT column back into a geometry and take its X / Y ordinates.
# These reference accloc_4326, so they must be added after that column.
longitude_expr = expr("st_x(st_geomfromtext(accloc_4326))")
latitude_expr = expr("st_y(st_geomfromtext(accloc_4326))")

df_crash_transformed = (
    df_crash
    .withColumn("accloc_4326", accloc_wkt_expr)
    .withColumn("longitude", longitude_expr)
    .withColumn("latitude", latitude_expr)
)
display(df_crash_transformed)

In [0]:
crash_table = f"{catalog}.{schema}.crash"

# Create the (initially column-less) table with name-based column mapping
# before writing to it.
# NOTE(review): presumably needed because the CSV header names are not all
# plain identifiers - confirm against the source file.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {crash_table}
TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")

# Replace both the data and the schema with the transformed DataFrame.
(
    df_crash_transformed.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(crash_table)
)

# Constraint DDL, applied in order: REPORT_ID is made NOT NULL first and is
# then declared the primary key.
crash_alter_clauses = (
    "ALTER COLUMN REPORT_ID SET NOT NULL",
    "ADD CONSTRAINT pk_crash_report_id PRIMARY KEY (REPORT_ID)",
)
for alter_clause in crash_alter_clauses:
    spark.sql(f"""
ALTER TABLE {crash_table}
{alter_clause}
""")

In [0]:
# Show the crash table's columns together with its extended metadata.
crash_description = spark.sql(f"DESCRIBE EXTENDED {catalog}.{schema}.crash")
display(crash_description)

In [0]:
# Load the units CSV from the volume. The first row is treated as the header
# and column types are inferred from the data.
units_csv_path = f"/Volumes/{catalog}/{schema}/{volume}/2021_DATA_SA_Units.csv"
df_units = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(units_csv_path)
)
display(df_units)

In [0]:
units_table = f"{catalog}.{schema}.units"

# Create the (initially column-less) table with name-based column mapping
# before writing to it; the `Unit No` column used below has a space in its name.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {units_table}
TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")

# Replace both the data and the schema with the units DataFrame.
(
    df_units.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(units_table)
)

# Constraint DDL, applied in order: the key columns are made NOT NULL, then
# the composite primary key is declared, then the foreign key to crash.
units_alter_clauses = (
    "ALTER COLUMN REPORT_ID SET NOT NULL",
    "ALTER COLUMN `Unit No` SET NOT NULL",
    "ADD CONSTRAINT pk_units_composite PRIMARY KEY (REPORT_ID, `Unit No`)",
    f"ADD CONSTRAINT fk_units_report_id FOREIGN KEY (REPORT_ID) REFERENCES {catalog}.{schema}.crash(REPORT_ID)",
)
for alter_clause in units_alter_clauses:
    spark.sql(f"""
ALTER TABLE {units_table}
{alter_clause}
""")

In [0]:
# Show the units table's columns together with its extended metadata.
units_description = spark.sql(f"DESCRIBE EXTENDED {catalog}.{schema}.units")
display(units_description)

In [0]:
# Load the casualty CSV from the volume. The first row is treated as the
# header and column types are inferred from the data.
casualty_csv_path = f"/Volumes/{catalog}/{schema}/{volume}/2021_DATA_SA_Casualty.csv"
df_casuality = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(casualty_csv_path)
)
display(df_casuality)

In [0]:
# NOTE(review): "casuality" looks like a misspelling of "casualty". The name
# is kept as-is because it is the table name other consumers would query.
casuality_table = f"{catalog}.{schema}.casuality"

# Create the (initially column-less) table with name-based column mapping
# before writing to it.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {casuality_table}
TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")

# Replace both the data and the schema with the casualty DataFrame.
(
    df_casuality.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(casuality_table)
)

# Constraint DDL, applied in order: the three key columns are made NOT NULL,
# then the composite primary key is declared, then the foreign keys to the
# crash table and to the units table.
casuality_alter_clauses = (
    "ALTER COLUMN REPORT_ID SET NOT NULL",
    "ALTER COLUMN UND_UNIT_NUMBER SET NOT NULL",
    "ALTER COLUMN CASUALTY_NUMBER SET NOT NULL",
    "ADD CONSTRAINT pk_casuality_composite PRIMARY KEY (REPORT_ID, UND_UNIT_NUMBER, CASUALTY_NUMBER)",
    f"ADD CONSTRAINT fk_casuality_report_id FOREIGN KEY (REPORT_ID) REFERENCES {catalog}.{schema}.crash(REPORT_ID)",
    f"ADD CONSTRAINT fk_casuality_composite FOREIGN KEY (REPORT_ID, UND_UNIT_NUMBER) REFERENCES {catalog}.{schema}.units(REPORT_ID, `Unit No`)",
)
for alter_clause in casuality_alter_clauses:
    spark.sql(f"""
ALTER TABLE {casuality_table}
{alter_clause}
""")

In [0]:
# Report every key constraint column in the target schema and, for foreign
# keys, the column it references.
#
# Join outline:
#   tc     - one row per constraint (supplies constraint_type)
#   kcu    - the columns that make up each constraint
#   rc     - for foreign keys only: which unique/primary key is referenced
#   fk_kcu - the columns of that referenced key
#
# A foreign-key column is paired with its referenced column through
# position_in_unique_constraint (the column's position inside the referenced
# key). Matching on ordinal_position alone mis-pairs columns whenever a
# foreign key lists its columns in a different order from the referenced key.
# COALESCE falls back to ordinal_position if that value is not populated.
sdf = spark.sql(f"""
SELECT 
    kcu.table_schema AS schema_name,
    kcu.table_name,
    kcu.column_name,
    tc.constraint_type,
    kcu.ordinal_position,
    fk_kcu.table_schema AS referenced_schema_name,
    fk_kcu.table_name AS referenced_table_name,
    fk_kcu.column_name AS referenced_column_name
FROM {catalog}.information_schema.table_constraints AS tc
JOIN {catalog}.information_schema.key_column_usage AS kcu
    ON tc.constraint_name = kcu.constraint_name
    AND tc.table_schema = kcu.table_schema
    AND tc.table_name = kcu.table_name
LEFT JOIN {catalog}.information_schema.referential_constraints AS rc
    ON tc.constraint_name = rc.constraint_name
    AND tc.table_schema = rc.constraint_schema
LEFT JOIN {catalog}.information_schema.key_column_usage AS fk_kcu
    ON rc.unique_constraint_name = fk_kcu.constraint_name
    AND COALESCE(kcu.position_in_unique_constraint, kcu.ordinal_position) = fk_kcu.ordinal_position
    AND rc.unique_constraint_schema = fk_kcu.constraint_schema
WHERE kcu.table_schema = '{schema}'
ORDER BY kcu.table_schema, kcu.table_name, tc.constraint_type DESC, kcu.ordinal_position
""")

display(sdf)