## BDCC Assignment: Mini Project -  Indian Premier League (IPL) Data Analysis

#### Team Members

12120067 - Bhargavi Peddapati <br>
12120019 - Sriya Paruchuri <br>
12120045 - Madhab Chakraborty <br>
12120095 - Nagaraj G T <br>
12120087 - Rama Gangadhar Durvasula <br>

### Problem Statement

Franchise owners need to select cricket players for the Indian Premier League (IPL).

Selection is based not only on past performance numbers but also on how a player approaches specific match situations, such as opening the innings, batting in the middle order, handling spin bowlers or bowling the final overs.

Analytics plays a large role in:

1. Analysis of player performance for the auction. Before the IPL player auction, franchise owners, coaches and management staff rely on analytics to make informed decisions. This applies especially to the selection of domestic players.
2. Analysis of team performance, to find the best combination of individuals based on their strengths and weaknesses. This helps during the auction and during the IPL season to improve the team's winning strategies.

## Section - Dataset

**Objective :** Select a reasonably sized dataset with at least two files and one file having at least 6 columns and 1 MB in size, define a schema and load the data into the pipeline while ensuring handling of bad records. <br>
**Approach  :** We searched for a dataset on Kaggle, defined a custom schema, and loaded the data from dbfs:/FileStore/tables/ipl_datasets into the pipeline.

#### IPL Dataset Description

We selected the Indian Premier League (IPL) dataset from **Kaggle**. The two files can be downloaded from the following links:

- https://github.com/Bhargavi-6/BDCC_Group1_Assignment/blob/a22aa23bda2f908bdadc6aabeda36c4a1dea4b64/datasets/matches.csv
- https://github.com/Bhargavi-6/BDCC_Group1_Assignment/blob/a22aa23bda2f908bdadc6aabeda36c4a1dea4b64/datasets/deliveries.csv <br>

This dataset contains information about IPL matches from 2008 onward; the sample rows shown later in this notebook are from the 2017 season. <br>
It has two CSV files, one with match-level data and another with ball-by-ball delivery data.
<br>

  - **matches.csv: 117.1 KB, 18 columns**
  - **deliveries.csv: 15.44 MB, 21 columns**
  
##### Description of matches.csv - one row per match


- id: A unique identifier for each match played in IPL.
- season: The year in which the IPL tournament was played.
- city: The name of the city where the match was played.
- date: The date on which the match was played.
- team1: The name of the first team playing in the match.
- team2: The name of the second team playing in the match.
- toss_winner: The name of the team that won the toss.
- toss_decision: The decision made by the team that won the toss, whether to bat or field.
- result: The type of result, such as normal or no result.
- dl_applied: A binary field indicating whether the Duckworth-Lewis (DL) method was applied to the match. The DL method is a mathematical formula used to calculate the target score for the team batting second in a rain-affected match.
- winner: The name of the winning team in the match.
- win_by_runs: The number of runs by which the winning team won the match.
- win_by_wickets: The number of wickets by which the team won.
- player_of_match: The name of the player named player of the match (null when the match had no result).
- venue: The name of the stadium where the match was played.
- umpire1: The name of the first umpire who officiated the match.
- umpire2: The name of the second umpire who officiated the match.
- umpire3: The name of the third umpire who officiated the match (if any).
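
As a small illustration of how `winner`, `win_by_runs` and `win_by_wickets` fit together, here is a plain-Python sketch. The helper name `describe_margin` is hypothetical, and the assumption that at most one of the two margin columns is non-zero is based only on the sample rows shown in this notebook.

```python
def describe_margin(row):
    """Summarise a match result from a matches.csv-style row (a dict)."""
    winner = row.get("winner")
    if not winner:
        # e.g. matches whose result is "no result" have no winner
        return "no winner recorded"
    if row["win_by_runs"] > 0:
        return f"{winner} won by {row['win_by_runs']} runs"
    if row["win_by_wickets"] > 0:
        return f"{winner} won by {row['win_by_wickets']} wickets"
    return f"{winner} won (no run or wicket margin recorded)"

# Values taken from the first two match rows displayed in this notebook
first = {"winner": "Sunrisers Hyderabad", "win_by_runs": 35, "win_by_wickets": 0}
second = {"winner": "Rising Pune Supergiant", "win_by_runs": 0, "win_by_wickets": 7}
print(describe_margin(first))
print(describe_margin(second))
```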


##### Description of deliveries.csv - one row per ball, across all seasons

- match_id: A unique identifier for each match played in IPL.
- inning: The inning number of the match (1st or 2nd).
- batting_team: The name of the team that is currently batting.
- bowling_team: The name of the team that is currently bowling.
- over: The over number of the match.
- ball: The ball number of the over.
- batsman: The name of the batsman who is currently facing the ball.
- non_striker: The name of the non-striking batsman.
- bowler: The name of the bowler who is currently bowling.
- is_super_over: A binary field indicating whether the current over is a Super Over.
- wide_runs: The number of runs scored on a wide delivery.
- bye_runs: The number of runs scored on a bye.
- legbye_runs: The number of runs scored on a leg bye.
- noball_runs: The number of runs scored on a no-ball delivery.
- penalty_runs: The number of runs scored on a penalty.
- batsman_runs: The number of runs scored by the batsman on this delivery.
- extra_runs: The total number of extra runs scored on this delivery (wide, bye, leg bye, no ball, penalty).
- total_runs: The total number of runs scored on this delivery (batsman runs + extra runs).
- player_dismissed: The name of the batsman who got out on this delivery (if any).
- dismissal_kind: The type of dismissal on this delivery (if any), such as caught, bowled, run out, etc.
- fielder: The name of the fielder who caught or ran out the batsman (if any).
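
The run columns are related by the definitions above: `extra_runs` is the sum of the five extras columns, and `total_runs` is `batsman_runs + extra_runs`. A plain-Python sketch of that consistency check follows. The helper names `make_delivery` and `runs_are_consistent` and the sample rows are made up for illustration.

```python
EXTRA_COLUMNS = ["wide_runs", "bye_runs", "legbye_runs", "noball_runs", "penalty_runs"]

def make_delivery(batsman_runs=0, **extras):
    """Build a deliveries.csv-style row (run columns only); unspecified extras default to 0."""
    row = {name: extras.get(name, 0) for name in EXTRA_COLUMNS}
    row["batsman_runs"] = batsman_runs
    row["extra_runs"] = sum(row[name] for name in EXTRA_COLUMNS)
    row["total_runs"] = row["batsman_runs"] + row["extra_runs"]
    return row

def runs_are_consistent(row):
    """Check one row against the column definitions."""
    extras = sum(row[name] for name in EXTRA_COLUMNS)
    return row["extra_runs"] == extras and row["total_runs"] == row["batsman_runs"] + extras

four = make_delivery(batsman_runs=4)   # a boundary off the bat
five_wides = make_delivery(wide_runs=5)  # a wide that runs away to the boundary
broken = dict(four, total_runs=6)      # deliberately inconsistent row

print(runs_are_consistent(four), runs_are_consistent(five_wides), runs_are_consistent(broken))
```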



#### Loading data onto DBFS

- Go to **Data** on the left menu
- Click on **Create Table**
- Add **ipl_datasets** to the /FileStore/tables directory path
- Drag and drop or upload the files to the DBFS location **/FileStore/tables/ipl_datasets/** under the **Files** section.

In [0]:
spark

#### Loading the dataset

- The data is provided in *csv* format. Spark provides *sqlContext.read.csv* to read CSV data into a DataFrame.
- Spark can infer the schema automatically if *inferSchema* is set to True, but here we define a custom schema instead.
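
To illustrate what a declared schema does with bad records, here is a plain-Python sketch (no Spark involved) that casts CSV text against a schema and turns values that fail to parse into `None`. It is meant to mirror the behaviour of Spark's default `PERMISSIVE` read mode in recent versions, which nulls malformed fields rather than failing the load. The schema subset, the helper name `cast_row` and the sample text are assumptions made for the example.

```python
import csv
import io

# Hypothetical subset of the matches schema: column name -> Python caster
SCHEMA = {"id": int, "season": int, "city": str, "win_by_runs": int}

def cast_row(raw):
    """Cast one CSV row (dict of strings) to the schema; empty or unparseable values become None."""
    typed = {}
    for name, caster in SCHEMA.items():
        value = raw.get(name)
        try:
            typed[name] = caster(value) if value not in (None, "") else None
        except ValueError:
            typed[name] = None  # bad value: keep the row, null the field
    return typed

# Made-up sample: row 2 has a bad season, row 3 has an empty city and a bad win_by_runs
sample = io.StringIO(
    "id,season,city,win_by_runs\n"
    "1,2017,Hyderabad,35\n"
    "2,twenty17,Pune,0\n"
    "3,2017,,x\n"
)
rows = [cast_row(r) for r in csv.DictReader(sample)]
for row in rows:
    print(row)
```

Counting nulls per column after the load, as done later in this notebook, is then a simple way to see how many values were missing or failed to parse.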

In [0]:
%fs ls /FileStore/tables/ipl_datasets/

path,name,size,modificationTime
dbfs:/FileStore/tables/ipl_datasets/deliveries.csv,deliveries.csv,15442270,1678101160000
dbfs:/FileStore/tables/ipl_datasets/matches.csv,matches.csv,117096,1678101115000


### Define schema for matches.csv

In [0]:
from pyspark.sql.types import *

# Define schema for matches.csv
matches_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("season", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("date", StringType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("toss_decision", StringType(), True),
    StructField("result", StringType(), True),
    StructField("dl_applied", IntegerType(), True),
    StructField("winner", StringType(), True),
    StructField("win_by_runs", IntegerType(), True),
    StructField("win_by_wickets", IntegerType(), True),
    StructField("player_of_match", StringType(), True),
    StructField("venue", StringType(), True),
    StructField("umpire1", StringType(), True),
    StructField("umpire2", StringType(), True),
    StructField("umpire3", StringType(), True)
])


In [0]:
# matches_schema is already a StructType, so it is passed directly
matches_df = sqlContext.read.csv("dbfs:/FileStore/tables/ipl_datasets/matches.csv",
                                 header=True,
                                 schema=matches_schema)
matches_df.cache()
matches_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- season: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- date: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- toss_winner: string (nullable = true)
 |-- toss_decision: string (nullable = true)
 |-- result: string (nullable = true)
 |-- dl_applied: integer (nullable = true)
 |-- winner: string (nullable = true)
 |-- win_by_runs: integer (nullable = true)
 |-- win_by_wickets: integer (nullable = true)
 |-- player_of_match: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- umpire1: string (nullable = true)
 |-- umpire2: string (nullable = true)
 |-- umpire3: string (nullable = true)



In [0]:
matches_df.show(5)

+---+------+---------+----------+--------------------+--------------------+--------------------+-------------+------+----------+--------------------+-----------+--------------+---------------+--------------------+--------------+-------------+-------+
| id|season|     city|      date|               team1|               team2|         toss_winner|toss_decision|result|dl_applied|              winner|win_by_runs|win_by_wickets|player_of_match|               venue|       umpire1|      umpire2|umpire3|
+---+------+---------+----------+--------------------+--------------------+--------------------+-------------+------+----------+--------------------+-----------+--------------+---------------+--------------------+--------------+-------------+-------+
|  1|  2017|Hyderabad|2017-04-05| Sunrisers Hyderabad|Royal Challengers...|Royal Challengers...|        field|normal|         0| Sunrisers Hyderabad|         35|             0|   Yuvraj Singh|Rajiv Gandhi Inte...|   AY Dandekar|     NJ Llong|   nu

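#### Memory Consumed to store the matches dataframe

SizeEstimator.estimate is applied here to the JVM handle of the DataFrame (*_jdf*), so the value reflects the object graph reachable from that handle rather than the size of the cached rows. Treat it as a rough figure only; the same applies to the deliveries estimate further down.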

In [0]:
spark._jvm.org.apache.spark.util.SizeEstimator.estimate(matches_df._jdf)

Out[5]: 65461632

#### Partitions the matches dataframe has

DataFrames are stored internally as RDDs split into partitions. To find the number of partitions:

In [0]:
matches_df.rdd.getNumPartitions()

Out[6]: 1

### Define schema for deliveries.csv

In [0]:
# Define schema for deliveries.csv
deliveries_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("inning", IntegerType(), True),
    StructField("batting_team", StringType(), True),
    StructField("bowling_team", StringType(), True),
    StructField("over", IntegerType(), True),
    StructField("ball", IntegerType(), True),
    StructField("batsman", StringType(), True),
    StructField("non_striker", StringType(), True),
    StructField("bowler", StringType(), True),
    StructField("is_super_over", IntegerType(), True),
    StructField("wide_runs", IntegerType(), True),
    StructField("bye_runs", IntegerType(), True),
    StructField("legbye_runs", IntegerType(), True),
    StructField("noball_runs", IntegerType(), True),
    StructField("penalty_runs", IntegerType(), True),
    StructField("batsman_runs", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("total_runs", IntegerType(), True),
    StructField("player_dismissed", StringType(), True),
    StructField("dismissal_kind", StringType(), True),
    StructField("fielder", StringType(), True)
])

In [0]:
# deliveries_schema is already a StructType, so it is passed directly
deliveries_df = sqlContext.read.csv("dbfs:/FileStore/tables/ipl_datasets/deliveries.csv",
                                    header=True,
                                    schema=deliveries_schema)
deliveries_df.cache()
deliveries_df.printSchema()

root
 |-- match_id: integer (nullable = true)
 |-- inning: integer (nullable = true)
 |-- batting_team: string (nullable = true)
 |-- bowling_team: string (nullable = true)
 |-- over: integer (nullable = true)
 |-- ball: integer (nullable = true)
 |-- batsman: string (nullable = true)
 |-- non_striker: string (nullable = true)
 |-- bowler: string (nullable = true)
 |-- is_super_over: integer (nullable = true)
 |-- wide_runs: integer (nullable = true)
 |-- bye_runs: integer (nullable = true)
 |-- legbye_runs: integer (nullable = true)
 |-- noball_runs: integer (nullable = true)
 |-- penalty_runs: integer (nullable = true)
 |-- batsman_runs: integer (nullable = true)
 |-- extra_runs: integer (nullable = true)
 |-- total_runs: integer (nullable = true)
 |-- player_dismissed: string (nullable = true)
 |-- dismissal_kind: string (nullable = true)
 |-- fielder: string (nullable = true)



#### Memory Consumed to store the deliveries dataframe

In [0]:
spark._jvm.org.apache.spark.util.SizeEstimator.estimate(deliveries_df._jdf)

Out[9]: 65481632

#### Partitions the deliveries dataframe has

DataFrames are stored internally as RDDs split into partitions. To find the number of partitions:

In [0]:
deliveries_df.rdd.getNumPartitions()

Out[10]: 4
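
The partition counts (1 for matches, 4 for deliveries) are consistent with how Spark sizes file splits when reading. The following is a simplified plain-Python sketch of that calculation, assuming the default values of `spark.sql.files.maxPartitionBytes` (128 MB) and `spark.sql.files.openCostInBytes` (4 MB) and a hypothetical default parallelism of 8. It ignores how Spark packs small splits together, so treat it as an approximation rather than the exact algorithm.

```python
import math

MAX_PARTITION_BYTES = 128 * 1024 * 1024  # default spark.sql.files.maxPartitionBytes
OPEN_COST_BYTES = 4 * 1024 * 1024        # default spark.sql.files.openCostInBytes

def estimate_partitions(file_size, parallelism):
    """Approximate the number of read partitions for a single splittable file."""
    bytes_per_core = (file_size + OPEN_COST_BYTES) // parallelism
    max_split = min(MAX_PARTITION_BYTES, max(OPEN_COST_BYTES, bytes_per_core))
    return math.ceil(file_size / max_split)

# File sizes come from the %fs ls output above; a parallelism of 8 is an assumption
print(estimate_partitions(15442270, 8))  # deliveries.csv → 4
print(estimate_partitions(117096, 8))    # matches.csv → 1
```

Under these assumptions the split size is limited by the 4 MB open cost, which is why a 15.44 MB file ends up in four partitions.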

### Ensure handling of Bad Data

In [0]:
## Understanding Missing Values for Matches Data

{col: matches_df.filter(matches_df[col].isNull()).count() for col in matches_df.columns }

Out[11]: {'id': 0,
 'season': 0,
 'city': 7,
 'date': 0,
 'team1': 0,
 'team2': 0,
 'toss_winner': 0,
 'toss_decision': 0,
 'result': 0,
 'dl_applied': 0,
 'winner': 3,
 'win_by_runs': 0,
 'win_by_wickets': 0,
 'player_of_match': 3,
 'venue': 0,
 'umpire1': 1,
 'umpire2': 1,
 'umpire3': 636}

In [0]:
## The 3 matches where player_of_match is null
matches_df[matches_df["player_of_match"].isNull()].show()

+---+------+---------+----------+--------------------+--------------------+--------------------+-------------+---------+----------+------+-----------+--------------+---------------+--------------------+---------------+------------+-------+
| id|season|     city|      date|               team1|               team2|         toss_winner|toss_decision|   result|dl_applied|winner|win_by_runs|win_by_wickets|player_of_match|               venue|        umpire1|     umpire2|umpire3|
+---+------+---------+----------+--------------------+--------------------+--------------------+-------------+---------+----------+------+-----------+--------------+---------------+--------------------+---------------+------------+-------+
|301|  2011|    Delhi|2011-05-21|    Delhi Daredevils|       Pune Warriors|    Delhi Daredevils|          bat|no result|         0|  null|          0|             0|           null|    Feroz Shah Kotla|      SS Hazare|   RJ Tucker|   null|
|546|  2015|Bangalore|2015-04-29|Royal C

**The output above indicates that the missing values for player of the match are valid, as these matches had no result.**

In [0]:
## Understanding missing values for Deliveries

{col: deliveries_df.filter(deliveries_df[col].isNull()).count() for col in deliveries_df.columns }

Out[13]: {'match_id': 0,
 'inning': 0,
 'batting_team': 0,
 'bowling_team': 0,
 'over': 0,
 'ball': 0,
 'batsman': 0,
 'non_striker': 0,
 'bowler': 0,
 'is_super_over': 0,
 'wide_runs': 0,
 'bye_runs': 0,
 'legbye_runs': 0,
 'noball_runs': 0,
 'penalty_runs': 0,
 'batsman_runs': 0,
 'extra_runs': 0,
 'total_runs': 0,
 'player_dismissed': 143022,
 'dismissal_kind': 143022,
 'fielder': 145091}

In [0]:
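## Understanding the fielder column: count the deliveries that did not end in a "caught" dismissal, to compare with the null count of fielder above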
deliveries_df.count() - (deliveries_df[deliveries_df['dismissal_kind'] == "caught"].count())

Out[14]: 146087

#### Reformat the Column Types to suit downstream analysis

In [0]:
display(matches_df)

id,season,city,date,team1,team2,toss_winner,toss_decision,result,dl_applied,winner,win_by_runs,win_by_wickets,player_of_match,venue,umpire1,umpire2,umpire3
1,2017,Hyderabad,2017-04-05,Sunrisers Hyderabad,Royal Challengers Bangalore,Royal Challengers Bangalore,field,normal,0,Sunrisers Hyderabad,35,0,Yuvraj Singh,"Rajiv Gandhi International Stadium, Uppal",AY Dandekar,NJ Llong,
2,2017,Pune,2017-04-06,Mumbai Indians,Rising Pune Supergiant,Rising Pune Supergiant,field,normal,0,Rising Pune Supergiant,0,7,SPD Smith,Maharashtra Cricket Association Stadium,A Nand Kishore,S Ravi,
3,2017,Rajkot,2017-04-07,Gujarat Lions,Kolkata Knight Riders,Kolkata Knight Riders,field,normal,0,Kolkata Knight Riders,0,10,CA Lynn,Saurashtra Cricket Association Stadium,Nitin Menon,CK Nandan,
4,2017,Indore,2017-04-08,Rising Pune Supergiant,Kings XI Punjab,Kings XI Punjab,field,normal,0,Kings XI Punjab,0,6,GJ Maxwell,Holkar Cricket Stadium,AK Chaudhary,C Shamshuddin,
5,2017,Bangalore,2017-04-08,Royal Challengers Bangalore,Delhi Daredevils,Royal Challengers Bangalore,bat,normal,0,Royal Challengers Bangalore,15,0,KM Jadhav,M Chinnaswamy Stadium,,,
6,2017,Hyderabad,2017-04-09,Gujarat Lions,Sunrisers Hyderabad,Sunrisers Hyderabad,field,normal,0,Sunrisers Hyderabad,0,9,Rashid Khan,"Rajiv Gandhi International Stadium, Uppal",A Deshmukh,NJ Llong,
7,2017,Mumbai,2017-04-09,Kolkata Knight Riders,Mumbai Indians,Mumbai Indians,field,normal,0,Mumbai Indians,0,4,N Rana,Wankhede Stadium,Nitin Menon,CK Nandan,
8,2017,Indore,2017-04-10,Royal Challengers Bangalore,Kings XI Punjab,Royal Challengers Bangalore,bat,normal,0,Kings XI Punjab,0,8,AR Patel,Holkar Cricket Stadium,AK Chaudhary,C Shamshuddin,
9,2017,Pune,2017-04-11,Delhi Daredevils,Rising Pune Supergiant,Rising Pune Supergiant,field,normal,0,Delhi Daredevils,97,0,SV Samson,Maharashtra Cricket Association Stadium,AY Dandekar,S Ravi,
10,2017,Mumbai,2017-04-12,Sunrisers Hyderabad,Mumbai Indians,Mumbai Indians,field,normal,0,Mumbai Indians,0,4,JJ Bumrah,Wankhede Stadium,Nitin Menon,CK Nandan,


In [0]:
from pyspark.sql.functions import unix_timestamp, format_number, col, year, month

spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

# The dates displayed above are in yyyy-MM-dd form (e.g. 2017-04-05), so parse with that pattern
cleaned_matches_df = (matches_df
                         .withColumn("date",unix_timestamp(col("date"), "yyyy-MM-dd").cast("timestamp")))

cleaned_matches_df.cache()

cleaned_deliveries_df = (deliveries_df
                         .withColumn("batsman_runs", col("batsman_runs").cast("double")))
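
The sample rows above store `date` as text such as `2017-04-05`, that is, year first. As a quick sanity check outside Spark, this plain-Python sketch tries two layouts on that value. `%Y-%m-%d` and `%d-%m-%Y` are the `strptime` counterparts of the Spark patterns `yyyy-MM-dd` and `dd-MM-yyyy`, and the helper name `try_parse` is hypothetical.

```python
from datetime import datetime

def try_parse(text, fmt):
    """Return the parsed date, or None if the text does not match the format."""
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError:
        return None

sample = "2017-04-05"  # date of match id 1 in the output above
year_first = try_parse(sample, "%Y-%m-%d")
day_first = try_parse(sample, "%d-%m-%Y")
print(year_first)
print(day_first)
```

Python's parser rejects the mismatched layout outright. In Spark, a pattern that does not match the stored text can instead silently produce null or incorrect dates, so it is worth checking a few parsed values after the cast.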




## Section - Data Lake Creation and Pipeline Creation

### Define the pipeline

#### Ingestion of Data into Staging Zone

#### Ingestion of the entire Deliveries data, required for purely number-based performance metrics (such as Strike Rate, Wickets Per Match)

In [0]:
## Write to particular location

cleaned_deliveries_df.write.format("parquet").saveAsTable("deliveries", path = "/FileStore/tables/DeliveriesNoPartition/", mode='overwrite')
cleaned_matches_df.write.format("parquet").saveAsTable("matches", path = "/FileStore/tables/MatchesNoPartition/", mode='overwrite')

#### Ingestion of Data into the Staging Zone, partitioned by Match Situation, to derive situation-based performance metrics

In [0]:
from pyspark.ml.feature import Bucketizer
# Over boundaries for the match situations: [1,5), [5,10), [10,15), [15,20]
bins = [1,5,10,15,20]

bucketizer = Bucketizer(splits=bins ,inputCol="over", outputCol="matchSituations")
df_match_situations = bucketizer.setHandleInvalid("keep").transform(deliveries_df)

df_match_situations.show()


+--------+------+-------------------+--------------------+----+----+------------+------------+-----------+-------------+---------+--------+-----------+-----------+------------+------------+----------+----------+----------------+--------------+-------------+---------------+
|match_id|inning|       batting_team|        bowling_team|over|ball|     batsman| non_striker|     bowler|is_super_over|wide_runs|bye_runs|legbye_runs|noball_runs|penalty_runs|batsman_runs|extra_runs|total_runs|player_dismissed|dismissal_kind|      fielder|matchSituations|
+--------+------+-------------------+--------------------+----+----+------------+------------+-----------+-------------+---------+--------+-----------+-----------+------------+------------+----------+----------+----------------+--------------+-------------+---------------+
|       1|     1|Sunrisers Hyderabad|Royal Challengers...|   1|   1|   DA Warner|    S Dhawan|   TS Mills|            0|        0|       0|          0|          0|           0|  
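
For reference, `Bucketizer` with splits `[1, 5, 10, 15, 20]` produces four buckets, each including its lower bound and excluding its upper bound, except the last, which also includes 20. The plain-Python sketch below reproduces that mapping with `bisect`. The function name `match_situation` and the labels are illustrative assumptions, not names used by the notebook, and `Bucketizer` itself emits the indices as doubles (0.0 to 3.0).

```python
from bisect import bisect_right

SPLITS = [1, 5, 10, 15, 20]
# Hypothetical labels for the four buckets
LABELS = ["overs 1-4", "overs 5-9", "overs 10-14", "overs 15-20"]

def match_situation(over):
    """Return the bucket index for an over number, mirroring Bucketizer's interval rules."""
    if over < SPLITS[0] or over > SPLITS[-1]:
        raise ValueError(f"over {over} is outside the splits")
    if over == SPLITS[-1]:
        return len(SPLITS) - 2  # the last bucket is closed on the right
    return bisect_right(SPLITS, over) - 1

for over in (1, 4, 5, 14, 15, 20):
    index = match_situation(over)
    print(over, index, LABELS[index])
```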

In [0]:
df_match_situations.write.partitionBy("matchSituations").format("parquet").saveAsTable("MatchSituations", path = "/FileStore/tables/DeliveriesByMatchSituations/",mode='overwrite' )


In [0]:

##dbutils.fs.rm("dbfs:/user/hive/warehouse/matches", True)
##dbutils.fs.rm("dbfs:/user/hive/warehouse/deliveries", True)


In [0]:
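from pyspark.sql.functions import sum, count, col, round
# Note: the != comparison evaluates to null for deliveries with no dismissal, so this filter
# keeps only wicket-taking deliveries other than run outs; every aggregate below is
# computed over those deliveries alone.
bowling_stats_df = (deliveries_df.filter(deliveries_df["dismissal_kind"] != 'run out')
                         .groupBy("bowler")
                         .agg(sum("total_runs").alias("total_runs"),
                              count("match_id").alias("total_matches_played"),  # counts delivery rows, not distinct matches
                              count("player_dismissed").alias("total_wickets_taken"))
                         .withColumn("overs_bowled", round(col("total_runs") / col("total_matches_played") / 6, 2))
                         .withColumn("economy", round(col("total_runs") / col("overs_bowled"), 2)))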
                         

display(bowling_stats_df)


bowler,total_runs,total_matches_played,total_wickets_taken,overs_bowled,economy
Kuldeep Yadav,1,18,18,0.01,100.0
TM Dilshan,0,5,5,0.0,
M Muralitharan,0,64,64,0.0,
LA Carseldine,0,1,1,0.0,
J Botha,0,25,25,0.0,
DR Smith,0,26,26,0.0,
A Flintoff,0,2,2,0.0,
GR Napier,0,1,1,0.0,
AR Patel,0,58,58,0.0,
B Lee,0,25,25,0.0,


### Store in CURATED ZONE

In [0]:
%sql
DROP DATABASE IF EXISTS curated CASCADE;

CREATE DATABASE curated;
USE curated;

In [0]:
bowling_stats_df.write.saveAsTable("curated.bowling_stats")

In [0]:
%sql
SELECT * FROM curated.bowling_stats;

bowler,total_runs,total_matches_played,total_wickets_taken,overs_bowled,economy
Kuldeep Yadav,1,18,18,0.01,100.0
TM Dilshan,0,5,5,0.0,
M Muralitharan,0,64,64,0.0,
LA Carseldine,0,1,1,0.0,
J Botha,0,25,25,0.0,
DR Smith,0,26,26,0.0,
A Flintoff,0,2,2,0.0,
GR Napier,0,1,1,0.0,
AR Patel,0,58,58,0.0,
B Lee,0,25,25,0.0,


In [0]:
from pyspark.sql.functions import col, round, sum

cleaned_matches_df.write.mode("overwrite").saveAsTable("matches")
cleaned_deliveries_df.write.mode("overwrite").saveAsTable("deliveries")