In [31]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import col, sum, rank, avg, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import regexp_replace

# Spark Configuration
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Top_Attacking_Teams_Pipeline")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

# Create the Spark session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Setup Hadoop configuration for GCS
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Load tables into DataFrames
playersDF = spark.read.format("csv").option("header", "true") \
    .load("gs://data_assignment2/players_table.csv")
teamsDF = spark.read.format("csv").option("header", "true") \
    .load("gs://data_assignment2/teams_table.csv")
performanceDF = spark.read.format("csv").option("header", "true") \
    .load("gs://data_assignment2/player_performance_table.csv")

# Display initial data
playersDF.show(5)
teamsDF.show(5)
performanceDF.show(5)

+-----------------+-------+----+-----+--------------+
|           Player| Nation| Age|  Pos|         Squad|
+-----------------+-------+----+-----+--------------+
|       Max Aarons|eng ENG|23.0|   DF|   Bournemouth|
| Brenden Aaronson| us USA|22.0|MF,FW|  Union Berlin|
|  Paxten Aaronson| us USA|19.0|   MF|Eint Frankfurt|
|Keyliane Abdallah| fr FRA|17.0|   FW|     Marseille|
| Yunis Abdelhamid| ma MAR|35.0|   DF|         Reims|
+-----------------+-------+----+-----+--------------+
only showing top 5 rows

+--------------+------------------+
|         Squad|              Comp|
+--------------+------------------+
|   Bournemouth|eng Premier League|
|  Union Berlin|     de Bundesliga|
|Eint Frankfurt|     de Bundesliga|
|     Marseille|        fr Ligue 1|
|         Reims|        fr Ligue 1|
+--------------+------------------+
only showing top 5 rows

+-----------------+--------------+---+------+----+---+---+---+---+---+
|           Player|         Squad| MP|Starts| Min|Gls|Ast|G+A| xG|xAG

In [32]:
#fix the names of the competition
teamsDF = teamsDF.withColumn("Comp", regexp_replace(col("Comp"), r"^\w+\s", ""))
teamsDF.show(5)

+--------------+--------------+
|         Squad|          Comp|
+--------------+--------------+
|   Bournemouth|Premier League|
|  Union Berlin|    Bundesliga|
|Eint Frankfurt|    Bundesliga|
|     Marseille|       Ligue 1|
|         Reims|       Ligue 1|
+--------------+--------------+
only showing top 5 rows



In [33]:
# Filter and cast relevant columns for analysis
performanceDF = performanceDF.select("Player", "Squad", "Gls").filter(col("Gls").isNotNull())
performanceDF = performanceDF.withColumn("Gls", col("Gls").cast("int"))
performanceDF.head(10)

[Row(Player='Max Aarons', Squad='Bournemouth', Gls=0),
 Row(Player='Brenden Aaronson', Squad='Union Berlin', Gls=2),
 Row(Player='Paxten Aaronson', Squad='Eint Frankfurt', Gls=0),
 Row(Player='Keyliane Abdallah', Squad='Marseille', Gls=0),
 Row(Player='Yunis Abdelhamid', Squad='Reims', Gls=4),
 Row(Player='Salis Abdul Samed', Squad='Lens', Gls=0),
 Row(Player='Nabil Aberdin', Squad='Getafe', Gls=0),
 Row(Player='Laurent Abergel', Squad='Lorient', Gls=2),
 Row(Player='Matthis Abline', Squad='Nantes', Gls=5),
 Row(Player='Abner', Squad='Betis', Gls=0)]

In [34]:
# Join performanceDF with teamsDF to include competition (Comp)
teamsDF = teamsDF.select("Squad", "Comp")  # Keep only relevant columns
performanceWithCompDF = performanceDF.join(teamsDF, on="Squad", how="inner")
performanceWithCompDF.head(10)

[Row(Squad='Bournemouth', Player='Max Aarons', Gls=0, Comp='Premier League'),
 Row(Squad='Union Berlin', Player='Brenden Aaronson', Gls=2, Comp='Bundesliga'),
 Row(Squad='Eint Frankfurt', Player='Paxten Aaronson', Gls=0, Comp='Bundesliga'),
 Row(Squad='Marseille', Player='Keyliane Abdallah', Gls=0, Comp='Ligue 1'),
 Row(Squad='Reims', Player='Yunis Abdelhamid', Gls=4, Comp='Ligue 1'),
 Row(Squad='Lens', Player='Salis Abdul Samed', Gls=0, Comp='Ligue 1'),
 Row(Squad='Getafe', Player='Nabil Aberdin', Gls=0, Comp='La Liga'),
 Row(Squad='Lorient', Player='Laurent Abergel', Gls=2, Comp='Ligue 1'),
 Row(Squad='Nantes', Player='Matthis Abline', Gls=5, Comp='Ligue 1'),
 Row(Squad='Betis', Player='Abner', Gls=0, Comp='La Liga')]

In [35]:
# Aggregate total goals by Squad and Comp
teamStatsDF = performanceWithCompDF.groupBy("Squad", "Comp").agg(
    sum("Gls").alias("Total_Gls")
)

In [36]:
teamStatsDF.head(10)

[Row(Squad='Lazio', Comp='Serie A', Total_Gls=46),
 Row(Squad='Stuttgart', Comp='Bundesliga', Total_Gls=78),
 Row(Squad='Tottenham', Comp='Premier League', Total_Gls=69),
 Row(Squad='Athletic Club', Comp='La Liga', Total_Gls=59),
 Row(Squad='Real Sociedad', Comp='La Liga', Total_Gls=48),
 Row(Squad='Milan', Comp='Serie A', Total_Gls=76),
 Row(Squad='Liverpool', Comp='Premier League', Total_Gls=80),
 Row(Squad='Lens', Comp='Ligue 1', Total_Gls=45),
 Row(Squad='Real Madrid', Comp='La Liga', Total_Gls=85),
 Row(Squad='Bournemouth', Comp='Premier League', Total_Gls=52)]

In [37]:
globalWindow = Window.orderBy(col("Total_Gls").desc())
teamStatsWithRankDF = teamStatsDF.withColumn("Global_Rank", rank().over(globalWindow))

In [38]:
#League Ranking
leagueWindow = Window.partitionBy("Comp").orderBy(col("Total_Gls").desc())
teamStatsWithLeagueRankDF = teamStatsWithRankDF.withColumn("League_Rank", rank().over(leagueWindow))
teamStatsWithLeagueRankDF.show(5)

+-------------+----------+---------+-----------+-----------+
|        Squad|      Comp|Total_Gls|Global_Rank|League_Rank|
+-------------+----------+---------+-----------+-----------+
|Bayern Munich|Bundesliga|       93|          2|          1|
|   Leverkusen|Bundesliga|       87|          3|          2|
|    Stuttgart|Bundesliga|       78|         10|          3|
|   RB Leipzig|Bundesliga|       74|         15|          4|
|     Dortmund|Bundesliga|       68|         19|          5|
+-------------+----------+---------+-----------+-----------+
only showing top 5 rows



In [39]:
leagueWindow = Window.partitionBy("Comp").orderBy(col("Total_Gls").desc())
teamStatsWithLeagueRankDF = teamStatsWithRankDF.withColumn("League_Rank", rank().over(leagueWindow))

# Get unique competitions (leagues)
unique_competitions = [row["Comp"] for row in teamStatsWithLeagueRankDF.select("Comp").distinct().collect()]

# Extract top 5 teams for each league
for league in unique_competitions:
    top_5_in_league = teamStatsWithLeagueRankDF.filter((col("Comp") == league) & (col("League_Rank") <= 5))
    print(f"Top 5 Teams in {league}")
    top_5_in_league.show()

Top 5 Teams in Serie A
+----------+-------+---------+-----------+-----------+
|     Squad|   Comp|Total_Gls|Global_Rank|League_Rank|
+----------+-------+---------+-----------+-----------+
|     Inter|Serie A|       87|          3|          1|
|     Milan|Serie A|       76|         12|          2|
|  Atalanta|Serie A|       72|         16|          3|
|      Roma|Serie A|       64|         23|          4|
|Fiorentina|Serie A|       60|         25|          5|
+----------+-------+---------+-----------+-----------+

Top 5 Teams in Premier League
+---------------+--------------+---------+-----------+-----------+
|          Squad|          Comp|Total_Gls|Global_Rank|League_Rank|
+---------------+--------------+---------+-----------+-----------+
|Manchester City|Premier League|       94|          1|          1|
|        Arsenal|Premier League|       86|          5|          2|
|  Newcastle Utd|Premier League|       83|          8|          3|
|      Liverpool|Premier League|       80|       

In [40]:
teamRankingWindow = Window.orderBy(col("Total_Gls").desc())
rankedTeamsDF = teamStatsDF.withColumn("Rank", row_number().over(teamRankingWindow))

In [41]:
#Top 10 teams globally based on Global Rank
top_global_teams = teamStatsWithLeagueRankDF.orderBy(col("Global_Rank"))
top_global_teams.show()


+---------------+--------------+---------+-----------+-----------+
|          Squad|          Comp|Total_Gls|Global_Rank|League_Rank|
+---------------+--------------+---------+-----------+-----------+
|Manchester City|Premier League|       94|          1|          1|
|  Bayern Munich|    Bundesliga|       93|          2|          1|
|     Leverkusen|    Bundesliga|       87|          3|          2|
|          Inter|       Serie A|       87|          3|          1|
|        Arsenal|Premier League|       86|          5|          2|
|    Real Madrid|       La Liga|       85|          6|          1|
|         Girona|       La Liga|       84|          7|          2|
|  Newcastle Utd|Premier League|       83|          8|          3|
|      Liverpool|Premier League|       80|          9|          4|
|      Stuttgart|    Bundesliga|       78|         10|          3|
|      Paris S-G|       Ligue 1|       78|         10|          1|
|      Barcelona|       La Liga|       76|         12|        

In [42]:
bucket = "temp_assignment2"  
spark.conf.set('temporaryGcsBucket', bucket)

rankedTeamsDF.write.mode("overwrite").csv("gs://temp_assignment2/best_20_teams.csv", header=True)
# Save to BigQuery
rankedTeamsDF.write.format('bigquery') \
  .option('table', 'de-assignment2-group10.Pipeline_team_stats.top_attacking_teams') \
  .mode("overwrite") \
  .save()

In [43]:
# Stop the spark context
spark.stop()