In [0]:
spark

<pyspark.sql.connect.session.SparkSession at 0xff24b1b8b320>

In [0]:
from pyspark.sql import SparkSession

#create session
spark = SparkSession.builder.appName("IPL Data Analysis").getOrCreate()

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType
from pyspark.sql.functions import col, when, sum, avg, row_number
from pyspark.sql.window import Window

In [0]:
ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True)
])

In [0]:
ball_by_ball_df = spark.read.schema(ball_by_ball_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project-07/Ball_By_Ball.csv")

In [0]:
ball_by_ball_df.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_sk|fielder_match_sk|

In [0]:
#filter to include only valid deliveries(excluding extras like wides and no balls for specific analyses
ball_by_ball_df = ball_by_ball_df.filter((col("wides") == 0) & (col("noballs") == 0))

#aggregation: calcualte ther total and avarage runs scored in each match and inning
total_and_avarage_runs = ball_by_ball_df.groupBy("match_id", "innings_no").agg(
    sum("runs_scored").alias("total_runs"),
    avg("runs_scored").alias("avarage_runs")
)

In [0]:
# Window Function: Calculate running total of runs in each match for each over
windowSpec = Window.partitionBy("match_id", "innings_no").orderBy("over_id")

ball_by_ball_df = ball_by_ball_df.withColumn(
    "running_total_runs",
    sum("runs_scored").over(windowSpec)
) 

In [0]:
# Conditional Column: Flag for high impact balls(either a wicket or more than 6 runs including extras)
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when(
        ((col("runs_scored") + col("extra_runs")) > 6) | (col("bowler_wicket") == True),
        True
    ).otherwise(False)
)

In [0]:
ball_by_ball_df.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+------------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|

In [0]:
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),  # year stored as integer (YYYY)
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])

In [0]:
match_df = spark.read.schema(match_schema).format("csv").option("header", "true").load("s3://ipl-data-analysis-project-07/Match.csv")

In [0]:
match_df.show(5)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      NULL|       2008|M Chinnaswamy Sta...| Bangalore|       India|Royal Challengers...|Kolkata Knight Ri...|    field|    runs|      Result|BB McCullum|       140|         1|
|       1|  335988|     Kings XI Punjab| Chennai

In [0]:
from pyspark.sql.functions import year, month, dayofmonth, when

#Extracting year, month, and day from match date date for detailed time-based analysis
match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn("month", month("match_date"))
match_df = match_df.withColumn("day", dayofmonth("match_date"))

# High margin win: Categorizing win margins into 'high', 'medium, and 'low']
match_df = match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >=50) & (col("win_margin") < 100), "medium")
    .otherwise("low")
)

# Analyze the impact of the toss: who wins and the match
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

# show the enhanced match Dataframe
match_df.show(2)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-------------------+-----------------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id|year|month| day|win_margin_category|toss_match_winner|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-------------------+-----------------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      NULL|       2008|M Chinnaswamy Sta...| Bangalore|       I

In [0]:
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

In [0]:
player_df = spark.read.schema(player_schema).format("csv").option("header", "true").load("s3://ipl-data-analysis-project-07/Player.csv")

In [0]:
from pyspark.sql.functions import lower, regexp_replace

# Normalize and clean player names
player_df = player_df.withColumn("player_name",lower(regexp_replace(col("player_name"), "[^a-zA-Z0-9 ]", "")))

# Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
player_df = player_df.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

# Categorizing player based on batting hand
player_df = player_df.withColumn("batting_style", when(col("batting_hand").contains("Left-hand bat"), "left-handed").otherwise("right-handed"))

#show the modified player DataFrame
player_df.show(18)

+---------+---------+---------------+----+--------------+--------------------+------------+-------------+
|player_sk|player_id|    player_name| dob|  batting_hand|       bowling_skill|country_name|batting_style|
+---------+---------+---------------+----+--------------+--------------------+------------+-------------+
|        0|        1|     sc ganguly|NULL| Left-hand bat|    Right-arm medium|       India|  left-handed|
|        1|        2|    bb mccullum|NULL|Right-hand bat|    Right-arm medium| New Zealand| right-handed|
|        2|        3|     rt ponting|NULL|Right-hand bat|    Right-arm medium|   Australia| right-handed|
|        3|        4|      dj hussey|NULL|Right-hand bat|  Right-arm offbreak|   Australia| right-handed|
|        4|        5|mohammad hafeez|NULL|Right-hand bat|  Right-arm offbreak|    Pakistan| right-handed|
|        5|        6|       r dravid|NULL|Right-hand bat|  Right-arm offbreak|       India| right-handed|
|        6|        7|       w jaffer|NULL|Righ

In [0]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(20, 2), True),  # assuming up to 20 digits, 2 decimal places
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),  # using IntegerType for year
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

In [0]:
player_match_df = spark.read.schema(player_match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project-07/Player_match.csv")

In [0]:
from pyspark.sql.functions import col, when, current_date, expr

# Add a 'veteran_status' column based on player age
player_match_df = player_match_df.withColumn("veteran_status", when(col("age_as_on_match") >= 35, "veteran").otherwise("non-veteran"))


# Dynamic column to calculate years since debut
player_match_df = player_match_df.withColumn("years_since_debut", (year(current_date()) - col("season_year")))

player_match_df.display()

player_match_sk,playermatch_key,match_id,player_id,player_name,dob,batting_hand,bowling_skill,country_name,role_desc,player_team,opposit_team,season_year,is_manofthematch,age_as_on_match,isplayers_team_won,batting_status,bowling_status,player_captain,opposit_captain,player_keeper,opposit_keeper,veteran_status,years_since_debut
-1,-1.0,-1,-1,,,,,,,,,,,,,,,,,,,non-veteran,
12694,33598700006.0,335987,6,R Dravid,,Right-hand bat,Right-arm offbreak,India,Captain,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,35.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,veteran,17.0
12695,33598700007.0,335987,7,W Jaffer,,Right-hand bat,Right-arm offbreak,India,Player,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,30.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0
12696,33598700008.0,335987,8,V Kohli,,Right-hand bat,Right-arm medium,India,Player,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,20.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0
12697,33598700009.0,335987,9,JH Kallis,,Right-hand bat,Right-arm fast-medium,South Africa,Player,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,33.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0
12698,33598700010.0,335987,10,CL White,,Right-hand bat,Legbreak googly,Australia,Player,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,25.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0
12699,33598700011.0,335987,11,MV Boucher,,Right-hand bat,Right-arm medium,South Africa,Keeper,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,32.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0
12700,33598700012.0,335987,12,B Akhil,,Right-hand bat,Right-arm medium-fast,India,Player,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,31.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0
12701,33598700013.0,335987,13,AA Noffke,,Right-hand bat,Right-arm fast-medium,Australia,Player,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,31.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0
12702,33598700014.0,335987,14,P Kumar,,Right-hand bat,Right-arm medium,India,Player,Royal Challengers Bangalore,Kolkata Knight Riders,2008.0,,22.0,,,,R Dravid,SC Ganguly,MV Boucher,WP Saha,non-veteran,17.0


In [0]:
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])

In [0]:
team_df = spark.read.schema(team_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project-07/Team.csv")

In [0]:
# Add a new column Team_Code by taking the all letters of Team_Name.
from pyspark.sql.functions import substring, upper, monotonically_increasing_id

team_df = team_df.withColumn("Team_Code", upper(substring("Team_Name", 1, 3)))


In [0]:
team_df.show()

+-------+-------+--------------------+---------+
|team_sk|team_id|           team_name|Team_Code|
+-------+-------+--------------------+---------+
|      0|      1|Kolkata Knight Ri...|      KOL|
|      1|      2|Royal Challengers...|      ROY|
|      2|      3| Chennai Super Kings|      CHE|
|      3|      4|     Kings XI Punjab|      KIN|
|      4|      5|    Rajasthan Royals|      RAJ|
|      5|      6|    Delhi Daredevils|      DEL|
|      6|      7|      Mumbai Indians|      MUM|
|      7|      8|     Deccan Chargers|      DEC|
|      8|      9|Kochi Tuskers Kerala|      KOC|
|      9|     10|       Pune Warriors|      PUN|
|     10|     11| Sunrisers Hyderabad|      SUN|
|     11|     12|Rising Pune Super...|      RIS|
|     12|     13|       Gujarat Lions|      GUJ|
+-------+-------+--------------------+---------+

