In [0]:
from pyspark.sql.functions import col, regexp_replace, sum, when

In [0]:
# Register the storage-account key (pulled from the Databricks secret scope)
# so Spark can authenticate against the account's dfs endpoint.
storage_account_key = dbutils.secrets.get(scope="dbScope", key="saSecret")
spark.conf.set(
    "fs.azure.account.key.vehicletheftsta.dfs.core.windows.net",
    storage_account_key,
)

# Read everything under the bronze container root as CSV, taking column names
# from the header row and letting Spark infer the column types.
# NOTE(review): the path is the container root, so `df` presumably spans all
# CSV files in bronze even though they have different columns - confirm this
# frame is actually needed.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("abfss://bronze@vehicletheftsta.dfs.core.windows.net/")
)


In [0]:
# List the objects stored at the root of the bronze container; the returned
# FileInfo list is displayed as the cell result.
dbutils.fs.ls("abfss://bronze@vehicletheftsta.dfs.core.windows.net/")

[FileInfo(path='abfss://bronze@vehicletheftsta.dfs.core.windows.net/locations.csv', name='locations.csv', size=716, modificationTime=1767960384000),
 FileInfo(path='abfss://bronze@vehicletheftsta.dfs.core.windows.net/make_details.csv', name='make_details.csv', size=2855, modificationTime=1767960410000),
 FileInfo(path='abfss://bronze@vehicletheftsta.dfs.core.windows.net/stolen_vehicles.csv', name='stolen_vehicles.csv', size=222313, modificationTime=1767960429000),
 FileInfo(path='abfss://bronze@vehicletheftsta.dfs.core.windows.net/stolen_vehicles_db_data_dictionary.csv', name='stolen_vehicles_db_data_dictionary.csv', size=850, modificationTime=1767960449000)]

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Explicit schema applied to the silver and gold reads below.
# NOTE(review): these are still the template column names/types; replace them
# with the real structure of the data before relying on these frames.
schema = (
    StructType()
    .add("column_name1", StringType(), True)
    .add("column_name2", IntegerType(), True)
    .add("column_name3", FloatType(), True)
)

# Register the storage-account key for the dfs endpoint.
spark.conf.set(
    "fs.azure.account.key.vehicletheftsta.dfs.core.windows.net",
    dbutils.secrets.get("dbScope", "saSecret"),
)

# Silver container: CSV with a header row, parsed with the explicit schema.
df_silver = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("abfss://silver@vehicletheftsta.dfs.core.windows.net/")
)

# Gold container: same reader settings as silver.
df_gold = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("abfss://gold@vehicletheftsta.dfs.core.windows.net/")
)

# Preview up to five rows from each container.
df_silver.show(5)
df_gold.show(5)


+------------+------------+------------+
|column_name1|column_name2|column_name3|
+------------+------------+------------+
+------------+------------+------------+

+------------+------------+------------+
|column_name1|column_name2|column_name3|
+------------+------------+------------+
+------------+------------+------------+



In [0]:
# Loading the CSV file into a DataFrame
location_df = spark.read.format("csv") \
    .option("header", "true") \
    .load("abfss://bronze@vehicletheftsta.dfs.core.windows.net/locations.csv")

# Show the schema and first few rows of the DataFrame to verify
location_df.printSchema()
display(location_df.limit(10))
location_df.schema

root
 |-- location_id: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- population: string (nullable = true)
 |-- density: string (nullable = true)



location_id,region,country,population,density
101,Northland,New Zealand,201500,16.11
102,Auckland,New Zealand,1695200,343.09
103,Waikato,New Zealand,513800,21.5
104,Bay of Plenty,New Zealand,347700,28.8
105,Gisborne,New Zealand,52100,6.21
106,Hawke's Bay,New Zealand,182700,12.92
107,Taranaki,New Zealand,127300,17.55
108,Manawatū-Whanganui,New Zealand,258200,11.62
109,Wellington,New Zealand,543500,67.52
110,Tasman,New Zealand,58700,6.1


StructType([StructField('location_id', StringType(), True), StructField('region', StringType(), True), StructField('country', StringType(), True), StructField('population', StringType(), True), StructField('density', StringType(), True)])

In [0]:
# Every other cell in this notebook reaches the storage account through the
# ABFS driver (abfss:// on the dfs endpoint), so the bronze loads use it too
# instead of the legacy wasbs:// blob endpoint.
spark.conf.set(
    "fs.azure.account.key.vehicletheftsta.dfs.core.windows.net",
    dbutils.secrets.get("dbScope", "saSecret")
)
# The blob-endpoint key is still registered so that any remaining wasbs://
# access keeps authenticating.
# NOTE(review): drop this once nothing reads through the blob endpoint.
spark.conf.set(
    "fs.azure.account.key.vehicletheftsta.blob.core.windows.net",
    dbutils.secrets.get("dbScope", "saSecret")
)

BRONZE_ROOT = "abfss://bronze@vehicletheftsta.dfs.core.windows.net"


def load_bronze_csv(file_name):
    """Read one CSV file from the bronze container.

    Args:
        file_name: Name of the file inside the bronze container,
            e.g. "locations.csv".

    Returns:
        A DataFrame whose column names come from the header row and whose
        column types are inferred by Spark.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{BRONZE_ROOT}/{file_name}")
    )


# One DataFrame per bronze file.
location_df = load_bronze_csv("locations.csv")
make_details_df = load_bronze_csv("make_details.csv")
stolen_vehicles_df = load_bronze_csv("stolen_vehicles.csv")
database_df = load_bronze_csv("stolen_vehicles_db_data_dictionary.csv")

# Show the first few rows of each frame to verify the data loaded properly.
location_df.show(5)
make_details_df.show(5)
stolen_vehicles_df.show(5)
database_df.show(5)


+-----------+-------------+-----------+----------+-------+
|location_id|       region|    country|population|density|
+-----------+-------------+-----------+----------+-------+
|        101|    Northland|New Zealand|   201,500|  16.11|
|        102|     Auckland|New Zealand| 1,695,200| 343.09|
|        103|      Waikato|New Zealand|   513,800|   21.5|
|        104|Bay of Plenty|New Zealand|   347,700|   28.8|
|        105|     Gisborne|New Zealand|    52,100|   6.21|
+-----------+-------------+-----------+----------+-------+
only showing top 5 rows
+-------+-------------+---------+
|make_id|    make_name|make_type|
+-------+-------------+---------+
|    501|Aakron Xpress| Standard|
|    502|         ADLY| Standard|
|    503|        Alpha| Standard|
|    504|        Anglo| Standard|
|    505|      Aprilia| Standard|
+-------+-------------+---------+
only showing top 5 rows
+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|vehicle_id|vehicle

In [0]:
# Preview the first five rows of the data-dictionary DataFrame
# (long values are truncated by show()).
database_df.show(5)

+---------------+------------+--------------------+
|          Table|       Field|         Description|
+---------------+------------+--------------------+
|stolen_vehicles|  vehicle_id|Unique ID of a st...|
|stolen_vehicles|vehicle_type|     Type of vehicle|
|stolen_vehicles|     make_id|Matches make_id i...|
|stolen_vehicles|  model_year|Model year of veh...|
|stolen_vehicles|vehicle_desc|Description of ve...|
+---------------+------------+--------------------+
only showing top 5 rows


In [0]:
# Inspect the inferred column types of the locations frame. In the recorded
# output, population is still a string while the other numeric columns
# were inferred as integer/double.
location_df.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- population: string (nullable = true)
 |-- density: double (nullable = true)



In [0]:
# Remove the comma thousands separators from population, then cast the
# remaining digits to an integer column.
population_without_commas = regexp_replace(col("population"), ",", "")
location_df = location_df.withColumn(
    "population",
    population_without_commas.cast("int"),
)

# Confirm the new column type and preview the cleaned values.
location_df.printSchema()
location_df.show(5)

root
 |-- location_id: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- density: double (nullable = true)

+-----------+-------------+-----------+----------+-------+
|location_id|       region|    country|population|density|
+-----------+-------------+-----------+----------+-------+
|        101|    Northland|New Zealand|    201500|  16.11|
|        102|     Auckland|New Zealand|   1695200| 343.09|
|        103|      Waikato|New Zealand|    513800|   21.5|
|        104|Bay of Plenty|New Zealand|    347700|   28.8|
|        105|     Gisborne|New Zealand|     52100|   6.21|
+-----------+-------------+-----------+----------+-------+
only showing top 5 rows


In [0]:
# Inspect the inferred schema and a five-row preview of the make details.
make_details_df.printSchema()
make_details_df.show(5)
# Same inspection for the stolen-vehicles frame.
stolen_vehicles_df.printSchema()
stolen_vehicles_df.show(5)
# Schema only for the data-dictionary frame.
database_df.printSchema()

root
 |-- make_id: integer (nullable = true)
 |-- make_name: string (nullable = true)
 |-- make_type: string (nullable = true)

+-------+-------------+---------+
|make_id|    make_name|make_type|
+-------+-------------+---------+
|    501|Aakron Xpress| Standard|
|    502|         ADLY| Standard|
|    503|        Alpha| Standard|
|    504|        Anglo| Standard|
|    505|      Aprilia| Standard|
+-------+-------------+---------+
only showing top 5 rows
root
 |-- vehicle_id: integer (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- make_id: integer (nullable = true)
 |-- model_year: integer (nullable = true)
 |-- vehicle_desc: string (nullable = true)
 |-- color: string (nullable = true)
 |-- date_stolen: date (nullable = true)
 |-- location_id: integer (nullable = true)

+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|       vehicle_desc| color|date_stolen|location_id|
+-------

In [0]:
def _snake_case_columns(frame):
    """Return `frame` with normalised column names.

    Each column name is lower-cased and every space is replaced by an
    underscore; column order and data are left unchanged.

    Args:
        frame: The Spark DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the renamed columns.
    """
    return frame.toDF(*[name.lower().replace(" ", "_") for name in frame.columns])


# Apply the same naming convention to all four frames.
location_df = _snake_case_columns(location_df)
make_details_df = _snake_case_columns(make_details_df)
stolen_vehicles_df = _snake_case_columns(stolen_vehicles_df)
database_df = _snake_case_columns(database_df)

# Preview two rows of each frame to check the renamed headers.
location_df.show(2)
make_details_df.show(2)
stolen_vehicles_df.show(2)
database_df.show(2)


+-----------+---------+-----------+----------+-------+
|location_id|   region|    country|population|density|
+-----------+---------+-----------+----------+-------+
|        101|Northland|New Zealand|    201500|  16.11|
|        102| Auckland|New Zealand|   1695200| 343.09|
+-----------+---------+-----------+----------+-------+
only showing top 2 rows
+-------+-------------+---------+
|make_id|    make_name|make_type|
+-------+-------------+---------+
|    501|Aakron Xpress| Standard|
|    502|         ADLY| Standard|
+-------+-------------+---------+
only showing top 2 rows
+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|       vehicle_desc| color|date_stolen|location_id|
+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|         1|     Trailer|    623|      2021|           BST2021D|Silver| 2021-11-05|        102|
|         2|Boat Trailer|    623| 

2

In [0]:
# Persist the current state of each frame to the silver container as CSV with
# a header row, replacing whatever was written to that folder before.
_silver_root = "abfss://silver@vehicletheftsta.dfs.core.windows.net"
for _frame, _folder in (
    (location_df, "locations"),
    (make_details_df, "make_details"),
    (stolen_vehicles_df, "stolen_vehicles"),
    (database_df, "stolen_vehicles_db"),
):
    _frame.write.mode("overwrite").option("header", "true").csv(f"{_silver_root}/{_folder}")


In [0]:
def _null_counts(frame):
    """Count the null values in every column of `frame`.

    Args:
        frame: The Spark DataFrame to profile.

    Returns:
        A one-row DataFrame with the same column names as `frame`, where each
        value is the number of nulls found in that column.
    """
    # `sum` here is pyspark.sql.functions.sum (imported at the top of the
    # notebook), not the Python builtin: each null contributes 1 to the total.
    return frame.select(
        [sum(when(col(name).isNull(), 1).otherwise(0)).alias(name) for name in frame.columns]
    )


null_count_location = _null_counts(location_df)
null_count_make_details = _null_counts(make_details_df)
null_count_stolen_vehicles = _null_counts(stolen_vehicles_df)
null_count_database = _null_counts(database_df)

# Display the per-column null totals for each frame.
null_count_location.show()
null_count_make_details.show()
null_count_stolen_vehicles.show()
null_count_database.show()

+-----------+------+-------+----------+-------+
|location_id|region|country|population|density|
+-----------+------+-------+----------+-------+
|          0|     0|      0|         0|      0|
+-----------+------+-------+----------+-------+

+-------+---------+---------+
|make_id|make_name|make_type|
+-------+---------+---------+
|      0|        0|        0|
+-------+---------+---------+

+----------+------------+-------+----------+------------+-----+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|vehicle_desc|color|date_stolen|location_id|
+----------+------------+-------+----------+------------+-----+-----------+-----------+
|         0|          26|     15|        15|          33|   15|          0|          0|
+----------+------------+-------+----------+------------+-----+-----------+-----------+

+-----+-----+-----------+
|table|field|description|
+-----+-----+-----------+
|    0|    0|          0|
+-----+-----+-----------+



In [0]:
# Print the column types of the stolen-vehicles frame.
stolen_vehicles_df.printSchema()

root
 |-- vehicle_id: integer (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- make_id: integer (nullable = true)
 |-- model_year: integer (nullable = true)
 |-- vehicle_desc: string (nullable = true)
 |-- color: string (nullable = true)
 |-- date_stolen: date (nullable = true)
 |-- location_id: integer (nullable = true)



In [0]:
# Replace missing values column by column: text columns get the literal
# "No data", numeric columns get 0.
# NOTE(review): the 0 placeholders stay in the data, so model_year = 0 will
# show up as its own group in aggregates, and make_id = 0 presumably has no
# matching row in make_details - confirm this is intended.
_placeholder_by_column = dict(
    vehicle_type="No data",
    make_id=0,
    model_year=0,
    vehicle_desc="No data",
    color="No data",
)
stolen_vehicles_df = stolen_vehicles_df.na.fill(_placeholder_by_column)


In [0]:
# Fetch the last five rows as a list of Row objects; in the recorded output
# these are rows that carry the filled-in placeholder values.
stolen_vehicles_df.tail(5)

[Row(vehicle_id=4549, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 2, 18), location_id=102),
 Row(vehicle_id=4550, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 2, 14), location_id=109),
 Row(vehicle_id=4551, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 3, 9), location_id=102),
 Row(vehicle_id=4552, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 3, 7), location_id=109),
 Row(vehicle_id=4553, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 3, 14), location_id=102)]

In [0]:
# Re-count the nulls per column after filling, to confirm none remain.
_null_total_columns = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in stolen_vehicles_df.columns
]
null_count_stolen_vehicles = stolen_vehicles_df.select(_null_total_columns)
null_count_stolen_vehicles.show()

+----------+------------+-------+----------+------------+-----+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|vehicle_desc|color|date_stolen|location_id|
+----------+------------+-------+----------+------------+-----+-----------+-----------+
|         0|           0|      0|         0|           0|    0|          0|          0|
+----------+------------+-------+----------+------------+-----+-----------+-----------+



In [0]:
# Preview the first five rows of the stolen-vehicles frame.
stolen_vehicles_df.show(5)

+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|       vehicle_desc| color|date_stolen|location_id|
+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|         1|     Trailer|    623|      2021|           BST2021D|Silver| 2021-11-05|        102|
|         2|Boat Trailer|    623|      2021|OUTBACK BOATS FT470|Silver| 2021-12-13|        105|
|         3|Boat Trailer|    623|      2021|         ASD JETSKI|Silver| 2022-02-13|        102|
|         4|     Trailer|    623|      2021|            MSC 7X4|Silver| 2021-11-13|        106|
|         5|     Trailer|    623|      2018|          D-MAX 8X5|Silver| 2022-01-10|        102|
+----------+------------+-------+----------+-------------------+------+-----------+-----------+
only showing top 5 rows


In [0]:
# Persist the current state of each frame to the gold container as CSV with a
# header row, replacing whatever was written to that folder before.
_gold_root = "abfss://gold@vehicletheftsta.dfs.core.windows.net"
for _frame, _folder in (
    (location_df, "locations"),
    (make_details_df, "make_details"),
    (stolen_vehicles_df, "stolen_vehicles"),
    (database_df, "stolen_vehicles_db"),
):
    _frame.write.mode("overwrite").option("header", "true").csv(f"{_gold_root}/{_folder}")

In [0]:
# Expose each frame to Spark SQL under a session-scoped view name,
# replacing any existing view with the same name.
for _view_name, _frame in (
    ("location", location_df),
    ("make_details", make_details_df),
    ("stolen_vehicles", stolen_vehicles_df),
    ("database", database_df),
):
    _frame.createOrReplaceTempView(_view_name)

In [0]:
%sql

-- Number of stolen vehicles per model year, most frequently stolen year first.
-- Rows whose model_year was missing were filled with 0 earlier in this
-- notebook, so they are counted under model_year = 0.
SELECT
  model_year,
  COUNT(*) AS number_of_vehicles_stolen
FROM stolen_vehicles
GROUP BY model_year
ORDER BY number_of_vehicles_stolen DESC;

model_year,number_of_vehicles_stolen
2005,347
2006,333
2007,251
2004,238
2008,190
2002,181
2003,173
1998,159
1996,156
2001,152
