# Data Analytics of IPL match data - 2017

We will focus on the data engineering involved in this project: using S3 buckets, Spark, SQL and ML to create an environment for analytics.

_Video referred for this - https://www.youtube.com/watch?v=0iNJPKheQqM&list=PLBJe2dFI4sgvQTNNkI3ETYJgNPR4CBpFd_

_Data Set Referred from - https://data.world/raghu543/ipl-data-till-2017_

**Before we get to the ingestion part, the most important step is connecting the Databricks session to AWS. Below are the steps I found while researching this.**

### Phase 1: AWS IAM role setup
This phase creates an IAM role in AWS that Databricks Unity Catalog can assume to access your S3 data securely.
1. Create an IAM Role with a Temporary Trust Policy: Create a new IAM role in the AWS Console with a custom trust policy. This initial policy includes the Databricks Unity Catalog service ARN and a placeholder External ID ("0000"). The role's own ARN is not available until the role exists, so it is added later, together with the real External ID (step 5). The trust policy grants the sts:AssumeRole action under specific conditions.

2. Attach a Permissions Policy for S3: Attach an S3 permissions policy to the IAM role during creation (I attached the complete S3 access policy). This policy grants the necessary permissions on your S3 bucket, such as s3:ListBucket and s3:GetObject.

3. Name and Create the IAM Role: Name the role descriptively and complete its creation.

### Phase 2: Databricks credential and external location setup
This phase establishes the connection between Databricks and the IAM role created above.
4. Create a Storage Credential in Databricks: In the Databricks Catalog Explorer, create a storage credential (Manual), entering the ARN of the IAM role you created in AWS. Databricks then provides a unique External ID; copy it.

5. Finalize the IAM Role's Trust Policy in AWS: Return to the AWS Console to edit the trust policy of the IAM role. Update the policy to include both the Databricks service ARN and the role's own ARN in the Principal section. Then replace the temporary "0000" placeholder with the External ID obtained from Databricks.

6. Validate the Storage Credential in Databricks: Back in Databricks, validate the storage credential to confirm that Databricks can successfully assume the configured role. This is done by heading to _Catalog in Databricks >> Manage >> Credentials >> click on the storage credential created >> Validate Configuration_

7. Create an External Location in Databricks: Create an external location in Databricks' Catalog Explorer, specifying your S3 path (e.g., s3://your-bucket-name/) and associating it with the validated storage credential. Test the connection to verify that it works.
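
The trust policy from steps 1 and 5 can be sketched as a small Python helper that builds the JSON document. This is only an illustration: the function name `build_trust_policy` and both ARNs are placeholders I introduced, and the real Databricks principal ARN and External ID must be taken from your own Databricks and AWS consoles.

```python
import json


def build_trust_policy(databricks_principal_arn, external_id, role_arn=None):
    """Return an IAM trust policy document as a plain dict.

    role_arn is optional: it is left out for the temporary policy (step 1)
    and added for the finalized, self-assuming policy (step 5).
    """
    principals = [databricks_principal_arn]
    if role_arn is not None:
        principals.append(role_arn)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principals},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


# Placeholder values (assumptions): replace with the ARNs shown in your consoles.
DATABRICKS_UC_ARN = "arn:aws:iam::<databricks-account-id>:role/<unity-catalog-role>"
ROLE_ARN = "arn:aws:iam::<your-account-id>:role/<your-role-name>"

# Step 1: temporary policy with the "0000" placeholder External ID.
temporary_policy = build_trust_policy(DATABRICKS_UC_ARN, "0000")

# Step 5: finalized policy with the role's own ARN and the real External ID.
final_policy = build_trust_policy(
    DATABRICKS_UC_ARN, "<external-id-from-databricks>", ROLE_ARN
)

print(json.dumps(final_policy, indent=2))
```

The printed JSON can be pasted into the trust policy editor of the IAM role once the placeholders are filled in.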

### Phase 3: Ingesting data
8. Run Spark Code to Ingest Data: With the setup complete, you can execute a Spark command within a Databricks notebook to read the CSV data from your designated S3 path. An example Spark code snippet is provided in the referenced document.


## DATA INGESTION

- Create a spark session to host the ingestion and data transforms.

In [0]:
#Create a Spark Session

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("IPL Data Analysis").getOrCreate()

In [0]:
#Import data from S3 bucket using spark session created

Ball_By_Ball_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-data-analysis-project-deproj/Ball_By_Ball.csv")

- The schema inferred above (viewable with Ball_By_Ball_df.printSchema()) shows that some column data types are not imported correctly. This is a common challenge with Spark, because it cannot always infer the exact type from an external file.

- This is why defining your own schema while ingesting data is important: it ensures that the data loaded into each column takes that column's declared data type.

### Enter StructField and StructType - To create a structured schema for a Spark table

- We will use the pyspark.sql.types library to define a fixed schema for the target tables where the data will be stored.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, BooleanType, DecimalType

- Once we run the above code, we will define the struct schema.
- We will define a struct schema for the ball_by_ball data which we imported from the S3 bucket.
  - Define a variable for the schema: ball_by_ball_schema
  - Use the following pattern, with one StructField per column and the data type you want for that column:
      'StructType([StructField("<column_name>", IntegerType(), True <if you want to allow nulls>)])'

- There are many columns (48 in the ball_by_ball case), and writing the code for each of them by hand is tedious. This is where AI comes into play.
- Copy the table schema from the data set's online data dictionary and paste it into any AI text engine. Prompt it to generate Spark code that creates a struct schema for the table whose data dictionary is shared. Below is the prompt I used with ChatGPT to generate a struct schema for my table:

  " I am sharing my dataframe in this prompt. I want you to generate PySpark Structure (struct) schema using the schema of my dataframe shared below. "

- Below is the code generated by ChatGPT based on my above prompt and the data dictionary I shared with it. In this way we can reduce the manual task and focus on getting the task done quickly.

In [0]:
'''from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType, BooleanType, DateType
)'''

ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True),
])
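
As an alternative to prompting an AI tool, the same boilerplate can be generated with a few lines of plain Python. The sketch below is my own illustration, not part of the original workflow: `render_schema` is a hypothetical helper that turns (column name, type expression) pairs into the schema source text, which can then be pasted into a notebook cell.

```python
def render_schema(var_name, columns):
    """Render PySpark schema source code from (column_name, type_expr) pairs.

    type_expr is written exactly as it should appear in the code,
    for example "IntegerType()" or "DecimalType(38, 0)".
    """
    lines = [f"{var_name} = StructType(["]
    for column_name, type_expr in columns:
        # Every column is declared nullable (True), as in the schema above.
        lines.append(f'    StructField("{column_name}", {type_expr}, True),')
    lines.append("])")
    return "\n".join(lines)


# A few columns copied from the ball_by_ball schema above.
sample_columns = [
    ("match_id", "IntegerType()"),
    ("team_batting", "StringType()"),
    ("caught", "BooleanType()"),
]

schema_source = render_schema("ball_by_ball_schema", sample_columns)
print(schema_source)
```

Either way, the generated types should be checked against the data dictionary before the schema is used for ingestion.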

### Combining all the above steps to get the correct schema type of data saved into our imported table

1. Use the schema code created by AI to create our schema, run it.

2. Change the df code to import data from S3 bucket such that we are importing the S3 data into the schema created.

3. Import the schema data from S3.

In [0]:
#0. Create a Spark session for the application to run

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("IPL Data Analysis").getOrCreate()

In [0]:
# 1. Use the schema code created by AI to create our schema, run it.

from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType, BooleanType, DateType, DecimalType
)

ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True),
])

In [0]:
# 2. Change the df code to import data from S3 bucket such that we are importing the S3 data into the schema created.

'''Ball_By_Ball_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://ipl-data-analysis-project-deproj/Ball_By_Ball.csv")''' #Initial schema to import external data as it is

#Changed code below
Ball_By_Ball_df = spark.read.schema(ball_by_ball_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project-deproj/Ball_By_Ball.csv")

#3. Run this code block.

- If you compare the DataFrame from the above code with the one we created without a schema, you can see that Spark has kept the data types we defined in our schema.

- Spark applies the declared types to the data as it is read. In the default PERMISSIVE mode, a value that cannot be parsed as the declared type is loaded as null rather than raising an error, so it is worth checking key columns for unexpected nulls. Setting the reader option mode to FAILFAST makes Spark raise an error on malformed records instead. This is the power of creating a structured schema in Spark.
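
If you would rather have the read fail than silently produce nulls, the CSV reader's `mode` option can be set to `FAILFAST`. Below is a minimal sketch; `read_csv_strict` is a hypothetical helper name I introduced, and it takes the Spark session as a parameter so it can be reused for every file.

```python
def read_csv_strict(spark, path, schema):
    """Read a CSV file with an explicit schema, failing on malformed records."""
    return (
        spark.read
        .schema(schema)
        .option("header", "true")
        # The default mode is PERMISSIVE, which turns unparsable values into nulls.
        .option("mode", "FAILFAST")
        .csv(path)
    )


# Example call in the notebook (not executed here):
# Ball_By_Ball_df = read_csv_strict(
#     spark,
#     "s3://ipl-data-analysis-project-deproj/Ball_By_Ball.csv",
#     ball_by_ball_schema,
# )
```

Because Spark reads lazily, a parsing error in FAILFAST mode typically surfaces when an action such as show() or count() runs, not when the DataFrame is defined.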

### Repeat the above process for the other 4 datasets which we have to use

Datasets to be used: Match.csv, Player_match.csv, Player.csv and Team.csv
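
Since the read call is the same for every file, the repetition can also be wrapped in a loop. This is a sketch under my own assumptions: `load_csv_tables` is a hypothetical helper, and the schemas are passed in as a dict keyed by file name.

```python
def load_csv_tables(spark, base_path, schemas):
    """Load several CSV files from one location, each with its own schema.

    schemas maps a file name (e.g. "Match.csv") to its StructType.
    Returns a dict keyed by the lower-cased file name without extension.
    """
    tables = {}
    for file_name, schema in schemas.items():
        path = f"{base_path.rstrip('/')}/{file_name}"
        table_name = file_name.rsplit(".", 1)[0].lower()
        tables[table_name] = (
            spark.read.schema(schema).option("header", "true").csv(path)
        )
    return tables


# Example call in the notebook (not executed here), once the schemas are defined:
# tables = load_csv_tables(
#     spark,
#     "s3://ipl-data-analysis-project-deproj",
#     {"Match.csv": match_schema, "Player.csv": player_schema, "Team.csv": team_schema},
# )
# match_df = tables["match"]
```

The sections below keep one cell per dataset, which is easier to follow step by step; the loop is only a more compact alternative.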

### 2. Data ingestion for Match.csv data

In [0]:
# Match.csv extraction

match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year",IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmatch", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True),
])

In [0]:
#Importing match.csv data from S3 bucket

match_df = spark.read.schema(match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project-deproj/Match.csv")


### 3. Data ingestion for Player_match.csv data

In [0]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(38, 0), True),  # adjust precision/scale if needed
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),      # 'year' → use IntegerType
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True),
])

In [0]:
player_match_df = spark.read.schema(player_match_schema).format("csv").option("header", "true").load("s3://ipl-data-analysis-project-deproj/Player_match.csv")

### 4. Data ingestion for Player.csv data

In [0]:
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
])

In [0]:
player_df = spark.read.schema(player_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project-deproj/Player.csv")

### 5. Data ingestion for Team.csv data

In [0]:
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True),
])

In [0]:
team_df = spark.read.schema(team_schema).format("csv").option("header", "true").load("s3://ipl-data-analysis-project-deproj/Team.csv")

In [0]:
# Preview the first rows of each DataFrame loaded with its schema

Ball_By_Ball_df.show(5)
match_df.show(5)
player_match_df.show(5)
player_df.show(5)
team_df.show(5)

## DATA TRANSFORMATIONS

Transforms are crucial from the business point of view, as they filter, combine and arrange data according to business requirements. For this project we will focus less on the business aspect and more on the technicalities of getting the transforms right.

Before we begin the transformations, we need to import some functions. We can do this with 'from pyspark.sql.functions import *', which imports all the functions from the PySpark SQL functions module.

We can also import only the functions that we need for the task, like: 'from pyspark.sql.functions import col, when, sum, avg, row_number'. Note that importing sum this way replaces Python's built-in sum in the notebook.

Once the functions are imported, we will filter the ball by ball data to remove the rows for wides and no balls, since those deliveries are not useful for this analysis.

### Ball by ball data transforms

In [0]:
from pyspark.sql.functions import col, when, sum, avg, row_number

#Filter ball by ball data and remove wides and noballs
Ball_By_Ball_df = Ball_By_Ball_df.filter((col("wides") == 0) & (col("noballs") == 0))

# In Apache spark, the transformations are lazy, so we need to call the action to execute the transformations. We have few more transforms to apply so we will call the action at the end.

# Let us create a new aggregated df with total and avg runs scored in each match and inning.
total_and_avg_runs = Ball_By_Ball_df.groupBy("match_id", "innings_no").agg(
    sum("runs_scored").alias("total_runs"), 
    avg("runs_scored").alias("average_runs")
)

In [0]:
#Window Functions: Calculate running totals of runs in each match for each over
#For this we need to import window function. Follow: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html

from pyspark.sql.window import Window

#Create a window structure with partition by match_id and innings_no and order by over_id
windowSpec = Window.partitionBy("match_id", "innings_no").orderBy("over_id")

#We will now create a running total transform using the window structure created above

Ball_By_Ball_df = Ball_By_Ball_df.withColumn(
    "running_total_runs", 
    sum("runs_scored").over(windowSpec)
)

#This will add a new column named "running_total_runs" at the end of ball by ball df
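
One detail worth knowing about the window above: when a window has an orderBy but no explicit frame, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so all rows that tie on over_id share the same running total (the total at the end of that over). The pure-Python sketch below mimics that behaviour on a few made-up rows; the helper `running_totals` and the sample values are illustrative only.

```python
from collections import defaultdict


def running_totals(rows):
    """Mimic sum(runs_scored) over (partition by match_id, innings_no order by over_id).

    Rows in the same partition with the same over_id get the same value,
    as with Spark's default RANGE frame.
    """
    # Total runs per over inside each (match_id, innings_no) partition.
    per_over = defaultdict(lambda: defaultdict(int))
    for row in rows:
        partition = (row["match_id"], row["innings_no"])
        per_over[partition][row["over_id"]] += row["runs_scored"]

    # Cumulative total up to and including each over.
    cumulative = {}
    for partition, overs in per_over.items():
        total = 0
        for over_id in sorted(overs):
            total += overs[over_id]
            cumulative[(partition, over_id)] = total

    return [
        cumulative[((row["match_id"], row["innings_no"]), row["over_id"])]
        for row in rows
    ]


# Made-up sample deliveries, not taken from the dataset.
sample_rows = [
    {"match_id": 1, "innings_no": 1, "over_id": 1, "runs_scored": 1},
    {"match_id": 1, "innings_no": 1, "over_id": 1, "runs_scored": 4},
    {"match_id": 1, "innings_no": 1, "over_id": 2, "runs_scored": 0},
    {"match_id": 1, "innings_no": 1, "over_id": 2, "runs_scored": 6},
    {"match_id": 1, "innings_no": 2, "over_id": 1, "runs_scored": 2},
]

totals = running_totals(sample_rows)
print(totals)  # [5, 5, 11, 11, 2]
```

For a ball-by-ball running total in Spark, one option is to order the window by both over_id and ball_id, or to set the frame explicitly with rowsBetween(Window.unboundedPreceding, Window.currentRow).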

In [0]:
#Conditional columns - flag wickets or balls worth 6 or more runs as high impact balls
#We can use when/otherwise to flag different outcomes

#A ball is high impact if it yields 6 or more runs (including extras) or a bowler wicket

Ball_By_Ball_df = Ball_By_Ball_df.withColumn(
    "high_impact",
    when((col("runs_scored") + col("extra_runs") >= 6) | (col("bowler_wicket") == True), True).otherwise(False)
)

In [0]:
#Action: to execute the lazy transforms, we run the show function to see the transformed columns

Ball_By_Ball_df.show(5)

We can follow similar steps to transform other data tables

### Match data transforms 

In [0]:
from pyspark.sql.functions import year, month, dayofmonth, when, col

# Extract year, month, and day from match_date
match_df = (
    match_df
        .withColumn("year",  year(col("match_date")))
        .withColumn("month", month(col("match_date")))
        .withColumn("day",   dayofmonth(col("match_date")))
)

# Categorize win margins into 'High', 'Medium', and 'Low'
match_df = match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >= 50) & (col("win_margin") < 100), "Medium")
    .otherwise("Low")
)

# Analyze toss impact: who wins the toss and the match
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

# Show the enhanced DataFrame
match_df.show(2)


### Player data transforms

In [0]:
from pyspark.sql.functions import lower, regexp_replace, when, col

# Normalize and clean player names
player_df = player_df.withColumn(
    "player_name",
    lower(regexp_replace(col("player_name"), r"[^a-zA-Z0-9 ]", ""))
)

# Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
player_df = player_df.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

# Categorize players based on batting hand
player_df = player_df.withColumn(
    "batting_style",
    when(lower(col("batting_hand")).contains("left"), "Left-Handed").otherwise("Right-Handed")
)

# Show the modified player DataFrame
player_df.show(2)
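
To see what the name-cleaning expression does, here is the same pattern applied with Python's `re` module. This is only an illustration: the helper `clean_player_name` is hypothetical and the sample names are typed in by hand, not read from the dataset.

```python
import re


def clean_player_name(name):
    """Drop every character that is not a letter, digit or space, then lower-case."""
    return re.sub(r"[^a-zA-Z0-9 ]", "", name).lower()


# Hand-typed sample inputs for illustration.
samples = ["M.S. Dhoni", "D'Arcy Short", "AB de Villiers"]
cleaned = [clean_player_name(name) for name in samples]

for raw, clean in zip(samples, cleaned):
    print(f"{raw!r} -> {clean!r}")
```

Punctuation such as periods and apostrophes is removed, while spaces are kept, so names remain readable after normalization.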


### Player Match data transforms

In [0]:
from pyspark.sql.functions import col, when, current_date, year

# Add a 'veteran_status' column based on player age
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Veteran").otherwise("Non-Veteran")
)

# Dynamic column: years elapsed since the season of each row (uses season_year, not the player's actual debut year)
player_match_df = player_match_df.withColumn(
    "years_since_debut",
    year(current_date()) - col("season_year")
)

# Show the enriched DataFrame
player_match_df.show(2)


## SQL ANALYSIS: Gaining Insights

- From the transformed DataFrames we will now create views for SQL analysis. A temporary view does not copy the data; it registers a name for the DataFrame so that it can be queried with spark.sql.
- The command createOrReplaceGlobalTempView can be used for all the DataFrames we have.
> - Note: For Serverless compute in Databricks, 'createOrReplaceGlobalTempView' is not supported, so we will use 'createOrReplaceTempView' to create a temporary view for SQL analysis.

In [0]:
Ball_By_Ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("match")
player_match_df.createOrReplaceTempView("player_match")
player_df.createOrReplaceTempView("player")
team_df.createOrReplaceTempView("team")

Let's begin with our SQL analysis.

### SQL View for Top Scoring Batsman per season

In [0]:
top_scoring_batsman_per_season = spark.sql(
    """
    SELECT
        p.player_name,
        m.season_year,
        SUM(b.runs_scored) AS total_runs
    FROM ball_by_ball b
    JOIN match m ON b.match_id = m.match_id
    JOIN player_match pm ON m.match_id = pm.match_id AND b.striker = pm.player_id
    JOIN player p ON p.player_id = pm.player_id
    GROUP BY p.player_name, m.season_year
    ORDER BY m.season_year, total_runs DESC
    """
)

In [0]:
top_scoring_batsman_per_season.show(5)

### SQL View for Economical Bowlers

In [0]:
economical_bowlers_powerplay = spark.sql(
    """
    SELECT
        p.player_name,
        ROUND(AVG(b.runs_scored), 3) AS average_runs_per_ball,
        -- COUNT(b.bowler_wicket) would count every non-null row, so sum only the true flags
        SUM(CASE WHEN b.bowler_wicket THEN 1 ELSE 0 END) AS total_wickets
    FROM ball_by_ball b
    JOIN player_match pm ON b.match_id = pm.match_id AND b.bowler = pm.player_id
    JOIN player p ON pm.player_id = p.player_id
    WHERE b.over_id <= 6
    GROUP BY p.player_name
    HAVING COUNT(*) > 120
    ORDER BY average_runs_per_ball, total_wickets DESC
    """
)

economical_bowlers_powerplay.show(5)
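
A note on counting wickets in SQL: COUNT(column) counts every non-null value, whether the flag is true or false, whereas a conditional sum counts only the true flags. The sketch below shows the difference on made-up rows, using Python's built-in sqlite3 as a stand-in for Spark SQL (an assumption for illustration; SQLite stores the boolean flag as 0/1).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ball_by_ball (bowler INTEGER, bowler_wicket INTEGER)")

# Five made-up deliveries by one bowler, two of which are wickets.
conn.executemany(
    "INSERT INTO ball_by_ball VALUES (?, ?)",
    [(1, 0), (1, 1), (1, 0), (1, 1), (1, 0)],
)

count_all, wickets = conn.execute(
    """
    SELECT
        COUNT(bowler_wicket) AS counted_rows,
        SUM(CASE WHEN bowler_wicket THEN 1 ELSE 0 END) AS total_wickets
    FROM ball_by_ball
    """
).fetchone()
conn.close()

print(count_all, wickets)  # 5 2
```

COUNT returns the number of deliveries (5), while the conditional sum returns the number of wickets (2), which is the figure a wicket tally should report.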

There are 2 more queries. Check them after for practice.

## DATA VISUALIZATION

We can perform data visualization using Python libraries.

### Economical Bowlers Data Viz

In [0]:
# Importing necessary libraries for visualization

import pandas as pd
import matplotlib.pyplot as plt


#Convert the bowlers Spark DataFrame to a pandas DataFrame
economical_bowlers_pd = economical_bowlers_powerplay.toPandas()

#Visualizing using Matplotlib
plt.figure(figsize=(12,8))

#Limiting the records to top 10 for better clarity in the plot
top_economical_bowlers = economical_bowlers_pd.nsmallest(10, 'average_runs_per_ball')
plt.bar(top_economical_bowlers['player_name'], top_economical_bowlers['average_runs_per_ball'], color = 'skyblue')
plt.xlabel('Bowler name')
plt.ylabel('Average Runs per Ball')
plt.title('Most Economical Bowlers in Powerplay Overs (Top 10)')
plt.xticks(rotation = 45)
plt.tight_layout()
plt.show()