### 1. Azure Databricks to ADLS Connection

This section establishes a secure connection between the Databricks cluster and the Azure Data Lake Storage (ADLS) account, using service principal credentials stored in Azure Key Vault. Reading the credentials from a Key Vault-backed secret scope is a best practice because it keeps sensitive values out of the notebook.

In [0]:
# Connection between the storage account and Databricks
# Secrets are read from the Key Vault-backed secret scope
app_secret = dbutils.secrets.get(scope="key-vault-scope", key="app-secret")
# The secret key name "tanent-id" is kept exactly as it was created in Key Vault
tenant_id = dbutils.secrets.get(scope="key-vault-scope", key="tanent-id")
application_id = dbutils.secrets.get(scope="key-vault-scope", key="application-id")

# OAuth (client credentials) settings for the 'cricketraw' storage account
spark.conf.set("fs.azure.account.auth.type.cricketraw.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.cricketraw.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.cricketraw.dfs.core.windows.net", application_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.cricketraw.dfs.core.windows.net", app_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.cricketraw.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
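
The five settings above share one key pattern, `<option>.<account>.dfs.core.windows.net`, so they can be generated in one place. The sketch below assumes a hypothetical helper, `build_oauth_conf`, which is not part of Databricks or of the original notebook. It returns the settings as a dictionary and runs without a cluster; the values passed in are placeholders.

```python
def build_oauth_conf(storage_account, client_id, client_secret, tenant_id):
    """Return the ABFS OAuth settings for one storage account as a dict.

    Hypothetical helper for illustration; the keys mirror the
    spark.conf.set calls in the cell above.
    """
    host = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Placeholder values only; real values would come from dbutils.secrets.get
conf = build_oauth_conf("cricketraw", "<application-id>", "<app-secret>", "<tenant-id>")
for key in sorted(conf):
    print(key)

# On a cluster (assumes the notebook-provided `spark` session):
# for key, value in conf.items():
#     spark.conf.set(key, value)
```

Keeping the account name as a parameter makes it easier to point the same notebook at a second storage account, should one be added later.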

### 2. Verify Data Availability

This command confirms that the connection works and that the raw cricket data files are accessible under the `ipl` directory of the `landing` container in your ADLS account.

In [0]:
# List the files in the 'ipl' directory within the 'landing' container
dbutils.fs.ls("abfss://landing@cricketraw.dfs.core.windows.net/ipl")
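
A raw listing can be long, so a short summary is often easier to check. The sketch below is an illustration under assumptions: `summarize_listing` is a hypothetical helper, and the `FileInfo` tuple is a local stand-in for the entries returned by `dbutils.fs.ls`, of which only `name` and `size` are used. The sample entries are made up.

```python
from collections import namedtuple

# Stand-in for the entries returned by dbutils.fs.ls (assumption: only the
# `name` and `size` attributes are needed here)
FileInfo = namedtuple("FileInfo", ["path", "name", "size"])


def summarize_listing(entries, suffix=".json"):
    """Count the files with the given suffix and total their size in bytes."""
    matching = [e for e in entries if e.name.endswith(suffix)]
    return {"files": len(matching), "bytes": sum(e.size for e in matching)}


# Made-up sample entries; on a cluster, pass the result of dbutils.fs.ls(...)
base = "abfss://landing@cricketraw.dfs.core.windows.net/ipl/"
sample = [
    FileInfo(base + "1001.json", "1001.json", 2048),
    FileInfo(base + "1002.json", "1002.json", 4096),
    FileInfo(base + "README.txt", "README.txt", 100),
]
print(summarize_listing(sample))
```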

### 3. Import Libraries & Define Paths
Here, we import the necessary PySpark functions for data manipulation and define the file paths for our Bronze and Silver layers. This helps keep the code clean and easy to manage.

In [0]:
# Import PySpark functions for data processing and schema definition
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, LongType
from pyspark.sql.functions import col, explode, when, array_except, monotonically_increasing_id, row_number, regexp_extract, array
from pyspark.sql import Window

# Define the base paths for the Bronze (raw) and Silver (Delta) layers
bronze_path = "abfss://landing@cricketraw.dfs.core.windows.net/ipl/"
silver_delta = "abfss://delta@cricketraw.dfs.core.windows.net/silver/ipl/"
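
Both paths follow the `abfss://<container>@<account>.dfs.core.windows.net/<path>` layout. As a minimal sketch, the hypothetical helper `abfss_uri` below (not part of the notebook) builds such a URI from its parts, which avoids repeating the account host in every path.

```python
def abfss_uri(container, storage_account, path=""):
    """Build an abfss:// URI from container, account, and relative path."""
    base = f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
    # Strip a leading slash so the result never contains "//" after the host
    return base + path.lstrip("/")


bronze = abfss_uri("landing", "cricketraw", "ipl/")
silver = abfss_uri("delta", "cricketraw", "silver/ipl/")
print(bronze)
print(silver)
```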

### 4. Schema Definition
An explicit schema is defined so that Spark does not have to infer types from the files and every file is read with the same column types. The schema maps the nested structure of the raw JSON files, including arrays, maps, and nested structs, to DataFrame columns.

In [0]:
# Define the Schema
jsonSchema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("data_version", StringType(), True),
                    StructField("created", StringType(), True),
                    StructField("revision", LongType(), True),
                ]
            ),
            True,
        ),
        StructField(
            "info",
            StructType(
                [
                    StructField("balls_per_over", LongType(), True),
                    StructField("city", StringType(), True),
                    StructField("dates", ArrayType(StringType(), True), True),
                    StructField(
                        "event",
                        StructType(
                            [
                                StructField("match_number", LongType(), True),
                                StructField("name", StringType(), True),
                            ]
                        ),
                        True,
                    ),
                    StructField("gender", StringType(), True),
                    StructField("match_type", StringType(), True),
                    StructField(
                        "officials",
                        StructType(
                            [
                                StructField(
                                    "match_referees", ArrayType(StringType(), True), True
                                ),
                                StructField(
                                    "reserve_umpires", ArrayType(StringType(), True), True
                                ),
                                StructField(
                                    "tv_umpires", ArrayType(StringType(), True), True
                                ),
                                StructField("umpires", ArrayType(StringType(), True), True),
                            ]
                        ),
                        True,
                    ),
                    StructField(
                        "outcome",
                        StructType(
                            [
                                StructField(
                                    "by",
                                    StructType(
                                        [
                                            StructField("runs", LongType(), True),
                                            StructField("wickets", LongType(), True),
                                        ]
                                    ),
                                    True,
                                ),
                                StructField("winner", StringType(), True),
                            ]
                        ),
                        True,
                    ),
                    StructField("overs", LongType(), True),
                    StructField(
                        "player_of_match", ArrayType(StringType(), True), True
                    ),
                    StructField(
                        "players",
                        MapType(StringType(), ArrayType(StringType(), True)),
                        True,
                    ),
                    StructField(
                        "registry",
                        StructType(
                            [
                                StructField(
                                    "people", MapType(StringType(), StringType()), True
                                )
                            ]
                        ),
                        True,
                    ),
                    StructField("season", StringType(), True),
                    StructField("team_type", StringType(), True),
                    StructField("teams", ArrayType(StringType(), True), True),
                    StructField(
                        "toss",
                        StructType(
                            [
                                StructField("decision", StringType(), True),
                                StructField("winner", StringType(), True),
                            ]
                        ),
                        True,
                    ),
                    StructField("venue", StringType(), True),
                ]
            ),
            True,
        ),
        StructField(
            "innings",
            ArrayType(
                StructType(
                    [
                        StructField("team", StringType(), True),
                        StructField(
                            "overs",
                            ArrayType(
                                StructType(
                                    [
                                        StructField("over", LongType(), True),
                                        StructField(
                                            "deliveries",
                                            ArrayType(
                                                StructType(
                                                    [
                                                        StructField(
                                                            "batter", StringType(), True
                                                        ),
                                                        StructField(
                                                            "bowler", StringType(), True
                                                        ),
                                                        StructField(
                                                            "extras",
                                                            StructType(
                                                                [
                                                                    StructField(
                                                                        "legbyes", LongType(), True
                                                                    ),
                                                                    StructField(
                                                                        "wides", LongType(), True
                                                                    ),
                                                                    StructField("byes", LongType(), True),
                                                                    StructField("noballs", LongType(), True),
                                                                ]
                                                            ),
                                                            True,
                                                        ),
                                                        StructField(
                                                            "non_striker", StringType(), True
                                                        ),
                                                        StructField(
                                                            "runs",
                                                            StructType(
                                                                [
                                                                    StructField(
                                                                        "batter", LongType(), True
                                                                    ),
                                                                    StructField(
                                                                        "extras", LongType(), True
                                                                    ),
                                                                    StructField(
                                                                        "total", LongType(), True
                                                                    ),
                                                                ]
                                                            ),
                                                            True,
                                                        ),
                                                        StructField(
                                                            "wickets",
                                                            ArrayType(
                                                                StructType(
                                                                    [
                                                                        StructField("kind", StringType(), True),
                                                                        StructField("player_out", StringType(), True),
                                                                        StructField(
                                                                            "fielders",
                                                                            ArrayType(
                                                                                StructType(
                                                                                    [
                                                                                        StructField("name", StringType(), True)
                                                                                    ]
                                                                                ),
                                                                                True
                                                                            ),
                                                                            True,
                                                                        ),
                                                                    ]
                                                                )
                                                            ),
                                                            True
                                                        ),
                                                    ]
                                                ),
                                                True,
                                            ),
                                            True,
                                        ),
                                    ]
                                )
                            ),
                            True,
                        ),
                    ]
                )
            ),
            True,
        ),
    ]
)
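
To make the nesting easier to see, the sketch below walks a small record and prints the dotted path of every leaf field, marking array levels with `[]`. The record is made up and heavily trimmed; only its field names follow `jsonSchema`. `field_paths` is a hypothetical helper written for this illustration.

```python
import json

# Made-up, trimmed record that reuses field names from jsonSchema;
# the values are illustrative and not taken from the real data set
sample = json.loads("""
{
  "meta": {"data_version": "1.0.0", "created": "2020-01-01", "revision": 1},
  "info": {"season": "2020", "teams": ["Team A", "Team B"],
           "toss": {"decision": "bat", "winner": "Team A"}},
  "innings": [
    {"team": "Team A",
     "overs": [{"over": 0,
                "deliveries": [{"batter": "P1", "bowler": "P2", "non_striker": "P3",
                                "runs": {"batter": 1, "extras": 0, "total": 1}}]}]}
  ]
}
""")


def field_paths(node, prefix=""):
    """Yield the dotted path of every leaf; list levels are marked with []."""
    if isinstance(node, dict):
        for key, value in node.items():
            yield from field_paths(value, f"{prefix}.{key}" if prefix else key)
    elif isinstance(node, list):
        for item in node:
            yield from field_paths(item, prefix + "[]")
    else:
        yield prefix


paths = sorted(set(field_paths(sample)))
for p in paths:
    print(p)
```

Each `[]` in a path corresponds to one `ArrayType` level in the schema, and each of those levels needs one explode step when the data is flattened.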

### 5. Data Normalization and Flattening
This is the core data processing step. We read the raw data and then apply a series of transformations that normalize and flatten the nested JSON structure into six relational tables: `match_info`, `teams`, `players_registry`, `innings`, `deliveries`, and `wickets`. Each table holds one kind of entity, so the tables can be joined on `match_id` for querying.
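
One detail of this step is that `match_id` is taken from the source file name with a regular expression. The sketch below reproduces that extraction in plain Python with an equivalent pattern, so its behavior can be checked without a cluster. `extract_match_id` is a hypothetical helper and the file names are made up.

```python
import re

# Equivalent to the pattern used in the notebook's regexp_extract call
MATCH_ID_PATTERN = re.compile(r"/(\w+)\.json$")


def extract_match_id(file_path):
    """Return the file name without its .json extension.

    Returns "" when the path does not match, which mirrors how
    regexp_extract yields an empty string for a non-matching input.
    """
    found = MATCH_ID_PATTERN.search(file_path)
    return found.group(1) if found else ""


base = "abfss://landing@cricketraw.dfs.core.windows.net/ipl/"
print(extract_match_id(base + "1082591.json"))
print(repr(extract_match_id(base + "notes.txt")))
print(repr(extract_match_id(base + "match-01.json")))
```

Because `\w` does not match a hyphen, a file name such as `match-01.json` produces an empty `match_id`. If the landing folder can contain such names, the pattern would need to be widened.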

In [0]:
# Read the raw JSON data into a DataFrame
raw_df = spark.read.json(bronze_path, schema=jsonSchema, multiLine=True)
print(f"Count:{raw_df.count()}")

# Adding unique match_id and season for Joining
df_with_match_id = raw_df.withColumn(
    "match_id", regexp_extract(col("_metadata.file_path"), r"\/(\w+)\.json$", 1)
).withColumn("season", col("info.season"))

#Creating match info table
match_info_df = df_with_match_id.select(
    col("match_id"),
    col("season"),
    col("info.match_type").alias("match_type"),
    col("info.city").alias("city"),
    col("info.venue").alias("venue"),
    col("info.dates").getItem(0).alias("date"),
    col("info.toss.winner").alias("toss_winner"),
    col("info.toss.decision").alias("toss_decision"),
    col("info.outcome.winner").alias("winner"),
    when(col("info.outcome.by.runs").isNotNull(), "runs")
        .when(col("info.outcome.by.wickets").isNotNull(), "wickets")
        .otherwise(None)
        .alias("win_margin_type"),
    when(col("info.outcome.by.runs").isNotNull(), col("info.outcome.by.runs"))
        .when(col("info.outcome.by.wickets").isNotNull(), col("info.outcome.by.wickets"))
        .otherwise(None)
        .alias("win_margin_value"),
    col("info.player_of_match").getItem(0).alias("player_of_match")
)

#creating teams table
teams_df = df_with_match_id.select(
    col("match_id"),
    col("season"),
    explode(col("info.players")).alias("team_name", "players_list")
)

#Creating players registry table
players_registry_df = df_with_match_id.select(
    explode(col("info.registry.people")).alias("player_name", "player_id")
).distinct()

# posexplode and explode_outer are needed for the tables below
from pyspark.sql.functions import posexplode, explode_outer

# Creating innings table
# posexplode returns each innings together with its array position, so
# innings_no follows the order in the source file
innings_df = df_with_match_id.select(
    col("match_id"),
    col("season"),
    col("info.teams").alias("all_teams"),
    posexplode(col("innings")).alias("innings_pos", "innings_data")
).withColumn(
    "batting_team", col("innings_data.team")
).withColumn(
    "bowling_team", array_except(
        col("all_teams"), array(col("batting_team"))
    ).getItem(0)
).select(
    "match_id",
    "season",
    "batting_team",
    "bowling_team",
    (col("innings_pos") + 1).alias("innings_no")
)

# Creating deliveries table
# innings_no and ball are taken from array positions rather than from
# row_number() over the whole innings, which would count deliveries instead
deliveries_base_df = df_with_match_id.select(
    col("match_id"),
    col("season"),
    posexplode(col("innings")).alias("innings_pos", "innings_data")
).select(
    col("match_id"),
    col("season"),
    (col("innings_pos") + 1).alias("innings_no"),
    col("innings_data.team").alias("batting_team"),
    explode(col("innings_data.overs")).alias("over_data")
).select(
    col("match_id"),
    col("season"),
    col("innings_no"),
    col("batting_team"),
    col("over_data.over").alias("over"),
    posexplode(col("over_data.deliveries")).alias("ball_pos", "delivery_data")
).select(
    col("match_id"),
    col("season"),
    col("batting_team"),
    col("over"),
    col("delivery_data.batter").alias("batsman"),
    col("delivery_data.bowler").alias("bowler"),
    col("delivery_data.non_striker").alias("non_striker"),
    col("delivery_data.runs.batter").alias("runs_batsman"),
    col("delivery_data.runs.extras").alias("runs_extras"),
    col("delivery_data.runs.total").alias("runs_total"),
    when(col("delivery_data.extras.wides").isNotNull(), "wides")
        .when(col("delivery_data.extras.legbyes").isNotNull(), "legbyes")
        .when(col("delivery_data.extras.noballs").isNotNull(), "noballs")
        .when(col("delivery_data.extras.byes").isNotNull(), "byes")
        .otherwise(None)
        .alias("extras_type"),
    (col("ball_pos") + 1).alias("ball"),
    col("innings_no"),
    col("delivery_data.wickets").alias("wickets_data")
)

# The base table already carries `ball` and `innings_no`; drop helper columns
deliveries_df = deliveries_base_df.drop("wickets_data", "batting_team")

# Creating wickets table - captures wicket-taking events
# `over`, `ball`, and `innings_no` identify the delivery on which the wicket fell
wickets_df = deliveries_base_df.select(
    col("match_id"),
    col("season"),
    col("innings_no"),
    col("over"),
    col("ball"),
    explode(col("wickets_data")).alias("wicket_data")
).withColumn(
    # explode_outer keeps dismissals that have no fielder (e.g. bowled, lbw)
    "fielder_data", explode_outer("wicket_data.fielders")
).select(
    "match_id",
    "season",
    "innings_no",
    "over",
    "ball",
    col("wicket_data.kind").alias("kind"),
    col("wicket_data.player_out").alias("player_out"),
    col("fielder_data.name").alias("fielder_name")
).dropDuplicates()

#Defining paths for the Delta tables
match_info_path = f"{silver_delta}match_info"
teams_path = f"{silver_delta}teams"
players_registry_path = f"{silver_delta}players_registry"
innings_path = f"{silver_delta}innings"
deliveries_path = f"{silver_delta}deliveries"
wickets_path = f"{silver_delta}wickets"

# Show a sample of the data
match_info_df.show(1)
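
The flattening itself can be pictured in plain Python: each explode corresponds to one loop over a nested array. The sketch below is an illustration, not the notebook's implementation. `flatten_deliveries` is a hypothetical helper, the record is made up, and taking `innings_no` and `ball` from the array position is one way of numbering them.

```python
def flatten_deliveries(match_id, match):
    """Turn one nested match record into a list of flat delivery rows."""
    rows = []
    # enumerate(..., start=1) plays the role of a positional explode:
    # the position in the array becomes innings_no / ball
    for innings_no, innings in enumerate(match.get("innings", []), start=1):
        for over in innings.get("overs", []):
            for ball, delivery in enumerate(over.get("deliveries", []), start=1):
                rows.append({
                    "match_id": match_id,
                    "innings_no": innings_no,
                    "over": over["over"],
                    "ball": ball,
                    "batsman": delivery["batter"],
                    "bowler": delivery["bowler"],
                    "runs_total": delivery["runs"]["total"],
                })
    return rows


# Made-up two-innings record that reuses the field names from the schema
match = {
    "innings": [
        {"team": "Team A", "overs": [
            {"over": 0, "deliveries": [
                {"batter": "P1", "bowler": "P9", "runs": {"batter": 1, "extras": 0, "total": 1}},
                {"batter": "P2", "bowler": "P9", "runs": {"batter": 4, "extras": 0, "total": 4}},
            ]},
        ]},
        {"team": "Team B", "overs": [
            {"over": 0, "deliveries": [
                {"batter": "P7", "bowler": "P3", "runs": {"batter": 0, "extras": 1, "total": 1}},
            ]},
        ]},
    ]
}

rows = flatten_deliveries("m1", match)
for row in rows:
    print(row)
```

One nested record becomes three flat rows here, each identified by `match_id`, `innings_no`, `over`, and `ball`.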

### 6. Write to Delta Lake & Register Tables

Finally, the normalized DataFrames are written to the `delta` container in Delta Lake format. Every table that carries a `season` column is partitioned by it to optimize queries that filter on season; `players_registry` has no such column and is written unpartitioned. After writing the data, we register these Delta tables with Unity Catalog, making them accessible via SQL for analysis.

In [0]:
# Write each DataFrame to its Delta table; all except players_registry are partitioned by 'season'
match_info_df.write.partitionBy("season").format("delta").mode("overwrite").save(match_info_path)
teams_df.write.partitionBy("season").format("delta").mode("overwrite").save(teams_path)
players_registry_df.write.format("delta").mode("overwrite").save(players_registry_path)
innings_df.write.partitionBy("season").format("delta").mode("overwrite").save(innings_path)
deliveries_df.write.partitionBy("season").format("delta").mode("overwrite").save(deliveries_path)
wickets_df.write.partitionBy("season").format("delta").mode("overwrite").save(wickets_path)

# Registering the Delta tables with Unity Catalog
spark.sql(f"CREATE TABLE IF NOT EXISTS match_info USING DELTA LOCATION '{match_info_path}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS teams USING DELTA LOCATION '{teams_path}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS players_registry USING DELTA LOCATION '{players_registry_path}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS innings USING DELTA LOCATION '{innings_path}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS deliveries USING DELTA LOCATION '{deliveries_path}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS wickets USING DELTA LOCATION '{wickets_path}'")
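
The six registration statements differ only in the table name, so they can also be produced from a list. The sketch below assumes a hypothetical helper, `create_table_statements`, and only builds the SQL strings; running them would still require `spark.sql` on a cluster.

```python
silver_delta = "abfss://delta@cricketraw.dfs.core.windows.net/silver/ipl/"
table_names = ["match_info", "teams", "players_registry", "innings", "deliveries", "wickets"]


def create_table_statements(base_path, names):
    """Build one CREATE TABLE ... LOCATION statement per table name."""
    return [
        f"CREATE TABLE IF NOT EXISTS {name} USING DELTA LOCATION '{base_path}{name}'"
        for name in names
    ]


statements = create_table_statements(silver_delta, table_names)
for stmt in statements:
    print(stmt)
    # On a cluster (assumes the notebook-provided `spark` session):
    # spark.sql(stmt)
```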

### 7. SQL Query for Verification
This cell demonstrates how to query the newly created tables using SQL, confirming that the data is correctly loaded and accessible.

In [0]:
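%sql
-- A %sql cell typically displays only the result of its last statement,
-- so the counts are combined into a single result set
SELECT 'match_info' AS table_name, count(*) AS row_count FROM match_info
UNION ALL SELECT 'teams', count(*) FROM teams
UNION ALL SELECT 'players_registry', count(*) FROM players_registry
UNION ALL SELECT 'innings', count(*) FROM innings
UNION ALL SELECT 'deliveries', count(*) FROM deliveries
UNION ALL SELECT 'wickets', count(*) FROM wickets;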
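
The same check can be scripted in Python, for example to flag tables that came out empty. This is a minimal sketch under assumptions: `row_counts` and `empty_tables` are hypothetical helpers, and the `_FakeSpark` and `_FakeTable` classes are stand-ins so the example runs without a cluster. On Databricks, the notebook's own `spark` session would be passed instead.

```python
def row_counts(spark, table_names):
    """Return {table_name: row_count} using spark.table(name).count()."""
    return {name: spark.table(name).count() for name in table_names}


def empty_tables(counts):
    """Return the names of tables whose row count is zero, sorted."""
    return sorted(name for name, n in counts.items() if n == 0)


# Stand-ins so the sketch runs without a cluster (not real Spark classes)
class _FakeTable:
    def __init__(self, n):
        self._n = n

    def count(self):
        return self._n


class _FakeSpark:
    def __init__(self, sizes):
        self._sizes = sizes

    def table(self, name):
        return _FakeTable(self._sizes[name])


# Made-up sizes for illustration
fake_spark = _FakeSpark({"match_info": 2, "teams": 4, "wickets": 0})
counts = row_counts(fake_spark, ["match_info", "teams", "wickets"])
print(counts)
print(empty_tables(counts))
```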