In [0]:
from pyspark.sql.functions import input_file_name
import os

bronze_path = "/mnt/input//Bronze"

files_info = dbutils.fs.ls(bronze_path)

bronze_files = [
    {"file_path": f.path, "file_name": os.path.basename(f.path), "modified_time": f.modificationTime}
    for f in files_info if f.path.endswith(".csv")  # or .parquet, etc.
]


In [0]:
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType



metadata_table_path = "/mnt/input/Bronze/file_metadata"

file_names = ["accounts.csv", "customers.csv", "loan_payments.csv", "loans.csv", "transactions.csv"]
primary_columns = ["account_id", "customer_id", "payment_id", "loan_id", "transaction_id"]

data = [(str(i), file_names[i], None, primary_columns[i]) for i in range(len(file_names))]

schema = StructType([
    StructField("file_id", StringType(), False),
    StructField("file_name", StringType(), True),
    StructField("modified_time", TimestampType(), True),
    StructField("primary_column", StringType(), True)
])

df = spark.createDataFrame(data, schema)

df = df.withColumn("modified_time", lit("1900-01-01T00:00:00.000+00:00").cast(TimestampType()))

df.write.format("delta").mode("overwrite").save(metadata_table_path)

print(f" Metadata Delta table created at: {metadata_table_path}")


✅ Metadata Delta table created at: /mnt/input/Bronze/file_metadata


In [0]:
metadata_df = spark.read.format("delta").options( inferSchema="true").load("/mnt/input/Bronze/file_metadata")



file_id,file_name,modified_time,primary_column
0,accounts.csv,1970-01-01T00:29:05.195692Z,account_id
1,customers.csv,1970-01-01T00:29:05.195692Z,customer_id
2,loan_payments.csv,1970-01-01T00:29:05.195692Z,payment_id
3,loans.csv,1970-01-01T00:29:05.195692Z,loan_id
4,transactions.csv,1970-01-01T00:29:05.195692Z,transaction_id


In [0]:
import pandas as pd
metadata_df = spark.read.format("delta").options( inferSchema="true").load("/mnt/input/Bronze/file_metadata")

# Step 1: Convert bronze files and metadata to Pandas
bronze_df = pd.DataFrame(bronze_files)
metadata_pd = metadata_df.toPandas()

# Ensure 'modified_time' is in datetime format
bronze_df["modified_time"] = pd.to_datetime(bronze_df["modified_time"])
metadata_pd["modified_time"] = pd.to_datetime(metadata_pd["modified_time"])

# Step 2: Merge on file name
merged = pd.merge(bronze_df, metadata_pd, on="file_name", how="left", suffixes=("", "_old"))

# Step 3: Replace NaT with a known early date for safe comparison
merged["modified_time_old"] = pd.to_datetime(merged["modified_time_old"])
merged["modified_time_old"].fillna(pd.Timestamp("1900-01-01"), inplace=True)

# Step 4: Filter files that are new or modified
modified_files = merged[merged["modified_time"] > merged["modified_time_old"]]
modified_files.display()


file_path,file_name,modified_time,file_id,modified_time_old,primary_column
dbfs:/mnt/input/Bronze/accounts.csv,accounts.csv,1970-01-01T00:29:05.195692Z,0,1900-01-01T00:00:00Z,account_id
dbfs:/mnt/input/Bronze/customers.csv,customers.csv,1970-01-01T00:29:05.195692Z,1,1900-01-01T00:00:00Z,customer_id
dbfs:/mnt/input/Bronze/loan_payments.csv,loan_payments.csv,1970-01-01T00:29:05.195692Z,2,1900-01-01T00:00:00Z,payment_id
dbfs:/mnt/input/Bronze/loans.csv,loans.csv,1970-01-01T00:29:05.195692Z,3,1900-01-01T00:00:00Z,loan_id
dbfs:/mnt/input/Bronze/transactions.csv,transactions.csv,1970-01-01T00:29:05.195692Z,4,1900-01-01T00:00:00Z,transaction_id


In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit
import datetime

modified_spark_df = spark.createDataFrame(modified_files[["file_name", "modified_time"]])

delta_table = DeltaTable.forPath(spark, "/mnt/input/Bronze/file_metadata")

for row in modified_spark_df.collect():
    file_name = row["file_name"]
    new_time = row["modified_time"]
    
    delta_table.update(
        condition = f"file_name = '{file_name}'",
        set = { "modified_time": lit(new_time) }
    )




✅ Delta metadata table updated with new modified_time values.


In [0]:
dbutils.secrets.listScopes()

[SecretScope(name='adlsconn'), SecretScope(name='adlsconnection')]

In [0]:
dbutils.secrets.list("adlsconnection")


[SecretMetadata(key='clientid'),
 SecretMetadata(key='pwd'),
 SecretMetadata(key='secretid'),
 SecretMetadata(key='sqlpwd')]

In [0]:
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="adlsconnection", key="clientid"),
  "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="adlsconnection", key="secretid"),
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/fcee7905-be7c-4a7c-b3f6-7c94700f97cb/oauth2/token"
}

dbutils.fs.mount(
  source = "abfss://input@adlsgen2vishal.dfs.core.windows.net/",
  mount_point = "/mnt/input1",
  extra_configs=configs)


True

In [0]:
from pyspark.sql import DataFrame


def TransformBronzeFile(file_name: str,primary_key_column: str, bronze_path: str = "/mnt/input/Bronze") -> DataFrame:
    
    file_path = f"{bronze_path}/{file_name}"
    
    df = spark.read.option("inferSchema", True).option("header", True).csv(file_path)
    
    df = df.dropDuplicates()
    df = df.dropna(subset=[primary_key_column])
    
    return df


In [0]:
silver_base_path = "/mnt/input/Silver"
modified_file_list = modified_files[["file_name", "primary_column"]].to_dict(orient="records")


for file_info in modified_file_list:
    file_name = file_info["file_name"]
    primary_column = file_info["primary_column"]
    
    # Transform the file
    cleaned_df = transform_bronze_file(file_name)
    
    base_name = file_name.replace(".csv", "")
    silver_path = f"{silver_base_path}/{base_name}"
    
    cleaned_df.write.mode("overwrite").option("header", "true").parquet(silver_path)
    
   



✅ All modified files transformed and saved to Silver.


In [0]:
dbutils.notebook.run("SCD Type-1",400)


In [0]:
accounts=spark.read.format('parquet').options(inferschema=True).load('/mnt/input/Silver/accounts')
customers=spark.read.format('parquet').options(inferschema=True).load('/mnt/input/Silver/customers')
loanpayments=spark.read.format('parquet').options(inferschema=True).load('/mnt/input/Silver/loan_payments')
loans=spark.read.format('parquet').options(inferschema=True).load('/mnt/input/Silver/loans')
transactions=spark.read.format('parquet').options(inferschema=True).load('/mnt/input/Silver/transactions')


In [0]:
display(accounts)

account_id,customer_id,account_type,balance
36,27,Checking,3700.0
85,65,Savings,800.25
82,2,Checking,8300.5
52,10,Checking,5300.0
65,69,Savings,550.25
76,22,Checking,7700.0
62,35,Checking,6300.5
98,49,Checking,9900.5
53,86,Savings,400.25
66,26,Checking,6700.5


In [0]:
joined_df =(
     accounts
.join(customers, on="customer_id", how="inner")
.join(loans, on="customer_id", how="inner")
.join(loanpayments, on="loan_id", how='inner')
.join(transactions, on="account_id", how="inner")
)



In [0]:
final_joined_df=joined_df.select("account_id","loan_id","customer_id","balance","first_name","last_name","city","state","loan_amount","loan_term","payment_id","payment_date","payment_amount","transaction_id","transaction_date","transaction_amount",)

In [0]:
final_joined_df.write.mode("overwrite").option("header", "true").format("delta").save("/mnt/input/Silver/final")
display(spark.read.format("delta").load("/mnt/input/Silver/final"))

account_id,loan_id,customer_id,balance,first_name,last_name,city,state,loan_amount,loan_term,payment_id,payment_date,payment_amount,transaction_id,transaction_date,transaction_amount
50,50,31,5100.5,David,Sanchez,North Bay,ON,37500.5,48,59.0,2024-02-28,3000.0,100.0,2024-04-09,375.25
33,33,85,150.25,John,Harrison,Temagami,ON,15000.25,36,12.0,2024-01-12,650.0,70.0,2024-03-10,375.25
85,85,65,800.25,Daniel,Bryant,Elmvale,ON,25000.25,36,44.0,2024-02-13,2250.0,33.0,2024-02-02,150.0
21,21,53,300.25,James,Jenkins,Queensville,ON,10000.25,36,20.0,2024-01-20,1050.0,20.0,2024-01-20,375.25
3,3,78,1500.0,Abigail,Cole,Sundridge,ON,15000.0,60,82.0,2024-03-22,4150.0,11.0,2024-01-11,100.5
4,4,34,3000.25,Olivia,Reed,Orillia,ON,30000.25,24,73.0,2024-03-13,3700.0,78.0,2024-03-18,275.75
12,12,81,2700.0,Michael,Owens,Mattawa,ON,20000.0,24,5.0,2024-01-05,300.0,64.0,2024-03-04,300.25
56,56,28,5700.0,Emily,Edwards,Brantford,ON,17500.0,24,7.0,2024-01-07,400.0,5.0,2024-01-05,250.0
19,19,76,400.75,Evelyn,Wallace,Huntsville,ON,32500.75,60,38.0,2024-02-07,1950.0,40.0,2024-02-09,375.25
36,36,27,3700.0,James,Evans,Guelph,ON,17500.0,24,85.0,2024-03-25,4300.0,42.0,2024-02-11,200.75


In [0]:
final_joined_df.write.mode("overwrite").option("header", "true").format("parquet").save("/mnt/input/Silver/final_parquet")
display(spark.read.format("parquet").load("/mnt/input/Silver/final_parquet"))

account_id,loan_id,customer_id,balance,first_name,last_name,city,state,loan_amount,loan_term,payment_id,payment_date,payment_amount,transaction_id,transaction_date,transaction_amount
50,50,31,5100.5,David,Sanchez,North Bay,ON,37500.5,48,59.0,2024-02-28,3000.0,100.0,2024-04-09,375.25
33,33,85,150.25,John,Harrison,Temagami,ON,15000.25,36,12.0,2024-01-12,650.0,70.0,2024-03-10,375.25
85,85,65,800.25,Daniel,Bryant,Elmvale,ON,25000.25,36,44.0,2024-02-13,2250.0,33.0,2024-02-02,150.0
21,21,53,300.25,James,Jenkins,Queensville,ON,10000.25,36,20.0,2024-01-20,1050.0,20.0,2024-01-20,375.25
3,3,78,1500.0,Abigail,Cole,Sundridge,ON,15000.0,60,82.0,2024-03-22,4150.0,11.0,2024-01-11,100.5
4,4,34,3000.25,Olivia,Reed,Orillia,ON,30000.25,24,73.0,2024-03-13,3700.0,78.0,2024-03-18,275.75
12,12,81,2700.0,Michael,Owens,Mattawa,ON,20000.0,24,5.0,2024-01-05,300.0,64.0,2024-03-04,300.25
56,56,28,5700.0,Emily,Edwards,Brantford,ON,17500.0,24,7.0,2024-01-07,400.0,5.0,2024-01-05,250.0
19,19,76,400.75,Evelyn,Wallace,Huntsville,ON,32500.75,60,38.0,2024-02-07,1950.0,40.0,2024-02-09,375.25
36,36,27,3700.0,James,Evans,Guelph,ON,17500.0,24,85.0,2024-03-25,4300.0,42.0,2024-02-11,200.75
