In [0]:
spark

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType
from pyspark.sql import Window, functions as F


In [0]:
from pyspark.sql import SparkSession

#create session
spark = SparkSession.builder.appName("IPL Data Analysis").getOrCreate()

In [0]:
spark

In [0]:
ball_by_ball_df_raw = spark.read.format("csv").option("header", "true").load("s3://ipl-data-analysis-project/Ball_By_Ball.csv")
ball_by_ball_df_raw.printSchema()

root
 |-- MatcH_id: string (nullable = true)
 |-- Over_id: string (nullable = true)
 |-- Ball_id: string (nullable = true)
 |-- Innings_No: string (nullable = true)
 |-- Team_Batting: string (nullable = true)
 |-- Team_Bowling: string (nullable = true)
 |-- Striker_Batting_Position: string (nullable = true)
 |-- Extra_Type: string (nullable = true)
 |-- Runs_Scored: string (nullable = true)
 |-- Extra_runs: string (nullable = true)
 |-- Wides: string (nullable = true)
 |-- Legbyes: string (nullable = true)
 |-- Byes: string (nullable = true)
 |-- Noballs: string (nullable = true)
 |-- Penalty: string (nullable = true)
 |-- Bowler_Extras: string (nullable = true)
 |-- Out_type: string (nullable = true)
 |-- Caught: string (nullable = true)
 |-- Bowled: string (nullable = true)
 |-- Run_out: string (nullable = true)
 |-- LBW: string (nullable = true)
 |-- Retired_hurt: string (nullable = true)
 |-- Stumped: string (nullable = true)
 |-- caught_and_bowled: string (nullable = true)
 |-- hi

In [0]:
ball_by_ball_df = ball_by_ball_df_raw \
    .withColumn("match_id", F.col("MatcH_id").cast(IntegerType())) \
    .withColumn("over_id", F.col("Over_id").cast(IntegerType())) \
    .withColumn("ball_id", F.col("Ball_id").cast(IntegerType())) \
    .withColumn("innings_no", F.col("Innings_No").cast(IntegerType())) \
    .withColumn("team_batting", F.col("Team_Batting").cast(StringType())) \
    .withColumn("team_bowling", F.col("Team_Bowling").cast(StringType())) \
    .withColumn("striker_batting_position", F.col("Striker_Batting_Position").cast(IntegerType())) \
    .withColumn("extra_type", F.col("Extra_Type").cast(StringType())) \
    .withColumn("runs_scored", F.col("Runs_Scored").cast(IntegerType())) \
    .withColumn("extra_runs", F.col("Extra_runs").cast(IntegerType())) \
    .withColumn("wides", F.col("Wides").cast(IntegerType())) \
    .withColumn("legbyes", F.col("Legbyes").cast(IntegerType())) \
    .withColumn("byes", F.col("Byes").cast(IntegerType())) \
    .withColumn("noballs", F.col("Noballs").cast(IntegerType())) \
    .withColumn("penalty", F.col("Penalty").cast(IntegerType())) \
    .withColumn("bowler_extras", F.col("Bowler_Extras").cast(IntegerType())) \
    .withColumn("out_type", F.col("Out_type").cast(StringType())) \
    .withColumn("caught", F.col("Caught").cast(BooleanType())) \
    .withColumn("bowled", F.col("Bowled").cast(BooleanType())) \
    .withColumn("run_out", F.col("Run_out").cast(BooleanType())) \
    .withColumn("lbw", F.col("LBW").cast(BooleanType())) \
    .withColumn("retired_hurt", F.col("Retired_hurt").cast(BooleanType())) \
    .withColumn("stumped", F.col("Stumped").cast(BooleanType())) \
    .withColumn("caught_and_bowled", F.col("caught_and_bowled").cast(BooleanType())) \
    .withColumn("hit_wicket", F.col("hit_wicket").cast(BooleanType())) \
    .withColumn("obstructingfeild", F.col("ObstructingFeild").cast(BooleanType())) \
    .withColumn("bowler_wicket", F.col("Bowler_Wicket").cast(BooleanType())) \
    .withColumn("match_date", F.to_date(F.col("match_date"), "M/d/yyyy")) \
    .withColumn("season", F.col("Season").cast(IntegerType())) \
    .withColumn("striker", F.col("Striker").cast(IntegerType())) \
    .withColumn("non_striker", F.col("Non_Striker").cast(IntegerType())) \
    .withColumn("bowler", F.col("Bowler").cast(IntegerType())) \
    .withColumn("player_out", F.col("Player_Out").cast(IntegerType())) \
    .withColumn("fielders", F.col("Fielders").cast(IntegerType())) \
    .withColumn("striker_match_sk", F.col("Striker_match_SK").cast(IntegerType())) \
    .withColumn("strikersk", F.col("StrikerSK").cast(IntegerType())) \
    .withColumn("nonstriker_match_sk", F.col("NonStriker_match_SK").cast(IntegerType())) \
    .withColumn("nonstriker_sk", F.col("NONStriker_SK").cast(IntegerType())) \
    .withColumn("fielder_match_sk", F.col("Fielder_match_SK").cast(IntegerType())) \
    .withColumn("fielder_sk", F.col("Fielder_SK").cast(IntegerType())) \
    .withColumn("bowler_match_sk", F.col("Bowler_match_SK").cast(IntegerType())) \
    .withColumn("bowler_sk", F.col("BOWLER_SK").cast(IntegerType())) \
    .withColumn("playerout_match_sk", F.col("PlayerOut_match_SK").cast(IntegerType())) \
    .withColumn("battingteam_sk", F.col("BattingTeam_SK").cast(IntegerType())) \
    .withColumn("bowlingteam_sk", F.col("BowlingTeam_SK").cast(IntegerType())) \
    .withColumn("keeper_catch", F.col("Keeper_Catch").cast(BooleanType())) \
    .withColumn("player_out_sk", F.col("Player_out_sk").cast(IntegerType())) \
    .withColumn("matchdatesk", F.to_date(F.col("MatchDateSK"), "yyyyMMdd"))

In [0]:
ball_by_ball_df.show(1)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+-----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out|  lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_sk|fielder_match_s

In [0]:
match_df_raw = spark.read.format("csv").option("header","true").load("s3://ipl-data-analysis-project/Match.csv")
match_df_raw.printSchema()

root
 |-- Match_SK: string (nullable = true)
 |-- match_id: string (nullable = true)
 |-- Team1: string (nullable = true)
 |-- Team2: string (nullable = true)
 |-- match_date: string (nullable = true)
 |-- Season_Year: string (nullable = true)
 |-- Venue_Name: string (nullable = true)
 |-- City_Name: string (nullable = true)
 |-- Country_Name: string (nullable = true)
 |-- Toss_Winner: string (nullable = true)
 |-- match_winner: string (nullable = true)
 |-- Toss_Name: string (nullable = true)
 |-- Win_Type: string (nullable = true)
 |-- Outcome_Type: string (nullable = true)
 |-- ManOfMach: string (nullable = true)
 |-- Win_Margin: string (nullable = true)
 |-- Country_id: string (nullable = true)



In [0]:
match_df = match_df_raw \
    .withColumn("match_sk", F.col("Match_SK").cast(IntegerType())) \
    .withColumn("match_id", F.col("match_id").cast(IntegerType())) \
    .withColumn("match_date", F.to_date("match_date","M/d/yyyy")) \
    .withColumn("season_year", F.col("season_year").cast(IntegerType())) \
    .withColumn("win_margin", F.col("win_margin").cast(IntegerType())) \
    .withColumn("country_id", F.col("country_id").cast(IntegerType()))

In [0]:
match_df.show(1)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+---------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+
|match_sk|match_id|               Team1|               Team2|match_date|season_year|          Venue_Name|City_Name|Country_Name|         Toss_Winner|        match_winner|Toss_Name|Win_Type|Outcome_Type|  ManOfMach|win_margin|country_id|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+---------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|2008-04-18|       2008|M Chinnaswamy Sta...|Bangalore|       India|Royal Challengers...|Kolkata Knight Ri...|    field|    runs|      Result|BB McCullum|       140|         1|
+--------+--------+--------------------+------------

In [0]:
player_df_raw = spark.read.format("csv").option("header","true").option("dateFormat","M/d/yyyy").load("s3://ipl-data-analysis-project/Player.csv")
player_df_raw.printSchema()

root
 |-- PLAYER_SK: string (nullable = true)
 |-- Player_Id: string (nullable = true)
 |-- Player_Name: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- Batting_hand: string (nullable = true)
 |-- Bowling_skill: string (nullable = true)
 |-- Country_Name: string (nullable = true)



In [0]:
player_df = player_df_raw \
    .withColumn("player_sk", F.col("player_sk").cast(IntegerType())) \
    .withColumn("player_id", F.col("player_id").cast(IntegerType())) \
    .withColumn("dob", F.to_date("dob", "M/d/yyyy"))

In [0]:
player_df.show(1)

+---------+---------+-----------+----------+-------------+----------------+------------+
|player_sk|player_id|Player_Name|       dob| Batting_hand|   Bowling_skill|Country_Name|
+---------+---------+-----------+----------+-------------+----------------+------------+
|        0|        1| SC Ganguly|1972-07-08|Left-hand bat|Right-arm medium|       India|
+---------+---------+-----------+----------+-------------+----------------+------------+
only showing top 1 row



In [0]:
player_match_df_raw = spark.read.format("csv").option("header","true").load("s3://ipl-data-analysis-project/Player_match.csv")
player_match_df_raw.printSchema()

root
 |-- Player_match_SK: string (nullable = true)
 |-- PlayerMatch_key: string (nullable = true)
 |-- Match_Id: string (nullable = true)
 |-- Player_Id: string (nullable = true)
 |-- Player_Name: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- Batting_hand: string (nullable = true)
 |-- Bowling_skill: string (nullable = true)
 |-- Country_Name: string (nullable = true)
 |-- Role_Desc: string (nullable = true)
 |-- Player_team: string (nullable = true)
 |-- Opposit_Team: string (nullable = true)
 |-- Season_year: string (nullable = true)
 |-- is_manofThematch: string (nullable = true)
 |-- Age_As_on_match: string (nullable = true)
 |-- IsPlayers_Team_won: string (nullable = true)
 |-- Batting_Status: string (nullable = true)
 |-- Bowling_Status: string (nullable = true)
 |-- Player_Captain: string (nullable = true)
 |-- Opposit_captain: string (nullable = true)
 |-- Player_keeper: string (nullable = true)
 |-- Opposit_keeper: string (nullable = true)



In [0]:
player_match_df = player_match_df_raw \
    .withColumn("player_match_sk", F.col("player_match_sk").cast(IntegerType())) \
    .withColumn("playermatch_key", F.col("playermatch_key").cast(DecimalType())) \
    .withColumn("match_id", F.col("match_id").cast(IntegerType())) \
    .withColumn("player_id", F.col("player_id").cast(IntegerType())) \
    .withColumn("dob", F.to_date("dob","M/d/yyyy")) \
    .withColumn("season_year", F.col("season_year").cast(IntegerType())) \
    .withColumn("age_as_on_match", F.col("age_as_on_match").cast(IntegerType())) \
    .withColumn("is_manofthematch", F.col("is_manofthematch").cast(BooleanType())) \
    .withColumn("isplayers_team_won", F.col("isplayers_team_won").cast(BooleanType())) 


In [0]:
player_match_df.show(5)

+---------------+---------------+--------+---------+-----------+----------+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+
|player_match_sk|playermatch_key|match_id|player_id|Player_Name|       dob|  Batting_hand|       Bowling_skill|Country_Name|Role_Desc|         Player_team|        Opposit_Team|season_year|is_manofthematch|age_as_on_match|isplayers_team_won|Batting_Status|Bowling_Status|Player_Captain|Opposit_captain|Player_keeper|Opposit_keeper|
+---------------+---------------+--------+---------+-----------+----------+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+
|      

In [0]:
team_df_raw = spark.read.format("csv").option("header","true").load("s3://ipl-data-analysis-project/Team.csv")
team_df_raw.printSchema()

root
 |-- Team_SK: string (nullable = true)
 |-- Team_Id: string (nullable = true)
 |-- Team_Name: string (nullable = true)



In [0]:
team_df = team_df_raw \
    .withColumn("team_sk", F.col("team_sk").cast(IntegerType())) \
    .withColumn("team_id", F.col("team_id").cast(IntegerType()))

In [0]:
team_df.show(5)

+-------+-------+--------------------+
|team_sk|team_id|           Team_Name|
+-------+-------+--------------------+
|      0|      1|Kolkata Knight Ri...|
|      1|      2|Royal Challengers...|
|      2|      3| Chennai Super Kings|
|      3|      4|     Kings XI Punjab|
|      4|      5|    Rajasthan Royals|
+-------+-------+--------------------+
only showing top 5 rows



In [0]:
# No of bowler extras by each team per season

joined_df = ball_by_ball_df.join(team_df, ball_by_ball_df["team_bowling"] == team_df["team_id"], "inner")
grouped_df = joined_df.groupBy("team_name", "season").sum("bowler_extras").withColumnRenamed("sum(bowler_extras)", "total_bowler_extras")
sorted_df = grouped_df.orderBy(grouped_df.season.asc(), grouped_df.total_bowler_extras.desc())
bowler_extras_df = sorted_df.show()


+--------------------+------+-------------------+
|           team_name|season|total_bowler_extras|
+--------------------+------+-------------------+
| Chennai Super Kings|  2008|                 91|
|Kolkata Knight Ri...|  2008|                 91|
|     Kings XI Punjab|  2008|                 90|
|Royal Challengers...|  2008|                 89|
|      Mumbai Indians|  2008|                 84|
|     Deccan Chargers|  2008|                 82|
|    Rajasthan Royals|  2008|                 82|
|    Delhi Daredevils|  2008|                 69|
|Kolkata Knight Ri...|  2009|                 94|
|    Delhi Daredevils|  2009|                 92|
|      Mumbai Indians|  2009|                 88|
|    Rajasthan Royals|  2009|                 82|
|Royal Challengers...|  2009|                 77|
|     Deccan Chargers|  2009|                 73|
|     Kings XI Punjab|  2009|                 61|
| Chennai Super Kings|  2009|                 34|
|     Kings XI Punjab|  2010|                107|


In [0]:
# No of extra runs by each team per season

joined_df = ball_by_ball_df.join(team_df, ball_by_ball_df["team_batting"] == team_df["team_id"], "inner")
grouped_df = joined_df.groupBy("team_name", "season").sum("extra_runs").withColumnRenamed("sum(extra_runs)", "total_extra_runs")
sorted_df = grouped_df.orderBy(grouped_df.season.asc(), grouped_df.total_extra_runs.desc())
team_extras_df = sorted_df.show()


+--------------------+------+----------------+
|           team_name|season|total_extra_runs|
+--------------------+------+----------------+
|      Mumbai Indians|  2008|             183|
|    Rajasthan Royals|  2008|             177|
| Chennai Super Kings|  2008|             155|
|Kolkata Knight Ri...|  2008|             141|
|     Kings XI Punjab|  2008|             124|
|    Delhi Daredevils|  2008|             118|
|Royal Challengers...|  2008|             118|
|     Deccan Chargers|  2008|             112|
|     Kings XI Punjab|  2009|             160|
| Chennai Super Kings|  2009|             150|
|      Mumbai Indians|  2009|             135|
|     Deccan Chargers|  2009|             125|
|Royal Challengers...|  2009|             114|
|    Delhi Daredevils|  2009|             110|
|    Rajasthan Royals|  2009|              93|
|Kolkata Knight Ri...|  2009|              90|
|      Mumbai Indians|  2010|             194|
|Kolkata Knight Ri...|  2010|             166|
| Chennai Sup

In [0]:
# No of extras by top 5 bowlers each season

grouped_df = ball_by_ball_df.groupBy("bowler", "season").sum("bowler_extras").withColumnRenamed("sum(bowler_extras)", "total_bowler_extras")
joined_df = grouped_df.join(player_df, grouped_df["bowler"] == player_df["player_id"], "inner")
select_df = joined_df.select("player_name", "country_name", "season", "total_bowler_extras")
window_spec = Window.partitionBy("season").orderBy(select_df.total_bowler_extras.desc())
ranked_df = select_df.withColumn("rank", F.row_number().over(window_spec))
top_bowlers_df = ranked_df.filter(ranked_df.rank <= 5).show()

+--------------+------------+------+-------------------+----+
|   player_name|country_name|season|total_bowler_extras|rank|
+--------------+------------+------+-------------------+----+
|   S Sreesanth|       India|  2008|                 34|   1|
|     JA Morkel|South Africa|  2008|                 26|   2|
|M Muralitharan|   Sri Lanka|  2008|                 26|   3|
|      DW Steyn|South Africa|  2008|                 24|   4|
|      I Sharma|       India|  2008|                 21|   5|
|    SL Malinga|   Sri Lanka|  2009|                 31|   1|
|      RP Singh|       India|  2009|                 24|   2|
|     DP Nannes|   Australia|  2009|                 23|   3|
|       A Nehra|       India|  2009|                 20|   4|
|     JH Kallis|South Africa|  2009|                 19|   5|
|      DW Steyn|South Africa|  2010|                 29|   1|
|       SW Tait|   Australia|  2010|                 28|   2|
|    SL Malinga|   Sri Lanka|  2010|                 24|   3|
|       

In [0]:
# No of sixes by top 5 batters each season

filter_sixes_df = ball_by_ball_df.filter(F.col("runs_scored") == 6)
grouped_df = filter_sixes_df.groupBy("striker", "season").count().withColumnRenamed("count", "total_sixes")
joined_df = grouped_df.join(player_df, grouped_df["striker"] == player_df["player_id"] , "inner")
select_df = joined_df.select("player_name", "country_name", "season", "total_sixes")
window_spec = Window.partitionBy("season").orderBy(select_df.total_sixes.desc())
ranked_df = select_df.withColumn("rank", F.row_number().over(window_spec))
top_batters_df = ranked_df.filter(ranked_df.rank <= 5)


In [0]:
# Determine which players have the best batting averages over the years.

# Step 1: Calculate Total Outs by Each Player in Each Season
# Filter for deliveries that resulted in an out, group by player and season, and count the total outs.
player_outs_per_season_df = ball_by_ball_df.select("striker", "season") \
    .where(F.col("bowler_wicket") == 'true') \
    .groupBy("striker", "season") \
    .agg(F.count("striker").alias("total_outs"))

# Step 2: Calculate Total Runs Scored by Each Player in Each Season
# Group by player and season, and sum the runs scored.
player_runs_per_season_df = ball_by_ball_df.select("striker", "season", "runs_scored") \
    .groupBy("striker", "season") \
    .agg(F.sum("runs_scored").alias("total_runs"))

# Step 3: Join Runs and Outs Data for Each Player per Season
# Join the runs and outs data to have both total runs and outs for each player-season.
seasonal_runs_outs_df = player_runs_per_season_df \
    .join(player_outs_per_season_df, ["striker", "season"], "inner") \
    .select("striker", "season", "total_runs", "total_outs")

# Step 4: Define a Window Specification for Calculating Cumulative Sums per Player per Season
# Partition by player to calculate cumulative statistics within each player group, ordered by season.
player_season_window_spec = Window.partitionBy("striker").orderBy("season")

# Step 5: Calculate Cumulative Runs, Outs, and Batting Average
# Calculate cumulative totals and batting average for each player over the seasons.
cumulative_runs_outs_df = seasonal_runs_outs_df \
    .withColumn("cumulative_runs", F.sum("total_runs").over(player_season_window_spec)) \
    .withColumn("cumulative_outs", F.sum("total_outs").over(player_season_window_spec)) \
    .withColumn("cumulative_avg", F.round(F.col("cumulative_runs") / F.col("cumulative_outs"), 2))

# Step 6: Join with Player Data to Include Detailed Player Information
# Join the cumulative data with player information, including player name and country.
player_cumulative_avg_df = cumulative_runs_outs_df \
    .join(player_df, player_df["player_id"] == cumulative_runs_outs_df["striker"], "inner") \
    .select("player_name", "country_name", "season", "cumulative_avg")

# Step 7: Define Window Specification to Rank Players by Cumulative Average within Each Season
# Partition by season and rank players by their cumulative batting average in descending order.
season_rank_window_spec = Window.partitionBy("season").orderBy(F.desc("cumulative_avg"))

# Step 8: Select Top 5 Batsmen by Cumulative Average for Each Season
# Calculate ranks and filter for the top 5 players based on cumulative average in each season.
top_5_batsmen_df = player_cumulative_avg_df \
    .withColumn("rank", F.row_number().over(season_rank_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("player_name", "country_name", "season", "cumulative_avg") \
    .show()

# Step 9: Select Top 5 Indian Batsmen by Cumulative Average for Each Season
# Filter for Indian players, rank, and then select the top 5 in each season.
top_5_indian_batsmen_df = player_cumulative_avg_df \
    .filter(F.col("country_name") == 'India') \
    .withColumn("rank", F.row_number().over(season_rank_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("player_name", "country_name", "season", "cumulative_avg") \
    .show()


+----------------+------------+------+--------------+
|     player_name|country_name|season|cumulative_avg|
+----------------+------------+------+--------------+
|   LA Pomersbach|   Australia|  2008|         152.0|
|       ML Hayden|   Australia|  2008|          94.5|
|      MEK Hussey|   Australia|  2008|          84.0|
|       A Symonds|   Australia|  2008|          80.5|
|        SE Marsh|   Australia|  2008|         68.44|
|       ML Hayden|   Australia|  2009|         63.42|
|      MN van Wyk|South Africa|  2009|         55.67|
|       A Symonds|   Australia|  2009|         51.25|
|   LA Pomersbach|   Australia|  2009|         48.25|
|       JP Duminy|South Africa|  2009|          46.5|
|  PD Collingwood|     England|  2010|         67.67|
|        SE Marsh|   Australia|  2010|         63.58|
|         OA Shah|     England|  2010|          57.5|
|       SR Watson|   Australia|  2010|         54.42|
|      MEK Hussey|   Australia|  2010|         51.25|
|        SE Marsh|   Austral

In [0]:
# Analyze team performance when batting first versus chasing

# Step 1: Select Relevant Columns for Analysis
# Create a DataFrame with essential columns for determining match outcomes based on toss and innings choices.
winning_teams_df = match_df \
    .select("match_id", "season_year", "toss_winner", "match_winner", "toss_name")

# Step 2: Filter and Aggregate Data for Teams Winning When Chasing
# Filter for matches where the winning team chased. If the toss winner chose to bat but did not win, or if the toss winner
# chose to field and won, then the team was chasing. Group by season and match winner to count wins while chasing.
chasing_first_winning_teams_df = winning_teams_df \
    .filter(
        ((F.col("toss_winner") != F.col("match_winner")) & (F.col("toss_name") == 'bat')) |  # Toss winner batted, but lost
        ((F.col("toss_winner") == F.col("match_winner")) & (F.col("toss_name") == 'field'))  # Toss winner fielded and won
    ) \
    .groupBy("season_year", "match_winner") \
    .agg(F.count("*").alias("chasing_win_count")) \
    .orderBy(F.col("season_year").asc(), F.col("chasing_win_count").desc()) \
    .show()

# Step 3: Filter and Aggregate Data for Teams Winning When Batting First
# Filter for matches where the winning team batted first. If the toss winner chose to field but did not win, or if the toss 
# winner chose to bat and won, then the team batted first. Group by season and match winner to count wins while batting first.
batting_first_winning_teams_df = winning_teams_df \
    .filter(
        ((F.col("toss_winner") != F.col("match_winner")) & (F.col("toss_name") == 'field')) |  # Toss winner fielded, but lost
        ((F.col("toss_winner") == F.col("match_winner")) & (F.col("toss_name") == 'bat'))      # Toss winner batted and won
    ) \
    .groupBy("season_year", "match_winner") \
    .agg(F.count("*").alias("batting_win_count")) \
    .orderBy(F.col("season_year").asc(), F.col("batting_win_count").desc()) \
    .show()

+-----------+--------------------+-----------------+
|season_year|        match_winner|chasing_win_count|
+-----------+--------------------+-----------------+
|       2008|    Rajasthan Royals|                9|
|       2008|     Kings XI Punjab|                6|
|       2008|    Delhi Daredevils|                5|
|       2008| Chennai Super Kings|                5|
|       2008|      Mumbai Indians|                5|
|       2008|     Deccan Chargers|                2|
|       2008|Kolkata Knight Ri...|                2|
|       2008|Royal Challengers...|                2|
|       2009|    Delhi Daredevils|                7|
|       2009|Royal Challengers...|                6|
|       2009|     Kings XI Punjab|                4|
|       2009|     Deccan Chargers|                4|
|       2009|Kolkata Knight Ri...|                3|
|       2009|    Rajasthan Royals|                3|
|       2009| Chennai Super Kings|                2|
|       2009|      Mumbai Indians|            

In [0]:
# Analyze top bowlers based on the number of wickets taken each season

# Step 1: Define Window Specification for Ranking Bowlers by Wickets within Each Season
# Partition by season and order by the number of wickets taken in descending order.
season_rank_window_spec = Window.partitionBy("season").orderBy(F.desc("wicket_taken_season_wise"))

# Step 2: Calculate Total Wickets Taken by Each Bowler per Season
# Filter for deliveries resulting in a wicket, group by season, team, and bowler, then count total wickets.
bowler_wickets_df = ball_by_ball_df \
    .filter(F.col("bowler_wicket") == 'true') \
    .groupBy("season", "team_bowling", "bowler") \
    .agg(F.count("*").alias("wicket_taken_season_wise")) \
    .join(player_df, ball_by_ball_df["bowler"] == player_df["player_id"], "inner") \
    .join(team_df, ball_by_ball_df["team_bowling"] == team_df["team_id"], "inner")

# Step 3: Identify Top 5 Bowlers by Wickets Taken Each Season
# Calculate the rank based on wickets taken and filter for the top 5 bowlers for each season.
top_5_all_bowler_wickets_df = bowler_wickets_df \
    .withColumn("rank", F.row_number().over(season_rank_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("season", "player_name", "team_name", "country_name", "wicket_taken_season_wise") \
    .show()

# Step 4: Identify Top 5 Indian Bowlers by Wickets Taken Each Season
# Filter for Indian bowlers, calculate the rank, and filter for the top 5 Indian bowlers by season.
top_5_Indian_bowler_wickets_df = bowler_wickets_df \
    .filter(F.col("country_name") == 'India') \
    .withColumn("rank", F.row_number().over(season_rank_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("season", "player_name", "team_name", "country_name", "wicket_taken_season_wise") \
    .show()

+------+---------------+--------------------+------------+------------------------+
|season|    player_name|           team_name|country_name|wicket_taken_season_wise|
+------+---------------+--------------------+------------+------------------------+
|  2008|  Sohail Tanvir|    Rajasthan Royals|    Pakistan|                      22|
|  2008|    S Sreesanth|     Kings XI Punjab|       India|                      19|
|  2008|       SK Warne|    Rajasthan Royals|   Australia|                      19|
|  2008|      SR Watson|    Rajasthan Royals|   Australia|                      17|
|  2008|      PP Chawla|     Kings XI Punjab|       India|                      17|
|  2009|       RP Singh|     Deccan Chargers|       India|                      23|
|  2009|       A Kumble|Royal Challengers...|       India|                      21|
|  2009|        A Nehra|    Delhi Daredevils|       India|                      19|
|  2009|        PP Ojha|     Deccan Chargers|       India|                  

In [0]:
# Analyze top bowlers based on their economy rate

# Step 1: Calculate Runs per Ball for Each Bowler
# Select relevant columns and calculate runs scored per ball, including extras.
bowlers_economy_df = ball_by_ball_df \
    .select("season", "bowler", "team_bowling", "runs_scored", "bowler_extras") \
    .withColumn('runs_per_ball', F.col("runs_scored") + F.col("bowler_extras"))

# Step 2: Calculate Overall Economy Rate for Each Bowler
# Group by team and bowler, sum the runs per ball and count the number of balls bowled.
top_bowlers_economy_all_time_df = bowlers_economy_df \
    .groupBy("team_bowling" ,"bowler") \
    .agg(F.sum("runs_per_ball").alias("sum_runs_per_ball"), F.count("*").alias("count_balls")) \
    .filter(F.col("count_balls") >= 20) \
    .withColumn('overs_bowled', F.round(( F.col("count_balls") / 6 ), 2)) \
    .withColumn('bowler_economy', F.round(( F.col("sum_runs_per_ball") / F.col("overs_bowled") ), 2)) \
    .join(player_df, ball_by_ball_df["bowler"] == player_df["player_id"], "inner") \
    .join(team_df, ball_by_ball_df["team_bowling"] == team_df["team_id"], "inner") \
    .select("team_name", "player_name", "country_name", "bowler_economy") \
    .orderBy(F.col("bowler_economy").asc()).show()

# Step 3: Define Window Specification for Ranking Bowlers by Economy within Each Season
# Partition by season and order by economy rate in ascending order.
season_wise_economy_window_spec = Window.partitionBy("season").orderBy(F.col("bowler_economy").asc())

# Step 4: Calculate Economy Rate per Bowler for Each Season
top_bowlers_economy_per_season_df = bowlers_economy_df \
    .groupBy("season", "team_bowling" ,"bowler") \
    .agg(F.sum("runs_per_ball").alias("sum_runs_per_ball"), F.count("*").alias("count_balls")) \
    .filter(F.col("count_balls") >= 20) \
    .withColumn('overs_bowled', F.round(( F.col("count_balls") / 6 ), 2)) \
    .withColumn('bowler_economy', F.round(( F.col("sum_runs_per_ball") / F.col("overs_bowled") ), 2)) \
    .join(player_df, ball_by_ball_df["bowler"] == player_df["player_id"], "inner") \
    .join(team_df, ball_by_ball_df["team_bowling"] == team_df["team_id"], "inner") \
    .orderBy(F.col("season"), F.col("bowler_economy").asc())

# Step 5: Identify Top 5 Bowlers by Economy Rate for Each Season
top_5_bowlers_economy_all_season_wise_df = top_bowlers_economy_per_season_df \
    .withColumn("rank", F.row_number().over(season_wise_economy_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("season", "team_name", "player_name", "country_name", "bowler_economy").show()

# Step 6: Identify Top 5 Indian Bowlers by Economy Rate for Each Season
top_5_bowlers_economy_indian_season_wise_df = top_bowlers_economy_per_season_df \
    .filter(F.col("country_name") == 'India') \
    .withColumn("rank", F.row_number().over(season_wise_economy_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("season", "team_name", "player_name", "country_name", "bowler_economy").show()


+--------------------+----------------+------------+--------------+
|           team_name|     player_name|country_name|bowler_economy|
+--------------------+----------------+------------+--------------+
|Royal Challengers...|        ND Doshi|       India|          4.08|
|Rising Pune Super...|        MR Marsh|   Australia|          4.91|
| Sunrisers Hyderabad|     Anand Rajan|       India|          4.94|
|      Mumbai Indians|        JDP Oram| New Zealand|          5.08|
|Kolkata Knight Ri...|     JW Hastings|   Australia|          5.29|
|      Mumbai Indians|     DJ Thornely|   Australia|          5.46|
|      Mumbai Indians|       JP Duminy|South Africa|          5.57|
|       Pune Warriors|   Parvez Rasool|       India|           5.6|
|Kochi Tuskers Kerala|     S Sreesanth|       India|           6.0|
|       Pune Warriors|        M Manhas|       India|           6.0|
|    Rajasthan Royals|   Sohail Tanvir|    Pakistan|          6.02|
|     Deccan Chargers|        DW Steyn|South Afr

In [0]:
# Analyze the most common types of dismissals in cricket

# Step 1: Define Window Specification for Ranking Dismissal Types per Season
# Partition by season and order by the number of wickets taken for each dismissal type in descending order.
season_wise_out_type_order_window_spec = Window \
    .partitionBy("season") \
    .orderBy(F.desc("wicket_taken_season_out_type_wise"))

# Step 2: Calculate the Count of Each Dismissal Type per Season
# Filter for deliveries that resulted in a wicket, group by season and out type, then count occurrences.
common_out_types_df = ball_by_ball_df \
    .filter(F.col("bowler_wicket") == 'true') \
    .groupBy("season", "out_type") \
    .agg(F.count("*").alias("wicket_taken_season_out_type_wise")) \
    .withColumn("rank", F.row_number().over(season_wise_out_type_order_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("season", "out_type", "wicket_taken_season_out_type_wise") \
    .show()

+------+-----------------+---------------------------------+
|season|         out_type|wicket_taken_season_out_type_wise|
+------+-----------------+---------------------------------+
|  2008|           caught|                              330|
|  2008|           bowled|                              131|
|  2008|     Keeper Catch|                               70|
|  2008|              lbw|                               37|
|  2008|caught and bowled|                               21|
|  2009|           caught|                              349|
|  2009|           bowled|                              112|
|  2009|     Keeper Catch|                               66|
|  2009|              lbw|                               47|
|  2009|          stumped|                               29|
|  2010|           caught|                              347|
|  2010|           bowled|                              141|
|  2010|     Keeper Catch|                               48|
|  2010|              lb

In [0]:
# Analyze which bowlers excel at specific dismissal types

# Step 1: Define Window Specification for Ranking Bowlers by Dismissal Type per Season
# Partition by season and out type, then order by season, out type, and the count of wickets taken in descending order.
season_out_type_wise_window_spec = Window \
    .partitionBy("season", "out_type") \
    .orderBy(F.col("season").asc(), F.col("out_type").asc(), F.desc("wicket_taken_season_out_type_wise"))

# Step 2: Calculate Wickets Taken by Each Bowler for Each Dismissal Type
# Filter for deliveries that resulted in a wicket, group by season, team, bowler, and out type, then count occurrences.
bowler_wickets_out_type_wise_df = ball_by_ball_df \
    .filter(F.col("bowler_wicket") == 'true') \
    .groupBy("season", "team_bowling" ,"bowler", "out_type") \
    .agg(F.count("*").alias("wicket_taken_season_out_type_wise")) \
    .join(player_df, ball_by_ball_df["bowler"] == player_df["player_id"], "inner") \
    .join(team_df, ball_by_ball_df["team_bowling"] == team_df["team_id"], "inner") 

# Step 3: Identify Top 5 Bowlers by Dismissal Type for All Players
top_5_all_bowlers_out_type_df = bowler_wickets_out_type_wise_df \
    .withColumn("rank", F.row_number().over(season_out_type_wise_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("season", "player_name", "team_name", "country_name", "out_type", "wicket_taken_season_out_type_wise") \
    .show()

# Step 4: Display Results for Top 5 Bowlers by Dismissal Type for All Players
top_5_Indian_bowler_out_type_df = bowler_wickets_out_type_wise_df \
    .filter(F.col("country_name") == 'India') \
    .withColumn("rank", F.row_number().over(season_out_type_wise_window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("season", "player_name", "team_name", "country_name", "out_type", "wicket_taken_season_out_type_wise") \
    .show()

+------+-------------+-------------------+------------+-----------------+---------------------------------+
|season|  player_name|          team_name|country_name|         out_type|wicket_taken_season_out_type_wise|
+------+-------------+-------------------+------------+-----------------+---------------------------------+
|  2008|  DS Kulkarni|     Mumbai Indians|       India|     Keeper Catch|                                4|
|  2008|      A Nehra|     Mumbai Indians|       India|     Keeper Catch|                                4|
|  2008|     MM Patel|   Rajasthan Royals|       India|     Keeper Catch|                                3|
|  2008|  MF Maharoof|   Delhi Daredevils|       India|     Keeper Catch|                                3|
|  2008|    SR Watson|   Rajasthan Royals|   Australia|     Keeper Catch|                                3|
|  2008|Sohail Tanvir|   Rajasthan Royals|    Pakistan|           bowled|                                8|
|  2008|    IK Pathan|    Ki

In [0]:
# Analyze how a team's toss decision (bat or field) impacts match outcomes

# Step 1: Select Relevant Columns and Filter for Winning Teams Based on Toss Decision
toss_outcome_df = match_df \
    .select("season_year", "toss_winner", "match_winner", "toss_name") \
    .filter(F.col("toss_winner") == F.col("match_winner")) \
    .withColumnRenamed("season_year","season") \
    .withColumnRenamed("toss_winner","team_name") \

# Step 2: Calculate Wins for Teams that Elected to Field
winning_teams_by_fielding_df = toss_outcome_df \
    .filter(F.col("toss_name") == "field") \
    .groupBy("season", "team_name") \
    .agg(F.count("*").alias("match_wins_by_fielding")) \
    .withColumnRenamed("season", "field_season") \
    .withColumnRenamed("team_name", "field_team_name")

# Step 3: Calculate Wins for Teams that Elected to Bat
winning_teams_by_batting_df = toss_outcome_df \
    .filter(F.col("toss_name") == "bat") \
    .groupBy("season", "team_name") \
    .agg(F.count("*").alias("match_wins_by_batting")) \
    .withColumnRenamed("season", "bat_season") \
    .withColumnRenamed("team_name", "bat_team_name")

# Step 4: Join Fielding and Batting Win Dataframes to Analyze Toss Impact
teams_toss_effect_on_type = winning_teams_by_fielding_df \
    .join(
        winning_teams_by_batting_df, 
        (winning_teams_by_fielding_df["field_season"] == winning_teams_by_batting_df["bat_season"]) & 
        (winning_teams_by_fielding_df["field_team_name"] == winning_teams_by_batting_df["bat_team_name"]), 
        "inner"
    ) \
    .withColumn("total_wins", F.col("match_wins_by_fielding") + F.col("match_wins_by_batting")) \
    .withColumn("win_percentage_field", F.round(F.col("match_wins_by_fielding") / F.col("total_wins") * 100)) \
    .withColumn("win_percentage_bat", 100 - F.col("win_percentage_field")) \
    .select(
        winning_teams_by_batting_df["bat_season"].alias("season"), 
        winning_teams_by_batting_df["bat_team_name"].alias("team_name"), 
        "win_percentage_field",
        "win_percentage_bat"
    ) \
    .orderBy(F.col("season").desc())

# Step 5: Analyze Teams Winning by Fielding
teams_winning_by_fielding = teams_toss_effect_on_type \
    .select(
        "season", 
        "team_name", 
        "win_percentage_field"
    ) \
    .orderBy(F.col("win_percentage_field").desc()).show()


# Step 6: Analyze Teams Winning by Batting
teams_winning_by_batting = teams_toss_effect_on_type \
    .select(
        "season", 
        "team_name", 
        "win_percentage_bat"
    ) \
    .orderBy(F.col("win_percentage_bat").desc()).show()

+------+--------------------+--------------------+
|season|           team_name|win_percentage_field|
+------+--------------------+--------------------+
|  2014|Kolkata Knight Ri...|                83.0|
|  2015|Kolkata Knight Ri...|                80.0|
|  2012|    Delhi Daredevils|                80.0|
|  2011|Kolkata Knight Ri...|                80.0|
|  2008|    Rajasthan Royals|                78.0|
|  2013|Royal Challengers...|                75.0|
|  2008|     Kings XI Punjab|                75.0|
|  2012| Chennai Super Kings|                75.0|
|  2014|    Rajasthan Royals|                75.0|
|  2011|     Kings XI Punjab|                75.0|
|  2012|     Kings XI Punjab|                75.0|
|  2016| Sunrisers Hyderabad|                71.0|
|  2014| Chennai Super Kings|                67.0|
|  2014|      Mumbai Indians|                67.0|
|  2012|      Mumbai Indians|                67.0|
|  2013|    Rajasthan Royals|                67.0|
|  2011|    Delhi Daredevils|  

In [0]:
# Analyze which teams consistently perform well across seasons

# Step 1: Count occurrences where the team is team1
team1_count_df = match_df.groupBy("season_year", "team1") \
    .agg(F.count("*").alias("team1_count")) \
    .withColumnRenamed("team1", "team_name")

# Step 2: Count occurrences where the team is team2
team2_count_df = match_df.groupBy("season_year", "team2") \
    .agg(F.count("*").alias("team2_count")) \
    .withColumnRenamed("team2", "team_name")

# Step 3: Count winning occurrences for each team
winning_team_count = match_df.groupBy("season_year", "match_winner") \
    .agg(F.count("*").alias("match_win_count")) \
    .withColumnRenamed("match_winner", "team_name")

# Step 4: Define a window specification for ranking teams based on winning percentage
season_year_wise_window_spec = Window \
    .partitionBy("season_year") \
    .orderBy(F.col("winning_percentage").desc())

# Step 5: Merge counts to get total matches played by each team in each season
total_matches_df = team1_count_df.join(
    team2_count_df,
    on=["season_year", "team_name"],
    how="outer"
).join(
    winning_team_count, 
    on=["season_year", "team_name"],
    how="outer"
) \
.fillna(0, ["team1_count", "team2_count"]) \
.withColumn("total_matches", F.col("team1_count") + F.col("team2_count")) \
.withColumn("winning_percentage", F.round((F.col("match_win_count")/F.col("total_matches")*100), 2) ) \
.withColumn("rank", F.row_number().over(season_year_wise_window_spec)) \
.filter(F.col("rank") <= 3) \
.select("season_year", "team_name", "total_matches", "match_win_count", "winning_percentage")


# Step 6: Display the results
total_matches_df.orderBy("season_year", F.col("winning_percentage").desc()).show()

+-----------+--------------------+-------------+---------------+------------------+
|season_year|           team_name|total_matches|match_win_count|winning_percentage|
+-----------+--------------------+-------------+---------------+------------------+
|       2008|    Rajasthan Royals|           16|             13|             81.25|
|       2008|     Kings XI Punjab|           15|             10|             66.67|
|       2008| Chennai Super Kings|           16|              9|             56.25|
|       2009|    Delhi Daredevils|           15|             10|             66.67|
|       2009| Chennai Super Kings|           14|              8|             57.14|
|       2009|     Deccan Chargers|           16|              9|             56.25|
|       2010|      Mumbai Indians|           16|             11|             68.75|
|       2010| Chennai Super Kings|           16|              9|             56.25|
|       2010|     Deccan Chargers|           16|              8|            