In [122]:
# Imports
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, concat, col, lit, array, udf, dense_rank
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, DateType, TimestampType
from pyspark.sql import Window as W

In [123]:
# Initialize and configure Spark
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("FixturePipeline")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
spark = SparkSession.builder \
        .config(conf=sparkConf).getOrCreate()

In [124]:
# Setup abstract Google storage FileSystem
bucket_id = "data_de2022_ng"
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set('temporaryGcsBucket', bucket_id)
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

In [125]:
# # Load data into DataFrame
# google_cloud_storage_path = 'gs://data_de2022_ng/'

# df = spark.read.format("json") \
#                          .option("inferSchema", "true") \
#                          .option("multiLine", "true") \
#                          .load(f'{google_cloud_storage_path}fixtures.json')


In [126]:
# Load data into DataFrame
project_id = 'de-2022-ng'

df = spark.read.format("json") \
                         .option("inferSchema", "true") \
                         .option("multiLine", "true") \
                         .load('/home/jovyan/data/fixtures.json')


In [127]:
# Print response schema
df.select('response').printSchema()

root
 |-- response: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- fixture: struct (nullable = true)
 |    |    |    |-- date: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- periods: struct (nullable = true)
 |    |    |    |    |-- first: long (nullable = true)
 |    |    |    |    |-- second: long (nullable = true)
 |    |    |    |-- referee: string (nullable = true)
 |    |    |    |-- status: struct (nullable = true)
 |    |    |    |    |-- elapsed: long (nullable = true)
 |    |    |    |    |-- long: string (nullable = true)
 |    |    |    |    |-- short: string (nullable = true)
 |    |    |    |-- timestamp: long (nullable = true)
 |    |    |    |-- timezone: string (nullable = true)
 |    |    |    |-- venue: struct (nullable = true)
 |    |    |    |    |-- city: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- name: string (nullable 

In [128]:
# Format DataFrame for easy feature extraction
response_df = df.withColumn('response', explode('response')).select('response')

In [129]:
# Fetch all features
fixture_table = response_df.select( \
                   col('response.fixture.id'), \
                   col('response.league.id').alias('league_id'), \
                   split(col('response.fixture.date'), 'T').getItem(0).cast(DateType()).alias('date'), \
                   split(col('response.fixture.date'), 'T').getItem(1).cast(TimestampType()).alias('time'), \
                   'response.fixture.referee', \
                   col('response.teams.away.id').alias('away_id'), \
                   col('response.teams.home.id').alias('home_id'), \
                   'response.league.round', \
                   'response.league.season', \
                   col('response.score.halftime.home').alias('halftime_home'), \
                   col('response.score.halftime.away').alias('halftime_away'), \
                   col('response.score.fulltime.home').alias('fulltime_home'), \
                   col('response.score.fulltime.away').alias('fulltime_away'), \
                   col('response.score.extratime.home').alias('extratime_home'), \
                   col('response.score.extratime.away').alias('extratime_away'), \
                   col('response.score.penalty.home').alias('penalty_home'), \
                   col('response.score.penalty.away').alias('penalty_away'))

fixture_table.show()

+------+---------+----------+-------------------+--------------------+-------+-------+---------------+------+-------------+-------------+-------------+-------------+--------------+--------------+------------+------------+
|    id|league_id|      date|               time|             referee|away_id|home_id|          round|season|halftime_home|halftime_away|fulltime_home|fulltime_away|extratime_home|extratime_away|penalty_home|penalty_away|
+------+---------+----------+-------------------+--------------------+-------+-------+---------------+------+-------------+-------------+-------------+-------------+--------------+--------------+------------+------------+
|855734|        1|2022-11-21|2022-12-14 16:00:00|Wilton Pereira Sa...|   1118|     13|Group Stage - 1|  2022|            0|            0|            0|            2|          null|          null|        null|        null|
|855735|        1|2022-11-21|2022-12-14 13:00:00|Raphael Claus, Br...|     22|     10|Group Stage - 1|  2022|   

In [130]:
# Build Round table
window_rounds = W.orderBy('round')
(
    fixture_table
    .withColumn('round_id', dense_rank().over(window_rounds))
)

window_rounds = W.orderBy('round')

fixture_table = fixture_table.withColumn('round_id', dense_rank().over(window_rounds))

rounds_table = fixture_table.select('round_id', 'round').distinct()
rounds_table.show()

+--------+---------------+
|round_id|          round|
+--------+---------------+
|       1|Group Stage - 1|
|       2|Group Stage - 2|
|       3|Group Stage - 3|
|       4| Quarter-finals|
|       5|    Round of 16|
+--------+---------------+



In [131]:
# Build Referee table
window_referee = W.orderBy('referee')
(
    fixture_table
    .withColumn('referee_id', dense_rank().over(window_referee))
)

window_referee = W.orderBy('referee')

fixture_table = fixture_table.withColumn('referee_id', dense_rank().over(window_referee))

referee_table = fixture_table.select('referee_id', 'referee').distinct()
referee_table.show(100)

+----------+--------------------+
|referee_id|             referee|
+----------+--------------------+
|         1|                null|
|         2|Alireza Faghani, ...|
|         3|Andres Matonte, U...|
|         4|Anthony Taylor, E...|
|         5|Antonio Mateu, Spain|
|         6|Bakary Papa Gassa...|
|         7|Cesar Arturo Ramo...|
|         8|Chris Beath, Aust...|
|         9|Clement Turpin, F...|
|        10|Daniel Siebert, G...|
|        11|Daniele Orsato, I...|
|        12|Danny Desmond Mak...|
|        13|Facundo Tello, Ar...|
|        14|Fernando Rapallin...|
|        15|Ibrahim Al-Jassim...|
|        16|  Ismail Elfath, USA|
|        17|Ivan Arcides Bart...|
|        18|Janny Sikazwe, Za...|
|        19|Jesus Valenzuela,...|
|        20|Mario Escobar, Gu...|
|        21|Matt Conger, New ...|
|        22|Michael Oliver, E...|
|        23|Mohammed Abdulla ...|
|        24|Mustapha Ghorbal,...|
|        25|Raphael Claus, Br...|
|        26|Slavko Vincic, Sl...|
|        27|St

In [132]:
# Build Season table
window_season = W.orderBy('season')
(
    fixture_table
    .withColumn('season_id', dense_rank().over(window_season))
)

window_season = W.orderBy('season')

fixture_table = fixture_table.withColumn('season_id', dense_rank().over(window_season))

season_table = fixture_table.select('season_id', 'season').distinct()
season_table.show(100)

+---------+------+
|season_id|season|
+---------+------+
|        1|  2022|
+---------+------+



In [133]:
# Build Fixture table
fixture_table = fixture_table.drop('referee')
fixture_table = fixture_table.drop('round')
fixture_table = fixture_table.drop('season')

fixture_table.show()

+------+---------+----------+-------------------+-------+-------+-------------+-------------+-------------+-------------+--------------+--------------+------------+------------+--------+----------+---------+
|    id|league_id|      date|               time|away_id|home_id|halftime_home|halftime_away|fulltime_home|fulltime_away|extratime_home|extratime_away|penalty_home|penalty_away|round_id|referee_id|season_id|
+------+---------+----------+-------------------+-------+-------+-------------+-------------+-------------+-------------+--------------+--------------+------------+------------+--------+----------+---------+
|977794|        1|2022-12-09|2022-12-14 19:00:00|     26|   1118|         null|         null|         null|         null|          null|          null|        null|        null|       4|         1|        1|
|978036|        1|2022-12-10|2022-12-14 19:00:00|      2|     10|         null|         null|         null|         null|          null|          null|        null|    

In [134]:
# Build Teams table
teams_table = response_df.select('response.teams.away.id', 'response.teams.away.name').distinct().sort('id')
teams_table.show(32)

+----+------------+
|  id|        name|
+----+------------+
|   1|     Belgium|
|   2|      France|
|   3|     Croatia|
|   6|      Brazil|
|   7|     Uruguay|
|   9|       Spain|
|  10|     England|
|  12|       Japan|
|  13|     Senegal|
|  14|      Serbia|
|  15| Switzerland|
|  16|      Mexico|
|  17| South Korea|
|  20|   Australia|
|  21|     Denmark|
|  22|        Iran|
|  23|Saudi Arabia|
|  24|      Poland|
|  25|     Germany|
|  26|   Argentina|
|  27|    Portugal|
|  28|     Tunisia|
|  29|  Costa Rica|
|  31|     Morocco|
| 767|       Wales|
|1118| Netherlands|
|1504|       Ghana|
|1530|    Cameroon|
|1569|       Qatar|
|2382|     Ecuador|
|2384|         USA|
|5529|      Canada|
+----+------------+



In [135]:
# Write Fixture table to BigQuery
fixture_table.count()

fixture_table.write.format('bigquery') \
    .option('table', '{0}.worldcup.fixture'.format(project_id)) \
    .option("temporaryGcsBucket","data_de2022_ng") \
    .mode('overwrite').save()

In [136]:
# Write Teams table to BigQuery
teams_table.show(100)

teams_table.write.format('bigquery') \
    .option('table', '{0}.worldcup.teams'.format(project_id)) \
    .option("temporaryGcsBucket","data_de2022_ng") \
    .mode('overwrite').save()

+----+------------+
|  id|        name|
+----+------------+
|   1|     Belgium|
|   2|      France|
|   3|     Croatia|
|   6|      Brazil|
|   7|     Uruguay|
|   9|       Spain|
|  10|     England|
|  12|       Japan|
|  13|     Senegal|
|  14|      Serbia|
|  15| Switzerland|
|  16|      Mexico|
|  17| South Korea|
|  20|   Australia|
|  21|     Denmark|
|  22|        Iran|
|  23|Saudi Arabia|
|  24|      Poland|
|  25|     Germany|
|  26|   Argentina|
|  27|    Portugal|
|  28|     Tunisia|
|  29|  Costa Rica|
|  31|     Morocco|
| 767|       Wales|
|1118| Netherlands|
|1504|       Ghana|
|1530|    Cameroon|
|1569|       Qatar|
|2382|     Ecuador|
|2384|         USA|
|5529|      Canada|
+----+------------+



In [137]:
# Write Round table to BigQuery
rounds_table.show()

rounds_table.write.format('bigquery') \
    .option('table', '{0}.worldcup.rounds'.format(project_id)) \
    .option("temporaryGcsBucket","data_de2022_ng") \
    .mode('overwrite').save()

+--------+---------------+
|round_id|          round|
+--------+---------------+
|       1|Group Stage - 1|
|       2|Group Stage - 2|
|       3|Group Stage - 3|
|       4| Quarter-finals|
|       5|    Round of 16|
+--------+---------------+



In [138]:
# Write Referee table to BigQuery
referee_table.show()

referee_table.write.format('bigquery') \
    .option('table', '{0}.worldcup.referee'.format(project_id)) \
    .option("temporaryGcsBucket","data_de2022_ng") \
    .mode('overwrite').save()

+----------+--------------------+
|referee_id|             referee|
+----------+--------------------+
|         1|                null|
|         2|Alireza Faghani, ...|
|         3|Andres Matonte, U...|
|         4|Anthony Taylor, E...|
|         5|Antonio Mateu, Spain|
|         6|Bakary Papa Gassa...|
|         7|Cesar Arturo Ramo...|
|         8|Chris Beath, Aust...|
|         9|Clement Turpin, F...|
|        10|Daniel Siebert, G...|
|        11|Daniele Orsato, I...|
|        12|Danny Desmond Mak...|
|        13|Facundo Tello, Ar...|
|        14|Fernando Rapallin...|
|        15|Ibrahim Al-Jassim...|
|        16|  Ismail Elfath, USA|
|        17|Ivan Arcides Bart...|
|        18|Janny Sikazwe, Za...|
|        19|Jesus Valenzuela,...|
|        20|Mario Escobar, Gu...|
+----------+--------------------+
only showing top 20 rows



In [139]:
# Write season table to BigQuery
season_table.show()

season_table.write.format('bigquery') \
    .option('table', '{0}.worldcup.season'.format(project_id)) \
    .option("temporaryGcsBucket","data_de2022_ng") \
    .mode('overwrite').save()

+---------+------+
|season_id|season|
+---------+------+
|        1|  2022|
+---------+------+



In [140]:
spark.stop()