### Define Schema

In [0]:
%run "./0.Configurations"

In [0]:
### Import Required functions
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType,DecimalType
from pyspark.sql.functions import col, when, sum, avg, row_number 
from pyspark.sql.window import Window

In [0]:
# Define the schema
ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType()),
    StructField("over_id", IntegerType()),
    StructField("ball_id", IntegerType()),
    StructField("innings_no", IntegerType()),
    StructField("team_batting", StringType()),
    StructField("team_bowling", StringType()),
    StructField("striker_batting_position", IntegerType()),
    StructField("extra_type", StringType()),
    StructField("runs_scored", IntegerType()),
    StructField("extra_runs", IntegerType()),
    StructField("wides", IntegerType()),
    StructField("legbyes", IntegerType()),
    StructField("byes", IntegerType()),
    StructField("noballs", IntegerType()),
    StructField("penalty", IntegerType()),
    StructField("bowler_extras", IntegerType()),
    StructField("out_type", StringType()),
    StructField("caught", BooleanType()),
    StructField("bowled", BooleanType()),
    StructField("run_out", BooleanType()),
    StructField("lbw", BooleanType()),
    StructField("retired_hurt", BooleanType()),
    StructField("stumped", BooleanType()),
    StructField("caught_and_bowled", BooleanType()),
    StructField("hit_wicket", BooleanType()),
    StructField("obstructingfeild", BooleanType()),
    StructField("bowler_wicket", BooleanType()),
    StructField("match_date", DateType()),
    StructField("season", IntegerType()),
    StructField("striker", IntegerType()),
    StructField("non_striker", IntegerType()),
    StructField("bowler", IntegerType()),
    StructField("player_out", IntegerType()),
    StructField("fielders", IntegerType()),
    StructField("striker_match_sk", IntegerType()),
    StructField("strikersk", IntegerType()),
    StructField("nonstriker_match_sk", IntegerType()),
    StructField("nonstriker_sk", IntegerType()),
    StructField("fielder_match_sk", IntegerType()),
    StructField("fielder_sk", IntegerType()),
    StructField("bowler_match_sk", IntegerType()),
    StructField("bowler_sk", IntegerType()),
    StructField("playerout_match_sk", IntegerType()),
    StructField("battingteam_sk", IntegerType()),
    StructField("bowlingteam_sk", IntegerType()),
    StructField("keeper_catch", BooleanType()),
    StructField("player_out_sk", IntegerType()),
    StructField("matchdatesk", DateType())
])

## Read Files
bbb_df=spark.read.option('header','true').schema(ball_by_ball_schema).csv(f'{ipl_folder_path}/raw/Ball_By_Ball.csv')
bbb_df.show(3)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_sk|fielder_match_sk|

In [0]:
#Define Match Schema
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])

#Read Match Files
match_df = spark.read.schema(match_schema).format("csv").option("header","true").load(f"{ipl_folder_path}/raw/Match.csv")
match_df.show()

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-------------+----------+----------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|    manofmach|win_margin|country_id|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-------------+----------+----------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      NULL|       2008|M Chinnaswamy Sta...| Bangalore|       India|Royal Challengers...|Kolkata Knight Ri...|    field|    runs|      Result|  BB McCullum|       140|         1|
|       1|  335988|     Kings XI Punjab|

In [0]:
# Define the schema
team_schema = StructType([
    StructField("team_sk", IntegerType()),
    StructField("team_id", IntegerType()),
    StructField("team_name", StringType())
])

## Read Team Files
team_df=spark.read.option('header','true').schema(team_schema).csv(f'{ipl_folder_path}/raw/Team.csv')
team_df.show(5,truncate=False)

+-------+-------+---------------------------+
|team_sk|team_id|team_name                  |
+-------+-------+---------------------------+
|0      |1      |Kolkata Knight Riders      |
|1      |2      |Royal Challengers Bangalore|
|2      |3      |Chennai Super Kings        |
|3      |4      |Kings XI Punjab            |
|4      |5      |Rajasthan Royals           |
+-------+-------+---------------------------+
only showing top 5 rows



In [0]:
# Define Player Schema
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

#Read player Files
player_df = spark.read.schema(player_schema).format("csv").option("header","true").load(f"{ipl_folder_path}/raw/Player.csv")
player_df.show(2)


+---------+---------+-----------+----+--------------+----------------+------------+
|player_sk|player_id|player_name| dob|  batting_hand|   bowling_skill|country_name|
+---------+---------+-----------+----+--------------+----------------+------------+
|        0|        1| SC Ganguly|NULL| Left-hand bat|Right-arm medium|       India|
|        1|        2|BB McCullum|NULL|Right-hand bat|Right-arm medium| New Zealand|
+---------+---------+-----------+----+--------------+----------------+------------+
only showing top 2 rows



In [0]:
#Define Player Match Schema
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

#Read Player Match Files
player_match_df = spark.read.schema(player_match_schema).format("csv").option("header","true").load(f"{ipl_folder_path}/raw/Player_match.csv")
player_match_df.show(2)


+---------------+---------------+--------+---------+-----------+----+--------------+------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+
|player_match_sk|playermatch_key|match_id|player_id|player_name| dob|  batting_hand|     bowling_skill|country_name|role_desc|         player_team|        opposit_team|season_year|is_manofthematch|age_as_on_match|isplayers_team_won|batting_status|bowling_status|player_captain|opposit_captain|player_keeper|opposit_keeper|
+---------------+---------------+--------+---------+-----------+----+--------------+------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+
|             -1|             -

### Transformation

In [0]:
## Filter to include only valid deliveries, excluding extras like wide and no balls

bbb_trans_df=bbb_df.filter('wides==0' and 'noballs==0')
bbb_trans_df.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_sk|fielder_match_sk|

In [0]:
### Aggregation: Calculate the total and average runs scored in each match and inning
ttl_avg_runs_df=bbb_trans_df.groupBy('match_id','innings_no')\
    .agg(sum('runs_scored').alias("total_runs"),
         avg('runs_scored').alias('average_runs'))
    
ttl_avg_runs_df.show(5)

+--------+----------+----------+------------------+
|match_id|innings_no|total_runs|      average_runs|
+--------+----------+----------+------------------+
|  980940|         1|       138|1.1219512195121952|
|  419132|         1|       162|          1.265625|
|  336009|         1|       151|1.2276422764227641|
|  501203|         1|       150|1.2396694214876034|
|  598022|         4|        11|1.8333333333333333|
+--------+----------+----------+------------------+
only showing top 5 rows



In [0]:
# Window Function: Calculate running total of runs in each match for each over
windowSpec = Window.partitionBy("match_id","innings_no").orderBy("over_id")

ball_by_ball_df = bbb_df.withColumn(
    "running_total_runs",
    sum("runs_scored").over(windowSpec)
)
ball_by_ball_df.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+------------------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_s

In [0]:
# Conditional Column: Flag for high impact balls (either a wicket or more than 6 runs including extras)
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when((col("runs_scored") + col("extra_runs") > 6) | (col("bowler_wicket") == True), True).otherwise(False)
)

In [0]:
ball_by_ball_df.show(2)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+------------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|

In [0]:
# Import required functions
from pyspark.sql.functions import year, month, dayofmonth, when

# Extracting year, month, and day from the match date for more detailed time-based analysis
match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn("month", month("match_date"))
match_df = match_df.withColumn("day", dayofmonth("match_date"))

# High margin win: categorizing win margins into 'high', 'medium', and 'low'
match_df = match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >= 50) & (col("win_margin") < 100), "Medium")
    .otherwise("Low")
)


In [0]:
# Analyze the impact of the toss: who wins the toss and the match
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

# Show the enhanced match DataFrame
match_df.show(2)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-------------------+-----------------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id|year|month| day|win_margin_category|toss_match_winner|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-------------------+-----------------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      NULL|       2008|M Chinnaswamy Sta...| Bangalore|       I

In [0]:
from pyspark.sql.functions import lower, regexp_replace

# Normalize and clean player names
player_df = player_df.withColumn("player_name", lower(regexp_replace("player_name", "[^a-zA-Z0-9 ]", "")))

# Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
player_df = player_df.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

# Categorizing players based on batting hand
player_df = player_df.withColumn(
    "batting_style",
    when(col("batting_hand").contains("left"), "Left-Handed").otherwise("Right-Handed")
)

# Show the modified player DataFrame
player_df.show(2)

+---------+---------+-----------+----+--------------+----------------+------------+-------------+
|player_sk|player_id|player_name| dob|  batting_hand|   bowling_skill|country_name|batting_style|
+---------+---------+-----------+----+--------------+----------------+------------+-------------+
|        0|        1| sc ganguly|NULL| Left-hand bat|Right-arm medium|       India| Right-Handed|
|        1|        2|bb mccullum|NULL|Right-hand bat|Right-arm medium| New Zealand| Right-Handed|
+---------+---------+-----------+----+--------------+----------------+------------+-------------+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import col, when, current_date, expr

# Add a 'veteran_status' column based on player age
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Veteran").otherwise("Non-Veteran")
)

# Dynamic column to calculate years since debut
player_match_df = player_match_df.withColumn(
    "years_since_debut",
    (year(current_date()) - col("season_year"))
)

# Show the enriched DataFrame
player_match_df.show()


+---------------+---------------+--------+---------+---------------+----+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+--------------+-----------------+
|player_match_sk|playermatch_key|match_id|player_id|    player_name| dob|  batting_hand|       bowling_skill|country_name|role_desc|         player_team|        opposit_team|season_year|is_manofthematch|age_as_on_match|isplayers_team_won|batting_status|bowling_status|player_captain|opposit_captain|player_keeper|opposit_keeper|veteran_status|years_since_debut|
+---------------+---------------+--------+---------+---------------+----+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------

### Create Global Temp View To Explore data using Spark Sql

In [0]:
ball_by_ball_df.createOrReplaceGlobalTempView("ball_by_ball")
match_df.createOrReplaceGlobalTempView("match")
player_df.createOrReplaceGlobalTempView("player")
player_match_df.createOrReplaceGlobalTempView("player_match")
team_df.createOrReplaceGlobalTempView("team")