In [12]:
# Importing Libraries
import math
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.types import *
%matplotlib inline

In [None]:
# Start (or reuse) the Spark session used by every spark.sql / spark.read call below.
spark = (
    SparkSession.builder
    .master("local")
    .appName('Ops')
    .getOrCreate()
)

In [3]:
# Project root that contains the `dataset/` directory read and written below.
# Set the IPL_MSDR_PATH environment variable to run the notebook on another machine;
# when it is unset the original author's location is used, so existing runs are unchanged.
path = os.environ.get("IPL_MSDR_PATH", "E:/Rutgers/Projects/MDSR/IPL-MSDR")

In [4]:
# Load the two raw IPL CSV files (one row per match / one row per delivery) into pandas
raw_dir = path + '/dataset/original_ipldata'
matches = pd.read_csv(raw_dir + '/matches.csv')
deliveries = pd.read_csv(raw_dir + '/deliveries.csv')

In [None]:
# Schema of original data (matches.csv):
# prints each column's name, non-null count and dtype, which shows where values are missing
matches.info()

In [None]:
# Schema of original data (deliveries.csv):
# prints each column's name, non-null count and dtype, which shows where values are missing
deliveries.info()

# Data Cleaning

In [5]:
# Remove the umpire and date columns, which are of no use for this analysis
unused_columns = ['umpire1', 'umpire2', 'umpire3', 'date']
matches = matches.drop(columns=unused_columns)

In [6]:
# Filling missing values
# - a match without a winner is labelled 'Draw'
# - a match without a city is labelled 'Dubai'
# The filled column is assigned back rather than calling fillna(inplace=True) on the
# `matches[...]` selection: that chained in-place form raises a FutureWarning in pandas 2.x
# and leaves `matches` unchanged once Copy-on-Write is enabled.
matches['winner'] = matches['winner'].fillna('Draw')
matches['city'] = matches['city'].fillna('Dubai')
# Every missing cell in deliveries becomes 0 -- including text columns such as
# player_dismissed, which therefore hold 0 (not null) when nobody was dismissed.
deliveries = deliveries.fillna(value=0)

In [7]:
# Normalise the one team name that appears under two spellings in the raw data
team_name_fixes = {'Rising Pune Supergiants': 'Rising Pune Supergiant'}
matches = matches.replace(team_name_fixes)

In [None]:
# Schema of cleaned data (matches.csv):
# re-check columns, non-null counts and dtypes after the drop / fill / rename steps above
matches.info()

In [None]:
# Schema of cleaned data (deliveries.csv):
# re-check columns, non-null counts and dtypes after missing values were filled with 0
deliveries.info()

In [8]:
# Saving cleaned data (matches.csv)
# index=False keeps the pandas row index out of the file; otherwise it is written as an
# unnamed first column that comes back as a spurious extra column on every later read.
matches.to_csv(path + '/dataset/clean_data/matches.csv', index=False)

In [9]:
# Saving cleaned data (deliveries.csv)
# index=False keeps the pandas row index out of the file; otherwise it is written as an
# unnamed first column that comes back as a spurious extra column on every later read.
deliveries.to_csv(path + '/dataset/clean_data/deliveries.csv', index=False)

# Basic Analysis

In [None]:
# Teams playing in the league: the distinct values of the team1 column
teams = matches['team1'].unique()
team_count = len(teams)
print("Total number of teams participated so far: " + str(team_count))
print("Teams participated so far: ")
for team_name in teams:
    print("- " + team_name)

In [None]:
# Total Venues: count the distinct venue names, then list them one per line
venue_names = matches['venue'].unique()
print("Number of venues matches were played: " + str(len(venue_names)))
for venue_name in venue_names:
    print("- " + venue_name)

In [None]:
# Cities the matches were played in: count the distinct city names, then list them
city_names = matches['city'].unique()
print("Number of cities matches were played: " + str(len(city_names)))
for city_name in city_names:
    print("- " + city_name)

In [None]:
# Total number of bowlers so far (distinct values of the bowler column)
bowler_count = deliveries['bowler'].nunique(dropna=False)
print("Total number of bowlers: " + str(bowler_count))

In [None]:
# Total number of batsmen so far (distinct values of the batsman column)
batsman_count = deliveries['batsman'].nunique(dropna=False)
print("Total number of batsmen: " + str(batsman_count))

In [None]:
# Total number of participating players
# A player is anyone who appears as bowler, batsman or non-striker on at least one delivery.
# The three columns are turned into sets and unioned, replacing a Python loop that
# label-indexed every delivery row three times and took far longer for the same result.
players = (
    set(deliveries['bowler'])
    | set(deliveries['batsman'])
    | set(deliveries['non_striker'])
)
print("Total number of player: " + str(len(players)))

# Spark Analysis

In [None]:
# From here on `matches` and `deliveries` are Spark DataFrames read back from the cleaned CSVs
clean_dir = path + '/dataset/clean_data'
matches = spark.read.csv(clean_dir + '/matches.csv', header=True, inferSchema=True)
deliveries = spark.read.csv(clean_dir + '/deliveries.csv', header=True, inferSchema=True)

### Total number of matches per season

In [None]:
# Expose the cleaned matches as the SQL view `seasons` and count the matches played per season.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('seasons')
seasons = spark.sql('''Select distinct(season),count(*) as total_matches from seasons group by season ''')
seasons.show()

In [None]:
# Bar chart: number of matches in each season
seasons_pdf = seasons.toPandas()  # bring the aggregate to pandas for seaborn
fig, a = plt.subplots()
a = sns.barplot(data=seasons_pdf, x="season", y="total_matches", palette='viridis', ax=a)
a.set_ylabel('Total Matches')
a.set_xlabel('Season')
a.set_title('Number of matches in each season')

### Number of matches played by each team since season 1

In [None]:
# Matches played per team: stack the team1 and team2 columns so that each match counts once
# for each of its two sides, then count rows per team.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('team')
team = spark.sql('''Select distinct(team), count(*) as total_matches from (Select team1 as team from team UNION ALL (select team2 as team from team)) group by team ''')
team.show()

In [None]:
# Horizontal bar chart: number of matches played by each team
team_pdf = team.toPandas()  # bring the aggregate to pandas for seaborn
fig, a = plt.subplots(figsize=(5, 5))
a = sns.barplot(data=team_pdf, x="total_matches", y="team", palette='viridis', ax=a)
a.set_xlabel('Total Matches')
a.set_ylabel('Team')
a.set_title('Number of matches played by each team')

### Total seasons in which teams have played

In [None]:
# First season, last season and number of distinct seasons for every team.
# A team takes part in a match whether it is listed as team1 or team2, so both columns are
# stacked before grouping (grouping on team1 alone misses seasons where a team was only
# ever listed second).
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('team_season')
team_season = spark.sql('''Select team, min(season) as first_season, max(season) as last_season, count(distinct(season)) as total_seasons from (Select team1 as team, season from team_season UNION ALL Select team2 as team, season from team_season) group by team order by total_seasons desc''')
team_season.show()

### Total number of matches won by teams

In [None]:
# Matches won per team, fewest first.
# The cleaning step labels matches without a winner as 'Draw', so that label is excluded
# here as well as 'None'; otherwise 'Draw' is reported (and plotted) as if it were a team.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('most_win')
most_win = spark.sql('''Select distinct(winner) as team, count(*) as total_matches from most_win where winner not in ('None', 'Draw') group by winner order by total_matches ''')
most_win.show()

In [None]:
# Horizontal bar chart: number of matches won by each team
most_win_pdf = most_win.toPandas()  # bring the aggregate to pandas for seaborn
fig, a = plt.subplots(figsize=(5, 5))
a = sns.barplot(data=most_win_pdf, x="total_matches", y="team", palette='viridis', ax=a)
a.set_xlabel('Total Matches')
a.set_ylabel('Team')
a.set_title('Number of matches won by each team')

### Total matches won by teams in each season

In [None]:
# Matches won per (season, team), most wins first.
# The cleaning step labels matches without a winner as 'Draw', so that label is excluded
# here as well as 'None'; otherwise 'Draw' is reported as if it were a team.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('most_win_by_season')
most_win_by_season = spark.sql('''Select season, winner as team, count(*) as total_matches_won from most_win_by_season where winner not in ('None', 'Draw') group by season, winner order by total_matches_won desc''')
most_win_by_season.show()

### Players with maximum man of the match awards 

In [None]:
# Ten players with the most player-of-the-match awards.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('man_match')
man_match = spark.sql('''Select distinct(player_of_match), count(*) as total_matches from man_match group by player_of_match order by total_matches desc limit 10 ''')
man_match.show()

In [None]:
# Horizontal bar chart: man-of-the-match awards for the top ten players
man_match_pdf = man_match.toPandas()  # bring the aggregate to pandas for seaborn
fig, a = plt.subplots(figsize=(5, 5))
a = sns.barplot(data=man_match_pdf, x="total_matches", y="player_of_match", palette='viridis', ax=a)
a.set_ylabel('Player')
a.set_xlabel('Total Matches')
a.set_title('Number of times player won man of the match')

### Number of matches per Venue

In [None]:
# Number of matches hosted by each venue.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('venue')
venue = spark.sql('''Select distinct(venue), count(*) as total_matches from venue group by venue''')
venue.show()

In [None]:
# Horizontal bar chart: matches per venue (tall figure because there are many venues)
venue_pdf = venue.toPandas()  # bring the aggregate to pandas for seaborn
fig, a = plt.subplots(figsize=(10, 20))
a = sns.barplot(data=venue_pdf, x="total_matches", y="venue", palette='viridis', ax=a)
a.set_xlabel('Total Matches')
a.set_ylabel('Venue')
a.set_title('Number of matches at each venue')

### Percentage toss decisions 

In [None]:
# Share (in percent of all matches) of each toss decision.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('toss')
toss = spark.sql('''Select distinct(toss_decision), ((count(toss_decision)*100)/ (select count(*) from toss)) as percentage_count from toss group by toss_decision''')
toss.show()

In [None]:
# Bar chart: percentage of matches for each toss decision
toss_pdf = toss.toPandas()  # bring the aggregate to pandas for seaborn
fig, a = plt.subplots(figsize=(5, 5))
a = sns.barplot(data=toss_pdf, x="toss_decision", y="percentage_count", palette='viridis', ax=a)
a.set_xlabel('Toss Decision')
a.set_ylabel('Percentage')
a.set_title('Percentage Plot of toss_decision')

### Percentage of team winning the toss as well as the match

In [None]:
# Per season: total matches, matches where the toss winner also won the match, and that
# share as a percentage.
# The query reads the views `seasons` and `toss_and_won`; `seasons` is registered here too so
# the cell does not depend on the earlier "matches per season" cell having been run.
# `toss_won_data` is still registered as before, although this query does not read it.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('seasons')
matches.createOrReplaceTempView('toss_and_won')
matches.createOrReplaceTempView('toss_won_data')
toss_won_data = spark.sql('''Select t1.season, t1.total_matches, \
          t2.count_toss_and_won as count_toss_and_won, \
          (t2.count_toss_and_won / t1.total_matches * 100) as percent_toss_and_won from \
          (Select distinct(season),count(*) as total_matches from seasons group by season)t1 \
          left join (Select distinct(season), count(*) as count_toss_and_won from toss_and_won where toss_winner = winner group by season)t2 on t1.season = t2.season order by season''')
toss_won_data.show()

In [None]:
# Bar chart: per-season percentage of matches won by the toss winner
toss_won_pdf = toss_won_data.toPandas()  # bring the aggregate to pandas for seaborn
fig, a = plt.subplots(figsize=(10, 5))
a = sns.barplot(data=toss_won_pdf, x="season", y="percent_toss_and_won", palette='viridis', ax=a)
a.set_xlabel('Season')
a.set_ylabel('Percentage')
a.set_title('Percentage Plot of Season and Toss_and_won')

### Percentage matches won by batting first 

In [None]:
# Per season: share of matches with win_by_runs > 0, i.e. won by the side batting first.
# The query reads the `seasons` view; it is (re)registered here so the cell does not depend
# on the earlier "matches per season" cell having been run in this kernel.
matches.createOrReplaceTempView('seasons')
win_batting_first = spark.sql('''Select t1.season, t1.total_matches, \
          t2.win_batting_first as win_batting_first, \
          (t2.win_batting_first/ t1.total_matches * 100) as percent_win_batting_first from \
          (Select distinct(season),count(*) as total_matches from seasons group by season)t1 \
          left join (Select distinct(season), count(*) as win_batting_first from seasons where win_by_runs > 0  group by season)t2 on t1.season = t2.season order by season ''')
win_batting_first.show()

In [None]:
# Bar chart: per-season percentage of matches won by the side batting first
win_batting_first_pdf = win_batting_first.toPandas()  # bring the aggregate to pandas
fig, a = plt.subplots(figsize=(10, 5))
a = sns.barplot(data=win_batting_first_pdf, x="season", y="percent_win_batting_first", palette='viridis', ax=a)
a.set_xlabel('Season')
a.set_ylabel('Percentage')
a.set_title('Percentage Plot of Season and won by batting')

### Percentage matches won by fielding first

In [None]:
# Per season: share of matches with win_by_wickets > 0, i.e. won by the side fielding first.
# The query reads the `seasons` view; it is (re)registered here so the cell does not depend
# on the earlier "matches per season" cell having been run in this kernel.
matches.createOrReplaceTempView('seasons')
win_bowling_first = spark.sql('''Select t1.season, t1.total_matches, \
          t2.win_bowling_first as win_bowling_first, \
          (t2.win_bowling_first/ t1.total_matches * 100) as percent_win_bowling_first from \
          (Select distinct(season),count(*) as total_matches from seasons group by season)t1 \
          left join (Select distinct(season), count(*) as win_bowling_first from seasons where win_by_wickets > 0  group by season)t2 on t1.season = t2.season order by season ''')
win_bowling_first.show()

In [None]:
# Bar chart: per-season percentage of matches won by the side fielding first
win_bowling_first_pdf = win_bowling_first.toPandas()  # bring the aggregate to pandas
fig, a = plt.subplots(figsize=(10, 5))
a = sns.barplot(data=win_bowling_first_pdf, x="season", y="percent_win_bowling_first", palette='viridis', ax=a)
a.set_xlabel('Season')
a.set_ylabel('Percentage')
a.set_title('Percentage Plot of Season and won by wickets ')

# Final Analysis

In [None]:
# Re-read the cleaned CSVs as Spark DataFrames for the player-metric analysis
clean_dir = path + '/dataset/clean_data'
matches = spark.read.csv(clean_dir + '/matches.csv', header=True, inferSchema=True)
deliveries = spark.read.csv(clean_dir + '/deliveries.csv', header=True, inferSchema=True)

In [None]:
# Creating temporary views of the data so they can be joined in SQL.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
matches.createOrReplaceTempView('matches_db')
deliveries.createOrReplaceTempView('deliveries_db')

In [None]:
# Merging both tables: one row per delivery, carrying the columns of its match
# (inner join on matches.id = deliveries.match_id). Every metric below reads `analysis_db`.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
merged_db = spark.sql('select m.*,d.* from matches_db as m inner join deliveries_db as d on m.id=d.match_id')
merged_db.createOrReplaceTempView('analysis_db')

## Batting Metrics 

In [None]:
# nmba: no. of batsmen
# nm: no. of matches played by a batsman
# hha: hard hitting ability
# f: finisher
# fsa: fast scoring ability
# con: consistency
# rbw: running between wickets

In [None]:
# Number of Batsmen: count of distinct names in the batsman column of analysis_db
nmba = spark.sql('select count(distinct(batsman)) as No_of_Batsman from analysis_db')
nmba.show()

In [None]:
# Number of matches played by each batsman (distinct match ids in which he faced a delivery).
# Registered as `no_of_matches_table`; the ranking cells below join against it to keep only
# batsmen with more than 9 matches.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
nm = spark.sql('select batsman, count(distinct(match_id)) as No_of_Matches \
                from analysis_db group by batsman')
nm.createOrReplaceTempView('no_of_matches_table')

### Hard Hitting Ability 

In [None]:
# Hard Hitting Ability = (4*Fours + 6*Sixes)/Balls Played by Batsman
# Fours, sixes and balls are counted in one pass per batsman with conditional sums.
# Counting fours and sixes in separate subqueries and inner-joining them dropped every
# batsman who had hit fours but no six (or the reverse), and the nvl below then scored
# them 0 instead of their real value.
# The right join keeps one row for every batsman in no_of_matches_table.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
hha = spark.sql('select nmt.batsman as Batsman, round(nvl(t4.hard_hitting_ability,0), 5) as \
                Hard_Hitting_Ability from \
                (select batsman, \
                (4*sum(case when batsman_runs = 4 then 1 else 0 end) \
                + 6*sum(case when batsman_runs = 6 then 1 else 0 end))/count(*) as hard_hitting_ability \
                from analysis_db group by batsman) t4 \
                right join \
                no_of_matches_table nmt on t4.batsman = nmt.batsman')
hha.createOrReplaceTempView('hard_hitting_ability')

In [None]:
# Rank batsmen with more than 9 matches by Hard_Hitting_Ability (rank 1 = highest) and
# replace the `hard_hitting_ability` view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
hha = spark.sql('select rank() over (order by Hard_Hitting_Ability desc) as Rank, t1.* \
                  from hard_hitting_ability t1 \
                  inner join \
                  no_of_matches_table t2\
                  on t1.batsman = t2.batsman where no_of_matches>9')
hha.createOrReplaceTempView('hard_hitting_ability')
hha.show(10)

In [None]:
# Convert the rank into Points = (240-rank)/240 and Weights = 1.25 * Points.
# NOTE(review): 240 is presumably the number of ranked batsmen -- confirm against the row
# count of the ranked view.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
hha = spark.sql('select t1.*, round((240-rank)/240, 5) as Points, round((240-rank)*1.25/240, 5) as Weights \
                from hard_hitting_ability t1')
hha.createOrReplaceTempView('hard_hitting_ability')
hha.show(10)

### Finisher 

In [None]:
# Finisher = Not Out innings/Total Innings played
# Not-out innings = matches batted in minus times dismissed. Dismissals are left-joined and
# defaulted to 0 so that a batsman who was never dismissed keeps his row (Finisher = 1.0);
# the inner join used before silently dropped such batsmen from this metric.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
f = spark.sql('select t1.batsman as Batsman, \
              round((t1.matches_played - nvl(t2.number_of_times_out, 0))/t1.matches_played, 5) as Finisher from \
              (select batsman, count(distinct(match_id)) as matches_played from analysis_db group by batsman) t1 \
              left join \
              (select batsman, count(*) as number_of_times_out from analysis_db where player_dismissed = batsman group by batsman) t2 \
              on t1.batsman = t2.batsman')
f.createOrReplaceTempView('finisher')

In [None]:
# Rank batsmen with more than 9 matches by Finisher (rank 1 = highest) and replace the
# `finisher` view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
f = spark.sql('select rank() over (order by finisher desc) as Rank, t1.* \
              from finisher t1 \
              inner join \
              no_of_matches_table t2 \
              on t1.batsman = t2.batsman \
              where no_of_matches>9')
f.createOrReplaceTempView('finisher')
f.show(10)

In [None]:
# Convert the rank into Points = (240-rank)/240 and Weights = 1.25 * Points.
# NOTE(review): 240 is presumably the number of ranked batsmen -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
f = spark.sql('select t1.*, round((240-rank)/240, 5) as Points, round((240-rank)*1.25/240, 5) as Weights \
              from finisher t1')
f.createOrReplaceTempView('finisher')
f.show(10)

### Fast Scoring Ability

In [None]:
# Fast Scoring Ability = Total Runs/Balls Played by Batsman
# Both figures come from one aggregation per batsman over analysis_db.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
fsa = spark.sql('select batsman as Batsman, round(Total_Runs/balls_played, 5) as Fast_Scoring_Ability \
                  from (select batsman,sum(batsman_runs) as Total_Runs, count(*) as balls_played \
                  from analysis_db group by batsman)')
fsa.createOrReplaceTempView('fast_scoring_ability')

In [None]:
# Rank batsmen with more than 9 matches by Fast_Scoring_Ability (rank 1 = highest) and
# replace the `fast_scoring_ability` view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
fsa = spark.sql('select rank() over (order by fast_scoring_ability desc) as Rank, t1.* \
                  from fast_scoring_ability t1 \
                  inner join \
                  no_of_matches_table t2 \
                  on t1.batsman = t2.batsman where no_of_matches>9')
fsa.createOrReplaceTempView('fast_scoring_ability')
fsa.show(10)

In [None]:
# Convert the rank into Points = (240-rank)/240 and Weights = 1.25 * Points.
# NOTE(review): 240 is presumably the number of ranked batsmen -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
fsa = spark.sql('select t1.*, round((240-rank)/240, 5) as Points, round((240-rank)*1.25/240, 5) as Weights \
                from fast_scoring_ability t1')
fsa.createOrReplaceTempView('fast_scoring_ability')
fsa.show(10)

### Consistency

In [None]:
# Consistency = Total Runs/Number of Times Out
# A dismissal is counted only on deliveries where the dismissed player is the batsman
# himself (the same test the Finisher metric uses). `player_dismissed is not null` cannot be
# used: the cleaning step fills missing player_dismissed values with 0, so the column is
# never null and every ball faced was being counted as a dismissal.
# Batsmen who were never dismissed have no defined ratio and are left out by the inner join.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
con = spark.sql('select t1.batsman as Batsman, round(t1.Total_runs/t2.no_of_times_dismissed, 5) as Consistency \
                from (select batsman,sum(batsman_runs) as Total_runs \
                from analysis_db group by batsman) t1 \
                inner join \
                (select batsman, count(*) as no_of_times_dismissed \
                from analysis_db where player_dismissed = batsman \
                group by batsman) t2 on t1.batsman=t2.batsman')
con.createOrReplaceTempView('consistency')

In [None]:
# Rank batsmen with more than 9 matches by Consistency (rank 1 = highest) and replace the
# `consistency` view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
con = spark.sql('select rank() over (order by consistency desc) as Rank, t1.* \
                  from consistency t1 \
                  inner join \
                  no_of_matches_table t2 \
                  on t1.batsman = t2.batsman where no_of_matches>9')
con.createOrReplaceTempView('consistency')
con.show(10)

In [None]:
# Convert the rank into Points = (240-rank)/240; Weights equals Points for this metric
# (no 1.25 multiplier, unlike the first three batting metrics).
# NOTE(review): 240 is presumably the number of ranked batsmen -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
con = spark.sql('select t1.*, round((240-rank)/240, 5) as Points, round((240-rank)/240, 5) as Weights \
                from consistency t1')
con.createOrReplaceTempView('consistency')
con.show(10)

###  Running Between Wickets

In [None]:
# Running Between Wickets = (Total Runs – (4*Fours + 6*Sixes))/(Total Balls Played – Boundary Balls)
# Equivalently: runs scored off non-boundary balls divided by the number of non-boundary
# balls, computed in one pass per batsman with conditional sums.
# The earlier chain of inner joins between separate "fours" and "sixes" subqueries dropped
# every batsman lacking either a four or a six, and the nvl below then scored them 0.
# nullif guards the case where every ball faced was a boundary (the score falls back to 0).
# The right join keeps one row for every batsman in no_of_matches_table.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
rbw = spark.sql('select t9.batsman as Batsman, round(nvl(t8.running_between_wickets,0), 5) as Running_Between_Wickets \
                from (select batsman, \
                sum(case when batsman_runs in (4, 6) then 0 else batsman_runs end) \
                / nullif(sum(case when batsman_runs in (4, 6) then 0 else 1 end), 0) as running_between_wickets \
                from analysis_db group by batsman) t8 \
                right join \
                no_of_matches_table t9 \
                on t8.batsman = t9.batsman')
rbw.createOrReplaceTempView('running_between_wickets')

In [None]:
# Rank batsmen with more than 9 matches by Running_Between_Wickets (rank 1 = highest) and
# replace the `running_between_wickets` view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
rbw = spark.sql('select rank() over (order by running_between_wickets desc) as Rank, t1.* \
                  from running_between_wickets t1 \
                  inner join \
                  no_of_matches_table t2\
                  on t1.batsman = t2.batsman where no_of_matches>9')
rbw.createOrReplaceTempView('running_between_wickets')
rbw.show(10)

In [None]:
# Convert the rank into Points = (240-rank)/240; Weights equals Points for this metric.
# NOTE(review): 240 is presumably the number of ranked batsmen -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
rbw = spark.sql('select t1.*, round((240-rank)/240, 5) as Points, round((240-rank)/240, 5) as Weights \
                from running_between_wickets t1')
rbw.createOrReplaceTempView('running_between_wickets')
rbw.show(10)

In [None]:
# Table Name for each Metric
# Hard Hitting Ability: hard_hitting_ability
# Finisher: finisher
# Fast Scoring Ability: fast_scoring_ability
# Consistency: consistency
# Running Between Wickets: running_between_wickets

##  Total Batting Weights

In [None]:
# Total batting weight per batsman = sum of the Weights columns of the five batting metrics.
# Inner joins: a batsman must be present in all five ranked views to get a total.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
total_batting_weight = spark.sql('select hht.Batsman, round((hht.Weights+f.Weights+fsa.Weights+c.Weights+rbw.Weights), 5) as Total_Batting_Weights \
                                 from hard_hitting_ability hht \
                                 inner join finisher f \
                                 on hht.Batsman = f.Batsman \
                                 inner join fast_scoring_ability fsa \
                                 on hht.Batsman = fsa.Batsman \
                                 inner join consistency c \
                                 on hht.Batsman = c.Batsman \
                                 inner join running_between_wickets rbw \
                                 on hht.Batsman = rbw.Batsman \
                                 order by Total_Batting_Weights desc')
total_batting_weight.createOrReplaceTempView('total_batting_weight')
total_batting_weight.show(250)

In [None]:
# Dropping intermediate tables: issue one `drop table if exists` per batting-metric name,
# then list what is still registered.
table_names = [
    'no_of_matches_table',
    'hard_hitting_ability',
    'finisher',
    'fast_scoring_ability',
    'consistency',
    'running_between_wickets',
]
for table in table_names:
    cmd = 'drop table if exists ' + table
    drop = spark.sql(cmd)
check = spark.sql('show tables')
check.show()

## Bowling Metrics 

In [None]:
# nmbo: no. of bowlers
# nmb: no. of matches played by a bowler
# eco: economy
# wta: wicket taking ability
# cons: consistency
# cwta: crucial wicket taking ability
# spi: short performance index

In [None]:
# Number of Bowlers: count of distinct names in the bowler column of analysis_db
nmbo = spark.sql('Select count(distinct(bowler)) as No_of_Bowlers from analysis_db')
nmbo.show()

In [None]:
# Number of matches played by each bowler (distinct match ids in which he bowled a delivery).
# Registered as `no_of_matches_bowlers`; the ranking cells below join against it to keep only
# bowlers with more than 9 matches.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
nmb = spark.sql('select bowler as Bowler, count(distinct(match_id)) as No_of_Matches from analysis_db group by bowler')
nmb.createOrReplaceTempView('no_of_matches_bowlers')

### Economy 

In [None]:
# Economy = Runs Scored/(Number of balls bowled by bowler/6)
# Overs are kept fractional, exactly as the formula states. Rounding them to a whole number
# first distorted the rate and turned fewer than 3 balls into 0 overs (division by zero).
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
eco = spark.sql('Select bowler as Bowler, round(runs/overs, 5) as Economy \
                from (Select bowler,count(*)/6 \
                as overs,sum(total_runs) as runs \
                from analysis_db \
                group by bowler)')
eco.createOrReplaceTempView('economy')

In [None]:
# Number bowlers with more than 9 matches by Economy, lowest (best) first, and replace the
# `economy` view with the ranked result. row_number gives tied bowlers distinct ranks.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
eco = spark.sql('select row_number() over (order by e.Economy asc) as Rank, e.*,n.No_of_Matches \
                from economy e \
                inner join \
                no_of_matches_bowlers n \
                on e.Bowler = n.Bowler where n.No_of_Matches>9')
eco.createOrReplaceTempView('economy')
eco.show(10)

In [None]:
# Convert the rank into Points = (212-Rank)/212 and Weight = 1.5 * Points.
# NOTE(review): 212 is presumably the number of ranked bowlers -- confirm.
# Note the column is named `Weight` here (singular), which the total-weight query relies on.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
eco = spark.sql('select *, round((212 - Rank)/212, 5) as Points, round((212 - Rank)*1.5/212, 5) as Weight from economy')
eco.createOrReplaceTempView('economy')
eco.show(10)

### Wicket Taking Ability

In [None]:
# Wicket Taking Ability = Number of balls bowled/Wickets Taken
# Only the six listed dismissal kinds count as wickets for the bowler. Bowlers without any
# such wicket have no row in this view (inner join).
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
wta = spark.sql('(Select t1.bowler as Bowler, round(t2.balls/t1.wickets, 5) as Wicket_Taking_Ability from \
                (Select bowler,count(*) as wickets from analysis_db where player_dismissed is not null \
                and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                or  dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                or  dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                group by bowler) t1 \
                inner join \
                (select count(*) as balls,bowler from analysis_db group by bowler)t2 on \
                t1.bowler = t2.bowler)')
wta.createOrReplaceTempView('wicket_taking_ability')
wta.show(10)

In [None]:
# Number bowlers with more than 9 matches by Wicket_Taking_Ability, fewest balls per wicket
# first, and replace the `wicket_taking_ability` view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
wta = spark.sql('select row_number() over (order by w.Wicket_Taking_Ability asc) as Rank, w.*, n.No_of_Matches \
                from wicket_taking_ability w \
                inner join \
                no_of_matches_bowlers n on \
                w.Bowler = n.Bowler where n.No_of_Matches > 9')
wta.createOrReplaceTempView('wicket_taking_ability')
wta.show(10)

In [None]:
# Convert the rank into Points = (212-Rank)/212 and Weight = 1.5 * Points.
# NOTE(review): 212 is presumably the number of ranked bowlers -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
wta = spark.sql('select *,round((212-Rank)/212, 5) as Points,round(1.5*(212-Rank)/212, 5) as Weight \
                from wicket_taking_ability')
wta.createOrReplaceTempView('wicket_taking_ability')
wta.show(10)

### Consistency 

In [None]:
# Consistency = Runs Conceded/Wickets Taken
# Only the six listed dismissal kinds count as wickets for the bowler. Bowlers without any
# such wicket have no row in this view (inner join).
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
cons = spark.sql('select t1.bowler as Bowler, round(t1.runs/t2.wickets, 5) as Consistency \
                 from (select sum(total_runs) as runs,bowler from analysis_db group by bowler) t1 \
                 inner join \
                 (Select bowler,count(*) as wickets from analysis_db where player_dismissed is not null \
                 and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                 or dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                 or dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                 group by bowler)t2 on t1.bowler = t2.bowler')
cons.createOrReplaceTempView('consistency')
cons.show(10)

In [None]:
# Number bowlers with more than 9 matches by Consistency, fewest runs per wicket first, and
# replace the `consistency` view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
cons = spark.sql('select row_number() over (order by c.Consistency asc) as Rank, c.*,n.No_of_Matches \
                 from consistency c \
                 inner join \
                 no_of_matches_bowlers n on \
                 c.Bowler = n.Bowler where n.No_of_Matches > 9')
cons.createOrReplaceTempView('consistency')
cons.show(10)

In [None]:
# Convert the rank into Points = (212-Rank)/212; Weights equals Points for this metric.
# NOTE(review): 212 is presumably the number of ranked bowlers -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
cons = spark.sql('select *,round((212-Rank)/212, 5) as Points, round((212-Rank)/212, 5) as Weights from consistency')
cons.createOrReplaceTempView('consistency')
cons.show(10)

### Crucial Wicket Taking Ability

In [None]:
# Crucial Wicket Taking Ability = Number of times Four or Five Wickets Taken/Number of Innings Played
# t1: per bowler, number of matches with more than 3 bowler-credited wickets.
# t2: per bowler, number of distinct matches bowled in.
# The right join keeps bowlers without any such haul; nvl gives them 0.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
cwta = spark.sql('select t2.bowler as Bowler, round(nvl(t1.no_of_4wickets/t2.innings,0), 5) as Crucial_Wicket_Taking_Ablity \
                 from (select bowler,count(*) as no_of_4wickets from (select * from \
                 (select match_id,bowler,count(*) as wickets from analysis_db where player_dismissed \
                 is not null \
                 and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                 or  dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                 or  dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                 group by bowler,match_id ) \
                 where wickets > 3) group by bowler)t1 \
                 right join \
                 (select bowler,count(match_id) as \
                 innings from (select distinct(match_id),bowler from analysis_db) \
                 group by bowler)t2 \
                 on t1.bowler = t2.bowler order by Crucial_Wicket_Taking_Ablity desc')
cwta.createOrReplaceTempView('crucial_wicket_taking_ablity')
cwta.show(10)

In [None]:
# Rank bowlers with more than 9 matches by Crucial_Wicket_Taking_Ablity (rank 1 = highest;
# ties share a rank) and replace the view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
cwta = spark.sql('select rank() over (order by cw.Crucial_Wicket_Taking_Ablity desc) as Rank, cw.*,n.No_of_Matches \
                 from crucial_wicket_taking_ablity cw \
                 inner join no_of_matches_bowlers n on \
                 cw.Bowler = n.Bowler where n.No_of_Matches > 9')
cwta.createOrReplaceTempView('crucial_wicket_taking_ablity')
cwta.show(10)

In [None]:
# Convert the rank into Points = (212-Rank)/212 and Weights = 1.5 * Points.
# NOTE(review): 212 is presumably the number of ranked bowlers -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
cwta = spark.sql('select *,round((212-Rank)/212, 5) as Points, round(1.5*(212-Rank)/212, 5) as Weights \
                 from crucial_wicket_taking_ablity')
cwta.createOrReplaceTempView('crucial_wicket_taking_ablity')
cwta.show(10)

### Short Performance Index

In [None]:
# Short Performance Index = (Wickets Taken – 4* Number of Times Four Wickets Taken – 5* Number of Times Five Wickets Taken)/(Innings Played – Number of Times Four Wickets or Five Wickets Taken)
# h: per bowler, total bowler-credited wickets plus the number of matches with exactly 4 and
#    exactly 5 such wickets, all derived from one per-(bowler, match) wicket count.
# i: per bowler, number of distinct matches bowled in.
# Both are LEFT-joined onto no_of_matches_bowlers, so every bowler gets the formula applied.
# The previous version inner-joined a "4-wicket" and a "5-wicket" subquery, so only bowlers
# who had BOTH kinds of haul received a value and everyone else was scored 0 by the nvl.
# nullif guards a zero denominator; bowlers without any wicket fall back to 0.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
spi = spark.sql('select n.Bowler as Bowler, \
                round(nvl((h.wickets - 4*h.four_hauls - 5*h.five_hauls)/ \
                nullif(i.innings - h.four_hauls - h.five_hauls, 0), 0), 5) as Short_Performance_Index \
                from no_of_matches_bowlers n \
                left join \
                (select bowler, sum(wickets) as wickets, \
                sum(case when wickets = 4 then 1 else 0 end) as four_hauls, \
                sum(case when wickets = 5 then 1 else 0 end) as five_hauls \
                from (select match_id,bowler,count(*) as wickets from analysis_db where player_dismissed \
                is not null \
                and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                or  dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                or  dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                group by bowler, match_id) w \
                group by bowler) h \
                on h.bowler = n.Bowler \
                left join \
                (select bowler, count(distinct(match_id)) as innings from analysis_db group by bowler) i \
                on i.bowler = n.Bowler order by Short_Performance_Index desc')
spi.createOrReplaceTempView('short_performance_index')
spi.show(10)

In [None]:
# Rank bowlers with more than 9 matches by Short_Performance_Index (rank 1 = highest; ties
# share a rank) and replace the view with the ranked result.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
spi = spark.sql('select rank() over (order by sp.Short_Performance_Index desc) as Rank, sp.*, n.No_of_Matches \
                from short_performance_index sp \
                inner join \
                no_of_matches_bowlers n on \
                sp.Bowler = n.Bowler where n.No_of_Matches > 9')
spi.createOrReplaceTempView('short_performance_index')
spi.show(10)

In [None]:
# Convert the rank into Points = (212-Rank)/212; Weights equals Points for this metric.
# NOTE(review): 212 is presumably the number of ranked bowlers -- confirm.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
spi = spark.sql('select *,round((212-Rank)/212, 5) as Points, round((212-Rank)/212, 5) as Weights \
                from short_performance_index')
spi.createOrReplaceTempView('short_performance_index')
spi.show(10)

In [None]:
# Table Name for each Metric
# Economy: economy
# Wicket Taking Ability: wicket_taking_ability
# Consistency: consistency
# Crucial Wicket Taking Ablity: crucial_wicket_taking_ablity
# Short Performance Index: short_performance_index

## Total Bowling Weights

In [None]:
# Total bowling weight per bowler = sum of the weight columns of the five bowling metrics
# (`Weight` in economy and wicket_taking_ability, `Weights` in the other three).
# Inner joins: a bowler must be present in all five ranked views to get a total.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
total_bowling_weight = spark.sql('select e.bowler as Bowler, round((e.Weight+wta.Weight+c.Weights+cwta.Weights+spi.Weights), 5) as Total_Bowling_Weights \
                                 from economy e \
                                 inner join wicket_taking_ability wta \
                                 on e.Bowler = wta.Bowler \
                                 inner join consistency c \
                                 on e.Bowler = c.Bowler \
                                 inner join crucial_wicket_taking_ablity cwta \
                                 on e.Bowler = cwta.Bowler \
                                 inner join short_performance_index spi \
                                 on e.Bowler = spi.Bowler \
                                 order by Total_Bowling_Weights desc')
total_bowling_weight.createOrReplaceTempView('total_bowling_weight')
total_bowling_weight.show(10)

In [None]:
# Dropping intermediate tables: issue one `drop table if exists` per bowling-metric name,
# then list what is still registered.
table_names = [
    'no_of_matches_bowlers',
    'economy',
    'wicket_taking_ability',
    'consistency',
    'crucial_wicket_taking_ablity',
    'short_performance_index',
]
for table in table_names:
    cmd = 'drop table if exists ' + table
    drop = spark.sql(cmd)
check = spark.sql('show tables')
check.show()

## Total Weights per Player

In [None]:
# Total weight per player = batting weight + bowling weight, with a missing side counted as 0.
# The full outer join keeps players who appear in only one of the two tables, so the player
# name has to be taken from whichever side is present: using t1.batsman alone left Player
# null for every bowler without a batting weight.
# createOrReplaceTempView replaces registerTempTable, which PySpark has deprecated since 2.0.
total_weight = spark.sql('select *, (coalesce(total_batting_weight, 0) + coalesce(total_bowling_weight, 0)) as Total_Weight\
                          from(select coalesce(t1.batsman, t2.bowler) as Player, nvl(t1.Total_Batting_Weights,0) as Total_Batting_Weight, \
                          nvl(t2.Total_Bowling_Weights,0) as Total_Bowling_Weight\
                          from total_batting_weight t1 \
                          full outer join total_bowling_weight t2 \
                          on t1.batsman = t2.bowler) \
                          order by Total_Weight desc')
total_weight.createOrReplaceTempView('total_weight')
total_weight.show(10)

In [None]:
# Dropping intermediate tables: the two per-discipline totals are no longer needed once
# `total_weight` exists; then list what is still registered.
table_names = [
    'total_batting_weight',
    'total_bowling_weight',
]
for table in table_names:
    cmd = 'drop table if exists ' + table
    drop = spark.sql(cmd)
check = spark.sql('show tables')
check.show()

In [None]:
# Saving the player weight data: collect the Spark result to pandas, then write it as CSV
weights_csv = path + '/dataset/weights_data/total_weights.csv'
total_weight.toPandas().to_csv(weights_csv)

# Model Training

In [14]:
# Importing Libraries
import sklearn.preprocessing

ValueError: numpy.ufunc size changed, may indicate binary incompatibility. Expected 124 from C header, got 112 from PyObject

In [None]:
# Reading data: from here on `matches` and `deliveries` are pandas DataFrames again
clean_dir = path + '/dataset/clean_data'
matches = pd.read_csv(clean_dir + '/matches.csv')
deliveries = pd.read_csv(clean_dir + '/deliveries.csv')

In [None]:
# Integer code for every franchise name. The same codes are used for all four team-valued
# columns so that a team has one id wherever it appears.
team_codes = {'Mumbai Indians':1,'Kolkata Knight Riders':2,'Royal Challengers Bangalore':3,
              'Deccan Chargers':4,'Chennai Super Kings':5,'Rajasthan Royals':6,'Delhi Daredevils':7,
              'Gujarat Lions':8,'Kings XI Punjab':9,'Sunrisers Hyderabad':10,'Rising Pune Supergiant':11,
              'Kochi Tuskers Kerala':12,'Pune Warriors':13, 'Delhi Capitals':14}
# Per-column replacement spec for DataFrame.replace; each column gets its own copy of the
# mapping. `winner` additionally maps the 'Draw' label that the cleaning step introduced.
encode = {column: dict(team_codes) for column in ('team1', 'team2', 'toss_winner')}
encode['winner'] = dict(team_codes, Draw=15)
# Reassign rather than replace(inplace=True) so the step does not mutate the frame in place.
matches = matches.replace(encode)

In [None]:
# Checking the encoding result: the team columns of the first two rows should now hold
# integer codes instead of team names
matches.head(2)

In [None]:
# Keep only the columns carried forward as `df`, in this order
model_columns = ['team1', 'team2', 'city', 'toss_decision', 'toss_winner', 'venue', 'winner', 'season']
matches = matches[model_columns]
df = pd.DataFrame(matches)