In [3]:
%pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     ---------------------------------------- 0.1/317.0 MB 4.3 MB/s eta 0:01:14
     ---------------------------------------- 0.3/317.0 MB 2.8 MB/s eta 0:01:52
     ---------------------------------------- 0.4/317.0 MB 3.0 MB/s eta 0:01:45
     ---------------------------------------- 0.5/317.0 MB 3.3 MB/s eta 0:01:35
     ---------------------------------------- 1.2/317.0 MB 5.6 MB/s eta 0:00:57
     ---------------------------------------- 1.9/317.0 MB 6.6 MB/s eta 0:00:48
     ---------------------------------------- 2.8/317.0 MB 8.4 MB/s eta 0:00:38
     --------------------------------------- 3.6/317.0 MB 10.0 MB/s eta 0:00:32
     --------------------------------------- 3.6/317.0 MB 10.0 MB/s eta 0:00:32
     --------------------------------------- 3.6/317.0 MB 10.0 MB/s eta 0:00:32
     ---------------------------------------- 3.7/317.0 MB 7.4

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, desc, month


# Build (or reuse) the Spark session used by every cell below.
session_builder = SparkSession.builder.appName("BundesligaAnalysis")
spark = session_builder.getOrCreate()

# Locations of the four CSV inputs; all of them live under one directory.
data_path = "data/task4/"
matches_path = f"{data_path}Matches.csv"
teams_in_matches_path = f"{data_path}Teams_in_Matches.csv"
teams_path = f"{data_path}Teams.csv"
unique_teams_path = f"{data_path}Unique_Teams.csv"

# Every file is read with the same reader options: a header row is expected
# and Spark is asked to infer the column types.
csv_options = {"header": True, "inferSchema": True}
matches_df = spark.read.csv(matches_path, **csv_options)
teams_in_matches_df = spark.read.csv(teams_in_matches_path, **csv_options)
teams_df = spark.read.csv(teams_path, **csv_options)
unique_teams_df = spark.read.csv(unique_teams_path, **csv_options)

# Print the schema of each frame, in load order, as a sanity check.
for frame in (matches_df, teams_in_matches_df, teams_df, unique_teams_df):
    frame.printSchema()

In [6]:

# Goal: for every season, find the first-division team with the most points.

# Keep only division D1 matches dated 2013 or later.
# NOTE(review): filtering on the calendar year can cut a season in half (a
# season that started in 2012 would only contribute its 2013 matches) --
# confirm whether the filter should be on `season` instead.
matches_last_decade_df = matches_df.filter(
    (col('division') == 'D1') & (year(col('date')) >= 2013)
)

# Per-match outcome flags as 0/1 integers, derived from the goal columns.
# NOTE(review): assumes `home_team_goals` / `away_team_goals` exist in
# Matches.csv (the goals-per-month cell relies on them too) -- confirm against
# the printed schema.
home_win = (col('home_team_goals') > col('away_team_goals')).cast('int')
away_win = (col('home_team_goals') < col('away_team_goals')).cast('int')
draw = (col('home_team_goals') == col('away_team_goals')).cast('int')

# League points per match: 3 for a win, 1 for a draw, 0 for a loss.
# `points_df` keeps the `points_home` / `points_away` column names because a
# later cell groups this frame by season using exactly those columns.
points_df = (
    matches_last_decade_df
    .withColumn('points_home', 3 * home_win + draw)
    .withColumn('points_away', 3 * away_win + draw)
)

# Reshape to one row per (season, team, points) for each side of a match so
# that home and away points can be added up for the same team.
home_points_df = points_df.select(
    'season', col('home_team').alias('team'), col('points_home').alias('points')
)
away_points_df = points_df.select(
    'season', col('away_team').alias('team'), col('points_away').alias('points')
)

# Season total per team. unionByName matches columns by name, so the two
# halves cannot be silently misaligned the way a positional union can.
total_points_df = (
    home_points_df.unionByName(away_points_df)
    .groupBy('season', 'team')
    .agg({'points': 'sum'})
    .withColumnRenamed('sum(points)', 'total_points')
)

# Highest total reached in each season.
season_best_df = (
    total_points_df.groupBy('season')
    .agg({'total_points': 'max'})
    .withColumnRenamed('max(total_points)', 'total_points')
)

# Teams whose total equals the season maximum. Teams tied on points are all
# kept; no goal-difference tie-break is applied here.
winners_df = total_points_df.join(
    season_best_df, on=['season', 'total_points'], how='inner'
)

winners = winners_df.select('season', 'team', 'total_points').orderBy('season')

winners.show()


UsageError: Line magic function `%go` not found.


In [None]:
# Rows of teams_in_matches_df that are in division D2 and dated 2013 or later.
# NOTE(review): this uses "appears in D2" as a stand-in for "was relegated";
# it does not check that the team played in D1 beforehand, and it assumes
# teams_in_matches_df has `division`, `date` and `team` columns -- confirm
# against the printed schema.
in_second_division = col('division') == 'D2'
since_2013 = year(col('date')) >= 2013
relegated_teams_df = teams_in_matches_df.where(in_second_division & since_2013)

# One row per team name found in that subset.
relegated_teams = relegated_teams_df.select('team').distinct()

relegated_teams.show()


In [None]:
# Matches whose date falls in October (calendar month 10).
october_matches_df = matches_df.filter(month(col('date')) == 10)

# Mean of `result` per home team, once for October only and once for all
# matches. Each aggregate gets its own column name: with the same name on both
# sides the join would produce two identically named columns and the
# difference below could not be resolved.
# NOTE(review): averaging `result` is only meaningful if that column is
# numeric -- confirm against the printed schema. Only home matches are
# considered, and the "overall" baseline still includes the October matches.
october_performance_df = october_matches_df.groupBy('home_team').agg(
    {'result': 'avg'}
).withColumnRenamed('avg(result)', 'avg_result_october')

overall_performance_df = matches_df.groupBy('home_team').agg(
    {'result': 'avg'}
).withColumnRenamed('avg(result)', 'avg_result_overall')

# Inner join: only teams that have at least one October home match remain.
performance_comparison_df = october_performance_df.join(
    overall_performance_df,
    on='home_team',
    how='inner'
).withColumn(
    'performance_difference', col('avg_result_october') - col('avg_result_overall')
)

performance_comparison_df.show()


In [None]:
# Competitiveness per season, measured as the spread of team strength: the
# smaller the standard deviation of the teams' points per match, the closer
# the teams were to each other. Relies on `points_df` (with `season`,
# `home_team`, `away_team`, `points_home`, `points_away`) from the earlier
# cell.

# One row per (season, team, points) for each side of every match.
home_side_df = points_df.select(
    'season', col('home_team').alias('team'), col('points_home').alias('points')
)
away_side_df = points_df.select(
    'season', col('away_team').alias('team'), col('points_away').alias('points')
)

# Average points per match for every team in every season. An average (not a
# sum) is used so that seasons with fewer matches in the data are not favoured
# merely because their totals are smaller.
team_strength_df = (
    home_side_df.unionByName(away_side_df)
    .groupBy('season', 'team')
    .agg({'points': 'avg'})
    .withColumnRenamed('avg(points)', 'points_per_match')
)

# Spread of team strength within each season. toDF renames by position, so
# this does not depend on the auto-generated name of the aggregate column.
points_stddev_df = (
    team_strength_df.groupBy('season')
    .agg({'points_per_match': 'stddev'})
    .toDF('season', 'total_stddev')
)

# Smallest spread first; seasons without a defined spread are dropped so a
# null cannot be returned as the answer.
most_competitive_season_df = (
    points_stddev_df
    .filter(col('total_stddev').isNotNull())
    .orderBy('total_stddev')
    .select('season')
    .limit(1)
)

most_competitive_season_df.show()


In [None]:
# Group all matches by the calendar month of their date and average the home
# and away goal columns within each month.
match_month = month(col('date')).alias('month')
monthly_goal_means_df = matches_df.groupBy(match_month).agg(
    {'home_team_goals': 'avg', 'away_team_goals': 'avg'}
)

# Sum of the two per-month averages, stored as `avg_goals`.
combined_goal_mean = col('avg(home_team_goals)') + col('avg(away_team_goals)')
goals_per_month_df = monthly_goal_means_df.withColumn('avg_goals', combined_goal_mean)

# Month with the highest `avg_goals`.
ranked_months_df = goals_per_month_df.orderBy(desc('avg_goals'))
best_month_df = ranked_months_df.select('month').limit(1)

best_month_df.show()
