# Pipeline: Coach Awards

In [31]:
from pyspark.sql import SparkSession, Row
from pyspark import SparkConf
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, avg, sum, round, expr, concat, min, max, row_number, desc, asc, when, dense_rank, countDistinct

# Describe the Spark application: standalone cluster master, application name
# and the driver/executor resource settings. Every SparkConf setter returns the
# conf itself, so the settings can be expressed as one fluent chain.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("CoachesApp")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Create (or reuse) the Spark session, the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes on the Hadoop
# configuration so that paths with the gs:// scheme can be read and written.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

## Load the datasets

### Prepare Teams Table

In [32]:
# Load the teams table
teams = spark.read.format("csv").option("header", "true").load("gs://data_de2023_qjsol/Teams.csv") #  use your gcp bucket name. 

# Only keep columns of interest
teams = teams.select("year", "lgID", "tmID", "name", "Pts", "W", "SoW")

# Drop rows that miss any required field. DataFrame.na.drop() returns a NEW
# DataFrame (DataFrames are immutable), so the result has to be assigned back;
# otherwise the filter silently has no effect.
# NULL values are allowed in SoW because they are treated as SoW = 0 below.
teams = teams.na.drop(subset=["year", "lgID", "tmID", "name", "Pts", "W"])

# Replace NULL with 0 in SoW (the column is still a string at this point)
teams = teams.fillna("0", subset=["SoW"])

# Show schema
teams.printSchema()

# Convert year, points, wins and shootout wins from string to integer type
teams = teams.withColumn("year", col("year").cast("int")) \
             .withColumn("Pts", col("Pts").cast("int")) \
             .withColumn("W", col("W").cast("int")) \
             .withColumn("SoW", col("SoW").cast("int"))

# Show new schema and top 5 rows
teams.printSchema()
teams.show(5)

root
 |-- year: string (nullable = true)
 |-- lgID: string (nullable = true)
 |-- tmID: string (nullable = true)
 |-- name: string (nullable = true)
 |-- Pts: string (nullable = true)
 |-- W: string (nullable = true)
 |-- SoW: string (nullable = false)

root
 |-- year: integer (nullable = true)
 |-- lgID: string (nullable = true)
 |-- tmID: string (nullable = true)
 |-- name: string (nullable = true)
 |-- Pts: integer (nullable = true)
 |-- W: integer (nullable = true)
 |-- SoW: integer (nullable = true)

+----+----+----+--------------------+---+---+---+
|year|lgID|tmID|                name|Pts|  W|SoW|
+----+----+----+--------------------+---+---+---+
|1909| NHA| COB| Cobalt Silver Kings|  8|  4|  0|
|1909| NHA| HAI|Haileybury Hockey...|  8|  4|  0|
|1909| NHA| LES|       Les Canadiens|  4|  2|  0|
|1909| NHA| MOS|  Montreal Shamrocks|  7|  3|  0|
|1909| NHA| MOW|  Montreal Wanderers| 22| 11|  0|
+----+----+----+--------------------+---+---+---+
only showing top 5 rows



### Prepare Coaches Table

In [34]:
# Load the coaches table
coaches = spark.read.format("csv").option("header", "true").load("gs://data_de2023_qjsol/Coaches.csv") #  use your gcp bucket name. 

# Only keep columns of interest and drop rows with a NULL in any of them.
# DataFrame.na.drop() returns a new DataFrame, so its result must be kept;
# calling it without assignment leaves `coaches` unchanged.
coaches = coaches.select("coachID", "year", "tmID").na.drop("any")

# Show schema
coaches.printSchema()

# Convert year from string to integer type
coaches = coaches.withColumn("year", col("year").cast("int"))

# Show new schema and top 5 rows
coaches.printSchema()
coaches.show(5)

root
 |-- coachID: string (nullable = true)
 |-- year: string (nullable = true)
 |-- tmID: string (nullable = true)

root
 |-- coachID: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- tmID: string (nullable = true)

+---------+----+----+
|  coachID|year|tmID|
+---------+----+----+
|abelsi01c|1952| CHI|
|abelsi01c|1953| CHI|
|abelsi01c|1957| DET|
|abelsi01c|1958| DET|
|abelsi01c|1959| DET|
+---------+----+----+
only showing top 5 rows



### Prepare Awards Table

In [35]:
# Load the awards coaches table
awards = spark.read.format("csv").option("header", "true").load("gs://data_de2023_qjsol/AwardsCoaches.csv") #  use your gcp bucket name. 

# Only keep columns of interest and drop rows with a NULL in any of them.
# DataFrame.na.drop() returns a new DataFrame, so its result must be kept;
# calling it without assignment leaves `awards` unchanged.
awards = awards.select("coachID", "award", "year").na.drop("any")

# Show schema
awards.printSchema()

# Convert year from string to integer type
awards = awards.withColumn("year", col("year").cast("int"))

# Show new schema and top 5 rows
awards.printSchema()
awards.show(5)

root
 |-- coachID: string (nullable = true)
 |-- award: string (nullable = true)
 |-- year: string (nullable = true)

root
 |-- coachID: string (nullable = true)
 |-- award: string (nullable = true)
 |-- year: integer (nullable = true)

+----------+--------------------+----+
|   coachID|               award|year|
+----------+--------------------+----+
|patrile01c| First Team All-Star|1930|
|irvindi01c|Second Team All-Star|1930|
|patrile01c| First Team All-Star|1931|
|irvindi01c|Second Team All-Star|1931|
|patrile01c| First Team All-Star|1932|
+----------+--------------------+----+
only showing top 5 rows



### Prepare HOF Table

In [36]:
# Load the hof table
hof = spark.read.format("csv").option("header", "true").load("gs://data_de2023_qjsol/HOF.csv") #  use your gcp bucket name. 

# Only keep columns of interest and drop rows with a NULL in any of them.
# DataFrame.na.drop() returns a new DataFrame, so its result must be kept;
# calling it without assignment leaves `hof` unchanged.
hof = hof.select("hofID", "category", "year").na.drop("any")

# Show schema
hof.printSchema()

# Convert year from string to integer type
hof = hof.withColumn("year", col("year").cast("int"))

# Show new schema and top 5 rows
hof.printSchema()
hof.show(5)

root
 |-- hofID: string (nullable = true)
 |-- category: string (nullable = true)
 |-- year: string (nullable = true)

root
 |-- hofID: string (nullable = true)
 |-- category: string (nullable = true)
 |-- year: integer (nullable = true)

+----------+--------+----+
|     hofID|category|year|
+----------+--------+----+
|bakerho01h|  Player|1945|
|gardich01h|  Player|1945|
|gerared01h|  Player|1945|
|mcgeefr01h|  Player|1945|
|morenho01h|  Player|1945|
+----------+--------+----+
only showing top 5 rows



In [38]:
# TODO: make smaller steps in notebook
# TODO: remove unused imports/tables
# TODO: use 'name' in HOF table?
# TODO: remove unused columns in end result

# Select teams of National Hockey League (NHL)
nhl_teams = teams.filter(col("lgID") == "NHL").drop("lgID")
nhl_teams.show(5)

# Compute ROW
# The ROW subtracts the number of wins a team secures through the shootout from their overall total. 
# It is then used as a tiebreaker between two teams tied in the standings. 
# The team who has a higher ROW, will be placed higher in the standings.
nhl_teams = nhl_teams.withColumn("ROW", col("W") - col("SoW")).drop("W", "SoW")
nhl_teams.show()

# Only keep teams with year >= 1931 (because awards were only given from 1931 onwards)
teams_1931 = nhl_teams.filter(col("year") >= 1931)

# Rank the teams within each season: most points first, ROW as the tiebreaker
window_spec = Window.partitionBy("year").orderBy(col("Pts").desc(), col("ROW").desc())

# dense_rank() (rather than row_number()) gives teams that tie on both Pts and
# ROW the same rank, so a season can contribute more than three rows below.
ranked_teams = teams_1931.withColumn("dense_rank", dense_rank().over(window_spec))

# Keep the teams whose rank is 1, 2 or 3 in their season
top_three_teams = ranked_teams.filter(col("dense_rank") <= 3)

# Show the resulting DataFrame
top_three_teams.show()

# Rename the award's year so it does not clash with the season year after the
# join. A new name is used instead of rebinding `awards`, so the DataFrame
# prepared in the awards cell is left untouched.
awards_renamed = awards.withColumnRenamed("year", "award_year")

# Attach the coaches of every top team/season and then every award of those coaches.
# Left outer joins keep teams without a known coach and coaches without awards.
# NOTE(review): if `coaches` contains repeated (coachID, year, tmID) rows, the
# coach's awards are counted once per repeated row - confirm against the data.
intermediate = top_three_teams.join(coaches, ["tmID", "year"], "left_outer") \
                              .join(awards_renamed, "coachID", "left_outer")
intermediate.show()

# An award only counts for a team when the coach received it before or during coaching the team.
# The comparison is NULL (hence not counted) for rows without an award.
award_counts = col("award_year") <= col("year")

# Blank out the names of awards that do not count, keeping the name (not the
# year) in the `award` column so the displayed rows remain readable.
intermediate = intermediate.withColumn("award", when(award_counts, col("award")))
intermediate.show()

# Count the number of awards for each team per year
aggregated = intermediate.groupBy("year", "tmID", "name", "Pts", "ROW", "dense_rank") \
    .agg(sum(when(award_counts, 1).otherwise(0)).alias("no_awards"))

# Change datatype from long to integer
result = aggregated.withColumn("no_awards", col("no_awards").cast("int"))
result.show()
result.printSchema()

+----+----+------------------+---+---+---+
|year|tmID|              name|Pts|  W|SoW|
+----+----+------------------+---+---+---+
|1917| MTL|Montreal Canadiens| 26| 13|  0|
|1917| MTW|Montreal Wanderers|  2|  1|  0|
|1917| OTS|   Ottawa Senators| 18|  9|  0|
|1917| TOA|    Toronto Arenas| 26| 13|  0|
|1918| MTL|Montreal Canadiens| 20| 10|  0|
+----+----+------------------+---+---+---+
only showing top 5 rows

+----+----+--------------------+---+---+
|year|tmID|                name|Pts|ROW|
+----+----+--------------------+---+---+
|1917| MTL|  Montreal Canadiens| 26| 13|
|1917| MTW|  Montreal Wanderers|  2|  1|
|1917| OTS|     Ottawa Senators| 18|  9|
|1917| TOA|      Toronto Arenas| 26| 13|
|1918| MTL|  Montreal Canadiens| 20| 10|
|1918| OTS|     Ottawa Senators| 24| 12|
|1918| TOA|      Toronto Arenas| 10|  5|
|1919| MTL|  Montreal Canadiens| 26| 13|
|1919| OTS|     Ottawa Senators| 38| 19|
|1919| QUB|     Quebec Bulldogs|  8|  4|
|1919| TRS|Toronto St. Patricks| 24| 12|
|1920| HAM|   

## Store the result in BigQuery

In [39]:
# The BigQuery connector stages the exported data in a Cloud Storage bucket
# before loading it, so tell Spark which bucket to use for that.
bucket = "data_de2023_qjsol"  # use your bucket 
spark.conf.set('temporaryGcsBucket', bucket)

# Register the GCS connector classes for the gs:// scheme on the Hadoop configuration.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Write the result to BigQuery. The "append" mode adds the rows to the table,
# so running this cell more than once adds the same rows again.
bigquery_writer = (
    result.write
    .format('bigquery')
    .option('table', 'dataengineering2023-398611.assignment2.coaches')  # use your project-id
    .mode("append")
)
bigquery_writer.save()

In [40]:
# Stop the Spark session now that the result has been written.
# After this call `spark` can no longer be used; re-run the setup cell to get a new session.
spark.stop()