In [0]:
from pyspark.sql.functions import col, when, broadcast


In [0]:
# Declare the notebook's input widgets: the run date parts and the target
# storage account name. Each one is a free-text widget defaulting to "".
for widget_name in ("year", "month", "day", "storageName"):
    dbutils.widgets.text(widget_name, "")

In [0]:
# Read the run parameters supplied through the notebook widgets.
year = dbutils.widgets.get("year")
month = dbutils.widgets.get("month")
day = dbutils.widgets.get("day")
storageName = dbutils.widgets.get("storageName")

# The widgets default to an empty string, and these values are interpolated
# directly into the abfss:// paths used below. Fail fast with a clear message
# instead of letting a blank value produce a malformed storage path.
_missing_params = [
    param_name
    for param_name, param_value in (
        ("year", year),
        ("month", month),
        ("day", day),
        ("storageName", storageName),
    )
    if not param_value or not param_value.strip()
]
if _missing_params:
    raise ValueError(
        "Missing required notebook parameter(s): " + ", ".join(_missing_params)
    )

In [0]:
# Load the themes extract for the requested day from the raw container.
# The files are gzipped CSVs with a header row; column types are inferred.
themes_source_path = f"abfss://raw@{storageName}.dfs.core.windows.net/files/themes/Year={year}/Month={month}/Day={day}/*.csv.gz"
themes_df = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv(themes_source_path)
)

In [0]:
# Preview the first 20 theme rows (long values are truncated for display).
themes_df.show(n=20, truncate=True)

+---+--------------------+---------+
| id|                name|parent_id|
+---+--------------------+---------+
|  1|             Technic|     NULL|
|  3|         Competition|        1|
|  4|      Expert Builder|        1|
| 16|          RoboRiders|        1|
| 17|      Speed Slammers|        1|
| 18|           Star Wars|        1|
| 19|        Supplemental|        1|
| 20|     Throwbot Slizer|        1|
| 21|Universal Buildin...|        1|
| 22|             Creator|     NULL|
| 34|       Make & Create|     NULL|
| 35|       Bricks & More|       34|
| 50|                Town|     NULL|
| 51|              Arctic|       50|
| 52|                City|     NULL|
| 53|             Airport|       52|
| 54|               Cargo|       52|
| 55|         Coast Guard|       52|
| 56|        Construction|       52|
| 57|                Farm|       52|
+---+--------------------+---------+
only showing top 20 rows



In [0]:
# Resolve each theme's parent name by joining the themes table to itself:
# the left side is the theme, the right side is the row its parent_id points to.
# A left join keeps themes that have no parent (parentTheme is then NULL).
child_themes = themes_df.alias('themes1')
parent_themes = themes_df.alias('themes2')
themes_with_parents_df = (
    child_themes
    .join(parent_themes, col('themes1.parent_id') == col('themes2.id'), how='left')
    .select(
        col('themes1.id'),
        col('themes1.name'),
        col('themes1.parent_id'),
        col('themes2.name').alias('parentTheme'),
    )
)

In [0]:
# Preview the themes together with their resolved parent theme names.
themes_with_parents_df.show(n=20, truncate=True)

+---+--------------------+---------+-------------+
| id|                name|parent_id|  parentTheme|
+---+--------------------+---------+-------------+
|  1|             Technic|     NULL|         NULL|
|  3|         Competition|        1|      Technic|
|  4|      Expert Builder|        1|      Technic|
| 16|          RoboRiders|        1|      Technic|
| 17|      Speed Slammers|        1|      Technic|
| 18|           Star Wars|        1|      Technic|
| 19|        Supplemental|        1|      Technic|
| 20|     Throwbot Slizer|        1|      Technic|
| 21|Universal Buildin...|        1|      Technic|
| 22|             Creator|     NULL|         NULL|
| 34|       Make & Create|     NULL|         NULL|
| 35|       Bricks & More|       34|Make & Create|
| 50|                Town|     NULL|         NULL|
| 51|              Arctic|       50|         Town|
| 52|                City|     NULL|         NULL|
| 53|             Airport|       52|         City|
| 54|               Cargo|     

In [0]:
# Switch the theme columns from the source file's snake_case names to the
# camelCase names used in the rest of this notebook.
theme_column_names = {
    "id": "themeID",
    "name": "themeName",
    "parent_id": "parentID",
}
for source_name, target_name in theme_column_names.items():
    themes_with_parents_df = themes_with_parents_df.withColumnRenamed(source_name, target_name)

In [0]:
# Load the sets extract for the same day from the raw container.
# Same format as the themes files: gzipped CSV, header row, inferred types.
sets_source_path = f"abfss://raw@{storageName}.dfs.core.windows.net/files/sets/Year={year}/Month={month}/Day={day}/*.csv.gz"
sets_df = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv(sets_source_path)
)

In [0]:
# Preview the first 20 rows of the raw sets data.
sets_df.show(n=20, truncate=True)

+------------+--------------------+----+--------+---------+--------------------+
|     set_num|                name|year|theme_id|num_parts|             img_url|
+------------+--------------------+----+--------+---------+--------------------+
|0003977811-1|Ninjago: Book of ...|2022|     761|        1|https://cdn.rebri...|
|       001-1|               Gears|1965|     756|       43|https://cdn.rebri...|
|      0011-2|   Town Mini-Figures|1979|      67|       12|https://cdn.rebri...|
|      0011-3|Castle 2 for 1 Bo...|1987|     199|        0|https://cdn.rebri...|
|      0012-1|  Space Mini-Figures|1979|     143|       12|https://cdn.rebri...|
|      0013-1|  Space Mini-Figures|1979|     143|       12|https://cdn.rebri...|
|      0014-1|  Space Mini-Figures|1979|     143|        2|https://cdn.rebri...|
|      0015-1|  Space Mini-Figures|1979|     143|       18|https://cdn.rebri...|
|      0016-1| Castle Mini Figures|1979|     189|       15|https://cdn.rebri...|
|       002-1|4.5V Samsonite

In [0]:
# Rename the sets columns to camelCase; themeID now matches the key column
# name used on the themes side of the upcoming join.
set_column_names = {
    "set_num": "setID",
    "name": "setName",
    "year": "releaseYear",
    "theme_id": "themeID",
    "num_parts": "partsCount",
    "img_url": "imgURL",
}
for source_name, target_name in set_column_names.items():
    sets_df = sets_df.withColumnRenamed(source_name, target_name)

In [0]:
# Enrich every set with its theme name and parent theme name.
# The themes table is given a broadcast hint for the join; a left join keeps
# sets whose themeID has no match. The key columns and parentID are dropped
# afterwards so only the descriptive names remain.
theme_lookup = broadcast(themes_with_parents_df)
same_theme = sets_df["themeID"] == themes_with_parents_df["themeID"]
merged_df = (
    sets_df
    .join(theme_lookup, same_theme, how="left")
    .drop("themeID", "parentID")
)

In [0]:
# Preview the sets after enrichment with theme and parent theme names.
merged_df.show(n=20, truncate=True)

+------------+--------------------+-----------+----------+--------------------+--------------------+-----------+
|       setID|             setName|releaseYear|partsCount|              imgURL|           themeName|parentTheme|
+------------+--------------------+-----------+----------+--------------------+--------------------+-----------+
|0003977811-1|Ninjago: Book of ...|       2022|         1|https://cdn.rebri...|Activity Books wi...|      Books|
|       001-1|               Gears|       1965|        43|https://cdn.rebri...|           Samsonite|     System|
|      0011-2|   Town Mini-Figures|       1979|        12|https://cdn.rebri...|        Classic Town|       Town|
|      0011-3|Castle 2 for 1 Bo...|       1987|         0|https://cdn.rebri...|        Lion Knights|     Castle|
|      0012-1|  Space Mini-Figures|       1979|        12|https://cdn.rebri...|        Supplemental|      Space|
|      0013-1|  Space Mini-Figures|       1979|        12|https://cdn.rebri...|        Supplemen

In [0]:
# Add a 'Complexity' label derived from the number of parts in the set.
# Buckets (upper bounds inclusive):
#   <= 10 'Very Low', <= 25 'Low', <= 50 'Normal',
#   <= 100 'Medium', <= 300 'High', > 300 'Very High'.
# The top bucket is an explicit "> 300" condition rather than a catch-all
# otherwise(): a NULL partsCount fails every comparison, so with a catch-all
# it would be mislabelled 'Very High'. Now an unknown count gives a NULL label.
parts_count = col('partsCount')
complexity_label = (
    when(parts_count <= 10, 'Very Low')
    .when(parts_count <= 25, 'Low')
    .when(parts_count <= 50, 'Normal')
    .when(parts_count <= 100, 'Medium')
    .when(parts_count <= 300, 'High')
    .when(parts_count > 300, 'Very High')
)
merged_df = merged_df.withColumn('Complexity', complexity_label)

In [0]:
# Preview the enriched sets, now including the Complexity label.
merged_df.show(n=20, truncate=True)

+------------+--------------------+-----------+----------+--------------------+--------------------+-----------+----------+
|       setID|             setName|releaseYear|partsCount|              imgURL|           themeName|parentTheme|Complexity|
+------------+--------------------+-----------+----------+--------------------+--------------------+-----------+----------+
|0003977811-1|Ninjago: Book of ...|       2022|         1|https://cdn.rebri...|Activity Books wi...|      Books|  Very Low|
|       001-1|               Gears|       1965|        43|https://cdn.rebri...|           Samsonite|     System|    Normal|
|      0011-2|   Town Mini-Figures|       1979|        12|https://cdn.rebri...|        Classic Town|       Town|       Low|
|      0011-3|Castle 2 for 1 Bo...|       1987|         0|https://cdn.rebri...|        Lion Knights|     Castle|  Very Low|
|      0012-1|  Space Mini-Figures|       1979|        12|https://cdn.rebri...|        Supplemental|      Space|       Low|
|      0

In [0]:
# Remove rows that are exact duplicates across every column.
merged_df = merged_df.distinct()

In [0]:
# Persist the enriched sets as a Delta table in the curated container.
# "overwrite" mode replaces whatever data is already stored at this path.
curated_sets_path = f"abfss://curated@{storageName}.dfs.core.windows.net/files/sets/"
(
    merged_df.write
    .format("delta")
    .mode("overwrite")
    .save(curated_sets_path)
)