In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when, lit, to_date, datediff
import pyspark.sql.functions as F
from pyspark.sql.window import Window



In [None]:
import pandas as pd
import numpy as np

In [None]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Spark SQL_Bi") \
    .getOrCreate()

In [None]:
# Read the Disbursement Excel file using pandas
disbursement_df = pd.read_excel("/content/BI_Analyst_Case_Study_Data.xlsx", sheet_name= "Disbursements")

In [None]:
# Read the Repayment Excel file using pandas
repayment_df = pd.read_excel("/content/BI_Analyst_Case_Study_Data.xlsx", sheet_name= "Repayments")

In [None]:
# convert pandas DataFrame to Spark DataFrame on disbursement data
dis_spark_df = spark.createDataFrame(disbursement_df)

In [None]:
# convert pandas DataFrame to Spark DataFrame on repayment data
rep_spark_df = spark.createDataFrame(repayment_df)

In [None]:
# Registering the DataFrame as a temporary SQL Table
dis_spark_df.createOrReplaceTempView("disbursements")

In [None]:
# Registering the DataFrame as temporary SQL Table
rep_spark_df.createOrReplaceTempView("repayments")

In [None]:
from pyspark.sql.functions import date_format
# Formatting dates from disbursements and repayments to one format
dis_spark_df = dis_spark_df.withColumn("disb_date", date_format(col("disb_date"), "yyyy-MM-dd"))
rep_spark_df = rep_spark_df.withColumn("date_time", date_format(col("date_time"), "yyyy-MM-dd"))

In [None]:
spark.sql("SELECT * FROM disbursements LIMIT 5").show()

+--------------------+-------------------+------+--------------------+-----------+--------+------------------+
|         customer_id|          disb_date|tenure|         account_num|loan_amount|loan_fee|disbursement_month|
+--------------------+-------------------+------+--------------------+-----------+--------+------------------+
|af046ce17d65fc256...|2024-01-01 00:00:00|    30|8Z6UEMB3MLTZ3C58Q...|        900|   135.0|          Jan-2024|
|9e6959bfdf55f7260...|2024-01-01 00:00:00|    14|80VUWDVJG3H93EP1X...|        140|    16.8|          Jan-2024|
|6f1aa549482fd527f...|2024-01-01 00:00:00|    30|8ESB6FAUBUDGJ4IW6...|       3500|   525.0|          Jan-2024|
|2a8dd19946bad9a45...|2024-01-01 00:00:00|    14|K0FQ983D12WYIINWD...|         70|     8.4|          Jan-2024|
|92ae786a7b7851930...|2024-01-01 00:00:00|     7|360XKMXQ9N8KHL2V0...|        850|    85.0|          Jan-2024|
+--------------------+-------------------+------+--------------------+-----------+--------+------------------+



In [None]:
spark.sql("SELECT * FROM repayments LIMIT 5").show()

+-------------------+--------------------+------+---------+---------------+--------------+
|          date_time|         customer_id|amount|rep_month|repayment_month|repayment_type|
+-------------------+--------------------+------+---------+---------------+--------------+
|2024-01-01 00:00:00|f6bbe9d609181778a...|266.66|   202401|       Jan-2024|        Manual|
|2024-01-01 00:00:00|7501da53759a05f3f...|180.18|   202401|       Jan-2024|     Automatic|
|2024-01-01 00:00:00|42e1482d23e0b41ab...|  8.33|   202401|       Jan-2024|     Automatic|
|2024-01-01 00:00:00|0e7efb36b0c32a8d7...|  2.92|   202401|       Jan-2024|     Automatic|
|2024-01-01 00:00:00|84a64ca68e373614f...| 91.64|   202401|       Jan-2024|        Manual|
+-------------------+--------------------+------+---------+---------------+--------------+



## Exploring Disbursements table - Understanding the structure and content

In [None]:
spark.sql("SELECT COUNT(customer_id) FROM disbursements").show()

+------------------+
|count(customer_id)|
+------------------+
|             26365|
+------------------+



In [None]:
# Checking for unique accounts
spark.sql("SELECT COUNT(DISTINCT customer_id) FROM disbursements").show()

+---------------------------+
|count(DISTINCT customer_id)|
+---------------------------+
|                       2996|
+---------------------------+



In [None]:
# Checking the earliest and latest disbursement
spark.sql("SELECT MIN(disb_date), MAX(disb_date) FROM disbursements").show()

+-------------------+-------------------+
|     min(disb_date)|     max(disb_date)|
+-------------------+-------------------+
|2024-01-01 00:00:00|2024-08-11 00:00:00|
+-------------------+-------------------+



In [None]:
spark.sql("SELECT AVG(loan_amount), MAX(loan_amount), MIN(loan_amount) FROM disbursements").show()

+------------------+----------------+----------------+
|  avg(loan_amount)|max(loan_amount)|min(loan_amount)|
+------------------+----------------+----------------+
|1004.5755736772236|            3500|              70|
+------------------+----------------+----------------+



In [None]:
spark.sql("SELECT AVG(loan_fee), MAX(loan_fee), MIN(loan_fee) FROM disbursements").show()

+------------------+-------------+-------------+
|     avg(loan_fee)|max(loan_fee)|min(loan_fee)|
+------------------+-------------+-------------+
|130.19424464251773|        525.0|          7.0|
+------------------+-------------+-------------+



In [None]:
# Removing Dupicates
# spark.sql("SELECT COUNT(DISTINCT customer_id, disb_date, account_num, loan_amount, loan_fee) FROM disbursements").show()

## Exploring Repayments Table

In [None]:
spark.sql("SELECT COUNT(customer_id) FROM repayments").show()

+------------------+
|count(customer_id)|
+------------------+
|             65210|
+------------------+



In [None]:
spark.sql("SELECT COUNT(DISTINCT customer_id) FROM repayments").show()

+---------------------------+
|count(DISTINCT customer_id)|
+---------------------------+
|                       2995|
+---------------------------+



In [None]:
# Checking the earliest repayment date
spark.sql("SELECT MIN(date_time), MAX(date_time) FROM repayments").show()

+-------------------+-------------------+
|     min(date_time)|     max(date_time)|
+-------------------+-------------------+
|2024-01-01 00:00:00|2024-08-26 00:00:00|
+-------------------+-------------------+



In [None]:
spark.sql("SELECT AVG(amount), MAX(amount), MIN(amount) FROM repayments").show()

+------------------+-----------+-----------+
|       avg(amount)|max(amount)|min(amount)|
+------------------+-----------+-----------+
|460.53170479988097|     4025.0|       0.01|
+------------------+-----------+-----------+



In [None]:
spark.sql("SELECT DISTINCT rep_month FROM repayments SORT BY rep_month").show()
# This spans 8 months from January to August, 2024

+---------+
|rep_month|
+---------+
|   202401|
|   202402|
|   202403|
|   202404|
|   202405|
|   202406|
|   202407|
|   202408|
+---------+



In [None]:
spark.sql("SELECT * FROM repayments LIMIT 5").show()

+-------------------+--------------------+------+---------+---------------+--------------+
|          date_time|         customer_id|amount|rep_month|repayment_month|repayment_type|
+-------------------+--------------------+------+---------+---------------+--------------+
|2024-01-01 00:00:00|f6bbe9d609181778a...|266.66|   202401|       Jan-2024|        Manual|
|2024-01-01 00:00:00|7501da53759a05f3f...|180.18|   202401|       Jan-2024|     Automatic|
|2024-01-01 00:00:00|42e1482d23e0b41ab...|  8.33|   202401|       Jan-2024|     Automatic|
|2024-01-01 00:00:00|0e7efb36b0c32a8d7...|  2.92|   202401|       Jan-2024|     Automatic|
|2024-01-01 00:00:00|84a64ca68e373614f...| 91.64|   202401|       Jan-2024|        Manual|
+-------------------+--------------------+------+---------+---------------+--------------+



In [None]:
# Removing duplicates
# spark.sql("SELECT COUNT(DISTINCT customer_id, date_time, amount, rep_month, repayment_type) FROM repayments").show()

### Checking for missing values from Disbursements

In [None]:
# There are no missing values from the disbursement dataset

spark.sql("""SELECT 'customer_id' AS column_name, COUNT(*) AS null_count FROM disbursements WHERE customer_id IS NULL
UNION ALL
SELECT 'disb_date', COUNT(*) FROM disbursements WHERE disb_date IS NULL
UNION ALL
SELECT 'account_num', COUNT(*) FROM disbursements WHERE account_num IS NULL
UNION ALL
SELECT 'loan_amount', COUNT(*) FROM disbursements WHERE loan_amount IS NULL
UNION ALL
SELECT 'loan_fee', COUNT(*) FROM disbursements WHERE loan_fee IS NULL""").show()


+-----------+----------+
|column_name|null_count|
+-----------+----------+
|customer_id|         0|
|  disb_date|         0|
|account_num|         0|
|loan_amount|         0|
|   loan_fee|         0|
+-----------+----------+



### Checking for missing values from the repayment dataset

In [None]:
spark.sql("""
    SELECT 'customer_id' AS column_name, COUNT(*) AS null_count FROM repayments WHERE customer_id IS NULL
    UNION ALL
    SELECT 'date_time', COUNT(*) FROM repayments WHERE date_time IS NULL
    UNION ALL
    SELECT 'amount', COUNT(*) FROM repayments WHERE amount IS NULL
    UNION ALL
    SELECT 'rep_month', COUNT(*) FROM repayments WHERE rep_month IS NULL
    UNION ALL
    SELECT 'repayment_type', COUNT(*) FROM repayments WHERE repayment_type IS NULL
""").show()

# There are no null or missing values from the repayment dataset

+--------------+----------+
|   column_name|null_count|
+--------------+----------+
|   customer_id|         0|
|     date_time|         0|
|        amount|         0|
|     rep_month|         0|
|repayment_type|         0|
+--------------+----------+



In [None]:
# Some of the key features of the lending product

Key Questions:

What is the time range of the data?

What are the typical loan amounts and fees?

What types of repayments are there?

Are there any missing values?

How many unique customers and accounts are there?

In [None]:
# sorting dataframe by repayment date ascending order


### Merging the data and Identifying defaults

In [None]:
merged_df = spark.sql("""
WITH loan_info AS (
    SELECT
        d.customer_id,
        d.disb_date,
        d.tenure,
        d.loan_amount,
        d.loan_fee,
        date_add(d.disb_date, cast(d.tenure as int)) AS due_date  -- Cast tenure to INT
    FROM disbursements d
),
repayment_info AS (
    SELECT
        r.customer_id,
        DATE_FORMAT(MAX(r.date_time), 'yyyy-MM-dd') AS repayment_date,
        SUM(r.amount) AS total_repaid
    FROM repayments r
    GROUP BY r.customer_id
),
loan_status AS (
    SELECT
        l.customer_id,
        l.disb_date,
        l.due_date,
        r.repayment_date,
        r.total_repaid,
        l.loan_amount,
        l.loan_fee,
        CASE
            WHEN r.repayment_date IS NULL OR r.repayment_date > l.due_date THEN 'Defaulted'
            ELSE 'Repaid on Time'
        END AS repayment_status
    FROM loan_info l
    LEFT JOIN repayment_info r ON l.customer_id = r.customer_id
)
SELECT * FROM loan_status
ORDER BY repayment_date ASC  -- Sorting by repayment_date in ascending order
LIMIT 20
""").show()


+--------------------+-------------------+----------+--------------+------------------+-----------+--------+----------------+
|         customer_id|          disb_date|  due_date|repayment_date|      total_repaid|loan_amount|loan_fee|repayment_status|
+--------------------+-------------------+----------+--------------+------------------+-----------+--------+----------------+
|373a0279abf696d43...|2024-01-14 00:00:00|2024-01-28|          NULL|              NULL|        150|    18.0|       Defaulted|
|561a158588df57451...|2024-01-06 00:00:00|2024-01-20|    2024-01-14|             324.8|        290|    34.8|  Repaid on Time|
|e629fdccdf577b8ba...|2024-08-02 00:00:00|2024-09-01|    2024-01-14|            2760.0|       2600|   390.0|  Repaid on Time|
|8bb494df97747a979...|2024-01-03 00:00:00|2024-01-17|    2024-01-17|214.99999999999997|        270|    32.4|  Repaid on Time|
|04deddb26f4e1b554...|2024-01-12 00:00:00|2024-01-19|    2024-01-19|             340.0|        210|    21.0|  Repaid o

In [108]:
merged_df.to_excel("merged_data.xlsx", index=False)

In [107]:
# spark.sql("SELECT * FROM loan_status").show()
# Getting data from dataframe
loan_status_df = spark.sql("SELECT * FROM loan_status")
# Converting to pandas df
loan_status_df = loan_status_df.toPandas()
loan_status_df.to_excel("loan_status.xlsx", index=False)


### Repayments By Month

In [None]:
monthly_repaid_df =spark.sql("""
SELECT repayment_month, SUM(amount) AS total_repaid
FROM repayments
GROUP BY repayment_month
ORDER BY repayment_month
""").show()

+---------------+------------------+
|repayment_month|      total_repaid|
+---------------+------------------+
|       Apr-2024|4250145.3099999875|
|       Aug-2024| 2730247.249999998|
|       Feb-2024|3600989.9699999853|
|       Jan-2024| 3422364.489999984|
|       Jul-2024|3416517.6499999883|
|       Jun-2024|3995342.2999999854|
|       Mar-2024| 4152027.519999991|
|       May-2024|  4463637.97999999|
+---------------+------------------+



### Disbursment by Date

In [None]:
monthly_disbursement_df =spark.sql("""
SELECT disbursement_month, SUM(loan_amount) AS total_disbursement
FROM disbursements
GROUP BY disbursement_month
ORDER BY disbursement_month
""").show()

+------------------+------------------+
|disbursement_month|total_disbursement|
+------------------+------------------+
|          Apr-2024|           3919811|
|          Aug-2024|           1194268|
|          Feb-2024|           3392645|
|          Jan-2024|           3172375|
|          Jul-2024|           3380538|
|          Jun-2024|           3499828|
|          Mar-2024|           3922131|
|          May-2024|           4004039|
+------------------+------------------+



### Profit/Loss Calculation
We define profit = Monthly Repayment - Monthly Disbursement

In [None]:
# Monthly Repayment - Monthly Disbursement
# Storing the DataFrames in the variables.

monthly_repaid_df = spark.sql("""
SELECT repayment_month, SUM(amount) AS total_repaid
FROM repayments
GROUP BY repayment_month
ORDER BY repayment_month
""")

monthly_disbursement_df = spark.sql("""
SELECT disbursement_month, SUM(loan_amount) AS total_disbursement
FROM disbursements
GROUP BY disbursement_month
ORDER BY disbursement_month
""")


# Converting the Spark DataFrames to Pandas DataFrames.
monthly_repaid_pd_df = monthly_repaid_df.toPandas()
monthly_disbursement_pd_df = monthly_disbursement_df.toPandas()

# Renaming the 'disbursement_month' column to 'repayment_month' for merging.
monthly_disbursement_pd_df = monthly_disbursement_pd_df.rename(columns={"disbursement_month": "repayment_month"})

# Merge the two Pandas DataFrames on 'repayment_month'.
merged_df = pd.merge(monthly_repaid_pd_df, monthly_disbursement_pd_df, on='repayment_month', how='outer')

# Fill NaN values with 0 for calculation.
merged_df = merged_df.fillna(0)

# Calculate the profit/loss.
merged_df['Profit'] = merged_df['total_repaid'] - merged_df['total_disbursement']

# Convert the Pandas DataFrame back to a Spark DataFrame.
profit_df = spark.createDataFrame(merged_df)

profit_df.show()

+---------------+------------------+------------------+------------------+
|repayment_month|      total_repaid|total_disbursement|            Profit|
+---------------+------------------+------------------+------------------+
|       Apr-2024|4250145.3099999875|           3919811| 330334.3099999875|
|       Aug-2024| 2730247.249999998|           1194268|1535979.2499999981|
|       Feb-2024|3600989.9699999853|           3392645| 208344.9699999853|
|       Jan-2024| 3422364.489999984|           3172375|249989.48999998393|
|       Jul-2024|3416517.6499999883|           3380538|35979.649999988265|
|       Jun-2024|3995342.2999999854|           3499828| 495514.2999999854|
|       Mar-2024| 4152027.519999991|           3922131|229896.51999999117|
|       May-2024|  4463637.97999999|           4004039| 459598.9799999902|
+---------------+------------------+------------------+------------------+



In [89]:
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
pio.renderers.default = "colab"
import warnings
warnings.filterwarnings("ignore")

In [90]:
# Bar plot on total repaid and total_disbursement
fig = px.bar(profit_df, x='repayment_month', y=['total_repaid', 'total_disbursement'], barmode='group', title='Monthly Repayment and Disbursement')
fig.show()

- Repayments (blue) are generally higher than disbursements (red)
- This suggests that loan collections exceed new loans issued, which is good for liquidity.
--------------------------------------------------------------------------------
- March to May 2024 saw peak repayment and disbursement levels
- This could indicate higher loan activity during this period.
--------------------------------------------------------------------------------
- A noticeable decline in August 2024: This could indicate a decrease in lending activity or a seasonal effect.
- Repayments and disbursements were nearly equal in July 2024: Stable loan cycle in that month
--------------------------------------------------------------------------------
## Potential Insights:
- If this trend continues, the institution may need to increase loan disbursements to sustain its portfolio.
- The sharp drop in August could be due to policy changes, economic conditions, or seasonal lending trends.

In [94]:
# Trendline Chart on Profit
import plotly.express as px

# Data aggregated by month
profit_df = profit_df.groupby('repayment_month', as_index=False).sum()

# Line plot with markers and a thicker line
fig = px.line(profit_df, x='repayment_month', y='Profit', title='Monthly Profit Trend',
              markers=True)  # Add markers

fig.update_traces(line=dict(width=3))  # line thickness
fig.show()



- Overall Trend: While there's an overall upward trend from April to June and a huge spike in August, the volatility with the July drop raises concerns about the consistency and predictability of profit.
- However, the drop in July and the spike in August is attributed to the methodology used in profit calculation
   -  The difference in value of monthly repayments to the loans disbursed in july is minimal, resulting in lower profit.
   -  In August, the difference is significantly larger, despite the low volume of transactions. This creates the appearance of a sharp profit increase

### Forecasting 3-Month Profit/Loss
We use Exponential Smoothing Average in Python

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")
from statsmodels.tsa.holtwinters import ExponentialSmoothing




In [80]:
# Load data
profit_df = profit_df.toPandas()
profit_df["repayment_month"] = pd.to_datetime(profit_df["repayment_month"])


In [None]:
# Fit model
# The data spans eight months, which means we change the seasonal_periods to a value less than half of the data length
model = ExponentialSmoothing(profit_df['Profit'], trend='add', seasonal='add', seasonal_periods=3)
fit = model.fit()
# Not enough data for better forecast



In [None]:
# Forecast next 3 months
# forecast = fit.forecast(steps=3)
# print(forecast)

### Credit Exposure (ECL Method)
Expected Loss = PD * LGD * EAD

PD = Probability of default

LGD = Loss Given Default (Percentage of outstanding loan amount that will be lost if a borrower defaults)

EAD = Exposure at Default (The total amount a borrower owes at the time of default)

In [81]:
spark.sql("""WITH loan_info AS (
    SELECT
        d.customer_id,
        d.disb_date,
        d.tenure,
        d.loan_amount,
        d.loan_fee,
        DATE_ADD(d.disb_date, CAST(d.tenure AS INT)) AS due_date  -- Cast tenure to INT
    FROM disbursements d
),
repayment_info AS (
    SELECT
        r.customer_id,
        DATE_FORMAT(MAX(r.date_time), 'yyyy-MM-dd') AS repayment_date,
        SUM(r.amount) AS total_repaid
    FROM repayments r
    GROUP BY r.customer_id
),
loan_status AS (
    SELECT
        l.customer_id,
        l.disb_date,
        l.due_date,
        r.repayment_date,
        r.total_repaid,
        l.loan_amount,
        l.loan_fee,
        CASE
            WHEN r.total_repaid < l.loan_amount THEN 'Defaulted'
            ELSE 'Repaid on Time'
        END AS repayment_status
    FROM loan_info l
    LEFT JOIN repayment_info r ON l.customer_id = r.customer_id
),
default_rate AS (
    SELECT
        COUNT(CASE WHEN repayment_status = 'Defaulted' THEN 1 END) * 1.0 / COUNT(*) AS PD
    FROM loan_status
),
loan_exposure AS (
    SELECT
        SUM(CASE WHEN repayment_status = 'Defaulted' THEN loan_amount END) AS total_defaulted,
        SUM(loan_amount) AS total_outstanding
    FROM loan_status
),
expected_loss AS (
    SELECT
        dr.PD,
        le.total_outstanding * 0.40 AS LGD,  -- 40% LGD
        le.total_outstanding AS EAD,
        (dr.PD * 0.40 * le.total_outstanding) AS expected_loss  -- Fixed calculation
    FROM default_rate dr, loan_exposure le
)
SELECT * FROM expected_loss
          """).show()


+------------------+-----------+--------+-------------+
|                PD|        LGD|     EAD|expected_loss|
+------------------+-----------+--------+-------------+
|0.0012137303242936|10594254.00|26485635| 12858.567343|
+------------------+-----------+--------+-------------+



- Changed the code such that loans are only marked as defaulted if not fully repaid
    - Late repayments (but not fully repaid loans) are not falsely classified as defaults
- This is because some borrowers may have repaid late but still fully repaid

PD 0.12% of loans are defaulting, which is very low.

LGD 40% of total outstanding loans	Still correct, assuming 40% loss given default.

EAD Total outstanding loans	Makes sense, showing loans yet to be repaid.

Expected Loss PD × LGD × EAD	Very small loss amount, since PD is so low.


### Provisioning & write-off threshold
- Provisioning: Set aside reserves based on credit exposure
- Write-Offs: Consider a threshold where loans unpaid for > 90 days are written off

In [None]:
loan_status_df = spark.sql("""WITH loan_info AS (
    SELECT
        d.customer_id,
        d.disb_date,
        d.tenure,
        d.loan_amount,
        d.loan_fee,
        date_add(d.disb_date, cast(d.tenure as int)) AS due_date
    FROM disbursements d
),
repayment_info AS (
    SELECT
        r.customer_id,
        DATE_FORMAT(MAX(r.date_time), 'yyyy-MM-dd') AS repayment_date,
        SUM(r.amount) AS total_repaid
    FROM repayments r
    GROUP BY r.customer_id
),
loan_status AS (
    SELECT
        l.customer_id,
        l.disb_date,
        l.due_date,
        r.repayment_date,
        r.total_repaid,
        l.loan_amount,
        l.loan_fee,
        CASE
            WHEN r.total_repaid IS NULL OR r.total_repaid < l.loan_amount THEN 'Defaulted'
            ELSE 'Repaid on Time'
        END AS repayment_status
    FROM loan_info l
    LEFT JOIN repayment_info r ON l.customer_id = r.customer_id
)
SELECT
    customer_id,
    loan_amount,
    repayment_status,
    CASE
        WHEN repayment_status = 'Repaid on Time' THEN 'Active Loan'
        WHEN DATEDIFF(CURRENT_DATE(), due_date) > 90 THEN 'Written Off'
        ELSE 'Provision'
    END AS loan_status
FROM loan_status""").show()


+--------------------+-----------+----------------+-----------+
|         customer_id|loan_amount|repayment_status|loan_status|
+--------------------+-----------+----------------+-----------+
|844cce1d162dfaa5c...|        280|  Repaid on Time|Active Loan|
|30e629a2abbf54ef9...|       2170|  Repaid on Time|Active Loan|
|3aa2f0ab5e7ccbc96...|         80|  Repaid on Time|Active Loan|
|f1ae3d508d8561ee0...|        500|  Repaid on Time|Active Loan|
|a7a31b7eb5b810d9e...|       1030|  Repaid on Time|Active Loan|
|844cce1d162dfaa5c...|        280|  Repaid on Time|Active Loan|
|9ef50309636e19ae5...|         70|  Repaid on Time|Active Loan|
|30e629a2abbf54ef9...|       2170|  Repaid on Time|Active Loan|
|84489157091fedf5d...|         70|  Repaid on Time|Active Loan|
|844cce1d162dfaa5c...|        280|  Repaid on Time|Active Loan|
|f1ae3d508d8561ee0...|        500|  Repaid on Time|Active Loan|
|30e629a2abbf54ef9...|       2170|  Repaid on Time|Active Loan|
|203fc81288b614a7a...|       1010|  Repa

 - Only defaulted loans older than 90 days will be "Written Off"
 - Loans repaid on time will be "Active Loans"


In [None]:
spark.sql("""SELECT COUNT(*) FROM loan_status WHERE loan_status = 'Active Loan' """).show()

+--------+
|count(1)|
+--------+
|   26332|
+--------+



In [None]:
spark.sql("""SELECT COUNT(*) FROM loan_status WHERE loan_status = 'Written Off' """).show()

+--------+
|count(1)|
+--------+
|      33|
+--------+



In [96]:
# Pie chart on active and incative loans
loan_status_df = spark.sql("""WITH loan_info AS (
    SELECT
        d.customer_id,
        d.disb_date,
        d.tenure,
        d.loan_amount,
        d.loan_fee,
        date_add(d.disb_date, cast(d.tenure as int)) AS due_date
    FROM disbursements d
),
repayment_info AS (
    SELECT
        r.customer_id,
        DATE_FORMAT(MAX(r.date_time), 'yyyy-MM-dd') AS repayment_date,
        SUM(r.amount) AS total_repaid
    FROM repayments r
    GROUP BY r.customer_id
),
loan_status AS (
    SELECT
        l.customer_id,
        l.disb_date,
        l.due_date,
        r.repayment_date,
        r.total_repaid,
        l.loan_amount,
        l.loan_fee,
        CASE
            WHEN r.total_repaid IS NULL OR r.total_repaid < l.loan_amount THEN 'Defaulted'
            ELSE 'Repaid on Time'
        END AS repayment_status
    FROM loan_info l
    LEFT JOIN repayment_info r ON l.customer_id = r.customer_id
)
SELECT
    customer_id,
    loan_amount,
    repayment_status,
    CASE
        WHEN repayment_status = 'Repaid on Time' THEN 'Active Loan'
        WHEN DATEDIFF(CURRENT_DATE(), due_date) > 90 THEN 'Written Off'
        ELSE 'Provision'
    END AS loan_status
FROM loan_status""")


loan_status_counts = loan_status_df.groupBy('loan_status').count().toPandas()

fig = px.pie(loan_status_counts, values='count', names='loan_status', title='Loan Status Distribution')
fig.show()

- Very low number of defaulted loans

### Portfolio Triggers & Alerts Suggestions

- High Default Rate Alert : If PD > 5%, trigger a risk review.
- Late Payment Alert : If a loan is unpaid 10 days past due, notify collections.
- Liquidity Risk Alert : If monthly profit turns negative for 3 consecutive months, initiate financial review.