In [None]:
# Spark SQL types (used to define our own schemas), column functions, and window specifications
from pyspark.sql.types import StructField, StructType,IntegerType,StringType,DateType,DecimalType,BooleanType
from pyspark.sql.functions import col, when, sum, avg, row_number
from pyspark.sql.window import Window

In [None]:
from pyspark.sql import SparkSession


# Create the Spark session for this notebook (getOrCreate reuses an existing session if one is already active)
spark = (
    SparkSession.builder
    .appName('ipl-data-analysis')
    .getOrCreate()
)

In [None]:
# Reading the Data from the AWS S3 bucket

# First Dataset out of 5

# option(key, value) tells the CSV reader how to interpret the file (examples below):
#   'header'      -> 'true': treat the first row as column names
#   'inferSchema' -> 'true': let Spark guess each column's type from the data
# inferSchema costs an extra full pass over the file, and its guesses can be wrong,
# so the demo read is only executed when the flag below is switched on.
RUN_INFER_SCHEMA_DEMO = False

if RUN_INFER_SCHEMA_DEMO:
    ball_df_inferred = (
        spark.read.format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')  # option() needs a value; the key alone raises TypeError
        .load('s3://ipl-data-analysis-project/Ball_By_Ball.csv')
    )
    ball_df_inferred.printSchema()

# HOWEVER, it is best practice to create your OWN schema because inferring could be wrong.
# This way we define each column's type ourselves.

ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True)
])

# Now read the same file again, this time supplying the schema defined above instead of inferring it
ball_df = spark.read.format('csv').schema(ball_schema).option('header', 'true').load('s3://ipl-data-analysis-project/Ball_By_Ball.csv')

In [None]:
# Do the same for every remaining CSV file in the AWS S3 bucket:
# declare an explicit schema, then read the file (first row = header) with that schema.


def _read_ipl_csv(schema, path):
    """Read the header-bearing CSV file at `path` using the explicit Spark `schema`."""
    return spark.read.schema(schema).format("csv").option("header", "true").load(path)


# Match Data
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])
match_df = _read_ipl_csv(match_schema, "s3://ipl-data-analysis-project/Match.csv")

# Player Data
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])
player_df = _read_ipl_csv(player_schema, "s3://ipl-data-analysis-project/Player.csv")

# Player-Match Data
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])
player_match_df = _read_ipl_csv(player_match_schema, "s3://ipl-data-analysis-project/Player_match.csv")

# Team Data
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])
team_df = _read_ipl_csv(team_schema, "s3://ipl-data-analysis-project/Team.csv")



In [None]:
# Transformations in Spark
# Spark evaluates lazily: each transformation below only adds to a query plan. Nothing is
# computed until an action (e.g. show(), count(), toPandas()) runs; Spark then optimizes
# and executes the whole plan at once.

# Example: keep only legal deliveries (no no-balls and no wides)
# NOTE(review): rows where 'wides' or 'noballs' is null are also dropped by these comparisons — confirm intended.
ball_df = ball_df.filter((col('wides') == 0) & (col('noballs') == 0))

# Example: total and average runs scored in each match and innings.
# The column is 'match_id' in ball_schema; 'matchid' does not exist and fails analysis.
total_and_avg_runs = ball_df.groupBy('match_id', 'innings_no').agg(
    sum('runs_scored').alias('total_runs'),
    avg('runs_scored').alias('avg_runs')
)


In [None]:
# SQL-style window function example: cumulative runs within each innings of each match.
# Order by over AND ball, with an explicit ROWS frame, so each delivery gets its own running
# value. Ordering by over_id alone uses the default RANGE frame, which gives every ball in
# the same over an identical "running" total.
windowSpec = (
    Window.partitionBy('match_id', 'innings_no')
    .orderBy('over_id', 'ball_id')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Assign ('=') the new DataFrame back to ball_df; '-' would attempt to subtract two DataFrames.
ball_df = ball_df.withColumn(
    'running_total_runs',
    sum('runs_scored').over(windowSpec)
)

In [None]:
# Example: conditional column — flag "high impact" deliveries.
# A delivery is high impact when runs_scored + extra_runs exceeds 6, or when the bowler took a wicket.
# Rows where the combined condition is not True (False or null) get False.
is_big_scoring_ball = (col('runs_scored') + col('extra_runs')) > 6
is_wicket_ball = col('bowler_wicket') == True

ball_df = ball_df.withColumn(
    'high_impact',
    when(is_big_scoring_ball | is_wicket_ball, True).otherwise(False)
)

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, when

# Time analysis: split the match date into year, month and day-of-month columns
match_df = (
    match_df
    .withColumn('year', year('match_date'))
    .withColumn('month', month('match_date'))
    .withColumn('day', dayofmonth('match_date'))
)

# Example analysis: bucket wins by margin.
#   High   : win_margin >= 100
#   Medium : 50 <= win_margin < 100 (the first branch already took >= 100)
#   Low    : everything else, including null margins
# The result is assigned back to match_df (attribute access 'match_df.match_df' raises AttributeError).
match_df = match_df.withColumn(
    'win_margin_type',
    when(col('win_margin') >= 100, 'High')
    .when(col('win_margin') >= 50, 'Medium')
    .otherwise('Low')
)

# Did the team that won the toss also win the match? ('No' also covers null winners)
match_df = match_df.withColumn(
    'toss_match_winner',
    when(col('toss_winner') == col('match_winner'), 'Yes').otherwise('No')
)

In [None]:
from pyspark.sql.functions import lower, regexp_replace

# Normalize player names: remove every character that is not a letter, digit or space, then lower-case.
# Spaces are kept so first and last names are not fused into a single token.
player_df = player_df.withColumn(
    'player_name',
    lower(regexp_replace(col('player_name'), '[^a-zA-Z0-9 ]', ''))
)

# Fill missing 'batting_hand' and 'bowling_skill' values with 'unknown'
player_df = player_df.na.fill({'batting_hand': 'unknown', 'bowling_skill': 'unknown'})

# Classify batting style from batting_hand (contains() is case-sensitive, so the
# filled 'unknown' value falls through to the otherwise branch)
player_df = player_df.withColumn(
    'batting_style',
    when(col('batting_hand').contains('Left'), 'Left-handed')
    .when(col('batting_hand').contains('Right'), 'Right-handed')
    .otherwise('unknown')
)



In [None]:
from pyspark.sql.functions import col, when, current_date, expr, year, min as spark_min

# Flag players aged 35 or older at the time of the match as veterans
player_match_df = player_match_df.withColumn(
    'veteran_status',
    when(col('age_as_on_match') >= 35, 'Veteran').otherwise('Non-Veteran')
)

# Years since each player's debut = current year - earliest season_year in which the player appears.
# Computed before the batting filter below so matches where the player did not bat still
# count toward the debut season.
debut_window = Window.partitionBy('player_id')
player_match_df = player_match_df.withColumn(
    'years_since_debut',
    year(current_date()) - spark_min('season_year').over(debut_window)
)

# Keep only players who batted. eqNullSafe keeps rows with a null batting_status; a plain '!='
# evaluates to null for them and filter() would silently drop those rows.
player_match_df = player_match_df.filter(~col('batting_status').eqNullSafe('Did Not Bat'))

In [None]:
# Register the DataFrames as temporary SQL views so they can be queried with spark.sql().
# The SQL queries in this notebook read from 'ball_by_ball', so the ball data must be registered
# under that name; 'ball_table' is kept as an alias for any code still using the old name.
ball_df.createOrReplaceTempView('ball_by_ball')
ball_df.createOrReplaceTempView('ball_table')
match_df.createOrReplaceTempView("match")
player_df.createOrReplaceTempView("player")
player_match_df.createOrReplaceTempView("player_match")
team_df.createOrReplaceTempView("team")

In [None]:
# Run real SQL against the temp views: total runs per batsman per season, highest first within a season.
# Grouping by player_id as well as player_name keeps two different players who share a
# (normalized) display name from being merged into one row.
top_scoring_batsmen_per_season = spark.sql("""
SELECT
    p.player_name,
    m.season_year,
    SUM(b.runs_scored) AS total_runs
FROM ball_by_ball b
JOIN match m
    ON b.match_id = m.match_id
JOIN player_match pm
    ON m.match_id = pm.match_id
   AND b.striker = pm.player_id
JOIN player p
    ON p.player_id = pm.player_id
GROUP BY
    p.player_id, p.player_name, m.season_year
ORDER BY
    m.season_year, total_runs DESC
""")

In [None]:
# Most economical bowlers in the powerplay: average runs off the bat per ball, plus wickets taken.
# - Wickets are counted with SUM over the bowler_wicket flag; COUNT(b.bowler_wicket) counted every
#   ball with a non-null flag (True OR False), i.e. balls bowled rather than wickets.
# - The bowler's name is looked up directly in `player`. Joining through player_match would drop
#   any bowler who did not bat in that match, because player_match_df is filtered to batters.
# - Assumes over_id is 1-based, so over_id <= 6 selects the first six overs — TODO confirm.
# - HAVING COUNT(*) >= 1 is always true for a grouped row; raise it to exclude tiny samples.
economical_bowlers_powerplay = spark.sql("""
SELECT
    p.player_name,
    AVG(b.runs_scored) AS avg_runs_per_ball,
    SUM(CASE WHEN b.bowler_wicket THEN 1 ELSE 0 END) AS total_wickets
FROM ball_by_ball b
JOIN player p
    ON b.bowler = p.player_id
WHERE
    b.over_id <= 6
GROUP BY
    p.player_id, p.player_name
HAVING
    COUNT(*) >= 1
ORDER BY
    avg_runs_per_ball, total_wickets DESC
""")

In [None]:
# For every match with a recorded toss, report whether the toss winner also won the match.
cointoss_query = """
SELECT m.match_id, m.toss_winner, m.toss_name, m.match_winner,
       CASE WHEN m.toss_winner = m.match_winner THEN 'Won' ELSE 'Lost' END AS match_outcome
FROM 
    match m
WHERE 
    m.toss_name IS NOT NULL
ORDER BY 
    m.match_id
"""
cointoss_impact_matches = spark.sql(cointoss_query)

In [None]:
# Per batsman, in matches their team won: average runs per ball faced, and number of innings batted.
# innings_played counts distinct (match, innings) pairs; COUNT(*) counted balls faced, not innings.
# Grouped by player_id too so players sharing a display name are not merged.
# NOTE(review): ASC lists the lowest averages first — confirm that ordering is intended.
average_runs_in_wins = spark.sql("""
SELECT
    p.player_name,
    AVG(b.runs_scored) AS avg_runs_in_wins,
    COUNT(DISTINCT b.match_id, b.innings_no) AS innings_played
FROM ball_by_ball b
JOIN player_match pm
    ON b.match_id = pm.match_id
   AND b.striker = pm.player_id
JOIN player p
    ON pm.player_id = p.player_id
JOIN match m
    ON pm.match_id = m.match_id
WHERE
    m.match_winner = pm.player_team
GROUP BY
    p.player_id, p.player_name
ORDER BY
    avg_runs_in_wins ASC
""")

In [None]:
# A simple visualization example using matplotlib

import matplotlib.pyplot as plt

In [None]:
# Bring the aggregated Spark result (one row per bowler) to the driver as a pandas DataFrame.
# Assumes economical_bowlers_powerplay was created by the SQL cell above.
economical_bowlers_pd = economical_bowlers_powerplay.toPandas()

# Only the 10 bowlers with the lowest average runs per ball, to keep the chart readable
top_economical_bowlers = economical_bowlers_pd.nsmallest(10, 'avg_runs_per_ball')

fig, ax = plt.subplots(figsize=(12, 8))
ax.bar(top_economical_bowlers['player_name'], top_economical_bowlers['avg_runs_per_ball'], color='skyblue')
ax.set_xlabel('Bowler Name')
ax.set_ylabel('Average Runs per Ball')
ax.set_title('Most Economical Bowlers in Powerplay Overs (Top 10)')
ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
plt.show()