In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,BooleanType,DateType,LongType
from pyspark.sql.functions import col, when, sum, avg, row_number
from pyspark.sql.window import Window

# Start (or reuse) the Spark session that every later cell relies on.
session_builder = SparkSession.builder.appName("IPL Data Analysis")
spark = session_builder.getOrCreate()

In [0]:
# Column layout for Ball_By_Ball.csv as (name, Spark type) pairs.
# Keep the order aligned with the column order of the CSV file.
ball_by_ball_columns = [
    ("MatcH_id", IntegerType()),
    ("Over_id", IntegerType()),
    ("Ball_id", IntegerType()),
    ("Innings_No", IntegerType()),
    ("Team_Batting", StringType()),
    ("Team_Bowling", StringType()),
    ("Striker_Batting_Position", IntegerType()),
    ("Extra_Type", StringType()),
    ("Runs_Scored", IntegerType()),
    ("Extra_Runs", IntegerType()),
    ("Wides", IntegerType()),
    ("Legbyes", IntegerType()),
    ("Byes", IntegerType()),
    ("Noballs", IntegerType()),
    ("Penalty", IntegerType()),
    ("Bowler_Extras", IntegerType()),
    ("Out_type", StringType()),
    ("Caught", IntegerType()),
    ("Bowled", IntegerType()),
    ("Run_out", IntegerType()),
    ("LBW", IntegerType()),
    ("Retired_hurt", IntegerType()),
    ("Stumped", IntegerType()),
    ("caught_and_bowled", IntegerType()),
    ("hit_wicket", IntegerType()),
    ("ObstructingFeild", IntegerType()),
    ("Bowler_Wicket", BooleanType()),
    ("Match_Date", DateType()),
    ("Season", IntegerType()),
    ("Striker", IntegerType()),
    ("Non_Striker", IntegerType()),
    ("Bowler", IntegerType()),
    ("Player_Out", IntegerType()),
    ("Fielders", IntegerType()),
    ("Striker_match_SK", IntegerType()),
    ("StrikerSK", IntegerType()),
    ("NonStriker_match_SK", IntegerType()),
    ("NONStriker_SK", IntegerType()),
    ("Fielder_match_SK", IntegerType()),
    ("Fielder_SK", IntegerType()),
    ("Bowler_match_SK", IntegerType()),
    ("BOWLER_SK", IntegerType()),
    ("PlayerOut_match_SK", IntegerType()),
    ("BattingTeam_SK", IntegerType()),
    ("BowlingTeam_SK", IntegerType()),
    ("Keeper_Catch", IntegerType()),
    ("Player_out_sk", IntegerType()),
    ("MatchDateSK", LongType()),
]

# Every column is declared nullable.
ball_by_ball_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in ball_by_ball_columns]
)

# Read the ball-by-ball CSV with the explicit schema; dates are month/day/year.
ball_by_ball_df = (
    spark.read
    .format("csv")
    .schema(ball_by_ball_schema)
    .option("header", "true")
    .option("dateFormat", 'M/d/yyyy')
    .load("s3://ipl-data-analysis-project/Ball_By_Ball.csv")
)




In [0]:
# Schema for Match.csv, built field by field; every column is nullable.
match_schema = (
    StructType()
    .add("Match_SK", IntegerType(), True)
    .add("match_id", IntegerType(), True)
    .add("Team1", StringType(), True)
    .add("Team2", StringType(), True)
    .add("match_date", DateType(), True)
    .add("Season_Year", IntegerType(), True)
    .add("Venue_Name", StringType(), True)
    .add("City_Name", StringType(), True)
    .add("Country_Name", StringType(), True)
    .add("Toss_Winner", StringType(), True)
    .add("match_winner", StringType(), True)
    .add("Toss_Name", StringType(), True)
    .add("Win_Type", StringType(), True)
    .add("Outcome_Type", StringType(), True)
    .add("ManOfMach", StringType(), True)
    .add("Win_Margin", IntegerType(), True)
    .add("Country_id", IntegerType(), True)
)

# Read the match CSV with the explicit schema; dates are month/day/year.
match_reader = (
    spark.read
    .format("csv")
    .schema(match_schema)
    .option("header", 'true')
    .option("dateFormat", "M/d/yyyy")
)
match_df = match_reader.load("s3://ipl-data-analysis-project/Match.csv")


In [0]:

# Schema for Player.csv; every column is nullable.
player_schema = StructType([
    StructField("Player_SK", IntegerType(), True),
    StructField("Player_Id", IntegerType(), True),
    StructField("Player_Name", StringType(), True),
    StructField("DOB", DateType(), True),
    StructField("Batting_hand", StringType(), True),
    StructField("Bowling_skill", StringType(), True),
    StructField("Country_Name", StringType(), True)
])

# Read the player CSV with the explicit schema.
# The date pattern must use upper-case "M" (month-of-year). The previous
# lower-case "m" means minute-of-hour, so DOB values were never parsed as
# month/day/year. This now matches the pattern used by the other CSV readers.
player_df = (
    spark.read
    .schema(player_schema)
    .format("csv")
    .option("header", 'true')
    .option("dateFormat", 'M/d/yyyy')
    .load("s3://ipl-data-analysis-project/Player.csv")
)


In [0]:
# Column layout for Player_match.csv as (name, Spark type) pairs.
# Keep the order aligned with the column order of the CSV file.
player_match_columns = [
    ("Player_match_SK", IntegerType()),
    ("PlayerMatch_key", LongType()),
    ("Match_Id", IntegerType()),
    ("Player_Id", IntegerType()),
    ("Player_Name", StringType()),
    ("DOB", DateType()),
    ("Batting_hand", StringType()),
    ("Bowling_skill", StringType()),
    ("Country_Name", StringType()),
    ("Role_Desc", StringType()),
    ("Player_team", StringType()),
    ("Opposit_Team", StringType()),
    ("Season_year", IntegerType()),
    ("is_manofThematch", IntegerType()),
    ("Age_As_on_match", IntegerType()),
    ("IsPlayers_Team_won", IntegerType()),
    ("Batting_Status", StringType()),
    ("Bowling_Status", StringType()),
    ("Player_Captain", StringType()),
    ("Opposit_captain", StringType()),
    ("Player_keeper", StringType()),
    ("Opposit_keeper", StringType()),
]

# Every column is declared nullable.
player_match_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in player_match_columns]
)

# Read the player-per-match CSV with the explicit schema; dates are month/day/year.
player_match_df = (
    spark.read
    .format("csv")
    .schema(player_match_schema)
    .option("header", 'true')
    .option("dateFormat", "M/d/yyyy")
    .load("s3://ipl-data-analysis-project/Player_match.csv")
)
# display(player_match_df)


In [0]:


# Schema for Team.csv: surrogate key, team id and team name, all nullable.
team_schema = (
    StructType()
    .add("team_sk", IntegerType(), True)
    .add("team_id", IntegerType(), True)
    .add("team_name", StringType(), True)
)

# Read the team CSV with the explicit schema (the file has a header row).
team_reader = spark.read.format("csv").schema(team_schema).option("header", 'true')
team_df = team_reader.load("s3://ipl-data-analysis-project/Team.csv")
# display(team_df)


In [0]:

# Keep only legal deliveries: rows whose Wides and Noballs values are both zero.
# Rows where either column is NULL fail the comparison and are dropped as well.
# Note that this overwrites ball_by_ball_df, so every later cell sees the filtered data.
is_legal_delivery = (col("Wides") == 0) & (col("Noballs") == 0)
ball_by_ball_df = ball_by_ball_df.where(is_legal_delivery)

# Aggregation: total and average runs scored for each match and innings.
runs_by_innings = ball_by_ball_df.groupBy("MatcH_id", "Innings_No")
total_and_avg_runs = runs_by_innings.agg(
    sum("Runs_Scored").alias("total_runs"),
    avg("Runs_Scored").alias("average_runs"),
)



In [0]:


# Window function: running total of runs per match and innings, accumulated over by over.
# The frame below is the one Spark applies by default when an ordering is given.
# Rows of the same over are peers under a RANGE frame, so every ball of an over
# carries the cumulative total up to the end of that over.
windowSpec = (
    Window
    .partitionBy("match_id", "innings_no")
    .orderBy("over_id")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

running_total = sum("runs_scored").over(windowSpec)
ball_by_ball_df = ball_by_ball_df.withColumn("running_total_runs", running_total)




In [0]:

# Conditional column: flag high-impact balls, i.e. a bowler wicket or more than
# six runs once extras are added to the runs scored.
runs_including_extras = col("Runs_Scored") + col("Extra_Runs")
is_high_impact = (runs_including_extras > 6) | col("Bowler_Wicket")

# when/otherwise turns an undecidable (NULL) condition into False.
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when(is_high_impact, True).otherwise(False),
)


In [0]:

from pyspark.sql.functions import year, month, dayofmonth, when

match_date_col = col("match_date")
win_margin_col = col("Win_Margin")

# Win margin buckets: >= 100 is High, strictly between 50 and 100 is Medium,
# everything else (including a missing margin) is Low.
win_margin_category = (
    when(win_margin_col >= 100, "High")
    .when((win_margin_col > 50) & (win_margin_col < 100), "Medium")
    .otherwise("Low")
)

# Toss impact: "Win" when the toss winner also won the match, otherwise "Loss".
toss_and_match_winner = (
    when(col("Toss_Winner") == col("match_winner"), "Win")
    .otherwise("Loss")
)

# Add the calendar parts of the match date plus the two derived categories.
match_df = (
    match_df
    .withColumn("year", year(match_date_col))
    .withColumn("month", month(match_date_col))
    .withColumn("day", dayofmonth(match_date_col))
    .withColumn("Win_Margin_Category", win_margin_category)
    .withColumn("Toss_and_Match_Winner", toss_and_match_winner)
)
display(match_df)


In [0]:

from pyspark.sql.functions import lower,regexp_replace

# Normalize player names: strip everything except letters, digits and spaces,
# then lower-case the result.
player_df = player_df.withColumn(
    "player_name_new",
    lower(regexp_replace("player_name", "[^a-zA-Z0-9 ]", "")))

# Replace missing batting hand / bowling skill with the default 'unknown'.
player_df = player_df.na.fill({"Batting_hand": "unknown", "Bowling_skill": "unknown"})

# Categorize players by batting hand.
# Previously every value that did not contain "Left" was labelled
# "Right Handed", including the 'unknown' placeholder filled in just above.
# Right-handers are now matched explicitly and everything else is "Unknown".
# The match is case-insensitive so spelling variants still classify.
batting_hand_lower = lower(col("Batting_hand"))
player_df = player_df.withColumn(
    "Batting_Style",
    when(batting_hand_lower.contains("left"), "Left Handed")
    .when(batting_hand_lower.contains("right"), "Right Handed")
    .otherwise("Unknown")
)
# display(player_df)


In [0]:

from pyspark.sql.functions import col,when,current_date,expr

# Veteran flag: players aged 35 or more at the time of the match.
veteran_status = when(col("Age_As_on_match") >= 35, "Veteran").otherwise("Non-Veteran")

# Current calendar year minus the row's Season_year.
# NOTE(review): despite the column name this measures years since that row's
# season, not since the player's first season. Confirm the intent.
years_since_season = year(current_date()) - col("Season_year")

player_match_df = (
    player_match_df
    .withColumn("veteran_status", veteran_status)
    .withColumn("years_since_debut", years_since_season)
)
display(player_match_df)



In [0]:

# Register every DataFrame as a temp view so the cells below can query it with SQL.
temp_views = {
    "ball_by_ball": ball_by_ball_df,
    "match": match_df,
    "player": player_df,
    "player_match": player_match_df,
    "team": team_df,
}
for view_name, frame in temp_views.items():
    frame.createOrReplaceTempView(view_name)


In [0]:
# Team dimension table: the distinct (team_sk, team_id, team_name) rows of the team view.
team_dimension = (
    spark.table("team")
    .select("team_sk", "team_id", "team_name")
    .distinct()
)
team_dimension.createOrReplaceTempView("dim_team")


In [0]:
# Player dimension table: player attributes left-joined with per-match details.
# NOTE(review): the selected player_match columns vary from match to match, so
# DISTINCT can still leave several rows for one player_sk. Joins against this
# view on player_sk may therefore multiply rows.
dim_player_sql = """
          Create or Replace Temp View dim_player AS 
          SELECT DISTINCT 
          p.player_sk,
          p.player_id,
          p.player_name,
          p.dob,
          p.batting_hand,
          p.bowling_skill,
          pm.Role_Desc,
          pm.Season_year,
          pm.is_manofThematch,
          pm.Age_As_on_match,
          pm.IsPlayers_Team_won
          FROM player p LEFT JOIN player_match pm 
          ON p.player_id = pm.player_id
          """
spark.sql(dim_player_sql)


In [0]:
# Date dimension: one row per distinct match date, keyed by a yyyyMMdd integer,
# with day, month, quarter, year and day-of-week broken out.
dim_date_sql = """
CREATE OR REPLACE TEMP VIEW dim_date AS 
SELECT DISTINCT 
        CAST(DATE_FORMAT(match_date,'yyyyMMdd') AS INT) AS date_sk,
        match_date,
        DAY(match_date) as day,
        MONTH(match_date) as month,
        QUARTER(match_date) as quarter,
        YEAR(match_date) as year,
        DAYOFWEEK(match_date) as weekday
        FROM match
          """
spark.sql(dim_date_sql)

In [0]:
# Match Dimension

In [0]:
# Match dimension: one row per match with surrogate keys for the teams, the toss
# and match winners, the man of the match and the match date.
#
# Team1, Team2, Toss_Winner, match_winner and ManOfMach are loaded as strings,
# while team_id and player_id are integers. Comparing them either never matches
# or fails under ANSI mode, so the lookups now go through team_name and
# player_name instead.
# NOTE(review): this assumes Match.csv stores team and player names in those
# columns. Confirm against the file. If two players share a name, the
# man-of-the-match join can return more than one row for a match.
spark.sql(
    """
    CREATE OR REPLACE TEMP VIEW dim_match AS
    SELECT DISTINCT
    m.match_sk,
    m.match_id,
    t1.team_sk AS team1_sk,
    t2.team_sk AS team2_sk,
    dd.date_sk AS match_date_sk,
    m.Venue_Name,
    m.City_Name,
    m.Country_Name,
    tw.team_sk AS toss_winner_sk,
    mw.team_sk AS match_winner_sk,
    m.Toss_Winner,
    m.match_winner,
    m.Toss_Name,
    m.Win_Type,
    m.Outcome_Type,
    p.player_sk as Man_of_match_sk,
    m.Win_Margin,
    m.country_id
    FROM match m
    LEFT JOIN team t1 ON m.Team1 = t1.team_name
    LEFT JOIN team t2 ON m.Team2 = t2.team_name
    LEFT JOIN team tw ON m.Toss_Winner = tw.team_name
    LEFT JOIN team mw ON m.match_winner = mw.team_name
    LEFT JOIN player p ON m.ManOfMach = p.player_name
    LEFT JOIN dim_date dd ON m.match_date = dd.match_date
    """
)

# CREATE VIEW itself returns an empty DataFrame, so preview the view that was
# just registered instead of the statement's result.
dd = spark.table("dim_match")
dd.show(5)

In [0]:
# Fact table: one row per delivery in the ball_by_ball view, with surrogate keys
# for the match, players, teams and match date.
#
# The striker, non-striker and bowler lookups now join the ball's *_SK columns
# to player_sk. They previously joined a surrogate key to player_id, unlike the
# team joins in the same query, which already compare surrogate key to
# surrogate key.
# NOTE(review): assumes StrikerSK / NONStriker_SK / BOWLER_SK reference
# Player_SK. Confirm against the data.
#
# ROW_NUMBER() with a global ORDER BY and no PARTITION BY makes Spark process
# all rows in a single partition. That is fine for this data size but will not
# scale.
spark.sql("""
CREATE OR REPLACE TEMP VIEW fact_ball_by_ball AS
SELECT
ROW_NUMBER() OVER(ORDER BY b.match_id,b.innings_no,b.over_id,b.ball_id) AS ball_sk,
m.match_sk,
b.innings_no,
b.over_id,
b.ball_id,
sp.player_sk AS striker_sk,
nsp.player_sk AS non_striker_sk,
bp.player_sk AS bowler_sk,
bt.team_sk AS batting_team_sk,
bwt.team_sk AS bowling_team_sk,
b.Bowler_Wicket,
b.runs_scored,
b.extra_runs,
b.wides,
b.legbyes,
b.byes,
b.noballs,
b.penalty,
CASE WHEN b.out_type IS NOT NULL THEN 1 ELSE 0 END AS wicket_flag,
b.out_type,
b.caught,
b.bowled,
b.run_out,
b.lbw,
b.retired_hurt,
b.stumped,
b.caught_and_bowled,
b.hit_wicket,
dd.date_sk AS match_date_sk
FROM ball_by_ball b
LEFT JOIN match m ON b.match_id = m.match_id
LEFT JOIN player sp ON b.StrikerSK = sp.player_sk
LEFT JOIN player nsp ON b.NonStriker_SK = nsp.player_sk
LEFT JOIN player bp ON b.BOWLER_SK = bp.player_sk
LEFT JOIN team bt ON b.BattingTeam_SK = bt.team_sk
LEFT JOIN team bwt ON b.BowlingTeam_SK = bwt.team_sk
LEFT JOIN dim_date dd ON m.match_date = dd.match_date
""")

In [0]:


# Total runs per batsman per season.
#
# The season now comes from the match each ball belongs to, and the player name
# from the one-row-per-player `player` view. The previous join to dim_player
# matched every ball against all of a player's dim_player rows (one per distinct
# season/role/age combination), so runs were multiplied and credited to every
# season the player appeared in.
top_scoring_batsman_per_season = spark.sql("""
SELECT p.Player_Name, m.Season_Year AS Season_year, SUM(b.Runs_Scored) AS total_runs
FROM fact_ball_by_ball b
JOIN match m ON b.match_sk = m.match_sk
JOIN player p ON b.striker_sk = p.player_sk
GROUP BY p.Player_Name, m.Season_Year
ORDER BY Season_year, total_runs DESC
""")

top_scoring_batsman_per_season.show(n=5)


In [0]:

# Most economical bowlers in the powerplay (overs 1-6), restricted to bowlers
# with more than 10 deliveries in those overs.
#
# Two fixes:
#  * total_wickets sums the deliveries where bowler_wicket is true. The previous
#    COUNT(b.bowler_wicket) counted every non-NULL value, true or false.
#  * the name lookup uses the one-row-per-player `player` view. dim_player holds
#    several rows per player, which inflated COUNT(*) and the wicket total.
economical_bowlers_powerplay = spark.sql("""
SELECT p.player_name,
AVG(b.runs_scored) AS avg_runs_per_ball,
SUM(CASE WHEN b.bowler_wicket THEN 1 ELSE 0 END) AS total_wickets
FROM fact_ball_by_ball b
JOIN player p ON b.bowler_sk = p.player_sk
WHERE b.over_id <= 6
GROUP BY p.player_name
HAVING COUNT(*) > 10
ORDER BY avg_runs_per_ball, total_wickets DESC
""")
economical_bowlers_powerplay.show(n=5)



In [0]:

# Per-match toss impact: did the side that won the toss also win the match?
# The string literals use single quotes (standard SQL). Double-quoted text is
# read as an identifier when Spark's ANSI double-quoted-identifier option is on.
toss_impact_on_individual_matches = spark.sql("""
         SELECT m.match_id,m.toss_winner,m.toss_name,m.match_winner,
         CASE WHEN m.toss_winner = m.match_winner THEN 'Win' ELSE 'Loss' END AS Match_OUTCOME
         FROM match m
         WHERE m.toss_name IS NOT NULL
         ORDER BY m.match_id
           """)

# display(toss_impact_on_individual_matches)


In [0]:

# Batting output in matches the player's team won.
# avg_runs_in_wins is the mean of runs_scored over the balls the player faced.
# innings_played counts distinct (match, innings) pairs. The previous COUNT(*)
# counted balls faced, not innings.
average_runs_in_wins = spark.sql("""
SELECT p.player_name,
AVG(b.runs_scored) AS avg_runs_in_wins,
COUNT(DISTINCT b.match_id, b.innings_no) AS innings_played
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.striker = pm.player_id
JOIN player p ON pm.player_id = p.player_id
JOIN match m ON pm.match_id = m.match_id
WHERE m.match_winner = pm.player_team
GROUP BY p.player_name
ORDER BY avg_runs_in_wins DESC
""")

# average_runs_in_wins.show()



In [0]:

import matplotlib.pyplot as plt

economical_bowlers_pd = economical_bowlers_powerplay.toPandas()

# Keep only the ten lowest averages so the chart stays readable.
top_economical_bowlers = economical_bowlers_pd.nsmallest(10, 'avg_runs_per_ball')

# Bar chart of average runs conceded per ball, drawn with matplotlib.
fig, ax = plt.subplots(figsize=(12, 8))
ax.bar(
    top_economical_bowlers['player_name'],
    top_economical_bowlers['avg_runs_per_ball'],
    color='skyblue',
)
ax.set_xlabel('Bowler Name')
ax.set_ylabel('Average Runs Per Ball')
ax.set_title('Most Economical Bowlers in Powerplay Overs (Top 10)')
ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
plt.show()

In [0]:

import seaborn as sns 

# Bring the per-match toss results to the driver as a pandas DataFrame.
toss_impact_pd = toss_impact_on_individual_matches.toPandas()

# Count plot: for each toss winner, how many matches ended as a win or a loss.
fig, ax = plt.subplots(figsize=(10, 6))
sns.countplot(x='toss_winner', hue='Match_OUTCOME', data=toss_impact_pd, ax=ax)
ax.set_xlabel('Toss Winner')
ax.set_ylabel('No Of Matches')
ax.set_title('Impact of Winning Toss on Match Outcome')
ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
plt.show()


In [0]:


average_runs_pd = average_runs_in_wins.toPandas()

# Keep only the ten highest averages so the chart stays readable.
top_scorers = average_runs_pd.nlargest(10, 'avg_runs_in_wins')

# Seaborn bar plot of average runs in winning matches.
fig, ax = plt.subplots(figsize=(12, 8))
sns.barplot(x='player_name', y='avg_runs_in_wins', data=top_scorers, ax=ax)
ax.set_title('Average Runs Scored by Batsman in Winning Matches (Top 10 Scorers)')
ax.set_xlabel('Player Name')
ax.set_ylabel('Average Runs In Wins')
ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
plt.show()




In [0]:


# Average and highest innings score at each venue.
#
# The inner query now totals runs per (match, innings). It previously grouped by
# match only, so "average_score" and "Highest_score" were the combined runs of
# both sides rather than a team's score.
# NOTE(review): totals use runs_scored from the filtered ball_by_ball view, so
# extras are not included. Confirm that this is intended.
scores_by_venue = spark.sql("""
SELECT venue_name, AVG(total_runs) AS average_score, MAX(total_runs) AS Highest_score
FROM (
    SELECT b.match_id, b.innings_no, m.venue_name, SUM(b.runs_scored) AS total_runs
    FROM ball_by_ball b
    JOIN match m ON b.match_id = m.match_id
    GROUP BY b.match_id, b.innings_no, m.venue_name
)
GROUP BY venue_name
ORDER BY average_score DESC
""")

scores_by_venue.show()


In [0]:


# Bring the per-venue scores to the driver as a pandas DataFrame.
scores_by_venue_pd = scores_by_venue.toPandas()

# Horizontal bar plot of the average score at each venue.
fig, ax = plt.subplots(figsize=(14, 8))
sns.barplot(x='average_score', y='venue_name', data=scores_by_venue_pd, ax=ax)
ax.set(
    title='Distribution of Scores By Venue',
    xlabel='Average Score',
    ylabel='Venue',
)

plt.show()


In [0]:

# Frequency of each dismissal type, most common first; rows without an out_type are skipped.
dismissal_types_sql = """
SELECT out_type,COUNT(*) AS frequency
FROM ball_by_ball WHERE out_type IS NOT NULL
GROUP BY out_type
Order BY frequency DESC
                            """
dismissal_types = spark.sql(dismissal_types_sql)



In [0]:

dismissal_types_pd = dismissal_types.toPandas()

# Horizontal bar plot of how often each dismissal type occurs.
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(x='frequency', y='out_type', data=dismissal_types_pd, palette='pastel', ax=ax)
ax.set(
    title='Most Frequent Dismissal Types',
    xlabel='Frequency',
    ylabel='Dismissal Type',
)
plt.show()





In [0]:

# For each team: how many tosses it won and how many of those matches it also won.
#
# Rows are grouped by the toss winner itself. The previous filter
# `toss_winner = team1` only counted tosses won by the side listed as team1 and
# silently dropped every toss won by the side listed as team2.
# The output column is still called team1 because the plotting cell reads it
# under that name. matches_played is the number of matches in which the team
# won the toss.
team_toss_win_performance = spark.sql("""
SELECT toss_winner AS team1,
       COUNT(*) AS matches_played,
       SUM(CASE WHEN toss_winner = match_winner THEN 1 ELSE 0 END) AS wins_after_toss
FROM match
WHERE toss_winner IS NOT NULL
GROUP BY toss_winner
ORDER BY wins_after_toss DESC
""")


In [0]:

team_toss_win_pd = team_toss_win_performance.toPandas()

# Horizontal bar plot of matches won after winning the toss, per team.
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(x='wins_after_toss', y='team1', data=team_toss_win_pd, ax=ax)
ax.set(
    title='Team Performance After Winning Toss',
    xlabel='Wins After Winning Toss',
    ylabel='Team',
)
plt.show()


In [0]:
# How many matches were played in each IPL season?
# Count the rows of the match view per Season_Year, ordered by season.
matches_played_in_each_ipl_season = (
    spark.table("match")
    .groupBy("Season_Year")
    .count()
    .withColumnRenamed("count", "Total_Matches_played_in_each_season")
    .orderBy("Season_Year")
)

matches_played_in_each_ipl_season.show(n=15)

In [0]:
matches_played_in_each_ipl_season_pd = matches_played_in_each_ipl_season.toPandas()

# Bar plot of the number of matches per season.
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(
    x='Season_Year',
    y='Total_Matches_played_in_each_season',
    data=matches_played_in_each_ipl_season_pd,
    color='red',
    ax=ax,
)
ax.set(
    title='Total Matches Played In Each IPL Season',
    xlabel='Season',
    ylabel='Total Matches Played',
)
plt.show()

In [0]:
# Which team has won the most matches overall?
# Count matches per winner, then keep the single row with the highest count.
wins_per_team = (
    match_df
    .groupBy('match_winner')
    .count()
    .withColumnRenamed('count', 'Wins_total')
)
team_won_the_most_matches = wins_per_team.orderBy(col('Wins_total').desc()).limit(1)

display(team_won_the_most_matches)