<a href="https://colab.research.google.com/github/puneetpushkar/IPL-Data-Analysis-with-PySpark/blob/main/IPL_Data_Analysis_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Section 01: Environment Setup

In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [None]:
import findspark
findspark.init()
findspark.add_packages('mysql:mysql-connector-java:8.0.11')

In [None]:
import pyspark.sql
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
spark

In [None]:
ball_by_ball_df = spark.read.format("csv") \
    .option("header", True) \
    .load("/content/ipl_ball_by_ball.csv")

ipl_venue_df= spark.read.format("csv") \
    .option("header", True) \
    .load("/content/ipl_venue.csv")

ipl_matches_df = spark.read.format("csv") \
    .option("header", True) \
    .load("ipl_matches.csv")

In [None]:
ball_by_ball_df.printSchema()

In [None]:
ipl_venue_df.printSchema()

In [None]:
ipl_matches_df.printSchema()

In [None]:
# Creating table from dataframe

ipl_venue_df.createOrReplaceTempView("ipl_venue")
spark.sql("select * from ipl_venue")
ipl_matches_df.createOrReplaceTempView("ipl_matches")
spark.sql("select * from ipl_matches")
ball_by_ball_df.createOrReplaceTempView("ipl_ball_by_ball")
spark.sql("select * from ipl_ball_by_ball")

### Section 02: Analysis

##### Question A:
Find the top 3 venues which hosted the most number of eliminator
matches?

In [None]:
spark.sql(
"""
select v.venue, count(v.venue_id) venue_count from ipl_matches m 
join ipl_venue v on m.venue_id=v.venue_id
where m.eliminator = 'Y'
group by v.venue
order by venue_count desc
limit 3;
""").show()

##### Question B: 
Return most number of catches taken by a player in IPL history?

In [None]:
spark.sql(
"""
select fielder, count(dismissal_kind) NoOfCatches from ipl_ball_by_ball
where dismissal_kind = "caught"
group by fielder
order by NoOfCatches desc
limit 1;
""").show()

##### Question C:
Write a query to return a report for highest wicket taker in matches
which were affected by Duckworth-Lewis’s method (D/L method).

In [None]:
spark.sql(
"""
select m.match_id, bbb.bowler, count(bbb.bowler) NoOfWickets from ipl_ball_by_ball bbb
join ipl_matches m on m.match_id = bbb.match_id
where m.method = "D/L" and is_wicket = 1
group by m.match_id, bbb.bowler
order by NoOfWickets desc;
""").show()

##### Question D: 
Write a query to return a report for highest strike rate by a batsman in non
powerplay overs(7-20 overs)

In [None]:
spark.sql(
"""
select batsman, (sum(batsman_runs)/count(ball)*100) as strikeRate from ipl_ball_by_ball
where (extras_type not in ("wides","noballs")) 
and (overs >= 7 and overs <= 20)
group by batsman
order by strikeRate desc;
""").show()

##### Question E.
Write a query to return a report for highest extra runs in a venue (stadium, city).

In [None]:
spark.sql(
"""
select v.venue, v.city, sum(bbb.extra_runs) extra_runs from ipl_ball_by_ball bbb
inner join ipl_matches as m on bbb.match_id=m.match_id
inner join ipl_venue as v on m.venue_id =v.venue_id
group by venue, v.city
order by extra_runs desc;
""").show()

##### Question F: 
Write a query to return a report for the cricketers with the most number of players of
the match award in neutral venues

In [None]:
spark.sql(
"""
select m.player_of_match, count(m.player_of_match) Count_Of_Player_Of_Match from ipl_matches m
join ipl_venue v on m.venue_id = v.venue_id
where neutral_venue = 1
group by m.player_of_match
order by count(m.player_of_match) desc;
""").show()

##### Question G: 
Write a query to get a list of top 10 players with the highest batting average

In [None]:
spark.sql(
"""
select Batsman_, sum(batsman_runs)/count(player_dismissed) as Average 
from
(
 (select batsman as Batsman_,batsman_runs,player_dismissed from IPL_BALL_BY_BALL where player_dismissed != 'NA')
)
group by Batsman_ 
order by Average desc;
""").show()

##### Question H:
Write a query to find out who has officiated (as an umpire) the most
number of matches in IPL

In [None]:
spark.sql(
"""
select umpire, count(*) as No_of_Matches
from ((select umpire1 as umpire from ipl_matches) union all
      (select umpire2 from ipl_matches)
     ) ipl_matches
group by umpire
order by count(*) desc ;
""").show()
#Combing two similar columns by union and counting numbers on times they have officiated

##### Question I:  
Find venue details of the match where V Kohli scored his highest individual runs in
IPL.

In [None]:
spark.sql(
"""
select m.match_id, v.venue, v.city, sum(b.batsman_runs) TotalRuns from ipl_ball_by_ball as b
inner join ipl_matches as m on b.match_id=m.match_id
inner join ipl_venue as v on m.venue_id =v.venue_id
where b.batsman= 'V Kohli'
group by m.match_id, v.venue, v.city
order by sum(b.batsman_runs) desc limit 1; 
""").show()

##### Question J: 
Please analyze how winning/losing tosses can impact a match and it's result?
(Bonus for Visualization here)

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# count data where match_winner = toss_winner
team_wins_df = ipl_matches_df.filter(F.col('winner')==F.col('toss_winner')).groupBy('winner').agg(F.count('winner').alias('team_wins'))

# count no of toss winner
toss_wins_df = ipl_matches_df.groupBy('toss_winner').count().orderBy('toss_winner')
#print(team_wins_df.show(2))
#print(toss_wins_df.show(2))

toss_wins_df = toss_wins_df.withColumnRenamed("toss_winner", "winner").withColumnRenamed("count", "toss_wins")

# join the two dataframes (team_wins_df & toss_wins_df)
team_toss_wins = team_wins_df.join(toss_wins_df.select('winner', 'toss_wins'), ['winner'])
team_toss_wins = team_toss_wins.withColumnRenamed("winner", "teams")


In [None]:
# converting spark dataframes to pandas dataframe

team_toss_wins = team_toss_wins.toPandas()

In [None]:
# Graph with respect to match wins vs toss wins

team_toss_wins = team_toss_wins.sort_values(by=['team_wins', 'toss_wins'], ascending = False)
team_toss_wins.plot(x="teams", y=["team_wins", "toss_wins"], kind="bar", title = "Team & Toss Wins")

In [None]:
win_count = 0
for value in ipl_matches_df.collect():
    if(value['toss_winner']==value['winner']):
        win_count += 1
print(f'The number of times the team winning toss have won: {win_count}')
prob = win_count/(ipl_matches_df.count())
print('The probability of winning if won the toss: {:.2f}' .format(prob))

From the analysis we can say that :<br>
1. The team with highest no. of toss wins has mostly won the match most of the times.<br>
2. A team will have ***51%*** chance of winnning the match if it wins the toss

### Section 03: Expose Data

In [None]:
import sqlite3
import pandas as pd

# creating sqlite3 database
con = sqlite3.connect('iplmatches.db')

In [None]:
# load data

df = pd.read_csv('/content/ipl_ball_by_ball.csv')

# strip whitespace from headers
df.columns = df.columns.str.strip()

# drop data into database
df.to_sql("ipl_ball_by_ball", con)

In [None]:
# load data
df = pd.read_csv('/content/ipl_venue.csv')

# strip whitespace from headers
df.columns = df.columns.str.strip()

# drop data into database
df.to_sql("ipl_venue", con)

In [None]:
# load data
df = pd.read_csv('/content/ipl_matches.csv')

# strip whitespace from headers
df.columns = df.columns.str.strip()

# drop data into database
df.to_sql("ipl_matches", con)

In [None]:
class Database:
    # constructor 
    def __init__(self):
        self.db_name = "iplmatches.db"
        
    def get_conn(self):
        try:
            con = sqlite3.connect(self.db_name)
            print("Database connected successfully")
            return con
        except:
            print("Unable to connect data base")
            
    def get_status(self,con):
        try:
            con.cursor()
            print("Database is connected")
        except Exception as ex:
            print("Database is not connected")
            
    def close_conn(self,con):
        try:
            con.close()
            print("Database connection closed successfully")
        except:
            print("Unable to close Database connection")
            
    # Question(A)
    def get_query1_result(self,con):
        cur = con.execute("""select v.venue, count(v.venue_id) venue_count from ipl_matches m 
                         join ipl_venue v on m.venue_id=v.venue_id
                        where m.eliminator = 'Y'
                        group by v.venue
                        order by venue_count desc
                            limit 3;""")
        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data
      
    # Question (B)
    def get_query2_result(self, con):
        cur = con.execute("""select fielder, count(dismissal_kind) NoOfCatches from ipl_ball_by_ball
                           where dismissal_kind = "caught"
                           group by fielder
                           order by NoOfCatches desc
                           limit 1;""")
        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data
    
    # Question (C)
    def get_query3_result(self, con):
        cur=con.execute("""select m.match_id, bbb.bowler, count(bbb.bowler) NoOfWickets from ipl_ball_by_ball bbb
                           join ipl_matches m on m.match_id = bbb.match_id
                           where m.method = "D/L" and is_wicket = 1
                           group by m.match_id, bbb.bowler
                           order by NoOfWickets desc;""")
        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data
    
    # Question (D)
    def get_query4_result(self, con):
        cur=con.execute("""select batsman, (sum(batsman_runs)/count(ball)*100) as strikeRate from ipl_ball_by_ball
                           where (extras_type not in ("wides","noballs")) 
                           and (overs >= 7 and overs <= 20)
                           group by batsman
                           order by strikeRate desc;""")

        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data
    
    # Question (E)
    def get_query5_result(self, con):
        cur=con.execute("""select v.venue, v.city, sum(bbb.extra_runs) extra_runs from ipl_ball_by_ball bbb
                           inner join ipl_matches as m on bbb.match_id=m.match_id
                           inner join ipl_venue as v on m.venue_id =v.venue_id
                           group by venue, v.city
                           order by extra_runs desc;""")
        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data
    
    # Question (F)
    def get_query6_result(self, con):
        cur=con.execute("""select m.player_of_match, count(m.player_of_match) from ipl_matches m
                           join ipl_venue v on m.venue_id = v.venue_id
                           where neutral_venue = 1
                           group by m.player_of_match
                           order by count(m.player_of_match) desc;""")
        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data
    
    # Question (H)
    def get_query7_result(self, con):
        cur = con.execute("""select umpire, count(*) as umpire_count from
                               (select umpire1 as umpire from ipl_matches union all
                                 select umpire2 from ipl_matches) ipl_matches
                                    group by ipl_matches.umpire
                                    order by count(*) desc""")
        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data
    
    # Question (I)
    def get_query8_result(self, con):
        cur= con.execute("""select m.match_id, v.venue, v.city, sum(b.batsman_runs) TotalRuns from ipl_ball_by_ball as b
                            inner join ipl_matches as m on b.match_id=m.match_id
                            inner join ipl_venue as v on m.venue_id =v.venue_id
                            where b.batsman= 'V Kohli'
                            group by m.match_id, v.venue, v.city
                            order by sum(b.batsman_runs) desc limit 1;""")
        desc = cur.description
        column_names = [col[0] for col in desc]
        data = [dict(zip(column_names, row))  
        for row in cur.fetchall()]
        return data


In [None]:
# creating class database object
db = Database()
 
# calling the instance method using the object db
con = db.get_conn()

In [None]:
qry1_result = db.get_query1_result(con);
print(qry1_result)

In [None]:
qry2_result = db.get_query2_result(con);
print(qry2_result)

In [None]:
qry3_result = db.get_query3_result(con);
print(qry3_result)

In [None]:
qry4_result = db.get_query4_result(con);
print(qry4_result)

In [None]:
qry5_result = db.get_query5_result(con);
print(qry5_result)

In [None]:
qry6_result = db.get_query6_result(con);
print(qry6_result)

In [None]:
qry7_result = db.get_query7_result(con);
print(qry7_result)

In [None]:
qry8_result = db.get_query8_result(con);
print(qry8_result)

In [None]:
db.close_conn(con)