In [0]:
# List the entries directly under the /mnt/adlsgen2project2container/ path;
# as the last expression in the cell, the listing is shown as the cell output.
dbutils.fs.ls('/mnt/adlsgen2project2container/')

In [0]:
from datetime import datetime

# Build the BronzeLayer folder for the current date: <root>/<year>/<MM>/<DD>,
# with month and day zero-padded to two digits.
today = datetime.today()
src_path = f"/mnt/adlsgen2project2container/BronzeLayer/{today.year}/{today:%m}/{today:%d}"
print("Today's path:", src_path)

In [0]:
# Probe today's source folder so a missing path is reported up front.
try:
    dbutils.fs.ls(src_path)
except Exception as exc:
    # Catch Exception instead of using a bare except so that
    # KeyboardInterrupt/SystemExit still interrupt the notebook, and show the
    # underlying error because the failure is not necessarily a missing path.
    print(f'File path not found {src_path}: {exc}')

In [0]:
# Location of the accounts CSV inside today's source folder.
src_pathAccounts = f"{src_path}/accounts.csv"

In [0]:
def createDataframeAccounts(src_pathAccounts, schema):
    """Read a CSV file with a header row into a DataFrame using ``schema``.

    Args:
        src_pathAccounts: Path of the CSV file to load.
        schema: Schema passed to the reader and to the fallback DataFrame.

    Returns:
        The DataFrame returned by the CSV reader, or an empty DataFrame built
        from ``schema`` when the read raises an exception.
    """
    try:
        return (
            spark.read.format("csv")
            .option("header", "true")
            .schema(schema)
            .load(src_pathAccounts)
        )
    except Exception as exc:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed, and include the
        # actual error because the cause may be something other than the path.
        print(f'File path not found {src_pathAccounts}: {exc}')
        return spark.createDataFrame([], schema)


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

In [0]:
# Explicit column types for the accounts CSV, applied instead of inferring them.
accounts_schema = StructType([
    StructField('account_id', IntegerType()),
    StructField('customer_id', IntegerType()),
    StructField('account_type', StringType()),
    StructField('balance', DoubleType()),
])

In [0]:
# Load the accounts CSV; the loader returns an empty DataFrame if the read fails.
accounts = createDataframeAccounts(
    src_pathAccounts=src_pathAccounts,
    schema=accounts_schema,
)

In [0]:
# Render the loaded accounts DataFrame in the cell output.
display(accounts)

In [0]:
def createDataFrameCustomers(src_pathCustomers, customer_schema):
    """Read a CSV file with a header row into a DataFrame using ``customer_schema``.

    Args:
        src_pathCustomers: Path of the CSV file to load.
        customer_schema: Schema passed to the reader and to the fallback
            DataFrame.

    Returns:
        The DataFrame returned by the CSV reader, or an empty DataFrame built
        from ``customer_schema`` when the read raises an exception.
    """
    try:
        return (
            spark.read.format("csv")
            .option("header", "true")
            .schema(customer_schema)
            .load(src_pathCustomers)
        )
    except Exception as exc:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed, and include the
        # actual error because the cause may be something other than the path.
        print(f'File path not found {src_pathCustomers}: {exc}')
        return spark.createDataFrame([], customer_schema)

In [0]:
# Explicit column types for the customers CSV; zip is kept as a string.
customer_schema = StructType([
    StructField('customer_id', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('address', StringType()),
    StructField('city', StringType()),
    StructField('state', StringType()),
    StructField('zip', StringType()),
])

In [0]:
# Load the customers CSV from today's source folder and show the result.
src_pathCustomers = f"{src_path}/customers.csv"
customers = createDataFrameCustomers(
    src_pathCustomers=src_pathCustomers,
    customer_schema=customer_schema,
)
display(customers)

In [0]:
def createDataFrameTransactions(src_pathTransactions, schema):
    """Read a CSV file with a header row into a DataFrame using ``schema``.

    Args:
        src_pathTransactions: Path of the CSV file to load.
        schema: Schema passed to the reader and to the fallback DataFrame.

    Returns:
        The DataFrame returned by the CSV reader, or an empty DataFrame built
        from ``schema`` when the read raises an exception.
    """
    try:
        return (
            spark.read.format("csv")
            .option("header", "true")
            .schema(schema)
            .load(src_pathTransactions)
        )
    except Exception as exc:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed, and include the
        # actual error because the cause may be something other than the path.
        print(f'File path not found {src_pathTransactions}: {exc}')
        return spark.createDataFrame([], schema)


In [0]:
# Explicit column types for the transactions CSV, applied instead of inferring them.
transaction_schema = StructType([
    StructField('transaction_id', IntegerType()),
    StructField('account_id', IntegerType()),
    StructField('transaction_date', DateType()),
    StructField('transaction_amount', DoubleType()),
    StructField('transaction_type', StringType()),
])

In [0]:
# Load the transactions CSV from today's source folder and show the result.
src_pathTransactions = f"{src_path}/transactions.csv"
transactions = createDataFrameTransactions(
    src_pathTransactions=src_pathTransactions,
    schema=transaction_schema,
)
display(transactions)

In [0]:
def createDataFrameLoans(src_pathLoans, schema):
    """Read a CSV file with a header row into a DataFrame using ``schema``.

    Args:
        src_pathLoans: Path of the CSV file to load.
        schema: Schema passed to the reader and to the fallback DataFrame.

    Returns:
        The DataFrame returned by the CSV reader, or an empty DataFrame built
        from ``schema`` when the read raises an exception.
    """
    try:
        return (
            spark.read.format("csv")
            .option("header", "true")
            .schema(schema)
            .load(src_pathLoans)
        )
    except Exception as exc:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed, and include the
        # actual error because the cause may be something other than the path.
        print(f'File path not found {src_pathLoans}: {exc}')
        return spark.createDataFrame([], schema)

In [0]:
# Explicit column types for the loans CSV, applied instead of inferring them.
loans_schema = StructType([
    StructField('loan_id', IntegerType()),
    StructField('customer_id', IntegerType()),
    StructField('loan_amount', DoubleType()),
    StructField('interest_rate', DoubleType()),
    StructField('loan_term', IntegerType()),
])

In [0]:
# Load the loans CSV from today's source folder and show the result.
src_pathLoans = f"{src_path}/loans.csv"
loans = createDataFrameLoans(
    src_pathLoans=src_pathLoans,
    schema=loans_schema,
)
display(loans)

In [0]:
def createDataFrameLoanPayments(src_pathLoanPayments, schema):
    """Read a CSV file with a header row into a DataFrame using ``schema``.

    Args:
        src_pathLoanPayments: Path of the CSV file to load.
        schema: Schema passed to the reader and to the fallback DataFrame.

    Returns:
        The DataFrame returned by the CSV reader, or an empty DataFrame built
        from ``schema`` when the read raises an exception.
    """
    try:
        return (
            spark.read.format("csv")
            .option("header", "true")
            .schema(schema)
            .load(src_pathLoanPayments)
        )
    except Exception as exc:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed, and include the
        # actual error because the cause may be something other than the path.
        print(f'File path not found {src_pathLoanPayments}: {exc}')
        return spark.createDataFrame([], schema)

In [0]:
# Explicit column types for the loan payments CSV, applied instead of inferring them.
loanPayments_schema = StructType([
    StructField('payment_id', IntegerType()),
    StructField('loan_id', IntegerType()),
    StructField('payment_date', DateType()),
    StructField('payment_amount', DoubleType()),
])

In [0]:
# Load the loan payments CSV from today's source folder and show the result.
src_pathLoanPayments = f"{src_path}/loan_payments.csv"
loanPayments = createDataFrameLoanPayments(
    src_pathLoanPayments=src_pathLoanPayments,
    schema=loanPayments_schema,
)
display(loanPayments)

In [0]:
# Keep only rows that have a customer_id, then remove exact duplicate rows.
customers = customers.na.drop(subset=["customer_id"]).dropDuplicates()
# Replace nulls in the listed string columns with the placeholder "unknown".
customers = customers.na.fill(
    "unknown",
    subset=["address", "zip", "state", "city", "last_name"],
)
display(customers)
# Overwrite the Customers Parquet output with the cleaned rows.
(
    customers.write
    .mode("overwrite")
    .option("header", "true")
    .parquet('/mnt/adlsgen2project2container/SilverLayer/ParquetFiles/Customers')
)

In [0]:
# Drop rows missing either key, remove exact duplicate rows, then default the
# remaining nulls. The replacements are given as a per-column dict with values
# of the column's own type: fillna() silently skips a column whose type does
# not match the replacement, so the previous string "0.0" never filled the
# double `balance` column.
accounts = (
    accounts
    .dropna(subset=["account_id", "customer_id"])
    .dropDuplicates()
    .fillna({"account_type": "CheckingandSaving", "balance": 0.0})
)
display(accounts)
# Overwrite the Accounts Parquet output with the cleaned rows.
accounts.write.mode("overwrite").option("header", "true").parquet('/mnt/adlsgen2project2container/SilverLayer/ParquetFiles/Accounts')

In [0]:
# Drop rows missing either key, remove exact duplicate rows, then default the
# remaining nulls. The replacements are numeric (not strings) because fillna()
# silently skips a column whose type does not match the replacement, so the
# previous "0.0"/"0" string fills never touched these double/integer columns.
loans = (
    loans
    .dropna(subset=["loan_id", "customer_id"])
    .dropDuplicates()
    .fillna({"loan_amount": 0.0, "interest_rate": 0.0, "loan_term": 0})
)
display(loans)
# Overwrite the Loans Parquet output with the cleaned rows.
loans.write.mode("overwrite").option("header", "true").parquet('/mnt/adlsgen2project2container/SilverLayer/ParquetFiles/Loans')

In [0]:
from pyspark.sql import functions as F

# Drop rows missing either key, remove exact duplicate rows, then default the
# remaining nulls. fillna() silently skips a column whose type does not match
# the replacement, so the previous string fills never touched the double
# `payment_amount` column or the date `payment_date` column.
loanPayments = (
    loanPayments
    .dropna(subset=["payment_id", "loan_id"])
    .dropDuplicates()
    .fillna({"payment_amount": 0.0})
    # fillna() has no date replacement, so default the date with coalesce().
    .withColumn(
        "payment_date",
        F.coalesce(F.col("payment_date"), F.to_date(F.lit("1900-01-01"))),
    )
)
display(loanPayments)
# Overwrite the LoanPayments Parquet output with the cleaned rows.
loanPayments.write.mode("overwrite").option("header", "true").parquet('/mnt/adlsgen2project2container/SilverLayer/ParquetFiles/LoanPayments')

In [0]:
from pyspark.sql import functions as F

# Drop rows missing either key, remove exact duplicate rows, then default the
# remaining nulls. fillna() silently skips a column whose type does not match
# the replacement, so the previous string fills only affected
# `transaction_type`; the double amount and the date column stayed null.
transactions = (
    transactions
    .dropna(subset=["transaction_id", "account_id"])
    .dropDuplicates()
    .fillna({"transaction_amount": 0.0, "transaction_type": "unknown"})
    # fillna() has no date replacement, so default the date with coalesce().
    .withColumn(
        "transaction_date",
        F.coalesce(F.col("transaction_date"), F.to_date(F.lit("1900-01-01"))),
    )
)
display(transactions)
# Overwrite the Transactions Parquet output with the cleaned rows.
transactions.write.mode("overwrite").option("header", "true").parquet('/mnt/adlsgen2project2container/SilverLayer/ParquetFiles/Transactions')

In [0]:
# Combine the five cleaned DataFrames with a chain of inner joins, so only
# rows that find a match at every step are kept.
df_delta = (
    accounts
    .join(customers, on="customer_id", how="inner")
    .join(transactions, on="account_id", how="inner")
    .join(loans, on="customer_id", how="inner")
    .join(loanPayments, on="loan_id", how="inner")
)

In [0]:
# Keep the identifier, amount and date columns of the joined result.
df = df_delta.select(
    "account_id",
    "customer_id",
    "loan_id",
    "payment_id",
    "transaction_id",
    "balance",
    "payment_amount",
    "loan_amount",
    "transaction_amount",
    "payment_date",
    "transaction_date",
)
display(df)

In [0]:
# Append the selected rows to the Delta output location.
# NOTE(review): because the mode is "append", running this cell again writes
# the same rows a second time - confirm that is intended.
(
    df.write
    .format("Delta")
    .mode("append")
    .option("header", "true")
    .save('/mnt/adlsgen2project2container/SilverLayer/DeltaFiles')
)

In [0]:
# Print the column names and types of the selected result.
df.printSchema()

In [0]:
%sql
-- Make hive_metastore the current catalog for the SQL cells that follow.
USE CATALOG hive_metastore

In [0]:
%sql
-- Register a table over the Delta files at the location given below.
-- IF NOT EXISTS leaves an already registered table as it is.
-- The column list mirrors the select() used to build the written DataFrame.
CREATE TABLE IF NOT EXISTS hive_metastore.default.jointables
(
    account_id INT,
    customer_id INT,
    loan_id INT,
    payment_id INT,
    transaction_id INT,
    balance DOUBLE,
    payment_amount DOUBLE,
    loan_amount DOUBLE,
    transaction_amount DOUBLE,
    payment_date DATE,
    transaction_date DATE
)
USING DELTA
LOCATION '/mnt/adlsgen2project2container/SilverLayer/DeltaFiles';

In [0]:
%sql
-- Show the rows of the joined table.
SELECT * FROM hive_metastore.default.jointables


In [0]:
# from pyspark.sql.functions import *

In [0]:
# acc_cust_df = accounts.join(customers, on="customer_id", how="inner").join(transactions, on="account_id", how="inner")

# loans_payments_df = loans.join(loanPayments, on="loan_id", how="left")

# merged_df = acc_cust_df.join(loans_payments_df, on="customer_id", how="inner")

In [0]:
# df_selected_columns = merged_df.select(
#     col("account_id").alias("Account_ID"),
#     col("customer_id").alias("Customer_ID"),
#     col("loan_id").alias("Loan_ID"),
#     col("payment_id").alias("Payment_ID"),
#     col("transaction_id").alias("Transaction_ID"),
#     col("balance").alias("Account_Balance"),
#     col("payment_amount").alias("Payment_Amount"),
#     col("loan_amount").alias("Loan_Amount"),
#     col("transaction_amount").alias("Transaction_Amount"),
#     col("payment_date").alias("Payment_Date"),
#     col("transaction_date").alias("Transaction_Date")
# )

# display(df_selected_columns)

In [0]:
# df_selected_columns.write.format("delta").mode("overwrite").save("/mnt/adlsgen2project2container/SilverLayer/DeltaFiles")

In [0]:
# df_delta=loanPayments.join(loans, loanPayments.loan_id == loans.loan_id, 'inner').select("payment_id","payment_date","payment_amount","loan_amount","customer_id")
# #select ('loanPayments.payment_id', 'loans.loan_id', 'loans.loan_amount', 'loans.interest_rate', 'loans.loan_term')

In [0]:
# display(df_delta)

In [0]:
# from pyspark.sql.functions import col

In [0]:
# df_delta = loanPayments.join(
#     loans, 
#     loanPayments.loan_id == loans.loan_amount_id, 
#     'inner'
# ).select(
#     loanPayments.payment_id,
#     loans.loan_id,
#     loans.loan_amount,
#     loans.interest_rate,
#     loans.loan_term
# )

In [0]:
# df_delta=loanPayments.join(customers, customers.account_id == loanPayments.account_id, 'inner').join(loans, customers.customer_id == loans.customer_id, 'inner').join(loanPayments, loans.loan_id == loanPayments.loan_id, 'inner')

In [0]:
# acc_cust_df = accounts.join(customers, on="customer_id", how="inner").join(transactions, on="account_id", how="inner").join(loanPayments, on="loan_id", how="left").join(loanPayments, on="customer_id", how="inner")

In [0]:
# df_selected_columns = merged_df.select(
#     col("account_id").alias("Account_ID"),
#     col("customer_id").alias("Customer_ID"),
#     col("loan_id").alias("Loan_ID"),
#     col("payment_id").alias("Payment_ID"),
#     col("transaction_id").alias("Transaction_ID"),
#     col("balance").alias("Account_Balance"),
#     col("payment_amount").alias("Payment_Amount"),
#     col("loan_amount").alias("Loan_Amount"),
#     col("transaction_amount").alias("Transaction_Amount"),
#     col("payment_date").alias("Payment_Date"),
#     col("transaction_date").alias("Transaction_Date")
# )