Installing dependencies

In [13]:
!pip install pyspark==3.1.3



In [14]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

Start a Spark Session

In [15]:
# Build the SparkSession for this notebook; getOrCreate() reuses an
# already-active session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("brewery_transformation")
    .getOrCreate()
)

In [61]:
# Load the Bronze-layer brewery JSON data and preview it.
df = spark.read.format("json").load("../../datalake/Bronze/brewery_data")
df.show()

                                                                                

+--------------------+---------+---------+------------+--------------+-------------+--------------------+---------------+----------------+--------------------+------------+-----------+-------------+--------------+--------------------+--------------------+------------+
|           address_1|address_2|address_3|brewery_type|          city|      country|                  id|       latitude|       longitude|                name|       phone|postal_code|        state|state_province|              street|         website_url|extract_date|
+--------------------+---------+---------+------------+--------------+-------------+--------------------+---------------+----------------+--------------------+------------+-----------+-------------+--------------+--------------------+--------------------+------------+
|      1716 Topeka St|     null|     null|       micro|        Norman|United States|5128df48-79fc-4f0...|    35.25738891|    -97.46818222|    (405) Brewing Co|  4058160490| 73069-8224|     Okla

In [62]:
# Print the column names and types of the Bronze DataFrame.
df.printSchema()

root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- address_3: string (nullable = true)
 |-- brewery_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- street: string (nullable = true)
 |-- website_url: string (nullable = true)
 |-- extract_date: date (nullable = true)



In [63]:
# List every distinct extract_date present in the Bronze data.
(
    df
    .select('extract_date')
    .distinct()
    .show()
)



+------------+
|extract_date|
+------------+
|  2025-06-29|
|  2025-07-01|
|  2025-06-30|
+------------+



                                                                                

In [64]:
# Find the most recent extract_date in the Bronze data (a single-row
# aggregate) and pull the value back to the driver.
latest_date = (
    df
    .agg(f.max('extract_date').alias('latest_date'))
    .first()['latest_date']
)
print(f'The latest date is: {latest_date}')

The latest date is: 2025-07-01


Silver layer test

In [65]:
# NOTE(review): disabled writes of df_main / df_hist (coalesced to one partition)
# to ../../output. Neither DataFrame is defined in the visible part of this
# notebook, and the following cell reads from ../../datalake/Silver instead --
# confirm whether these lines are still needed or can be removed.
# df_main.coalesce(1).write.mode('overwrite').parquet('../../output/brewery_data_main')
# df_hist.coalesce(1).write.mode('overwrite').parquet('../../output/brewery_data_hist')

In [66]:
# Load the Silver-layer main brewery table from parquet.
df_main_silver = spark.read.format('parquet').load('../../datalake/Silver/brewery_data/brewery_data_main')

In [67]:
# Preview the Silver main table (show() prints the first 20 rows by default).
df_main_silver.show()

+--------------------+---------+---------+------------+--------------+-------------+--------------------+---------------+----------------+--------------------+------------+-----------+-------------+--------------+--------------------+--------------------+------------+
|           address_1|address_2|address_3|brewery_type|          city|      country|                  id|       latitude|       longitude|                name|       phone|postal_code|        state|state_province|              street|         website_url|process_date|
+--------------------+---------+---------+------------+--------------+-------------+--------------------+---------------+----------------+--------------------+------------+-----------+-------------+--------------+--------------------+--------------------+------------+
|      1716 Topeka St|     null|     null|       micro|        Norman|United States|5128df48-79fc-4f0...|    35.25738891|    -97.46818222|    (405) Brewing Co|  4058160490| 73069-8224|     Okla

Aggregation test

In [68]:
# Keep only the Silver rows whose process_date equals latest_date (derived
# earlier from the Bronze extract_date), then count breweries per
# (brewery_type, country) with countries listed in descending order.
df_latest_data = df_main_silver.where(f.col('process_date') == latest_date)
df_agg_loc_type = (
    df_latest_data
    .groupBy('brewery_type', 'country')
    .count()
    .orderBy(f.col('country').desc())
)

df_agg_loc_type.show()



+------------+-------------+-----+
|brewery_type|      country|count|
+------------+-------------+-----+
|     brewpub|United States| 2402|
|     taproom|United States|    9|
|    contract|United States|  189|
|      closed|United States|  226|
|         bar|United States|    2|
|    regional|United States|  216|
|        nano|United States|   11|
|    planning|United States|  674|
|       micro|United States| 4163|
|    location|United States|    1|
|       large|United States|   74|
|  proprietor|United States|   69|
|       micro|       Sweden|   10|
|       micro|  South Korea|    5|
|     brewpub|  South Korea|   56|
|       large|    Singapore|    1|
|     brewpub|    Singapore|    4|
|         bar|    Singapore|   28|
|      closed|     Scotland|    1|
|       micro|     Scotland|    9|
+------------+-------------+-----+
only showing top 20 rows



                                                                                

Reading the Gold Layer

In [69]:
# Load the Gold-layer aggregate table from parquet and preview it.
df_gold = spark.read.format('parquet').load('../../datalake/Gold/brewery_data/brewery_data_main')
df_gold.show()

+------------+-------------+-----+------------+
|brewery_type|      country|count|process_date|
+------------+-------------+-----+------------+
|    regional|United States|  216|  2025-07-01|
|       micro|United States| 4163|  2025-07-01|
|        nano|United States|   11|  2025-07-01|
|         bar|United States|    2|  2025-07-01|
|       large|United States|   74|  2025-07-01|
|      closed|United States|  226|  2025-07-01|
|    location|United States|    1|  2025-07-01|
|    planning|United States|  674|  2025-07-01|
|     taproom|United States|    9|  2025-07-01|
|     brewpub|United States| 2402|  2025-07-01|
|    contract|United States|  189|  2025-07-01|
|  proprietor|United States|   69|  2025-07-01|
|       micro|       Sweden|   10|  2025-07-01|
|     brewpub|  South Korea|   56|  2025-07-01|
|       micro|  South Korea|    5|  2025-07-01|
|       large|    Singapore|    1|  2025-07-01|
|         bar|    Singapore|   28|  2025-07-01|
|     brewpub|    Singapore|    4|  2025