In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, row_number, monotonically_increasing_id, when, to_timestamp, date_format, explode

In [2]:
# Initialize Spark Session
spark = SparkSession.builder.appName("FootballDataETL").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/08 05:17:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the JSON file (adjust the path as needed)
fixtures_df = spark.read.option("multiline", "true").json("raw_data/fixtures/*.json")
teams_df = spark.read.option("multiline", "true").json("raw_data/teams/*.json")
players_df = spark.read.option("multiline", "true").json("raw_data/player_stats/*.json")
team_stats_df = spark.read.option("multiline", "true").json("raw_data/team_stats/*.json")

                                                                                

In [4]:
# Explode the response array to get individual match records
fixtures_exploded = fixtures_df.selectExpr("explode(response) as match_data")

In [5]:
# Manually populate Dim_Seasons
dim_seasons_data = [(2021, 1), (2022, 2), (2023, 3), (2024, 4), (2025, 5)]
dim_seasons_df = spark.createDataFrame(dim_seasons_data, ["Season_Year", "Season_ID"])

In [6]:
# Extract Dim_Countries dynamically
dim_countries_df = [("England", 1), ("Spain", 2), ("Germany", 3), ("Italy", 4), ("France", 5)]
dim_countries_df = spark.createDataFrame(dim_countries_df, ["Country_Name", "Country_ID"])

In [7]:
# Extract Dim_Leagues dynamically
dim_leagues_df = fixtures_exploded.select(
    col("match_data.league.id").alias("League_ID"),
    col("match_data.league.name").alias("League_Name"),
    col("match_data.league.country").alias("Country_Name"),
    col("match_data.league.logo").alias("League_Logo")
).distinct()

# Assign Country_ID from Dim_Countries
dim_leagues_df = dim_leagues_df.join(dim_countries_df, "Country_Name", "left") \
                             .select("League_ID", "League_Name", "Country_ID", "League_Logo")

In [8]:
# Explode response array to extract individual team records
teams_exploded = teams_df.selectExpr("explode(response) as team_data")

In [9]:
# Extract Dim_Venues
dim_venues_df = teams_exploded.select(
    col("team_data.venue.id").alias("Venue_ID"),
    col("team_data.venue.name").alias("Venue_Name"),
    col("team_data.venue.city").alias("City"),
    col("team_data.team.country").alias("Country_Name"),
    col("team_data.venue.capacity").alias("Capacity")
).distinct()

# Assign Country_ID from Dim_Countries
dim_venues_df = dim_venues_df.join(dim_countries_df, "Country_Name", "left") \
                             .select("Venue_ID", "Venue_Name", "City", "Country_ID", "Capacity")

In [10]:
# Extract Dim_Teams
dim_teams_df = teams_exploded.select(
    col("team_data.team.id").alias("Team_ID"),
    col("team_data.team.name").alias("Team_Name"),
    col("team_data.team.logo").alias("Team_Logo"),
    col("team_data.team.country").alias("Country_Name"),
    col("team_data.venue.id").alias("Venue_ID")
).distinct()

# Assign League_ID by joining with Dim_Leagues
dim_teams_df = dim_teams_df.join(dim_countries_df, "Country_Name", "left") \
                           .select("Team_ID", "Team_Name", "Venue_ID", "Team_Logo", "Country_ID") \
                           .join(dim_leagues_df, "Country_ID", "left") \
                           .select("Team_ID", "Team_Name", "League_ID", "Venue_ID", "Team_Logo")

In [11]:
# Extract Dim_Fixtures
dim_fixtures_df = fixtures_exploded.select(
    col("match_data.fixture.id").alias("Fixture_ID"),
    col("match_data.league.id").alias("League_ID"),
    col("match_data.league.season").alias("Season_Year"),
    col("match_data.teams.home.id").alias("Home_Team_ID"),
    col("match_data.teams.away.id").alias("Away_Team_ID"),
    col("match_data.fixture.venue.id").alias("Venue_ID"),
    to_timestamp(col("match_data.fixture.date")).alias("Match_Date"),
    to_timestamp(date_format(col("match_data.fixture.date"), "HH:mm:ss")).alias("Match_Time"),
    col("match_data.goals.home").alias("Goals_Home"),
    col("match_data.goals.away").alias("Goals_Away"),
    when(col("match_data.teams.home.winner") == True, lit(True)).otherwise(lit(False)).alias("Home_win"),
    when(col("match_data.teams.away.winner") == True, lit(True)).otherwise(lit(False)).alias("Away_win"),
    when(col("match_data.teams.home.winner") == True, col("match_data.teams.home.id"))
    .when(col("match_data.teams.away.winner") == True, col("match_data.teams.away.id"))
    .otherwise(lit(None)).alias("Winner_Team_ID"),
    col("match_data.fixture.status.short").alias("Status")
)

# Assign Season_ID by joining with Dim_Seasons
dim_fixtures_df = dim_fixtures_df.join(dim_seasons_df, dim_fixtures_df.Season_Year == dim_seasons_df.Season_Year, "left") \
                                   .select("Fixture_ID", "League_ID", "Season_ID", "Home_Team_ID", "Away_Team_ID", 
                                           "Venue_ID", "Match_Date", "Match_Time", "Goals_Home", "Goals_Away",
                                           "Home_win", "Away_win", "Winner_Team_ID", "Status")


In [12]:
dim_seasons_df.show()

+-----------+---------+
|Season_Year|Season_ID|
+-----------+---------+
|       2021|        1|
|       2022|        2|
|       2023|        3|
|       2024|        4|
|       2025|        5|
+-----------+---------+



In [13]:
dim_countries_df.show()

+------------+----------+
|Country_Name|Country_ID|
+------------+----------+
|     England|         1|
|       Spain|         2|
|     Germany|         3|
|       Italy|         4|
|      France|         5|
+------------+----------+



In [14]:
dim_leagues_df.show()

+---------+--------------+----------+--------------------+
|League_ID|   League_Name|Country_ID|         League_Logo|
+---------+--------------+----------+--------------------+
|      140|       La Liga|         2|https://media.api...|
|       39|Premier League|         1|https://media.api...|
|      135|       Serie A|         4|https://media.api...|
|       78|    Bundesliga|         3|https://media.api...|
+---------+--------------+----------+--------------------+



In [15]:
dim_venues_df.show()

+--------+--------------------+--------------------+----------+--------+
|Venue_ID|          Venue_Name|                City|Country_ID|Capacity|
+--------+--------------------+--------------------+----------+--------+
|    1494|Estadio Ramón Sán...|             Sevilla|         2|   48649|
|   20421| Stage Front Stadium|Cornella de Llobr...|         2|   40423|
|    1488| Estadio de Vallecas|              Madrid|         2|   15500|
|    1498|Estadio de la Cer...|          Villarreal|         2|   24500|
|    1460|    San Mamés Barria|              Bilbao|         2|   53289|
|   11915|Estadio Nuevo Mir...|               Cádiz|         2|   22000|
|    1456|Estadio Santiago ...|              Madrid|         2|   85454|
|   19216|Power Horse Stadi...|             Almería|         2|   21350|
|   19939|Estadi Olímpic Ll...|           Barcelona|         2|   55926|
|    1489|Estadio Benito Vi...|             Sevilla|         2|   60721|
|    1478|Estadi Municipal ...|              Girona

In [16]:
dim_teams_df.show()

+-------+---------------+---------+--------+--------------------+
|Team_ID|      Team_Name|League_ID|Venue_ID|           Team_Logo|
+-------+---------------+---------+--------+--------------------+
|    727|        Osasuna|      140|    1486|https://media.api...|
|    533|     Villarreal|      140|    1498|https://media.api...|
|    548|  Real Sociedad|      140|    1491|https://media.api...|
|    546|         Getafe|      140|   20422|https://media.api...|
|    538|     Celta Vigo|      140|    1467|https://media.api...|
|    720|     Valladolid|      140|    1492|https://media.api...|
|    532|       Valencia|      140|    1497|https://media.api...|
|    529|      Barcelona|      140|   19939|https://media.api...|
|    541|    Real Madrid|      140|    1456|https://media.api...|
|    728| Rayo Vallecano|      140|    1488|https://media.api...|
|    797|          Elche|      140|    1473|https://media.api...|
|    547|         Girona|      140|    1478|https://media.api...|
|    531| 

In [17]:
dim_fixtures_df.show()

+----------+---------+---------+------------+------------+--------+-------------------+-------------------+----------+----------+--------+--------+--------------+------+
|Fixture_ID|League_ID|Season_ID|Home_Team_ID|Away_Team_ID|Venue_ID|         Match_Date|         Match_Time|Goals_Home|Goals_Away|Home_win|Away_win|Winner_Team_ID|Status|
+----------+---------+---------+------------+------------+--------+-------------------+-------------------+----------+----------+--------+--------+--------------+------+
|   1037952|      140|        3|         723|         728|   19216|2023-08-11 13:30:00|2025-03-08 13:30:00|         0|         2|   false|    true|           728|    FT|
|   1037956|      140|        3|         536|         532|    1494|2023-08-11 16:00:00|2025-03-08 16:00:00|         1|         2|   false|    true|           532|    FT|
|   1037960|      140|        3|         548|         547|    1491|2023-08-12 11:00:00|2025-03-08 11:00:00|         1|         1|   false|   false|   

In [18]:
# Extract Fixture_ID from parameters.fixture
player_stats_df = players_df.withColumn("Fixture_ID", col("parameters.fixture"))

# Explode response array to extract individual fixture records
player_stats_exploded = player_stats_df.selectExpr("Fixture_ID", "explode(response) as match_data")

# Explode players array to get individual player records
player_stats_exploded = player_stats_exploded.select(
    col("Fixture_ID"),
    col("match_data.team.id").alias("Team_ID"),
    col("match_data.team.name").alias("Team_Name"),
    col("match_data.players").alias("Players")
).withColumn("Players", explode(col("Players")))

# Extract Player-Level Statistics
fact_player_stats_df = player_stats_exploded.select(
    col("Fixture_ID").cast("int"),
    col("Team_ID").cast("int"),
    col("Players.player.id").cast("int").alias("Player_ID"),
    when(col("Players.statistics.games.minutes").isNotNull(), col("Players.statistics.games.minutes").getItem(0)).otherwise(lit(None)).cast("int").alias("Minutes_Played"),
    when(col("Players.statistics.games.position").isNotNull(), col("Players.statistics.games.position").getItem(0)).otherwise(lit(None)).alias("Position"),
    when(col("Players.statistics.games.rating").isNotNull(), col("Players.statistics.games.rating").getItem(0)).otherwise(lit(None)).cast("float").alias("Rating"),
    when(col("Players.statistics.games.substitute").isNotNull(), col("Players.statistics.games.substitute").getItem(0)).otherwise(lit(None)).cast("boolean").alias("Substitute"),
    when(col("Players.statistics.goals.total").isNotNull(), col("Players.statistics.goals.total").getItem(0)).otherwise(lit(None)).cast("int").alias("Goals_Scored"),
    when(col("Players.statistics.goals.conceded").isNotNull(), col("Players.statistics.goals.conceded").getItem(0)).otherwise(lit(None)).cast("int").alias("Goals_Conceded"),
    when(col("Players.statistics.goals.assists").isNotNull(), col("Players.statistics.goals.assists").getItem(0)).otherwise(lit(None)).cast("int").alias("Assists"),
    when(col("Players.statistics.goals.saves").isNotNull(), col("Players.statistics.goals.saves").getItem(0)).otherwise(lit(None)).cast("int").alias("Saves"),
    when(col("Players.statistics.passes.total").isNotNull(), col("Players.statistics.passes.total").getItem(0)).otherwise(lit(None)).cast("int").alias("Passes"),
    when(col("Players.statistics.passes.key").isNotNull(), col("Players.statistics.passes.key").getItem(0)).otherwise(lit(None)).cast("int").alias("Passes_Key"),
    when(col("Players.statistics.passes.accuracy").isNotNull(), col("Players.statistics.passes.accuracy").getItem(0)).otherwise(lit(None)).cast("float").alias("Pass_Accuracy"),
    when(col("Players.statistics.cards.yellow").isNotNull(), col("Players.statistics.cards.yellow").getItem(0)).otherwise(lit(None)).cast("int").alias("Yellow_Cards"),
    when(col("Players.statistics.cards.red").isNotNull(), col("Players.statistics.cards.red").getItem(0)).otherwise(lit(None)).cast("int").alias("Red_Cards"),
    when(col("Players.statistics.shots.total").isNotNull(), col("Players.statistics.shots.total").getItem(0)).otherwise(lit(None)).cast("int").alias("Shots"),
    when(col("Players.statistics.shots.on").isNotNull(), col("Players.statistics.shots.on").getItem(0)).otherwise(lit(None)).cast("int").alias("Shots_On_Target"),
    when(col("Players.statistics.tackles.total").isNotNull(), col("Players.statistics.tackles.total").getItem(0)).otherwise(lit(None)).cast("int").alias("Tackles"),
    when(col("Players.statistics.tackles.blocks").isNotNull(), col("Players.statistics.tackles.blocks").getItem(0)).otherwise(lit(None)).cast("int").alias("Blocks"),
    when(col("Players.statistics.tackles.interceptions").isNotNull(), col("Players.statistics.tackles.interceptions").getItem(0)).otherwise(lit(None)).cast("int").alias("Interceptions"),
    when(col("Players.statistics.duels.total").isNotNull(), col("Players.statistics.duels.total").getItem(0)).otherwise(lit(None)).cast("int").alias("Duels"),
    when(col("Players.statistics.duels.won").isNotNull(), col("Players.statistics.duels.won").getItem(0)).otherwise(lit(None)).cast("int").alias("Duels_Won"),
    when(col("Players.statistics.dribbles.attempts").isNotNull(), col("Players.statistics.dribbles.attempts").getItem(0)).otherwise(lit(None)).cast("int").alias("Dribbles_Attempted"),
    when(col("Players.statistics.dribbles.success").isNotNull(), col("Players.statistics.dribbles.success").getItem(0)).otherwise(lit(None)).cast("int").alias("Dribbles_Success"),
    when(col("Players.statistics.dribbles.past").isNotNull(), col("Players.statistics.dribbles.past").getItem(0)).otherwise(lit(None)).cast("int").alias("Dribbled_Past"),
    when(col("Players.statistics.fouls.committed").isNotNull(), col("Players.statistics.fouls.committed").getItem(0)).otherwise(lit(None)).cast("int").alias("Fouls_Committed"),
    when(col("Players.statistics.fouls.drawn").isNotNull(), col("Players.statistics.fouls.drawn").getItem(0)).otherwise(lit(None)).cast("int").alias("Fouls_Drawn"),
    when(col("Players.statistics.penalty.scored").isNotNull(), col("Players.statistics.penalty.scored").getItem(0)).otherwise(lit(None)).cast("int").alias("Penalty_Scored"),
    when(col("Players.statistics.penalty.missed").isNotNull(), col("Players.statistics.penalty.missed").getItem(0)).otherwise(lit(None)).cast("int").alias("Penalty_Missed"),
    when(col("Players.statistics.penalty.saved").isNotNull(), col("Players.statistics.penalty.saved").getItem(0)).otherwise(lit(None)).cast("int").alias("Penalty_Saved"),
    when(col("Players.statistics.penalty.won").isNotNull(), col("Players.statistics.penalty.won").getItem(0)).otherwise(lit(None)).cast("int").alias("Penalty_Won"),
    when(col("Players.statistics.penalty.commited").isNotNull(), col("Players.statistics.penalty.commited").getItem(0)).otherwise(lit(None)).cast("int").alias("Penalty_Commited"),
    when(col("Players.statistics.offsides").isNotNull(), col("Players.statistics.offsides").getItem(0)).otherwise(lit(None)).cast("int").alias("Offside"),
)

In [19]:
fact_player_stats_df.show()

25/03/08 05:18:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+-------+---------+--------------+--------+------+----------+------------+--------------+-------+-----+------+----------+-------------+------------+---------+-----+---------------+-------+------+-------------+-----+---------+------------------+----------------+-------------+---------------+-----------+--------------+--------------+-------------+-----------+----------------+-------+
|Fixture_ID|Team_ID|Player_ID|Minutes_Played|Position|Rating|Substitute|Goals_Scored|Goals_Conceded|Assists|Saves|Passes|Passes_Key|Pass_Accuracy|Yellow_Cards|Red_Cards|Shots|Shots_On_Target|Tackles|Blocks|Interceptions|Duels|Duels_Won|Dribbles_Attempted|Dribbles_Success|Dribbled_Past|Fouls_Committed|Fouls_Drawn|Penalty_Scored|Penalty_Missed|Penalty_Saved|Penalty_Won|Penalty_Commited|Offside|
+----------+-------+---------+--------------+--------+------+----------+------------+--------------+-------+-----+------+----------+-------------+------------+---------+-----+---------------+-------+------+--

In [20]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, when, lit, regexp_replace

# Extract Fixture_ID from parameters.fixture
team_stats_df = team_stats_df.withColumn("Fixture_ID", col("parameters.fixture"))

# Explode response array to extract individual fixture records
team_stats_exploded = team_stats_df.selectExpr("Fixture_ID", "explode(response) as match_data")

# Explode statistics array for each team
team_stats_exploded = team_stats_exploded.select(
    col("Fixture_ID"),
    col("match_data.team.id").alias("Team_ID"),
    col("match_data.team.name").alias("Team_Name"),
    col("match_data.statistics").alias("statistics")
).withColumn("statistics", explode(col("statistics")))

# Extract specific statistics
fact_team_stats_df = team_stats_exploded.select(
    col("Fixture_ID"),
    col("Team_ID"),
    when(col("statistics.type") == "Shots on Goal", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Shots_on_Target"),
    when(col("statistics.type") == "Shots off Goal", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Shots_off_Target"),
    when(col("statistics.type") == "Total Shots", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Total_Shots"),
    when(col("statistics.type") == "Blocked Shots", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Blocked_Shots"),
    when(col("statistics.type") == "Shots insidebox", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Shots_Inside_Box"),
    when(col("statistics.type") == "Shots outsidebox", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Shots_Outside_Box"),
    when(col("statistics.type") == "Fouls", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Fouls"),
    when(col("statistics.type") == "Corner Kicks", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Corner_Kicks"),
    when(col("statistics.type") == "Offsides", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Offsides"),
    when(col("statistics.type") == "Ball Possession", regexp_replace(col("statistics.value"), "%", "").cast("int")).otherwise(lit(None)).alias("Possession_Percentage"),
    when(col("statistics.type") == "Yellow Cards", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Yellow_Cards"),
    when(col("statistics.type") == "Red Cards", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Red_Cards"),
    when(col("statistics.type") == "Goalkeeper Saves", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Goalkeeper_Saves"),
    when(col("statistics.type") == "Total passes", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Total_Passes"),
    when(col("statistics.type") == "Passes accurate", col("statistics.value").cast("int")).otherwise(lit(None)).alias("Passes_Accurate"),
    when(col("statistics.type") == "Passes %", regexp_replace(col("statistics.value"), "%", "").cast("int")).otherwise(lit(None)).alias("Passes_Percent"),
    when(col("statistics.type") == "expected_goals", col("statistics.value").cast("float")).otherwise(lit(None)).alias("Expected_Goals")
)

# Group by Fixture_ID and Team_ID to aggregate statistics
fact_team_stats_df = fact_team_stats_df.groupBy(
    "Fixture_ID", "Team_ID"
).agg(
    F.max("Shots_on_Target").alias("Shots_on_Target"),
    F.max("Shots_off_Target").alias("Shots_off_Target"),
    F.max("Total_Shots").alias("Total_Shots"),
    F.max("Blocked_Shots").alias("Blocked_Shots"),
    F.max("Shots_Inside_Box").alias("Shots_Inside_Box"),
    F.max("Shots_Outside_Box").alias("Shots_Outside_Box"),
    F.max("Fouls").alias("Fouls"),
    F.max("Corner_Kicks").alias("Corner_Kicks"),
    F.max("Offsides").alias("Offsides"),
    F.avg("Possession_Percentage").alias("Possession_Percentage"),  
    F.max("Yellow_Cards").alias("Yellow_Cards"),
    F.max("Red_Cards").alias("Red_Cards"),
    F.max("Goalkeeper_Saves").alias("Goalkeeper_Saves"),
    F.max("Total_Passes").alias("Total_Passes"),
    F.max("Passes_Accurate").alias("Passes_Accurate"),
    F.avg("Passes_Percent").alias("Passes_Percent"),  
    F.max("Expected_Goals").alias("Expected_Goals")
)


In [21]:
from pyspark.sql.functions import col, when

# Join fact_team_stats_df with dim_fixtures_df on Fixture_ID
fact_team_stats_df = fact_team_stats_df.join(
    dim_fixtures_df,
    on="Fixture_ID",
    how="left"
)

# Derive Goals_Scored, Goals_Conceded, and Winner_Team
fact_team_stats_df = fact_team_stats_df.withColumn(
    "Goals_Scored",
    when(col("Team_ID") == col("Home_Team_ID"), col("Goals_Home"))
    .when(col("Team_ID") == col("Away_Team_ID"), col("Goals_Away"))
    .otherwise(lit(None))
)

fact_team_stats_df = fact_team_stats_df.withColumn(
    "Goals_Conceded",
    when(col("Team_ID") == col("Home_Team_ID"), col("Goals_Away"))
    .when(col("Team_ID") == col("Away_Team_ID"), col("Goals_Home"))
    .otherwise(lit(None))
)

fact_team_stats_df = fact_team_stats_df.withColumn(
    "Winner_Team",
    when(col("Team_ID") == col("Winner_Team_ID"), lit(True)).otherwise(lit(False))
)

# Select only relevant columns
fact_team_stats_df = fact_team_stats_df.select(
    "Fixture_ID", "Team_ID", "Goals_Scored", "Goals_Conceded", "Winner_Team",
    "Shots_on_Target", "Shots_off_Target", "Total_Shots", "Blocked_Shots",
    "Shots_Inside_Box", "Shots_Outside_Box", "Fouls", "Corner_Kicks", "Offsides",
    "Possession_Percentage", "Yellow_Cards", "Red_Cards", "Goalkeeper_Saves",
    "Total_Passes", "Passes_Accurate", "Passes_Percent", "Expected_Goals"
)

In [22]:
# Drop unnecessary columns from dim_fixtures_df
dim_fixtures_df = dim_fixtures_df.drop(
    "Goals_Home", "Goals_Away", "Home_win", "Away_win", "Winner_Team_ID", "Status"
)

In [23]:
dim_fixtures_df.show()

+----------+---------+---------+------------+------------+--------+-------------------+-------------------+
|Fixture_ID|League_ID|Season_ID|Home_Team_ID|Away_Team_ID|Venue_ID|         Match_Date|         Match_Time|
+----------+---------+---------+------------+------------+--------+-------------------+-------------------+
|   1037952|      140|        3|         723|         728|   19216|2023-08-11 13:30:00|2025-03-08 13:30:00|
|   1037956|      140|        3|         536|         532|    1494|2023-08-11 16:00:00|2025-03-08 16:00:00|
|   1037960|      140|        3|         548|         547|    1491|2023-08-12 11:00:00|2025-03-08 11:00:00|
|   1037957|      140|        3|         534|         798|    1481|2023-08-12 13:30:00|2025-03-08 13:30:00|
|   1037953|      140|        3|         531|         541|    1460|2023-08-12 15:30:00|2025-03-08 15:30:00|
|   1037955|      140|        3|         538|         727|    1467|2023-08-13 11:00:00|2025-03-08 11:00:00|
|   1037959|      140|      

In [24]:
fact_team_stats_df.show()



+----------+-------+------------+--------------+-----------+---------------+----------------+-----------+-------------+----------------+-----------------+-----+------------+--------+---------------------+------------+---------+----------------+------------+---------------+--------------+--------------+
|Fixture_ID|Team_ID|Goals_Scored|Goals_Conceded|Winner_Team|Shots_on_Target|Shots_off_Target|Total_Shots|Blocked_Shots|Shots_Inside_Box|Shots_Outside_Box|Fouls|Corner_Kicks|Offsides|Possession_Percentage|Yellow_Cards|Red_Cards|Goalkeeper_Saves|Total_Passes|Passes_Accurate|Passes_Percent|Expected_Goals|
+----------+-------+------------+--------------+-----------+---------------+----------------+-----------+-------------+----------------+-----------------+-----+------------+--------+---------------------+------------+---------+----------------+------------+---------------+--------------+--------------+
|   1035552|     48|           1|             3|      false|              2|            

                                                                                

In [25]:
# Explode the nested structure to extract player information
players_exploded = players_df.selectExpr("explode(response) as response") \
    .selectExpr("response.team.id as Team_ID", "explode(response.players) as player_data") \
    .select(
        col("player_data.player.id").alias("Player_ID"),
        col("player_data.player.name").alias("Player_Name"),
        col("Team_ID")
    )

# Remove duplicates to keep only distinct Player_IDs
dim_players_df = players_exploded.dropDuplicates(["Player_ID"])

In [26]:
dim_players_df.show()



+---------+------------------+-------+
|Player_ID|       Player_Name|Team_ID|
+---------+------------------+-------+
|        5|     Manuel Akanji|     50|
|       17| Christian Pulišić|    489|
|       19|      Julian Weigl|    163|
|       22|Jacob Bruun Larsen|     44|
|       25|        Marco Reus|    165|
|       26|       Marius Wolf|    165|
|       29|         Jan Oblak|    530|
|       31|José María Giménez|    530|
|       39|      Stefan Savić|    530|
|       50|              Koke|    530|
|       52|    Sergio Camello|    728|
|       54|       Diego Costa|     39|
|       56| Antoine Griezmann|    530|
|       95| Benoît Badiashile|     49|
|       98| Benjamin Henrichs|    173|
|      104|   Carlos Vinícius|     36|
|      110| Han-Noah Massengo|     44|
|      119|    Radamel Falcao|    728|
|      126|        Iñaki Peña|    529|
|      130|     Nélson Semedo|     39|
+---------+------------------+-------+
only showing top 20 rows



                                                                                

In [28]:
# Save DataFrames as tables (optional)
dim_seasons_df.coalesce(1).write.mode("overwrite").csv("transformed_data/dim_seasons", header=True)
dim_countries_df.coalesce(1).write.mode("overwrite").csv("transformed_data/dim_countries", header=True)
dim_leagues_df.write.mode("overwrite").csv("transformed_data/dim_leagues", header=True)
dim_venues_df.write.mode("overwrite").csv("transformed_data/dim_venues", header=True)
dim_teams_df.write.mode("overwrite").csv("transformed_data/dim_teams", header=True)
dim_fixtures_df.write.mode("overwrite").csv("transformed_data/dim_fixtures", header=True)
fact_player_stats_df.write.mode("overwrite").csv("transformed_data/fact_player_stats", header=True)
fact_team_stats_df.write.mode("overwrite").csv("transformed_data/fact_team_stats", header=True)
dim_players_df.write.mode("overwrite").csv("transformed_data/dim_players", header=True)