In [81]:
# Read the three Silver-layer Delta tables used by the cells below.
bank_df = spark.read.load("Files/FinData_Silver/Silver_bank_data", format="delta")
customer_df = spark.read.load("Files/FinData_Silver/Silver_customer_data", format="delta")
transaction_df = spark.read.load("Files/FinData_Silver/Silver_transaction_data", format="delta")


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 82, Finished, Available, Finished)

In [94]:
# Rename the overlapping Region/City columns BEFORE joining.
# Both tables appear to carry "Region" and "City" (each is renamed further down
# in this notebook), so joining them un-renamed would yield duplicate column
# names, and the downstream cells that read "Customer_City"/"Customer_Region"
# from these joined frames would only work if that later cell had already been
# run out of order. withColumnRenamed is a no-op when the source column is
# absent, so running these renames more than once is harmless.
customer_df = (customer_df
    .withColumnRenamed("Region", "Customer_Region")
    .withColumnRenamed("City", "Customer_City"))
bank_df = (bank_df
    .withColumnRenamed("Region", "Bank_Region")
    .withColumnRenamed("City", "Bank_City"))

# Customers enriched with their branch's attributes (left join: every customer is kept).
Cust_Bank_df = customer_df.join(bank_df, on="Branch_ID", how="left")
# Transactions enriched with customer attributes (left join: every transaction is kept).
Cust_Trans_df = transaction_df.join(customer_df, on="Customer_ID", how="left")
# Transactions enriched with both customer and branch attributes.
Fin_Data_df = transaction_df.join(Cust_Bank_df, on="Customer_ID", how="left")
# display(Fin_Data_df)

# Persist each joined frame as a managed Delta table, replacing any previous
# contents and schema.
Cust_Bank_df.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("CustomerBank")

Cust_Trans_df.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("CustomerTransaction")

Fin_Data_df.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("FinancialData")


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 101, Finished, Available, Finished)

### **Transaction Trends**
##### Transactions Per Month

In [97]:
from pyspark.sql.functions import to_date, month, year, count

# Transaction counts per calendar month, branch and customer city.
monthly_keys = ["Transaction_Year", "Transaction_Month", "Branch_ID", "Customer_City"]

# Derive the month and year parts from the transaction date.
dated_trans = Fin_Data_df.withColumn("Transaction_Month", month("Transaction_Date"))
dated_trans = dated_trans.withColumn("Transaction_Year", year("Transaction_Date"))

MonthlyTrans = (
    dated_trans
    .groupBy(*monthly_keys)
    .agg(count("Transaction_ID").alias("Transaction_Count"))
    .sort(*monthly_keys)
)

# display(MonthlyTrans)
# Persist as a Delta table, replacing any previous contents and schema.
MonthlyTrans.write.saveAsTable(
    "MonthlyCustomerTransaction",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 105, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 46892edb-a33e-4f99-b7e0-ea33da492ed8)

#### Transactions based on Account Type

In [98]:
from pyspark.sql.functions import sum

# Per account type: number of transactions and total amount, largest total first.
# NOTE: `sum` here is pyspark.sql.functions.sum (imported in this cell), not the
# Python builtin; `count` comes from an earlier cell's import.
account_totals = transaction_df.groupBy("Account_Type").agg(
    count("Transaction_ID").alias("Transactions_Count"),
    sum("Transaction_Amount").alias("Total_Amount"),
)
accountSummary = account_totals.sort("Total_Amount", ascending=False)

# display(accountSummary)
# Persist as a Delta table, replacing any previous contents and schema.
(
    accountSummary.write
    .format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable("AccountTypeSummary")
)


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 106, Finished, Available, Finished)

#### Transactions Based on Region and City

In [99]:
# Disambiguate the Region/City columns of the customer and branch tables.
# withColumnRenamed is a no-op when the source column is absent, so re-running
# this cell is harmless. Later cells rely on these renamed columns
# (e.g. "Bank_City", "Bank_Region", "Customer_City").
customer_df = customer_df.withColumnRenamed("Region", "Customer_Region") \
                         .withColumnRenamed("City", "Customer_City")
bank_df = bank_df.withColumnRenamed("Region", "Bank_Region") \
                 .withColumnRenamed("City", "Bank_City")

# Total transaction amount per customer region and city, largest first.
# The branch table is deliberately NOT joined here: none of its columns are used
# by this aggregation, and joining it could only change the result by
# duplicating transaction rows (inflating the totals) if a Branch_ID ever
# appeared more than once in the branch table.
regionSummary = (transaction_df
    .join(customer_df, on="Customer_ID", how="left")
    .groupBy("Customer_Region", "Customer_City")
    .agg(sum("Transaction_Amount").alias("Total_Transaction_Amount"))
    .orderBy("Total_Transaction_Amount", ascending=False))

# display(regionSummary)
# Persist as a Delta table, replacing any previous contents and schema.
regionSummary.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("CustomerCityRegionSummary")


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 107, Finished, Available, Finished)

### **Customer Spending Behaviour**

In [86]:
from pyspark.sql.functions import avg, col, when

# Attach customer attributes to every transaction. The left join keeps
# transactions whose Customer_ID has no match, leaving their customer columns NULL.
spendingsDf = transaction_df.join(customer_df, on="Customer_ID", how="left")

# Bucket each row by customer age.
# A NULL Age makes every comparison evaluate to NULL, so it has to be handled
# explicitly: previously such rows fell through to the default branch and were
# mislabelled "Senior". They are now labelled "Unknown", the same placeholder
# the notebook already filters on for city and customer type.
age_col = col("Age")
spendingsDf = spendingsDf.withColumn(
    "Age_Group",
    when(age_col < 20, "Teen")
    .when(age_col < 30, "Young Adult")   # 20-29: earlier branch already took < 20
    .when(age_col < 50, "Adult")         # 30-49
    .when(age_col >= 50, "Senior")
    .otherwise("Unknown"))

# display(spendingsDf)


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 87, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cc468c31-92e4-4111-8dcf-486d8080a7ed)

#### Spending By Age Group

In [100]:
from pyspark.sql.functions import sum, round, col, when

# Total spending per age group, rounded to 2 decimals, highest spenders first.
age_totals = spendingsDf.groupBy("Age_Group").agg(
    round(sum(col("Transaction_Amount")), 2).alias("Total_Spending")
)
spendingByAge = age_totals.sort(col("Total_Spending").desc())

# display(spendingByAge)
# Persist as a Delta table, replacing any previous contents and schema.
spendingByAge.write.saveAsTable(
    "CustomerSpendings_Age",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)

StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 108, Finished, Available, Finished)

#### Spending Based on Customer Type

In [101]:
# Total spending per customer type, rounded to 2 decimals, highest first.
# Relies on the pyspark `round`, `sum` and `col` imported by earlier cells.
cust_type_totals = spendingsDf.groupBy("Customer_Type").agg(
    round(sum(col("Transaction_Amount")), 2).alias("Total_Spending")
)
spendingsCustType = cust_type_totals.sort(col("Total_Spending").desc())

# display(spendingsCustType)
# Persist as a Delta table, replacing any previous contents and schema.
(
    spendingsCustType.write
    .format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable("CustomerSpendings_CustomerType")
)

StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 109, Finished, Available, Finished)

#### Spending Based on City and Customer Type

In [102]:
from pyspark.sql.functions import sum, col, round

# Total spending per (customer city, customer type), highest first.
# Rows whose city or customer type is the "Unknown" placeholder are excluded;
# rows where either value is NULL are dropped by the same comparisons.
known_spendings = (
    spendingsDf
    .where(col("Customer_City") != "Unknown")
    .where(col("Customer_Type") != "Unknown")
)

SpendingsCityCT = (
    known_spendings
    .groupBy("Customer_City", "Customer_Type")
    .agg(round(sum(col("Transaction_Amount")), 2).alias("Total_Spending"))
    .sort(col("Total_Spending").desc())
)

# display(SpendingsCityCT)
# Persist as a Delta table, replacing any previous contents and schema.
SpendingsCityCT.write.saveAsTable(
    "CustomerSpendings_CustomerType_City",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 110, Finished, Available, Finished)

### **Risk Assessment**

#### Customer Risk Score 
`Risk_Score = Transaction_Amount / (Total_Balance + Investment_Amount)`, set to 0 when `Total_Balance + Investment_Amount` is not greater than 0.

#### Customer Risk 

In [107]:
from pyspark.sql.functions import col, when

# Risk score: transaction amount relative to the customer's balance plus
# investments. Cust_Trans_df is not aggregated here, so the output has one row
# per input row (presumably one per transaction -- confirm if a per-customer
# table is intended).
total_holdings = col("Total_Balance") + col("Investment_Amount")

# Guard the division: a non-positive (or NULL) denominator yields a score of 0.
risk_score_expr = (
    when(total_holdings > 0, col("Transaction_Amount") / total_holdings)
    .otherwise(0)
)

# Buckets: > 2 is High, (1, 2] is Medium, everything else is Low.
risk_bucket_expr = (
    when(col("Risk_Score") > 2, "High")
    .when((col("Risk_Score") > 1) & (col("Risk_Score") <= 2), "Medium")
    .otherwise("Low")
)

customerRiskDf = (
    Cust_Trans_df
    .withColumn("Risk_Score", risk_score_expr)
    .withColumn("Risk_Bucket", risk_bucket_expr)
    .select("Customer_ID", "Customer_Type", "Customer_City", "Customer_Region", "Risk_Score", "Risk_Bucket")
    .sort(col("Risk_Score").desc())
)

# display(Cust_Trans_df)
# display(customerRiskDf)
# Persist as a Delta table, replacing any previous contents and schema.
customerRiskDf.write.saveAsTable(
    "CustomerRisks",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)


StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 115, Finished, Available, Finished)

#### Bank Risk 

In [108]:
# Branch risk = number of warning flags raised (0, 1 or 2):
#   * expenses above 1,000,000
#   * profit margin below 0.10 (presumably a fraction, i.e. 10% -- confirm units)
# A NULL input raises no flag because the comparison falls through to otherwise(0).
high_expense_flag = when(col("Expenses") > 1000000, 1).otherwise(0)
low_profit_flag = when(col("Profit_Margin") < 0.10, 1).otherwise(0)

branchRiskDf = (
    bank_df
    .withColumn("High_Expense_Flag", high_expense_flag)
    .withColumn("Low_Profit_Flag", low_profit_flag)
    .withColumn("Branch_Risk", col("High_Expense_Flag") + col("Low_Profit_Flag"))
    .select("Branch_ID", "Bank_City", "Bank_Region", "Expenses", "Profit_Margin", "Branch_Risk")
    .sort(col("Branch_Risk").desc())
)

# display(branchRiskDf)
# Persist as a Delta table, replacing any previous contents and schema.
(
    branchRiskDf.write
    .format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable("BranchRisks")
)

StatementMeta(, 8fcb76ff-051e-4dae-b90d-5618c6df9272, 116, Finished, Available, Finished)