In [0]:
%run "/Workspace/Project2/MyUtilityFunctions" 

[SecretScope(name='adlsstgconnection')]

[SecretMetadata(key='appid'),
 SecretMetadata(key='appsecret'),
 SecretMetadata(key='azure-sqldb-password'),
 SecretMetadata(key='azure-sqldb-username'),
 SecretMetadata(key='onprem-system-password')]

/mnt/project2-container is already mounted.


[FileInfo(path='dbfs:/mnt/project2-container/Bronze_Layer/', name='Bronze_Layer/', size=0, modificationTime=1745394195000),
 FileInfo(path='dbfs:/mnt/project2-container/Gold_Layer/', name='Gold_Layer/', size=0, modificationTime=1745524456000),
 FileInfo(path='dbfs:/mnt/project2-container/Silver_Layer/', name='Silver_Layer/', size=0, modificationTime=1745523623000)]

['accounts.csv', 'customers.csv', 'loan_payments.csv', 'loans.csv', 'transactions.csv']


In [0]:
# Stop the notebook early when the Bronze Layer holds fewer than the 5 files this pipeline reads

bronze_file_count = len(files)
if bronze_file_count < 5:
    dbutils.notebook.exit("Exiting because the number of files is less than 5")

In [0]:
# Read every Bronze Layer CSV into its own DataFrame, keeping the same order as `files`

raw_data_dfs = [
    get_adls_data(get_file_path("Bronze_Layer"), "csv", file_name)
    for file_name in files
]

# Preview the DataFrame at index 4 as a sanity check
display(raw_data_dfs[4])

transaction_id,account_id,transaction_date,transaction_amount,transaction_type
1,45,01-01-2024,100.5,Deposit
2,12,02-01-2024,200.75,Withdrawal
3,78,03-01-2024,150.0,Deposit
4,34,04-01-2024,300.25,Withdrawal
5,56,05-01-2024,250.0,Deposit
6,23,06-01-2024,175.0,Withdrawal
7,89,07-01-2024,225.5,Deposit
8,67,08-01-2024,275.75,Withdrawal
9,14,09-01-2024,325.0,Deposit
10,92,10-01-2024,375.25,Withdrawal


In [0]:
# Import trim function and define function to filter out rows with null or blank ID columns
# Apply filters on specific ID columns from each dataset

from pyspark.sql.functions import trim

def get_filtered_df(df, id_column):
    """Return only the rows of ``df`` whose ID column holds a usable value.

    A row is dropped when ``id_column`` is null, or when it is empty or
    whitespace-only after trimming.

    Args:
        df: DataFrame to filter.
        id_column: Column of ``df`` that holds the record ID.

    Returns:
        The filtered DataFrame.
    """
    # trim() never turns a blank string into null, so a null check alone lets
    # blank IDs through; the empty-string comparison has to be explicit.
    trimmed_id = trim(id_column)
    return df.filter(trimmed_id.isNotNull() & (trimmed_id != ""))

# ID column checked for each dataset, listed in the same order as raw_data_dfs
id_column_names = ["account_id", "customer_id", "payment_id", "loan_id", "transaction_id"]

filtered_dfs = [
    get_filtered_df(raw_data_dfs[position], raw_data_dfs[position][id_name])
    for position, id_name in enumerate(id_column_names)
]

display(filtered_dfs[4])


transaction_id,account_id,transaction_date,transaction_amount,transaction_type
1,45,01-01-2024,100.5,Deposit
2,12,02-01-2024,200.75,Withdrawal
3,78,03-01-2024,150.0,Deposit
4,34,04-01-2024,300.25,Withdrawal
5,56,05-01-2024,250.0,Deposit
6,23,06-01-2024,175.0,Withdrawal
7,89,07-01-2024,225.5,Deposit
8,67,08-01-2024,275.75,Withdrawal
9,14,09-01-2024,325.0,Deposit
10,92,10-01-2024,375.25,Withdrawal


In [0]:
# Define function to return distinct records
# Create a list of distinct (deduplicated) DataFrames

def get_distinct_df(df):
    """Return ``df`` with exact duplicate rows removed (via ``df.distinct()``)."""
    deduplicated_df = df.distinct()
    return deduplicated_df

# Deduplicate every filtered DataFrame, preserving list order
distinct_dfs = [get_distinct_df(filtered_df) for filtered_df in filtered_dfs]

display(distinct_dfs[4])

transaction_id,account_id,transaction_date,transaction_amount,transaction_type
83,82,23-03-2024,150.0,Deposit
81,70,21-03-2024,100.5,Deposit
90,38,30-03-2024,375.25,Withdrawal
23,88,23-01-2024,150.0,Deposit
93,79,02-04-2024,150.0,Deposit
69,59,09-03-2024,325.0,Deposit
61,52,01-03-2024,100.5,Deposit
87,93,27-03-2024,225.5,Deposit
17,99,17-01-2024,225.5,Deposit
71,73,11-03-2024,100.5,Deposit


In [0]:
# Placeholder values used to fill missing data, keyed first by source file name
# and then by column name.

fillna_values = {
    "accounts.csv": {
        "customer_id": "-1",
        "account_type": "N/A",
        "balance": "-1.00",
    },
    "customers.csv": {
        "first_name": "Unknown",
        "last_name": "Unknown",
        "address": "Unknown",
        "city": "Unknown",
        "state": "Unknown",
        "zip": "Unknown",
    },
    "loan_payments.csv": {
        "loan_id": "-1",
        "payment_date": "01-01-1900",
        "payment_amount": "-1.00",
    },
    "loans.csv": {
        "customer_id": "-1",
        "loan_amount": "-1.00",
        "interest_rate": "-1.00",
        "loan_term": "-1",
    },
    "transactions.csv": {
        "account_id": "-1",
        "transaction_date": "01-01-1900",
        "transaction_amount": "0.00",
        "transaction_type": "N/A",
    },
}

In [0]:
# Define function to apply fillna transformation
# Apply fillna to each distinct DataFrame using corresponding file key

def get_fillna_df(df, fillna_values):
    """Return ``df`` with missing values replaced.

    ``fillna_values`` is passed straight to ``df.fillna``; callers in this
    notebook supply a column-name -> placeholder mapping.
    """
    filled_df = df.fillna(fillna_values)
    return filled_df

# Each DataFrame gets the placeholder set registered under its source file name;
# distinct_dfs and files are matched by position.
fillna_dfs = [
    get_fillna_df(distinct_df, fillna_values[files[position]])
    for position, distinct_df in enumerate(distinct_dfs)
]

display(fillna_dfs[4])

transaction_id,account_id,transaction_date,transaction_amount,transaction_type
83,82,23-03-2024,150.0,Deposit
81,70,21-03-2024,100.5,Deposit
90,38,30-03-2024,375.25,Withdrawal
23,88,23-01-2024,150.0,Deposit
93,79,02-04-2024,150.0,Deposit
69,59,09-03-2024,325.0,Deposit
61,52,01-03-2024,100.5,Deposit
87,93,27-03-2024,225.5,Deposit
17,99,17-01-2024,225.5,Deposit
71,73,11-03-2024,100.5,Deposit


In [0]:
# Import data types and transformation functions
# Cast columns in all dataframes to appropriate types using Spark data types
# The result is displayed for one dataset (cast_transactions_df)

from pyspark.sql.types import IntegerType, DoubleType, StringType, DateType
from pyspark.sql.functions import to_date


# Accounts: integer IDs, string account type, double balance
cast_accounts_df = fillna_dfs[0].select(
    fillna_dfs[0]["account_id"].cast(IntegerType()),
    fillna_dfs[0]["customer_id"].cast(IntegerType()),
    fillna_dfs[0]["account_type"].cast(StringType()),
    fillna_dfs[0]["balance"].cast(DoubleType()),
)

# Customers: integer ID, every other attribute kept as a string
cast_customers_df = fillna_dfs[1].select(
    fillna_dfs[1]["customer_id"].cast(IntegerType()),
    fillna_dfs[1]["first_name"].cast(StringType()),
    fillna_dfs[1]["last_name"].cast(StringType()),
    fillna_dfs[1]["address"].cast(StringType()),
    fillna_dfs[1]["city"].cast(StringType()),
    fillna_dfs[1]["state"].cast(StringType()),
    fillna_dfs[1]["zip"].cast(StringType()),
)

# Loan payments: payment_date is parsed from dd-MM-yyyy text into a date
cast_loan_payments_df = fillna_dfs[2].select(
    fillna_dfs[2]["payment_id"].cast(IntegerType()),
    fillna_dfs[2]["loan_id"].cast(IntegerType()),
    to_date(fillna_dfs[2]["payment_date"], "dd-MM-yyyy").alias("payment_date"),
    fillna_dfs[2]["payment_amount"].cast(DoubleType()),
)

# Loans: integer IDs and term, double amount and interest rate
cast_loans_df = fillna_dfs[3].select(
    fillna_dfs[3]["loan_id"].cast(IntegerType()),
    fillna_dfs[3]["customer_id"].cast(IntegerType()),
    fillna_dfs[3]["loan_amount"].cast(DoubleType()),
    fillna_dfs[3]["interest_rate"].cast(DoubleType()),
    fillna_dfs[3]["loan_term"].cast(IntegerType()),
)

# Transactions: transaction_date is parsed from dd-MM-yyyy text into a date
cast_transactions_df = fillna_dfs[4].select(
    fillna_dfs[4]["transaction_id"].cast(IntegerType()),
    fillna_dfs[4]["account_id"].cast(IntegerType()),
    to_date(fillna_dfs[4]["transaction_date"], "dd-MM-yyyy").alias("transaction_date"),
    fillna_dfs[4]["transaction_amount"].cast(DoubleType()),
    fillna_dfs[4]["transaction_type"].cast(StringType()),
)

display(cast_transactions_df)


transaction_id,account_id,transaction_date,transaction_amount,transaction_type
83,82,2024-03-23,150.0,Deposit
81,70,2024-03-21,100.5,Deposit
90,38,2024-03-30,375.25,Withdrawal
23,88,2024-01-23,150.0,Deposit
93,79,2024-04-02,150.0,Deposit
69,59,2024-03-09,325.0,Deposit
61,52,2024-03-01,100.5,Deposit
87,93,2024-03-27,225.5,Deposit
17,99,2024-01-17,225.5,Deposit
71,73,2024-03-11,100.5,Deposit


In [0]:
# Rename columns for better readability and consistency using PascalCase

accounts_df = (
    cast_accounts_df
    .withColumnRenamed("account_id", "AccountId")
    .withColumnRenamed("customer_id", "CustomerId")
    .withColumnRenamed("account_type", "AccountType")
    .withColumnRenamed("balance", "Balance")
)

customers_df = (
    cast_customers_df
    .withColumnRenamed("customer_id", "CustomerId")
    .withColumnRenamed("first_name", "FirstName")
    .withColumnRenamed("last_name", "LastName")
    .withColumnRenamed("address", "Address")
    .withColumnRenamed("city", "City")
    .withColumnRenamed("state", "State")
    .withColumnRenamed("zip", "Zip")
)

loan_payments_df = (
    cast_loan_payments_df
    .withColumnRenamed("payment_id", "PaymentId")
    .withColumnRenamed("loan_id", "LoanId")
    .withColumnRenamed("payment_date", "PaymentDate")
    .withColumnRenamed("payment_amount", "PaymentAmount")
)

loans_df = (
    cast_loans_df
    .withColumnRenamed("loan_id", "LoanId")
    .withColumnRenamed("customer_id", "CustomerId")
    .withColumnRenamed("loan_amount", "LoanAmount")
    .withColumnRenamed("interest_rate", "InterestRate")
    .withColumnRenamed("loan_term", "LoanTerm")
)

transactions_df = (
    cast_transactions_df
    .withColumnRenamed("transaction_id", "TransactionId")
    .withColumnRenamed("account_id", "AccountId")
    .withColumnRenamed("transaction_date", "TransactionDate")
    .withColumnRenamed("transaction_amount", "TransactionAmount")
    .withColumnRenamed("transaction_type", "TransactionType")
)

display(transactions_df)

TransactionId,AccountId,TransactionDate,TransactionAmount,TransactionType
83,82,2024-03-23,150.0,Deposit
81,70,2024-03-21,100.5,Deposit
90,38,2024-03-30,375.25,Withdrawal
23,88,2024-01-23,150.0,Deposit
93,79,2024-04-02,150.0,Deposit
69,59,2024-03-09,325.0,Deposit
61,52,2024-03-01,100.5,Deposit
87,93,2024-03-27,225.5,Deposit
17,99,2024-01-17,225.5,Deposit
71,73,2024-03-11,100.5,Deposit


In [0]:
# Join all processed DataFrames to create a unified view for analysis and to generate a PowerBI report using it.
# NOTE(review): loan payments are joined via the customer's loans and transactions via the account,
# so each payment row is paired with every transaction of the matching account. Confirm this
# fan-out is what the report expects before summing PaymentAmount or TransactionAmount.

joined_df = (
    accounts_df.join(customers_df, "CustomerId", "inner")
    .join(loans_df, "CustomerId", "inner")
    .join(loan_payments_df, "LoanId", "inner")
    .join(transactions_df, "AccountId", "inner")
    .select(
        "AccountId", "CustomerId", "Balance", "LoanId", "LoanAmount", "PaymentId",
        "PaymentAmount", "PaymentDate", "TransactionId", "TransactionAmount", "TransactionDate",
    )
    # Deduplicate AFTER projecting: rows that differed only in a dropped column
    # would otherwise survive as exact duplicates in the report data.
    .distinct()
)

display(joined_df)


AccountId,CustomerId,Balance,LoanId,LoanAmount,PaymentId,PaymentAmount,PaymentDate,TransactionId,TransactionAmount,TransactionDate
55,63,425.75,55,25000.75,14,750.0,2024-01-14,79,325.0,2024-03-19
33,85,150.25,33,15000.25,100,1000.0,2024-04-10,70,375.25,2024-03-10
24,11,2600.0,24,30000.0,93,4700.0,2024-04-02,46,175.0,2024-02-15
18,5,1600.5,18,27500.5,47,2400.0,2024-02-16,16,175.0,2024-01-16
5,56,500.0,5,25000.0,64,3250.0,2024-03-04,18,275.75,2024-01-18
83,82,775.75,83,15000.75,62,3150.0,2024-03-02,43,150.0,2024-02-12
6,23,1200.5,6,17500.5,55,2800.0,2024-02-24,48,275.75,2024-02-17
50,31,5100.5,50,37500.5,59,3000.0,2024-02-28,100,550.25,2024-04-09
34,41,3500.5,34,30000.5,6,350.0,2024-01-06,4,300.25,2024-01-04
80,30,8100.0,80,37500.0,89,4500.0,2024-03-29,99,325.0,2024-04-08


In [0]:
# Save cleaned and joined DataFrames to Silver Layer in Parquet format

silver_layer_path = "/mnt/project2-container/Silver_Layer/"

# Cleaned source tables, each written under its own dataset name
silver_tables = [
    (accounts_df, "accounts"),
    (customers_df, "customers"),
    (loan_payments_df, "loan_payments"),
    (loans_df, "loans"),
    (transactions_df, "transactions"),
]

for silver_df, dataset_name in silver_tables:
    put_adls_data(silver_layer_path, "parquet", silver_df, dataset_name, "overwrite")

# joined_df is rebuilt from the full source tables on every run, so it is overwritten like
# the tables above; appending would store another complete copy each time the notebook runs.
put_adls_data(silver_layer_path, "parquet", joined_df, "joined_data", "overwrite")


DataFrame[AccountId: int, CustomerId: int, Balance: double, LoanId: int, LoanAmount: double, PaymentId: int, PaymentAmount: double, PaymentDate: date, TransactionId: int, TransactionAmount: double, TransactionDate: date]

In [0]:
# Read each Silver Layer dataset back; the dataset name is the source file name without ".csv"
all_data_df = [
    get_adls_data("/mnt/project2-container/Silver_Layer/", "parquet", file_name.replace(".csv", ""))
    for file_name in files
]


In [0]:
# Show the first five Silver Layer DataFrames in load order
for dataset_position in range(5):
    display(all_data_df[dataset_position])

AccountId,CustomerId,AccountType,Balance
54,42,Checking,5500.5
88,1,Checking,8900.0
82,2,Checking,8300.5
92,44,Checking,9300.0
21,53,Savings,300.25
29,58,Savings,75.25
35,62,Savings,175.75
38,15,Checking,3900.5
72,17,Checking,7300.0
33,85,Savings,150.25


CustomerId,FirstName,LastName,Address,City,State,Zip
45,Christopher,Ward,4444 Maple Ave,Keswick,ON,L4P0A1
68,Charlotte,Griffin,6767 Poplar St,Victoria Harbour,ON,L0K0A1
86,Olivia,Gibson,8585 Elm St,New Liskeard,ON,P0J0A1
80,Emily,Jordan,8888 Airport rd,North Bay,ON,P1B0A1
37,Alexander,Bell,3636 Redwood Dr,Stratford,ON,N5A0A1
83,David,Fisher,8282 Ash Blvd,Verner,ON,P0H0A1
73,Andrew,Hamilton,7272 Maple Ave,Gravenhurst,ON,P1P0A1
34,Olivia,Reed,3333 Birch Blvd,Orillia,ON,L3V0A1
3,Michael,Johnson,789 Oak Dr,Montreal,QC,H1A1A1
9,William,Lopez,808 Redwood Dr,Victoria,BC,V8W0A1


PaymentId,LoanId,PaymentDate,PaymentAmount
4,89,2024-01-04,250.0
23,54,2024-01-23,1200.0
92,13,2024-04-01,4650.0
6,34,2024-01-06,350.0
21,32,2024-01-21,1100.0
39,30,2024-02-08,2000.0
76,37,2024-03-16,3850.0
1,45,2024-01-01,100.0
45,96,2024-02-14,2300.0
34,75,2024-02-03,1750.0


LoanId,CustomerId,LoanAmount,InterestRate,LoanTerm
54,42,30000.5,4.0,48
29,58,32500.25,5.0,36
72,17,20000.0,3.0,24
88,1,27500.0,3.0,24
78,4,27500.5,4.0,48
18,5,27500.5,4.5,48
70,33,37500.5,4.0,48
17,99,22500.25,5.5,36
46,24,17500.5,4.0,48
8,67,27500.0,3.0,24


TransactionId,AccountId,TransactionDate,TransactionAmount,TransactionType
83,82,2024-03-23,150.0,Deposit
81,70,2024-03-21,100.5,Deposit
90,38,2024-03-30,375.25,Withdrawal
23,88,2024-01-23,150.0,Deposit
93,79,2024-04-02,150.0,Deposit
69,59,2024-03-09,325.0,Deposit
61,52,2024-03-01,100.5,Deposit
87,93,2024-03-27,225.5,Deposit
17,99,2024-01-17,225.5,Deposit
71,73,2024-03-11,100.5,Deposit
