In [0]:
gold_transactions_path = "/mnt/gold/FactTransactions"
gold_fraud_path = "/mnt/gold/FactFraudAlerts"


In [0]:
# Load From Silver
atm_silver = spark.read.format("delta").load("/mnt/silver/ATMTransactions")
upi_silver = spark.read.format("delta").load("/mnt/silver/UPIEvents")
fraud_silver = spark.read.format("delta").load("/mnt/silver/FraudAlerts")


In [0]:
# Create Gold Fact Transactions Table
from pyspark.sql.functions import col

fact_transactions = (
    atm_silver.select(
        col("transaction_id"),
        col("customer_id"),
        col("account_number"),
        col("amount"),
        col("txn_type"),
        col("txn_timestamp"),
        col("status"),
        col("atm_id"),
        col("location")
    )
    .unionByName(
        upi_silver.select(
            col("transaction_id"),
            col("customer_id"),
            col("account_number"),
            col("amount"),
            col("txn_type"),
            col("txn_timestamp"),
            col("status"),
            col("payer_upi").alias("atm_id"),  # UPI replaces ATMID field
            col("location")
        ),
        allowMissingColumns=True
    )
)


In [0]:
display(fact_transactions.limit(10))


transaction_id,customer_id,account_number,amount,txn_type,txn_timestamp,status,atm_id,location
ATM000681,CUST399,1002003435,8000.0,WITHDRAWAL,2025-01-01T11:20:00Z,SUCCESS,ATM016,Mumbai
ATM005945,CUST010,1002003166,10000.0,WITHDRAWAL,2025-01-05T03:04:00Z,SUCCESS,ATM029,Bangalore
ATM000682,CUST191,1002003087,15000.0,WITHDRAWAL,2025-01-01T11:21:00Z,SUCCESS,ATM002,Delhi
ATM005946,CUST085,1002003027,90000.0,WITHDRAWAL,2025-01-05T03:05:00Z,SUCCESS,ATM025,Delhi
ATM000683,CUST492,1002003191,60000.0,WITHDRAWAL,2025-01-01T11:22:00Z,SUCCESS,ATM035,Bangalore
ATM005947,CUST392,1002003441,8000.0,WITHDRAWAL,2025-01-05T03:06:00Z,SUCCESS,ATM002,Delhi
ATM005948,CUST031,1002003257,10000.0,WITHDRAWAL,2025-01-05T03:07:00Z,SUCCESS,ATM031,Hyderabad
ATM000684,CUST281,1002003220,15000.0,WITHDRAWAL,2025-01-01T11:23:00Z,SUCCESS,ATM028,Pune
ATM005949,CUST310,1002003371,8000.0,WITHDRAWAL,2025-01-05T03:08:00Z,SUCCESS,ATM007,Pune
ATM000685,CUST274,1002003415,90000.0,WITHDRAWAL,2025-01-01T11:24:00Z,SUCCESS,ATM006,Pune


In [0]:
import requests

storage_key = "F2WuJLyMgpLWvOsoOr1arKJRNJBcvh/Qgj5OO07CWFTh5VcmvNG/kzOTtennS/RLZhV1fPC4OOCo+AStr0aICg=="

workspace_url = "https://adb-2019097948988452.12.azuredatabricks.net"
token = "dapie9fa701fda8d154acf5df2b2074c7451-2"

response = requests.post(
    workspace_url + "/api/2.0/secrets/put",
    headers={
        "Authorization": f"Bearer {token}"
    },
    json={
        "scope": "bankscope",
        "key": "storagekey",
        "string_value": storage_key
    }
)

print(response.text)

{}


In [0]:
my_secret = dbutils.secrets.get("bankscope", "storagekey")
print(my_secret)


[REDACTED]


In [0]:
storage_account = "azurebankstorage01"
container = "gold"

dbutils.fs.mount(
    source=f"wasbs://{container}@{storage_account}.blob.core.windows.net",
    mount_point="/mnt/gold",
    extra_configs={
        f"fs.azure.account.key.{storage_account}.blob.core.windows.net": dbutils.secrets.get("bankscope", "storagekey")
    }
)



True

In [0]:
fact_transactions.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_transactions_path)

print("🔥 FactTransactions written to GOLD:", gold_transactions_path)


🔥 FactTransactions written to GOLD: /mnt/gold/FactTransactions


In [0]:
from pyspark.sql.functions import *

fact_fraud = fraud_silver.select(
    col("alert_id"),
    col("alert_type"),
    col("txn_id"),
    col("amount"),
    col("txn_type"),
    col("source_file"),
    col("alert_time")
)

display(fact_fraud.limit(10))


alert_id,alert_type,txn_id,amount,txn_type,source_file,alert_time
TXN007871_Unusual UPI Transfer,Unusual UPI Transfer,TXN007871,75000.0,UPI,upi/upi_transactions.csv,2025-12-07T19:57:47.450552Z
ATM007552_High-value transaction,High-value transaction,ATM007552,60000.0,ATM,atm/atm_transaction_records.csv,2025-12-07T19:55:20.100295Z
ATM007552_Large ATM withdrawal,Large ATM withdrawal,ATM007552,60000.0,ATM,atm/atm_transaction_records.csv,2025-12-07T19:55:20.174527Z
ATM007553_High-value transaction,High-value transaction,ATM007553,60000.0,ATM,atm/atm_transaction_records.csv,2025-12-07T19:55:20.331804Z
ATM007553_Large ATM withdrawal,Large ATM withdrawal,ATM007553,60000.0,ATM,atm/atm_transaction_records.csv,2025-12-07T19:55:20.401892Z
ATM007554_Large ATM withdrawal,Large ATM withdrawal,ATM007554,45000.0,ATM,atm/atm_transaction_records.csv,2025-12-07T19:55:20.538577Z
ATM007555_High-value transaction,High-value transaction,ATM007555,60000.0,ATM,atm/atm_transaction_records.csv,2025-12-07T19:55:20.687148Z
ATM007555_Large ATM withdrawal,Large ATM withdrawal,ATM007555,60000.0,ATM,atm/atm_transaction_records.csv,2025-12-07T19:55:20.750991Z
TXN007886_High-value transaction,High-value transaction,TXN007886,95000.0,UPI,upi/upi_transactions.csv,2025-12-07T19:57:48.77504Z
TXN007886_Unusual UPI Transfer,Unusual UPI Transfer,TXN007886,95000.0,UPI,upi/upi_transactions.csv,2025-12-07T19:57:48.844119Z


In [0]:
fact_fraud.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_fraud_path)

print("🚨 Fact Fraud Alerts written to GOLD:", gold_fraud_path)


🚨 Fact Fraud Alerts written to GOLD: /mnt/gold/FactFraudAlerts


In [0]:
display(dbutils.fs.ls("/mnt/gold"))


path,name,size,modificationTime
dbfs:/mnt/gold/FactFraudAlerts/,FactFraudAlerts/,0,0
dbfs:/mnt/gold/FactTransactions/,FactTransactions/,0,0


In [0]:
from pyspark.sql.functions import col

# Extract Customer master from transactions
dim_customer = (
    silver_atm.select("customer_id")
    .union(silver_upi.select("customer_id"))
    .drop_duplicates()
)

dim_customer = dim_customer.withColumn("customer_name", col("customer_id"))  # Dummy name mapping

gold_customer_path = "/mnt/gold/DimCustomer"

dim_customer.write.format("delta").mode("overwrite").save(gold_customer_path)

print("⭐ GOLD Dimension Written:", gold_customer_path)
display(dim_customer)


⭐ GOLD Dimension Written: /mnt/gold/DimCustomer


customer_id,customer_name
CUST073,CUST073
CUST158,CUST158
CUST303,CUST303
CUST359,CUST359
CUST051,CUST051
CUST438,CUST438
CUST103,CUST103
CUST356,CUST356
CUST404,CUST404
CUST238,CUST238


In [0]:
dim_account = (
    silver_atm.select("account_number")
    .union(silver_upi.select("account_number"))
    .drop_duplicates()
)

dim_account = dim_account.withColumn("account_type", lit("Savings"))  # Temporary mapping
dim_account = dim_account.withColumn("is_active", lit(1))

gold_account_path = "/mnt/gold/DimAccount"

dim_account.write.format("delta").mode("overwrite").save(gold_account_path)

print("🏦 DimAccount written:", gold_account_path)
display(dim_account)


🏦 DimAccount written: /mnt/gold/DimAccount


account_number,account_type,is_active
1002003092,Savings,1
1002003338,Savings,1
1002003041,Savings,1
1002003409,Savings,1
1002003062,Savings,1
1002003144,Savings,1
1002003422,Savings,1
1002003491,Savings,1
1002003073,Savings,1
1002003169,Savings,1


In [0]:
from pyspark.sql.functions import monotonically_increasing_id

dim_customer = dim_customer.withColumn("customer_key", monotonically_increasing_id())
dim_account = dim_account.withColumn("account_key", monotonically_increasing_id())


In [0]:
date_dim = (
    fact_transactions
    .select(col("txn_timestamp").alias("date"))
    .drop_duplicates()
)

date_dim = date_dim.withColumn("year", year(col("date")))
date_dim = date_dim.withColumn("month", month(col("date")))
date_dim = date_dim.withColumn("day", dayofmonth(col("date")))
date_dim = date_dim.withColumn("day_of_week", date_format(col("date"), "EEEE"))

gold_date_path = "/mnt/gold/DimDate"

date_dim.write.format("delta").mode("overwrite").save(gold_date_path)

print("📅 DimDate written:", gold_date_path)
display(date_dim)


📅 DimDate written: /mnt/gold/DimDate


date,year,month,day,day_of_week
2025-01-02T20:02:00Z,2025,1,2,Thursday
2025-01-02T20:20:00Z,2025,1,2,Thursday
2025-01-02T20:30:00Z,2025,1,2,Thursday
2025-01-06T18:52:00Z,2025,1,6,Monday
2025-01-06T19:14:00Z,2025,1,6,Monday
2025-01-06T19:31:00Z,2025,1,6,Monday
2025-01-02T23:43:00Z,2025,1,2,Thursday
2025-01-06T22:44:00Z,2025,1,6,Monday
2025-01-06T23:06:00Z,2025,1,6,Monday
2025-01-06T23:28:00Z,2025,1,6,Monday


In [0]:
from pyspark.sql.functions import col, count, sum, to_date, when

fact_fraud_detection = (
    fraud_silver
    .withColumn("fraud_date", to_date(col("alert_time")))
    .groupBy("txn_id", "fraud_date")
    .agg(
        count("*").alias("fraud_count"),
        sum("amount").alias("total_fraud_amount")
    )
)


In [0]:
fact_fraud_detection = (
    fact_fraud_detection
    .withColumn(
        "fraud_severity",
        when(col("fraud_count") >= 5, "HIGH")
        .when(col("fraud_count") >= 3, "MEDIUM")
        .otherwise("LOW")
    )
)


In [0]:
gold_fraud_path = "/mnt/gold/FactFraudDetection"

fact_fraud_detection.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_fraud_path)

print("🔥 FactFraudDetection written into GOLD:", gold_fraud_path)


🔥 FactFraudDetection written into GOLD: /mnt/gold/FactFraudDetection


In [0]:
display(dbutils.fs.ls("/mnt/gold/FactFraudDetection"))
display(fact_fraud_detection.limit(10))


path,name,size,modificationTime
dbfs:/mnt/gold/FactFraudDetection/_delta_log/,_delta_log/,0,0
dbfs:/mnt/gold/FactFraudDetection/part-00000-2c2772ab-aeff-47ed-a806-2df45892aef1.c000.snappy.parquet,part-00000-2c2772ab-aeff-47ed-a806-2df45892aef1.c000.snappy.parquet,39383,1765195911000


txn_id,fraud_date,fraud_count,total_fraud_amount,fraud_severity
ATM001097,2025-12-07,2,120000.0,LOW
ATM001896,2025-12-07,2,180000.0,LOW
ATM002055,2025-12-07,1,45000.0,LOW
ATM003463,2025-12-07,1,45000.0,LOW
ATM003532,2025-12-07,2,120000.0,LOW
ATM003907,2025-12-07,2,120000.0,LOW
ATM004337,2025-12-07,1,45000.0,LOW
ATM004493,2025-12-07,2,180000.0,LOW
ATM004575,2025-12-07,2,120000.0,LOW
ATM004576,2025-12-07,1,45000.0,LOW


In [0]:
display(dbutils.fs.ls("/mnt/gold"))

path,name,size,modificationTime
dbfs:/mnt/gold/DimAccount/,DimAccount/,0,0
dbfs:/mnt/gold/DimCustomer/,DimCustomer/,0,0
dbfs:/mnt/gold/DimDate/,DimDate/,0,0
dbfs:/mnt/gold/FactFraudAlerts/,FactFraudAlerts/,0,0
dbfs:/mnt/gold/FactFraudDetection/,FactFraudDetection/,0,0
dbfs:/mnt/gold/FactTransactions/,FactTransactions/,0,0


In [0]:
jdbc_hostname = "azurebank-server.database.windows.net"
jdbc_port     = 1433
jdbc_database = "BankDWH"
jdbc_username = "azurebank"
jdbc_password = "azure@123456"

jdbc_url = f"jdbc:sqlserver://{jdbc_hostname}:{jdbc_port};database={jdbc_database};encrypt=true;trustServerCertificate=false;loginTimeout=30;"

connection_props = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}


In [0]:
# Load DimDate from Gold to SQL
from pyspark.sql.functions import to_date, year, month, dayofmonth, quarter, col

fact = spark.read.format("delta").load("/mnt/gold/FactTransactions")

dim_date_df = (
    fact
      .select(to_date(col("txn_timestamp")).alias("FullDate"))
      .dropna()
      .dropDuplicates()
      .withColumn("Year", year("FullDate"))
      .withColumn("Month", month("FullDate"))
      .withColumn("Day", dayofmonth("FullDate"))
      .withColumn("Quarter", quarter("FullDate"))
      .withColumn("DateKey",
                  (col("Year")*10000 + col("Month")*100 + col("Day")).cast("int"))
      .select("DateKey","FullDate","Year","Quarter","Month","Day")
)

dim_date_df.write.jdbc(
    url=jdbc_url,
    table="dbo.DimDate",
    mode="overwrite",
    properties=connection_props
)


In [0]:
# Load DimCustomer and DimAccount (initial full load)
# DimCustomer GOLD → SQL
dim_customer = spark.read.format("delta").load("/mnt/gold/DimCustomer")

dim_customer_sql = (
    dim_customer
      .withColumnRenamed("customer_id","CustomerID")
      .withColumnRenamed("full_name","FullName")
      .withColumnRenamed("segment","Segment")
      .withColumnRenamed("effective_from","EffectiveFrom")
      .withColumnRenamed("effective_to","EffectiveTo")
      .withColumnRenamed("is_current","IsCurrent")
)

dim_customer_sql.write.jdbc(
    url=jdbc_url,
    table="dbo.DimCustomer",
    mode="overwrite",
    properties=connection_props
)

# DimAccount GOLD → SQL
dim_account = spark.read.format("delta").load("/mnt/gold/DimAccount")

dim_account_sql = (
    dim_account
      .withColumnRenamed("account_number","AccountNumber")
      .withColumnRenamed("customer_id","CustomerID")
      .withColumnRenamed("status","Status")
      .withColumnRenamed("account_type","AccountType")
)

dim_account_sql.write.jdbc(
    url=jdbc_url,
    table="dbo.DimAccount",
    mode="overwrite",
    properties=connection_props
)


In [0]:
fact_gold = spark.read.format("delta").load("/mnt/gold/FactTransactions")
fact_gold.printSchema()
display(fact_gold.limit(5))


root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- account_number: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- txn_type: string (nullable = true)
 |-- txn_timestamp: timestamp (nullable = true)
 |-- status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- location: string (nullable = true)



transaction_id,customer_id,account_number,amount,txn_type,txn_timestamp,status,atm_id,location
TXN007169,CUST065,1002003290,5000.0,CREDIT,2025-01-01T09:57:20Z,SUCCESS,user444@upi,"27.6132,81.795"
TXN007170,CUST372,1002003354,75000.0,CREDIT,2025-01-01T09:57:25Z,SUCCESS,user447@upi,"20.2462,77.0446"
TXN007171,CUST275,1002003438,999.0,DEBIT,2025-01-01T09:57:30Z,SUCCESS,user333@upi,"15.3597,72.7228"
TXN007172,CUST369,1002003221,150.0,CREDIT,2025-01-01T09:57:35Z,SUCCESS,user34@upi,"19.9041,86.1759"
TXN007173,CUST339,1002003334,5000.0,DEBIT,2025-01-01T09:57:40Z,SUCCESS,user288@upi,"15.5684,85.4305"


In [0]:
# Load FactTransactions from Gold to SQL
from pyspark.sql.functions import year, month, dayofmonth, col

fact_gold = spark.read.format("delta").load("/mnt/gold/FactTransactions")

# Create DateKey
fact_sql = (
    fact_gold
      .withColumn("DateKey",
                  (year("txn_timestamp")*10000
                   + month("txn_timestamp")*100
                   + dayofmonth("txn_timestamp")).cast("int"))
      .select(
          col("transaction_id").alias("TransactionID"),
          col("DateKey"),
          col("customer_id").alias("CustomerID"),
          col("account_number").alias("AccountNumber"),
          col("txn_type").alias("TxnType"),
          col("amount").alias("Amount"),
          col("status").alias("Status"),
          col("atm_id").alias("ATMID"),
          col("location").alias("Location")
      )
)


In [0]:
fact_sql.write.jdbc(
    url=jdbc_url,
    table="dbo.FactTransactions",
    mode="overwrite",
    properties=connection_props
)


In [0]:
# Load FactFraudDetection from Gold to SQL
from pyspark.sql.functions import year, month, dayofmonth, col

fraud_gold = spark.read.format("delta").load("/mnt/gold/FactFraudDetection")

fraud_sql = (
    fraud_gold
    .withColumn("AlertDateKey",
                (year("fraud_date")*10000 +
                 month("fraud_date")*100 +
                 dayofmonth("fraud_date")).cast("int")
    )
    .select(
        col("txn_id").alias("TransactionID"),
        col("fraud_date").alias("FraudDate"),
        col("fraud_severity").alias("FraudSeverity"),
        col("fraud_count").alias("FraudCount"),
        col("total_fraud_amount").alias("TotalFraudAmount"),
        col("AlertDateKey")
    )
)


In [0]:
display(fraud_sql.limit(10))
fraud_sql.printSchema()


TransactionID,FraudDate,FraudSeverity,FraudCount,TotalFraudAmount,AlertDateKey
ATM001097,2025-12-07,LOW,2,120000.0,20251207
ATM001896,2025-12-07,LOW,2,180000.0,20251207
ATM002055,2025-12-07,LOW,1,45000.0,20251207
ATM003463,2025-12-07,LOW,1,45000.0,20251207
ATM003532,2025-12-07,LOW,2,120000.0,20251207
ATM003907,2025-12-07,LOW,2,120000.0,20251207
ATM004337,2025-12-07,LOW,1,45000.0,20251207
ATM004493,2025-12-07,LOW,2,180000.0,20251207
ATM004575,2025-12-07,LOW,2,120000.0,20251207
ATM004576,2025-12-07,LOW,1,45000.0,20251207


root
 |-- TransactionID: string (nullable = true)
 |-- FraudDate: date (nullable = true)
 |-- FraudSeverity: string (nullable = true)
 |-- FraudCount: long (nullable = true)
 |-- TotalFraudAmount: double (nullable = true)
 |-- AlertDateKey: integer (nullable = true)



In [0]:
fraud_sql.write.jdbc(
    url=jdbc_url,
    table="dbo.FactFraudDetection",
    mode="overwrite",
    properties=connection_props
)

print("🎯 FactFraudDetection loaded successfully to SQL")


🎯 FactFraudDetection loaded successfully to SQL


In [0]:
# Loading Data to Staging Tables 
from pyspark.sql.functions import col

# SQL connection props
jdbc_url = "jdbc:sqlserver://azurebank-server.database.windows.net:1433;database=BankDWH"

db_props = {
    "user": "azurebank",    
    "password": "azure@123456",  
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}


# Load GOLD DimAccount
dim_account_path = "/mnt/gold/DimAccount"
dim_account = spark.read.format("delta").load(dim_account_path)

# Prepare staging
stg_account = dim_account.select(
    col("account_number").alias("AccountNumber"),
    col("customer_id").alias("CustomerID"),
    col("Status").alias("Status"),
    col("AccountType").alias("AccountType")
)

# Write to SQL staging
stg_account.write.jdbc(
    url=jdbc_url,
    table="dbo.StgAccount",
    mode="overwrite",
    properties=db_props
)

print("🔥 StgAccount successfully loaded to SQL")




🔥 StgAccount successfully loaded to SQL


In [0]:
# Loading Data to StgAccount
from pyspark.sql.functions import lit, col

# Load Silver tables
silver_atm = spark.read.format("delta").load("/mnt/silver/ATMTransactions")
silver_upi = spark.read.format("delta").load("/mnt/silver/UPIEvents")

# UNION unique accounts
silver_accounts = (
    silver_atm
    .select("account_number", "customer_id")
    .unionByName(silver_upi.select("account_number", "customer_id"))
    .dropDuplicates()
)

# ADD Gold attributes
dim_account = (
    silver_accounts
    .withColumn("AccountType", lit("Savings"))
    .withColumn("Status", lit("Active"))
)

# Save to GOLD
dim_account_path = "/mnt/gold/DimAccount"
dim_account.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(dim_account_path)

print("🔥 Gold DimAccount regenerated successfully")


🔥 Gold DimAccount regenerated successfully


In [0]:
dim_customer_path = "/mnt/gold/DimCustomer"
dim_customer = spark.read.format("delta").load(dim_customer_path)

display(dim_customer)
dim_customer.printSchema()


customer_id,customer_name
CUST073,CUST073
CUST158,CUST158
CUST303,CUST303
CUST359,CUST359
CUST051,CUST051
CUST438,CUST438
CUST103,CUST103
CUST356,CUST356
CUST404,CUST404
CUST238,CUST238


root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)



In [0]:
# Loading Data to StgCustomer
from pyspark.sql.functions import col

# Prepare staging from Gold
stg_customer = dim_customer.select(
    col("customer_id").alias("CustomerID"),
    col("customer_name").alias("FullName")
)

display(stg_customer)


CustomerID,FullName
CUST073,CUST073
CUST158,CUST158
CUST303,CUST303
CUST359,CUST359
CUST051,CUST051
CUST438,CUST438
CUST103,CUST103
CUST356,CUST356
CUST404,CUST404
CUST238,CUST238


In [0]:
stg_customer.write.jdbc(
    url=jdbc_url,
    table="dbo.StgCustomer",
    mode="overwrite",
    properties=db_props
)

print("🔥 StgCustomer loaded successfully to SQL")


🔥 StgCustomer loaded successfully to SQL
