## 1. Spark setup 🌠

We import the required libraries and set up the configuration needed to start the Spark session.

In [54]:
# NOTE: this script is a driver program that creates the SparkContext
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

print(pyspark.__version__)
print(pyspark.__file__)

import pandas as pd
from pyspark.sql.functions import col, when, round, format_number, initcap, concat, lit
from pyspark.sql import types
from pyspark.sql import functions as F

# remove side padding in notebook : https://stackoverflow.com/a/34058270/19268172
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

3.3.3
/home/shaikh/spark/spark-3.3.3-bin-hadoop3/python/pyspark/__init__.py


In [55]:
credentials_location = "/home/shaikh/.google/credentials/ttc-data-analytics-key.json"

# Configure SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('spark_etl') \
    .config("spark.jars", "./lib/gcs-connector-hadoop3-2.2.11.jar, ./lib/spark-bigquery-with-dependencies_2.12-0.24.0.jar") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location) \
    .getOrCreate()

# Configure Hadoop Configuration
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

spark
# we stored the connector files in the lib/ dir :
# !ls lib

## 1.1 Understanding source data columns 📅


> Subway

|Field Name|Description|Example|
|---|---|---|
|Date|Date (YYYY/MM/DD)|12/31/2016|
|Time|Time (24h clock)|1:59|
|Day|Name of the day of the week|Saturday|
|Station|TTC subway station name|Rosedale Station|
|Code|TTC delay code|MUIS|
|Min Delay|Delay (in minutes) to subway service|5|
|Min Gap|Time length (in minutes) between trains|9|
|Bound|Direction of train, dependent on the line|N|
|Line|TTC subway line i.e. YU, BD, SHP, and SRT|YU|
|Vehicle|TTC train number|5961|

We have a **delay code** lookup file, which we can join with the `Code` column.

We also have **line code** lookup values:  
Line 1 Yonge-University (YU), Line 2 Bloor-Danforth (BD), Line 3 Scarborough (SRT), Line 4 Sheppard (SHP), which we can join with the `Line` column.

> Bus

|Field Name|Description|Example|
|---|---|---|
|Report Date|The date (YYYY/MM/DD) when the delay-causing incident occurred|6/20/2017|
|Route|The number of the bus route|51|
|Time|Time (24h clock) when the delay-causing incident occurred|12:35:00 AM|
|Day|The name of the day|Monday|
|Location|The location of the delay-causing incident|York Mills Station|
|Incident|The description of the delay-causing incident|Mechanical|
|Min Delay|The delay, in minutes, to the schedule for the following bus|10|
|Min Gap|The total scheduled time, in minutes, from the bus ahead of the following bus|20|
|Direction|The direction of the bus route, where B, b or BW indicates both ways. <br>(On an east-west route, it includes both east and west)<br>NB - northbound, SB - southbound, EB - eastbound, WB - westbound|N|
|Vehicle|Vehicle number|1057|

> Streetcar

|Field Name|Description|Example|
|---|---|---|
|Report Date|The date (YYYY/MM/DD) when the delay-causing incident occurred|6/20/2017|
|Route|The number of the streetcar route|51|
|Time|Time (24h clock) when the delay-causing incident occurred|12:35:00 AM|
|Day|The name of the day|Monday|
|Location|The location of the delay-causing incident|York Mills Station|
|Incident|The description of the delay-causing incident|Mechanical|
|Min Delay|The delay, in minutes, to the schedule for the following streetcar|10|
|Min Gap|The total scheduled time, in minutes, from the streetcar ahead of the following streetcar|20|
|Direction|The direction of the streetcar route, where B, b or BW indicates both ways.<br>(On an east-west route, it includes both east and west)<br>NB - northbound, SB - southbound, EB - eastbound, WB - westbound<br>The direction is not case sensitive|N|
|Vehicle|Vehicle number|1057|
		

## 2. Extracting data from GCS 📤

This is the first step of the ETL process: **Extraction** (reading).

To read data from GCS, we follow these steps: [read data from GCS using Spark (from boisalai's notes)](https://github.com/boisalai/de-zoomcamp-2023/blob/main/week5.md#setup-to-read-from-gcs)

In [56]:
df_bus = spark.read.parquet('gs://ttc_data_lake_ttc-data-analytics/bus_delay_data/*')
df_subway = spark.read.parquet('gs://ttc_data_lake_ttc-data-analytics/subway_delay_data/ttc-subway*')
df_streetcar = spark.read.parquet('gs://ttc_data_lake_ttc-data-analytics/streetcar_delay_data/*')

print("Bus DataFrame Schema:")
df_bus.printSchema()

print("\nStreetcar DataFrame Schema:")
df_streetcar.printSchema()

print("\nSubway DataFrame Schema:")
df_subway.printSchema()

Bus DataFrame Schema:
root
 |-- Date: timestamp (nullable = true)
 |-- Route: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Incident: string (nullable = true)
 |-- Min Delay: long (nullable = true)
 |-- Min Gap: long (nullable = true)
 |-- Direction: string (nullable = true)
 |-- Vehicle: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)


Streetcar DataFrame Schema:
root
 |-- Date: timestamp (nullable = true)
 |-- Line: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Incident: string (nullable = true)
 |-- Min Delay: long (nullable = true)
 |-- Min Gap: long (nullable = true)
 |-- Bound: string (nullable = true)
 |-- Vehicle: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)


Subway DataFrame Schema:
root
 |-- Date: timestamp (nullable = true)
 |-- Time: string (n
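The subway path uses the pattern `ttc-subway*` rather than `*`, presumably because the same folder also holds the `ttc-delay-code.parquet` lookup file that is read in section 3.2. The sketch below uses Python's `fnmatch` to show which names each pattern selects. The file names are hypothetical, and `fnmatch` only approximates Hadoop's glob syntax (the two agree for a plain `*` on simple file names).

```python
from fnmatch import fnmatch

# Hypothetical object names under gs://ttc_data_lake_ttc-data-analytics/subway_delay_data/
subway_folder = [
    "ttc-subway-delay-data-2022.parquet",
    "ttc-subway-delay-data-2023.parquet",
    "ttc-delay-code.parquet",  # the delay-code lookup table, not delay records
]

def matching(names, pattern):
    """Return the names matched by a glob pattern, keeping their original order."""
    return [name for name in names if fnmatch(name, pattern)]

print(matching(subway_folder, "*"))            # all three files, lookup table included
print(matching(subway_folder, "ttc-subway*"))  # only the two delay-data files
```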

## 3. Transformations 🔧

This is the second step of the ETL process: **Transformation**. We modify the extracted data so that it can be loaded as tables, which will later back the dashboards.

**1. Removing an unwanted extra column**

The bus and streetcar DataFrames have an extra column named `__index_level_0__` that was not in the original `.xlsx` source file.  
This is most likely the pandas index: when a pandas DataFrame whose (unnamed) index is not a `RangeIndex`, for example after filtering or concatenating, is written with `to_parquet()`, pyarrow stores the index as a regular column with this name.

In pandas, the index holds a label for each row of a DataFrame or Series.
By default, pandas assigns a numeric index to each row, starting from 0 and incrementing by 1.

We remove the column with `df_bus = df_bus.drop("__index_level_0__")`.
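If a file was written from a pandas DataFrame with a multi-level index, there can be several such columns (`__index_level_0__`, `__index_level_1__`, ...). The helper below is hypothetical and not part of the original pipeline: it finds these columns by name pattern so they can all be dropped at once, assuming the usage shown in the comment.

```python
import re

# pyarrow stores unnamed pandas index levels as columns named __index_level_<n>__
INDEX_COL = re.compile(r"__index_level_\d+__")

def pandas_index_columns(columns):
    """Return the column names that look like serialized pandas index levels."""
    return [c for c in columns if INDEX_COL.fullmatch(c)]

# Assumed usage with Spark, mirroring the single-column drop above:
#   df_bus = df_bus.drop(*pandas_index_columns(df_bus.columns))
cols = ["Date", "Route", "Vehicle", "__index_level_0__"]
print(pandas_index_columns(cols))  # ['__index_level_0__']
```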

---
**2. Converting `long` columns to `int`, as `int` uses less memory (4 bytes per value instead of 8)**  

We use the `.cast("int")` method.
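Narrowing is only safe when every value fits in a 32-bit signed integer (-2,147,483,648 to 2,147,483,647). Minutes, route numbers and vehicle numbers should be far from those limits, but the check is cheap. The plain-Python sketch below shows the check (`fits_int32`) and what a silent overflow looks like (`java_int_narrowing`, which mimics Java's `(int)` cast). That Spark's `cast("int")` wraps out-of-range longs this way when ANSI mode (`spark.sql.ansi.enabled`) is off is our assumption and worth verifying for your Spark version.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

def fits_int32(values):
    """True if every non-null value can be stored in a 32-bit signed int."""
    return all(INT32_MIN <= v <= INT32_MAX for v in values if v is not None)

def java_int_narrowing(v):
    """Keep the low 32 bits and reinterpret them as signed, like Java's (int) cast."""
    return (v + 2**31) % 2**32 - 2**31

print(fits_int32([0, 131, 8531, None]))  # True
print(java_int_narrowing(2**31))         # -2147483648 (silent wrap-around)
```

In Spark itself, the same check can be run on a column's extremes before casting, e.g. `df_bus.agg(F.min("Vehicle"), F.max("Vehicle")).show()`.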

---

**3. Converting column values to title case**

All values in the `Station` and `Location` columns are in upper case, so we change them.  

We want to capitalize the first letter of each word and convert the remaining letters to lower case, which is what `initcap()` from `pyspark.sql.functions` does (ref: https://stackoverflow.com/a/68370448/19268172).
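Spark documents `initcap()` as upper-casing the first letter of each word and lower-casing the rest, with words delimited by white space. The function below is a rough pure-Python approximation (an assumption for illustration that splits on single spaces only). It also shows why Python's `str.title()` is not a drop-in substitute: `title()` treats any non-letter, such as an apostrophe, as a word boundary.

```python
def initcap_like(text):
    """Approximate Spark's initcap(): lower-case everything, then upper-case
    the first character of each space-separated word."""
    return " ".join(word[:1].upper() + word[1:] for word in text.lower().split(" "))

print(initcap_like("YONGE AND DUNDAS"))      # Yonge And Dundas
print(initcap_like("QUEEN'S PARK STATION"))  # Queen's Park Station
print("QUEEN'S PARK STATION".title())        # Queen'S Park Station
```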

In [57]:
# 1. dropping the index column
df_bus = df_bus.drop("__index_level_0__")
df_streetcar = df_streetcar.drop("__index_level_0__")

# 2. Convert common Long Type columns to Integer Type
for col_name in ["Min Delay", "Min Gap", "Vehicle"]:
    df_bus = df_bus.withColumn(col_name, col(col_name).cast("int"))
    df_subway = df_subway.withColumn(col_name, col(col_name).cast("int"))
    df_streetcar = df_streetcar.withColumn(col_name, col(col_name).cast("int"))
    # ↪ `col(col_name).cast("int")` will be the new value for `col_name`

# convert un-common Long Type columns to Integer Type
df_streetcar = df_streetcar.withColumn("Line", col("Line").cast("int"))
df_bus = df_bus.withColumn("Route", col("Route").cast("int"))

# 3. Capitalize the first letter of each word in the "Location" (bus, streetcar) and "Station" (subway) columns
df_bus = df_bus.withColumn("Location", initcap("Location"))
df_streetcar = df_streetcar.withColumn("Location", initcap("Location"))
df_subway = df_subway.withColumn("Station", initcap("Station"))

# check the results
print("Bus DataFrame Schema:")
df_bus.printSchema()

df_bus.show(5)

Bus DataFrame Schema:
root
 |-- Date: timestamp (nullable = true)
 |-- Route: integer (nullable = true)
 |-- Time: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Incident: string (nullable = true)
 |-- Min Delay: integer (nullable = true)
 |-- Min Gap: integer (nullable = true)
 |-- Direction: string (nullable = true)
 |-- Vehicle: integer (nullable = true)

+-------------------+-----+-----+--------+--------------------+--------------------+---------+-------+---------+-------+
|               Date|Route| Time|     Day|            Location|            Incident|Min Delay|Min Gap|Direction|Vehicle|
+-------------------+-----+-----+--------+--------------------+--------------------+---------+-------+---------+-------+
|2022-01-01 00:00:00|  320|02:00|Saturday|    Yonge And Dundas|       General Delay|        0|      0|     null|   8531|
|2022-01-01 00:00:00|  325|02:00|Saturday|Overlea And Thorc...|           Diversion|      131|   

## 3.1 Continuing Transformations 🛠️

3. We apply the built-in Spark function `F.to_date()` to the `Date` column; it converts a timestamp to a date (year, month and day only, no hours or minutes).

---

4. We rename some columns before merging (union) so that matching columns line up across the three sources and the service-specific ones stay distinguishable.

---

5. We need to deal with columns that contain `null` values.

---

6. We union the 3 DataFrames into one single DataFrame (i.e. one table in BigQuery).
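One detail matters for step 6: `union()`/`unionAll()` match columns by *position*, not by name, so every DataFrame in the union must list its columns in the same order. This is why the notebook selects the same list of common columns from each DataFrame before the union. PySpark also offers `unionByName()`, which matches by name. The plain-Python sketch below, with hypothetical rows, shows what goes wrong when the order differs.

```python
def union_by_position(left_cols, left_rows, right_rows):
    """Mimic union()/unionAll(): keep the left schema, append right rows as-is."""
    return left_cols, left_rows + right_rows

def union_by_name(left_cols, left_rows, right_cols, right_rows):
    """Mimic unionByName(): reorder each right row to the left column order."""
    reordered = [tuple(dict(zip(right_cols, row))[c] for c in left_cols) for row in right_rows]
    return left_cols, left_rows + reordered

# Hypothetical rows: same columns, but in a different order on the right-hand side
bus_cols, bus_rows = ["Min Delay", "Min Gap"], [(17, 20)]
car_cols, car_rows = ["Min Gap", "Min Delay"], [(30, 10)]

print(union_by_position(bus_cols, bus_rows, car_rows)[1])        # [(17, 20), (30, 10)]: gap 30 lands in Min Delay
print(union_by_name(bus_cols, bus_rows, car_cols, car_rows)[1])  # [(17, 20), (10, 30)]
```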

In [58]:
# 3. modifying the DataFrame inside the loop won't affect the original DataFrames unless we reassign them.
dataframes = [df_bus, df_streetcar, df_subway]

for i in range(len(dataframes)):
    dataframes[i] = dataframes[i].withColumn('Date', F.to_date(dataframes[i].Date))

# Re-assign the modified DataFrames to their original variable names
df_bus, df_streetcar, df_subway = dataframes


# 4. Rename some columns
df_bus = df_bus \
    .withColumnRenamed('Route', 'Bus_Route') \
    .withColumnRenamed('Vehicle', 'Bus_Number')

df_streetcar = df_streetcar \
    .withColumnRenamed('Line', 'Strcar_Route') \
    .withColumnRenamed('Bound', 'Direction') \
    .withColumnRenamed('Vehicle', 'Strcar_Number')

df_subway = df_subway \
    .withColumnRenamed('Bound', 'Direction') \
    .withColumnRenamed('Vehicle', 'Train_Num')

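# count the distinct values that appear in one DataFrame but not in the other
# (note: exceptAll() is a multiset difference, so a shared value that occurs more often on one side is also counted)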

count1 = df_bus.select("Incident") \
    .exceptAll(df_streetcar.select("Incident")) \
    .distinct().count()

count2 = df_streetcar.select("Incident") \
    .exceptAll(df_bus.select("Incident")) \
    .distinct().count()

print("Incident un-common values : ",count1+count2)

print("Incident common values : ",df_bus.select("Incident") \
    .intersect(df_streetcar.select("Incident")) \
    .count())

# same for column "Location"
count1 = df_bus.select("Location") \
    .exceptAll(df_streetcar.select("Location")) \
    .distinct().count()

count2 = df_streetcar.select("Location") \
    .exceptAll(df_bus.select("Location")) \
    .distinct().count()

print("\nLocation un-common values : ",count1+count2)

print("Location common values : ",df_bus.select("Location") \
    .intersect(df_streetcar.select("Location")) \
    .count())

# check the results
print("\nBus DataFrame :")
df_bus.show(5)

print("\nStreetcar DataFrame :")
df_streetcar.show(5)

print("\nSubway DataFrame :")
df_subway.show(5)

Incident un-common values :  19
Incident common values :  11

Location un-common values :  14934
Location common values :  625

Bus DataFrame :
+----------+---------+-----+--------+--------------------+--------------------+---------+-------+---------+----------+
|      Date|Bus_Route| Time|     Day|            Location|            Incident|Min Delay|Min Gap|Direction|Bus_Number|
+----------+---------+-----+--------+--------------------+--------------------+---------+-------+---------+----------+
|2022-01-01|      320|02:00|Saturday|    Yonge And Dundas|       General Delay|        0|      0|     null|      8531|
|2022-01-01|      325|02:00|Saturday|Overlea And Thorc...|           Diversion|      131|    161|        W|      8658|
|2022-01-01|      320|02:00|Saturday|   Yonge And Steeles|Operations - Oper...|       17|     20|        S|         0|
|2022-01-01|      320|02:07|Saturday|   Yonge And Steeles|Operations - Oper...|        4|     11|        S|         0|
|2022-01-01|      320|0
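A note on how the un-common counts above are computed: `exceptAll()` is a *multiset* difference (SQL `EXCEPT ALL`), so a value that appears in both DataFrames, but more often in one of them, survives the subtraction and is counted as un-common. `subtract()` (SQL `EXCEPT DISTINCT`) gives a plain set difference instead, so the counts above may include some shared values. The plain-Python sketch below uses `collections.Counter` for the multiset semantics, with hypothetical incident values.

```python
from collections import Counter

# Hypothetical Incident values
bus = ["Mechanical", "Mechanical", "Diversion", "Cleaning"]
streetcar = ["Mechanical", "Diversion", "Overhead"]

def except_all_distinct(a, b):
    """Distinct values left after a multiset difference (exceptAll, then distinct)."""
    return set((Counter(a) - Counter(b)).keys())

def subtract(a, b):
    """Plain set difference (subtract / EXCEPT DISTINCT)."""
    return set(a) - set(b)

print(sorted(except_all_distinct(bus, streetcar)))  # ['Cleaning', 'Mechanical']
print(sorted(subtract(bus, streetcar)))             # ['Cleaning']
```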

## 3.2 Data Cleaning 🧹

We have a `ttc-delay-code.parquet` lookup file that we can join with the `Code` column of `df_subway`. First, though, the file needs cleaning: it holds two code tables side by side (subway and SRT), so we drop the unused columns, rename the rest, and stack the two tables with a union.
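Judging from the code, the sheet stores subway codes in `Unnamed: 2`/`Unnamed: 3` and SRT codes in `Unnamed: 6`/`Unnamed: 7`. Stacking them is a small unpivot: treat each (code, description) column pair as its own table, concatenate, and drop rows whose code is empty because the shorter table ran out. A plain-Python sketch with hypothetical rows; apart from the first header cell, the header texts are placeholders.

```python
# Hypothetical raw rows: (subway_code, subway_desc, srt_code, srt_desc)
raw = [
    ("SUB RMENU CODE", "CODE DESCRIPTION", "CODE", "CODE DESCRIPTION"),  # header row (placeholders)
    ("EUAC", "Air Conditioning", "SRDP", "Disorderly Patron"),
    ("EUBK", "Brakes", None, None),  # the SRT table is shorter, so its cells are empty
]

def stack_code_tables(rows, pairs):
    """Stack several (code, description) column pairs into one list,
    skipping the header row and rows without a code."""
    body = [r for r in rows if r[0] != "SUB RMENU CODE"]
    stacked = []
    for code_idx, desc_idx in pairs:
        stacked += [(r[code_idx], r[desc_idx]) for r in body if r[code_idx] is not None]
    return stacked

lookup = stack_code_tables(raw, pairs=[(0, 1), (2, 3)])
print(lookup)
# [('EUAC', 'Air Conditioning'), ('EUBK', 'Brakes'), ('SRDP', 'Disorderly Patron')]
```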

In [59]:
df_zone_lookup = spark.read.parquet('gs://ttc_data_lake_ttc-data-analytics/subway_delay_data/ttc-delay-code.parquet')
# print(df_zone_lookup.count())
# df_zone_lookup.show()

# Drop the unwanted columns
columns_to_drop = ["Unnamed: 0", "Unnamed: 1", "Unnamed: 4", "Unnamed: 5"]
df_zone_lookup = df_zone_lookup.drop(*columns_to_drop)

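# Rename some columns
# (df_subway's code column is `Code`, so this rename is a no-op unless a `Delay Code` column exists)
df_subway = df_subway.withColumnRenamed("Delay Code", "Delay_Code")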

df_zone_lookup = df_zone_lookup \
    .withColumnRenamed('Unnamed: 2', 'Delay_Code1') \
    .withColumnRenamed('Unnamed: 6', 'Delay_Code2') \
    .withColumnRenamed('Unnamed: 3', 'Code_Description1') \
    .withColumnRenamed('Unnamed: 7', 'Code_Description2')

# Remove the first row
df_zone_lookup = df_zone_lookup.filter(df_zone_lookup.Delay_Code1 != "SUB RMENU CODE")

# merge the duplicate column pairs: first build separate dfs for subway and SRT, then apply a union:

# Select and rename columns using alias
df_zone_lookup_sub = df_zone_lookup.select(
    df_zone_lookup.Delay_Code1.alias("Delay_Code"),
    df_zone_lookup.Code_Description1.alias("Code_Description")
)

# df_zone_lookup_sub.show()

df_zone_lookup_srt = df_zone_lookup.select(
    df_zone_lookup.Delay_Code2.alias("Delay_Code"),
    df_zone_lookup.Code_Description2.alias("Code_Description")
)

# df_zone_lookup_srt.show()

df_zone_lookup = df_zone_lookup_sub.unionAll(df_zone_lookup_srt).filter(col("Delay_Code").isNotNull())

# confirm results
df_zone_lookup.show(5)
df_zone_lookup.count() # 129 + 71 = 200

+----------+-------------------+
|Delay_Code|   Code_Description|
+----------+-------------------+
|      EUAC|   Air Conditioning|
|      EUAL|Alternating Current|
|     EUATC| ATC RC&S Equipment|
|      EUBK|             Brakes|
|      EUBO|               Body|
+----------+-------------------+
only showing top 5 rows



200

## 3.3 Union and joins 🎇

Now that we have cleaned `df_zone_lookup`, it's time to join it with `df_subway` on the `Delay_Code`/`Code` columns.

we also have **line code** lookup values :  
Line 1 Yonge-University (YU), Line 2 Bloor-Danforth (BD), Line 3 Scarborough (SRT), Line 4 Sheppard (SHP) which we can join with the `Line` column of `df_subway`
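The chained `when()` calls in the next cell implement a lookup with a fall-through: `otherwise(col("Line"))` keeps unknown codes unchanged. The same mapping can live in one dictionary, and plain Python's `dict.get(code, code)` has the same fall-through behaviour. In PySpark, such a dictionary could also drive `df_subway.replace(LINE_NAMES, subset=["Line"])`, though that variant is untested here. `LINE_NAMES` and `expand_line` are hypothetical names, and `"YU/BD"` stands in for any unlisted code.

```python
LINE_NAMES = {
    "YU": "Yonge-University (YU)",
    "BD": "Bloor-Danforth (BD)",
    "SRT": "Scarborough (SRT)",
    "SHP": "Sheppard (SHP)",
}

def expand_line(code):
    """Return the full line name, or the code unchanged if it is not in the map
    (the same fall-through as .otherwise(col("Line")))."""
    return LINE_NAMES.get(code, code)

print([expand_line(c) for c in ["YU", "SHP", "YU/BD"]])
# ['Yonge-University (YU)', 'Sheppard (SHP)', 'YU/BD']
```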

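We then keep only the columns common to all 3 DataFrames and union them into one DataFrame.

A `service_type` column records which of the 3 services each row comes from.

We also build a separate union of just `df_bus` and `df_streetcar`, which share more columns (including `Location` and `Incident`).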
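Finding the shared columns is an order-preserving intersection: keep the columns of one DataFrame, in its order, that also appear in all the others. Keeping a single order matters because the union matches columns by position. The helper below is hypothetical and works on plain lists like the ones Spark's `.columns` returns. The bus and subway lists match the printed outputs; the streetcar list is inferred from its schema after the renames.

```python
def common_columns(first, *others):
    """Columns of `first`, in its order, that also appear in every list in `others`."""
    other_sets = [set(cols) for cols in others]
    return [c for c in first if all(c in s for s in other_sets)]

subway = ["Date", "Time", "Day", "Station", "Min Delay", "Min Gap", "Direction",
          "Line", "Train_Num", "Delay_Code", "Code_Description"]
bus = ["Date", "Bus_Route", "Time", "Day", "Location", "Incident",
       "Min Delay", "Min Gap", "Direction", "Bus_Number"]
streetcar = ["Date", "Strcar_Route", "Time", "Day", "Location", "Incident",
             "Min Delay", "Min Gap", "Direction", "Strcar_Number"]

print(common_columns(subway, bus, streetcar))
# ['Date', 'Time', 'Day', 'Min Delay', 'Min Gap', 'Direction']
```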

In [60]:
# NOTE: join df_subway with df_zone_lookup to get the columns Delay_Code and Code_Description.
# This is an inner join (the default), so subway rows whose Code has no match in the lookup are dropped.
df_subway = df_subway.join(df_zone_lookup, df_subway.Code == df_zone_lookup.Delay_Code)
# ↪ NOTE: use on=['col1'] if col1 is present in both dfs

df_subway = df_subway.drop('Code')

# replace Line codes with values :
'''
replace values of `Line` column of df_subway with :

case "YU" : "Yonge-University (YU)"
case "BD" : "Bloor-Danforth (BD)"
case "SRT" : "Scarborough (SRT)"
case "SHP" : "Sheppard (SHP)"
'''

df_subway = df_subway.withColumn("Line", 
                                 when(col("Line") == "YU", "Yonge-University (YU)")
                                 .when(col("Line") == "BD", "Bloor-Danforth (BD)")
                                 .when(col("Line") == "SRT", "Scarborough (SRT)")
                                 .when(col("Line") == "SHP", "Sheppard (SHP)")
                                 .otherwise(col("Line")))

df_subway.show(5)

+----------+-----+--------+--------------------+---------+-------+---------+--------------------+---------+----------+--------------------+
|      Date| Time|     Day|             Station|Min Delay|Min Gap|Direction|                Line|Train_Num|Delay_Code|    Code_Description|
+----------+-----+--------+--------------------+---------+-------+---------+--------------------+---------+----------+--------------------+
|2022-01-01|15:59|Saturday|Lawrence East Sta...|        0|      0|        N|   Scarborough (SRT)|     3023|      SRDP|   Disorderly Patron|
|2022-01-01|02:23|Saturday|  Spadina Bd Station|        0|      0|     null| Bloor-Danforth (BD)|        0|      MUIS|Injured or ill Cu...|
|2022-01-01|22:00|Saturday|Kennedy Srt Stati...|        0|      0|     null|   Scarborough (SRT)|        0|       MRO| Miscellaneous Other|
|2022-01-01|02:28|Saturday|  Vaughan Mc Station|        0|      0|     null|Yonge-University ...|        0|      MUIS|Injured or ill Cu...|
|2022-01-01|02:34|Sa
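`DataFrame.join()` defaults to an inner join, so any subway row whose `Code` has no match in the lookup (a code missing from the sheet, or a null code) is silently dropped. Passing `how="left"` would keep those rows with a null description instead. The plain-Python sketch below, with hypothetical codes (`"XXXX"` stands in for an unknown code), shows the difference in row counts. Comparing `df_subway.count()` before and after the join would show whether any rows are actually lost here.

```python
# Hypothetical subway incident codes and a two-entry lookup table
incidents = ["MRO", "SRDP", "XXXX", None]
lookup = {"MRO": "Miscellaneous Other", "SRDP": "Disorderly Patron"}

def inner_join(codes, table):
    """Keep only rows whose code exists in the lookup (Spark's default how='inner')."""
    return [(c, table[c]) for c in codes if c in table]

def left_join(codes, table):
    """Keep every row; unmatched codes get a None description (how='left')."""
    return [(c, table.get(c)) for c in codes]

print(len(inner_join(incidents, lookup)))  # 2
print(len(left_join(incidents, lookup)))   # 4
```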

In [61]:
# pre-union : create the list of columns present in both datasets
# (the loop variable is not named `col`, which would shadow pyspark.sql.functions.col)
common_columns = []
bus_columns = set(df_bus.columns)

for col_name in df_streetcar.columns:
    if col_name in bus_columns:
        common_columns.append(col_name)

# Create a column `service_type` indicating where the data comes from.

df_bus_sel = df_bus \
    .select(common_columns) \
    .withColumn('service_type', F.lit('bus'))

df_streetcar_sel = df_streetcar \
    .select(common_columns) \
    .withColumn('service_type', F.lit('streetcar'))

# Create a new DataFrame containing the union of the rows of the bus and streetcar DataFrames.
df_road_delay_data = df_bus_sel.unionAll(df_streetcar_sel)

df_road_delay_data.groupBy('service_type').count().show()

df_road_delay_data.show(5)

+------------+-----+
|service_type|count|
+------------+-----+
|         bus|78176|
|   streetcar|23366|
+------------+-----+

+----------+-----+--------+--------------------+--------------------+---------+-------+---------+------------+
|      Date| Time|     Day|            Location|            Incident|Min Delay|Min Gap|Direction|service_type|
+----------+-----+--------+--------------------+--------------------+---------+-------+---------+------------+
|2022-01-01|02:00|Saturday|    Yonge And Dundas|       General Delay|        0|      0|     null|         bus|
|2022-01-01|02:00|Saturday|Overlea And Thorc...|           Diversion|      131|    161|        W|         bus|
|2022-01-01|02:00|Saturday|   Yonge And Steeles|Operations - Oper...|       17|     20|        S|         bus|
|2022-01-01|02:07|Saturday|   Yonge And Steeles|Operations - Oper...|        4|     11|        S|         bus|
|2022-01-01|02:13|Saturday|   Yonge And Steeles|Operations - Oper...|        4|      8|        S

In [62]:
# Create the list of columns present in all three datasets
common_columns = []
bus_columns = set(df_bus.columns)
streetcar_columns = set(df_streetcar.columns)

for col_name in df_subway.columns:  # not `col`, which would shadow pyspark.sql.functions.col
    if col_name in bus_columns and col_name in streetcar_columns:
        common_columns.append(col_name)

# Create columns with 'service_type' for each DataFrame
df_bus_sel = df_bus.select(common_columns).withColumn('service_type', F.lit('bus'))
df_streetcar_sel = df_streetcar.select(common_columns).withColumn('service_type', F.lit('streetcar'))
df_subway_sel = df_subway.select(common_columns).withColumn('service_type', F.lit('subway'))

# Union the three DataFrames
df_all_delay_data = df_bus_sel.unionAll(df_streetcar_sel).unionAll(df_subway_sel)

# Append ", Ontario" to each value in the 'Location' column so that the Looker heat map places these street names in Ontario (around Toronto)
df_road_delay_data = df_road_delay_data.withColumn("Location", concat(df_road_delay_data["Location"], lit(", Ontario")))

df_all_delay_data.show(5)

+----------+-----+--------+---------+-------+---------+------------+
|      Date| Time|     Day|Min Delay|Min Gap|Direction|service_type|
+----------+-----+--------+---------+-------+---------+------------+
|2022-01-01|02:00|Saturday|        0|      0|     null|         bus|
|2022-01-01|02:00|Saturday|      131|    161|        W|         bus|
|2022-01-01|02:00|Saturday|       17|     20|        S|         bus|
|2022-01-01|02:07|Saturday|        4|     11|        S|         bus|
|2022-01-01|02:13|Saturday|        4|      8|        S|         bus|
+----------+-----+--------+---------+-------+---------+------------+
only showing top 5 rows
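One caveat on the `concat()` call above: Spark's `concat()` returns null if any input is null, so a row with a null `Location` stays null rather than becoming ", Ontario". If nulls should be skipped instead, `concat_ws(", ", col("Location"), lit("Ontario"))` joins only the non-null parts (giving plain "Ontario" for a null location). The plain-Python sketch below mirrors the two null-handling behaviours, under the assumption that `None` plays the role of SQL null.

```python
def spark_concat(*parts):
    """Like concat(): null (None) if any part is null."""
    return None if any(p is None for p in parts) else "".join(parts)

def spark_concat_ws(sep, *parts):
    """Like concat_ws(): join only the non-null parts."""
    return sep.join(p for p in parts if p is not None)

print(spark_concat("Yonge And Dundas", ", Ontario"))  # Yonge And Dundas, Ontario
print(spark_concat(None, ", Ontario"))                # None
print(spark_concat_ws(", ", None, "Ontario"))         # Ontario
```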



In [53]:
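# Use F.col: the bare name `col` may have been rebound to a string by an earlier `for col in ...` loop
filtered_df = df_bus.filter(F.col("Bus_Number") != 0)

# Compare the row counts with and without the rows where Bus_Number == 0
print(filtered_df.count(), df_bus.count())

In the recorded run this cell raised `TypeError: 'str' object is not callable`: the `for col in ...` loops in the union cells had rebound `col` (imported from `pyspark.sql.functions`) to a plain string, so `col("Bus_number")` tried to call a string.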

## 4. Loading data into BigQuery 📥

Finally, we push our results to BigQuery, completing the **Loading** part of the ETL process.

This requires a temporary GCS bucket, which the connector uses to stage the data, and we also need to provide the BigQuery connector jar.

Use `--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar` when submitting a job from the terminal.  

Use `.config("spark.jars", "./lib/gcs-connector-hadoop3-2.2.11.jar, ./lib/spark-bigquery-with-dependencies_2.12-0.24.0.jar")` within `SparkSession.builder` when running in a notebook.
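`spark.jars` takes a comma-separated list of paths. Instead of typing the jar names by hand, the list could be built from whatever jars sit in `./lib`. The helper below is a hypothetical convenience, not part of the original setup: it only builds the string, which would then be passed to `.config("spark.jars", ...)` as in the assumed usage comment.

```python
from pathlib import Path

def jars_config(lib_dir="./lib"):
    """Comma-separated, sorted list of the .jar files in lib_dir, for spark.jars."""
    return ",".join(str(p) for p in sorted(Path(lib_dir).glob("*.jar")))

# Assumed usage: SparkSession.builder.config("spark.jars", jars_config()) ...
```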


In [63]:
# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
temp_bucket = "spark_temp_ttc"
spark.conf.set('temporaryGcsBucket', temp_bucket)

# Save each DataFrame to its BigQuery table, overwriting existing data
# (for overwrite : https://stackoverflow.com/q/72519200/19268172)
tables = {
    'ttc_delays_data.bus_delays_table': df_bus,
    'ttc_delays_data.subway_delays_table': df_subway,
    'ttc_delays_data.streetcar_delays_table': df_streetcar,
    'ttc_delays_data.road_delays_table': df_road_delay_data,
    'ttc_delays_data.all_delays_table': df_all_delay_data,
}

for table, df in tables.items():
    df.write.format('bigquery') \
        .option('table', table) \
        .mode('overwrite') \
        .save()
