In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, BooleanType, DataType, DecimalType, StringType, DateType
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
from pyspark.sql import SparkSession 

#create session 
spark = SparkSession.builder.appName("IPL Data Analysis").getOrCreate()

In [0]:

ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), nullable=False),
    StructField("over_id", IntegerType(), nullable=False),
    StructField("ball_id", IntegerType(), nullable=False),
    StructField("innings_no", IntegerType(), nullable=False),
    StructField("team_batting", StringType(), nullable=False),
    StructField("team_bowling", StringType(), nullable=False),
    StructField("striker_batting_position", IntegerType(), nullable=True),
    StructField("extra_type", StringType(), nullable=True),
    StructField("runs_scored", IntegerType(), nullable=True),
    StructField("extra_runs", IntegerType(), nullable=True),
    StructField("wides", IntegerType(), nullable=True),
    StructField("legbyes", IntegerType(), nullable=True),
    StructField("byes", IntegerType(), nullable=True),
    StructField("noballs", IntegerType(), nullable=True),
    StructField("penalty", IntegerType(), nullable=True),
    StructField("bowler_extras", IntegerType(), nullable=True),
    StructField("out_type", StringType(), nullable=True),
    StructField("caught", BooleanType(), nullable=True),
    StructField("bowled", BooleanType(), nullable=True),
    StructField("run_out", BooleanType(), nullable=True),
    StructField("lbw", BooleanType(), nullable=True),
    StructField("retired_hurt", BooleanType(), nullable=True),
    StructField("stumped", BooleanType(), nullable=True),
    StructField("caught_and_bowled", BooleanType(), nullable=True),
    StructField("hit_wicket", BooleanType(), nullable=True),
    StructField("obstructingfeild", BooleanType(), nullable=True),
    StructField("bowler_wicket", BooleanType(), nullable=True),
    StructField("match_date", DateType(), nullable=True),
    StructField("season", IntegerType(), nullable=True),
    StructField("striker", IntegerType(), nullable=True),
    StructField("non_striker", IntegerType(), nullable=True),
    StructField("bowler", IntegerType(), nullable=True),
    StructField("player_out", IntegerType(), nullable=True),
    StructField("fielders", IntegerType(), nullable=True),
    StructField("striker_match_sk", IntegerType(), nullable=True),
    StructField("strikersk", IntegerType(), nullable=True),
    StructField("nonstriker_match_sk", IntegerType(), nullable=True),
    StructField("nonstriker_sk", IntegerType(), nullable=True),
    StructField("fielder_match_sk", IntegerType(), nullable=True),
    StructField("fielder_sk", IntegerType(), nullable=True),
    StructField("bowler_match_sk", IntegerType(), nullable=True),
    StructField("bowler_sk", IntegerType(), nullable=True),
    StructField("playerout_match_sk", IntegerType(), nullable=True),
    StructField("battingteam_sk", IntegerType(), nullable=True),
    StructField("bowlingteam_sk", IntegerType(), nullable=True),
    StructField("keeper_catch", BooleanType(), nullable=True),
    StructField("player_out_sk", IntegerType(), nullable=True),
    StructField("matchdatesk", DateType(), nullable=True)
])

ball_by_ball_df = spark.read.format("csv").option("header","true").load("s3://ipl-data-analysis-using-apachespark/Ball_By_Ball.csv")


In [0]:
match_schema = StructType([
    StructField("match_sk", IntegerType(), nullable=False),
    StructField("match_id", IntegerType(), nullable=False),
    StructField("team1", StringType(), nullable=False),
    StructField("team2", StringType(), nullable=False),
    StructField("match_date", DateType(), nullable=True),
    StructField("season_year", IntegerType(), nullable=True),
    StructField("venue_name", StringType(), nullable=True),
    StructField("city_name", StringType(), nullable=True),
    StructField("country_name", StringType(), nullable=True),
    StructField("toss_winner", StringType(), nullable=True),
    StructField("match_winner", StringType(), nullable=True),
    StructField("toss_name", StringType(), nullable=True),
    StructField("win_type", StringType(), nullable=True),
    StructField("outcome_type", StringType(), nullable=True),
    StructField("manofmach", StringType(), nullable=True),
    StructField("win_margin", IntegerType(), nullable=True),
    StructField("country_id", IntegerType(), nullable=True)
])

match_df = spark.read.format('csv').option('header','true').load('s3://ipl-data-analysis-using-apachespark/Match.csv')

In [0]:
player_schema = StructType([
    StructField("player_sk", IntegerType(), nullable=False),
    StructField("player_id", IntegerType(), nullable=False),
    StructField("player_name", StringType(), nullable=False),
    StructField("dob", DateType(), nullable=True),
    StructField("batting_hand", StringType(), nullable=True),
    StructField("bowling_skill", StringType(), nullable=True),
    StructField("country_name", StringType(), nullable=True)
])

player_df = spark.read.format('csv').option('header','true').load('s3://ipl-data-analysis-using-apachespark/Player.csv')

In [0]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), nullable=False),
    StructField("playermatch_key", DecimalType(20, 10), nullable=True),  # Adjust precision and scale as needed
    StructField("match_id", IntegerType(), nullable=False),
    StructField("player_id", IntegerType(), nullable=False),
    StructField("player_name", StringType(), nullable=True),
    StructField("dob", DateType(), nullable=True),
    StructField("batting_hand", StringType(), nullable=True),
    StructField("bowling_skill", StringType(), nullable=True),
    StructField("country_name", StringType(), nullable=True),
    StructField("role_desc", StringType(), nullable=True),
    StructField("player_team", StringType(), nullable=True),
    StructField("opposit_team", StringType(), nullable=True),
    StructField("season_year", IntegerType(), nullable=True),
    StructField("is_manofthematch", BooleanType(), nullable=True),
    StructField("age_as_on_match", IntegerType(), nullable=True),
    StructField("isplayers_team_won", BooleanType(), nullable=True),
    StructField("batting_status", StringType(), nullable=True),
    StructField("bowling_status", StringType(), nullable=True),
    StructField("player_captain", StringType(), nullable=True),
    StructField("opposit_captain", StringType(), nullable=True),
    StructField("player_keeper", StringType(), nullable=True),
    StructField("opposit_keeper", StringType(), nullable=True)
])

player_match_df = spark.read.format('csv').option('header','true').load('s3://ipl-data-analysis-using-apachespark/Player_match.csv')

In [0]:
team_schema = StructType([
    StructField("team_sk", IntegerType(), nullable=False),
    StructField("team_id", IntegerType(), nullable=False),
    StructField("team_name", StringType(), nullable=False)
])

team_df = spark.read.format('csv').option('header','true').load('s3://ipl-data-analysis-using-apachespark/Team.csv')

In [0]:
# FILTER TO INCLUDE ONLY VALID DELEVIRIES (EXCLUDING EXTRAS LIKE WIDE BALLS AND NO BALLS )

ball_by_ball_df = ball_by_ball_df.filter((col("wides")==0) & (col("noballs")== 0))

In [0]:
#AGGREGATION : CALCULATE TOTAL AND AVG RUNS SCORED IN EACH MATCH AND INNINGS

total_and_avg_sum = ball_by_ball_df.groupBy("match_id", "innings_no").agg(
    sum("runs_scored").alias("total_runs"),
    avg("runs_scored").alias("average_runs")
)

In [0]:
# CALCULATE RUNNING TOTAL OF RUNS IN EACH MATCH FOR EACH OVER


# Define the window specification
windowSpec = Window.partitionBy("match_id", "innings_no").orderBy("over_id")

ball_by_ball_df = ball_by_ball_df.withColumn(
    "running_total_runs", 
    sum("runs_scored").over(windowSpec)
)

In [0]:
# CONDITIONAL COLUMN : FLAG FOR HIGH IMPACT BALLS ( EITHER A WICKET OR MORE THAN 6 RUNS INCLUDING EXTRAS )

ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when(
    (col("runs_scored") + col("extra_runs") > 6) | (col("bowler_wicket") == True),
    True
).otherwise(False)
)

In [0]:
# EXTRACTING YEAR, MONTH AND DAY FROM THR MATCH DATA FOR DETAILED TIME BASED ANALYSIS

match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn("month", month("match_date"))
match_df = match_df.withColumn("day", dayofmonth("match_date"))

In [0]:
# HIGH MARGIN WIN : CATEGORIZING WITH MARGINS INTO 'HIGH', 'MEDIUM' AND 'LOW'

match_df =match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >= 50) & (col("win_margin") < 100), "Medium")
    .otherwise("Low")
)

In [0]:
# ANALYZE THE IMPACT OF TOSS ON THE MATCH
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

In [0]:
# DYNAMIC COLUMN TO CAL YEARS SINCE DEBUT 

player_match_df = player_match_df.withColumn(
    "year_since_debut",
    (year(current_date()) - col("season_year"))
)

In [0]:
#PLAYER PERFORMANCE BASED ON THEIR MAN OF THE MATCH STATUS

player_awards_df = match_df.filter(col("manofmach").isNotNull())\
        .groupBy("manofmach")\
        .count()\
        .withColumnRenamed("count","awards_won")

sorted_player_awards_df = player_awards_df.orderBy(col("awards_won").desc())

For Sql Queries

In [0]:
ball_by_ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("match")
player_df.createOrReplaceTempView("player")
player_match_df.createOrReplaceTempView("player_match")
team_df.createOrReplaceTempView("team")
player_awards_df.createOrReplaceTempView("Awards")

In [0]:
# WINTYPE BASED ON THE RUNS AND WICKETS

wintype = spark.sql("""
SELECT
    SUM(CASE WHEN win_type = 'runs' THEN 1 ELSE 0 END) AS Runs,
    SUM(CASE WHEN win_type = 'wickets' THEN 1 ELSE 0 END) AS Wickets
FROM match
""")

wintype.show(5)

+----+-------+
|Runs|Wickets|
+----+-------+
| 286|    339|
+----+-------+



In [0]:
# BEST TEAMS ACCORDING TO THE WINS 

most_wins = spark.sql("""
SELECT 
    match_winner, 
    COUNT(*) AS total_wins 
FROM match 
GROUP BY match_winner 
ORDER BY total_wins DESC 
""")
most_wins.show()

+--------------------+----------+
|        match_winner|total_wins|
+--------------------+----------+
|      Mumbai Indians|        91|
| Chennai Super Kings|        79|
|Kolkata Knight Ri...|        77|
|Royal Challengers...|        73|
|     Kings XI Punjab|        70|
|    Rajasthan Royals|        63|
|    Delhi Daredevils|        62|
| Sunrisers Hyderabad|        42|
|     Deccan Chargers|        29|
|Rising Pune Super...|        15|
|       Gujarat Lions|        13|
|       Pune Warriors|        12|
|Kochi Tuskers Kerala|         6|
|                NULL|         3|
|                tied|         1|
|           abandoned|         1|
+--------------------+----------+



In [0]:
# BEST BOWLER ACCORDING TO THE WICKETS TAKEN 

best_bowler = spark.sql("""
SELECT 
    p.player_name, 
    COUNT(bbb.out_type) AS total_wickets
FROM 
    ball_by_ball bbb
JOIN 
    player p 
ON 
    bbb.bowler = p.player_id
WHERE 
    bbb.out_type IN ('caught', 'bowled', 'lbw', 'stumped', 'caught and bowled', 'hit wicket')
GROUP BY 
    p.player_name
ORDER BY 
    total_wickets DESC
""")
best_bowler.show(10)

+---------------+-------------+
|    player_name|total_wickets|
+---------------+-------------+
|     SL Malinga|          144|
|       A Mishra|          128|
|      PP Chawla|          121|
|Harbhajan Singh|          116|
|       DJ Bravo|          102|
|        B Kumar|           97|
|         Z Khan|           94|
|       R Ashwin|           93|
|      SP Narine|           88|
|        A Nehra|           88|
+---------------+-------------+
only showing top 10 rows



In [0]:
# BEST BATSMAN ACCORDING TO THE RUNS SCORED 

best_batsman = spark.sql("""
SELECT 
    p.player_name, 
    SUM(bbb.runs_scored) AS total_runs
FROM 
    ball_by_ball bbb
JOIN 
    player p 
ON 
    bbb.striker = p.player_id
GROUP BY 
    p.player_name
ORDER BY 
    total_runs DESC
""")
best_batsman.show(10)

+--------------+----------+
|   player_name|total_runs|
+--------------+----------+
|      SK Raina|    4526.0|
|       V Kohli|    4402.0|
|     RG Sharma|    4184.0|
|     G Gambhir|    4112.0|
|     DA Warner|    3985.0|
|    RV Uthappa|    3767.0|
|      CH Gayle|    3606.0|
|      MS Dhoni|    3553.0|
|      S Dhawan|    3541.0|
|AB de Villiers|    3460.0|
+--------------+----------+
only showing top 10 rows



In [0]:
# HIGHEST RUN SCORER IN SINGLE MATCH

highest_single_match_runs = spark.sql("""
SELECT 
    p.player_name,
    SUM(bbb.runs_scored) AS total_runs
FROM 
    ball_by_ball bbb
JOIN 
    player p 
ON 
    bbb.striker = p.player_id
JOIN 
    match m 
ON 
    bbb.match_id = m.match_id
GROUP BY 
    p.player_name, m.match_id
ORDER BY 
    total_runs DESC
LIMIT 1
""")
highest_single_match_runs.show(20)

+-----------+----------+
|player_name|total_runs|
+-----------+----------+
|   CH Gayle|     171.0|
+-----------+----------+



In [0]:
# MOST WICKET TAKEN BY ANY BOWLER IN A SINGLE MATCH

most_wickets_single_match = spark.sql("""
SELECT 
    p.player_name, 
    COUNT(*) AS total_wickets
FROM 
    ball_by_ball bbb
JOIN 
    player p 
ON 
    bbb.bowler = p.player_id
JOIN 
    match m 
ON 
    bbb.match_id = m.match_id
WHERE 
    bbb.out_type IN ('caught', 'bowled', 'lbw', 'stumped', 'caught and bowled', 'hit wicket')
GROUP BY 
    p.player_name, m.match_id
ORDER BY 
    total_wickets DESC
""")
most_wickets_single_match.show(20)

+---------------+-------------+
|    player_name|total_wickets|
+---------------+-------------+
|  Sohail Tanvir|            6|
|        A Zampa|            6|
|      RA Jadeja|            5|
|Harbhajan Singh|            5|
| AD Mascarenhas|            5|
|     JD Unadkat|            5|
|       A Mishra|            5|
|       MM Patel|            5|
|     JD Unadkat|            5|
|     SL Malinga|            5|
|        B Kumar|            5|
|    JP Faulkner|            5|
|    JP Faulkner|            5|
|         AJ Tye|            5|
|       R Bhatia|            4|
|    Imran Tahir|            4|
|       R Ashwin|            4|
|        B Kumar|            4|
|      SB Jakati|            4|
|      SR Watson|            4|
+---------------+-------------+
only showing top 20 rows



In [0]:
# TOP SCORING TEAMS PER SEASON 

top_scoring_team_each_season = spark.sql("""
SELECT 
    m.season_year,
    t.team_name AS team_name,
    SUM(bbb.runs_scored) AS total_runs
FROM 
    ball_by_ball bbb
JOIN 
    match m 
ON 
    bbb.match_id = m.match_id
JOIN 
    team t
ON
    bbb.team_batting = t.team_id
GROUP BY 
    m.season_year, t.team_name
ORDER BY 
    m.season_year, total_runs DESC
""")
top_scoring_team_each_season.show()


+-----------+--------------------+----------+
|season_year|           team_name|total_runs|
+-----------+--------------------+----------+
|       2008|    Rajasthan Royals|    2399.0|
|       2008| Chennai Super Kings|    2339.0|
|       2008|     Kings XI Punjab|    2334.0|
|       2008|     Deccan Chargers|    2095.0|
|       2008|    Delhi Daredevils|    1988.0|
|       2008|      Mumbai Indians|    1873.0|
|       2008|Royal Challengers...|    1858.0|
|       2008|Kolkata Knight Ri...|    1793.0|
|       2009|     Deccan Chargers|    2265.0|
|       2009| Chennai Super Kings|    2155.0|
|       2009|Royal Challengers...|    2150.0|
|       2009|    Delhi Daredevils|    2006.0|
|       2009|      Mumbai Indians|    1792.0|
|       2009|     Kings XI Punjab|    1763.0|
|       2009|Kolkata Knight Ri...|    1622.0|
|       2009|    Rajasthan Royals|    1528.0|
|       2010|      Mumbai Indians|    2579.0|
|       2010| Chennai Super Kings|    2432.0|
|       2010|Royal Challengers...|