## Initialize Variables

In [0]:
# --- Source archive (Cricsheet download) ---
DatasetURL = "https://cricsheet.org/downloads/"
file = "ipl_json.zip"
BaseURL = f"{DatasetURL}{file}"
contentName = "ipl"
fileFormat = "json"
driverPath = "/databricks/driver/"

# --- Storage layout: /user/<contentName>/{raw, silver, gold/{fact, dim}} ---
basePath = f"/user/{contentName}/"
rawPath = f"{basePath}raw"          # note: no trailing slash
silverPath = f"{basePath}silver"
goldPath = f"{basePath}gold"
factPath = f"{goldPath}/fact"
dimPath = f"{goldPath}/dim"

# --- Schema names ---
SilverSchemaName = "staging"
factSchema = "fact"
dimSchema = "dim"

# --- Table names ---
SilverTableName = "t_cricsheet"
deliveriesTableName = "t_deliveries"
matchTableName = "t_match"
teamPlayerTableName = "t_team_players"

# --- Silver (staging) table: storage path and metastore name ---
SilverSchemaPath = f"{silverPath}/{SilverSchemaName}"
SilverTablePath = f"{SilverSchemaPath}/{SilverTableName}"
SilverTableFullName = f"{SilverSchemaName}.{SilverTableName}"

# --- Gold tables: storage path and metastore name ---
deliveriesTablePath = f"{factPath}/{deliveriesTableName}"
deliveriesTableFullName = f"{factSchema}.{deliveriesTableName}"

matchTablePath = f"{dimPath}/{matchTableName}"
matchTableFullName = f"{dimSchema}.{matchTableName}"

teamPlayerTablePath = f"{dimPath}/{teamPlayerTableName}"
teamPlayerTableFullName = f"{dimSchema}.{teamPlayerTableName}"

## Import Required Functions

In [0]:
from pyspark.sql.functions import col, input_file_name, regexp_extract, json_tuple, from_json, to_json, get_json_object, explode, posexplode, array, struct, lit, year, concat_ws, sum, count, min, max, when, coalesce, rank, desc, asc, round, expr
from pyspark.sql.window import Window
from delta.tables import DeltaTable

## Create Schema

In [0]:
# Create the staging, fact and dim schemas, each pinned to its own storage location.
for schema_name, schema_location in [
    (SilverSchemaName, SilverSchemaPath),
    (factSchema, factPath),
    (dimSchema, dimPath),
]:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name} location '{schema_location}'")

## Re-register tables whose Delta folders already exist (only relevant on Databricks Community Edition, where the metastore is deleted along with the cluster)

In [0]:
def delta_table_exists(path):
  """Return True when `path` can be opened as a Delta table, otherwise False.

  Any exception raised by DeltaTable.forPath is treated as "no Delta table here".
  """
  from delta.tables import DeltaTable
  is_delta = True
  try:
    DeltaTable.forPath(spark, path)
  except Exception:
    is_delta = False
  return is_delta
# Re-register every table whose Delta folder survived a cluster restart.
# t_team_players is written with saveAsTable into the `dim` schema, like the other gold
# tables that are written by name. It is assumed to live at teamPlayerTablePath, the same
# <schema location>/<table name> convention the other entries rely on; verify if the schema
# location changes. Without it, re-creating the table over an existing folder can fail.
for table_full_name, table_path in [
  (SilverTableFullName, SilverTablePath),
  (deliveriesTableFullName, deliveriesTablePath),
  (matchTableFullName, matchTablePath),
  (teamPlayerTableFullName, teamPlayerTablePath),
]:
  if delta_table_exists(table_path):
    spark.sql(f"CREATE TABLE IF NOT EXISTS {table_full_name} USING DELTA LOCATION '{table_path}/'")

## Download Files to driver

In [0]:
from urllib.request import urlretrieve

# Save the archive to the exact path the unzip cell opens (f"{driverPath}{file}").
# Relying on the process's current working directory being driverPath is fragile.
urlretrieve(BaseURL, f"{driverPath}{file}")

## Unzip Files

In [0]:
import zipfile

# Extract the downloaded archive into <driverPath><contentName>.
archive_path = f"{driverPath}{file}"
extract_dir = f"{driverPath}{contentName}"
with zipfile.ZipFile(archive_path, mode="r") as archive:
    archive.extractall(extract_dir)

## Check whether the "raw" folder exists: if it does, copy only the new files; otherwise copy everything

In [0]:
def file_exists(path):
  """Return True if dbutils can list `path`, False if it does not exist.

  Only an error mentioning java.io.FileNotFoundException means "missing";
  any other failure is re-raised unchanged.
  """
  try:
    dbutils.fs.ls(path)
  except Exception as err:
    if 'java.io.FileNotFoundException' not in str(err):
      raise
    return False
  return True

if file_exists(f"dbfs:{rawPath}"):
  # Incremental load: move only the extracted JSON files that are not already in raw.
  downloadedList = [f.name for f in dbutils.fs.ls(f"file:{driverPath}{contentName}") if f.name.endswith("json")]
  rawList = [f.name for f in dbutils.fs.ls(f"dbfs:{rawPath}")]
  newFiles = sorted(set(downloadedList) - set(rawList))
  # Dedicated loop variable: reusing `file` would overwrite the archive name from the config cell.
  for new_file in newFiles:
    # rawPath has no trailing slash, so the separator must be explicit. Without it the file
    # lands next to the raw folder (".../raw<name>.json") and Auto Loader never sees it.
    dbutils.fs.mv(f"file:{driverPath}{contentName}/{new_file}", f"dbfs:{rawPath}/{new_file}")
else:
  # First run: move the whole extracted folder into raw, then drop the non-data README.
  dbutils.fs.mv(f"file:{driverPath}{contentName}", f"dbfs:{rawPath}", recurse=True)
  dbutils.fs.rm(f"dbfs:{rawPath}/README.txt")

## Incrementally Load the Silver Table via Auto Loader

In [0]:
# Read every raw JSON file as one record. `info` and `innings` are kept as raw JSON strings
# and parsed downstream with the `:` JSON-path syntax.
df = (
  spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", fileFormat)
  .option("multiline", "true")
  .schema("info string, innings string")
  .load(rawPath)
)
# match_id is taken from the file name, assumed to be "<match_id>.json". The pattern is
# anchored on the end of the path: the first digit run of the *full* path would be wrong
# as soon as any directory name contains a digit (e.g. contentName = "t20s").
df = (
  df
  .select(regexp_extract(input_file_name(), r"(\d+)\.json$", 1).cast("int").alias("match_id"),
          col("info").alias("match_info"),
          col("innings").alias("match_innings"))
)

# Process whatever is new since the last checkpoint, append it to the silver table, then stop.
(
  df
  .writeStream
  .trigger(once=True)
  .format('delta')
  .option("checkpointLocation",SilverTablePath+"/_checkpoint")
  .toTable(SilverTableFullName)
  .awaitTermination()
)

## Build Datamarts (facts and dims)

### Create temp view for sql

In [0]:
# Expose the silver table as a streaming temp view. The SQL in the following cells reads
# `v_cricsheet` and writes its results with writeStream.
silver_stream_df = spark.readStream.table(SilverTableFullName)
silver_stream_df.createOrReplaceTempView("v_cricsheet")

### Create dimension table t_match

In [0]:
# Build dim.t_match incrementally from the streaming v_cricsheet view. There is one row per
# match, with metadata parsed from the raw `info` JSON via the `:` path syntax.
#
# first_bat / first_field: the toss winner bats (or fields) first according to toss_decision,
# and the opponent does the other. The two team names come from Cricsheet's info.teams array.
# Splitting the `players` JSON on '"' and reading fixed offsets ([1] and [25]) only finds the
# second team when the first team lists exactly 11 players, and breaks for any other squad size.
# The output columns and their order are unchanged.
(
  spark
  .sql(
'''
with base as (
  select
    match_id,
    cast(match_info :dates [0] as date) as match_date,
    match_info :gender as match_gender,
    match_info :season as season,
    match_info :event.name as event_name,
    cast(match_info :event.match_number as int) as event_match_number,
    match_info :city as city,
    match_info :venue as venue,
    match_info :match_type as match_type,
    coalesce(
      match_info :outcome.winner,
      match_info :outcome.result
    ) as match_winner,
    match_info :toss.winner as toss_winner,
    match_info :toss.decision as toss_decision,
    match_info :player_of_match [0] as player_of_match,
    match_info :players as team_players,
    match_info :teams [0] as team_1,
    match_info :teams [1] as team_2
  from
    v_cricsheet
)
select
  match_id,
  match_date,
  match_gender,
  season,
  event_name,
  event_match_number,
  city,
  venue,
  match_type,
  match_winner,
  toss_winner,
  toss_decision,
  player_of_match,
  team_players,
  CASE
    WHEN toss_decision = 'bat' THEN toss_winner
    WHEN toss_winner = team_1 THEN team_2
    ELSE team_1
  END AS first_bat,
  CASE
    WHEN toss_decision = 'field' THEN toss_winner
    WHEN toss_winner = team_1 THEN team_2
    ELSE team_1
  END AS first_field
from
  base
''')
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation",matchTablePath+"/_checkpoint")
  .toTable(matchTableFullName)
  .awaitTermination()
)

### Create dimension table t_team_players

In [0]:
# Flatten each match's `players` JSON ({team: [player, ...]}) into one row per
# (match_id, team, player_name), then fully rewrite dim.t_team_players.
# Parsing the JSON as a map has two advantages over the RDD API:
# - it avoids a separate schema-inference pass (the RDD API is unavailable on Spark Connect /
#   shared-access clusters);
# - it avoids a struct with one field per team name, which breaks on names with special characters.
team_player_df = (
  spark
  .table(matchTableFullName)
  .select("match_id",
          "team_players")
)
unpivot_df = (
  team_player_df
  # explode on a map yields one (key, value) row per entry -> (team, players array)
  .select("match_id",
          explode(from_json("team_players", "map<string,array<string>>")).alias("team", "players"))
  .select("match_id",
          "team",
          explode("players").alias("player_name"))
)
(
  unpivot_df
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema","true")
  .saveAsTable(teamPlayerTableFullName)
)

### Create Fact table t_deliveries

In [0]:
# Build fact.t_deliveries incrementally from the streaming v_cricsheet view.
# - Innings, overs and deliveries are exploded in turn, giving one row per delivery.
# - innings / overs / balls are 1-based positions in their parent JSON arrays.
# - Only the first entry of a delivery's `wickets` array is kept.
deliveries_sql = '''
with innings AS (
  select
    match_id,
    posexplode(from_json(match_innings, 'array<string>'))
  from
    v_cricsheet
),
overs as (
  select
    match_id,
    pos + 1 as innings,
    posexplode(from_json(col :overs, 'array<string>'))
  from
    innings
),
balls AS (
  select
    match_id,
    innings,
    pos + 1 as overs,
    posexplode(from_json(col :deliveries, 'array<string>'))
  from
    overs
)
select
  match_id,
  innings,
  overs,
  pos + 1 as balls,
  col :batter as batter_name,
  col :bowler AS bowler_name,
  cast(col :extras.byes as int) as byes,
  cast(col :extras.legbyes as int) as leg_byes,
  cast(col :extras.noballs as int) as no_balls,
  cast(col :extras.wides as int) as wides,
  col :non_striker as non_striker_name,
  cast(col :runs.batter as int) as batter_runs,
  cast(col :runs.total as int) as total_runs,
  col :wickets [0].kind as wicket_kind,
  col :wickets [0].player_out as player_out,
  col :wickets [0].fielders [0].name as fielder_name
from
  balls
'''
deliveries_stream_df = spark.sql(deliveries_sql)
(
  deliveries_stream_df
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation", f"{deliveriesTablePath}/_checkpoint")
  .toTable(deliveriesTableFullName)
  .awaitTermination()
)

# Analysis of Data

## Create Function for Division
Returns NULL instead of raising a division-by-zero error when the denominator is 0.

In [0]:
%sql
-- divide(numerator, denominator [, extra]) = ROUND(numerator / denominator / extra, 2).
-- NULLIF maps a zero denominator to NULL, so the result is NULL instead of a divide-by-zero error.
CREATE OR REPLACE FUNCTION divide(numerator INT, denominator INT, extra INT DEFAULT 1)
RETURNS FLOAT
RETURN ROUND(numerator / NULLIF(denominator, 0) / extra, 2)

## Compute Player Stats (Python)

In [0]:
# Match
mat = (
  spark
  .table(matchTableFullName)
  .filter(col("event_name") == lit("Indian Premier League")) # filter not required when working with only IPL data, added here in case you want to try with data containing more events from Cricsheet
  .select("match_id", 
          year("match_date").alias("match_year"), 
          "player_of_match")
)
mat = mat.alias("mat")

# Player
tpl = spark.table(teamPlayerTableFullName).alias("tpl")
tpl = tpl.join(mat, mat.match_id == tpl.match_id, 'inner')
tpl = (
  tpl
  .groupBy("tpl.player_name")
  .agg(count("*").alias("match_cnt"), 
       concat_ws("-",min("mat.match_year"),  max("mat.match_year")).alias("year_span"), 
       count(when(col("mat.player_of_match") == col("tpl.player_name"), 1)).alias("player_of_match_cnt"))
)
tpl = tpl.alias("tpl")

# Delivery
tdl = spark.table(deliveriesTableFullName)
tdl = tdl.join(mat, mat.match_id == tdl.match_id, 'left_semi')
tdl = tdl.alias("tdl")

# Batsman
bat = tdl.filter(col("tdl.batter_name").isNotNull())
bat = (
  bat
  .groupBy("tdl.batter_name", "tdl.match_id")
  .agg(count("*").alias("delivery_cnt"), 
       sum("tdl.batter_runs").alias("batter_runs"), 
       count(when(col("tdl.batter_runs") == 4, 1)).alias("4s_cnt"), 
       count(when(col("tdl.batter_runs") == 6, 1)).alias("6s_cnt"), 
       count(when(bat.wides != 0, 1)).alias("wides_cnt"))
)
bat = (
  bat
  .groupBy("tdl.batter_name")
  .agg(count("*").alias("bat_match_cnt"), 
       sum("batter_runs").alias("batter_runs"), 
       max("batter_runs").alias("highest_score"), 
       sum("delivery_cnt").alias("delivery_cnt"), 
       sum("4s_cnt").alias("4s_cnt"), 
       sum("6s_cnt").alias("6s_cnt"), 
       sum("wides_cnt").alias("wides_cnt"), 
       count(when(col("batter_runs") >= 100, 1)).alias("100s_cnt"), 
       count(when((col("batter_runs") >= 50) & (col("batter_runs") < 100), 1)).alias("50s_cnt"))
)
bat = bat.alias("bat")

# Bowler
bowl = tdl.filter(col("tdl.bowler_name").isNotNull())
bowl = (
  bowl
  .groupBy("tdl.bowler_name", "tdl.match_id")
  .agg(count("*").alias("delivery_cnt"), 
       (coalesce(sum("tdl.total_runs"), lit(0)) - coalesce(sum("tdl.byes"), lit(0)) - coalesce(sum("tdl.leg_byes"), lit(0))).alias("conceded_runs"),
       count(when(col("tdl.wicket_kind").isin(['bowled', 'caught', 'caught and bowled', 'lbw', 'stumped']), 1)).alias("wicket_cnt"),
       count(when(col("tdl.wicket_kind") == 'caught and bowled', 1)).alias("catch_and_bowl_cnt"),
       count(when(col("tdl.wides") != 0, 1)).alias("wides_cnt"),
       count(when(col("tdl.no_balls") != 0, 1)).alias("no_balls_cnt"))
)
bowl = bowl.withColumn("best_bowl_rnk", rank().over(Window.partitionBy("tdl.bowler_name").orderBy(desc("wicket_cnt"), asc("conceded_runs"))))
bowl = (
  bowl
  .groupBy("tdl.bowler_name")
  .agg(count("*").alias("bowl_match_cnt"), 
       sum("conceded_runs").alias("conceded_runs"), 
       sum("delivery_cnt").alias("delivery_cnt"), 
       sum("wicket_cnt").alias("wicket_cnt"), 
       sum("wides_cnt").alias("wides_cnt"), 
       sum("no_balls_cnt").alias("no_balls_cnt"), 
       sum("catch_and_bowl_cnt").alias("catch_and_bowl_cnt"), 
       max(when(col("best_bowl_rnk") == 1, concat_ws("/", "wicket_cnt", "conceded_runs"))).alias("best_bowl"),
       count(when(col("wicket_cnt") >= 5, 1)).alias("5wicket_cnt"))
)
bowl = bowl.alias("bowl")

# Player Out
out = tdl.filter(col("tdl.player_out").isNotNull())
out = (
  out
  .groupBy("tdl.player_out")
  .agg(count("*").alias("out_count"))
)
out = out.alias("out")

# Fielder
fld = tdl.filter((col("tdl.fielder_name").isNotNull()) & (col("tdl.wicket_kind") == "caught"))
fld = (
  fld
  .groupBy("tdl.fielder_name")
  .agg(count("*").alias("catch_cnt"))
)
fld = fld.alias("fld")

# Player Stats
ply = (
  tpl
  .join(bat, bat.batter_name == tpl.player_name, 'leftouter')
  .join(bowl,bowl.bowler_name == tpl.player_name, 'leftouter')
  .join(out, out.player_out == tpl.player_name, 'leftouter')
  .join(fld, fld.fielder_name == tpl.player_name, 'leftouter')
)
ply = (
  ply
  .select("tpl.player_name", 
          "tpl.match_cnt", 
          "tpl.year_span", 
          "bat.bat_match_cnt", 
          (col("bat.bat_match_cnt") - col("out.out_count")).alias("not_out_cnt"), 
          "bat.batter_runs", 
          "bat.highest_score", 
          expr("divide(bat.batter_runs, out.out_count)").alias("batting_average"),
          expr("divide(bat.batter_runs * 100, bat.delivery_cnt - bat.wides_cnt)").alias("strike_rate"),
          "bat.100s_cnt", 
          "bat.50s_cnt", 
          "bat.4s_cnt", 
          "bat.6s_cnt", 
          "bowl.bowl_match_cnt", 
          "bowl.wicket_cnt", 
          "bowl.best_bowl", 
          "bowl.5wicket_cnt", 
          expr("divide(bowl.conceded_runs, bowl.wicket_cnt)").alias("bowling_average"),      
          expr("divide(bowl.conceded_runs, bowl.delivery_cnt - bowl.wides_cnt - bowl.no_balls_cnt, 6)").alias("economy"),     
          (coalesce(col("fld.catch_cnt"),lit(0)) + coalesce(col("bowl.catch_and_bowl_cnt"),lit(0))).alias("catch_cnt"),
          "tpl.player_of_match_cnt")
)
ply = ply.orderBy(col("match_cnt").desc())

# Display Player Stats
display(ply)

## Compute Player Stats (sql)

In [0]:
%sql
-- Per-player IPL career stats in SQL (same metrics as the Python version).
-- Bowler wickets include 'hit wicket'; retirements are not counted as dismissals;
-- economy = runs conceded * 6 / legal deliveries.
WITH mat AS (
  SELECT
    mat.match_id,
    year(mat.match_date) as match_year,
    mat.player_of_match
  FROM
    dim.t_match mat
  WHERE
    event_name = 'Indian Premier League'
),
tpl AS (
  SELECT
    tpl.player_name,
    count(*) as match_cnt,
    min(mat.match_year) || '-' || max(mat.match_year) AS year_span,
    count(
      CASE
        WHEN mat.player_of_match = tpl.player_name THEN 1
      END
    ) AS player_of_match_cnt
  FROM
    dim.t_team_players tpl
    INNER JOIN mat ON (mat.match_id = tpl.match_id)
  group by
    tpl.player_name
),
bat AS (
  select
    batter_name,
    count(*) as bat_match_cnt,
    SUM(batter_runs) AS batter_runs,
    MAX(batter_runs) AS highest_score,
    sum(delivery_cnt) as delivery_cnt,
    sum(4s_cnt) AS 4s_cnt,
    SUM(6s_cnt) AS 6s_cnt,
    SUM(wide_cnt) AS wide_cnt,
    count(1) filter(
      WHERE
        batter_runs >= 100
    ) AS 100s_cnt,
    count(1) filter(
      WHERE
        batter_runs >= 50
        AND batter_runs < 100
    ) AS 50s_cnt
  FROM
    (
      -- per-match batting figures
      SELECT
        del.batter_name,
        del.match_id,
        SUM(del.batter_runs) AS batter_runs,
        count(*) AS delivery_cnt,
        count(1) filter(
          WHERE
            del.batter_runs = 4
        ) AS 4s_cnt,
        count(1) filter(
          WHERE
            del.batter_runs = 6
        ) AS 6s_cnt,
        count(
          CASE
            WHEN del.wides <> 0 THEN 1
          END
        ) AS wide_cnt
      FROM
        fact.t_deliveries del
        INNER JOIN mat ON (mat.match_id = del.match_id)
      WHERE
        del.batter_name IS NOT NULL
      GROUP BY
        del.batter_name,
        del.match_id
    )
  GROUP BY
    batter_name
),
bowl AS (
  select
    bowler_name,
    count(*) as bowl_match_cnt,
    sum(conceded_runs) as conceded_runs,
    sum(delivery_cnt) as delivery_cnt,
    sum(wicket_cnt) as wicket_cnt,
    sum(wide_cnt) as wide_cnt,
    sum(no_ball_cnt) as no_ball_cnt,
    sum(catch_cnt) as catch_cnt,
    max(wicket_cnt || '/' || conceded_runs) filter(
      WHERE
        best_bowl_rnk = 1
    ) AS best_bowl,
    count(1) filter(
      WHERE
        wicket_cnt >= 5
    ) AS 5wicket_cnt
  from
    (
      -- best bowling = most wickets in a match, ties broken by fewest runs conceded
      SELECT
        t.*,
        rank() over(
          PARTITION BY bowler_name
          ORDER BY
            wicket_cnt DESC,
            conceded_runs ASC
        ) AS best_bowl_rnk
      FROM
        (
          -- per-match bowling figures; byes and leg byes are not charged to the bowler
          SELECT
            del.bowler_name,
            del.match_id,
            nvl(SUM(del.total_runs), 0) - nvl(SUM(del.byes), 0) - nvl(SUM(del.leg_byes), 0) AS conceded_runs,
            count(*) AS delivery_cnt,
            count(*) filter(
              WHERE
                wicket_kind IN (
                  'bowled',
                  'caught',
                  'caught and bowled',
                  'lbw',
                  'stumped',
                  'hit wicket'
                )
            ) AS wicket_cnt,
            count(1) filter(
              WHERE
                wicket_kind = 'caught and bowled'
            ) AS catch_cnt,
            SUM(
              CASE
                WHEN del.wides <> 0 THEN 1
                ELSE 0
              END
            ) AS wide_cnt,
            SUM(
              CASE
                WHEN del.no_balls <> 0 THEN 1
                ELSE 0
              END
            ) AS no_ball_cnt
          FROM
            fact.t_deliveries del
            INNER JOIN mat ON (mat.match_id = del.match_id)
          WHERE
            del.bowler_name IS NOT NULL
          GROUP BY
            del.bowler_name,
            del.match_id
        ) t
    )
  group by
    bowler_name
),
out AS (
  -- dismissals only: retirements are recorded as wickets but are not outs
  SELECT
    del.player_out,
    count(*) AS out_count
  FROM
    fact.t_deliveries del
    INNER JOIN mat ON (mat.match_id = del.match_id)
  WHERE
    del.player_out IS NOT NULL
    AND (
      del.wicket_kind IS NULL
      OR del.wicket_kind NOT IN ('retired hurt', 'retired not out')
    )
  GROUP BY
    del.player_out
),
fld AS (
  SELECT
    del.fielder_name,
    count(*) AS catch_cnt
  FROM
    fact.t_deliveries del
    INNER JOIN mat ON (mat.match_id = del.match_id)
  WHERE
    del.wicket_kind = 'caught'
    AND del.fielder_name IS NOT NULL
  GROUP BY
    del.fielder_name
)
SELECT
  tpl.player_name,
  tpl.match_cnt,
  tpl.year_span,
  bat.bat_match_cnt,
  -- never dismissed => no `out` row => 0 outs rather than NULL
  bat.bat_match_cnt - nvl(out.out_count, 0) AS not_out_cnt,
  bat.batter_runs,
  bat.highest_score,
  divide(bat.batter_runs, out.out_count) AS batting_average,
  divide(bat.batter_runs * 100, bat.delivery_cnt - bat.wide_cnt) AS strike_rate,
  bat.100s_cnt,
  bat.50s_cnt,
  bat.4s_cnt,
  bat.6s_cnt,
  bowl.bowl_match_cnt,
  bowl.wicket_cnt,
  bowl.best_bowl,
  bowl.5wicket_cnt,
  divide(bowl.conceded_runs, bowl.wicket_cnt) AS bowling_average,
  -- runs per over: runs * 6 / legal deliveries (divide(runs, balls, 6) would be runs / balls / 6)
  divide(bowl.conceded_runs * 6, bowl.delivery_cnt - bowl.wide_cnt - bowl.no_ball_cnt) AS economy,
  nvl(fld.catch_cnt, 0) + nvl(bowl.catch_cnt, 0) AS catch_cnt,
  tpl.player_of_match_cnt
FROM
  tpl
  LEFT OUTER JOIN bat ON (bat.batter_name = tpl.player_name)
  LEFT OUTER JOIN bowl ON (bowl.bowler_name = tpl.player_name)
  LEFT OUTER JOIN fld ON (fld.fielder_name = tpl.player_name)
  LEFT OUTER JOIN out ON (out.player_out = tpl.player_name)
ORDER BY
  tpl.match_cnt DESC

# IPL Results

In [0]:
%sql
-- One row per match: teams, toss, first/second innings score and a result line.
-- Only innings 1 and 2 are summed; any later innings in the deliveries table are ignored.
-- Retirements are not counted as wickets.
with mat as (
  select
    event_name,
    season,
    match_id,
    match_date,
    match_winner,
    toss_winner,
    toss_decision,
    first_bat,
    first_field
  from
    dim.t_match
),
base as (
  select
    max(event_name) as event_name,
    max(season) as season,
    match_id,
    max(match_date) as match_date,
    max(match_winner) as match_winner,
    max(toss_winner) as toss_winner,
    max(toss_decision) as toss_decision,
    max(first_bat) as first_bat,
    max(first_field) as first_field,
    sum(
      case
        when innings = 1 then total_runs
      end
    ) AS runs_scored_first,
    sum(
      case
        when innings = 2 then total_runs
      end
    ) AS runs_scored_second,
    count(
      distinct case
        when innings = 1
        and (
          wicket_kind is null
          or wicket_kind not in ('retired hurt', 'retired not out')
        ) then player_out
      end
    ) AS player_out_first,
    count(
      distinct case
        when innings = 2
        and (
          wicket_kind is null
          or wicket_kind not in ('retired hurt', 'retired not out')
        ) then player_out
      end
    ) AS player_out_second
  from
    fact.t_deliveries
    inner join mat using (match_id)
  GROUP by
    match_id
)
select
  event_name,
  season,
  match_date,
  first_bat || ' vs ' || first_field AS teams,
  toss_winner || ' won the toss and elected to ' || toss_decision as toss_result,
  first_bat || ': ' || runs_scored_first || '/' || player_out_first || Char(10) || first_field || ': ' || runs_scored_second || '/' || player_out_second as score,
  CASE
    -- match_winner holds outcome.result when there is no winner, e.g. 'tie' or 'no result'
    WHEN match_winner IN ('tie', 'no result') then match_winner
    else match_winner || ' won by ' || case
      when match_winner = first_bat then (runs_scored_first - runs_scored_second) || ' runs'
      else (10 - player_out_second) || ' wkts'
    end
  end as match_result
from
  base
order by
  match_id desc