In [0]:
from pyspark.sql.functions import col

def clean_and_save_data(
    input_path: str,
    output_path: str,
    file_format: str = "csv",
    output_format: str = "parquet",
    header: bool = True
):
    """
    Load a dataset, strip incomplete and repeated rows, and persist the result.

    The data is read through the notebook-provided ``spark`` session. Any row
    holding a null in any column is removed, then exact duplicate rows are
    removed, and what remains is written to ``output_path`` in overwrite mode.

    Parameters:
    - input_path: location of the source data (e.g. an abfss:// URI or a mounted /mnt path)
    - output_path: location the cleaned data is written to (existing data there is overwritten)
    - file_format: reader format name (default 'csv')
    - output_format: writer format name (default 'parquet')
    - header: passed to the reader as the "header" option, rendered as "true"/"false" (default True)

    Returns nothing; prints a confirmation line after the write.
    """
    # The reader option is passed as lowercase text ("true"/"false").
    header_flag = str(header).lower()

    # Read the source data.
    reader = spark.read.format(file_format).option("header", header_flag)
    raw_df = reader.load(input_path)

    # Drop rows containing nulls first, then exact duplicate rows.
    without_nulls = raw_df.dropna()
    df_cleaned = without_nulls.dropDuplicates()

    # Overwrite the destination with the cleaned rows.
    writer = df_cleaned.write.mode("overwrite").format(output_format)
    writer.save(output_path)

    print(f"Cleaned data saved to {output_path}")



In [0]:
# Clean the bronze accounts extract and land it in the silver layer.
input_path = "/mnt/bootcampproject-2/bronze layer/accounts.csv"
output_path = "/mnt/bootcampproject-2/silver layer/accounts_cleaned"
clean_and_save_data(input_path=input_path, output_path=output_path)


In [0]:
# (bronze source, silver destination) pairs for the remaining tables,
# cleaned one after another in the order listed.
bronze_to_silver = (
    # customers.csv
    (
        "/mnt/bootcampproject-2/bronze layer/customers.csv",
        "/mnt/bootcampproject-2/silver layer/customers_cleaned",
    ),
    # loan_payments.csv
    (
        "/mnt/bootcampproject-2/bronze layer/loan_payments.csv",
        "/mnt/bootcampproject-2/silver layer/loan_payments_cleaned",
    ),
    # loans.csv
    (
        "/mnt/bootcampproject-2/bronze layer/loans.csv",
        "/mnt/bootcampproject-2/silver layer/loans_cleaned",
    ),
    # transactions.csv
    (
        "/mnt/bootcampproject-2/bronze layer/transactions.csv",
        "/mnt/bootcampproject-2/silver layer/transactions_cleaned",
    ),
)

# input_path / output_path are the loop variables, so after the loop they hold
# the transactions paths.
for input_path, output_path in bronze_to_silver:
    clean_and_save_data(input_path, output_path)

In [0]:
from pyspark.sql.functions import col

# Read all cleaned silver data
accounts_df = spark.read.parquet("/mnt/bootcampproject-2/silver layer/accounts_cleaned")
customers_df = spark.read.parquet("/mnt/bootcampproject-2/silver layer/customers_cleaned")
loan_payments_df = spark.read.parquet("/mnt/bootcampproject-2/silver layer/loan_payments_cleaned")
loans_df = spark.read.parquet("/mnt/bootcampproject-2/silver layer/loans_cleaned")
transactions_df = spark.read.parquet("/mnt/bootcampproject-2/silver layer/transactions_cleaned")

# Step 1: Join accounts with customers on customer_id
acc_cust_df = accounts_df.join(customers_df, on="customer_id", how="left")

# Step 2: Join with transactions on account_id
acc_cust_trans_df = acc_cust_df.join(transactions_df, on="account_id", how="left")

# Step 3: Join with loans on customer_id
# NOTE(review): transactions (per account) and loans (per customer) are both
# left-joined onto the same rows, so every transaction row is repeated for each
# loan, and again for each payment in step 4 - confirm this fan-out is intended.
acc_cust_trans_loans_df = acc_cust_trans_df.join(loans_df, on="customer_id", how="left")

# Step 4: Join with loan_payments on loan_id
final_df = acc_cust_trans_loans_df.join(loan_payments_df, on="loan_id", how="left")

# Final: select and cast the required columns, THEN drop duplicates.
# De-duplicating before the projection compares columns that are about to be
# discarded and values that have not been cast yet, so rows that end up identical
# in the output would survive. De-duplicating last guarantees the written table
# has no exact duplicate rows.
# NOTE(review): "float" is a 32-bit type, while the gold tables declare balance
# and the amount columns as DOUBLE - consider casting to "double" here if
# downstream readers can accept the schema change.
final_df = final_df.select(
    col("account_id").cast("int"),
    col("customer_id").cast("int"),
    col("transaction_id").cast("int"),
    col("loan_id").cast("int"),
    col("payment_id").cast("int"),
    col("balance").cast("float"),
    col("transaction_date").cast("timestamp"),
    col("transaction_amount").cast("float"),
    col("loan_amount").cast("float"),
    col("payment_date").cast("timestamp"),
    col("payment_amount").cast("float")
).dropDuplicates()

# Write the result as a Delta table to the silver layer
final_df.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/bootcampproject-2/silver layer/final_joined_data_delta")


In [0]:
# Print a preview of the joined, de-duplicated DataFrame to the cell output.
final_df.show()

#### accounts table (SCD Type 1)

In [0]:
%sql
-- Gold layer starts here; the accounts_cleaned data is handled first.
-- Target table: business columns, then audit columns, then the row hash
-- that later cells compare against the source hash_key.
CREATE TABLE IF NOT EXISTS hive_metastore.default.accounts (
    account_id    INT,
    customer_id   INT,
    account_type  STRING,
    balance       DOUBLE,
    created_by    STRING,
    created_date  TIMESTAMP,
    updated_by    STRING,
    updated_date  TIMESTAMP,
    hashkey       BIGINT
)
USING DELTA
LOCATION '/mnt/bootcampproject-2/gold layer/accounts'


In [0]:
# Silver-layer source and gold-layer target locations for the accounts table.
src_path_accounts = "/mnt/bootcampproject-2/silver layer/accounts_cleaned"
tgt_path_accounts = "/mnt/bootcampproject-2/gold layer/accounts"

# Echo both locations (source first) so the cell output records the paths in use.
print(src_path_accounts, tgt_path_accounts, sep="\n")

In [0]:
# Load the silver accounts data and render it to confirm rows are present.
# Parquet files carry their own schema, so the CSV reader options
# ("header", "inferSchema") had no effect here and are dropped.
df_src = spark.read.format("parquet").load(src_path_accounts)
display(df_src)

In [0]:
#add hashkey
# Star import kept as-is: later cells rely on it for lit / current_timestamp.
from pyspark.sql.functions import *

# Hash every source column into a single change-detection key.
# concat_ws puts an explicit delimiter between the values; plain concat lets
# adjacent fields run together (("1", "23") and ("12", "3") both become "123"),
# so a real change could produce the same hash and be skipped.
# NOTE: these hashes differ from ones computed with plain concat, so rows already
# in the target are refreshed once by the next merge.
df_hash_accounts = df_src.withColumn("hash_key", crc32(concat_ws("||", *df_src.columns)))
display(df_hash_accounts)

In [0]:
from delta.tables import DeltaTable

# Bind a handle to the gold accounts Delta table at its storage path.
dbtable_accounts = DeltaTable.forPath(spark, tgt_path_accounts)

# Convert the handle to a DataFrame in order to print its current contents.
accounts_tgt_df = dbtable_accounts.toDF()
accounts_tgt_df.show()

In [0]:
# Keep only source rows that are new or changed: a row is dropped when the
# target already holds the same account_id with an identical hash.
src_accounts = df_hash_accounts.alias("src")
tgt_accounts = dbtable_accounts.toDF().alias("tgt")
accounts_unchanged = (
    (col("src.account_id") == col("tgt.account_id"))
    & (col("src.hash_key") == col("tgt.hashkey"))
)
df_hash_accounts = src_accounts.join(tgt_accounts, accounts_unchanged, "anti").select(col("src.*"))
df_hash_accounts.show()

In [0]:
# Assignments shared by the update and insert branches
# (target column -> source expression).
accounts_column_map = {
    "tgt.account_id": "src.account_id",
    "tgt.customer_id": "src.customer_id",
    "tgt.account_type": "src.account_type",
    "tgt.balance": "src.balance",
    "tgt.hashkey": "src.hash_key",
}

# Upsert keyed on account_id: matched rows are overwritten in place and stamped
# as updated; unmatched source rows are inserted with created/updated audit values.
accounts_merge = dbtable_accounts.alias("tgt").merge(
    df_hash_accounts.alias("src"),
    "tgt.account_id = src.account_id",
)
accounts_merge = accounts_merge.whenMatchedUpdate(
    set={
        **accounts_column_map,
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks-updated"),
    }
)
accounts_merge = accounts_merge.whenNotMatchedInsert(
    values={
        **accounts_column_map,
        "tgt.created_date": current_timestamp(),
        "tgt.created_by": lit("databricks"),
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks"),
    }
)
accounts_merge.execute()

In [0]:
# Render the gold accounts table as it stands on disk.
display(
    spark.read.format("delta")
    .option("header", "true")
    .load(tgt_path_accounts)
)

#### customers table (SCD Type 1)

In [0]:
%sql
-- Target table for the customers_cleaned data: business columns, then audit
-- columns, then the row hash that later cells compare against the source hash_key.
CREATE TABLE IF NOT EXISTS hive_metastore.default.customers (
    customer_id   INT,
    first_name    STRING,
    last_name     STRING,
    address       STRING,
    city          STRING,
    state         STRING,
    zip           STRING,
    created_by    STRING,
    created_date  TIMESTAMP,
    updated_by    STRING,
    updated_date  TIMESTAMP,
    hashkey       BIGINT
)
USING DELTA
LOCATION '/mnt/bootcampproject-2/gold layer/customers'

In [0]:
# Silver-layer source and gold-layer target locations for the customers table.
src_path_customers = "/mnt/bootcampproject-2/silver layer/customers_cleaned"
tgt_path_customers = "/mnt/bootcampproject-2/gold layer/customers"

# Echo both locations (source first) so the cell output records the paths in use.
print(src_path_customers, tgt_path_customers, sep="\n")

In [0]:
# Load the silver customers data and render it to confirm rows are present.
# Parquet files carry their own schema, so the CSV reader options
# ("header", "inferSchema") had no effect here and are dropped.
df_src = spark.read.format("parquet").load(src_path_customers)
display(df_src)

In [0]:
#add hashkey
# Star import kept as-is: later cells rely on it for lit / current_timestamp.
from pyspark.sql.functions import *

# Hash every source column into a single change-detection key.
# concat_ws puts an explicit delimiter between the values; plain concat lets
# adjacent fields run together (("ab", "c") and ("a", "bc") both become "abc"),
# so a real change could produce the same hash and be skipped.
# NOTE: these hashes differ from ones computed with plain concat, so rows already
# in the target are refreshed once by the next merge.
df_hash_customers = df_src.withColumn("hash_key", crc32(concat_ws("||", *df_src.columns)))
display(df_hash_customers)

In [0]:
from delta.tables import DeltaTable

# Bind a handle to the gold customers Delta table at its storage path.
dbtable_customers = DeltaTable.forPath(spark, tgt_path_customers)

# Convert the handle to a DataFrame in order to print its current contents.
customers_tgt_df = dbtable_customers.toDF()
customers_tgt_df.show()

In [0]:
# Keep only source rows that are new or changed: a row is dropped when the
# target already holds the same customer_id with an identical hash.
src_customers = df_hash_customers.alias("src")
tgt_customers = dbtable_customers.toDF().alias("tgt")
customers_unchanged = (
    (col("src.customer_id") == col("tgt.customer_id"))
    & (col("src.hash_key") == col("tgt.hashkey"))
)
df_hash_customers = src_customers.join(tgt_customers, customers_unchanged, "anti").select(col("src.*"))
df_hash_customers.show()

In [0]:
# Assignments shared by the update and insert branches
# (target column -> source expression).
customers_column_map = {
    "tgt.customer_id": "src.customer_id",
    "tgt.first_name": "src.first_name",
    "tgt.last_name": "src.last_name",
    "tgt.address": "src.address",
    "tgt.city": "src.city",
    "tgt.state": "src.state",
    "tgt.zip": "src.zip",
    "tgt.hashkey": "src.hash_key",
}

# Upsert keyed on customer_id: matched rows are overwritten in place and stamped
# as updated; unmatched source rows are inserted with created/updated audit values.
customers_merge = dbtable_customers.alias("tgt").merge(
    df_hash_customers.alias("src"),
    "tgt.customer_id = src.customer_id",
)
customers_merge = customers_merge.whenMatchedUpdate(
    set={
        **customers_column_map,
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks-updated"),
    }
)
customers_merge = customers_merge.whenNotMatchedInsert(
    values={
        **customers_column_map,
        "tgt.created_date": current_timestamp(),
        "tgt.created_by": lit("databricks"),
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks"),
    }
)
customers_merge.execute()

# Render the gold customers table after the merge.
display(
    spark.read.format("delta")
    .option("header", "true")
    .load(tgt_path_customers)
)

#### loan_payments table (SCD Type 1)

In [0]:
%sql
-- Target table for the loan_payments_cleaned data: business columns, then audit
-- columns, then the row hash that later cells compare against the source hash_key.
CREATE TABLE IF NOT EXISTS hive_metastore.default.loan_payments (
    payment_id      INT,
    loan_id         INT,
    payment_date    DATE,
    payment_amount  DOUBLE,
    created_by      STRING,
    created_date    TIMESTAMP,
    updated_by      STRING,
    updated_date    TIMESTAMP,
    hashkey         BIGINT
)
USING DELTA
LOCATION '/mnt/bootcampproject-2/gold layer/loan_payments'

In [0]:
# Silver-layer source and gold-layer target locations for the loan_payments table.
src_path_loan_payments = "/mnt/bootcampproject-2/silver layer/loan_payments_cleaned"
tgt_path_loan_payments = "/mnt/bootcampproject-2/gold layer/loan_payments"

# Echo both locations (source first) so the cell output records the paths in use.
print(src_path_loan_payments, tgt_path_loan_payments, sep="\n")

In [0]:
# Load the silver loan_payments data and render it to confirm rows are present.
# Parquet files carry their own schema, so the CSV reader options
# ("header", "inferSchema") had no effect here and are dropped.
df_src = spark.read.format("parquet").load(src_path_loan_payments)
display(df_src)

In [0]:
#add hashkey
# Star import kept as-is: later cells rely on it for lit / current_timestamp.
from pyspark.sql.functions import *

# Hash every source column into a single change-detection key.
# concat_ws puts an explicit delimiter between the values; plain concat lets
# adjacent fields run together (("1", "23") and ("12", "3") both become "123"),
# so a real change could produce the same hash and be skipped.
# NOTE: these hashes differ from ones computed with plain concat, so rows already
# in the target are refreshed once by the next merge.
df_hash_loan_payments = df_src.withColumn("hash_key", crc32(concat_ws("||", *df_src.columns)))
display(df_hash_loan_payments)

In [0]:
from delta.tables import DeltaTable

# Bind a handle to the gold loan_payments Delta table at its storage path.
dbtable_loan_payments = DeltaTable.forPath(spark, tgt_path_loan_payments)

# Convert the handle to a DataFrame in order to print its current contents.
loan_payments_tgt_df = dbtable_loan_payments.toDF()
loan_payments_tgt_df.show()

In [0]:
# Keep only source rows that are new or changed: a row is dropped when the
# target already holds the same payment_id with an identical hash.
src_loan_payments = df_hash_loan_payments.alias("src")
tgt_loan_payments = dbtable_loan_payments.toDF().alias("tgt")
loan_payments_unchanged = (
    (col("src.payment_id") == col("tgt.payment_id"))
    & (col("src.hash_key") == col("tgt.hashkey"))
)
df_hash_loan_payments = src_loan_payments.join(tgt_loan_payments, loan_payments_unchanged, "anti").select(col("src.*"))
df_hash_loan_payments.show()

In [0]:
# Assignments shared by the update and insert branches
# (target column -> source expression).
loan_payments_column_map = {
    "tgt.payment_id": "src.payment_id",
    "tgt.loan_id": "src.loan_id",
    "tgt.payment_date": "src.payment_date",
    "tgt.payment_amount": "src.payment_amount",
    "tgt.hashkey": "src.hash_key",
}

# Upsert keyed on payment_id: matched rows are overwritten in place and stamped
# as updated; unmatched source rows are inserted with created/updated audit values.
loan_payments_merge = dbtable_loan_payments.alias("tgt").merge(
    df_hash_loan_payments.alias("src"),
    "tgt.payment_id = src.payment_id",
)
loan_payments_merge = loan_payments_merge.whenMatchedUpdate(
    set={
        **loan_payments_column_map,
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks-updated"),
    }
)
loan_payments_merge = loan_payments_merge.whenNotMatchedInsert(
    values={
        **loan_payments_column_map,
        "tgt.created_date": current_timestamp(),
        "tgt.created_by": lit("databricks"),
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks"),
    }
)
loan_payments_merge.execute()

# Render the gold loan_payments table after the merge.
display(
    spark.read.format("delta")
    .option("header", "true")
    .load(tgt_path_loan_payments)
)

#### loans table (SCD Type 1)

In [0]:
%sql
-- Target table for the loans_cleaned data: business columns, then audit
-- columns, then the row hash that later cells compare against the source hash_key.
CREATE TABLE IF NOT EXISTS hive_metastore.default.loans (
    loan_id        INT,
    customer_id    INT,
    loan_amount    DOUBLE,
    interest_rate  DOUBLE,
    loan_term      INT,
    created_by     STRING,
    created_date   TIMESTAMP,
    updated_by     STRING,
    updated_date   TIMESTAMP,
    hashkey        BIGINT
)
USING DELTA
LOCATION '/mnt/bootcampproject-2/gold layer/loans'

In [0]:
# Silver-layer source and gold-layer target locations for the loans table.
src_path_loans = "/mnt/bootcampproject-2/silver layer/loans_cleaned"
tgt_path_loans = "/mnt/bootcampproject-2/gold layer/loans"

# Echo both locations (source first) so the cell output records the paths in use.
print(src_path_loans, tgt_path_loans, sep="\n")

In [0]:
# Load the silver loans data and render it to confirm rows are present.
# Parquet files carry their own schema, so the CSV reader options
# ("header", "inferSchema") had no effect here and are dropped.
df_src = spark.read.format("parquet").load(src_path_loans)
display(df_src)

In [0]:
#add hashkey
# Star import kept as-is: later cells rely on it for lit / current_timestamp.
from pyspark.sql.functions import *

# Hash every source column into a single change-detection key.
# concat_ws puts an explicit delimiter between the values; plain concat lets
# adjacent fields run together (("1", "23") and ("12", "3") both become "123"),
# so a real change could produce the same hash and be skipped.
# NOTE: these hashes differ from ones computed with plain concat, so rows already
# in the target are refreshed once by the next merge.
df_hash_loans = df_src.withColumn("hash_key", crc32(concat_ws("||", *df_src.columns)))
display(df_hash_loans)

In [0]:
from delta.tables import DeltaTable

# Bind a handle to the gold loans Delta table at its storage path.
dbtable_loans = DeltaTable.forPath(spark, tgt_path_loans)

# Convert the handle to a DataFrame in order to print its current contents.
loans_tgt_df = dbtable_loans.toDF()
loans_tgt_df.show()

In [0]:
# Keep only source rows that are new or changed: a row is dropped when the
# target already holds the same loan_id with an identical hash.
src_loans = df_hash_loans.alias("src")
tgt_loans = dbtable_loans.toDF().alias("tgt")
loans_unchanged = (
    (col("src.loan_id") == col("tgt.loan_id"))
    & (col("src.hash_key") == col("tgt.hashkey"))
)
df_hash_loans = src_loans.join(tgt_loans, loans_unchanged, "anti").select(col("src.*"))
df_hash_loans.show()

In [0]:
# Assignments shared by the update and insert branches
# (target column -> source expression).
loans_column_map = {
    "tgt.loan_id": "src.loan_id",
    "tgt.customer_id": "src.customer_id",
    "tgt.loan_amount": "src.loan_amount",
    "tgt.interest_rate": "src.interest_rate",
    "tgt.loan_term": "src.loan_term",
    "tgt.hashkey": "src.hash_key",
}

# Upsert keyed on loan_id: matched rows are overwritten in place and stamped
# as updated; unmatched source rows are inserted with created/updated audit values.
loans_merge = dbtable_loans.alias("tgt").merge(
    df_hash_loans.alias("src"),
    "tgt.loan_id = src.loan_id",
)
loans_merge = loans_merge.whenMatchedUpdate(
    set={
        **loans_column_map,
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks-updated"),
    }
)
loans_merge = loans_merge.whenNotMatchedInsert(
    values={
        **loans_column_map,
        "tgt.created_date": current_timestamp(),
        "tgt.created_by": lit("databricks"),
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks"),
    }
)
loans_merge.execute()

# Render the gold loans table after the merge.
display(
    spark.read.format("delta")
    .option("header", "true")
    .load(tgt_path_loans)
)

#### transactions table (SCD Type 1)

In [0]:
%sql
-- Target table for the transactions_cleaned data: business columns, then audit
-- columns, then the row hash that later cells compare against the source hash_key.
CREATE TABLE IF NOT EXISTS hive_metastore.default.transactions (
    transaction_id      INT,
    account_id          INT,
    transaction_date    DATE,
    transaction_amount  DOUBLE,
    transaction_type    STRING,
    created_by          STRING,
    created_date        TIMESTAMP,
    updated_by          STRING,
    updated_date        TIMESTAMP,
    hashkey             BIGINT
)
USING DELTA
LOCATION '/mnt/bootcampproject-2/gold layer/transactions'

In [0]:
# Silver-layer source and gold-layer target locations for the transactions table.
src_path_transactions = "/mnt/bootcampproject-2/silver layer/transactions_cleaned"
tgt_path_transactions = "/mnt/bootcampproject-2/gold layer/transactions"

# Echo both locations (source first) so the cell output records the paths in use.
print(src_path_transactions, tgt_path_transactions, sep="\n")

In [0]:
# Load the silver transactions data and render it to confirm rows are present.
# Parquet files carry their own schema, so the CSV reader options
# ("header", "inferSchema") had no effect here and are dropped.
df_src = spark.read.format("parquet").load(src_path_transactions)
display(df_src)

In [0]:
#add hashkey
# Star import kept as-is: later cells rely on it for lit / current_timestamp.
from pyspark.sql.functions import *

# Hash every source column into a single change-detection key.
# concat_ws puts an explicit delimiter between the values; plain concat lets
# adjacent fields run together (("1", "23") and ("12", "3") both become "123"),
# so a real change could produce the same hash and be skipped.
# NOTE: these hashes differ from ones computed with plain concat, so rows already
# in the target are refreshed once by the next merge.
df_hash_transactions = df_src.withColumn("hash_key", crc32(concat_ws("||", *df_src.columns)))
display(df_hash_transactions)

In [0]:
from delta.tables import DeltaTable

# Bind a handle to the gold transactions Delta table at its storage path.
dbtable_transactions = DeltaTable.forPath(spark, tgt_path_transactions)

# Convert the handle to a DataFrame in order to print its current contents.
transactions_tgt_df = dbtable_transactions.toDF()
transactions_tgt_df.show()

In [0]:
# Keep only source rows that are new or changed: a row is dropped when the
# target already holds the same transaction_id with an identical hash.
src_transactions = df_hash_transactions.alias("src")
tgt_transactions = dbtable_transactions.toDF().alias("tgt")
transactions_unchanged = (
    (col("src.transaction_id") == col("tgt.transaction_id"))
    & (col("src.hash_key") == col("tgt.hashkey"))
)
df_hash_transactions = src_transactions.join(tgt_transactions, transactions_unchanged, "anti").select(col("src.*"))
df_hash_transactions.show()

In [0]:
# Assignments shared by the update and insert branches
# (target column -> source expression).
transactions_column_map = {
    "tgt.transaction_id": "src.transaction_id",
    "tgt.account_id": "src.account_id",
    "tgt.transaction_date": "src.transaction_date",
    "tgt.transaction_amount": "src.transaction_amount",
    "tgt.transaction_type": "src.transaction_type",
    "tgt.hashkey": "src.hash_key",
}

# Upsert keyed on transaction_id: matched rows are overwritten in place and stamped
# as updated; unmatched source rows are inserted with created/updated audit values.
transactions_merge = dbtable_transactions.alias("tgt").merge(
    df_hash_transactions.alias("src"),
    "tgt.transaction_id = src.transaction_id",
)
transactions_merge = transactions_merge.whenMatchedUpdate(
    set={
        **transactions_column_map,
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks-updated"),
    }
)
transactions_merge = transactions_merge.whenNotMatchedInsert(
    values={
        **transactions_column_map,
        "tgt.created_date": current_timestamp(),
        "tgt.created_by": lit("databricks"),
        "tgt.updated_date": current_timestamp(),
        "tgt.updated_by": lit("databricks"),
    }
)
transactions_merge.execute()

# Render the gold transactions table after the merge.
display(
    spark.read.format("delta")
    .option("header", "true")
    .load(tgt_path_transactions)
)