<a href="https://colab.research.google.com/github/ducline/BST/blob/master/spark/challenges/challenge_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CHALLENGE 3
##  Implement ENRICH process
- Set up path in the "lake"
  - !mkdir -p /content/lake/gold

- Read data from SILVER layer
  - Paths:
    - vehicles - path: /content/lake/silver/vehicles
    - lines - path: /content/lake/silver/lines
    - municipalities - path: /content/lake/silver/municipalities
  - Use StructType/StructField definitions to enforce the schema on read

- Enrichment
  - Enrich the vehicles dataset with information from the lines and municipalities datasets
    - join vehicles with lines and municipalities
      - select all columns from vehicles + lines.long_name (name: line_name, format: string) + municipalities.name (name: municipality_name, format: array)
      - Note that "municipalities.name" is an array

- Write data as PARQUET into the GOLD layer (/content/lake/gold)
  - Dataset name: vehicles_enriched
  - Partition "vehicles_enriched" by "date" column
  - Paths:
    - vehicles_enriched - path: /content/lake/gold/vehicles_enriched
  - Make sure only a single Parquet file is created per "date" partition
  - Use overwrite as write mode

# Setting up PySpark

In [None]:
# Create the GOLD layer directory of the local "lake" (-p: create parents too, no error if it already exists).
!mkdir -p /content/lake/gold

In [None]:
# Install PySpark into the notebook's Python environment.
%pip install pyspark



In [None]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
import requests

class ETLFlow:
    """Generic extract/load helpers bound to a single SparkSession."""

    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

    def extract_from_file(self, format: str, path: str, **kwargs) -> DataFrame:
        """Read the dataset at ``path`` using the Spark data source ``format``.

        Args:
            format: Spark data-source name, e.g. ``"parquet"`` or ``"csv"``.
            path: file or directory to read.
            **kwargs: forwarded to ``DataFrameReader.load``. Pass
                ``schema=<StructType or DDL string>`` to enforce a schema
                instead of inferring it; any other keyword is used as a reader
                option (e.g. ``header=True`` for CSV).

        Returns:
            The loaded DataFrame.
        """
        return self.spark.read.load(path, format=format, **kwargs)

    def extract_from_api(self, url: str, schema: StructType = None, timeout: float = 30.0) -> DataFrame:
        """Fetch JSON from ``url`` and load it into a DataFrame.

        Args:
            url: HTTP endpoint returning a JSON array of objects; a single JSON
                object is treated as a one-element array.
            schema: optional schema to enforce; when falsy the schema is inferred.
            timeout: seconds to wait for the HTTP response.

        Returns:
            A DataFrame with one row per JSON record.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status.
        """
        import json  # only this method needs it

        response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        payload = response.json()
        if isinstance(payload, dict):
            payload = [payload]

        # spark.read.json expects an RDD of JSON *strings*. Raw dicts would be
        # str()-ed into Python repr (True/False/None), which is not valid JSON.
        rdd = self.spark.sparkContext.parallelize([json.dumps(record) for record in payload])

        reader = self.spark.read.schema(schema) if schema else self.spark.read
        return reader.json(rdd)

    def load(self, df: DataFrame, format: str, path: str, partition_column: str = None, **kwargs) -> None:
        """Write ``df`` to ``path`` in ``format``, replacing any existing output.

        The data is coalesced to a single partition first, so one file is
        written. When ``partition_column`` is given, ``partitionBy`` splits the
        output into ``column=value`` subdirectories, giving one file per value.

        Args:
            df: DataFrame to write.
            format: Spark data-source name, e.g. ``"parquet"``.
            path: destination directory.
            partition_column: optional column to partition the output by.
            **kwargs: forwarded as writer options (e.g. ``compression="snappy"``).
        """
        writer = df.coalesce(1).write.mode("overwrite").format(format)
        if partition_column:
            writer = writer.partitionBy(partition_column)
        writer.options(**kwargs).save(path)

class ETLTask(ETLFlow):
    """ETL task that builds the GOLD ``vehicles_enriched`` dataset from SILVER data."""

    def __init__(self, spark: SparkSession) -> None:
        super().__init__(spark)

    def enrich_vehicles(
        self,
        silver_path: str = "/content/lake/silver",
        gold_path: str = "/content/lake/gold",
    ) -> None:
        """Enrich SILVER vehicles with line and municipality names and write to GOLD.

        Output columns are every ``vehicles`` column, plus:

        - ``line_name`` (string): ``lines.long_name`` of the line whose ``id``
          equals the vehicle's ``line_id`` (null when no line matches).
        - ``municipality_name`` (array): for each id in the vehicle's
          ``municipality_ids`` array, the matching ``municipalities.name``.

        The result is written as Parquet to ``<gold_path>/vehicles_enriched``,
        partitioned by ``date``, overwriting previous output.

        Args:
            silver_path: root directory of the SILVER layer.
            gold_path: root directory of the GOLD layer.
        """
        # Read the SILVER layer.
        vehicles_df = self.extract_from_file(format="parquet", path=f"{silver_path}/vehicles")
        lines_df = self.extract_from_file(format="parquet", path=f"{silver_path}/lines")
        municipalities_df = self.extract_from_file(format="parquet", path=f"{silver_path}/municipalities")

        # Collapse municipalities into a single-row {id -> name} map. Each
        # vehicle's id array can then be translated in place, so vehicle rows
        # are neither duplicated nor dropped and municipality_name is an array.
        # map_from_entries rejects null keys (and, by default, duplicate keys),
        # so null ids are filtered out; municipality ids are expected to be unique.
        municipality_lookup = municipalities_df.where(F.col("id").isNotNull()).agg(
            F.map_from_entries(
                F.collect_list(F.struct(F.col("id"), F.col("name")))
            ).alias("_municipality_lookup")
        )

        # Alias the inputs so column names shared between them (e.g. "id")
        # stay unambiguous after the join.
        vehicles = vehicles_df.alias("v")
        lines = lines_df.alias("l")

        enriched_df = (
            vehicles.join(lines, F.col("v.line_id") == F.col("l.id"), "left")
            # A global aggregate yields exactly one row, so this cross join
            # preserves the vehicle row count.
            .crossJoin(F.broadcast(municipality_lookup))
            .select(
                "v.*",
                F.col("l.long_name").cast("string").alias("line_name"),
                # NOTE(review): as in the original join, this assumes vehicles
                # carries a `municipality_ids` array whose element type matches
                # municipalities.id — confirm against the SILVER schema.
                F.transform(
                    F.col("v.municipality_ids"),
                    lambda municipality_id: F.col("_municipality_lookup")[municipality_id],
                ).alias("municipality_name"),
            )
        )

        # Write to the GOLD layer as Parquet, partitioned by "date".
        self.load(
            df=enriched_df,
            format="parquet",
            path=f"{gold_path}/vehicles_enriched",
            partition_column="date",
        )

if __name__ == '__main__':
    # One local SparkSession for the whole run. It is kept at module level
    # under the name `spark` because other code in this notebook may read it.
    spark = (
        SparkSession.builder
        .master('local')
        .appName('ETL Program')
        .getOrCreate()
    )

    print("Starting ETL program")
    etl = ETLTask(spark)

    # Build GOLD vehicles_enriched from the SILVER layer.
    print("Running Task - Enrich Vehicles")
    etl.enrich_vehicles()

    print("ETL program completed")