In [0]:
# Leftover smoke-test output: it only shows that this first cell ran.
# NOTE(review): consider replacing it with a markdown title cell describing the
# bronze -> silver ball_by_ball load performed below.
print("hello silver")

In [0]:
# Match to (re)process: only bronze rows whose MatcH_id equals this value are
# loaded in the next cell and upserted into silver at the end of the notebook.
latest_match_id: str = "335987"

In [0]:
# Read the raw bronze deliveries and keep only the match being processed.
df = spark.read.table("ipl.bronze.ball_by_ball")
df = df.where(df["MatcH_id"] == latest_match_id)

In [0]:
# Quick look at up to 10 bronze rows for the selected match (limit() does not
# guarantee which rows are returned).
display(df.limit(10))

In [0]:
# Bronze columns that are not carried into silver. insert_ts is dropped here
# and re-added with the current timestamp in the next cell.
col_to_drop = [
    "Match_Date",
    "Season",
    "Striker_match_SK",
    "NonStriker_match_SK",
    "Fielder_match_SK",
    "Bowler_match_SK",
    "PlayerOut_match_SK",
    "MatchDateSK",
    "file_name",
    "insert_ts",
]
df = df.drop(*col_to_drop)

In [0]:
from pyspark.sql.functions import current_timestamp

# Stamp every row with a fresh insert_ts. current_timestamp() is an expression
# evaluated when the query runs, not when this cell runs.
load_time = current_timestamp()
df = df.withColumn("insert_ts", load_time)

In [0]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType, StringType, TimestampType

# Target Spark type for every column carried into silver, grouped by meaning.
# Keys keep the bronze spelling (including "MatcH_id", "NONStriker_SK" and
# "ObstructingFeild"); they are matched against df.columns case-insensitively,
# which is how Spark resolves column names by default.
silver_column_types = {
    # Match & Ball identifiers
    "MatcH_id": IntegerType(),
    "Over_id": IntegerType(),
    "Ball_id": IntegerType(),
    "Innings_No": IntegerType(),
    # Teams
    "Team_Batting": IntegerType(),
    "Team_Bowling": IntegerType(),
    # Batting position (a trailing ".0" is stripped before the cast)
    "Striker_Batting_Position": IntegerType(),
    # Extras & runs
    "Extra_Type": StringType(),
    "Runs_Scored": IntegerType(),
    "Extra_runs": IntegerType(),
    "Wides": IntegerType(),
    "Legbyes": IntegerType(),
    "Byes": IntegerType(),
    "Noballs": IntegerType(),
    "Penalty": IntegerType(),
    "Bowler_Extras": IntegerType(),
    # Wicket details
    "Out_type": StringType(),
    "Caught": IntegerType(),
    "Bowled": IntegerType(),
    "Run_out": IntegerType(),
    "LBW": IntegerType(),
    "Retired_hurt": IntegerType(),
    "Stumped": IntegerType(),
    "caught_and_bowled": IntegerType(),
    "hit_wicket": IntegerType(),
    "ObstructingFeild": IntegerType(),
    "Bowler_Wicket": IntegerType(),
    "Keeper_Catch": IntegerType(),
    # Player names
    "Striker": StringType(),
    "Non_Striker": StringType(),
    "Bowler": StringType(),
    "Player_Out": StringType(),
    "Fielders": StringType(),
    # Surrogate keys
    "StrikerSK": IntegerType(),
    "NONStriker_SK": IntegerType(),
    "Fielder_SK": IntegerType(),
    "BOWLER_SK": IntegerType(),
    "BattingTeam_SK": IntegerType(),
    "BowlingTeam_SK": IntegerType(),
    "Player_out_sk": IntegerType(),
    # Technical columns
    "insert_ts": TimestampType(),
    "row_id": StringType(),
}


def cast_for_silver(name, spark_type):
    """Return an expression casting column `name` to `spark_type`, aliased back to `name`."""
    if name == "Striker_Batting_Position":
        # Strip a trailing ".0" before the integer cast
        # (presumably float-formatted values such as "3.0" in bronze - confirm).
        source_expr = regexp_replace(name, r"\.0$", "")
    else:
        source_expr = col(name)
    return source_expr.cast(spark_type).alias(name)


# Fail fast, as the per-column casts would, if the bronze schema lost an expected column.
present_columns = {c.lower() for c in df.columns}
missing_columns = [name for name in silver_column_types if name.lower() not in present_columns]
if missing_columns:
    raise ValueError(f"bronze ball_by_ball is missing expected columns: {missing_columns}")

# Apply every cast in ONE projection rather than ~40 chained withColumn calls
# (each withColumn adds its own projection to the plan). Columns keep their
# original position, cast columns take the spelling used in silver_column_types,
# and any column without an entry passes through unchanged.
types_by_lower = {name.lower(): (name, t) for name, t in silver_column_types.items()}
bronze_df = df.select([
    cast_for_silver(*types_by_lower[c.lower()]) if c.lower() in types_by_lower else col(c)
    for c in df.columns
])


In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import coalesce, col, lit, sum as spark_sum
from pyspark.sql.types import IntegerType

# Running window over the deliveries of one over (per match and innings),
# ordered by Ball_id, from the first delivery up to the current one.
w = (
    Window
    .partitionBy("MatcH_id", "Innings_No", "Over_id")
    .orderBy("Ball_id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# ball_no = Ball_id minus the running total of Bowler_Extras within the over
# (presumably Bowler_Extras flags deliveries that do not count as legal balls,
# e.g. wides/no-balls - confirm). A null Bowler_Extras is counted as 0: without
# the coalesce, SUM over an all-null prefix is null and ball_no would be null
# for the opening deliveries of such an over.
bronze_df = bronze_df.withColumn(
    "ball_no",
    col("Ball_id")
    - spark_sum(coalesce(col("Bowler_Extras"), lit(0))).over(w).cast(IntegerType())
)


In [0]:
# Lower-case every column name for the silver table. Iterate over bronze_df's
# own columns, not df's: df predates the ball_no column derived in the previous
# cell, so projecting df.columns would silently drop ball_no.
bronze_df = bronze_df.select([col(c).alias(c.lower()) for c in bronze_df.columns])

In [0]:
# Check the lower-cased column names and types before merging into silver.
bronze_df.printSchema()

In [0]:
# Preview up to 10 of the rows that will be merged into ipl.silver.ball_by_ball.
display(bronze_df.limit(10))

In [0]:
from delta.tables import DeltaTable

# Upsert the processed match into silver, keyed on row_id: matched target rows
# have all columns overwritten from the source row; unmatched source rows are inserted.
target = DeltaTable.forName(spark, "ipl.silver.ball_by_ball").alias("tgt")
source = bronze_df.alias("src")

merge_plan = target.merge(source, "tgt.row_id = src.row_id")
merge_plan = merge_plan.whenMatchedUpdateAll()
merge_plan = merge_plan.whenNotMatchedInsertAll()
merge_plan.execute()