<a href="https://colab.research.google.com/github/carsofferrei/04_data_processing/blob/main/spark/challenges/challenges_CF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [19]:
import json

import requests
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


class ETLFlow:
    """Generic extract/load primitives built on top of a Spark session.

    Subclasses combine these helpers into concrete pipeline tasks.
    """

    def __init__(self, spark: SparkSession) -> None:
        """Store the Spark session and set its log level to INFO."""
        self.spark = spark
        self.spark.sparkContext.setLogLevel("INFO")

    def extract_from_file(self, format: str, path: str, **kwargs) -> DataFrame:
        """Read ``path`` with the given Spark data source ``format``.

        Args:
            format: Spark data source name (e.g. ``"parquet"``).
            path: Location to load.
            **kwargs: Forwarded to the reader as data source options.

        Returns:
            The loaded DataFrame.
        """
        # kwargs used to be accepted but dropped; pass them on as options.
        return self.spark.read.format(format).options(**kwargs).load(path)

    def extract_from_api(self, url: str, schema: StructType = None) -> DataFrame:
        """Fetch a JSON payload over HTTP and turn it into a DataFrame.

        Args:
            url: Endpoint returning a JSON array of records (a single JSON
                object is treated as a one-record array).
            schema: Optional schema to apply; when falsy Spark infers it.

        Returns:
            A DataFrame with one row per record of the payload.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status.
            requests.Timeout: If the endpoint does not answer in time.
        """
        # A timeout keeps an unresponsive endpoint from hanging the job, and
        # failing on HTTP errors avoids ingesting an error page as data.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        payload = response.json()
        if isinstance(payload, dict):
            payload = [payload]

        # Serialise each record explicitly: the JSON reader expects JSON text,
        # and a Python dict's repr (None/True/False, single quotes) is not
        # valid JSON.
        records = [json.dumps(record) for record in payload]

        # Use the session held by this instance instead of a module-level
        # ``spark`` global, which only exists when run as a script.
        rdd = self.spark.sparkContext.parallelize(records)
        reader = self.spark.read
        if schema:
            reader = reader.schema(schema)
        return reader.json(rdd)

    def load(self, df: DataFrame, aggregate_file: bool = True, n_files: int = 1, partition_by: str = None, format: str = "parquet", path: str = None, **kwargs) -> None:
        """Write ``df`` to ``path``, overwriting whatever is already there.

        Args:
            df: DataFrame to persist.
            aggregate_file: When true, coalesce to ``n_files`` partitions
                before writing.
            n_files: Target partition count used when ``aggregate_file`` is
                true.
            partition_by: Optional column to partition the output by.
            format: Spark data source name of the output.
            path: Destination location.
            **kwargs: Forwarded to the writer as data source options.
        """
        print(f"Options on load: aggregate_file = {aggregate_file}, saved in {n_files} files on folder and partitioned by {partition_by} in {format}.")
        if aggregate_file:
            df = df.coalesce(n_files)

        writer = df.write.mode("overwrite").format(format).options(**kwargs)
        # Honour partition_by on every path; it used to be dropped silently
        # when aggregate_file was False.
        if partition_by is not None:
            writer = writer.partitionBy(partition_by)
        writer.save(path)


class ETLTask(ETLFlow):
    """Bronze -> silver -> gold pipeline over the Carris Metropolitana API.

    Ingestion methods pull an endpoint into the bronze layer, cleansing
    methods move bronze data to silver, ``enrich`` joins the silver tables
    into a gold table and ``answers`` prints two rankings from it. All lake
    locations are under ``/content/lake``.
    """

    def __init__(self, spark: SparkSession) -> None:
        """Initialise through the base class so its session setup applies."""
        # Previously the base initialiser was bypassed, skipping its
        # log-level configuration.
        super().__init__(spark)

    def ingestion_vehicles(self):
        """Fetch the vehicles endpoint and store it in bronze, partitioned by date."""
        print("Starting ingestion process for vehicles.")
        print("Defining the schema for vehicles.")
        vehicle_schema = StructType([
            StructField('bearing', IntegerType(), True),
            StructField('block_id', StringType(), True),
            StructField('current_status', StringType(), True),
            StructField('id', StringType(), True),
            StructField('lat', FloatType(), True),
            StructField('line_id', StringType(), True),
            StructField('lon', FloatType(), True),
            StructField('pattern_id', StringType(), True),
            StructField('route_id', StringType(), True),
            StructField('schedule_relationship', StringType(), True),
            StructField('shift_id', StringType(), True),
            StructField('speed', FloatType(), True),
            StructField('stop_id', StringType(), True),
            StructField('timestamp', TimestampType(), True),
            StructField('trip_id', StringType(), True),
        ])

        print("Extracting from API")
        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/vehicles", schema=vehicle_schema)
        print("Creating date column")
        # The yyyyMMdd string derived from the timestamp is the partition key.
        df = df.withColumn("date", date_format('timestamp', "yyyyMMdd"))
        print("Loading on bronze layer")
        self.load(df=df, aggregate_file=True, n_files=1, partition_by="date", format="parquet", path="/content/lake/bronze/vehicles")

    def ingestion_lines(self):
        """Fetch the lines endpoint and store it in the bronze layer."""
        print("Defining the schema for lines")
        # _corrupt_record is kept in the schema so cleansing_lines can filter
        # on it.
        lines_schema = StructType([
            StructField('_corrupt_record', StringType(), True),
            StructField('color', StringType(), True),
            StructField('facilities', StringType(), True),
            StructField('id', StringType(), True),
            StructField('localities', ArrayType(StringType()), True),
            StructField('long_name', StringType(), True),
            StructField('municipalities', ArrayType(StringType()), True),
            StructField('patterns', ArrayType(StringType()), True),
            StructField('routes', StringType(), True),
            StructField('short_name', StringType(), True),
            StructField('text_color', StringType(), True),
        ])

        print("Extracting from API")
        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/lines", schema=lines_schema)
        print("Loading on bronze layer")
        self.load(df=df, format="parquet", path="/content/lake/bronze/lines")

    def ingestion_municipalities(self):
        """Fetch the municipalities endpoint and store it in the bronze layer."""
        print("Defining the schema for municipalities")
        municipalities_schema = StructType([
            StructField('district_id', StringType(), True),
            StructField('district_name', StringType(), True),
            StructField('id', StringType(), True),
            StructField('name', StringType(), True),
            StructField('prefix', StringType(), True),
            StructField('region_id', StringType(), True),
            StructField('region_name', StringType(), True),
        ])

        print("Extracting from API")
        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/municipalities", schema=municipalities_schema)
        print("Loading on bronze layer")
        self.load(df=df, format="parquet", path="/content/lake/bronze/municipalities")

    def cleansing_vehicles(self, df: DataFrame = None):
        """Clean bronze vehicles and write them to silver, partitioned by date.

        Args:
            df: Accepted for backward compatibility and ignored; the data is
                always re-read from the bronze layer.
        """
        print("Reading vehicles file from bronze layer")
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/vehicles")

        print("Starting applying some transformations to the Dataframe")
        print("Renaming some columns: lat - latitude and lon - longitude")
        df = df.withColumnRenamed("lat", "latitude").withColumnRenamed("lon", "longitude")

        print("Removing duplicate records")
        df = df.dropDuplicates()
        print("Removing records where CORRENT_STATUS is null")
        df = df.filter(df["current_status"].isNotNull())

        print("Loading vehicles on silver layer")
        self.load(df=df, aggregate_file=True, n_files=1, partition_by="date", format="parquet", path="/content/lake/silver/vehicles")

    def cleansing_lines(self, df: DataFrame = None):
        """Clean bronze lines and write them to the silver layer.

        Args:
            df: Accepted for backward compatibility and ignored; the data is
                always re-read from the bronze layer.
        """
        print("Reading lines file from bronze layer")
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/lines")

        print("Starting applying some transformations to the Dataframe")
        print("Removing duplicate records")
        df = df.dropDuplicates()
        print("Removing records where LONG_NAME is null")
        df = df.filter(df["long_name"].isNotNull())
        print("Removing corrupt records")
        df = df.filter(df["_corrupt_record"].isNull())

        print("Loading lines on silver layer")
        self.load(df=df, format="parquet", path="/content/lake/silver/lines")

    def cleansing_municipalities(self, df: DataFrame = None):
        """Clean bronze municipalities and write them to the silver layer.

        Args:
            df: Accepted for backward compatibility and ignored; the data is
                always re-read from the bronze layer.
        """
        # Log messages now name the table actually processed (they used to
        # say "lines").
        print("Reading municipalities file from bronze layer")
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/municipalities")

        print("Starting applying some transformations to the Dataframe")
        print("Removing duplicate records")
        df = df.dropDuplicates()
        print("Removing records where name OR district_name are null")
        # Dropping rows where either column is null means keeping rows where
        # both are present, hence AND rather than OR.
        df = df.filter(df["name"].isNotNull() & df["district_name"].isNotNull())

        print("Loading municipalities on silver layer")
        self.load(df=df, format="parquet", path="/content/lake/silver/municipalities")

    def enrich(self, path: str = "/content/lake/silver"):
        """Join silver vehicles, lines and municipalities into the gold table.

        Args:
            path: Root folder holding the ``vehicles``, ``lines`` and
                ``municipalities`` silver tables.
        """
        vehicles = self.extract_from_file(format="parquet", path=f"{path}/vehicles")
        lines = self.extract_from_file(format="parquet", path=f"{path}/lines")
        municipalities = self.extract_from_file(format="parquet", path=f"{path}/municipalities")

        print("Extracting info from the array column on lines DataFrame")
        # One row per (line, municipality) pair so lines can be joined to
        # municipalities on a scalar key.
        lines_treated = (
            lines.select(
                "facilities",
                "id",
                "localities",
                explode(lines.municipalities).alias("municipalities_id"),
            )
            .dropDuplicates()
            .withColumnRenamed("id", "lines_id")
        )

        print("Joining vehicles with lines information")
        vehicles_lines = vehicles.join(lines_treated, vehicles['line_id'] == lines_treated['lines_id'], how='left')

        print("Joining previous DataFrame with municipalities information")
        # Renamed so it does not clash with the vehicles "id" column.
        municipalities = municipalities.withColumnRenamed("id", "id_municipalities")
        vehicles_enriched = vehicles_lines.join(
            municipalities,
            vehicles_lines['municipalities_id'] == municipalities['id_municipalities'],
            how='left',
        )

        print("Keep only some columns and remove duplicated records")
        # The projection used to be assigned to a misspelled variable, so the
        # unprojected, non-deduplicated frame was the one written to gold.
        vehicles_enriched = vehicles_enriched.select(
            "line_id",
            "current_status",
            "schedule_relationship",
            "shift_id",
            "speed",
            "stop_id",
            "date",
            "facilities",
            "municipalities_id",
            "district_name",
            "name",
            "prefix",
            "region_id",
            "region_name",
        ).dropDuplicates()

        print("Removing records where name OR district_name are null")
        # Keep only rows where both columns are present (AND), matching the
        # stated intent; both columns come from the same DataFrame.
        vehicles_enriched = vehicles_enriched.filter(
            vehicles_enriched["name"].isNotNull() & vehicles_enriched["district_name"].isNotNull()
        )

        print("Loading vehicles_enriched on gold layer")
        self.load(df=vehicles_enriched, aggregate_file=True, n_files=1, partition_by="date", format="parquet", path="/content/lake/gold/vehicles_enriched")

    def answers(self):
        """Print the top-3 municipalities by line count and by average speed."""
        vehicles_enriched = self.extract_from_file(format="parquet", path="/content/lake/gold/vehicles_enriched")

        data_for_answers = vehicles_enriched.select("line_id", "name", "speed").dropDuplicates()
        # average_speed is a true mean of the speed values; the previous
        # sum(speed) / distinct-line-count ratio was not an average speed.
        data = data_for_answers.groupBy("name").agg(
            count_distinct("line_id").alias("count_line_ids"),
            round(avg("speed"), 2).alias("average_speed"),
        )

        print("What are the top 3 municipalities by vehicles routes?")
        print("The answers is: ")
        data.orderBy(col("count_line_ids").desc()).limit(3).select("name").show()

        print("What are the top 3 municipalities with higher speed on average?")
        print("The answers is: ")
        data.orderBy(col("average_speed").desc()).limit(3).select("name").show()



if __name__ == '__main__':
    # Script entry point: build a local Spark session, then run the
    # ingestion, cleansing, enrichment and reporting stages in order.
    from pyspark.sql import SparkSession

    print("Initializing Spark session")
    spark = (
        SparkSession.builder
        .master('local')
        .appName('ETL Program')
        .getOrCreate()
    )

    print("Starting ETL program")
    etl = ETLTask(spark)

    # Stage 1: API -> bronze. The ingestion methods in this file have no
    # return statement, so the names below are bound to None.
    print("Running Task - Ingestion")
    bronze_vehicles = etl.ingestion_vehicles()
    bronze_lines = etl.ingestion_lines()
    bronze_municipalities = etl.ingestion_municipalities()

    # Stage 2: bronze -> silver.
    print("Running Task - Cleansing")
    silver_vehicles = etl.cleansing_vehicles(df=bronze_vehicles)
    silver_lines = etl.cleansing_lines(df=bronze_lines)
    silver_municipalities = etl.cleansing_municipalities(df=bronze_municipalities)

    # Stage 3: silver -> gold.
    print("Running Task - Enrich")
    etl.enrich()

    print("ETL program completed")

    # Stage 4: report from the gold layer.
    print("Lets answers some questions:")
    etl.answers()



Initializing Spark session
Starting ETL program
Running Task - Ingestion
Starting ingestion process for vehicles.
Defining the schema for vehicles.
Extracting from API
Creating date column
Loading on bronze layer
Options on load: aggregate_file = True, saved in 1 files on folder and partitioned by date in parquet.
Defining the schema for lines
Extracting from API
Loading on bronze layer
Options on load: aggregate_file = True, saved in 1 files on folder and partitioned by None in parquet.
Defining the schema for municipalities
Extracting from API
Loading on bronze layer
Options on load: aggregate_file = True, saved in 1 files on folder and partitioned by None in parquet.
Running Task - Cleansing
Reading vehicles file from bronze layer
Starting applying some transformations to the Dataframe
Renaming some columns: lat - latitude and lon - longitude
Removing duplicate records
Removing records where CORRENT_STATUS is null
Loading vehicles on silver layer
Options on load: aggregate_file = Tr