| Developer         | Version | Task (JIRA number) |
|--------------|:-----:|-----------:|
| Dejan Cvetkovski |  1.00 |        LOKA-Interview-1 |

The widgets below serve as parameters for the Databricks Workflow that runs this notebook daily to load new files.

You can specify the process date, the S3 bucket location or the type of files you want to load (JSON by default).

In [0]:
dbutils.widgets.text("file_location", "s3://de-tech-assessment-2022/data", "Upload Location")
dbutils.widgets.dropdown("file_type", "json", ["csv", "parquet", "json"])
dbutils.widgets.text("process_date", "2019-06-01", "Process Date")

In production, this logic belongs in the Workflow (Databricks Job): it calculates the day before today and passes that date to the job as the `process_date` parameter.

For the purpose of this exercise, the date is hardcoded to 2019-06-01, the only day for which the S3 bucket has files.
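A daily Workflow would pass the previous day's date as `process_date`. A minimal sketch of that calculation in plain Python, assuming the job computes the date in UTC; the function name `previous_process_date` is hypothetical and not part of the notebook:

```python
from datetime import date, datetime, timedelta, timezone
from typing import Optional


def previous_process_date(today: Optional[date] = None) -> str:
    """Return the day before `today` as a yyyy-MM-dd string.

    Defaults to the current UTC date (an assumption about how the
    daily job would be scheduled).
    """
    if today is None:
        today = datetime.now(timezone.utc).date()
    return (today - timedelta(days=1)).isoformat()


# A run on 2019-06-02 would process the files for 2019-06-01
print(previous_process_date(date(2019, 6, 2)))  # 2019-06-01
```

Returning an ISO string keeps the value directly usable as a text widget value.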

In [0]:
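from pyspark.sql.functions import current_date, date_sub

# Yesterday's date, as the Workflow would compute it for a daily run
yesterday = spark.range(1).select(date_sub(current_date(), 1)).first()[0]
print(yesterday)

# Process date for this run, read from the widget (job parameter)
process_date = dbutils.widgets.get("process_date")
print(process_date)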

2023-02-21
2019-06-01


In [0]:
df_bronze = spark.read.format(dbutils.widgets.get("file_type")).option("inferSchema", "true").load(dbutils.widgets.get("file_location"))

In [0]:
df_bronze.count()

Out[16]: 35351

In [0]:
df_bronze.show()

+--------------------+--------------------+------+-------+---------------+----------+
|                  at|                data| event|     on|organization_id| date_only|
+--------------------+--------------------+------+-------+---------------+----------+
|2019-06-01T18:17:...|{null, bac5188f-6...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 3a3eb23a-f...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, f06eb89c-a...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, f0b87796-b...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, e641b45f-f...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 9152c5d8-7...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 949798fc-5...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 9d6a8840-d...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 3b0640d6-5...|update|vehi

In [0]:
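# The read covers the whole file location, so replace the bronze table on each daily run
df_bronze.write.format("delta").mode("overwrite").saveAsTable("door2door_table_bronze")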


In [0]:
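from pyspark.sql.functions import col, date_format, to_timestamp

# Derive dates in UTC regardless of the cluster's default time zone
spark.conf.set("spark.sql.session.timeZone", "UTC")

# `at` holds ISO-8601 UTC timestamps (e.g. 2019-06-01T18:17:10.101Z); keep only the date part
df_bronze = df_bronze.withColumn("date_only", date_format(to_timestamp(col("at")), "yyyy-MM-dd"))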

# Show the results
df_bronze.show()


+--------------------+--------------------+------+-------+---------------+----------+
|                  at|                data| event|     on|organization_id| date_only|
+--------------------+--------------------+------+-------+---------------+----------+
|2019-06-01T18:17:...|{null, bac5188f-6...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 3a3eb23a-f...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, f06eb89c-a...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, f0b87796-b...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, e641b45f-f...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 9152c5d8-7...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 949798fc-5...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 9d6a8840-d...|update|vehicle|         org-id|2019-06-01|
|2019-06-01T18:17:...|{null, 3b0640d6-5...|update|vehi
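The `date_only` column is the UTC calendar date of the `at` timestamp. The same derivation in plain Python, as a sketch assuming `at` is an ISO-8601 string like the values shown above; the helper name `utc_date_of` is hypothetical:

```python
from datetime import datetime, timezone


def utc_date_of(at: str) -> str:
    """Return the UTC calendar date (yyyy-MM-dd) of an ISO-8601 timestamp.

    Older Python versions' fromisoformat() rejects a trailing "Z",
    so it is rewritten as an explicit "+00:00" offset first.
    """
    ts = datetime.fromisoformat(at.replace("Z", "+00:00"))
    return ts.astimezone(timezone.utc).date().isoformat()


print(utc_date_of("2019-06-01T18:17:10.101Z"))   # 2019-06-01
# A timestamp with another offset is normalised to UTC before taking the date
print(utc_date_of("2019-06-02T01:30:00+02:00"))  # 2019-06-01
```

The second call shows why the time zone matters: the same instant falls on different calendar days depending on the zone used.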

The cell below flattens the nested JSON, keeps only the rows for the process date, and prepares them to be merged into the final tables.

In [0]:
from pyspark.sql.functions import col

# Keep the process date's rows first, then flatten the nested `data` struct
df_silver = (df_bronze.filter(col("date_only") == process_date)
             .select("at", "data.id", "data.start", "data.finish", "data.location.lat",
                     "data.location.lng", "event", "on", "organization_id"))
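Flattening maps each nested event onto one flat row with the silver columns. A plain-Python sketch of the same projection and date filter; the raw record shape is an assumption inferred from the `data.*` paths selected above, the second sample event is hypothetical, and `flatten_event` / `silver_rows` are illustrative names:

```python
from typing import Any, Dict, Iterable, List


def flatten_event(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Project one nested event onto the flat silver columns.

    Missing nested fields (e.g. `location` on operating-period events)
    become None, like nulls in the Spark select.
    """
    data = raw.get("data") or {}
    location = data.get("location") or {}
    return {
        "at": raw.get("at"),
        "id": data.get("id"),
        "start": data.get("start"),
        "finish": data.get("finish"),
        "lat": location.get("lat"),
        "lng": location.get("lng"),
        "event": raw.get("event"),
        "on": raw.get("on"),
        "organization_id": raw.get("organization_id"),
    }


def silver_rows(raw_events: Iterable[Dict[str, Any]], process_date: str) -> List[Dict[str, Any]]:
    """Keep events whose UTC `at` string starts with `process_date`, then flatten them."""
    return [flatten_event(e) for e in raw_events
            if str(e.get("at", "")).startswith(process_date)]


events = [
    {"at": "2019-06-01T18:17:10.101Z", "event": "update", "on": "vehicle",
     "organization_id": "org-id",
     "data": {"id": "bac5188f-67c6-4965-81dc-4ef49622e280",
              "location": {"lat": 52.45133, "lng": 13.46045}}},
    # Hypothetical event from another day, filtered out below
    {"at": "2019-06-02T08:00:00.000Z", "event": "update", "on": "vehicle",
     "organization_id": "org-id", "data": {"id": "other", "location": {"lat": 0.0, "lng": 0.0}}},
]
rows = silver_rows(events, "2019-06-01")
print(len(rows), rows[0]["lat"], rows[0]["start"])  # 1 52.45133 None
```

The prefix check on `at` only works because every timestamp is a UTC string in the same format.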


In [0]:
df_silver.show()

+--------------------+--------------------+-----+------+--------+--------+------+-------+---------------+
|                  at|                  id|start|finish|     lat|     lng| event|     on|organization_id|
+--------------------+--------------------+-----+------+--------+--------+------+-------+---------------+
|2019-06-01T18:17:...|bac5188f-67c6-496...| null|  null|52.45246|13.45908|update|vehicle|         org-id|
|2019-06-01T18:17:...|3a3eb23a-f22e-4fe...| null|  null|52.46068|13.52142|update|vehicle|         org-id|
|2019-06-01T18:17:...|f06eb89c-ada0-41c...| null|  null|52.49762|13.44824|update|vehicle|         org-id|
|2019-06-01T18:17:...|f0b87796-b25c-40b...| null|  null|52.50432|13.33656|update|vehicle|         org-id|
|2019-06-01T18:17:...|e641b45f-f007-4d7...| null|  null|52.49196|13.26401|update|vehicle|         org-id|
|2019-06-01T18:17:...|9152c5d8-79cf-4fe...| null|  null|52.50558|13.51499|update|vehicle|         org-id|
|2019-06-01T18:17:...|949798fc-50aa-47a...| nu

In [0]:
# Append each day's rows; the default write mode fails once the table exists
df_silver.write.format("delta").mode("append").saveAsTable("door2door_table_silver")

I decided to use Delta Lake as the DWH layer: its tables can be queried with Spark SQL, and it provides ACID transactions.

In [0]:
%sql

select * from door2door_table_silver

at,id,start,finish,lat,lng,event,on,organization_id
2019-06-01T18:17:10.101Z,bac5188f-67c6-4965-81dc-4ef49622e280,,,52.45133,13.46045,update,vehicle,org-id
2019-06-01T18:17:10.109Z,3a3eb23a-f22e-4fe9-8c20-f37220a81909,,,52.45848,13.52647,update,vehicle,org-id
2019-06-01T18:17:10.109Z,f0b87796-b25c-40b0-9145-8822509c17e1,,,52.50309,13.33435,update,vehicle,org-id
2019-06-01T18:17:10.111Z,9152c5d8-79cf-4fe3-96ad-359abb08a729,,,52.50536,13.51655,update,vehicle,org-id
2019-06-01T18:17:10.111Z,f06eb89c-ada0-41cb-bdd1-0a60398f901b,,,52.49697,13.44936,update,vehicle,org-id
2019-06-01T18:17:10.111Z,9d6a8840-def2-42b6-af24-f19a3c6de059,,,52.46324,13.34227,update,vehicle,org-id
2019-06-01T18:17:10.111Z,3b0640d6-502d-4623-b782-39cc789b4e8d,,,52.57786,13.26756,update,vehicle,org-id
2019-06-01T18:17:10.111Z,98c8b8cb-7c2b-415d-ad30-fa9f0571a5f5,,,52.50036,13.25032,update,vehicle,org-id
2019-06-01T18:17:10.111Z,d6880741-ae7f-4741-aa04-950b0a1e2d3b,,,52.47874,13.32032,update,vehicle,org-id
2019-06-01T18:17:10.111Z,d759fc35-b25c-4877-8e40-9b0104447b31,,,52.44846,13.46759,update,vehicle,org-id


In [0]:
%sql

select * from door2door_table_silver where on = 'vehicle' and id = '0317b799-8d70-4822-8b05-34a8f7f097c4'

at,id,start,finish,lat,lng,event,on,organization_id
2019-06-01T18:17:40.256Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53281,13.47615,update,vehicle,org-id
2019-06-01T18:17:41.257Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53293,13.47607,update,vehicle,org-id
2019-06-01T18:17:42.258Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.5329,13.47588,update,vehicle,org-id
2019-06-01T18:17:43.258Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53289,13.47582,update,vehicle,org-id
2019-06-01T18:17:44.259Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53287,13.47567,update,vehicle,org-id
2019-06-01T18:17:45.259Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53261,13.47459,update,vehicle,org-id
2019-06-01T18:17:46.272Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53223,13.47288,update,vehicle,org-id
2019-06-01T18:17:47.275Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.532,13.47187,update,vehicle,org-id
2019-06-01T18:17:48.278Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53199,13.47179,update,vehicle,org-id
2019-06-01T18:17:49.278Z,0317b799-8d70-4822-8b05-34a8f7f097c4,,,52.53192,13.47148,update,vehicle,org-id


In [0]:
%sql

select * from door2door_table_silver where on = 'operating_period'

at,id,start,finish,lat,lng,event,on,organization_id
2019-06-01T18:17:03.087Z,op_2,2019-06-01T18:17:04.079Z,2019-06-01T18:22:04.079Z,,,create,operating_period,org-id
2019-06-01T18:17:04.086Z,op_1,2019-06-01T18:23:04.079Z,2019-06-01T18:28:04.079Z,,,create,operating_period,org-id


In [0]:
%sql

CREATE TABLE IF NOT EXISTS operating_period AS
SELECT at, id, start, finish, event
FROM door2door_table_silver
WHERE `on` = 'operating_period'


num_affected_rows,num_inserted_rows


In [0]:
%sql

CREATE TABLE IF NOT EXISTS vehicle AS
SELECT at, id, lat, lng, event
FROM door2door_table_silver
WHERE `on` = 'vehicle'


num_affected_rows,num_inserted_rows
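The intent is that each target table holds only the events of one entity type (`on`) and a subset of the silver columns. A plain-Python sketch of that split; `TABLE_COLUMNS` mirrors the column lists of the two `CREATE TABLE` statements, `split_by_entity` is a hypothetical helper, and the sample rows reuse values from the query outputs above:

```python
from typing import Any, Dict, Iterable, List

# Columns kept per entity type, taken from the two CREATE TABLE statements
TABLE_COLUMNS = {
    "operating_period": ["at", "id", "start", "finish", "event"],
    "vehicle": ["at", "id", "lat", "lng", "event"],
}


def split_by_entity(rows: Iterable[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Group flat silver rows into per-table rows, keyed by the `on` column."""
    tables: Dict[str, List[Dict[str, Any]]] = {name: [] for name in TABLE_COLUMNS}
    for row in rows:
        columns = TABLE_COLUMNS.get(row.get("on"))
        if columns is None:
            continue  # unknown entity type: not loaded into either table
        tables[row["on"]].append({c: row.get(c) for c in columns})
    return tables


rows = [
    {"at": "2019-06-01T18:17:03.087Z", "id": "op_2", "start": "2019-06-01T18:17:04.079Z",
     "finish": "2019-06-01T18:22:04.079Z", "lat": None, "lng": None,
     "event": "create", "on": "operating_period", "organization_id": "org-id"},
    {"at": "2019-06-01T18:17:10.101Z", "id": "bac5188f-67c6-4965-81dc-4ef49622e280",
     "start": None, "finish": None, "lat": 52.45133, "lng": 13.46045,
     "event": "update", "on": "vehicle", "organization_id": "org-id"},
]
tables = split_by_entity(rows)
print(len(tables["vehicle"]), len(tables["operating_period"]))  # 1 1
```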


In [0]:
df_operating_period = spark.sql("SELECT * from operating_period")
df_vehicle = spark.sql("SELECT * from vehicle")

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# New rows from this run's flattened silver data, one DataFrame per entity type
new_vehicle = (df_silver.filter(col("on") == "vehicle")
               .select("at", "id", "lat", "lng", "event"))
new_operating_period = (df_silver.filter(col("on") == "operating_period")
                        .select("at", "id", "start", "finish", "event"))


def insert_new_rows(table_name, source_df):
    """Insert source rows whose (at, id) key is not yet in the Delta table."""
    (DeltaTable.forName(spark, table_name).alias("target")
     .merge(source_df.alias("source"),
            "target.at = source.at AND target.id = source.id")
     .whenNotMatchedInsertAll()
     .execute())


# Merge the new data into the Delta tables using the (at, id) join condition
insert_new_rows("vehicle", new_vehicle)
insert_new_rows("operating_period", new_operating_period)
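The merge step is meant to append only rows whose `(at, id)` key is not already in the target table, so rerunning the same day adds nothing. The key logic as an in-memory sketch (an anti-join on the composite key); `append_missing_keys` is a hypothetical name, and unlike a Delta merge this sketch runs without transactions and also skips duplicate keys within the same batch:

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def append_missing_keys(target: List[Row], source: List[Row],
                        key: Tuple[str, ...] = ("at", "id")) -> int:
    """Append source rows whose key is absent from target; return how many were added."""
    seen = {tuple(row[k] for k in key) for row in target}
    added = 0
    for row in source:
        row_key = tuple(row[k] for k in key)
        if row_key not in seen:
            target.append(row)
            seen.add(row_key)  # also skips duplicate keys within this batch
            added += 1
    return added


vehicle_id = "0317b799-8d70-4822-8b05-34a8f7f097c4"
table = [{"at": "2019-06-01T18:17:40.256Z", "id": vehicle_id, "lat": 52.53281}]
batch = [
    {"at": "2019-06-01T18:17:40.256Z", "id": vehicle_id, "lat": 52.53281},
    {"at": "2019-06-01T18:17:41.257Z", "id": vehicle_id, "lat": 52.53293},
]
print(append_missing_keys(table, batch))  # 1
print(append_missing_keys(table, batch))  # 0 (rerunning the same batch adds nothing)
```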

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

vehicle_df = spark.sql("SELECT * from vehicle")

windowSpec = Window.partitionBy("id").orderBy("at")

vehicleWithRowNum = vehicle_df.withColumn("row_num", row_number().over(windowSpec))

# Show the new DataFrame with row numbers
vehicleWithRowNum.show()


+--------------------+--------------------+--------+--------+--------+-------+
|                  at|                  id|     lat|     lng|   event|row_num|
+--------------------+--------------------+--------+--------+--------+-------+
|2019-06-01T18:17:...|0317b799-8d70-482...|    null|    null|register|      1|
|2019-06-01T18:17:...|0317b799-8d70-482...| 52.5266|13.48931|  update|      2|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52661|13.48892|  update|      3|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52661|13.48878|  update|      4|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52662|13.48846|  update|      5|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52662|13.48801|  update|      6|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52664|13.48725|  update|      7|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52665|13.48687|  update|      8|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52665|13.48674|  update|      9|
|2019-06-01T18:17:...|0317b799-8d70-482...|52.52666|
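`row_number()` over a window partitioned by `id` and ordered by `at` numbers each vehicle's events 1, 2, 3 and so on in time order. A plain-Python equivalent, as a sketch that relies on `at` values being ISO-8601 UTC strings of the same format so that string order equals time order; `number_events` and the ids `v1`/`v2` are hypothetical:

```python
from itertools import groupby
from operator import itemgetter
from typing import Any, Dict, List


def number_events(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Add `row_num` per `id`, ordered by `at`, like row_number() OVER (PARTITION BY id ORDER BY at)."""
    ordered = sorted(rows, key=itemgetter("id", "at"))
    result = []
    for _, group in groupby(ordered, key=itemgetter("id")):
        for row_num, row in enumerate(group, start=1):
            result.append({**row, "row_num": row_num})
    return result


events = [
    {"id": "v1", "at": "2019-06-01T18:17:41.257Z", "event": "update"},
    {"id": "v1", "at": "2019-06-01T18:17:40.256Z", "event": "register"},
    {"id": "v2", "at": "2019-06-01T18:17:10.101Z", "event": "update"},
]
for row in number_events(events):
    print(row["id"], row["event"], row["row_num"])
# v1 register 1
# v1 update 2
# v2 update 1
```

As in the Spark output above, the `register` event gets `row_num` 1 because it is the earliest event for that vehicle.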