In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

StatementMeta(, 200ba7a1-55ae-470b-88ca-d8e927cacb90, 11, Finished, Available, Finished)

In [4]:
# Explicit read schema for the match JSON files read by spark.read...json(...) below.
# Supplying it avoids schema inference; every field is declared nullable (True), so
# keys absent from a given file come back as null rather than failing the read.
schema = StructType([
    # Match-level metadata.
    StructField('info', StructType([
        StructField('city', StringType(), True),
        # Downstream, element 0 is selected as `date` and element 1 as `reserve_date`.
        StructField('dates', ArrayType(StringType(), True), True),
        StructField('event', StructType([
            StructField('match_number', LongType(), True),
            # A null stage is reported as 'group' downstream.
            StructField('stage', StringType(), True)
        ]), True),
        StructField('outcome', StructType([
            # Result margin, selected downstream as by_runs / by_wickets.
            # NOTE(review): presumably only one of the two is populated per match — confirm.
            StructField('by', StructType([
                StructField('runs', LongType(), True),
                StructField('wickets', LongType(), True)
            ]), True),
            StructField('winner', StringType(), True),
            StructField('result', StringType(), True),
            # Used downstream as the winner when `winner` is null.
            StructField('eliminator', StringType(), True)
        ]), True),
        StructField('overs', LongType(), True),
        # Only the first entry is kept downstream.
        StructField('player_of_match', ArrayType(StringType(), True), True),
        # Kept as a string; NOTE(review): confirm values are plain years before casting.
        StructField('season', StringType(), True),
        # Downstream treats element 0 / 1 as team1 / team2 and derives the bowling side from them.
        StructField('teams', ArrayType(StringType(), True), True),
        StructField('toss', StructType([
            StructField('decision', StringType(), True),
            StructField('winner', StringType(), True)
        ]), True),
        StructField('venue', StringType(), True)
    ]), True),
    # One entry per innings; nesting is innings -> overs -> deliveries -> wickets.
    StructField('innings', ArrayType(StructType([
        StructField('overs', ArrayType(StructType([
            StructField('deliveries', ArrayType(StructType([
                StructField('batter', StringType(), True),
                StructField('bowler', StringType(), True),
                # Per-delivery extras breakdown; keys not present in the file read as null.
                StructField('extras', StructType([
                    StructField('byes', LongType(), True),
                    StructField('legbyes', LongType(), True),
                    StructField('noballs', LongType(), True),
                    StructField('penalty', LongType(), True),
                    StructField('wides', LongType(), True)
                ]), True),
                StructField('non_striker', StringType(), True),
                # Run counts for the delivery (selected downstream as bat_runs / extra_runs / total_runs).
                StructField('runs', StructType([
                    StructField('batter', LongType(), True),
                    StructField('extras', LongType(), True),
                    StructField('total', LongType(), True),
                    StructField('non_boundary', BooleanType(), True)
                ]), True),
                # Zero or more dismissals; exploded with an _outer explode downstream so
                # deliveries without wickets are kept.
                StructField('wickets', ArrayType(StructType([
                    StructField('fielders', ArrayType(StructType([
                        StructField('name', StringType(), True),
                        StructField('substitute', BooleanType(), True)
                    ]), True), True),
                    StructField('kind', StringType(), True),
                    StructField('player_out', StringType(), True)
                ]), True), True)
            ]), True), True),
            # Over number. NOTE(review): confirm whether the source numbers overs from 0 or 1.
            StructField('over', LongType(), True)
        ]), True), True),
        # Not selected by the downstream cells shown here.
        StructField('powerplays', ArrayType(StructType([
            StructField('from', DoubleType(), True),
            StructField('to', DoubleType(), True),
            StructField('type', StringType(), True)
        ]), True), True),
        # Not selected by the downstream cells shown here.
        StructField('target', StructType([
            StructField('overs', LongType(), True),
            StructField('runs', LongType(), True)
        ]), True),
        # Batting team for this innings.
        StructField('team', StringType(), True),
        # Downstream keeps only innings where this is null (presumably dropping super overs).
        StructField('super_over', BooleanType(), True)
    ]), True), True),
    # File metadata; created / revision are copied into the silver table.
    StructField('meta', StructType([
        StructField('created', StringType(), True),
        StructField('revision', LongType(), True)
    ]), True)
])

In [5]:
# Load every match file under Files/ipl_json using the explicit schema defined above.
# multiline=true parses each file as whole JSON document(s) rather than JSON Lines.
df = (
    spark.read
    .schema(schema)
    .option("multiline", "true")
    .json("Files/ipl_json/*.json")
)

StatementMeta(, 200ba7a1-55ae-470b-88ca-d8e927cacb90, 12, Finished, Available, Finished)

In [5]:
# Flatten the nested match JSON into one row per delivery. A delivery with several
# wickets yields one row per wicket. Super-over innings are excluded, and each row is
# tagged with its source file name and a 1-based `delivery_id`. The id is sequential
# within each (file, batting team, over), which is exactly the silver MERGE key.
#
# Ordering comes from the array positions reported by posexplode /
# posexplode_outer rather than monotonically_increasing_id(). The latter's values
# depend on physical partitioning, so positions are the more reliable source.
# The window is keyed on small scalar columns instead of the whole `innings` /
# `over` structs. Those structs made every row carry the full innings as its
# shuffle/sort key, and they merged byte-identical files into one id sequence.
df_flat = (
    df
    .withColumn("innings", explode(col("innings")))
    # Keep only innings without a super_over flag. The predicate depends on the
    # innings alone, so applying it before the inner explodes yields the same rows.
    .filter(col("innings.super_over").isNull())
    .withColumn("over", explode(col("innings.overs")))
    .select("*", posexplode(col("over.deliveries")).alias("delivery_pos", "delivery"))
    # _outer keeps deliveries with no wickets (wicket_pos and wicket are null for them).
    .select("*", posexplode_outer(col("delivery.wickets")).alias("wicket_pos", "wicket"))
    .withColumn('filename', regexp_replace(regexp_extract(input_file_name(), r"([^/]+$)", 1), r"\.json$", ""))
)

# Number rows within (file, batting team, over) in delivery order, then wicket order.
# Multi-wicket deliveries still receive distinct ids, so the MERGE key stays unique.
window_spec = (
    Window
    .partitionBy("filename", col("innings.team"), col("over.over"))
    .orderBy("delivery_pos", "wicket_pos")
)

df_flat = (
    df_flat
    .withColumn("delivery_id", row_number().over(window_spec))
    # Drop the helper position columns so df_flat keeps its previous column set.
    .drop("delivery_pos", "wicket_pos")
)

StatementMeta(, 200ba7a1-55ae-470b-88ca-d8e927cacb90, 13, Finished, Available, Finished)

In [6]:
def _with_fallback(primary, fallback):
    """Return `primary` where it is not null, otherwise `fallback` (a Column or a literal)."""
    return when(primary.isNotNull(), primary).otherwise(fallback)


# Match-level attributes, repeated on every delivery row.
match_info_cols = [
    col('meta.created'),
    col('meta.revision'),
    col('info.city'),
    col('info.dates')[0].alias('date'),
    col('info.dates')[1].alias('reserve_date'),
    col('info.season'),
    col('info.event.match_number'),
    # Rows without an explicit stage are labelled 'group'.
    _with_fallback(col('info.event.stage'), 'group').alias('stage'),
    # Fall back to the eliminator when no winner is recorded.
    _with_fallback(col('info.outcome.winner'), col('info.outcome.eliminator')).alias('winner_team'),
    col('info.outcome.result'),
    col('info.outcome.by.runs').alias('by_runs'),
    col('info.outcome.by.wickets').alias('by_wickets'),
    col('info.player_of_match')[0].alias('player_of_match'),
    col('info.toss.decision').alias('toss_decision'),
    col('info.toss.winner').alias('toss_winner_team'),
    col('info.venue'),
    col('info.teams')[0].alias('team1'),
    col('info.teams')[1].alias('team2'),
]

# Position of the delivery within the match. The bowling side is teams[0] when the
# batting team differs from it; otherwise (including a null batting team) it is teams[1].
innings_cols = [
    col("innings.team").alias("bat_team"),
    when(col("innings.team") != col("info.teams")[0], col("info.teams")[0])
        .otherwise(col("info.teams")[1]).alias("bowl_team"),
    col("over.over").alias("over"),
    col("delivery_id"),
]

# Players involved plus the run and extras breakdown of the delivery.
delivery_cols = [
    col("delivery.batter"),
    col("delivery.bowler"),
    col("delivery.non_striker"),
    col("delivery.runs.batter").alias("bat_runs"),
    col("delivery.runs.extras").alias("extra_runs"),
    col("delivery.runs.total").alias("total_runs"),
    col("delivery.runs.non_boundary").alias("non_boundary"),
    col("delivery.extras.byes").alias("byes"),
    col("delivery.extras.legbyes").alias("legbyes"),
    col("delivery.extras.noballs").alias("noballs"),
    col("delivery.extras.penalty").alias("penalty"),
    col("delivery.extras.wides").alias("wides"),
]

# Dismissal details (null when the delivery produced no wicket); at most two fielders kept.
wicket_cols = [
    col("wicket.kind").alias("wicket"),
    col("wicket.player_out").alias("player_out"),
    col("wicket.fielders")[0].getField("name").alias("fielder1"),
    col("wicket.fielders")[1].getField("name").alias("fielder2"),
]

df_result = df_flat.select(*match_info_cols, *innings_cols, *delivery_cols, *wicket_cols, col("filename"))

StatementMeta(, 200ba7a1-55ae-470b-88ca-d8e927cacb90, 17, Finished, Available, Finished)

In [7]:
from delta.tables import DeltaTable

# Upsert the flattened deliveries into the silver Delta table.
# A row is identified by (file_name, bat_team, over, delivery_id); the source column
# `filename` is stored as `file_name` in the table. On a match, every non-key column
# is refreshed, bowl_team included. Otherwise the full row is inserted.
deltaTable = DeltaTable.forPath(spark, "Tables/innings_silver")

dfUpdates = df_result

# Silver columns that are copied 1:1 (same name) from df_result.
silver_columns = [
    "created", "revision", "city", "date", "reserve_date", "season",
    "match_number", "stage", "winner_team", "result", "by_runs", "by_wickets",
    "player_of_match", "toss_decision", "toss_winner_team", "venue",
    "team1", "team2", "bat_team", "bowl_team", "over", "delivery_id",
    "batter", "bowler", "non_striker", "bat_runs", "extra_runs", "total_runs",
    "non_boundary", "byes", "legbyes", "noballs", "penalty", "wides",
    "wicket", "player_out", "fielder1", "fielder2",
]
# Together with file_name these form the merge key; they are equal on a match, so
# they are not rewritten by the update.
merge_key_columns = {"bat_team", "over", "delivery_id"}

# Both maps come from one list so the update and insert column sets cannot drift apart.
update_set = {c: f"updates.{c}" for c in silver_columns if c not in merge_key_columns}
insert_values = {"file_name": "updates.filename", **{c: f"updates.{c}" for c in silver_columns}}

(
    deltaTable.alias("silver")
    .merge(
        dfUpdates.alias("updates"),
        """
        silver.file_name = updates.filename AND
        silver.bat_team = updates.bat_team AND
        silver.over = updates.over AND
        silver.delivery_id = updates.delivery_id
        """
    )
    .whenMatchedUpdate(set=update_set)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)


StatementMeta(, 02e5e6dc-e556-47c6-a2a3-4b9653bf7282, 9, Finished, Available, Finished)