In [0]:
from pyspark.sql.functions import *

In [0]:
# Authenticate Spark against the storage account's Blob endpoint (used by the
# wasbs:// bronze reads) with the account key kept in the Databricks secret scope.
spark.conf.set(
    key="fs.azure.account.key.vehicletheftvarad.blob.core.windows.net",
    value=dbutils.secrets.get("dbScope", "saSecret"),
)




In [0]:
# Load the four raw CSVs from the bronze container, letting Spark infer column types.
BRONZE_ROOT = "wasbs://bronze@vehicletheftvarad.blob.core.windows.net"


def _read_bronze_csv(file_name):
    """Read ``<BRONZE_ROOT>/<file_name>`` as a headered CSV with schema inference."""
    return (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(f"{BRONZE_ROOT}/{file_name}")
    )


location_df = _read_bronze_csv("locations.csv")
make_details_df = _read_bronze_csv("make_details.csv")
database_df = _read_bronze_csv("stolen_vehicles_db_data_dictionary.csv")
stolen_vehicles_df = _read_bronze_csv("stolen_vehicles.csv")



In [0]:
# Inspect the schema inferred for the raw locations CSV; note that population comes in as a string.
location_df.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- population: string (nullable = true)
 |-- density: double (nullable = true)



In [0]:
# population is inferred as a string (presumably because of thousands separators);
# strip the commas and cast to integer.
# Use the existing lower-case column name: withColumn matches "Population"
# case-insensitively and replaces the column, but renames it to the spelling passed
# in, which the later lower-casing step then has to undo.
# NOTE(review): values that are still non-numeric after removing commas become null
# (or raise under ANSI mode) rather than being reported.
location_df = location_df.withColumn(
    "population",
    regexp_replace(col("population"), ",", "").cast("integer"),
)


In [0]:
# Preview the first two rows to confirm the population values are now plain integers.
location_df.show(n=2)

+-----------+---------+-----------+----------+-------+
|location_id|   region|    country|Population|density|
+-----------+---------+-----------+----------+-------+
|        101|Northland|New Zealand|    201500|  16.11|
|        102| Auckland|New Zealand|   1695200| 343.09|
+-----------+---------+-----------+----------+-------+
only showing top 2 rows


In [0]:
# Re-check the schema: the population column should now be an integer after the cast.
location_df.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- density: double (nullable = true)



In [0]:
# Inspect the schema inferred for the make_details CSV.
make_details_df.printSchema()

root
 |-- make_id: integer (nullable = true)
 |-- make_name: string (nullable = true)
 |-- make_type: string (nullable = true)



In [0]:
# Preview two rows of the vehicle make lookup table.
make_details_df.show(n=2)

+-------+-------------+---------+
|make_id|    make_name|make_type|
+-------+-------------+---------+
|    501|Aakron Xpress| Standard|
|    502|         ADLY| Standard|
+-------+-------------+---------+
only showing top 2 rows


In [0]:
# Inspect the schema of the data-dictionary CSV (Table / Field / Description columns).
database_df.printSchema()

root
 |-- Table: string (nullable = true)
 |-- Field: string (nullable = true)
 |-- Description: string (nullable = true)



In [0]:
# Preview two data-dictionary entries (long descriptions are truncated by show()).
database_df.show(n=2)

+---------------+------------+--------------------+
|          Table|       Field|         Description|
+---------------+------------+--------------------+
|stolen_vehicles|  vehicle_id|Unique ID of a st...|
|stolen_vehicles|vehicle_type|     Type of vehicle|
+---------------+------------+--------------------+
only showing top 2 rows


In [0]:
# Inspect the schema inferred for the stolen_vehicles CSV (date_stolen is parsed as a date).
stolen_vehicles_df.printSchema()

root
 |-- vehicle_id: integer (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- make_id: integer (nullable = true)
 |-- model_year: integer (nullable = true)
 |-- vehicle_desc: string (nullable = true)
 |-- color: string (nullable = true)
 |-- date_stolen: date (nullable = true)
 |-- location_id: integer (nullable = true)



In [0]:
# Preview two stolen-vehicle records.
stolen_vehicles_df.show(n=2)

+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|       vehicle_desc| color|date_stolen|location_id|
+----------+------------+-------+----------+-------------------+------+-----------+-----------+
|         1|     Trailer|    623|      2021|           BST2021D|Silver| 2021-11-05|        102|
|         2|Boat Trailer|    623|      2021|OUTBACK BOATS FT470|Silver| 2021-12-13|        105|
+----------+------------+-------+----------+-------------------+------+-----------+-----------+
only showing top 2 rows


In [0]:
# Normalise column names: lower-case them and replace spaces with underscores so
# every frame uses snake_case (this also turns "Population" back into "population").
def _snake_case_columns(df):
    """Return ``df`` with each column name lower-cased and spaces replaced by '_'."""
    renamed = [name.lower().replace(" ", "_") for name in df.columns]
    return df.toDF(*renamed)


location_df = _snake_case_columns(location_df)
make_details_df = _snake_case_columns(make_details_df)
stolen_vehicles_df = _snake_case_columns(stolen_vehicles_df)
database_df = _snake_case_columns(database_df)


In [0]:
# Authenticate Spark against the storage account's Data Lake (dfs) endpoint, which the
# abfss:// silver and gold writes use; the key comes from the same secret as before.
spark.conf.set(
    key="fs.azure.account.key.vehicletheftvarad.dfs.core.windows.net",
    value=dbutils.secrets.get("dbScope", "saSecret"),
)

## Move data to Silver layer storage

In [0]:

# Persist each frame to the silver container as a headered CSV directory.
# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") fails as soon as the target directory exists, which breaks
# Restart & Run All on every execution after the first. Existing silver output
# at these paths is replaced.
# NOTE(review): stolen_vehicles_df is written here *before* the null-filling step
# further down, so silver holds the un-imputed data - confirm that is intended.
SILVER_ROOT = "abfss://silver@vehicletheftvarad.dfs.core.windows.net"

for _table_name, _table_df in [
    ("location", location_df),
    ("make_details", make_details_df),
    ("stolen_vehicles", stolen_vehicles_df),
    ("database", database_df),
]:
    (
        _table_df.write
        .mode("overwrite")
        .option("header", True)
        .csv(f"{SILVER_ROOT}/{_table_name}/")
    )



In [0]:
# Preview five locations after column-name normalisation (population is lower-case again).
location_df.show(n=5)

+-----------+-------------+-----------+----------+-------+
|location_id|       region|    country|population|density|
+-----------+-------------+-----------+----------+-------+
|        101|    Northland|New Zealand|    201500|  16.11|
|        102|     Auckland|New Zealand|   1695200| 343.09|
|        103|      Waikato|New Zealand|    513800|   21.5|
|        104|Bay of Plenty|New Zealand|    347700|   28.8|
|        105|     Gisborne|New Zealand|     52100|   6.21|
+-----------+-------------+-----------+----------+-------+
only showing top 5 rows


In [0]:
# Per-column null counts: one output row, one column per input column.
# count(when(isNull, 1)) counts only the rows where the column is null, because
# when() yields NULL otherwise and count() skips NULLs. Unlike the previous
# sum(when(...).otherwise(0)) form, this:
#   * yields 0 rather than NULL on an empty DataFrame, and
#   * does not depend on `from pyspark.sql.functions import *` shadowing Python's
#     builtin sum().
def count_nulls_per_column(df):
    """Return a one-row DataFrame holding the number of nulls in each column of ``df``."""
    return df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns])


null_count_location = count_nulls_per_column(location_df)
null_count_make_details = count_nulls_per_column(make_details_df)
null_count_stolen_vehicles = count_nulls_per_column(stolen_vehicles_df)
null_count_database = count_nulls_per_column(database_df)


In [0]:
# Display null counts per column for location_df.
null_count_location.show(n=20, truncate=True)

+-----------+------+-------+----------+-------+
|location_id|region|country|population|density|
+-----------+------+-------+----------+-------+
|          0|     0|      0|         0|      0|
+-----------+------+-------+----------+-------+



In [0]:
# Display null counts per column for make_details_df.
null_count_make_details.show(n=20, truncate=True)

+-------+---------+---------+
|make_id|make_name|make_type|
+-------+---------+---------+
|      0|        0|        0|
+-------+---------+---------+



In [0]:
# Display null counts per column for stolen_vehicles_df; these are filled in below.
null_count_stolen_vehicles.show(n=20, truncate=True)

+----------+------------+-------+----------+------------+-----+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|vehicle_desc|color|date_stolen|location_id|
+----------+------------+-------+----------+------------+-----+-----------+-----------+
|         0|          26|     15|        15|          33|   15|          0|          0|
+----------+------------+-------+----------+------------+-----+-----------+-----------+



In [0]:
# Display null counts per column for the data-dictionary table.
null_count_database.show(n=20, truncate=True)

+-----+-----+-----------+
|table|field|description|
+-----+-----+-----------+
|    0|    0|          0|
+-----+-----+-----------+



In [0]:
# Re-check column names and types before filling nulls, so the fill values below match each column's type.
stolen_vehicles_df.printSchema()

root
 |-- vehicle_id: integer (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- make_id: integer (nullable = true)
 |-- model_year: integer (nullable = true)
 |-- vehicle_desc: string (nullable = true)
 |-- color: string (nullable = true)
 |-- date_stolen: date (nullable = true)
 |-- location_id: integer (nullable = true)



### Replace null values in stolen_vehicles_df

In [0]:
# Replace nulls in stolen_vehicles_df with placeholders: a "No data" label for the
# string columns and 0 for the integer columns.
# NOTE(review): 0 is a sentinel for make_id and model_year. make_details_df ids in the
# preview start at 501, so make_id 0 presumably matches no make, and model_year 0 will
# skew numeric aggregates - consider keeping nulls or filtering these rows downstream.
NO_DATA = "No data"
stolen_vehicles_fill_values = {
    "vehicle_type": NO_DATA,
    "make_id": 0,
    "model_year": 0,
    "vehicle_desc": NO_DATA,
    "color": NO_DATA,
}
stolen_vehicles_df = stolen_vehicles_df.na.fill(stolen_vehicles_fill_values)


In [0]:
# Re-count nulls per column after fillna to confirm every column is now populated.
# count(when(isNull, 1)) counts only null rows (when() is NULL otherwise and count()
# skips NULLs). Unlike sum() it yields 0 rather than NULL on an empty DataFrame, and it
# does not depend on the wildcard import shadowing Python's builtin sum().
null_count_stolen_vehicles = stolen_vehicles_df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in stolen_vehicles_df.columns]
)
null_count_stolen_vehicles.show()

# Fail loudly if any column (including unfilled ones such as date_stolen) still has nulls.
assert all(n == 0 for n in null_count_stolen_vehicles.first()), \
    "stolen_vehicles_df still contains nulls after fillna"

+----------+------------+-------+----------+------------+-----+-----------+-----------+
|vehicle_id|vehicle_type|make_id|model_year|vehicle_desc|color|date_stolen|location_id|
+----------+------------+-------+----------+------------+-----+-----------+-----------+
|         0|           0|      0|         0|           0|    0|          0|          0|
+----------+------------+-------+----------+------------+-----+-----------+-----------+



In [0]:
# Collect the last five rows to the driver to eyeball the placeholder values written by fillna.
stolen_vehicles_df.tail(num=5)

[Row(vehicle_id=4549, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 2, 18), location_id=102),
 Row(vehicle_id=4550, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 2, 14), location_id=109),
 Row(vehicle_id=4551, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 3, 9), location_id=102),
 Row(vehicle_id=4552, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 3, 7), location_id=109),
 Row(vehicle_id=4553, vehicle_type='No data', make_id=0, model_year=0, vehicle_desc='No data', color='No data', date_stolen=datetime.date(2022, 3, 14), location_id=102)]

### Send data to the Gold layer

In [0]:

# Persist the cleaned frames to the gold container as headered CSV directories.
# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") fails once the target directory exists, breaking Restart & Run
# All on every execution after the first. Existing gold output at these paths is replaced.
# NOTE(review): only stolen_vehicles_df changes between the silver and gold writes;
# the other three frames are written unchanged.
GOLD_ROOT = "abfss://gold@vehicletheftvarad.dfs.core.windows.net"

for _table_name, _table_df in [
    ("location", location_df),
    ("make_details", make_details_df),
    ("stolen_vehicles", stolen_vehicles_df),
    ("database", database_df),
]:
    (
        _table_df.write
        .mode("overwrite")
        .option("header", True)
        .csv(f"{GOLD_ROOT}/{_table_name}/")
    )

