# Eda PySpark Statbombs

### Imports

In [1]:
import numpy as np
import os
import json
import matplotlib.pyplot as plt
import plotly.express as px
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, explode,struct, input_file_name, regexp_extract, countDistinct, sum as _sum, min as _min, max as _max 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

## Part 1 : Data Loading and Preprocessing

In [2]:
# Create a Spark session
spark = SparkSession.builder.appName("Football Data Analysis").getOrCreate()


### 1.1 Competitions folder

In [3]:
# Define the schema for the matches dataset
competitions_file_path = "../open-data/data/competitions.json"
# Read JSON files into DataFrames
competitions_df = spark.read.json(competitions_file_path, multiLine=True)
competitions_df.printSchema()

root
 |-- competition_gender: string (nullable = true)
 |-- competition_id: long (nullable = true)
 |-- competition_international: boolean (nullable = true)
 |-- competition_name: string (nullable = true)
 |-- competition_youth: boolean (nullable = true)
 |-- country_name: string (nullable = true)
 |-- match_available: string (nullable = true)
 |-- match_available_360: string (nullable = true)
 |-- match_updated: string (nullable = true)
 |-- match_updated_360: string (nullable = true)
 |-- season_id: long (nullable = true)
 |-- season_name: string (nullable = true)


In [4]:
print(f"Number of competitions : {competitions_df.select('competition_name').count()}")

numeric_columns_df = competitions_df.select("match_available", "match_available_360",
                                            "match_updated", "match_updated_360")

# Optionally, you can convert the timestamps to a more readable format using date_format if needed
from pyspark.sql.functions import date_format

# Convert the columns to a more readable format
formatted_df = numeric_columns_df.select(
    date_format("match_available", "yyyy-MM-dd HH:mm:ss").alias("match_available"),
    date_format("match_available_360", "yyyy-MM-dd HH:mm:ss").alias("match_available_360"),
    date_format("match_updated", "yyyy-MM-dd HH:mm:ss").alias("match_updated"),
    date_format("match_updated_360", "yyyy-MM-dd HH:mm:ss").alias("match_updated_360")
)

# Show the formatted DataFrame summary
formatted_df.summary("min", "max").show(truncate=False)

Number of competitions : 71
+-------+-------------------+-------------------+-------------------+-------------------+
|summary|match_available    |match_available_360|match_updated      |match_updated_360  |
+-------+-------------------+-------------------+-------------------+-------------------+
|min    |2020-07-29 05:00:00|2023-04-27 22:38:34|2020-07-29 05:00:00|2021-06-12 16:17:31|
|max    |2024-02-14 05:41:27|2023-12-13 04:53:55|2024-02-14 05:41:27|2023-12-13 04:53:55|
+-------+-------------------+-------------------+-------------------+-------------------+


In [5]:
unique_competitions = competitions_df.groupBy("competition_name", "competition_gender").count()

# Select and display the competition name and gender
unique_competitions.select("competition_name", "competition_gender").show(truncate=False)

# Sort by competition name and gender for better readability
unique_competitions_sorted = unique_competitions.select("competition_name", "competition_gender").orderBy(
    "competition_name", "competition_gender")

# Show the sorted unique competitions
unique_competitions_sorted.show(truncate=False)

+-----------------------+------------------+
|competition_name       |competition_gender|
+-----------------------+------------------+
|North American League  |male              |
|Copa del Rey           |male              |
|Premier League         |male              |
|Serie A                |male              |
|Champions League       |male              |
|Ligue 1                |male              |
|UEFA Europa League     |male              |
|UEFA Women's Euro      |female            |
|African Cup of Nations |male              |
|FA Women's Super League|female            |
|NWSL                   |female            |
|FIFA World Cup         |male              |
|UEFA Euro              |male              |
|FIFA U20 World Cup     |male              |
|1. Bundesliga          |male              |
|Indian Super league    |male              |
|Liga Profesional       |male              |
|Major League Soccer    |male              |
|La Liga                |male              |
|Women's W

In [6]:

# Count the number of unique male competitions
num_unique_male_competitions = unique_competitions.filter(col('competition_gender') == 'male').count()
print(f"Number of unique male competitions: {num_unique_male_competitions}")

# Count the number of unique female competitions
num_unique_female_competitions = unique_competitions.filter(col('competition_gender') == 'female').count()
print(f"Number of unique female competitions: {num_unique_female_competitions}")

print(
    f"----------------------------------------------------------------------------------------------------------------------")

unique_youth_competitions = competitions_df.groupBy("competition_name", "competition_youth").count()

num_unique_youth_competitions = unique_youth_competitions.filter(col('competition_youth') == True).count()
print(f"Number of unique youth competitions: {num_unique_youth_competitions}")

num_unique_nonyouth_competitions = unique_youth_competitions.filter(col('competition_youth') == False).count()

print(f"Number of unique non-youth competitions: {num_unique_nonyouth_competitions}")

Number of unique male competitions: 16
Number of unique female competitions: 4
----------------------------------------------------------------------------------------------------------------------
Number of unique youth competitions: 0
Number of unique non-youth competitions: 20


### 1.2 Matches folder

In [7]:
# go through each file under the id folder --> matche folder --> id folder --> json files
# Root directory path (change this to your root directory)
root_dir = "../open-data/data/matches"

matches_df = spark.read.json(f"{root_dir}/**/*.json", multiLine=True)
matches_df.printSchema()


root
 |-- away_score: long (nullable = true)
 |-- away_team: struct (nullable = true)
 |    |-- away_team_gender: string (nullable = true)
 |    |-- away_team_group: string (nullable = true)
 |    |-- away_team_id: long (nullable = true)
 |    |-- away_team_name: string (nullable = true)
 |    |-- country: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- managers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- country: struct (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- dob: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- nickname: string (nullable = true)
 |-- competition: struct (nullable = true)
 |    |-- competition_id: long (nullable = true)
 |    |-- competition_nam

In [8]:
#select all seasons of my df and show them all by order

# Select the season column
seasons_df = matches_df.select("season.season_name").distinct()

# Count the number of unique seasons
num_seasons = seasons_df.count()

# Show the unique seasons
seasons_df.orderBy("season_name").show(num_seasons,truncate=False)

+-----------+
|season_name|
+-----------+
|1958       |
|1962       |
|1970       |
|1970/1971  |
|1971/1972  |
|1972/1973  |
|1973/1974  |
|1974       |
|1977       |
|1977/1978  |
|1979       |
|1981       |
|1982/1983  |
|1983/1984  |
|1986       |
|1986/1987  |
|1988/1989  |
|1990       |
|1997/1998  |
|1999/2000  |
|2003/2004  |
|2004/2005  |
|2005/2006  |
|2006/2007  |
|2007/2008  |
|2008/2009  |
|2009/2010  |
|2010/2011  |
|2011/2012  |
|2012/2013  |
|2013/2014  |
|2014/2015  |
|2015/2016  |
|2016/2017  |
|2017/2018  |
|2018       |
|2018/2019  |
|2019       |
|2019/2020  |
|2020       |
|2020/2021  |
|2021/2022  |
|2022       |
|2022/2023  |
|2023       |
+-----------+


In [9]:
# Count the number of matches in the dataset
num_matches = matches_df.count()
date_df = matches_df.select(col("match_date"))

# Compute number of different competitions
num_competitions = matches_df.select("competition.competition_name").distinct().count()

# Compute the minimum and maximum dates
date_range = date_df.agg(
    _min("match_date").alias("begin_date"),
    _max("match_date").alias("end_date")
).collect()

# Extract the begin and end dates
begin_date = date_range[0]["begin_date"]
end_date = date_range[0]["end_date"]

# Display the results
print(f"Dates of matches goes from {begin_date} to {end_date}")
print(f"Number of matches: {num_matches}")
print(f"Number of seasons: {num_seasons}")
print(f"Number of competitions: {num_competitions}")

Dates of matches goes from 1958-06-24 to 2024-02-11
Number of matches: 3316
Number of seasons: 45
Number of competitions: 20


In [10]:
# Assuming matches_df is your DataFrame
# Extract necessary fields
season_dates_df = matches_df.select(
    col("season.season_name").alias("season_name"),
    col("match_date")
)

# Group by season name and compute the minimum and maximum match dates for each season
season_date_range = season_dates_df.groupBy("season_name").agg(
    _min("match_date").alias("begin_date"),
    _max("match_date").alias("end_date")
)

# Show the results
season_date_range.show(num_seasons,truncate=False)

+-----------+----------+----------+
|season_name|begin_date|end_date  |
+-----------+----------+----------+
|1958       |1958-06-24|1958-06-29|
|1962       |1962-05-30|1962-05-30|
|1970       |1970-06-04|1970-06-21|
|1970/1971  |1971-06-02|1971-06-02|
|1971/1972  |1972-05-31|1972-05-31|
|1972/1973  |1973-05-30|1973-05-30|
|1973/1974  |1974-02-17|1974-02-17|
|1974       |1974-06-19|1974-07-07|
|1977       |1977-08-28|1977-08-28|
|1977/1978  |1978-04-19|1978-04-19|
|1979       |1979-09-07|1979-09-07|
|1981       |1981-04-10|1981-04-10|
|1982/1983  |1983-06-04|1983-06-04|
|1983/1984  |1984-05-05|1984-05-05|
|1986       |1986-06-22|1986-06-29|
|1986/1987  |1986-11-09|1986-11-09|
|1988/1989  |1989-03-15|1989-05-03|
|1990       |1990-06-24|1990-06-24|
|1997/1998  |1997-10-25|1997-10-25|
|1999/2000  |1999-11-23|1999-11-23|
|2003/2004  |2003-08-16|2004-05-26|
|2004/2005  |2004-10-16|2005-05-25|
|2005/2006  |2005-10-01|2006-02-25|
|2006/2007  |2006-08-28|2007-06-17|
|2007/2008  |2007-08-26|2008

In [11]:
# Select only the home_score and away_score columns
scores_matches_df = matches_df.select("home_score", "away_score")

# Compute specific statistics like mean
mean_scores = scores_matches_df.agg(
    {"home_score": "mean", "away_score": "mean"}
).withColumnRenamed("avg(home_score)", "avg_home_score").withColumnRenamed("avg(away_score)", "avg_away_score")

# Show the mean scores
mean_scores.show()

+------------------+------------------+
|    avg_home_score|    avg_away_score|
+------------------+------------------+
|1.5980096501809409|1.2605548854041013|
+------------------+------------------+


In [12]:
# Extract necessary fields
selected_matches_df = matches_df.select(
    col("competition.competition_name").alias("competition_name"),
    col("season.season_name").alias("season_name"),
    col("home_team.home_team_gender").alias("competition_gender"),
    col("home_score"),
    col("away_score")
)

# Group by competition name, season name, and genders
matches_per_league_season = selected_matches_df.groupBy(
    "competition_name", "season_name", "competition_gender"
).agg(
    count("*").alias("number_of_matches"),
    _sum("home_score").alias("total_home_goals"),
    _sum("away_score").alias("total_away_goals"),
    (_sum("home_score") + _sum("away_score")).alias("total_goals")
)

# Sort by number of matches in descending order
matches_per_league_season_sorted = matches_per_league_season.orderBy("number_of_matches", ascending=False)

# Show the sorted DataFrame
matches_per_league_season_sorted.show(num_seasons,truncate=False)

+-----------------------+-----------+------------------+-----------------+----------------+----------------+-----------+
|competition_name       |season_name|competition_gender|number_of_matches|total_home_goals|total_away_goals|total_goals|
+-----------------------+-----------+------------------+-----------------+----------------+----------------+-----------+
|Serie A                |2015/2016  |male              |380              |559             |420             |979        |
|Premier League         |2015/2016  |male              |380              |567             |459             |1026       |
|La Liga                |2015/2016  |male              |380              |615             |428             |1043       |
|Ligue 1                |2015/2016  |male              |377              |543             |406             |949        |
|1. Bundesliga          |2015/2016  |male              |306              |479             |387             |866        |
|FA Women's Super League|2020/20

In [13]:
matches_per_league_season_sorted = matches_per_league_season_sorted.withColumn(
    "avg_goals_per_match",
    col("total_goals") / col("number_of_matches")
)

# Show the sorted DataFrame
matches_per_league_season_sorted.select("competition_name","season_name", "competition_gender","avg_goals_per_match").show(num_seasons,truncate=False)

+-----------------------+-----------+------------------+-------------------+
|competition_name       |season_name|competition_gender|avg_goals_per_match|
+-----------------------+-----------+------------------+-------------------+
|Serie A                |2015/2016  |male              |2.5763157894736843 |
|Premier League         |2015/2016  |male              |2.7                |
|La Liga                |2015/2016  |male              |2.7447368421052634 |
|Ligue 1                |2015/2016  |male              |2.5172413793103448 |
|1. Bundesliga          |2015/2016  |male              |2.8300653594771243 |
|FA Women's Super League|2020/2021  |female            |3.1526717557251906 |
|Indian Super league    |2021/2022  |male              |3.0782608695652174 |
|FA Women's Super League|2018/2019  |female            |3.0277777777777777 |
|FA Women's Super League|2019/2020  |female            |2.9770114942528734 |
|FIFA World Cup         |2018       |male              |2.640625           |

### 1.3 Lineups folder

In [18]:
# Original
root_dir = "../open-data/data/lineups"
lineup_df = spark.read.json(f"{root_dir}/*.json", multiLine=True)
lineup_df.printSchema()

root
 |-- lineup: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cards: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- card_type: string (nullable = true)
 |    |    |    |    |-- period: long (nullable = true)
 |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |-- time: string (nullable = true)
 |    |    |-- country: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- jersey_number: long (nullable = true)
 |    |    |-- player_id: long (nullable = true)
 |    |    |-- player_name: string (nullable = true)
 |    |    |-- player_nickname: string (nullable = true)
 |    |    |-- positions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end_reason: string (nullable = true)
 |    |    |    |    |-- from: string (nullabl

In [19]:
# Marked with match_id
from pyspark.sql.functions import input_file_name, regexp_replace, struct

# Read the JSON files and include the filename
lineups_df = spark.read.json(f"{root_dir}/*.json", multiLine=True) \
    .withColumn("file_name", input_file_name())

# Extract match_id from the filename by removing the .json extension
lineups_df = lineups_df.withColumn("match_id", regexp_replace("file_name", r"(.*/)?([^/]+)\.json$", "$2"))

# Nest the existing structure inside the match_id struct
lineups_df = lineups_df.select(struct("*").alias("lineups"), "match_id")

In [20]:
# Print the schema of the resulting DataFrame
lineups_df.printSchema()

root
 |-- lineups: struct (nullable = false)
 |    |-- lineup: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- cards: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- card_type: string (nullable = true)
 |    |    |    |    |    |-- period: long (nullable = true)
 |    |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |    |-- time: string (nullable = true)
 |    |    |    |-- country: struct (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- jersey_number: long (nullable = true)
 |    |    |    |-- player_id: long (nullable = true)
 |    |    |    |-- player_name: string (nullable = true)
 |    |    |    |-- player_nickname: string (nullable = true)
 |    |    |    |-- positions: array (nullable = true)
 |    |    |    |    |-- element: struct (cont

In [21]:
# Select and show the match id 7298
lineups_df.filter(col('match_id') == "7298").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [79]:
from pyspark.sql.functions import explode, col

# 1. Total Number of Players
total_players = lineups_df.select(explode(col("lineups.lineup")).alias("player")) \
                         .select("player.player_id") \
                         .distinct() \
                         .count()
print(f"Total Number of Unique Players: {total_players}")

# 2. Total Number of Cards Issued
total_cards = lineups_df.select(explode(col("lineups.lineup")).alias("player")) \
                       .select(explode(col("player.cards")).alias("card")) \
                       .count()
print(f"Total Number of Cards Issued: {total_cards}")

# 3. Distribution of Card Types
card_type_distribution = lineups_df.select(explode(col("lineups.lineup")).alias("player")) \
                                  .select(explode(col("player.cards")).alias("card")) \
                                  .groupBy("card.card_type") \
                                  .count()
print("Distribution of Card Types:")
card_type_distribution.show(truncate=False)

# 4. Distribution of Player Positions
position_distribution = lineups_df.select(explode(col("lineups.lineup")).alias("player")) \
                                 .select(explode(col("player.positions")).alias("position")) \
                                 .groupBy("position.position") \
                                 .count()
print("Distribution of Player Positions:")
position_distribution.show(truncate=False)

# 5. Top Players by Number of Cards
top_players_by_cards = lineups_df.select(explode(col("lineups.lineup")).alias("player")) \
                                .select("player.player_name", explode(col("player.cards")).alias("card")) \
                                .groupBy("player_name") \
                                .count() \
                                .orderBy(col("count").desc()) \
                                .limit(10)
print("Top Players by Number of Cards:")
top_players_by_cards.show(truncate=False)


Total Number of Unique Players: 9937
Total Number of Cards Issued: 13541
Distribution of Card Types:
+-------------+-----+
|card_type    |count|
+-------------+-----+
|Yellow Card  |12865|
|Second Yellow|349  |
|Red Card     |327  |
+-------------+-----+

Distribution of Player Positions:
+-------------------------+-----+
|position                 |count|
+-------------------------+-----+
|Left Attacking Midfield  |398  |
|Right Wing               |8363 |
|Center Defensive Midfield|4626 |
|Right Wing Back          |1585 |
|Right Center Midfield    |6565 |
|Right Defensive Midfield |6044 |
|Left Back                |7728 |
|Right Center Forward     |4234 |
|Center Attacking Midfield|6154 |
|Center Forward           |8794 |
|Right Center Back        |8505 |
|Left Center Back         |8387 |
|Center Back              |1334 |
|Left Midfield            |4808 |
|Left Center Forward      |4189 |
|Left Defensive Midfield  |5904 |
|Right Midfield           |4755 |
|Goalkeeper               |672

In [68]:
from pyspark.sql.functions import col, explode

# Explode the lineup and cards arrays
exploded_df = lineups_df.select(explode(col("lineups.lineup")).alias("player")) \
                        .select("player.player_name", explode(col("player.cards")).alias("card"))

# Group by player name and card type, then count occurrences
card_counts_df = exploded_df.groupBy("player_name", "card.card_type").count()

# Pivot the results to get counts of each card type as separate columns
pivot_df = card_counts_df.groupBy("player_name").pivot("card_type").sum("count")

# Fill NaN values with 0
pivot_df = pivot_df.fillna(0)

from pyspark.sql.functions import col
from functools import reduce
import operator


# Create a new column in pivot_df that sums all card counts
total_card_count_col = reduce(operator.add, [col(column) for column in pivot_df.columns if column != "player_name"])
pivot_df = pivot_df.withColumn("total_card_count", total_card_count_col)

# Get the top 20 rows based on the new column total_card_count
top_20_df = pivot_df.orderBy(col("total_card_count").desc()).limit(20)

# Show the top 20 rows
top_20_df.show(truncate=False)


+------------------------------------+--------+-------------+-----------+
|player_name                         |Red Card|Second Yellow|Yellow Card|
+------------------------------------+--------+-------------+-----------+
|Adriano Correia Claro               |0       |0            |19         |
|Víctor Machín Pérez                 |0       |2            |8          |
|Khouma El Hadji Babacar             |0       |0            |3          |
|Yessenia Estefani Huenteo Cheuqueman|0       |0            |1          |
|Floyd Ayité                         |1       |0            |2          |
|Jack Colback                        |0       |0            |11         |
|Ryan Shawcross                      |0       |1            |3          |
|Artem Kravets                       |0       |0            |1          |
|György Garics                       |0       |0            |8          |
|Narayan Das                         |0       |0            |2          |
|Rafael Alcântara do Nascimento      |

In [70]:
from pyspark.sql.functions import col, explode, countDistinct

# Explode the lineup and cards arrays
exploded_df = lineups_df.select(explode(col("lineups.lineup")).alias("player"), col("match_id")) \
                        .select("player.player_name", "match_id", explode(col("player.cards")).alias("card"))

# Group by player name and card type, then count occurrences
card_counts_df = exploded_df.groupBy("player_name", "card.card_type").count()

# Pivot the results to get counts of each card type as separate columns
pivot_df = card_counts_df.groupBy("player_name").pivot("card_type").sum("count")

# Fill NaN values with 0
pivot_df = pivot_df.fillna(0)

# Create a new column in pivot_df that sums all card counts
from functools import reduce
import operator

total_card_count_col = reduce(operator.add, [col(column) for column in pivot_df.columns if column != "player_name"])
pivot_df = pivot_df.withColumn("total_card_count", total_card_count_col)

# Get the distinct count of matches for each player
matches_count_df = lineups_df.select(explode(col("lineups.lineup")).alias("player"), col("match_id")) \
                             .groupBy("player.player_name").agg(countDistinct("match_id").alias("match_count"))

# Join the matches count with the pivot dataframe
final_df = pivot_df.join(matches_count_df, "player_name", "left")

# Get the top 20 rows based on the new column total_card_count
top_20_df = final_df.orderBy(col("total_card_count").desc()).limit(20)

# Show the top 20 rows
top_20_df.show(truncate=False)



+-------------------------------+--------+-------------+-----------+----------------+-----------+
|player_name                    |Red Card|Second Yellow|Yellow Card|total_card_count|match_count|
+-------------------------------+--------+-------------+-----------+----------------+-----------+
|Sergio Busquets i Burgos       |0       |1            |105        |106             |420        |
|Gerard Piqué Bernabéu          |1       |4            |91         |96              |358        |
|Daniel Alves da Silva          |1       |3            |56         |60              |250        |
|Jordi Alba Ramos               |1       |2            |50         |53              |284        |
|Lionel Andrés Messi Cuccittini |0       |0            |53         |53              |598        |
|Javier Alejandro Mascherano    |2       |1            |49         |52              |221        |
|Neymar da Silva Santos Junior  |0       |1            |34         |35              |158        |
|Sergio Ramos García

In [78]:
from pyspark.sql.functions import when, round as _round
# Add the average cards per match column
final_df = final_df.withColumn(
    "average_cards_per_match",
    _round(when(col("match_count") > 0, col("total_card_count") / col("match_count")).otherwise(0),2)
)

# Get the top 20 rows based on the new column total_card_count
top_20_df = final_df.orderBy(col("total_card_count").desc()).limit(20)

# Show the top 20 rows
top_20_df.show(truncate=False)

+-------------------------------+--------+-------------+-----------+----------------+-----------+-----------------------+
|player_name                    |Red Card|Second Yellow|Yellow Card|total_card_count|match_count|average_cards_per_match|
+-------------------------------+--------+-------------+-----------+----------------+-----------+-----------------------+
|Sergio Busquets i Burgos       |0       |1            |105        |106             |420        |0.25                   |
|Gerard Piqué Bernabéu          |1       |4            |91         |96              |358        |0.27                   |
|Daniel Alves da Silva          |1       |3            |56         |60              |250        |0.24                   |
|Jordi Alba Ramos               |1       |2            |50         |53              |284        |0.19                   |
|Lionel Andrés Messi Cuccittini |0       |0            |53         |53              |598        |0.09                   |
|Javier Alejandro Masche

### 1.4 Events folder

In [18]:
# Original
root_dir = "../open-data/data/events"
"""events_df = spark.read.json(f"{root_dir}/*.json", multiLine=True)
events_df.printSchema()"""

'events_df = spark.read.json(f"{root_dir}/*.json", multiLine=True)\nevents_df.printSchema()'

In [19]:
# Marked with match_id
from pyspark.sql.functions import input_file_name, regexp_replace, struct

# Read the JSON files and include the filename
events_df = spark.read.json(f"{root_dir}/*.json", multiLine=True) \
    .withColumn("file_name", input_file_name())

# Extract match_id from the filename by removing the .json extension
events_df = events_df.withColumn("match_id", regexp_replace("file_name", r"(.*/)?([^/]+)\.json$", "$2"))

# Nest the existing structure inside the match_id struct
events_df = events_df.select(struct("*").alias("events"), "match_id")

In [20]:
events_df.printSchema()

root
 |-- events: struct (nullable = false)
 |    |-- 50_50: struct (nullable = true)
 |    |    |-- outcome: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- bad_behaviour: struct (nullable = true)
 |    |    |-- card: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- ball_receipt: struct (nullable = true)
 |    |    |-- outcome: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- ball_recovery: struct (nullable = true)
 |    |    |-- offensive: boolean (nullable = true)
 |    |    |-- recovery_failure: boolean (nullable = true)
 |    |-- block: struct (nullable = true)
 |    |    |-- deflection: boolean (nullable = true)
 |    |    |-- offensive: boolean (nullable = true)
 |    |    |-- save_block: boolean (nullable = true)
 |    |-- carry:

In [86]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, count, sum as _sum, avg, when
# 1. Total Number of Events per Match
total_events_per_match = events_df.groupBy("match_id").count()
total_events_per_match.show()

# 2. Event Distribution by Type
event_distribution = events_df.groupBy("events.type.name").count()
event_distribution.show()

# 3. Player Performance Metrics
player_metrics = events_df.groupBy("events.player.name").agg(
    count("*").alias("total_events"),
    count(when(col("events.shot.outcome.name") == "Goal", True)).alias("goals"),
    count(when(col("events.pass.goal_assist") == True, True)).alias("assists")
)
player_metrics.show()

# 4. Team Performance Metrics
team_metrics = events_df.groupBy("events.team.name").agg(
    count("*").alias("total_events"),
    count(when(col("events.shot.outcome.name") == "Goal", True)).alias("goals"),
    count(when(col("events.pass.goal_assist") == True, True)).alias("assists")
)
team_metrics.show()

# 5. Pass Analysis
pass_analysis = events_df.filter(col("events.type.name") == "Pass").groupBy("events.team.name").agg(
    count("*").alias("total_passes"),
    count(when(col("events.pass.outcome.name") == "Complete", True)).alias("successful_passes"),
    avg(col("events.pass.length")).alias("average_pass_length")
)
pass_analysis.show()

# 6. Shot Analysis
shot_analysis = events_df.filter(col("events.type.name") == "Shot").groupBy("events.team.name").agg(
    count("*").alias("total_shots"),
    count(when(col("events.shot.outcome.name") == "Goal", True)).alias("goals"),
    avg(col("events.shot.statsbomb_xg")).alias("average_xg")
)
shot_analysis.show()

# 7. Foul Analysis
foul_analysis = events_df.filter(col("events.type.name") == "Foul Committed").groupBy("events.team.name").agg(
    count("*").alias("total_fouls"),
    count(when(col("events.foul_committed.card.name").isNotNull(), True)).alias("cards")
)
foul_analysis.show()

# 8. Possession Analysis
possession_analysis = events_df.groupBy("events.team.name", "events.possession_team.name").agg(
    count("*").alias("total_possession_events")
).groupBy("events.team.name").agg(
    _sum("total_possession_events").alias("total_possession_events")
)
possession_analysis.show()


+--------+-----+
|match_id|count|
+--------+-----+
| 3829519| 4632|
| 3869420| 5145|
|   68357| 4597|
|   18243| 4708|
|    7582| 5024|
| 3794692| 4921|
| 2302764| 4648|
| 3773369| 4806|
| 3795108| 4990|
| 3795220| 4827|
| 3794685| 4883|
|   70305| 4740|
| 3837938| 4794|
| 3795506| 4796|
|   18237| 4695|
| 3794686| 4860|
|   69295| 4599|
| 3869220| 4750|
| 3795221| 4691|
|   69296| 4567|
+--------+-----+
+-----------------+-------+
|             name|  count|
+-----------------+-------+
|       Camera off|    693|
|   Tactical Shift|   8130|
|             Shot|  84065|
|Referee Ball-Drop|   3850|
|     Dispossessed|  85501|
|     Own Goal For|    320|
|  Injury Stoppage|  13034|
|             Duel| 248531|
|    Bad Behaviour|   2402|
|         Foul Won|  91996|
|       Player Off|   3119|
|           Shield|   4438|
|         Half End|  13472|
|        Camera On|   2595|
|      Starting XI|   6632|
|            Carry|2511471|
|        Clearance| 153420|
|    Ball Recovery| 352836|
|   

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `events`.`team`.`name` cannot be resolved. Did you mean one of the following? [`name`, `name`, `total_possession_events`].;
'Aggregate ['events.team.name], ['events.team.name, sum(total_possession_events#4393L) AS total_possession_events#4405L]
+- Aggregate [events#3783.team.name, events#3783.possession_team.name], [events#3783.team.name AS name#4396, events#3783.possession_team.name AS name#4397, count(1) AS total_possession_events#4393L]
   +- Project [struct(50_50, 50_50#3609, bad_behaviour, bad_behaviour#3610, ball_receipt, ball_receipt#3611, ball_recovery, ball_recovery#3612, block, block#3613, carry, carry#3614, clearance, clearance#3615, counterpress, counterpress#3616, dribble, dribble#3617, duel, duel#3618, duration, duration#3619, foul_committed, foul_committed#3620, ... 64 more fields) AS events#3783, match_id#3738]
      +- Project [50_50#3609, bad_behaviour#3610, ball_receipt#3611, ball_recovery#3612, block#3613, carry#3614, clearance#3615, counterpress#3616, dribble#3617, duel#3618, duration#3619, foul_committed#3620, foul_won#3621, goalkeeper#3622, half_end#3623, half_start#3624, id#3625, index#3626L, injury_stoppage#3627, interception#3628, location#3629, minute#3630L, miscontrol#3631, off_camera#3632, ... 20 more fields]
         +- Project [50_50#3609, bad_behaviour#3610, ball_receipt#3611, ball_recovery#3612, block#3613, carry#3614, clearance#3615, counterpress#3616, dribble#3617, duel#3618, duration#3619, foul_committed#3620, foul_won#3621, goalkeeper#3622, half_end#3623, half_start#3624, id#3625, index#3626L, injury_stoppage#3627, interception#3628, location#3629, minute#3630L, miscontrol#3631, off_camera#3632, ... 19 more fields]
            +- Relation [50_50#3609,bad_behaviour#3610,ball_receipt#3611,ball_recovery#3612,block#3613,carry#3614,clearance#3615,counterpress#3616,dribble#3617,duel#3618,duration#3619,foul_committed#3620,foul_won#3621,goalkeeper#3622,half_end#3623,half_start#3624,id#3625,index#3626L,injury_stoppage#3627,interception#3628,location#3629,minute#3630L,miscontrol#3631,off_camera#3632,... 18 more fields] json


In [None]:
from pyspark.sql.functions import col, count, when, ceil

# Create a helper column to bin the time into 10-minute intervals
events_distributions_df = events_df.withColumn("time_bin", ceil(col("events.minute") / 10))

# Distribution of fouls during a match in 10-minute bins
fouls_distribution = events_distributions_df.filter(col("events.foul_committed").isNotNull()) \
    .groupBy("time_bin").count() \
    .withColumnRenamed("count", "fouls_count")

# Distribution of goals during a match in 10-minute bins
goals_distribution = events_distributions_df.filter(col("events.shot.outcome.name") == "Goal") \
    .groupBy("time_bin").count() \
    .withColumnRenamed("count", "goals_count")

In [23]:

# Show the results
fouls_distribution.show()

+--------+-----------+
|time_bin|fouls_count|
+--------+-----------+
|       0|        144|
|       7|       3224|
|       6|       3228|
|       9|       3350|
|       5|       3320|
|       1|       2072|
|      10|       1027|
|       3|       2806|
|      12|         37|
|       8|       3181|
|      11|         26|
|       2|       2569|
|       4|       3027|
|      13|          3|
+--------+-----------+


In [24]:
goals_distribution.show()

+--------+-----------+
|time_bin|goals_count|
+--------+-----------+
|       0|         32|
|       7|       1033|
|       6|       1073|
|       9|       1081|
|       5|       1099|
|       1|        794|
|      10|        345|
|       3|        904|
|      12|         28|
|       8|        985|
|      11|         13|
|       2|        853|
|       4|        938|
|      13|        148|
|      14|         10|
+--------+-----------+


### 1.5 Three-sixty folder

In [5]:
root_dir = "../open-data/data/three-sixty"

three_sixty_df = spark.read.json(f"{root_dir}/*.json", multiLine=True)
three_sixty_df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- event_uuid: string (nullable = true)
 |-- freeze_frame: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- actor: boolean (nullable = true)
 |    |    |-- keeper: boolean (nullable = true)
 |    |    |-- location: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- teammate: boolean (nullable = true)
 |-- visible_area: array (nullable = true)
 |    |-- element: double (containsNull = true)


In [6]:
three_sixty_df.show()

+---------------+--------------------+--------------------+--------------------+
|_corrupt_record|          event_uuid|        freeze_frame|        visible_area|
+---------------+--------------------+--------------------+--------------------+
|           NULL|ea57ee78-a4ef-4d6...|[{false, false, [...|[120.0, 80.0, 0.0...|
|           NULL|66655fd3-2764-453...|[{true, false, [3...|[0.0, 80.0, 0.0, ...|
|           NULL|3765b81d-d184-4af...|[{true, false, [3...|[0.0, 80.0, 0.0, ...|
|           NULL|fdcb6ded-f8be-4fa...|[{true, false, [3...|[2.18856831879949...|
|           NULL|0549751b-8a46-49b...|[{false, false, [...|[13.952798163308,...|
|           NULL|be62c43a-ce82-494...|[{false, false, [...|[13.952798163308,...|
|           NULL|428d98d8-ece0-4c7...|[{false, false, [...|[37.5456955242079...|
|           NULL|897fdc30-b4bd-421...|[{false, false, [...|[26.5122636276366...|
|           NULL|a61d0e74-ff36-443...|[{false, false, [...|[51.4761931728235...|
|           NULL|19338014-ee

In [16]:
# Step 1: Map - Filter and project required columns
mapped_df = three_sixty_df.filter(col('_corrupt_record').isNotNull()).select('_corrupt_record', 'event_uuid')

# Step 2: Reduce - Perform aggregation
aggregated_df = mapped_df.groupBy('_corrupt_record').agg(count('event_uuid').alias('count'))

# Show the results
aggregated_df.show(truncate=False)

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [3]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException


# Directory containing the JSON files
root_dir = "../open-data/data/three-sixty"

# Get the list of JSON files in the directory
json_files = [os.path.join(root_dir, file) for file in os.listdir(root_dir) if file.endswith('.json')]

# Function to check for corrupted records in a file
def find_corrupted_records(file_path):
    try:
        # Try to read the JSON file
        df = spark.read.json(file_path, multiLine=True)
        # If successful, return None
        return None
    except AnalysisException as e:
        # If an AnalysisException occurs, return the file path and the error message
        return file_path, str(e)
    except Exception as e:
        # If any other Exception occurs, return the file path and the error message
        return file_path, str(e)

# Iterate through the files and check for corrupted records
for json_file in json_files:
    result = find_corrupted_records(json_file)
    if result:
        file_path, error_message = result
        print(f"Error processing file: {file_path}")
        print(f"Error message: {error_message}")

