In [0]:
spark

In [0]:
from pyspark.sql import SparkSession

#creating session
spark = SparkSession.builder.appName("IPL-Data-Analysis").getOrCreate()

In [0]:
# For creating our own scehema
from pyspark.sql.types import StructField,StructType,IntegerType, StringType,BooleanType,DateType, DecimalType

In [0]:
#creating our own schema to make sure datatypes are followed as per given
ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True)
])


In [0]:
#ball_by_ball_df = spark.read.csv("s3://ipl-dataset-analysis-project/Ball_By_Ball.csv")
#ball_by_ball_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-dataset-analysis-project/Ball_By_Ball.csv")
ball_by_ball_df = spark.read.schema(ball_by_ball_schema).format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-dataset-analysis-project/Ball_By_Ball.csv")


In [0]:
ball_by_ball_df.show(3)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_sk|fielder_match_sk|

In [0]:
match_schema = StructType([
    StructField("match_sk", IntegerType()),
    StructField("match_id", IntegerType()),
    StructField("team1", StringType()),
    StructField("team2", StringType()),
    StructField("match_date", DateType()),
    StructField("season_year", IntegerType()),
    StructField("venue_name", StringType()),
    StructField("city_name", StringType()),
    StructField("country_name", StringType()),
    StructField("toss_winner", StringType()),
    StructField("match_winner", StringType()),
    StructField("toss_name", StringType()),
    StructField("win_type", StringType()),
    StructField("outcome_type", StringType()),
    StructField("manofmach", StringType()),
    StructField("win_margin", IntegerType()),
    StructField("country_id", IntegerType())
])

match_df = spark.read.schema(match_schema).format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-dataset-analysis-project/Match.csv")

match_df.show(3)


+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      null|       2008|M Chinnaswamy Sta...| Bangalore|       India|Royal Challengers...|Kolkata Knight Ri...|    field|    runs|      Result|BB McCullum|       140|         1|
|       1|  335988|     Kings XI Punjab| Chennai

In [0]:
player_schema = StructType([
    StructField("player_sk", IntegerType()),
    StructField("player_id", IntegerType()),
    StructField("player_name", StringType()),
    StructField("dob", DateType()),
    StructField("batting_hand", StringType()),
    StructField("bowling_skill", StringType()),
    StructField("country_name", StringType())
])

player_df = spark.read.schema(player_schema).format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-dataset-analysis-project/Player.csv")
player_df.show(3)

+---------+---------+-----------+----+--------------+----------------+------------+
|player_sk|player_id|player_name| dob|  batting_hand|   bowling_skill|country_name|
+---------+---------+-----------+----+--------------+----------------+------------+
|        0|        1| SC Ganguly|null| Left-hand bat|Right-arm medium|       India|
|        1|        2|BB McCullum|null|Right-hand bat|Right-arm medium| New Zealand|
|        2|        3| RT Ponting|null|Right-hand bat|Right-arm medium|   Australia|
+---------+---------+-----------+----+--------------+----------------+------------+
only showing top 3 rows



In [0]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType()),
    StructField("playermatch_key", DecimalType(18, 0)),
    StructField("match_id", IntegerType()),
    StructField("player_id", IntegerType()),
    StructField("player_name", StringType()),
    StructField("dob", DateType()),
    StructField("batting_hand", StringType()),
    StructField("bowling_skill", StringType()),
    StructField("country_name", StringType()),
    StructField("role_desc", StringType()),
    StructField("player_team", StringType()),
    StructField("opposit_team", StringType()),
    StructField("season_year", IntegerType()),
    StructField("is_manofthematch", BooleanType()),
    StructField("age_as_on_match", IntegerType()),
    StructField("isplayers_team_won", BooleanType()),
    StructField("batting_status", StringType()),
    StructField("bowling_status", StringType()),
    StructField("player_captain", StringType()),
    StructField("opposit_captain", StringType()),
    StructField("player_keeper", StringType()),
    StructField("opposit_keeper", StringType())
])

player_match_df = spark.read.schema(player_match_schema).format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-dataset-analysis-project/Player_match.csv")
player_match_df.show(3)

+---------------+---------------+--------+---------+-----------+----+--------------+------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+
|player_match_sk|playermatch_key|match_id|player_id|player_name| dob|  batting_hand|     bowling_skill|country_name|role_desc|         player_team|        opposit_team|season_year|is_manofthematch|age_as_on_match|isplayers_team_won|batting_status|bowling_status|player_captain|opposit_captain|player_keeper|opposit_keeper|
+---------------+---------------+--------+---------+-----------+----+--------------+------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+
|             -1|             -

In [0]:
team_schema = StructType([
    StructField("team_sk", IntegerType()),
    StructField("team_id", IntegerType()),
    StructField("team_name", StringType())
])

team_df = spark.read.schema(team_schema).format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-dataset-analysis-project/Team.csv")
team_df.show()

+-------+-------+--------------------+
|team_sk|team_id|           team_name|
+-------+-------+--------------------+
|      0|      1|Kolkata Knight Ri...|
|      1|      2|Royal Challengers...|
|      2|      3| Chennai Super Kings|
|      3|      4|     Kings XI Punjab|
|      4|      5|    Rajasthan Royals|
|      5|      6|    Delhi Daredevils|
|      6|      7|      Mumbai Indians|
|      7|      8|     Deccan Chargers|
|      8|      9|Kochi Tuskers Kerala|
|      9|     10|       Pune Warriors|
|     10|     11| Sunrisers Hyderabad|
|     11|     12|Rising Pune Super...|
|     12|     13|       Gujarat Lions|
+-------+-------+--------------------+



In [0]:
ball_by_ball_df.columns

Out[45]: ['match_id',
 'over_id',
 'ball_id',
 'innings_no',
 'team_batting',
 'team_bowling',
 'striker_batting_position',
 'extra_type',
 'runs_scored',
 'extra_runs',
 'wides',
 'legbyes',
 'byes',
 'noballs',
 'penalty',
 'bowler_extras',
 'out_type',
 'caught',
 'bowled',
 'run_out',
 'lbw',
 'retired_hurt',
 'stumped',
 'caught_and_bowled',
 'hit_wicket',
 'obstructingfeild',
 'bowler_wicket',
 'match_date',
 'season',
 'striker',
 'non_striker',
 'bowler',
 'player_out',
 'fielders',
 'striker_match_sk',
 'strikersk',
 'nonstriker_match_sk',
 'nonstriker_sk',
 'fielder_match_sk',
 'fielder_sk',
 'bowler_match_sk',
 'bowler_sk',
 'playerout_match_sk',
 'battingteam_sk',
 'bowlingteam_sk',
 'keeper_catch',
 'player_out_sk',
 'matchdatesk']

In [0]:
ball_by_ball_column_names = ball_by_ball_df.columns
print(ball_by_ball_column_names)

['match_id', 'over_id', 'ball_id', 'innings_no', 'team_batting', 'team_bowling', 'striker_batting_position', 'extra_type', 'runs_scored', 'extra_runs', 'wides', 'legbyes', 'byes', 'noballs', 'penalty', 'bowler_extras', 'out_type', 'caught', 'bowled', 'run_out', 'lbw', 'retired_hurt', 'stumped', 'caught_and_bowled', 'hit_wicket', 'obstructingfeild', 'bowler_wicket', 'match_date', 'season', 'striker', 'non_striker', 'bowler', 'player_out', 'fielders', 'striker_match_sk', 'strikersk', 'nonstriker_match_sk', 'nonstriker_sk', 'fielder_match_sk', 'fielder_sk', 'bowler_match_sk', 'bowler_sk', 'playerout_match_sk', 'battingteam_sk', 'bowlingteam_sk', 'keeper_catch', 'player_out_sk', 'matchdatesk']


In [0]:
# Filter to include only valid deliveries (excluding extras like wides and no balls for specific analyses)
from pyspark.sql.functions import *
ball_by_ball_df = ball_by_ball_df.filter((col("wides") == 0) & (col("noballs")==0))
valid_deliveries_df = ball_by_ball_df.filter(
    (ball_by_ball_df["extra_type"].isNull()) |  # Exclude rows where 'extra_type' is not null
    (ball_by_ball_df["extra_type"] == "")      # Exclude rows where 'extra_type' is an empty string
)


In [0]:
total_avg_runs =  ball_by_ball_df.groupBy("matchid","innings_no").agg

In [0]:
#analysing using sql from below

#Converting to SQL table for Querying
ball_by_ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("match")
player_df.createOrReplaceTempView("player")
player_match_df.createOrReplaceTempView("player_match")
team_df.createOrReplaceTempView("team")

In [0]:
# using SQL tables

top_scoring_batsmen_per_season = spark.sql("""
SELECT p.player_name,m.season_year,SUM(b.runs_scored) AS total_runs 
FROM ball_by_ball b
JOIN match m ON b.match_id = m.match_id   
JOIN player_match pm ON m.match_id = pm.match_id AND b.striker = pm.player_id     
JOIN player p ON p.player_id = pm.player_id
GROUP BY p.player_name, m.season_year
ORDER BY m.season_year, total_runs DESC
""")

In [0]:
top_scoring_batsmen_per_season.show(30)

+---------------+-----------+----------+
|    player_name|season_year|total_runs|
+---------------+-----------+----------+
|       SE Marsh|       2008|       614|
|      G Gambhir|       2008|       532|
|  ST Jayasuriya|       2008|       508|
|      SR Watson|       2008|       463|
|       GC Smith|       2008|       437|
|   AC Gilchrist|       2008|       431|
|      YK Pathan|       2008|       430|
|       SK Raina|       2008|       420|
|       MS Dhoni|       2008|       414|
|       V Sehwag|       2008|       399|
|      RG Sharma|       2008|       399|
|       R Dravid|       2008|       370|
|     SC Ganguly|       2008|       349|
|       S Dhawan|       2008|       340|
|  KC Sangakkara|       2008|       319|
|      DJ Hussey|       2008|       318|
|     RV Uthappa|       2008|       316|
|    SA Asnodkar|       2008|       311|
|   Yuvraj Singh|       2008|       299|
|       PA Patel|       2008|       297|
|Y Venugopal Rao|       2008|       283|
|      JA Morkel

In [0]:
economical_bowlers_powerplay = spark.sql("""
SELECT 
p.player_name, 
AVG(b.runs_scored) AS avg_runs_per_ball, 
COUNT(b.bowler_wicket) AS total_wickets
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.bowler = pm.player_id
JOIN player p ON pm.player_id = p.player_id
WHERE b.over_id <= 6
GROUP BY p.player_name
HAVING COUNT(*) >= 1
ORDER BY avg_runs_per_ball, total_wickets DESC
""")
economical_bowlers_powerplay.show()




In [0]:
x= spark.sql("""
SELECT *
FROM match
limit 5          
""")

x.show()



In [0]:
toss_impact_individual_matches = spark.sql("""
SELECT m.match_id, m.toss_winner, m.toss_name, m.match_winner,
       CASE WHEN m.toss_winner = m.match_winner THEN 'Won' ELSE 'Lost' END AS match_outcome
FROM match m
WHERE m.toss_name IS NOT NULL
ORDER BY m.match_id
""")
toss_impact_individual_matches.show()

+--------+--------------------+---------+--------------------+-------------+
|match_id|         toss_winner|toss_name|        match_winner|match_outcome|
+--------+--------------------+---------+--------------------+-------------+
|  335987|Royal Challengers...|    field|Kolkata Knight Ri...|         Lost|
|  335988| Chennai Super Kings|      bat| Chennai Super Kings|          Won|
|  335989|    Rajasthan Royals|      bat|    Delhi Daredevils|         Lost|
|  335990|      Mumbai Indians|      bat|Royal Challengers...|         Lost|
|  335991|     Deccan Chargers|      bat|Kolkata Knight Ri...|         Lost|
|  335992|     Kings XI Punjab|      bat|    Rajasthan Royals|         Lost|
|  335993|     Deccan Chargers|      bat|    Delhi Daredevils|         Lost|
|  335994|      Mumbai Indians|    field| Chennai Super Kings|         Lost|
|  335995|    Rajasthan Royals|    field|    Rajasthan Royals|          Won|
|  335996|      Mumbai Indians|    field|     Kings XI Punjab|         Lost|

In [0]:
import matplotlib.pyplot as plt
