In [1]:

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Local Spark session for the enrichment job (app name: 'Enrich').
spark = (
    SparkSession.builder
    .master('local')
    .appName('Enrich')
    .getOrCreate()
)

24/12/01 23:53:17 WARN Utils: Your hostname, Iness-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.9 instead (on interface en0)
24/12/01 23:53:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/01 23:53:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/01 23:53:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/01 23:53:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# Load the three silver-layer parquet datasets used by the enrichment.
lines_result = spark.read.load("./content/lake/silver/lines/*", format="parquet")
vehicles_result = spark.read.load("./content/lake/silver/vehicles/*", format="parquet")
municipalities_result = spark.read.load("./content/lake/silver/municipalities/*", format="parquet")

                                                                                

In [68]:
# Debug aid: preview two rows of each silver dataset.
lines_result.orderBy('id').show(n=2)
vehicles_result.orderBy('line_id').show(n=2)
municipalities_result.show(n=2)

+-------+----------+----+--------------------+--------------------+--------------+--------------------+--------+----------+----------+
|  color|facilities|  id|          localities|           long_name|municipalities|            patterns|  routes|short_name|text_color|
+-------+----------+----+--------------------+--------------------+--------------+--------------------+--------+----------+----------+
|#C61D23|        []|1001|[Alfragide, Amado...|Alfragide (Estr S...|        [1115]|[1001_0_1, 1001_0_2]|[1001_0]|      1001|   #FFFFFF|
|#C61D23|        []|1002|[Reboleira, Amado...|Reboleira (Estaçã...|        [1115]|          [1002_0_3]|[1002_0]|      1002|   #FFFFFF|
+-------+----------+----+--------------------+--------------------+--------------+--------------------+--------+----------+----------+
only showing top 2 rows

+-------+---------+--------------+-------+---------+-------+---------+----------+--------+---------------------+--------+---------+-------+-------------------+------

In [26]:
# Give every vehicle column a "vh_" prefix so that the later joins cannot
# produce ambiguous column names (e.g. id).
df = vehicles_result.select(
    *(
        F.col(name).alias("vh_" + name)
        for name in vehicles_result.columns
    )
)

In [41]:
# Add long_name and municipalities from lines.
# A left join keeps vehicles whose line_id is null or has no match in lines
# (their long_name / municipalities are null) instead of dropping them from
# the joined result.
df_lines = (
    df
    .join(
        lines_result,
        df["vh_line_id"] == lines_result["id"],
        how="left",
    )
    .select(*df.columns, 'long_name', 'municipalities')
)

In [63]:
# Explode the municipality ids of each line, translate them into names and
# regroup them into one array per vehicle row.
# explode_outer + left join keep vehicles whose line has no municipalities
# (or an id missing from the municipalities dataset); collect_list skips the
# resulting null names, so such rows end up with an empty array instead of
# vanishing from the output.
temp_df = df_lines.withColumn(
    'municipalities',
    F.explode_outer(df_lines.municipalities),
)

enriched_data = (
    temp_df
    .join(
        municipalities_result,
        temp_df.municipalities == municipalities_result.id,
        how='left',
    )
    .groupBy(df.columns + ['long_name'])
    # Output names follow the challenge description:
    # municipalities.name -> municipality_name, lines.long_name -> line_name.
    .agg(F.collect_list('name').alias('municipality_name'))
    .withColumnRenamed('long_name', 'line_name')
    # Derive the date from the vehicle timestamp.
    .withColumn("date", F.expr("date(vh_timestamp)"))
)

In [69]:
# Remove the "vh_" prefix that was added to the vehicle columns.
# Only a leading "vh_" is stripped: str.replace would also delete a "vh_"
# occurring in the middle of a column name and could create wrong or
# duplicate names.
prefix = 'vh_'
cleaned_columns = enriched_data.select(
    *(
        F.col(c).alias(c[len(prefix):] if c.startswith(prefix) else c)
        for c in enriched_data.columns
    )
)

In [67]:
# Save gold layer.
# The dataset is partitioned by the "date" column, as the challenge requires.
# coalesce(1) reduces the data to a single partition before writing, so each
# date directory receives one parquet file.
(cleaned_columns
 .coalesce(1)
 .write
 .mode("overwrite")
 .partitionBy("date")
 .format("parquet")
 .save("./content/lake/gold/vehicles_enriched"))

<a href="https://colab.research.google.com/github/lucprosa/dataeng-basic-course/blob/main/spark/challenges/challenge_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CHALLENGE 3
##  Implement ENRICH process
- Set up path in the "lake"
  - !mkdir -p /content/lake/gold

- Read data from SILVER layer
  - Paths:
    - vehicles - path: /content/lake/silver/vehicles
    - lines - path: /content/lake/silver/lines
    - municipalities - path: /content/lake/silver/municipalities
  - Use StructFields to enforce schema

- Enrichment
  - Enrich vehicles dataset with information from the line and municipalities
    - join vehicles with lines and municipalities
      - select all columns from vehicles + lines.long_name (name: line_name, format:string) + municipalities.name (name: municipality_name, format: array)
      - Note that the output "municipality_name" is an array, because a line can list several municipalities

- Write data as PARQUET into the GOLD layer (/content/lake/gold)
  - Dataset name: vehicles_enriched
  - Partition "vehicles_enriched" by "date" column
  - Paths:
    - vehicles_enriched - path: /content/lake/gold/vehicles_enriched
  - Make sure only a single parquet file is created
  - Use overwrite as write mode

# Setting up PySpark

In [None]:
# Install PySpark into the environment of the running kernel; the cells that
# import pyspark need it to be available.
%pip install pyspark

