Connect to ADLS using a service principal (app registration)

In [0]:
# Prerequisites (one-time setup, done outside this notebook):
#  1. Register an Azure app, "databricks_adls_connection" (App Registration). The credentials of
#     its service principal are what Azure Databricks uses to access ADLS.
#  2. Creating the app yields a client secret. Store it in the Azure Key Vault "dbadlsconn"
#     (Objects > Secrets), together with the app's client ID and tenant ID. Any other
#     access-related secrets can be kept in the same vault.
#  3. Create an Azure Key Vault-backed secret scope in Azure Databricks
#     (https://<databricks-instance>#secrets/createScope) that points at the vault's DNS name
#     and resource ID. Guidance: https://learn.microsoft.com/en-us/azure/databricks/security/secrets/
#     Here the scope is named "key-vault-secret", so secrets are read with
#     dbutils.secrets.get(scope="key-vault-secret", key="<secret name in Key Vault>").
#     The secrets used below are "sp-client-id", "sp-secret-key" and "sp-tenent-id".
#  4. Access grants:
#     - Allow Azure Databricks to read the Key Vault, e.g. by granting the
#       "Key Vault Secrets User" IAM role to the Azure Databricks user/identity.
#     - Grant the "databricks_adls_connection" app the "Storage Blob Data Contributor"
#       IAM role on the storage account/container.
#
# The code below configures OAuth client-credentials access from this Spark session to the
# ADLS Gen2 account, using the secrets held in the Key Vault-backed scope.

SECRET_SCOPE = "key-vault-secret"
# Every ABFS setting below is suffixed with this host, which scopes it to this one account.
STORAGE_ACCOUNT_HOST = "iplsandbox.dfs.core.windows.net"

client_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-client-id")
# The secret really is named "sp-tenent-id" in Key Vault; keep this key in sync with the vault.
tenant_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-tenent-id")
client_secret_key = dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-secret-key")

# Per-account OAuth settings, applied in the same order as before.
_account_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret_key,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
for _setting_key, _setting_value in _account_oauth_settings.items():
    spark.conf.set(f"{_setting_key}.{STORAGE_ACCOUNT_HOST}", _setting_value)

Mount the ADLS `ipldata` container (which holds the bronze, silver, and gold zones) to the Databricks workspace

In [0]:
# Mount the "ipldata" container at /mnt/ipldata, authenticating with the same
# service-principal OAuth credentials read from the Key Vault-backed secret scope.
adls_path = "abfss://ipldata@iplsandbox.dfs.core.windows.net/"
mount_point = "/mnt/ipldata"
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret_key,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    }

# Consult the workspace mount table instead of probing the path with dbutils.fs.ls.
# A successful `ls` does not prove a mount exists (it can be a plain DBFS directory).
# A failed `ls` may be caused by something other than "not mounted", and the previous
# catch-all `except Exception` silently hid that.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print(f"{adls_path} already mounted to {mount_point}...")
else:
    dbutils.fs.mount(source=adls_path, mount_point=mount_point, extra_configs=configs)
    print(f"Mounted {adls_path} to {mount_point}")

abfss://ipldata@iplsandbox.dfs.core.windows.net/ already mounted to /mnt/ipldata...


In [0]:
# List the top-level folders of the mounted container (the same listing `%fs ls` shows).
display(dbutils.fs.ls("/mnt/ipldata"))

path,name,size,modificationTime
dbfs:/mnt/ipldata/bronze/,bronze/,0,1731618076000
dbfs:/mnt/ipldata/gold/,gold/,0,1731618089000
dbfs:/mnt/ipldata/silver/,silver/,0,1731618083000


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType,DecimalType
import json


# Initialize SparkSession. In a Databricks notebook a session already exists, and
# getOrCreate() is expected to return that one rather than build a new session.
spark = (
    SparkSession.builder
    .appName("IPL_Orange_Cap_Stats")
    .getOrCreate()
)


# Schemas enforced when reading the bronze-layer CSV files, one per file.
# Ball_By_Ball.csv: each column is listed as (name, Spark type, nullable) in file column order.
raw_ball_by_ball_stats_schema = StructType([
    StructField(name, dtype, nullable)
    for name, dtype, nullable in (
        ("Match_id", IntegerType(), False),
        ("Over_id", IntegerType(), False),
        ("Ball_id", IntegerType(), False),
        ("Innings_No", IntegerType(), False),
        ("Team_Batting", IntegerType(), True),
        ("Team_Bowling", IntegerType(), True),
        ("Striker_Batting_Position", IntegerType(), True),
        ("Extra_Type", StringType(), True),
        ("Runs_Scored", IntegerType(), True),
        ("Extra_runs", IntegerType(), True),
        ("Wides", IntegerType(), True),
        ("Legbyes", IntegerType(), True),
        ("Byes", IntegerType(), True),
        ("Noballs", IntegerType(), True),
        ("Penalty", IntegerType(), True),
        ("Bowler_Extras", IntegerType(), True),
        ("Out_type", StringType(), True),
        ("Caught", IntegerType(), True),
        ("Bowled", IntegerType(), True),
        ("Run_out", IntegerType(), True),
        ("LBW", IntegerType(), True),
        ("Retired_hurt", IntegerType(), True),
        ("Stumped", IntegerType(), True),
        ("caught_and_bowled", IntegerType(), True),
        ("hit_wicket", IntegerType(), True),
        # Spelling kept as-is: the silver transform selects "obstructingfeild".
        ("ObstructingFeild", IntegerType(), True),
        ("Bowler_Wicket", IntegerType(), True),
        ("Match_Date", DateType(), True),
        ("Season", IntegerType(), False),
        ("Striker", IntegerType(), True),
        ("Non_Striker", IntegerType(), True),
        ("Bowler", IntegerType(), True),
        ("Player_Out", IntegerType(), True),
        ("Fielders", IntegerType(), True),
        ("Striker_match_SK", IntegerType(), True),
        ("StrikerSK", IntegerType(), True),
        ("NonStriker_match_SK", IntegerType(), True),
        ("NONStriker_SK", IntegerType(), True),
        ("Fielder_match_SK", IntegerType(), True),
        ("Fielder_SK", IntegerType(), True),
        ("Bowler_match_SK", IntegerType(), True),
        ("BOWLER_SK", IntegerType(), True),
        ("PlayerOut_match_SK", IntegerType(), True),
        ("BattingTeam_SK", IntegerType(), True),
        ("BowlingTeam_SK", IntegerType(), True),
        ("Keeper_Catch", IntegerType(), True),
        ("Player_out_sk", IntegerType(), True),
        # Read as an integer key rather than a date (the source listing gave its type as "DATA").
        ("MatchDateSK", IntegerType(), True),
    )
])

# Player_match.csv, as (name, Spark type, nullable) in file column order.
# The source's VARCHAR(n) columns are read as StringType, which carries no length limit.
# PlayerMatch_key is a DECIMAL(20, 0).
raw_player_match_schema = StructType([
    StructField(name, dtype, nullable)
    for name, dtype, nullable in (
        ("Player_match_SK", IntegerType(), False),
        ("PlayerMatch_key", DecimalType(20, 0), False),
        ("Match_Id", IntegerType(), False),
        ("Player_Id", IntegerType(), False),
        ("Player_Name", StringType(), True),
        ("DOB", DateType(), True),
        ("Batting_hand", StringType(), True),
        ("Bowling_skill", StringType(), True),
        ("Country_Name", StringType(), True),
        ("Role_Desc", StringType(), True),
        ("Player_team", StringType(), True),
        ("Opposit_Team", StringType(), True),
        ("Season_year", IntegerType(), False),
        ("is_manofThematch", IntegerType(), True),
        ("Age_As_on_match", IntegerType(), True),
        ("IsPlayers_Team_won", IntegerType(), True),
        ("Batting_Status", StringType(), True),
        ("Bowling_Status", StringType(), True),
        ("Player_Captain", StringType(), True),
        ("Opposit_captain", StringType(), True),
        ("Player_keeper", StringType(), True),
        ("Opposit_keeper", StringType(), True),
    )
])

# Player.csv: the surrogate key and id are mandatory; descriptive attributes may be null.
raw_players_data_schema = StructType([
    StructField("PLAYER_SK", IntegerType(), False),
    StructField("Player_Id", IntegerType(), False),
    StructField("Player_Name", StringType(), True),
    StructField("DOB", DateType(), True),
    StructField("Batting_hand", StringType(), True),
    StructField("Bowling_skill", StringType(), True),
    StructField("Country_Name", StringType(), True),
])


# Match.csv, as (name, Spark type, nullable) in file column order.
# match_date is read as a plain string. The silver transform parses it with the "M/d/yy" pattern.
raw_matches_schema = StructType([
    StructField(name, dtype, nullable)
    for name, dtype, nullable in (
        ("Match_SK", IntegerType(), False),
        ("match_id", IntegerType(), False),
        ("Team1", StringType(), True),
        ("Team2", StringType(), True),
        ("match_date", StringType(), True),
        ("Season_Year", IntegerType(), False),
        ("Venue_Name", StringType(), True),
        ("City_Name", StringType(), True),
        ("Country_Name", StringType(), True),
        ("Toss_Winner", StringType(), True),
        ("match_winner", StringType(), True),
        ("Toss_Name", StringType(), True),
        ("Win_Type", StringType(), True),
        ("Outcome_Type", StringType(), True),
        ("ManOfMatch", StringType(), True),
        ("Win_Margin", IntegerType(), True),
        ("Country_id", IntegerType(), True),
    )
])

# Teams.csv: surrogate key, team id and (nullable) team name.
raw_teams_schema = StructType([
    StructField("team_sk", IntegerType(), False),
    StructField("team_id", IntegerType(), False),
    StructField("team_name", StringType(), True),
])



In [0]:
%sql
-- Remove any previous metastore registration of the ball-by-ball table before reloading it.
DROP TABLE IF EXISTS ipl_data.ball_by_ball_stats

In [0]:
# Bronze -> silver load. This cell:
#   - reads the raw CSV files with the schemas defined above enforced,
#   - applies light clean-up (column selection, value standardisation, derived abbreviations),
#   - writes each dataset to the silver layer as a Delta table.
import posixpath

from pyspark.sql.functions import split, col, expr, concat_ws, when, date_format, to_date

# Bronze (raw CSV) and silver (Delta) base paths, supplied as notebook widgets.
dbfs_raw_file_path = dbutils.widgets.get("dbfs_raw_file_path")
dbfs_silver_path = dbutils.widgets.get("dbfs_silver_path")
# NOTE(review): the table-registration cell hard-codes
# abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/<table>. dbfs_silver_path must resolve to
# that same directory (e.g. /mnt/ipldata/silver/ via the mount), or the tables won't see this data.


def _join_path(base, name):
    """Return `base` joined with `name`.

    posixpath.join inserts a "/" only when `base` does not already end with one, so the
    widget paths work with or without a trailing slash.
    """
    return posixpath.join(base, name)


def _read_bronze_csv(file_name, schema):
    """Read one bronze-layer CSV (with a header row) under `dbfs_raw_file_path`, enforcing `schema`.

    FAILFAST makes Spark raise an error on a malformed record, rather than nulling it, once the
    data is actually read.
    """
    return spark.read.csv(
        _join_path(dbfs_raw_file_path, file_name),
        header=True,
        schema=schema,
        mode="FAILFAST",
    )


def _initials_expr(column_name):
    """Column expression concatenating the first letter of each space-separated word of `column_name`."""
    return expr(f"concat_ws('', transform(split({column_name}, ' '), x -> substring(x, 1, 1)))")


# Enforce the schemas when reading the raw files from the bronze layer.
raw_ball_by_ball_stats_df = _read_bronze_csv("Ball_By_Ball.csv", raw_ball_by_ball_stats_schema)
raw_player_match_df = _read_bronze_csv("Player_match.csv", raw_player_match_schema)
raw_players_data_df = _read_bronze_csv("Player.csv", raw_players_data_schema)
raw_matches_df = _read_bronze_csv("Match.csv", raw_matches_schema)
raw_teams_df = _read_bronze_csv("Teams.csv", raw_teams_schema)

## Minor transformations: select the needed fields, standardise values, add derived fields.

# The *SK key columns are not carried into silver. The extra_type values "byes", "legbyes" and
# "noballs" are capitalised; any other value passes through unchanged.
ball_by_ball_stats_df = raw_ball_by_ball_stats_df.select(
    "season",
    "match_id",
    "innings_no",
    "over_id",
    "ball_id",
    "match_date",
    "team_batting",
    "team_bowling",
    "striker_batting_position",
    "extra_type",
    "runs_scored",
    "extra_runs",
    "wides",
    "legbyes",
    "byes",
    "noballs",
    "penalty",
    "bowler_extras",
    "out_type",
    "caught",
    "bowled",
    "run_out",
    "lbw",
    "retired_hurt",
    "stumped",
    "caught_and_bowled",
    "hit_wicket",
    "obstructingfeild",
    "bowler_wicket",
    "striker",
    "non_striker",
    "bowler",
    "player_out",
    "fielders",
    "keeper_catch",
).withColumn(
    "extra_type",
    when(col("extra_type") == "byes", "Byes")
    .when(col("extra_type") == "legbyes", "Legbyes")
    .when(col("extra_type") == "noballs", "Noballs")
    .otherwise(col("extra_type")),
)

# Abbreviation = initials of the team name (e.g. a name of three words yields three letters).
teams_df = raw_teams_df.select("team_id", "team_name").withColumn(
    "team_name_abbrivation", _initials_expr("team_name")
)

player_match_df = (
    raw_player_match_df.select(
        "match_id",
        "season_year",
        "player_id",
        "player_name",
        "dob",
        "batting_hand",
        "bowling_skill",
        "country_name",
        "role_desc",
        "player_team",
        "opposit_team",
        "is_manofthematch",
        "age_as_on_match",
        "isplayers_team_won",
        "batting_status",
        "bowling_status",
        "player_captain",
        "opposit_captain",
        "player_keeper",
        "opposit_keeper",
    )
    .withColumn("player_team_abbrivation", _initials_expr("player_team"))
    .withColumn("Opposit_Team_abbrivation", _initials_expr("Opposit_Team"))
)

# Match.csv stores dates as M/d/yy. They are normalised to ISO-8601 "yyyy-MM-dd" *strings*.
# NOTE(review): date_format returns a string, so matches.match_date stays StringType in silver,
# unlike ball_by_ball_stats.match_date, which is a DateType. Using to_date(...) alone would give a
# real date, but it changes the existing Delta table's schema (the overwrite would need
# overwriteSchema), so that change is left for a deliberate migration.
matches_df = raw_matches_df.select(
    "match_id",
    "season_year",
    "team1",
    "team2",
    "match_date",
    "venue_name",
    "city_name",
    "country_name",
    "toss_winner",
    "match_winner",
    "toss_name",
    "win_type",
    "outcome_type",
    "manofmatch",
    "win_margin",
    "country_id",
).withColumn("match_date", date_format(to_date("match_date", "M/d/yy"), "yyyy-MM-dd"))

players_data_df = raw_players_data_df.select(
    "player_id",
    "player_name",
    "dob",
    "batting_hand",
    "bowling_skill",
    "country_name",
)


# Write to the silver layer as Delta tables.
def write_to_silver_layer(df, table_name, partition_column=None):
    """Overwrite the Delta table stored at `<dbfs_silver_path>/<table_name>` with `df`.

    Args:
        df: DataFrame to persist.
        table_name: directory name of the table under the silver path.
        partition_column: optional column to partition the Delta files by; falsy means unpartitioned.
    """
    writer = df.write.format("delta").mode("overwrite")
    if partition_column:
        writer = writer.partitionBy(partition_column)
    writer.save(_join_path(dbfs_silver_path, table_name))


write_to_silver_layer(ball_by_ball_stats_df, "ball_by_ball_stats", "season")
write_to_silver_layer(player_match_df, "player_match", "season_year")
write_to_silver_layer(players_data_df, "players_data")
write_to_silver_layer(matches_df, "matches")
write_to_silver_layer(teams_df, "teams")

In [0]:
%sql
-- Rebuild the ipl_data schema from scratch; CASCADE also drops every table registered in it.
DROP DATABASE IF EXISTS ipl_data CASCADE;
CREATE DATABASE IF NOT EXISTS ipl_data;

In [0]:
%sql
-- Unity Catalog external location covering the silver zone of the ipldata container.
-- Assumes the storage credential adls_data_access already exists in the metastore.
CREATE EXTERNAL LOCATION IF NOT EXISTS my_external_location
    URL 'abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/'
    WITH (STORAGE CREDENTIAL adls_data_access);


Create external tables in Azure Databricks using the external location

In [0]:
%sql
-- Register each silver-layer Delta directory as a table in the ipl_data schema.
-- Every statement supplies a LOCATION, which makes the table external. Dropping it removes only
-- the metastore entry, not the files. EXTERNAL is written out on every statement for consistency.
CREATE EXTERNAL TABLE IF NOT EXISTS ipl_data.teams
USING DELTA
LOCATION 'abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/teams';

CREATE EXTERNAL TABLE IF NOT EXISTS ipl_data.matches
USING DELTA
LOCATION 'abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/matches';

CREATE EXTERNAL TABLE IF NOT EXISTS ipl_data.ball_by_ball_stats
USING DELTA
LOCATION 'abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/ball_by_ball_stats';

CREATE EXTERNAL TABLE IF NOT EXISTS ipl_data.player_match
USING DELTA
LOCATION 'abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/player_match';

CREATE EXTERNAL TABLE IF NOT EXISTS ipl_data.players_data
USING DELTA
LOCATION 'abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/players_data';

Get per-table record counts

In [0]:
%sql
-- Drop the previous row-count snapshot so the CTAS in the next cell can recreate it.
DROP TABLE IF EXISTS ipl_data.table_load_count

In [0]:
# Dropping an external table leaves its files behind, so on re-runs the CTAS target location
# is not empty. Spark rejects CTAS into a non-empty location unless this legacy flag is on.
spark.conf.set("spark.sql.legacy.allowNonEmptyLocationInCTAS", "true")


# Snapshot of row counts for each silver table.
# UNION ALL instead of UNION: each branch has a distinct table_name, so de-duplication could
# never remove a row and would only add an unnecessary distinct step.
query = """
CREATE external TABLE ipl_data.table_load_count
USING parquet
LOCATION 'abfss://ipldata@iplsandbox.dfs.core.windows.net/silver/table_load_count'
AS
SELECT 'ball_by_ball_stats' AS table_name, COUNT(*) AS record_count FROM ipl_data.ball_by_ball_stats
UNION ALL
SELECT 'matches' AS table_name, COUNT(*) AS record_count FROM ipl_data.matches
UNION ALL
SELECT 'player_match' AS table_name, COUNT(*) AS record_count FROM ipl_data.player_match
UNION ALL
SELECT 'players_data' AS table_name, COUNT(*) AS record_count FROM ipl_data.players_data
UNION ALL
SELECT 'teams' AS table_name, COUNT(*) AS record_count FROM ipl_data.teams;
"""


spark.sql(query)


Out[37]: DataFrame[]

In [0]:
%sql
-- Confirm which tables are now registered in ipl_data.
SHOW TABLES IN ipl_data;

database,tableName,isTemporary
ipl_data,ball_by_ball_stats,False
ipl_data,matches,False
ipl_data,player_match,False
ipl_data,players_data,False
ipl_data,table_load_count,False
ipl_data,teams,False
