In [0]:
# Echo the `spark` object so the notebook renders it as this cell's output.
# NOTE(review): this cell sits before the one that builds the SparkSession, so it
# relies on the runtime already providing `spark` — confirm, or move it below.
spark

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, BooleanType
from pyspark.sql.functions import col, count, sum, avg, max, min, countDistinct, mean, stddev, stddev_pop, skewness, kurtosis, corr, covar_pop, covar_samp, approx_count_distinct, collect_list, collect_set, first, last, array_contains, array_sort, array_agg, array_distinct, array_except, array_intersect, array_remove, array_union,arrays_overlap, row_number, dense_rank, rank, lag, lead, window
from pyspark.sql.window import Window
from pyspark.sql.functions import when

In [0]:
from pyspark.sql import SparkSession
# Build the SparkSession for this analysis, or reuse the one that is already
# active; the app name is the label attached to the session.
spark = (
    SparkSession.builder
    .appName("IPL Data Analysis")
    .getOrCreate()
)

In [0]:
# Echo the session object as the cell output to confirm it is available.
spark

In [0]:
# Explicit schema for Ball_By_Ball.csv, written as (column name, Spark type)
# pairs; every field is nullable.
# NOTE(review): a user-supplied schema is applied to CSV columns by position, not
# by header name — confirm this order and these types match the file.
ball_by_ball_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        # delivery identification
        ("MatcH_id", IntegerType()),
        ("Over_id", IntegerType()),
        ("Ball_id", IntegerType()),
        ("Innings_No", IntegerType()),
        ("Team_Batting", StringType()),
        ("Team_Bowling", StringType()),
        ("Batsman", StringType()),
        ("Bowler", StringType()),
        ("Striker_Batting_Position", IntegerType()),
        # runs and extras
        ("Extra_Type", StringType()),
        ("Runs_Scored", IntegerType()),
        ("Extra_runs", IntegerType()),
        ("Wides", IntegerType()),
        ("Legbyes", IntegerType()),
        ("Byes", IntegerType()),
        ("Noballs", IntegerType()),
        ("Penalty", IntegerType()),
        # dismissal details
        ("Wicket_Type", StringType()),
        ("Caught", BooleanType()),
        ("Bowled", BooleanType()),
        ("Run_out", BooleanType()),
        ("Hit_wicket", IntegerType()),
        ("Stumped", IntegerType()),
        ("Fielder", StringType()),
        ("Caught_and_bowled", IntegerType()),
        ("LBW", BooleanType()),
        ("Retired_hurt", BooleanType()),
        ("ObstructingFeild", IntegerType()),
        ("Bowler_Wicket", IntegerType()),
        # match context
        ("Match_Date", DateType()),
        ("Season", IntegerType()),
        # player references
        ("Striker", IntegerType()),
        ("Non_Striker", IntegerType()),
        ("Player_Out", IntegerType()),
        ("Fielders", IntegerType()),
        # surrogate keys
        ("Striker_match_SK", IntegerType()),
        ("StrikerSK", IntegerType()),
        ("NonStriker_match_SK", IntegerType()),
        ("NONStriker_SK", IntegerType()),
        ("Fielder_match_SK", IntegerType()),
        ("Fielder_SK", IntegerType()),
        ("Bowler_match_SK", IntegerType()),
        ("BOWLER_SK", IntegerType()),
        ("PlayerOut_match_SK", IntegerType()),
        ("BattingTeam_SK", IntegerType()),
        ("BowlingTeam_SK", IntegerType()),
        ("Keeper_Catch", IntegerType()),
        ("Player_out_sk", IntegerType()),
        ("MatchDateSK", IntegerType()),
    ]
])


In [0]:
# Read the ball-by-ball CSV from S3 with the explicit schema; the first line of
# the file is treated as a header row rather than data.
ball_by_ball_df = (
    spark.read
    .schema(ball_by_ball_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://ipl-data-analysis-project/Ball_By_Ball.csv")
)

In [0]:
# Explicit schema for Match.csv; every field is nullable.
# NOTE(review): a user-supplied schema is applied to CSV columns by position.
# Other cells in this notebook read Toss_Winner, match_winner, Win_Type and
# Win_margin from this same file, none of which are declared here — verify the
# field list and order against the file's header.
match_schema = StructType([
    StructField("Match_id", IntegerType(), True),
    StructField("Team1", StringType(), True),
    StructField("Team2", StringType(), True),
    StructField("Team1_SK", IntegerType(), True),
    StructField("Team2_SK", IntegerType(), True),
    StructField("Winner", StringType(), True),
    StructField("Winner_SK", IntegerType(), True),
    StructField("Tournament", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Venue", StringType(), True),
    StructField("Venue_SK", IntegerType(), True),
    StructField("Match_Date", DateType(), True),
    StructField("Season", IntegerType(), True),
    StructField("MatchDateSK", IntegerType(), True)
])

match_df = (
    spark.read.schema(match_schema)
    .format("csv")
    .option("header", "true")
    # Match_Date in this file is parsed elsewhere in the notebook with the
    # "M/d/yyyy" pattern. Without this option the DateType column is read with
    # the reader's default date pattern and such values would not parse.
    .option("dateFormat", "M/d/yyyy")
    .load("s3://ipl-data-analysis-project/Match.csv")
)

display(match_df)


In [0]:
# Schema and load for Player_Match.csv (all fields nullable).
# NOTE(review): the schema is applied by column position — confirm it matches
# the file's actual layout.
player_match_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Player_SK", IntegerType()),
        ("Player_Name", StringType()),
        ("Team_SK", IntegerType()),
        ("Team_Name", StringType()),
        ("Player_Match_SK", IntegerType()),
    ]
])
player_match_df = (
    spark.read
    .schema(player_match_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://ipl-data-analysis-project/Player_Match.csv")
)

# Schema and load for Player.csv (all fields nullable).
player_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Player_SK", IntegerType()),
        ("Player_Name", StringType()),
    ]
])
player_df = (
    spark.read
    .schema(player_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://ipl-data-analysis-project/Player.csv")
)


In [0]:
# Schema and load for Team.csv (all fields nullable); the file's first line is
# treated as a header row.
team_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Team_SK", IntegerType()),
        ("Team_Name", StringType()),
        ("Team_Match_SK", IntegerType()),
    ]
])
team_df = (
    spark.read
    .schema(team_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://ipl-data-analysis-project/Team.csv")
)

In [0]:
# Keep only legal deliveries: rows where both the wide and the no-ball counters
# are exactly 0. Rows with a null in either column are dropped as well, because
# the comparison does not evaluate to true for them.
from pyspark.sql.functions import col
ball_by_ball_df = (
    ball_by_ball_df
    .where(col("Wides") == 0)
    .where(col("Noballs") == 0)
)

In [0]:
# Aggregation: for every (match, innings) pair compute the summed Runs_Scored
# and the mean Runs_Scored per remaining delivery row.
total_and_avg_runs = (
    ball_by_ball_df
    .groupBy("MatcH_id", "Innings_No")
    .agg(
        sum(col("Runs_Scored")).alias("total_runs"),
        avg(col("Runs_Scored")).alias("avg_runs"),
    )
)

In [0]:
# Window function: running total of runs within each innings of each match,
# accumulated over by over.
# Partitioning by Innings_No as well as MatcH_id keeps the two innings apart;
# partitioning by match alone would let the second innings' total continue from
# (and mix with) the first innings' overs.
# Only Over_id is in the ordering, so under the default window frame all balls of
# one over are peers and each carries the cumulative total through the end of
# that over.
windowSpec = Window.partitionBy("MatcH_id", "Innings_No").orderBy("Over_id")
# Add the cumulative sum as a new column.
ball_by_ball_df = ball_by_ball_df.withColumn(
    "running_total",
    sum("Runs_Scored").over(windowSpec)
)

In [0]:
# Conditional column: mark a delivery as high impact when the bowler took a
# wicket on it, or when runs off the bat plus extras exceed 6. Any row where the
# condition is not true (including when it evaluates to null) is flagged False.
ball_by_ball_df = ball_by_ball_df.withColumn(
    "High_Impact_Ball",
    when(
        (col("Bowler_Wicket") == 1)
        | ((col("Runs_Scored") + col("Extra_runs")) > 6),
        True,
    ).otherwise(False),
)


In [0]:
from pyspark.sql.functions import (
    to_date,
    year,
    month,
    dayofmonth,
    when,
    col
)

# Reload Match.csv using the header row for column names. No schema is supplied,
# so the columns are read as strings.
match_df = spark.read.format("csv").option("header", True).load(
    "s3://ipl-data-analysis-project/Match.csv"
)

# Parse Match_Date (M/d/yyyy text) into a date column with to_date, then split
# the parsed date into year, month and day-of-month columns.
match_df = (
    match_df
    .withColumn("Match_Date_Parsed", to_date(col("Match_Date"), "M/d/yyyy"))
    .withColumn("Year", year(col("Match_Date_Parsed")))
    .withColumn("Month", month(col("Match_Date_Parsed")))
    .withColumn("Day", dayofmonth(col("Match_Date_Parsed")))
)

# Label each match from its Win_Type: "Runs" -> High, "Wickets" -> Medium,
# anything else -> Low.
# The label goes into its own column. Spark resolves column names
# case-insensitively by default, so writing it to "Win_Margin" would replace an
# existing Win_margin column. The later cell casts Win_margin to int, so that
# column is presumably numeric data from the file and must stay intact.
match_df = match_df.withColumn(
    "Win_Type_Category",
    when(
        col("Win_Type") == "Runs",
        "High"
    ).when(
        col("Win_Type") == "Wickets",
        "Medium"
    ).otherwise("Low")
)

# Toss impact: "Yes" when the toss winner also won the match. The comparison is
# against match_winner (as in the later enrichment cell), not Team1 — being
# listed first says nothing about the result.
match_df = match_df.withColumn(
    "Toss_Winner_Impact",
    when(
        col("Toss_Winner") == col("match_winner"),
        "Yes"
    ).otherwise("No")
)

display(match_df)

In [0]:
from pyspark.sql.functions import (
    to_date,
    year,
    month,
    dayofmonth,
    when,
    col,
    expr
)

# Enrich match_df in one chained pass. Columns that already exist under the
# same name are replaced; new ones are appended in the order written here.
match_df = (
    match_df
    # Match_Date text -> date, read with the M/d/yyyy pattern
    .withColumn("Match_Date_Parsed", to_date(col("Match_Date"), "M/d/yyyy"))
    # calendar parts of the parsed date
    .withColumn("Year", year(col("Match_Date_Parsed")))
    .withColumn("Month", month(col("Match_Date_Parsed")))
    .withColumn("Day", dayofmonth(col("Match_Date_Parsed")))
    # integer win margin; try_cast is used so a value that cannot be cast
    # becomes null rather than an error
    .withColumn("Win_margin_int", expr("try_cast(Win_margin as int)"))
    # bucket the margin: >= 100 High, 50-99 Medium, everything else
    # (including a null margin) Low
    .withColumn(
        "Win_Margin_Category",
        when(col("Win_margin_int") >= 100, "High")
        .when(
            (col("Win_margin_int") >= 50) & (col("Win_margin_int") < 100),
            "Medium",
        )
        .otherwise("Low"),
    )
    # "Yes" when the toss winner is also the match winner
    .withColumn(
        "Toss_Winner_Impact",
        when(col("Toss_Winner") == col("match_winner"), "Yes").otherwise("No"),
    )
)

display(match_df)

In [0]:
from pyspark.sql.functions import (
    lower,
    regexp_replace,
    col,
    when
)

# Load the player data from a CSV file, taking column names from the header row.
player_df = spark.read.format("csv").option("header", True).load("s3://ipl-data-analysis-project/Player.csv")

# Normalize player names: strip every character that is not an ASCII letter or
# a space, then lowercase the result.
player_df = player_df.withColumn(
    "Player_Name",
    lower(
        regexp_replace(
            col("Player_Name"),
            "[^a-zA-Z ]",
            ""
        )
    )
)

# Replace missing 'batting_hand' and 'bowling_skill' values with 'unknown'.
player_df = player_df.fillna(
    {
        "batting_hand": "unknown",
        "bowling_skill": "unknown"
    }
)

# Categorize players by batting hand. The value is lowercased before the
# substring test so capitalised spellings such as "Left-hand bat" are matched
# too; a case-sensitive contains("left") would label them Right-Handed.
# Every other value, including the 'unknown' default filled in above, falls
# through to "Right-Handed".
player_df = player_df.withColumn(
    "Batting_Style",
    when(
        lower(col("batting_hand")).contains("left"),
        "Left-Handed"
    ).otherwise("Right-Handed")
)

display(player_df)

In [0]:
from pyspark.sql.functions import col, when, current_date, expr, year

# Placeholder logic: 'Player_Id' stands in for both the player's age
# (age_as_on_match) and the debut/season year until the real columns are wired in.
# NOTE(review): comparing an identifier with 35, or subtracting it from the
# current year, is not a meaningful age/tenure calculation — replace 'Player_Id'
# with the proper age and season-year columns.
# NOTE(review): the result is derived from player_df, so player_match_df is
# replaced by player-level rows here — confirm that is intended.
player_match_df = (
    player_df
    # "veteran" when the placeholder value is 35 or more
    .withColumn(
        "veteran_status",
        when(col("Player_Id") >= 35, "veteran").otherwise("non-veteran"),
    )
    # current calendar year minus the placeholder value
    .withColumn(
        "years_since_debut",
        year(current_date()) - col("Player_Id"),
    )
)

display(player_match_df)

In [0]:
# Rebind the four working DataFrames to the catalog tables / views of the same
# names, replacing whatever these variables held before.
# NOTE(review): nothing visible earlier in this notebook creates these tables —
# confirm they exist in the catalog before this cell runs.
player_df, player_match_df, ball_by_ball_df, team_df = (
    spark.read.table(table_name)
    for table_name in ("player", "player_match", "ball_by_ball", "team")
)




In [0]:
# Match data: re-read the CSV with header-derived column names and expose it to
# Spark SQL as the temp view "match".
match_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("s3://ipl-data-analysis-project/Match.csv")
)
match_df.createOrReplaceTempView("match")

# Team data: same treatment, exposed as the temp view "team".
team_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("s3://ipl-data-analysis-project/Team.csv")
)
team_df.createOrReplaceTempView("team")

In [0]:
# Register DataFrames as temp views so they can be queried with Spark SQL

team_df.createOrReplaceTempView("team")
match_df.createOrReplaceTempView("match")

# SQL query: number of matches per team. Each team row is joined to every match
# in which it appears as Team1 or Team2; the joined rows are counted per
# Team_Name and listed from most to fewest matches.
# NOTE(review): the join compares team.Team_Id with match.Team1 / match.Team2 —
# confirm those match columns hold team ids rather than team names, otherwise no
# rows will join.
result_df = spark.sql("""
SELECT
  t.Team_Name,
  COUNT(m.Match_SK) AS matches_played
FROM
  team t
JOIN
  match m
ON
  t.Team_Id = m.Team1 OR t.Team_Id = m.Team2
GROUP BY
  t.Team_Name
ORDER BY
  matches_played DESC
""")
display(result_df)