In [1]:
from google.colab import files

# Upload the zip file
uploaded = files.upload()


Saving Dataset for Task 4-20240716T095937Z-001.zip to Dataset for Task 4-20240716T095937Z-001.zip


In [2]:
import zipfile
import os

# Define the path to the uploaded zip file
zip_file_path = 'Dataset for Task 4-20240716T095937Z-001.zip'

# Create an extraction directory
extraction_dir = '/content/dataset/'
os.makedirs(extraction_dir, exist_ok=True)

# Extract the contents of the zip file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extraction_dir)

# List the extracted files
extracted_files = os.listdir(extraction_dir)
print(extracted_files)


['Dataset for Task 4']


In [3]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5ad5105855bbdc1a33e9efd3aeb34b887ecfb94e845324e6805ac744d9b5b617
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [10]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("Bundesliga Analysis").getOrCreate()

# Define the paths to the dataset directory and CSV files
dataset_dir = os.path.join(extraction_dir, 'Dataset for Task 4')
teams_path = os.path.join(dataset_dir, 'Teams.csv')
teams_in_matches_path = os.path.join(dataset_dir, 'Teams_in_Matches.csv')
unique_teams_path = os.path.join(dataset_dir, 'Unique_Teams.csv')
matches_path = os.path.join(dataset_dir, 'Matches.csv')

# Load the CSV files into PySpark DataFrames
teams_df = spark.read.csv(teams_path, header=True, inferSchema=True)
teams_in_matches_df = spark.read.csv(teams_in_matches_path, header=True, inferSchema=True)
unique_teams_df = spark.read.csv(unique_teams_path, header=True, inferSchema=True)
matches_df = spark.read.csv(matches_path, header=True, inferSchema=True)

# Show the first few rows of each DataFrame to understand their structure
teams_df.show(5)
teams_in_matches_df.show(5)
unique_teams_df.show(5)
matches_df.show(5)


+------+-------------+---------+----------+------------------+----------------------+------------------+---------------+
|Season|     TeamName|KaderHome|AvgAgeHome|ForeignPlayersHome|OverallMarketValueHome|AvgMarketValueHome|StadiumCapacity|
+------+-------------+---------+----------+------------------+----------------------+------------------+---------------+
|  2017|Bayern Munich|       27|        26|                15|             597950000|          22150000|          75000|
|  2017|     Dortmund|       33|        25|                18|             416730000|          12630000|          81359|
|  2017|   Leverkusen|       31|        24|                15|             222600000|           7180000|          30210|
|  2017|   RB Leipzig|       30|        23|                15|             180130000|           6000000|          42959|
|  2017|   Schalke 04|       29|        24|                17|             179550000|           6190000|          62271|
+------+-------------+---------+

In [16]:
from pyspark.sql.functions import col, when, desc, count

d1_matches_df = matches_df.filter((col('Div') == 'D1') & (col('Season') >= 2010))

# Calculate the number of wins for each team per season
home_wins_df = d1_matches_df.filter(col("FTR") == "H").groupBy("Season", "HomeTeam").agg(count("FTR").alias("wins")).withColumnRenamed("HomeTeam", "Team")
away_wins_df = d1_matches_df.filter(col("FTR") == "A").groupBy("Season", "AwayTeam").agg(count("FTR").alias("wins")).withColumnRenamed("AwayTeam", "Team")

# Combine home and away wins
total_wins_df = home_wins_df.union(away_wins_df).groupBy("Season", "Team").agg({"wins": "sum"}).withColumnRenamed("sum(wins)", "total_wins")

# Find the winners for each season
from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("Season").orderBy(desc("total_wins"))
winners_df = total_wins_df.withColumn("rank", F.rank().over(window)).filter(col("rank") == 1).drop("rank")

# Show the winners
winners_df.show()

+------+-------------+----------+
|Season|         Team|total_wins|
+------+-------------+----------+
|  2010|     Dortmund|        23|
|  2011|     Dortmund|        25|
|  2012|Bayern Munich|        29|
|  2013|Bayern Munich|        29|
|  2014|Bayern Munich|        25|
|  2015|Bayern Munich|        28|
|  2016|Bayern Munich|        25|
|  2017|     Augsburg|        17|
|  2017|Bayern Munich|        17|
|  2017|       Hertha|        17|
|  2017|    Wolfsburg|        17|
|  2017|      Hamburg|        17|
|  2017|   Leverkusen|        17|
|  2017|     Hannover|        17|
|  2017|     Freiburg|        17|
|  2017|   Hoffenheim|        17|
|  2017|      FC Koln|        17|
|  2017|   M'gladbach|        17|
|  2017|Werder Bremen|        17|
|  2017|   RB Leipzig|        17|
+------+-------------+----------+
only showing top 20 rows



In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as spark_sum, rank, asc
from pyspark.sql.window import Window

# Assuming you have a SparkSession already created
spark = SparkSession.builder.appName("FootballAnalysis").getOrCreate()

# Filter matches for D1 division and the last decade
d1_matches_df = matches_df.filter((col('Div') == 'D1') & (col('Season') >= 2010))

# Calculate points for each match
points_df = d1_matches_df.withColumn(
    "HomePoints", when(col("FTR") == "H", 3).when(col("FTR") == "D", 1).otherwise(0)
).withColumn(
    "AwayPoints", when(col("FTR") == "A", 3).when(col("FTR") == "D", 1).otherwise(0)
)

# Aggregate points by HomeTeam
home_points_df = points_df.groupBy("Season", "HomeTeam").agg(spark_sum("HomePoints").alias("HomePoints"))

# Aggregate points by AwayTeam
away_points_df = points_df.groupBy("Season", "AwayTeam").agg(spark_sum("AwayPoints").alias("AwayPoints"))

# Combine home and away points
joined_df = home_points_df.join(
    away_points_df,
    (home_points_df.Season == away_points_df.Season) & (home_points_df.HomeTeam == away_points_df.AwayTeam),
    "outer"
).select(
    home_points_df["Season"],  # Explicitly select Season from home_points_df
    col("HomeTeam").alias("Team"),
    col("HomePoints"),
    col("AwayPoints")
)

# Calculate total points for each team across home and away matches
total_points_df = joined_df.withColumn("TotalPoints", col("HomePoints") + col("AwayPoints"))

# Find the relegated teams for each season (assuming bottom 3 teams are relegated)
window = Window.partitionBy("Season").orderBy(asc("TotalPoints"))
relegated_df = total_points_df.withColumn("Rank", rank().over(window)).filter(col("Rank") <= 3).drop("Rank")

# Show the relegated teams
relegated_df.show()



+------+------------------+----------+----------+-----------+
|Season|              Team|HomePoints|AwayPoints|TotalPoints|
+------+------------------+----------+----------+-----------+
|  2010|          St Pauli|        15|        14|         29|
|  2010|     Ein Frankfurt|        19|        15|         34|
|  2010|        M'gladbach|        18|        18|         36|
|  2011|    Kaiserslautern|        11|        12|         23|
|  2011|           FC Koln|        19|        11|         30|
|  2011|            Hertha|        15|        16|         31|
|  2012|    Greuther Furth|         4|        17|         21|
|  2012|Fortuna Dusseldorf|        21|         9|         30|
|  2012|        Hoffenheim|        19|        12|         31|
|  2013|      Braunschweig|        18|         7|         25|
|  2013|          Nurnberg|        14|        12|         26|
|  2013|           Hamburg|        18|         9|         27|
|  2014|         Paderborn|        18|        13|         31|
|  2014|

In [23]:
# Joining DataFrame to get comprehensive match information

from pyspark.sql.functions import col, when, month


joined_df = matches_df.join(
    teams_in_matches_df,
    matches_df.Match_ID == teams_in_matches_df.Match_ID,
    "left"
).join(
    unique_teams_df,
    teams_in_matches_df.Unique_Team_ID == unique_teams_df.Unique_Team_ID,
    "left"
).join(
    teams_df,
    unique_teams_df.TeamName == teams_df.TeamName,
    "left"
).select(
    matches_df.Match_ID,
    matches_df.Season,
    matches_df.Date,
    matches_df.HomeTeam,
    matches_df.AwayTeam,
    matches_df.FTHG,
    matches_df.FTAG,
    matches_df.FTR,
    teams_df.TeamName,
    teams_df.KaderHome,
    teams_df.AvgAgeHome,
    teams_df.ForeignPlayersHome,
    teams_df.OverallMarketValueHome,
    teams_df.AvgMarketValueHome,
    teams_df.StadiumCapacity
)

# Define Oktoberfest months (September and October)
oktoberfest_months = [9, 10]

# Add a column indicating whether a match occurred during Oktoberfest
joined_df = joined_df.withColumn(
    "IsOktoberfest",
    when(month(col("Date")).isin(oktoberfest_months), True).otherwise(False)
)

# Calculate performance metrics for Oktoberfest and non-Oktoberfest matches
# Calculate performance metrics for Oktoberfest and non-Oktoberfest matches
performance_metrics_df = joined_df.groupBy("IsOktoberfest").agg(
    {"FTHG": "avg", "FTAG": "avg", "FTR": "avg"}
).orderBy("IsOktoberfest")

# Show the performance metrics comparison
performance_metrics_df.show()

+-------------+------------------+--------+------------------+
|IsOktoberfest|         avg(FTAG)|avg(FTR)|         avg(FTHG)|
+-------------+------------------+--------+------------------+
|        false|1.0901671477957162|    NULL|1.5038589321707039|
|         true|1.1174280395245597|    NULL|1.5500358012315623|
+-------------+------------------+--------+------------------+



In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when

# Assuming you have a SparkSession already created
spark = SparkSession.builder.appName("CompetitivenessAnalysis").getOrCreate()

# Assuming `matches_df` contains the match data as per your previous examples
matches_df.show(5)  # Display to verify column names and data

# Filter matches for D1 division and the last decade
d1_matches_df = matches_df.filter((col('Div') == 'D1') & (col('Season') >= 2010))

# Calculate points for each match
points_df = d1_matches_df.withColumn(
    "HomePoints",
    when(col("FTR") == "H", 3).when(col("FTR") == "D", 1).otherwise(0)
).withColumn(
    "AwayPoints",
    when(col("FTR") == "A", 3).when(col("FTR") == "D", 1).otherwise(0)
)

# Calculate total points per season for each team
season_points_df = points_df.groupBy("Season", "HomeTeam").agg(
    sum("HomePoints").alias("TotalHomePoints"),
    sum("AwayPoints").alias("TotalAwayPoints")
)

# Aggregate total points to get TotalPoints
season_points_df = season_points_df.withColumn(
    "TotalPoints",
    col("TotalHomePoints") + col("TotalAwayPoints")
).select("Season", "HomeTeam", "TotalPoints")

# Show the results
season_points_df.show()

# Calculate standard deviation of points per season to measure variability (if needed)
# stddev_per_season = season_points_df.groupBy("Season").agg(stddev("TotalPoints").alias("StdDevPoints")).orderBy("Season")

# Example dataframe (replace with your actual data)
df = season_points_df

# Calculate range of TotalPoints and standard deviation per season
season_summary = df.groupBy("Season").agg(
    stddev("TotalPoints").alias("PointsStdDev"),
    (max("TotalPoints") - min("TotalPoints")).alias("PointsRange")
).orderBy("PointsRange")  # Order by PointsRange to find the smallest range

# Find the most competitive season (smallest PointsRange or PointsStdDev)
most_competitive_season = season_summary.select("Season").first()[0]

# Show the summary results
season_summary.show()

# Print the most competitive season
print("The most competitive season based on points range and standard deviation is:", most_competitive_season)


+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+---+------+----------+-------------+--------------+----+----+---+
only showing top 5 rows

+------+------------------+-----------+
|Season|          HomeTeam|TotalPoints|
+------+------------------+-----------+
|  2014|          Freiburg|         45|
|  2015|         Wolfsburg|         46|
|  2015|     Ein Frankfurt|         45|
|  2010|          Hannover|         50|
|  2011|   

In [35]:
from pyspark.sql.functions import col, month, avg

# Calculate average goals per match by month
goals_by_month = matches_df.withColumn("Month", month("Date")).groupBy("Month").agg(
    avg("FTHG").alias("AvgHomeGoals")
).orderBy("AvgHomeGoals", ascending=False)

goals_by_month.show()

# Get the month with the highest average goals
best_month = goals_by_month.select("Month").first()[0]

# Print the best month
print("The best month to watch Bundesliga matches based on average goals scored by home teams is:", best_month)


+-----+------------------+
|Month|      AvgHomeGoals|
+-----+------------------+
|    6|         1.9765625|
|    5| 1.687710843373494|
|   10|1.5704169944925255|
|    9|1.5395569620253164|
|   11|1.5366591080876795|
|    3|1.5183016105417277|
|    4|1.4825239715434582|
|   12|1.4762815608263198|
|    8|1.4717563989408649|
|    2|1.4424267437061493|
|    1|1.4282494365138994|
|    7|1.1308411214953271|
+-----+------------------+

The best month to watch Bundesliga matches based on average goals scored by home teams is: 6
