In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, DateType
from pyspark.sql.functions import *

# Configure and start the Spark session.
# The job runs against the standalone master, with 2g of driver memory and a
# single core each for the driver and for every executor.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("WorldCupBatchPipeline")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes with Hadoop so that
# gs:// paths can be used as a file system.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for _key, _impl in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(_key, _impl)

# Google Cloud Storage location of the input data.
# Use a separate bucket that holds only the expected files: every *.csv found
# under this path is read into the same dataframe.
gsc_file_path = 'gs://de_jads_batch_data/' # bucket name !

# Explicit schema for the match files as (column name, Spark type) pairs.
# Every column is nullable; "Date" is kept as a string at read time.
_match_columns = [
    ("home_team", StringType),
    ("away_team", StringType),
    ("home_score", LongType),
    ("home_xg", DoubleType),
    ("home_penalty", LongType),
    ("away_score", LongType),
    ("away_xg", DoubleType),
    ("away_penalty", LongType),
    ("home_manager", StringType),
    ("home_captain", StringType),
    ("away_manager", StringType),
    ("away_captain", StringType),
    ("home_goals", StringType),
    ("away_goals", StringType),
    ("Attendance", LongType),
    ("Venue", StringType),
    ("Officials", StringType),
    ("Date", StringType),
    ("Score", StringType),
    ("Referee", StringType),
    ("Notes", StringType),
    ("Round", StringType),
    ("Host", StringType),
    ("Year", LongType),
]

dataSchema = StructType(
    [StructField(_name, _type(), True) for _name, _type in _match_columns]
)

# Read every CSV file under the bucket path into one dataframe, applying the
# explicit schema and treating the first line of each file as a header.
matches = (
    spark.read
    .format("csv")
    .schema(dataSchema)
    .option("header", "true")
    .load(gsc_file_path + '*.csv')
)

matches.printSchema()



root
 |-- home_team: string (nullable = true)
 |-- away_team: string (nullable = true)
 |-- home_score: long (nullable = true)
 |-- home_xg: double (nullable = true)
 |-- home_penalty: long (nullable = true)
 |-- away_score: long (nullable = true)
 |-- away_xg: double (nullable = true)
 |-- away_penalty: long (nullable = true)
 |-- home_manager: string (nullable = true)
 |-- home_captain: string (nullable = true)
 |-- away_manager: string (nullable = true)
 |-- away_captain: string (nullable = true)
 |-- home_goals: string (nullable = true)
 |-- away_goals: string (nullable = true)
 |-- Attendance: long (nullable = true)
 |-- Venue: string (nullable = true)
 |-- Officials: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Referee: string (nullable = true)
 |-- Notes: string (nullable = true)
 |-- Round: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- Year: long (nullable = true)



In [2]:
# Parse the "Date" strings (pattern yyyy-MM-dd) into a DateType column.
# "Year" is left untouched and stays a long.
_parsed_date = to_date(col("Date"), 'yyyy-MM-dd')
matches = matches.withColumn("Date", _parsed_date)

matches.printSchema()
matches.show(10)

root
 |-- home_team: string (nullable = true)
 |-- away_team: string (nullable = true)
 |-- home_score: long (nullable = true)
 |-- home_xg: double (nullable = true)
 |-- home_penalty: long (nullable = true)
 |-- away_score: long (nullable = true)
 |-- away_xg: double (nullable = true)
 |-- away_penalty: long (nullable = true)
 |-- home_manager: string (nullable = true)
 |-- home_captain: string (nullable = true)
 |-- away_manager: string (nullable = true)
 |-- away_captain: string (nullable = true)
 |-- home_goals: string (nullable = true)
 |-- away_goals: string (nullable = true)
 |-- Attendance: long (nullable = true)
 |-- Venue: string (nullable = true)
 |-- Officials: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Score: string (nullable = true)
 |-- Referee: string (nullable = true)
 |-- Notes: string (nullable = true)
 |-- Round: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- Year: long (nullable = true)

+---------+-----------+----------+----

Calculate the total number of matches, the total number of goals, and the average number of goals per match for each year

In [3]:
# Number of matches played per tournament (Year/Host), for all countries
# combined; the constant "Worldwide" label goes in the Country column.
_matches_by_tournament = matches.select('Year', 'Host', 'home_team').groupBy('Year', 'Host')

world_total_matches = (
    _matches_by_tournament
    .agg(count('home_team').alias('total_matches'))
    .withColumn('Country', lit("Worldwide"))
    .sort('Year')
    .na.drop(how='any')
)

world_total_matches.show(21)

+----+--------------------+-------------+---------+
|Year|                Host|total_matches|  Country|
+----+--------------------+-------------+---------+
|1930|             Uruguay|           18|Worldwide|
|1934|               Italy|           17|Worldwide|
|1938|              France|           18|Worldwide|
|1950|              Brazil|           22|Worldwide|
|1954|         Switzerland|           26|Worldwide|
|1958|              Sweden|           35|Worldwide|
|1962|               Chile|           32|Worldwide|
|1966|             England|           32|Worldwide|
|1970|              Mexico|           32|Worldwide|
|1974|             Germany|           38|Worldwide|
|1978|           Argentina|           38|Worldwide|
|1982|               Spain|           52|Worldwide|
|1986|              Mexico|           52|Worldwide|
|1990|               Italy|           52|Worldwide|
|1994|       United States|           52|Worldwide|
|1998|              France|           64|Worldwide|
|2002|Korea 

In [4]:
# Total goals per tournament for all countries combined.
# Note: `sum` here is the Spark SQL aggregate brought in by the wildcard
# import of pyspark.sql.functions, not Python's builtin.
_goals_in_match = col("home_score") + col("away_score")

world_total_goals = (
    matches
    .withColumn("total_goals", _goals_in_match)
    .groupBy('Year', 'Host')
    .agg(sum('total_goals').alias('total_goals'))
    .sort('Year')
    .na.drop(how='any')
    .withColumn("Country", lit("Worldwide"))
)

world_total_goals.show(21)

+----+--------------------+-----------+---------+
|Year|                Host|total_goals|  Country|
+----+--------------------+-----------+---------+
|1930|             Uruguay|         70|Worldwide|
|1934|               Italy|         70|Worldwide|
|1938|              France|         84|Worldwide|
|1950|              Brazil|         88|Worldwide|
|1954|         Switzerland|        140|Worldwide|
|1958|              Sweden|        126|Worldwide|
|1962|               Chile|         89|Worldwide|
|1966|             England|         89|Worldwide|
|1970|              Mexico|         95|Worldwide|
|1974|             Germany|         97|Worldwide|
|1978|           Argentina|        102|Worldwide|
|1982|               Spain|        146|Worldwide|
|1986|              Mexico|        132|Worldwide|
|1990|               Italy|        115|Worldwide|
|1994|       United States|        141|Worldwide|
|1998|              France|        171|Worldwide|
|2002|Korea Republic, J...|        161|Worldwide|


In [5]:
# Average goals per match per tournament for all countries combined,
# rounded to two decimals.
# `round` and `avg` are the Spark SQL functions from the wildcard import in the
# first cell, so they are not re-imported here.
_goals_in_match = col("home_score") + col("away_score")

world_avg_goals = (
    matches
    .withColumn("total_goals", _goals_in_match)
    .groupBy('Year', 'Host')
    .agg(round(avg('total_goals'), 2).alias('average_goals_per_match'))
    .sort('Year')
    .na.drop(how='any')
    .withColumn("Country", lit("Worldwide"))
)

world_avg_goals.show(21)

+----+--------------------+-----------------------+---------+
|Year|                Host|average_goals_per_match|  Country|
+----+--------------------+-----------------------+---------+
|1930|             Uruguay|                   3.89|Worldwide|
|1934|               Italy|                   4.12|Worldwide|
|1938|              France|                   4.67|Worldwide|
|1950|              Brazil|                    4.0|Worldwide|
|1954|         Switzerland|                   5.38|Worldwide|
|1958|              Sweden|                    3.6|Worldwide|
|1962|               Chile|                   2.78|Worldwide|
|1966|             England|                   2.78|Worldwide|
|1970|              Mexico|                   2.97|Worldwide|
|1974|             Germany|                   2.55|Worldwide|
|1978|           Argentina|                   2.68|Worldwide|
|1982|               Spain|                   2.81|Worldwide|
|1986|              Mexico|                   2.54|Worldwide|
|1990|  

In [6]:
# Matches played per country per tournament, split into home and away roles.
# The Spark SQL functions come from the wildcard import in the first cell,
# so they are not re-imported here.

def _matches_per_country(team_col, matches_col):
    """Count the matches each team played per tournament in one role.

    Args:
        team_col: column of `matches` holding the team name
            ('home_team' or 'away_team').
        matches_col: name to give the resulting count column.

    Returns:
        A dataframe with Year, Host, Country and `matches_col`, sorted by
        Year and Country, with rows containing any null dropped.
    """
    return (
        matches
        .select('Year', 'Host', team_col)
        .groupBy('Year', 'Host', team_col)
        .agg(count(team_col).alias(matches_col))
        .withColumnRenamed(team_col, 'Country')
        .sort('Year', 'Country')
        .na.drop(how='any')
    )

countries_home_matches = _matches_per_country('home_team', 'home_matches')
countries_home_matches.show(10)

countries_away_matches = _matches_per_country('away_team', 'away_matches')
countries_away_matches.show(10)

+----+-------+-------------+------------+
|Year|   Host|      Country|home_matches|
+----+-------+-------------+------------+
|1930|Uruguay|    Argentina|           4|
|1930|Uruguay|       Brazil|           1|
|1930|Uruguay|        Chile|           2|
|1930|Uruguay|       France|           1|
|1930|Uruguay|     Paraguay|           1|
|1930|Uruguay|      Romania|           1|
|1930|Uruguay|United States|           2|
|1930|Uruguay|      Uruguay|           4|
|1930|Uruguay|   Yugoslavia|           2|
|1934|  Italy|      Austria|           2|
+----+-------+-------------+------------+
only showing top 10 rows

+----+-------+---------+------------+
|Year|   Host|  Country|away_matches|
+----+-------+---------+------------+
|1930|Uruguay|Argentina|           1|
|1930|Uruguay|  Belgium|           2|
|1930|Uruguay|  Bolivia|           2|
|1930|Uruguay|   Brazil|           1|
|1930|Uruguay|    Chile|           1|
|1930|Uruguay|   France|           2|
|1930|Uruguay|   Mexico|           3|
|1930|

In [7]:
# Goals scored per country per tournament, split into home and away roles.
# The Spark SQL functions (including the `sum` aggregate, which shadows
# Python's builtin) come from the wildcard import in the first cell, so they
# are not re-imported here.

def _goals_per_country(team_col, score_col, goals_col):
    """Sum the goals each team scored per tournament in one role.

    Args:
        team_col: column of `matches` holding the team name
            ('home_team' or 'away_team').
        score_col: column of `matches` holding that team's score
            ('home_score' or 'away_score').
        goals_col: name to give the resulting sum column.

    Returns:
        A dataframe with Year, Host, Country and `goals_col`, sorted by
        Year and Country, with rows containing any null dropped.
    """
    return (
        matches
        .select('Year', 'Host', team_col, score_col)
        .groupBy('Year', 'Host', team_col)
        .agg(sum(score_col).alias(goals_col))
        .withColumnRenamed(team_col, 'Country')
        .sort('Year', 'Country')
        .na.drop(how='any')
    )

# total home goals per country per year
countries_home_goals = _goals_per_country('home_team', 'home_score', 'home_goals')
countries_home_goals.show(10)

# total away goals per country per year
countries_away_goals = _goals_per_country('away_team', 'away_score', 'away_goals')
countries_away_goals.show(20)

+----+-------+-------------+----------+
|Year|   Host|      Country|home_goals|
+----+-------+-------------+----------+
|1930|Uruguay|    Argentina|        16|
|1930|Uruguay|       Brazil|         4|
|1930|Uruguay|        Chile|         4|
|1930|Uruguay|       France|         4|
|1930|Uruguay|     Paraguay|         1|
|1930|Uruguay|      Romania|         3|
|1930|Uruguay|United States|         6|
|1930|Uruguay|      Uruguay|        15|
|1930|Uruguay|   Yugoslavia|         6|
|1934|  Italy|      Austria|         5|
+----+-------+-------------+----------+
only showing top 10 rows

+----+-------+--------------+----------+
|Year|   Host|       Country|away_goals|
+----+-------+--------------+----------+
|1930|Uruguay|     Argentina|         2|
|1930|Uruguay|       Belgium|         0|
|1930|Uruguay|       Bolivia|         0|
|1930|Uruguay|        Brazil|         1|
|1930|Uruguay|         Chile|         1|
|1930|Uruguay|        France|         0|
|1930|Uruguay|        Mexico|         4|
|193

In [8]:
# Build one table per (Year, Host, Country) from the four per-country tables.
joinExpression = ["Year", "Host", "Country"]

# Full outer joins keep a country even when it is missing from some of the
# tables (e.g. a team that only ever played as the away side).
_countries_joined = countries_home_goals
for _other in (countries_away_goals, countries_home_matches, countries_away_matches):
    _countries_joined = _countries_joined.join(_other, joinExpression, "full")

# A value missing after the join is treated as zero goals / zero matches.
_count_columns = ['home_goals', 'away_goals', 'home_matches', 'away_matches']
_countries_filled = _countries_joined.na.fill(value=0, subset=_count_columns)

# Derive the totals, then the per-match average rounded to two decimals.
_countries_totals = (
    _countries_filled
    .withColumn('total_goals', col('home_goals') + col('away_goals'))
    .withColumn('total_matches', col('home_matches') + col('away_matches'))
)
countries_information = _countries_totals.withColumn(
    'average_goals_per_match',
    round(col('total_goals') / col('total_matches'), 2),
)

countries_information.show(10)

+----+-------+---------+----------+----------+------------+------------+-----------+-------------+-----------------------+
|Year|   Host|  Country|home_goals|away_goals|home_matches|away_matches|total_goals|total_matches|average_goals_per_match|
+----+-------+---------+----------+----------+------------+------------+-----------+-------------+-----------------------+
|1930|Uruguay|Argentina|        16|         2|           4|           1|         18|            5|                    3.6|
|1930|Uruguay|  Belgium|         0|         0|           0|           2|          0|            2|                    0.0|
|1930|Uruguay|  Bolivia|         0|         0|           0|           2|          0|            2|                    0.0|
|1930|Uruguay|   Brazil|         4|         1|           1|           1|          5|            2|                    2.5|
|1930|Uruguay|    Chile|         4|         1|           2|           1|          5|            3|                   1.67|
|1930|Uruguay|  

In [9]:
# Build the worldwide table: total goals, average goals per match and total
# matches per tournament, joined on the shared key columns.
joinExpression = ["Year", "Host", "Country"]

_world_goals_and_avg = world_total_goals.join(world_avg_goals, joinExpression, "full")
world_information = _world_goals_and_avg.join(world_total_matches, joinExpression, "full")

world_information.show(21)

+----+--------------------+---------+-----------+-----------------------+-------------+
|Year|                Host|  Country|total_goals|average_goals_per_match|total_matches|
+----+--------------------+---------+-----------+-----------------------+-------------+
|1930|             Uruguay|Worldwide|         70|                   3.89|           18|
|1934|               Italy|Worldwide|         70|                   4.12|           17|
|1938|              France|Worldwide|         84|                   4.67|           18|
|1950|              Brazil|Worldwide|         88|                    4.0|           22|
|1954|         Switzerland|Worldwide|        140|                   5.38|           26|
|1958|              Sweden|Worldwide|        126|                    3.6|           35|
|1962|               Chile|Worldwide|         89|                   2.78|           32|
|1966|             England|Worldwide|         89|                   2.78|           32|
|1970|              Mexico|World

In [10]:
# Merge the per-country table and the worldwide table into one.
# The full outer join uses every column the two tables share as its key, so
# unmatched rows from both sides are kept; the per-country-only columns
# (home/away goals and matches) are null on rows that come only from the
# worldwide table.
joinExpression = ["Year", "Host", "Country", "total_goals", "average_goals_per_match", "total_matches"]

_countries_and_world = countries_information.join(
    world_information, on=joinExpression, how="full"
)
worldcup_information = _countries_and_world.sort('Year', 'Country')

worldcup_information.show(30)

+----+-------+--------------+-----------+-----------------------+-------------+----------+----------+------------+------------+
|Year|   Host|       Country|total_goals|average_goals_per_match|total_matches|home_goals|away_goals|home_matches|away_matches|
+----+-------+--------------+-----------+-----------------------+-------------+----------+----------+------------+------------+
|1930|Uruguay|     Argentina|         18|                    3.6|            5|        16|         2|           4|           1|
|1930|Uruguay|       Belgium|          0|                    0.0|            2|         0|         0|           0|           2|
|1930|Uruguay|       Bolivia|          0|                    0.0|            2|         0|         0|           0|           2|
|1930|Uruguay|        Brazil|          5|                    2.5|            2|         4|         1|           1|           1|
|1930|Uruguay|         Chile|          5|                   1.67|            3|         4|         1|   

Save the data to BigQuery (via a temporary Cloud Storage bucket)

In [11]:
# The BigQuery connector stages the data it exports in this Cloud Storage
# bucket before loading it into BigQuery.
bucket = "de_jads_temp_annelies" # bucket name !
spark.conf.set('temporaryGcsBucket', bucket)

# Save the combined matches data to BigQuery -> do not forget to change project ID
(
    matches.write.format('bigquery')
    .option('table', 'de2022-362620.assignment2dataset.matches')
    .mode("overwrite")
    .save()
)

# Save the goal data to BigQuery -> do not forget to change project ID.
# This table is recomputed in full from all input files on every run, so it
# is overwritten (like the matches table) rather than appended to; appending
# would add a duplicate copy of every row each time the notebook is re-run.
(
    worldcup_information.write.format('bigquery')
    .option('table', 'de2022-362620.assignment2dataset.worldcupinformation')
    .mode("overwrite")
    .save()
)

In [12]:
# Stop the Spark session (this also stops its underlying Spark context) now
# that all results have been written.
spark.stop()