# **1. ENVIRONMENT SET-UP**




In [1]:
# Install a headless Java 8 JDK; apt-get's standard output is discarded.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Download the Spark 3.2.1 (Hadoop 2.7 build) archive from archive.apache.org.
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz

In [3]:
# Unpack the downloaded Spark archive into the current working directory.
!tar xf spark-3.2.1-bin-hadoop2.7.tgz

In [4]:
import os

# Tell the Spark launcher where the JDK and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.2.1-bin-hadoop2.7",
    }
)

In [5]:
# Download the SQLite JDBC driver jar into the jars directory of the Spark distribution.
!wget -q https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.36.0.3/sqlite-jdbc-3.36.0.3.jar -P /content/spark-3.2.1-bin-hadoop2.7/jars

# **2. IMPORTING THE ESSENTIAL LIBRARIES**

In [6]:
!pip install -q findspark
import findspark
findspark.init()

In [7]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import lit, array_remove

import sqlite3
import pandas as pd
import psutil 
import matplotlib.pyplot as plt

In [8]:
# Build the Spark session and register the SQLite JDBC driver jar with it.
from pyspark.sql import SparkSession

# "spark.jars" is the Spark property that lists extra jars to put on the driver
# and executor classpaths. The previous call passed the jars *directory* as the
# configuration key, which is not a Spark property and therefore had no effect.
SQLITE_JDBC_JAR = "/content/spark-3.2.1-bin-hadoop2.7/jars/sqlite-jdbc-3.36.0.3.jar"

spark = (
    SparkSession.builder
    .appName("IPL_MATCHES")
    .config("spark.jars", SQLITE_JDBC_JAR)
    .getOrCreate()
)

In [9]:
# Load the three IPL CSV files into pandas DataFrames (matches, venues, deliveries).
ipl_matches_pd, ipl_venue_pd, ipl_ball_by_ball_pd = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/content/ipl_matches.csv",
        "/content/ipl_venue.csv",
        "/content/ipl_ball_by_ball.csv",
    )
)

# **3. DATA ANALYSIS**

In [10]:
# Count the missing values in each column of the matches table.
# The NA values are deliberately kept because they carry meaning: for example,
# some matches are tied and therefore have no result.
ipl_matches_pd.isna().sum()

match_id             0
date                 0
player_of_match      4
venue_id             0
neutral_venue        0
team1                0
team2                0
toss_winner          0
toss_decision        0
winner               4
result               4
result_margin       17
eliminator           4
method             797
umpire1              0
umpire2              0
dtype: int64

In [11]:
# Count the missing values in each column of the venue table.
ipl_venue_pd.isna().sum()

venue_id    0
venue       0
city        0
dtype: int64

In [12]:
# Count the missing values in each column of the ball-by-ball table.
# As with the matches table, the NA values are kept because they carry
# meaning in this dataset.
ipl_ball_by_ball_pd.isna().sum()

match_id                 0
inning                   0
overs                    0
ball                     0
batsman                  0
non_striker              0
bowler                   0
batsman_runs             0
extra_runs               0
total_runs               0
non_boundary             0
is_wicket                0
dismissal_kind      183973
player_dismissed    183973
fielder             186684
extras_type         183235
batting_team             0
bowling_team           191
dtype: int64

In [13]:
# Create (or open) the SQLite database file information.db and keep a
# connection to it. The DataFrames loaded with pandas are written into that
# database as tables, read back through Spark's JDBC data source, and
# registered as temporary views for the Spark SQL queries in the cells below.

SQLITE_DB_FILE = "information.db"
SQLITE_JDBC_URL = "jdbc:sqlite:" + SQLITE_DB_FILE


def _read_sqlite_table(table_name):
  """Return a Spark DataFrame backed by `table_name` in the SQLite database."""
  # SQLite has no user accounts, so the previously hard-coded user/password
  # options served no purpose and are not passed.
  return (
      spark.read.format("jdbc")
      .option("url", SQLITE_JDBC_URL)
      .option("dbtable", table_name)
      .option("driver", "org.sqlite.JDBC")
      .load()
  )


try:
  # Creating and connecting to the database.
  connection = sqlite3.connect(SQLITE_DB_FILE)

  # Loading the datasets into the SQL database (existing tables are replaced).
  ipl_matches_pd.to_sql("ipl_matches", connection, if_exists='replace', index=False)
  ipl_venue_pd.to_sql("ipl_venue", connection, if_exists='replace', index=False)
  ipl_ball_by_ball_pd.to_sql("ipl_ball_by_ball", connection, if_exists='replace', index=False)

  # Connecting PySpark with the SQLite database.
  ipl_matches = _read_sqlite_table("ipl_matches")
  ipl_venue = _read_sqlite_table("ipl_venue")
  ipl_ball_by_ball = _read_sqlite_table("ipl_ball_by_ball")

  # Creating a temporary view for each table.
  ipl_matches.createOrReplaceTempView("ipl_matches_view")
  ipl_venue.createOrReplaceTempView("ipl_venue_view")
  ipl_ball_by_ball.createOrReplaceTempView("ipl_ball_by_ball_view")

except Exception as error:
  # Report the underlying cause and re-raise: every later cell depends on these
  # tables and views, so carrying on would only fail later with a less helpful
  # error.
  print("Error: could not build the SQLite database or load it into Spark. "
        "Make sure that the name of the database file is correct and that "
        f"the SQLite JDBC driver is available. Cause: {error!r}")
  raise

In [14]:
def player_of_the_match_awards():
  """Report the ten players with the most player-of-the-match awards.

  Runs one aggregate query against the ``ipl_matches_view`` temporary view
  and passes the resulting DataFrame to ``show_result``.
  """
  # Adjacent string literals are concatenated; every fragment starts with
  # spaces so the SQL keywords stay separated from the preceding token.
  top_award_winners_query = (
      "  SELECT player_of_match, COUNT(player_of_match) AS num_man_of_the_matches"
      "  FROM ipl_matches_view"
      "  GROUP BY player_of_match"
      "  ORDER BY num_man_of_the_matches DESC"
      "  LIMIT 10"
  )

  show_result(spark.sql(top_award_winners_query))

In [15]:
def catches_per_player():
  """Report the ten fielders credited with the most catches.

  Selects the deliveries whose ``dismissal_kind`` is ``'caught'``, counts them
  per fielder, registers the counts as the ``num_of_catches`` temporary view
  and shows the ten largest counts.
  """
  caught_deliveries = spark.sql(
      "    SELECT match_id, fielder"
      "    FROM ipl_ball_by_ball_view"
      "    WHERE dismissal_kind == 'caught'"
  )

  # The dict-style aggregate produces a generated column name, so toDF is used
  # to give both columns explicit names.
  per_fielder = caught_deliveries.groupBy('fielder').agg({'fielder': 'count'})
  catch_counts = per_fielder.toDF('fielder', 'num_of_catches')
  catch_counts.createOrReplaceTempView("num_of_catches")

  top_catchers = spark.sql(
      "    SELECT * FROM num_of_catches"
      "    ORDER BY num_of_catches DESC"
      "    LIMIT 10"
  )

  show_result(top_catchers)

In [16]:
def venues_and_matches():
  """Report the ten venues and the ten cities that hosted the most matches.

  Shows three results in order: the inner join of matches with venues, the
  match count per venue, and the match count per city. The
  ``stadium_as_venue`` and ``cities_as_venue`` temporary views are registered
  along the way.
  """
  match_venue_ids = spark.sql(
      "    SELECT venue_id FROM ipl_matches_view"
  )
  venue_lookup = spark.sql("SELECT * FROM ipl_venue_view")

  # One row per match, enriched with the venue name and city; the duplicate
  # venue_id column contributed by the venue side is dropped.
  same_venue = match_venue_ids.venue_id == venue_lookup.venue_id
  matches_with_venue = match_venue_ids.join(venue_lookup, same_venue, "inner")
  matches_with_venue = matches_with_venue.drop(venue_lookup.venue_id)

  print("Inner Joining ipl_matches and ipl_venue")
  show_result(matches_with_venue)

  matches_with_venue.createOrReplaceTempView("stadium_as_venue")

  matches_per_stadium = spark.sql(
      "    SELECT venue, COUNT(venue) AS num_of_matches"
      "    FROM stadium_as_venue"
      "    GROUP BY venue"
      "    ORDER BY num_of_matches desc"
      "    LIMIT 10"
  )

  print("Stadium as a venue.")
  show_result(matches_per_stadium)

  matches_per_city = spark.sql(
      "    SELECT city, COUNT(city) AS num_of_matches"
      "    FROM stadium_as_venue"
      "    GROUP BY city"
      "    ORDER BY num_of_matches desc"
      "    LIMIT 10"
  )

  # The per-city counts are exposed as a view and then queried once more.
  matches_per_city.createOrReplaceTempView("cities_as_venue")
  top_cities = spark.sql(
      "    SELECT city, num_of_matches FROM cities_as_venue"
      "    ORDER BY num_of_matches desc"
      "    LIMIT 10"
  )

  print("City as a venue.")
  show_result(top_cities)

In [17]:
def wickets_through_dl_method():
  """Report the ten bowlers with the most wickets in D/L-affected matches.

  A match counts as affected when its ``method`` column equals ``'D/L'``
  (the Duckworth-Lewis method). The per-bowler counts are registered as the
  ``max_wickets`` temporary view before the top ten are shown.
  """
  dl_match_ids = spark.sql(
      "    select match_id FROM ipl_matches_view"
      "    WHERE method = 'D/L'"
  )

  # NOTE(review): every delivery with is_wicket == 1 is attributed to the
  # bowler; confirm whether dismissals such as run-outs should be excluded.
  wicket_deliveries = spark.sql(
      "    SELECT match_id, bowler, is_wicket"
      "    FROM ipl_ball_by_ball_view"
      "    WHERE is_wicket == 1"
  )

  # Inner join keeps only the wicket deliveries that belong to D/L matches.
  same_match = dl_match_ids.match_id == wicket_deliveries.match_id
  dl_wickets = dl_match_ids.join(wicket_deliveries, same_match, "inner")
  dl_wickets = dl_wickets.drop(wicket_deliveries.match_id)

  wickets_per_bowler = dl_wickets.groupBy('bowler').agg({'bowler': 'count'})
  wickets_per_bowler = wickets_per_bowler.toDF('bowler', 'num_of_wickets')
  wickets_per_bowler.createOrReplaceTempView("max_wickets")

  top_wicket_takers = spark.sql(
      "  select * from max_wickets"
      "  ORDER BY num_of_wickets DESC LIMIT 10"
  )

  show_result(top_wicket_takers)

In [18]:
def strike_rate_in_non_powerplay_overs():
  """Report the ten batsmen with the highest strike rate outside the powerplay.

  Strike rate is computed as runs scored divided by deliveries counted, times
  100, over the rows whose ``overs`` value lies between 7 and 20 inclusive.
  The per-batsman totals are registered as the ``ipl_strike_rate`` view.
  """
  # NOTE(review): confirm that the overs column is 1-based; if it starts at 0
  # this range would not line up with overs 7-20. Also confirm whether every
  # row should count as a ball faced.
  totals_query = (
      "  SELECT batsman, COUNT(batsman) AS total_balls, SUM(batsman_runs) AS batsman_runs"
      "  FROM ipl_ball_by_ball_view"
      "  WHERE overs BETWEEN 7 AND 20"
      "  GROUP BY batsman"
  )
  batsman_totals = spark.sql(totals_query)
  batsman_totals.createOrReplaceTempView("ipl_strike_rate")

  ranking_query = (
      "  SELECT batsman, CAST(((batsman_runs / total_balls) * 100) AS decimal(16, 2)) "
      "  AS strike_rate FROM ipl_strike_rate"
      "  ORDER BY strike_rate DESC"
      "  LIMIT 10"
  )
  show_result(spark.sql(ranking_query))

In [19]:
def highest_batting_average():
  """Report the ten players with the highest batting average.

  The average is total runs scored divided by the number of times the player
  was dismissed, counting dismissals both as striker and as non-striker.
  Registers the ``num_of_outs`` and ``players_total_score`` temporary views.
  """
  dismissed_as_striker = spark.sql(
      "    SELECT batsman, count(batsman) as num_of_times_out"
      "    FROM ipl_ball_by_ball_view"
      "    WHERE batsman == player_dismissed"
      "    GROUP BY batsman"
  )

  dismissed_as_non_striker = spark.sql(
      "    SELECT non_striker, count(non_striker) AS num_of_run_outs"
      "    FROM ipl_ball_by_ball_view"
      "    WHERE non_striker == player_dismissed"
      "    GROUP by non_striker"
  )

  # The stacked frame is queried below through the first frame's column
  # names (batsman, num_of_times_out).
  all_dismissals = dismissed_as_striker.union(dismissed_as_non_striker)
  all_dismissals.createOrReplaceTempView("num_of_outs")

  outs_per_player = spark.sql(
      "    SELECT batsman, SUM(num_of_times_out) AS num_of_times_out"
      "    FROM num_of_outs"
      "    GROUP BY batsman"
  )

  runs_per_player = spark.sql(
      "    SELECT batsman, sum(batsman_runs) AS runs"
      "    FROM ipl_ball_by_ball_view"
      "    GROUP BY batsman"
  )

  # Inner join: players with no recorded dismissal have no row on the
  # dismissals side and therefore drop out of the ranking.
  same_player = runs_per_player.batsman == outs_per_player.batsman
  runs_and_outs = runs_per_player.join(outs_per_player, same_player, "inner")
  runs_and_outs = runs_and_outs.drop(outs_per_player.batsman)

  runs_and_outs.createOrReplaceTempView("players_total_score")
  best_averages = spark.sql(
      "    SELECT batsman, CAST((runs / num_of_times_out) AS decimal(16, 2)) AS batting_average"
      "    FROM players_total_score"
      "    ORDER BY batting_average DESC"
      "    LIMIT 10"
  )

  show_result(best_averages)

In [20]:
def umpires_in_matches(get_ipl_matches):
  """Report the ten umpires with the most appearances in IPL matches.

  ``get_ipl_matches`` is the matches DataFrame; its ``umpire1`` and
  ``umpire2`` columns are stacked into a single column, registered as the
  ``all_umpires`` temporary view, and counted per umpire.
  """
  # The stacked frame is queried below under the name of the first
  # selection (umpire1).
  every_appearance = get_ipl_matches.select('umpire1').union(
      get_ipl_matches.select('umpire2')
  )
  every_appearance.createOrReplaceTempView("all_umpires")

  busiest_umpires = spark.sql(
      "  SELECT umpire1 AS umpires, COUNT(umpire1) AS num_of_matches"
      "  FROM all_umpires"
      "  GROUP BY umpire1"
      "  ORDER BY num_of_matches DESC"
      "  LIMIT 10"
  )

  show_result(busiest_umpires)

In [21]:
def show_result(getInput):
  """Display ``getInput`` by invoking its ``show()`` method.

  Every report function in this notebook sends its final result through
  this helper.
  """
  display_rows = getInput.show
  display_rows()

In [22]:
# Top 10 fielders by number of catches.
catches_per_player()

+--------------+--------------+
|       fielder|num_of_catches|
+--------------+--------------+
|    KD Karthik|           118|
|      MS Dhoni|           113|
|AB de Villiers|           103|
|      SK Raina|            99|
|     RG Sharma|            88|
|    RV Uthappa|            87|
|    KA Pollard|            84|
|       V Kohli|            76|
|      S Dhawan|            73|
|     MK Pandey|            70|
+--------------+--------------+



In [23]:
# Top 10 batsmen by strike rate in the non-powerplay overs.
strike_rate_in_non_powerplay_overs()

+------------+-----------+
|     batsman|strike_rate|
+------------+-----------+
|  B Stanlake|     250.00|
|Kamran Akmal|     213.89|
|  ER Dwivedi|     211.11|
|    Umar Gul|     205.26|
|    RS Sodhi|     200.00|
| AC Blizzard|     200.00|
|   DJM Short|     187.10|
|   LJ Wright|     183.02|
|    W Jaffer|     180.77|
|Vishnu Vinod|     180.00|
+------------+-----------+



In [24]:
# Top 10 players by batting average.
highest_batting_average()

+-------------+---------------+
|      batsman|batting_average|
+-------------+---------------+
|   MN van Wyk|          55.67|
|   RD Gaikwad|          51.00|
|     AC Voges|          45.25|
|     KL Rahul|          44.86|
|      HM Amla|          44.38|
|Iqbal Abdulla|          44.00|
|    DA Warner|          42.72|
|  JM Bairstow|          41.58|
|     CH Gayle|          41.14|
|     MS Dhoni|          40.99|
+-------------+---------------+



In [25]:
# Number of matches per venue and per city (top 10 of each).
venues_and_matches()

Inner Joining ipl_matches and ipl_venue
+--------+--------------------+-------------+
|venue_id|               venue|         city|
+--------+--------------------+-------------+
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       1|Dr. Y.S. Rajasekh...|Visakhapatnam|
|       2|Sharjah Cricket S...|      Sharjah|
|       2|Sharjah Cricket S...|      Sharjah|
|       2|Sharjah Cricket S...|      Sharjah|
|       2|Sharjah Cricket S...|      Sharjah|
|       2|Sharjah Cricket S...|      Sha

In [26]:
# Top 10 bowlers by number of wickets in matches whose method is 'D/L'.
wickets_through_dl_method()

+------------+--------------+
|      bowler|num_of_wickets|
+------------+--------------+
|    R Ashwin|             8|
|    AB Dinda|             8|
|MC Henriques|             7|
|     B Kumar|             6|
|  JD Unadkat|             5|
|    L Balaji|             5|
|    TA Boult|             5|
|  DL Vettori|             5|
|   YS Chahal|             4|
|     M Ntini|             4|
+------------+--------------+



In [27]:
# Top 10 players by number of player-of-the-match awards.
player_of_the_match_awards()

+---------------+----------------------+
|player_of_match|num_man_of_the_matches|
+---------------+----------------------+
| AB de Villiers|                    23|
|       CH Gayle|                    22|
|      RG Sharma|                    18|
|      DA Warner|                    17|
|       MS Dhoni|                    17|
|      YK Pathan|                    16|
|      SR Watson|                    16|
|       SK Raina|                    14|
|      G Gambhir|                    13|
|        V Kohli|                    13|
+---------------+----------------------+



In [28]:
# Top 10 umpires by number of matches in which they appeared.
umpires_in_matches(ipl_matches)

+---------------+--------------+
|        umpires|num_of_matches|
+---------------+--------------+
|         S Ravi|           121|
|HDPK Dharmasena|            94|
|   AK Chaudhary|            87|
|  C Shamshuddin|            82|
|      M Erasmus|            65|
|      CK Nandan|            57|
|    Nitin Menon|            57|
|     SJA Taufel|            55|
|      Asad Rauf|            51|
|    VA Kulkarni|            50|
+---------------+--------------+



In [29]:
# An explicit commit is not issued here; the call is left commented out.
# connection.commit()

# Close the Python sqlite3 connection that was opened when the database was created.
# NOTE(review): the Spark DataFrames were loaded through the JDBC URL rather than
# through this connection object, so closing it presumably does not by itself
# disconnect Spark from the database file — confirm.
connection.close()