In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('IPL Data Analysis').getOrCreate()

In [0]:
#  deliveries = spark.read.format("csv").option("header","true").option('inferSchema','true').load("s3://ipl-data-latest/deliveries.csv")
deliveries = spark.read.csv('s3://ipl-data-latest/deliveries.csv', header=True,inferSchema = True)

In [0]:
deliveries.show(5)

+--------+------+--------------------+--------------------+----+----+-----------+-------+-----------+------------+----------+----------+-----------+---------+----------------+--------------+-------+
|match_id|inning|        batting_team|        bowling_team|over|ball|     batter| bowler|non_striker|batsman_runs|extra_runs|total_runs|extras_type|is_wicket|player_dismissed|dismissal_kind|fielder|
+--------+------+--------------------+--------------------+----+----+-----------+-------+-----------+------------+----------+----------+-----------+---------+----------------+--------------+-------+
|  335982|     1|Kolkata Knight Ri...|Royal Challengers...|   0|   1| SC Ganguly|P Kumar|BB McCullum|           0|         1|         1|    legbyes|        0|              NA|            NA|     NA|
|  335982|     1|Kolkata Knight Ri...|Royal Challengers...|   0|   2|BB McCullum|P Kumar| SC Ganguly|           0|         0|         0|       NULL|        0|              NA|            NA|     NA|
|  33

In [0]:
# The Matches dataset
matches = spark.read.format('csv').option('header','true').option('inferSchema','true').load('s3://ipl-data-latest/matches.csv')

In [0]:
# Redefining the schema for the above data
from pyspark.sql.types import *

matches_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("season", StringType(), True),
    StructField("city", StringType(), True),
    StructField("date", DateType(), True),
    StructField("match_type", StringType(), True),
    StructField("player_of_match", StringType(), True),
    StructField("venue", StringType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("toss_decision", StringType(), True),
    StructField("winner", StringType(), True),
    StructField("result", StringType(), True),
    StructField("result_margin", IntegerType(), True),  #Integertype
    StructField("target_runs", IntegerType(), True),    #Integertype
    StructField("target_overs", IntegerType(), True),   #Integertype
    StructField("super_over", IntegerType(), True),     #Integertype
    StructField("method", StringType(), True),
    StructField("umpire1", StringType(), True),
    StructField("umpire2", StringType(), True)
])



In [0]:
#Matches Dataset
matches =  spark.read.schema(matches_schema).csv('s3://ipl-data-latest/matches.csv', header=True)

In [0]:
from pyspark.sql.functions import col, avg, row_number,when , sum

In [0]:

#Filter out the wides and no balls
deliveries.filter((col("extras_type")=='wides')&(col("extras_type")=='noballs')).show()
# deliveries.filter(deliveries.extras_type !="wides").show()

+--------+------+------------+------------+----+----+------+------+-----------+------------+----------+----------+-----------+---------+----------------+--------------+-------+
|match_id|inning|batting_team|bowling_team|over|ball|batter|bowler|non_striker|batsman_runs|extra_runs|total_runs|extras_type|is_wicket|player_dismissed|dismissal_kind|fielder|
+--------+------+------------+------------+----+----+------+------+-----------+------------+----------+----------+-----------+---------+----------------+--------------+-------+
+--------+------+------------+------------+----+----+------+------+-----------+------------+----------+----------+-----------+---------+----------------+--------------+-------+



In [0]:
#List of the teams in the dataset
team_names = deliveries.select("batting_team").distinct()
team_names.show(truncate=False)

+---------------------------+
|batting_team               |
+---------------------------+
|Chennai Super Kings        |
|Deccan Chargers            |
|Rajasthan Royals           |
|Royal Challengers Bangalore|
|Kolkata Knight Riders      |
|Kings XI Punjab            |
|Delhi Daredevils           |
|Mumbai Indians             |
|Sunrisers Hyderabad        |
|Kochi Tuskers Kerala       |
|Pune Warriors              |
|Rising Pune Supergiant     |
|Gujarat Lions              |
|Rising Pune Supergiants    |
|Punjab Kings               |
|Delhi Capitals             |
|Lucknow Super Giants       |
|Gujarat Titans             |
|Royal Challengers Bengaluru|
+---------------------------+



In [0]:
#Total how many teams are there on the list
no_of_teams = deliveries.select('batting_team').distinct().count()
no_of_teams

19

In [0]:
no_of_teams = deliveries.groupBy('batting_team').count()
no_of_teams.show()

+--------------------+-----+
|        batting_team|count|
+--------------------+-----+
| Chennai Super Kings|28651|
|     Deccan Chargers| 9034|
|    Rajasthan Royals|26242|
|Royal Challengers...|28205|
|Kolkata Knight Ri...|29514|
|     Kings XI Punjab|22646|
|    Delhi Daredevils|18786|
|      Mumbai Indians|31437|
| Sunrisers Hyderabad|21843|
|Kochi Tuskers Kerala| 1582|
|       Pune Warriors| 5443|
|Rising Pune Super...| 1900|
|       Gujarat Lions| 3566|
|Rising Pune Super...| 1580|
|        Punjab Kings| 6833|
|      Delhi Capitals|10946|
|Lucknow Super Giants| 5400|
|      Gujarat Titans| 5494|
|Royal Challengers...| 1818|
+--------------------+-----+



In [0]:
#Calculate the total and avergae runs scored in each match and inning
total_and_avg = deliveries.groupBy('match_id','inning').agg(
                sum('total_runs').alias('Total Runs'),
                avg('total_runs').alias('Average Score')
)

In [0]:
total_and_avg.show()

+--------+------+----------+------------------+
|match_id|inning|Total Runs|     Average Score|
+--------+------+----------+------------------+
|  336009|     1|       187|1.4841269841269842|
|  392196|     1|       165|1.2992125984251968|
|  419132|     1|       163|1.2834645669291338|
|  392196|     2|       169| 1.396694214876033|
|  335993|     1|       147|1.1666666666666667|
|  335993|     2|       152|1.4205607476635513|
|  419133|     2|       162|1.3846153846153846|
|  392191|     2|       173| 1.453781512605042|
|  419138|     2|       115|1.0087719298245614|
|  335983|     1|       240| 1.935483870967742|
|  336007|     2|       148|1.3214285714285714|
|  419123|     1|       171|1.3790322580645162|
|  392216|     1|       168|             1.344|
|  336004|     2|       133|1.1666666666666667|
|  336020|     2|       182|1.4918032786885247|
|  336040|     2|       164|1.3015873015873016|
|  392186|     1|       158|1.2440944881889764|
|  419137|     2|       223|            

In [0]:
#Import the window function
from pyspark.sql.window import Window

In [0]:
#Running total of the runs for matches and innings per over
windowSpec = Window.partitionBy('match_id','inning').orderBy('over')
deliveries = deliveries.withColumn(
    'running_total_runs',
    sum("total_runs").over(windowSpec)
)


In [0]:
deliveries.show(10)

+--------+------+-------------------+---------------+----+----+---------+-----------+-----------+------------+----------+----------+-----------+---------+----------------+--------------+-------+------------------+
|match_id|inning|       batting_team|   bowling_team|over|ball|   batter|     bowler|non_striker|batsman_runs|extra_runs|total_runs|extras_type|is_wicket|player_dismissed|dismissal_kind|fielder|running_total_runs|
+--------+------+-------------------+---------------+----+----+---------+-----------+-----------+------------+----------+----------+-----------+---------+----------------+--------------+-------+------------------+
|  335983|     1|Chennai Super Kings|Kings XI Punjab|   0|   1| PA Patel|      B Lee|  ML Hayden|           0|         0|         0|       NULL|        0|              NA|            NA|     NA|                 5|
|  335983|     1|Chennai Super Kings|Kings XI Punjab|   0|   2| PA Patel|      B Lee|  ML Hayden|           0|         0|         0|       NULL|

In [0]:
#Flagging high impact bowls (either a wicket or more than a six)
deliveries.withColumn(
    'high_impact_balls',
    when((col("total_runs")+col('extra_runs')>6) | (col('is_wicket')==1),True).otherwise(False)
)

DataFrame[match_id: int, inning: int, batting_team: string, bowling_team: string, over: int, ball: int, batter: string, bowler: string, non_striker: string, batsman_runs: int, extra_runs: int, total_runs: int, extras_type: string, is_wicket: int, player_dismissed: string, dismissal_kind: string, fielder: string, running_total_runs: bigint, high_impact_balls: boolean]

In [0]:
deliveries.printSchema()

root
 |-- match_id: integer (nullable = true)
 |-- inning: integer (nullable = true)
 |-- batting_team: string (nullable = true)
 |-- bowling_team: string (nullable = true)
 |-- over: integer (nullable = true)
 |-- ball: integer (nullable = true)
 |-- batter: string (nullable = true)
 |-- bowler: string (nullable = true)
 |-- non_striker: string (nullable = true)
 |-- batsman_runs: integer (nullable = true)
 |-- extra_runs: integer (nullable = true)
 |-- total_runs: integer (nullable = true)
 |-- extras_type: string (nullable = true)
 |-- is_wicket: integer (nullable = true)
 |-- player_dismissed: string (nullable = true)
 |-- dismissal_kind: string (nullable = true)
 |-- fielder: string (nullable = true)
 |-- running_total_runs: long (nullable = true)



In [0]:
matches.printSchema()

root
 |-- id: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- city: string (nullable = true)
 |-- date: date (nullable = true)
 |-- match_type: string (nullable = true)
 |-- player_of_match: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- toss_winner: string (nullable = true)
 |-- toss_decision: string (nullable = true)
 |-- winner: string (nullable = true)
 |-- result: string (nullable = true)
 |-- result_margin: integer (nullable = true)
 |-- target_runs: integer (nullable = true)
 |-- target_overs: integer (nullable = true)
 |-- super_over: integer (nullable = true)
 |-- method: string (nullable = true)
 |-- umpire1: string (nullable = true)
 |-- umpire2: string (nullable = true)



In [0]:
from pyspark.sql.functions import year, month, dayofmonth, when

# Extracting year, month, and day from the matches "date" for more detailed time-based analysis
matches = matches.withColumn("year", year("date"))
matches = matches.withColumn("month", month("date"))
matches = matches.withColumn("day", dayofmonth("date"))

# High margin win: categorizing win margins into 'high', 'medium', and 'low'
High_margin_calc = matches.withColumn(
    "win_margin_category",
    when(col("result_margin") >= 100, "High")
    .when((col("result_margin") >= 50) & (col("result_margin") < 100), "Medium")
    .otherwise("Low")
)

# Analyze the impact of the toss: who wins the toss and the match
matches = matches.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("winner"), "Yes").otherwise("No")
)

# Show the enhanced match DataFrame
matches.show(2)

+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+------+-------------+-----------+------------+----------+------+---------+-----------+----+-----+---+-----------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner|result|result_margin|target_runs|target_overs|super_over|method|  umpire1|    umpire2|year|month|day|toss_match_winner|
+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+------+-------------+-----------+------------+----------+------+---------+-----------+----+-----+---+-----------------+
|335982|2007/08| Bangalore|2008-04-18|    League|    BB McCullum|M Chinnaswamy Sta...|Ro

In [0]:
deliveries.createOrReplaceTempView("deliveries")
matches.createOrReplaceTempView("matches")

In [0]:
# Question: Who is the most prolific batsman (runs scored)
most_runs_batsman = deliveries.groupBy("batter").agg(sum("batsman_runs").alias("total_runs")) \
                     .orderBy("total_runs", ascending=False).first().batter
most_runs = deliveries.groupBy("batter").agg(sum("batsman_runs").alias("total_runs")) \
                     .orderBy("total_runs", ascending=False).first().total_runs
print(f"Most prolific batsman: {most_runs_batsman}")
print(f"Total Runs by {most_runs_batsman}: {most_runs}")

Most prolific batsman: V Kohli
Total Runs by V Kohli: 8014


In [0]:
# Question 2: Bowler conceding most runs
most_runs_conceded_bowler = deliveries.groupBy("bowler").agg(
                                sum("total_runs").alias("runs_conceded")) \
                               .orderBy("runs_conceded", ascending=False).first()
most_runs_conceded_bowler_name = most_runs_conceded_bowler.bowler

print(f"Bowler conceding most runs: {most_runs_conceded_bowler}")

Bowler conceding most runs: Row(bowler='R Ashwin', runs_conceded=5435)


In [0]:
from pyspark.sql.functions import *

bowler_stats = deliveries.groupBy("bowler", "match_id", "inning", "over") \
    .agg(sum("total_runs").alias("runs_conceded_per_over")) \
    .groupBy("bowler") \
    .agg(
        sum("runs_conceded_per_over").alias("runs_conceded"),
        count("*").alias("overs_bowled")  # Count the groups = distinct overs
    )

bowler_economy = bowler_stats.withColumn(
                "economy", col("runs_conceded") / col("overs_bowled")
    )
top_5_most_runs_conceded = bowler_economy.orderBy("runs_conceded", ascending=False).limit(5)

top_5_most_runs_conceded.show()

+---------+-------------+------------+------------------+
|   bowler|runs_conceded|overs_bowled|           economy|
+---------+-------------+------------+------------------+
| R Ashwin|         5435|         756| 7.189153439153439|
|PP Chawla|         5179|         646|  8.01702786377709|
|  B Kumar|         5051|         657| 7.687975646879757|
|RA Jadeja|         4917|         643|7.6469673405909795|
|YS Chahal|         4681|         591| 7.920473773265652|
+---------+-------------+------------+------------------+



In [0]:
from pyspark.sql.functions import *

batsman_stats = deliveries.groupBy("batter") \
    .agg(
        sum("batsman_runs").alias("total_runs"),
        count("*").alias("total_balls")  
    )
batsman_strike_rate = batsman_stats.withColumn(
    "strike_rate", (col("total_runs") / col("total_balls")) * 100
)
# Top 5 batsmen with their strike rates
top_5_batsmen = batsman_strike_rate.orderBy("total_runs", ascending=False).limit(5)

top_5_batsmen.show()

+---------+----------+-----------+------------------+
|   batter|total_runs|total_balls|       strike_rate|
+---------+----------+-----------+------------------+
|  V Kohli|      8014|       6236|128.51186658114176|
| S Dhawan|      6769|       5483|123.45431333211745|
|RG Sharma|      6630|       5183|127.91819409608335|
|DA Warner|      6567|       4849| 135.4299855640338|
| SK Raina|      5536|       4177|132.53531242518554|
+---------+----------+-----------+------------------+



In [0]:
# Question 3: Total wickets taken
total_wickets = deliveries.filter(col("player_dismissed").isNotNull()).count()
print(f"Total wickets taken: {total_wickets}")

Total wickets taken: 260920


In [0]:
# Count the occurrences of each dismissal type
dismissal_counts = deliveries.groupBy("dismissal_kind").count().orderBy("count", ascending=False)

# Display all dismissal types and their counts
dismissal_counts.show() 

+--------------------+------+
|      dismissal_kind| count|
+--------------------+------+
|                  NA|247970|
|              caught|  8063|
|              bowled|  2212|
|             run out|  1114|
|                 lbw|   800|
|   caught and bowled|   367|
|             stumped|   358|
|          hit wicket|    15|
|        retired hurt|    15|
|obstructing the f...|     3|
|         retired out|     3|
+--------------------+------+



In [0]:
# Count dismissals, including nulls initially
dismissal_counts = deliveries.groupBy("dismissal_kind").count()

# Filter out rows where dismissal_kind is NOT NULL
dismissal_counts_no_na = dismissal_counts.filter(col("dismissal_kind")!="NA")


dismissal_counts_no_na = dismissal_counts_no_na.orderBy("count", ascending=False)


dismissal_counts_no_na.show()

+--------------------+-----+
|      dismissal_kind|count|
+--------------------+-----+
|              caught| 8063|
|              bowled| 2212|
|             run out| 1114|
|                 lbw|  800|
|   caught and bowled|  367|
|             stumped|  358|
|          hit wicket|   15|
|        retired hurt|   15|
|obstructing the f...|    3|
|         retired out|    3|
+--------------------+-----+



In [0]:
highest_scoring_batting_team = deliveries.groupBy("batting_team").agg(sum("total_runs").alias("team_runs")) \
                                  .orderBy("team_runs", ascending=False).first().batting_team
print(f"Team with highest total runs (batting): {highest_scoring_batting_team}")

Team with highest total runs (batting): Mumbai Indians


In [0]:
from pyspark.sql.functions import *

# --- 1. Data Cleaning (as before) ---
deliveries = deliveries.withColumn(
    "batting_team_cleaned",
    regexp_replace(col("batting_team"), "Royal Challengers Bangalo.*", "Royal Challengers Bangalore")
).withColumn(
    "batting_team_cleaned",
    regexp_replace(col("batting_team_cleaned"), "Chennai Super King.*", "Chennai Super Kings")
).withColumn(
    "batting_team_cleaned",
    regexp_replace(col("batting_team_cleaned"), "Deccan Chag.*", "Deccan Chargers")
).withColumn(
    "batting_team_cleaned",
    regexp_replace(col("batting_team_cleaned"), "Kolkata Knight.*", "Kolkata Knight Riders")
).withColumn(
    "batting_team_cleaned",
    regexp_replace(col("batting_team_cleaned"), "Rajasthan Roy.*", "Rajasthan Royals")
).withColumn(
    "batting_team_cleaned",
    regexp_replace(col("batting_team_cleaned"), "Mumbai Ind.*", "Mumbai Indians")
).withColumn(
    "batting_team_cleaned",
    regexp_replace(col("batting_team_cleaned"), "Kings XI Punj.*", "Kings XI Punjab")
)

# --- 2. Highest Scoring Team and Total Runs ---
highest_scoring_team_info = deliveries.groupBy("batting_team_cleaned").agg(sum("total_runs").alias("team_runs")) \
    .orderBy("team_runs", ascending=False).first()

highest_scoring_team = highest_scoring_team_info.batting_team_cleaned
highest_runs = highest_scoring_team_info.team_runs

print(f"Highest Scoring Team: {highest_scoring_team}")
print(f"Total Runs: {highest_runs}")



Highest Scoring Team: Mumbai Indians
Total Runs: 42176


In [0]:
matches.columns

['id',
 'season',
 'city',
 'date',
 'match_type',
 'player_of_match',
 'venue',
 'team1',
 'team2',
 'toss_winner',
 'toss_decision',
 'winner',
 'result',
 'result_margin',
 'target_runs',
 'target_overs',
 'super_over',
 'method',
 'umpire1',
 'umpire2',
 'year',
 'month',
 'day',
 'toss_match_winner']

In [0]:
#total number of matches played in each season.
from pyspark.sql.window import Window

WindowSpec = Window.partitionBy('season').orderBy('season')
season_matches_df = matches.withColumn("total_matches_in_season", count("id").over(WindowSpec))
result = season_matches_df.select("season","total_matches_in_season").distinct()
Total_matches_per_season = result.show()

+-------+-----------------------+
| season|total_matches_in_season|
+-------+-----------------------+
|2007/08|                     58|
|   2009|                     57|
|2009/10|                     60|
|   2011|                     73|
|   2012|                     74|
|   2013|                     76|
|   2014|                     60|
|   2015|                     59|
|   2016|                     60|
|   2017|                     59|
|   2018|                     60|
|   2019|                     60|
|2020/21|                     60|
|   2021|                     60|
|   2022|                     74|
|   2023|                     74|
|   2024|                     71|
+-------+-----------------------+



In [0]:
#Total Number of matches played until 2024 season
Total_matched_played = matches.agg(count('id').alias('total_matches')).collect()[0]['total_matches']
print(f'Total matches played : {Total_matched_played}')

Total matches played : 1095


In [0]:
#Identifying the player with the most "Player of the Match" awards.
player_of_the_match = matches.groupBy('player_of_match').agg(count('id').alias('Frequency')).orderBy('Frequency',ascending = False).limit(10).show(10)

+---------------+---------+
|player_of_match|Frequency|
+---------------+---------+
| AB de Villiers|       25|
|       CH Gayle|       22|
|      RG Sharma|       19|
|      DA Warner|       18|
|        V Kohli|       18|
|       MS Dhoni|       17|
|      YK Pathan|       16|
|      SR Watson|       16|
|      RA Jadeja|       16|
|      SP Narine|       15|
+---------------+---------+



In [0]:
# Calculating the number of matches won by each team.
print('Total number of matches won by each team')

Winning_team_count = matches.groupBy('winner').agg(count('id').alias('winning_count')).orderBy('winning_count',ascending =False)
Winning_team_count.show(truncate=False)

Total number of matches won by each team
+---------------------------+-------------+
|winner                     |winning_count|
+---------------------------+-------------+
|Mumbai Indians             |144          |
|Chennai Super Kings        |138          |
|Kolkata Knight Riders      |131          |
|Royal Challengers Bangalore|116          |
|Rajasthan Royals           |112          |
|Sunrisers Hyderabad        |88           |
|Kings XI Punjab            |88           |
|Delhi Daredevils           |67           |
|Delhi Capitals             |48           |
|Deccan Chargers            |29           |
|Gujarat Titans             |28           |
|Lucknow Super Giants       |24           |
|Punjab Kings               |24           |
|Gujarat Lions              |13           |
|Pune Warriors              |12           |
|Rising Pune Supergiant     |10           |
|Royal Challengers Bengaluru|7            |
|Kochi Tuskers Kerala       |6            |
|NA                         |5     

In [0]:
#Just taking a rough check why there are NA values for winner in the dataset, whether its a draw?
matches.filter(col('winner')=='NA').show()

+-------+------+---------+----------+----------+---------------+--------------------+--------------------+-------------------+--------------------+-------------+------+---------+-------------+-----------+------------+----------+------+---------------+-------------+----+-----+---+-----------------+
|     id|season|     city|      date|match_type|player_of_match|               venue|               team1|              team2|         toss_winner|toss_decision|winner|   result|result_margin|target_runs|target_overs|super_over|method|        umpire1|      umpire2|year|month|day|toss_match_winner|
+-------+------+---------+----------+----------+---------------+--------------------+--------------------+-------------------+--------------------+-------------+------+---------+-------------+-----------+------------+----------+------+---------------+-------------+----+-----+---+-----------------+
| 501265|  2011|    Delhi|2011-05-21|    League|             NA|    Feroz Shah Kotla|    Delhi Daredevi

In [0]:
matches_with_outcomes = matches.withColumn(
    "toss_winner_is_match_winner",
    when(col('toss_winner')==col('winner'),1).otherwise(0)
)
toss_analysis = matches_with_outcomes.groupBy("toss_decision").agg(
    count("*").alias("total_matches"),
    count(when(col("toss_winner_is_match_winner") == 1, True)).alias("toss_winner_wins"),
    count(when(col("toss_winner_is_match_winner") == 0, True)).alias("toss_winner_losses")
)
# Calculate win percentage for toss decisions
toss_analysis = toss_analysis.withColumn(
    "win_percentage", 
    (col("toss_winner_wins") / col("total_matches")) * 100
)
toss_analysis.show(5)

+-------------+-------------+----------------+------------------+-----------------+
|toss_decision|total_matches|toss_winner_wins|toss_winner_losses|   win_percentage|
+-------------+-------------+----------------+------------------+-----------------+
|        field|          704|             377|               327|53.55113636363637|
|          bat|          391|             177|               214|45.26854219948849|
+-------------+-------------+----------------+------------------+-----------------+

