In [2]:
# Imports
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import col, dense_rank, isnull, coalesce, max, rank
from pyspark.sql import Row, Window

In [3]:
# Cluster and application settings for this pipeline, built as one chained expression.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")  # Do not change
    .setAppName("pipeline_2_app")  # Change to app name
    .set("spark.driver.memory", "2g")  # Do not change
    .set("spark.executor.cores", "1")  # Do not change
    .set("spark.driver.cores", "1")  # Do not change
)

# The SparkSession is the entry point to the Spark SQL engine;
# getOrCreate() reuses an already running session if there is one.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the GCS connector classes on the Hadoop configuration for the gs:// scheme.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for hadoop_key, connector_class in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(hadoop_key, connector_class)

### Who are the best-performing defensive players?
Rank players by the average of their ranks across three defensive statistics: clearances, interceptions and tackles.

### Load data from Google Cloud Bucket

In [4]:
# Load data from Google Cloud Storage
# Root path of the bucket that holds the input CSV files
gsc_root_path = 'gs://data_a2/'  #  Change name to Google Cloud Bucket. Also upload data files first


def _nullable_schema(fields):
    """Build a StructType from (column name, data type) pairs; every field is nullable."""
    return StructType([StructField(name, dtype, True) for name, dtype in fields])


def _read_player_csv(schema, file_name):
    """Read a CSV file with a header row from the bucket root, applying the given schema."""
    return (
        spark.read
        .format("csv")
        .option("header", "true")
        .schema(schema)
        .load(gsc_root_path + file_name)
    )


# Player clearances: schema and dataframe
dataSchema_clearances = _nullable_schema([
    ("rank_clearances", IntegerType()),
    ("player_name_c", StringType()),
    ("team_name_c", StringType()),
    ("clearances_per90", FloatType()),
    ("total_clearances", FloatType()),
    ("minutes_c", IntegerType()),
    ("matches_c", IntegerType()),
    ("country_c", StringType()),
])
df_player_clearances = _read_player_csv(dataSchema_clearances, 'player_effective_clearances.csv')

# Player interceptions: schema and dataframe
dataSchema_interceptions = _nullable_schema([
    ("rank_interceptions", IntegerType()),
    ("player_name_i", StringType()),
    ("team_name_i", StringType()),
    ("interceptions_per90", FloatType()),
    ("total_interceptions", FloatType()),
    ("minutes_i", IntegerType()),
    ("matches_i", IntegerType()),
    ("country_i", StringType()),
])
df_player_interceptions = _read_player_csv(dataSchema_interceptions, 'player_interceptions.csv')

# Player tackles: schema and dataframe
dataSchema_tackles = _nullable_schema([
    ("rank_tackles", IntegerType()),
    ("player_name_t", StringType()),
    ("team_name_t", StringType()),
    ("tackles_per90", FloatType()),
    ("tackle_succes_rate", FloatType()),
    ("minutes_t", IntegerType()),
    ("matches_t", IntegerType()),
    ("country_t", StringType()),
])
df_player_tackles = _read_player_csv(dataSchema_tackles, 'player_tackles_won.csv')


In [5]:
# Preview the first five rows of each loaded dataframe, in load order
for loaded_df in (df_player_clearances, df_player_interceptions, df_player_tackles):
    loaded_df.show(5)

+---------------+--------------------+-----------+----------------+----------------+---------+---------+---------+
|rank_clearances|       player_name_c|team_name_c|clearances_per90|total_clearances|minutes_c|matches_c|country_c|
+---------------+--------------------+-----------+----------------+----------------+---------+---------+---------+
|              1|          Jaka Bijol|    Udinese|             5.9|           137.0|     2083|       24|      SVN|
|              2|Sebastian Walukie...|     Empoli|             5.3|           120.0|     2054|       27|      POL|
|              3|      Ardian Ismajli|     Empoli|             5.2|           119.0|     2046|       26|      ALB|
|              4|     Alberto Dossena|   Cagliari|             5.1|           168.0|     2979|       35|      ITA|
|              5|   Thomas Kristensen|    Udinese|             4.8|           112.0|     2097|       26|      DEN|
+---------------+--------------------+-----------+----------------+-------------

### Join dataframes
First join clearances and interceptions data, and then add tackles data.

Some players switched clubs during the season, which is why the team name can differ between the dataframes. <br>
We use the team the player ended the season with as the team_name, since this is the most recent data; this corresponds to the team in the clearances dataframe. <br>

In [6]:
# Create combined dataframe: outer join of clearances and interceptions on player name,
# so players that appear in only one of the two sources are kept.
joinExpression1 = df_player_clearances["player_name_c"] == df_player_interceptions['player_name_i']
df_player = df_player_clearances.join(df_player_interceptions, joinExpression1, "outer")
print("first join done")

# Each shared attribute exists once per source. Merge every pair into one column,
# preferring the clearances value and falling back to the interceptions value,
# then drop the two source columns.
# Tuples are (merged column, preferred source column, fallback source column).
columns_to_merge = [
    ("player_name_ci", "player_name_c", "player_name_i"),
    ("minutes_ci", "minutes_c", "minutes_i"),
    ("matches_ci", "matches_c", "matches_i"),
    ("country_ci", "country_c", "country_i"),
    ("team_name_ci", "team_name_c", "team_name_i"),
]
for merged_name, preferred_name, fallback_name in columns_to_merge:
    merged_value = coalesce(df_player[preferred_name], df_player[fallback_name])
    df_player = df_player.withColumn(merged_name, merged_value).drop(preferred_name, fallback_name)
print("Combining columns done.") # Print that this task is done

# Show result
df_player.show(5)

first join done
Combining columns done.
+---------------+----------------+----------------+------------------+-------------------+-------------------+---------------+----------+----------+----------+------------+
|rank_clearances|clearances_per90|total_clearances|rank_interceptions|interceptions_per90|total_interceptions| player_name_ci|minutes_ci|matches_ci|country_ci|team_name_ci|
+---------------+----------------+----------------+------------------+-------------------+-------------------+---------------+----------+----------+----------+------------+
|            228|             0.8|            13.0|               179|                0.6|                9.0|  Aaron Caricol|      1380|        22|       ESP|       Genoa|
|            103|             1.7|            58.0|               113|                0.8|               29.0|   Adam Marusic|      3106|        37|       MNE|       Lazio|
|             48|             2.9|            35.0|                64|                1.1|     

In [7]:
# Outer join of the combined clearances/interceptions dataframe with the tackles
# dataframe on player name, so players missing from either side are kept.
joinExpression1 = df_player["player_name_ci"] == df_player_tackles['player_name_t']
df_player2 = df_player.join(df_player_tackles, joinExpression1, "outer")
print("Second join done.")

# Merge each pair of shared attributes into its final column, preferring the value
# from the earlier join and falling back to the tackles value, then drop the two
# source columns.
# Tuples are (final column, preferred source column, fallback source column).
columns_to_merge = [
    ("player_name", "player_name_ci", "player_name_t"),
    ("minutes", "minutes_ci", "minutes_t"),
    ("matches", "matches_ci", "matches_t"),
    ("country", "country_ci", "country_t"),
    ("team_name", "team_name_ci", "team_name_t"),
]
for final_name, preferred_name, fallback_name in columns_to_merge:
    merged_value = coalesce(df_player2[preferred_name], df_player2[fallback_name])
    df_player2 = df_player2.withColumn(final_name, merged_value).drop(preferred_name, fallback_name)
print("Combining columns done.") # Print that this task is done

# Show result
df_player2.show(5)

Second join done.
Combining columns done.
+---------------+----------------+----------------+------------------+-------------------+-------------------+------------+-------------+------------------+---------------+-------+-------+-------+---------+
|rank_clearances|clearances_per90|total_clearances|rank_interceptions|interceptions_per90|total_interceptions|rank_tackles|tackles_per90|tackle_succes_rate|    player_name|minutes|matches|country|team_name|
+---------------+----------------+----------------+------------------+-------------------+-------------------+------------+-------------+------------------+---------------+-------+-------+-------+---------+
|            228|             0.8|            13.0|               179|                0.6|                9.0|         222|          0.7|              47.6|  Aaron Caricol|   1380|     22|    ESP|    Genoa|
|            103|             1.7|            58.0|               113|                0.8|               29.0|         148|       

### Fill NULL values 
Fill NULL values in statistics with 0 as these players have no registered instances of these actions. <br>
Fill NULL ranks with the maximum rank plus 1 as these players are all ranked last for this statistic.

In [8]:
# Fill NULL values with 0 for the statistics as these players have no registered instances of these actions.
# NOTE: tackle_succes_rate is not in this list, so its NULL values are left untouched.
columns_to_fill = ["clearances_per90", "total_clearances", "interceptions_per90", "total_interceptions", "tackles_per90"]
df_player2 = df_player2.na.fill(0, subset=columns_to_fill)

# Remove players with less than 10 matches played, to ensure the statistics are computed over enough games
df_player2 = df_player2.where("matches >= 10")

# Find the maximum rank per statistic with a single aggregation (one Spark job instead of three).
# `max` is pyspark.sql.functions.max here (it shadows the Python builtin via the imports cell).
rank_columns = ["rank_clearances", "rank_interceptions", "rank_tackles"]
max_ranks = df_player2.agg(*[max(rank_col).alias(rank_col) for rank_col in rank_columns]).first()


def _rank_after_last(current_max):
    """Return the rank directly after the current maximum.

    The aggregate is NULL (None) when a rank column has no non-NULL value left
    after filtering; in that case fall back to rank 1 instead of raising a
    TypeError on `None + 1`.
    """
    return 1 if current_max is None else current_max + 1


max_rank_clearances = _rank_after_last(max_ranks["rank_clearances"])
max_rank_interceptions = _rank_after_last(max_ranks["rank_interceptions"])
max_rank_tackles = _rank_after_last(max_ranks["rank_tackles"])

# Set all NULL ranks to (max rank + 1): players without a rank share last place for that statistic
fill_cols_vals = {"rank_clearances": max_rank_clearances, "rank_interceptions" : max_rank_interceptions, "rank_tackles" : max_rank_tackles}
df_player2 = df_player2.na.fill(fill_cols_vals)

# Show result
df_player2.show(5)

+---------------+----------------+----------------+------------------+-------------------+-------------------+------------+-------------+------------------+---------------+-------+-------+-------+---------+
|rank_clearances|clearances_per90|total_clearances|rank_interceptions|interceptions_per90|total_interceptions|rank_tackles|tackles_per90|tackle_succes_rate|    player_name|minutes|matches|country|team_name|
+---------------+----------------+----------------+------------------+-------------------+-------------------+------------+-------------+------------------+---------------+-------+-------+-------+---------+
|            228|             0.8|            13.0|               179|                0.6|                9.0|         222|          0.7|              47.6|  Aaron Caricol|   1380|     22|    ESP|    Genoa|
|            103|             1.7|            58.0|               113|                0.8|               29.0|         148|          0.9|              68.9|   Adam Marusic|

### Compute average rank 

In [9]:
# Compute the average rank for each player: the mean of the clearances, interceptions and tackles ranks.
# withColumn replaces an existing column of the same name, so re-running this cell
# does not add a second avg_rank column (selectExpr("*", ... as avg_rank) would).
df_player2 = df_player2.withColumn(
    "avg_rank",
    (col("rank_clearances") + col("rank_interceptions") + col("rank_tackles")) / 3,
)

# Window over all players, ordered by average rank (lowest average first).
# There is no partitionBy, so Spark evaluates this window in a single partition.
window_spec1 = Window.orderBy("avg_rank")

# League-wide defensive rank; rank() gives tied players the same rank and leaves gaps after ties
df_player2 = df_player2.withColumn("rank_defense", rank().over(window_spec1))

# Window per team, ordered by the league-wide defensive rank
window_spec2 = Window.partitionBy(col("team_name")).orderBy(col("rank_defense").asc())

# Rank within the team based on the defensive rank; dense_rank() leaves no gaps after ties
df_player2 = df_player2.withColumn("rank_in_team", dense_rank().over(window_spec2))

df_player2.show(5)

+---------------+----------------+----------------+------------------+-------------------+-------------------+------------+-------------+------------------+----------------+-------+-------+-------+---------+------------------+------------+------------+
|rank_clearances|clearances_per90|total_clearances|rank_interceptions|interceptions_per90|total_interceptions|rank_tackles|tackles_per90|tackle_succes_rate|     player_name|minutes|matches|country|team_name|          avg_rank|rank_defense|rank_in_team|
+---------------+----------------+----------------+------------------+-------------------+-------------------+------------+-------------+------------------+----------------+-------+-------+-------+---------+------------------+------------+------------+
|             71|             2.3|            66.0|                 3|                2.1|               59.0|          81|          1.2|              62.3|Giorgio Scalvini|   2553|     33|    ITA| Atalanta|51.666666666666664|          15|  

In [10]:
# Build the dataframe for visualization: pick the output columns and give them
# their presentation names in one projection.
df_final = df_player2.select(
    col("rank_defense").alias("rank_in_league"),
    col("player_name").alias("player"),
    col("team_name").alias("team"),
    col("clearances_per90").alias("clearances_per_90"),
    col("interceptions_per90").alias("interceptions_per_90"),
    col("tackles_per90").alias("tackles_per_90"),
    col("minutes"),
    col("rank_in_team"),
)

df_final.show(5)

+--------------+----------------+--------+-----------------+--------------------+--------------+-------+------------+
|rank_in_league|          player|    team|clearances_per_90|interceptions_per_90|tackles_per_90|minutes|rank_in_team|
+--------------+----------------+--------+-----------------+--------------------+--------------+-------+------------+
|            15|Giorgio Scalvini|Atalanta|              2.3|                 2.1|           1.2|   2553|           1|
|            16|  Marten de Roon|Atalanta|              1.7|                 1.4|           1.5|   2599|           2|
|            28|       Isak Hien|Atalanta|              3.2|                 1.1|           1.1|   1794|           3|
|            45|  Berat Djimsiti|Atalanta|              3.1|                 1.8|           0.8|   2831|           4|
|            56|  Sead Kolasinac|Atalanta|              1.9|                 0.9|           1.3|   2182|           5|
+--------------+----------------+--------+--------------

### Saving data to BigQuery (Data Sink)

In [31]:
# The BigQuery connector first stages the table in Google Cloud Storage and then
# copies it into BigQuery, so it needs a bucket for the temporary export data.
bucket = "temp_a2"  # use your bucket
spark.conf.set('temporaryGcsBucket', bucket) # Do not change

# Save the final dataframe to BigQuery, overwriting the table contents.
# BigQuery will create table if there is no table, probably have to try first
bigquery_writer = (
    df_final.write
    .format('bigquery')
    .option('table', 'data-engineering-435408.a2_dataset.player_data_defense')
    .mode("overwrite")
)
bigquery_writer.save()

In [11]:
# Stop the Spark session (and its underlying Spark context).
# No cell that uses `spark` or the dataframes can run after this without re-creating the session.
spark.stop()