In [0]:
# Echo the `spark` object as this cell's output.
# NOTE(review): `spark` is not assigned anywhere above this cell, so this presumably relies on a
# session pre-created by the hosting notebook environment — confirm; on a bare kernel it raises NameError.
spark

In [0]:
from pyspark.sql import SparkSession
# Bind `spark` to a SparkSession registered under the application name 'IPL Data Analysis'.
# getOrCreate() hands back an already-active session when one exists instead of building a new one.
spark = (
    SparkSession.builder
    .appName('IPL Data Analysis')
    .getOrCreate()
)

In [0]:
# Echo the session object that the getOrCreate() call in the previous cell bound to `spark`.
spark

In [0]:
# Exploratory load of the ball-by-ball CSV: the first line is treated as the header and Spark
# infers every column's type from the data. The first three rows are printed as a sanity check.
ball_by_ball_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("s3://ipl-dataset-2024/Ball_By_Ball.csv")
)
ball_by_ball_df.show(n=3)

In [0]:
# Print the column names and types of ball_by_ball_df (at this point: the types Spark inferred).
ball_by_ball_df.printSchema()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DateType,DecimalType
from pyspark.sql.functions import *
from pyspark.sql.window import *

In [0]:
# Explicit schema for Ball_By_Ball.csv, written as (column name, Spark type) pairs in declaration
# order. Every field is declared nullable.
# NOTE(review): the DateType columns are parsed with the CSV reader's default date format —
# confirm the file's date columns actually match it, otherwise they will not load as dates.
_ball_by_ball_fields = [
    ("match_id", IntegerType()),
    ("over_id", IntegerType()),
    ("ball_id", IntegerType()),
    ("innings_no", IntegerType()),
    ("team_batting", StringType()),
    ("team_bowling", StringType()),
    ("striker_batting_position", IntegerType()),
    ("extra_type", StringType()),
    ("runs_scored", IntegerType()),
    ("extra_runs", IntegerType()),
    ("wides", IntegerType()),
    ("legbyes", IntegerType()),
    ("byes", IntegerType()),
    ("noballs", IntegerType()),
    ("penalty", IntegerType()),
    ("bowler_extras", IntegerType()),
    ("out_type", StringType()),
    ("caught", BooleanType()),
    ("bowled", BooleanType()),
    ("run_out", BooleanType()),
    ("lbw", BooleanType()),
    ("retired_hurt", BooleanType()),
    ("stumped", BooleanType()),
    ("caught_and_bowled", BooleanType()),
    ("hit_wicket", BooleanType()),
    ("obstructingfeild", BooleanType()),
    ("bowler_wicket", BooleanType()),
    ("match_date", DateType()),
    ("season", IntegerType()),
    ("striker", IntegerType()),
    ("non_striker", IntegerType()),
    ("bowler", IntegerType()),
    ("player_out", IntegerType()),
    ("fielders", IntegerType()),
    ("striker_match_sk", IntegerType()),
    ("strikersk", IntegerType()),
    ("nonstriker_match_sk", IntegerType()),
    ("nonstriker_sk", IntegerType()),
    ("fielder_match_sk", IntegerType()),
    ("fielder_sk", IntegerType()),
    ("bowler_match_sk", IntegerType()),
    ("bowler_sk", IntegerType()),
    ("playerout_match_sk", IntegerType()),
    ("battingteam_sk", IntegerType()),
    ("bowlingteam_sk", IntegerType()),
    ("keeper_catch", BooleanType()),
    ("player_out_sk", IntegerType()),
    ("matchdatesk", DateType()),
]
ball_by_ball_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _ball_by_ball_fields]
)

In [0]:
# Reload the ball-by-ball CSV, this time applying ball_by_ball_schema rather than inferring types,
# then print the resulting schema to confirm the declared types took effect.
ball_by_ball_df = (
    spark.read
    .schema(ball_by_ball_schema)
    .option("header", True)
    .csv("s3://ipl-dataset-2024/Ball_By_Ball.csv")
)
ball_by_ball_df.printSchema()

In [0]:
# Explicit schema for Match.csv as (column name, Spark type) pairs; every field is nullable.
_match_fields = [
    ("match_sk", IntegerType()),
    ("match_id", IntegerType()),
    ("team1", StringType()),
    ("team2", StringType()),
    ("match_date", DateType()),
    ("season_year", IntegerType()),
    ("venue_name", StringType()),
    ("city_name", StringType()),
    ("country_name", StringType()),
    ("toss_winner", StringType()),
    ("match_winner", StringType()),
    ("toss_name", StringType()),
    ("win_type", StringType()),
    ("outcome_type", StringType()),
    ("manofmach", StringType()),
    ("win_margin", IntegerType()),
    ("country_id", IntegerType()),
]
match_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _match_fields]
)

# Load the match-level CSV with that schema; the file's first line is its header.
match_df = (
    spark.read
    .schema(match_schema)
    .option("header", True)
    .csv("s3://ipl-dataset-2024/Match.csv")
)


In [0]:
# Explicit schema for Player.csv as (column name, Spark type) pairs; every field is nullable.
_player_fields = [
    ("player_sk", IntegerType()),
    ("player_id", IntegerType()),
    ("player_name", StringType()),
    ("dob", DateType()),
    ("batting_hand", StringType()),
    ("bowling_skill", StringType()),
    ("country_name", StringType()),
]
player_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _player_fields]
)

# Load the player dimension with that schema; the file's first line is its header.
player_df = (
    spark.read
    .schema(player_schema)
    .option("header", True)
    .csv("s3://ipl-dataset-2024/Player.csv")
)


In [0]:
# Explicit schema for Player_match.csv as (column name, Spark type) pairs; every field is nullable.
# playermatch_key is the only decimal column (precision 18, scale 2).
_player_match_fields = [
    ("player_match_sk", IntegerType()),
    ("playermatch_key", DecimalType(18, 2)),
    ("match_id", IntegerType()),
    ("player_id", IntegerType()),
    ("player_name", StringType()),
    ("dob", DateType()),
    ("batting_hand", StringType()),
    ("bowling_skill", StringType()),
    ("country_name", StringType()),
    ("role_desc", StringType()),
    ("player_team", StringType()),
    ("opposit_team", StringType()),
    ("season_year", IntegerType()),
    ("is_manofthematch", BooleanType()),
    ("age_as_on_match", IntegerType()),
    ("isplayers_team_won", BooleanType()),
    ("batting_status", StringType()),
    ("bowling_status", StringType()),
    ("player_captain", StringType()),
    ("opposit_captain", StringType()),
    ("player_keeper", StringType()),
    ("opposit_keeper", StringType()),
]
player_match_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _player_match_fields]
)

# Load the per-player-per-match CSV with that schema; the file's first line is its header.
player_match_df = (
    spark.read
    .schema(player_match_schema)
    .option("header", True)
    .csv("s3://ipl-dataset-2024/Player_match.csv")
)


In [0]:
# Explicit schema for Team.csv as (column name, Spark type) pairs; every field is nullable.
_team_fields = [
    ("team_sk", IntegerType()),
    ("team_id", IntegerType()),
    ("team_name", StringType()),
]
team_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _team_fields]
)

# Load the team dimension with that schema; the file's first line is its header.
team_df = (
    spark.read
    .schema(team_schema)
    .option("header", True)
    .csv("s3://ipl-dataset-2024/Team.csv")
)


# IPL Data Analysis

In [0]:
# Filter to include only valid deliveries (excluding extras such as wides and no-balls) for specific analysis
# A row survives only when both its 'wides' and its 'noballs' values equal 0.
# ball_by_ball_df is rebound to the reduced frame, so every later cell operates on this subset.
not_a_wide = col('wides') == 0
not_a_noball = col('noballs') == 0
ball_by_ball_df = ball_by_ball_df.filter(not_a_wide & not_a_noball)
display(ball_by_ball_df)

In [0]:
# For every (match_id, innings_no) pair: the sum of runs_scored ('total_run') and the mean of
# runs_scored rounded to two decimals ('avg_run').
# sum / avg / round are the Spark column functions star-imported from pyspark.sql.functions,
# not the Python builtins.
runs_by_innings = ball_by_ball_df.groupBy('match_id', 'innings_no')
total_and_avg_runs = runs_by_innings.agg(
    sum("runs_scored").alias('total_run'),
    round(avg("runs_scored"), 2).alias('avg_run'),
)

total_and_avg_runs.show(10)


In [0]:
# Running total of runs within each innings of each match, accumulated over by over.
#
# The window is partitioned by (match_id, innings_no) and ordered by over_id. With an ordering and
# no explicit frame, the sum runs from the start of the partition through every row that ties with
# the current row's over_id, so each ball of an over carries the cumulative total up to the end of
# that over.
#
# over_id must NOT be part of the partition key: if it is, every row in a partition ties on the
# ordering column and the result degenerates into a plain per-over sum (and, without innings_no,
# the same over number from both innings is added together).
windowspec = Window.partitionBy('match_id', 'innings_no').orderBy('over_id')

# `sum` is pyspark.sql.functions.sum (star-imported), applied as a window aggregate.
total_runs_per_over = ball_by_ball_df.withColumn(
    'runs_scored_per_over',
    sum('runs_scored').over(windowspec),
)

In [0]:
# Add a boolean 'high_impact' column: True when runs_scored + extra_runs exceeds 6 or the
# bowler_wicket flag equals True; every other row (including ones where the test is null) gets False.
runs_off_ball = col('runs_scored') + col('extra_runs')
is_high_impact = (runs_off_ball > 6) | (col('Bowler_Wicket') == True)
ball_by_ball_df = ball_by_ball_df.withColumn(
    'high_impact',
    when(is_high_impact, True).otherwise(False),
)

In [0]:
# Render ball_by_ball_df, which now carries the 'high_impact' column added in the previous cell.
# NOTE(review): `display` is not defined or imported in this notebook — presumably a built-in of
# the hosting notebook environment; confirm.
display(ball_by_ball_df)

In [0]:
# Split match_date into year / month / day-of-month columns for time-based analysis.
match_df = (
    match_df
    .withColumn('year', year('match_date'))
    .withColumn('month', month('match_date'))
    .withColumn('day', dayofmonth('match_date'))
)

# Bucket win_margin into three categories:
#   High   : win_margin >= 100
#   Medium : 50 < win_margin < 100
#   Low    : everything else (including a null win_margin)
# The High bound is inclusive so that a margin of exactly 100 is not dropped into 'Low'
# (with strict '> 100' and '< 100' tests, 100 matched neither branch).
# The second branch only needs the lower bound because rows >= 100 were already taken by the first.
match_df = match_df.withColumn(
    'win_margin_category',
    when(col('win_margin') >= 100, 'High')
    .when(col('win_margin') > 50, 'Medium')
    .otherwise('Low'),
)

In [0]:
# Print the first two rows to check the year/month/day and win_margin_category columns added above.
match_df.show(2)

In [0]:
# Toss impact: add 'toss_match_winner', which is 'Yes' when the toss winner is also the match
# winner and 'No' in every other case.
toss_winner_won_match = col('toss_winner') == col('match_winner')
match_df = match_df.withColumn(
    'toss_match_winner',
    when(toss_winner_won_match, 'Yes').otherwise('No'),
)

In [0]:
# Print the first two rows to check the toss_match_winner column added in the previous cell.
match_df.show(2)

In [0]:
# Clean player names by deleting every character that is not an ASCII letter, a digit or a space.
cleaned_player_name = regexp_replace('player_name', "[^a-zA-Z0-9 ]", "")
player_df = player_df.withColumn('player_name', cleaned_player_name)
player_df.show(2)

In [0]:
# Replace nulls in batting_hand and bowling_skill with the placeholder 'Unknown'.
null_replacements = dict.fromkeys(['batting_hand', 'bowling_skill'], 'Unknown')
player_df = player_df.fillna(null_replacements)

In [0]:
# Categorise players by batting hand: 'Left-Handed' when batting_hand mentions "left",
# otherwise 'Right-Handed'.
# The value is lower-cased before matching because Column.contains is case-sensitive; without it,
# a value spelled with a capital letter (e.g. 'Left-...') would be mislabelled as right-handed.
# NOTE(review): any value that does not mention "left" — including the 'Unknown' placeholder used
# for missing data — still falls into 'Right-Handed'; confirm that is the intended default.
is_left_handed = lower(col('batting_hand')).contains('left')
player_df = player_df.withColumn(
    'batting_style',
    when(is_left_handed, "Left-Handed").otherwise('Right-Handed'),
)

player_df.show(5)

In [0]:
# Label each player-match row by age: 'Veteran' when age_as_on_match is 35 or more,
# 'Non-Veteran' otherwise (which also covers a null age).
is_veteran = col('age_as_on_match') >= 35
player_match_df = player_match_df.withColumn(
    'veteran_status',
    when(is_veteran, 'Veteran').otherwise('Non-Veteran'),
)
player_match_df.show(2)

In [0]:
# Years since debut: the current calendar year minus the player's earliest season_year.
# The earliest season is taken per player_id with a window aggregate, so every row of a player
# gets the same value. Subtracting the row's own season_year instead would give "years since this
# match", which differs from row to row for the same player.
# `min` is pyspark.sql.functions.min (star-imported), not the Python builtin.
# NOTE(review): "debut" here means the first season present in this dataset for the player —
# confirm the data reaches back to each player's actual first season.
debut_season = min('season_year').over(Window.partitionBy('player_id'))
player_match_df = player_match_df.withColumn(
    'years_since_debut',
    year(current_date()) - debut_season,
)
player_match_df.show(3)

In [0]:
# Register each DataFrame as a temporary view so the cells below can query it with spark.sql.
# Registration order matches the dict's insertion order.
views_to_register = {
    'ball_by_ball': ball_by_ball_df,
    'match': match_df,
    'player': player_df,
    'player_match': player_match_df,
    'team': team_df,
}
for view_name, frame in views_to_register.items():
    frame.createOrReplaceTempView(view_name)

In [0]:
# Top 10 run totals by player name.
# The query sums runs_scored over every row of the ball_by_ball view for each striker, grouped by
# player_name, and sorts the totals in descending order; show(10) prints the first ten.
# NOTE(review): the 'highest_score' alias therefore holds an all-matches total, not a single-innings
# best, and two different players sharing a player_name would be merged — confirm this is intended.
top_batsmen_query = """select p.player_name, sum(b.runs_scored) as highest_score from
                        ball_by_ball b join player p on b.striker = p.player_id
                        group by p.player_name
                        order by highest_score desc
                       """
top_batsmen = spark.sql(top_batsmen_query)
top_batsmen.show(10)

In [0]:
# Batsmen with the most centuries.
# Inner query: total runs_scored per (match_id, striker).
# Outer query: keep the totals of 100 or more, join to Player for the name, and count them per
# player_name, most centuries first.
# The statement carries no trailing ';' — spark.sql takes exactly one statement, and older Spark
# parsers reject a terminating semicolon inside the query text.
batsmen_with_most_centuries = spark.sql("""
SELECT p.player_name, COUNT(*) AS centuries
FROM (
    SELECT match_id, striker, SUM(runs_scored) AS total_runs
    FROM Ball_By_Ball
    GROUP BY match_id, striker
) AS runs_per_match
JOIN Player p ON runs_per_match.striker = p.player_id
WHERE runs_per_match.total_runs >= 100
GROUP BY p.player_name
ORDER BY centuries DESC
""")

batsmen_with_most_centuries.show(10)

In [0]:
# Top-scoring batsman of each season.
# The inner query totals runs_scored per (season_year, player_name) and numbers the players within
# each season by descending total; the outer query keeps rank 1 and orders the result by season.
top_scorer_per_season_query = """select season_year, player_name, total_runs
from (
    select m.season_year, p.player_name, sum(b.runs_scored) AS total_runs,
    row_number() over (partition by m.season_year order by SUM(b.runs_scored) desc) as rn
    from Ball_By_Ball b
    join Player p on b.striker = p.player_id
    join Match m on b.match_id = m.match_id
    group by m.season_year, p.player_name
) as ranked where rn = 1 order by season_year"""
top_scoring_batsmen_per_season = spark.sql(top_scorer_per_season_query)

top_scoring_batsmen_per_season.show()

In [0]:
# Most economical bowlers in the powerplay.
# For each bowler (by player_name): the average runs_scored per delivery, rounded to 2 decimals,
# and the number of deliveries flagged bowler_wicket, restricted to overs up to and including 6.
# The over filter is `<= 6`; `>= 6` would select over 6 onwards, i.e. nearly everything outside
# the powerplay.
# Bowlers with a single qualifying delivery are dropped (HAVING count(*) > 1). Results are ordered
# by the lowest average first, ties broken by most wickets.
economical_bowler_in_powerplay = spark.sql("""
select p.player_name,
       round(avg(b.runs_scored), 2) as avg_runs_per_ball,
       sum(case when b.bowler_wicket = 'true' then 1 else 0 end) as total_wickets
from ball_by_ball b
join player_match pm on b.match_id = pm.match_id and b.bowler = pm.player_id
join player p on pm.player_id = p.player_id
where b.over_id <= 6
group by p.player_name
having count(*) > 1
order by avg_runs_per_ball, total_wickets desc
""")

economical_bowler_in_powerplay.show()

In [0]:
# Toss impact on the result: for every match with a recorded toss winner, list the toss details
# next to a 'match_outcome' column that reads 'Won' when the toss winner also won the match and
# 'Lost' otherwise.
toss_impact_query = """select match_id,toss_winner,toss_name,match_winner,
                                (case when toss_winner = match_winner then 'Won' else 'Lost' end ) as match_outcome
                                from match where toss_winner is not null
                                """
toss_impact_outcome = spark.sql(toss_impact_query)

toss_impact_outcome.show()

In [0]:
# Average runs per delivery in the powerplay (overs 1-6) for each batting team, highest first.
# AVG(...) must be closed before ROUND's precision argument: round(AVG(x), 2). With the
# parenthesis missing the statement does not parse.
# NOTE(review): the join matches Ball_By_Ball.team_batting against Team.team_name — confirm that
# team_batting holds team names and not team ids, otherwise the join returns no rows.
spark.sql("""
SELECT team_name, round(AVG(b.runs_scored), 2) AS avg_runs_powerplay
FROM Ball_By_Ball b
JOIN Match m ON b.match_id = m.match_id
JOIN Team t ON b.team_batting = t.team_name
WHERE b.over_id BETWEEN 1 AND 6
GROUP BY team_name
ORDER BY avg_runs_powerplay DESC
""").show()
