In [2]:
import os

import pyspark
import pymysql
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DecimalType, TimestampType, DateType
import pyspark.sql.functions as F

In [3]:
# PyMySQL connection settings. Values come from the environment so credentials need not
# live in the notebook; the defaults keep the original local-development values.
mySqlConfig = {
    'host': os.environ.get('MYSQL_HOST', '127.0.0.1'),
    'user': os.environ.get('MYSQL_USER', 'test'),
    'password': os.environ.get('MYSQL_PASSWORD', 'mysql'),
    'database': os.environ.get('MYSQL_DATABASE', 'world')
}


def connect_mysql():
    """Open a PyMySQL connection using ``mySqlConfig``.

    Returns:
        ``(connection, cursor)`` on success, or ``(None, None)`` when connecting fails
        (the error is printed). Always returning a pair lets callers unpack the result
        and then test for ``None``.
    """
    connection = None
    try:
        connection = pymysql.connect(**mySqlConfig)
        cursor = connection.cursor()
        return connection, cursor
    except Exception as e:
        print(f"Failure in Connection --> {e}")
        # Don't leak a connection that opened but could not hand out a cursor.
        if connection is not None:
            connection.close()
        return None, None


def exrtactor(spark, path, schema):
    """Load a multi-line JSON source file into a DataFrame with an explicit schema.

    Args:
        spark: the active SparkSession.
        path: location of the JSON file.
        schema: StructType applied to the file instead of inferring one.

    Returns:
        The DataFrame, or ``None`` if building the reader raised (the error is printed).

    NOTE(review): Spark is lazy, so problems in the data itself presumably surface only
    at the first action, not here.
    """
    print(f'Reading {path} data......')
    try:
        json_reader = spark.read.format("json").option("multiline", True)
        loaded_df = json_reader.load(path=path, schema=schema)
    except Exception as err:
        print(f'There is an Error --> {err}')
        return None
    print(f'Successfully read {path} file data')
    return loaded_df


In [4]:
def read_sql_file(path):
    """Return the full text of the SQL script stored at ``path``.

    If the file cannot be opened or read, the error is printed and ``None`` is
    returned instead of raising.
    """
    try:
        with open(path, 'r') as sql_file:
            sql_text = sql_file.read()
    except Exception as err:
        print(f"SQL file doesn't exists or could not be read --> {err}")
        return None
    return sql_text


def create_database(connection, cursor, dbName):
    """Create database ``dbName`` if the server does not have it yet, then commit.

    Failures are printed, not raised.
    NOTE(review): ``dbName`` is interpolated directly into the SQL text; pass trusted names only.
    """
    try:
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {dbName};")
        connection.commit()
    except Exception as err:
        print(f"Couldn't create Database '{dbName}' --> {err}")
        return
    print(f"Database '{dbName}' is Created if Not Exist")


def create_table(connection, cursor, path, databaseName, tableName):
    """Execute the DDL script at ``path`` for ``databaseName.tableName`` and commit.

    ``databaseName``/``tableName`` are used only in the status messages; the script
    itself decides what is created. Errors are printed and swallowed so one failing
    script does not stop the others.

    NOTE(review): ``cursor.execute`` sends the file as a single query; a script holding
    several statements presumably needs PyMySQL multi-statement support — confirm.
    """
    try:
        ddl = read_sql_file(path)
        # read_sql_file returns None when the file is missing/unreadable; executing None
        # would only produce an opaque "'NoneType' object has no attribute ..." error.
        if ddl is None or not ddl.strip():
            raise ValueError(f"no SQL could be read from '{path}'")
        cursor.execute(ddl)
        connection.commit()
        print(f"Table '{databaseName}.{tableName}' is created if not exists")
    except Exception as e:
        print(f"Could not create Table '{databaseName}.{tableName}' --> {e}")


def load_tables(connection, cursor, path, databaseName, tableName):
    """Execute the SQL load script at ``path`` for ``databaseName.tableName`` and commit.

    ``databaseName``/``tableName`` are used only in the status messages. Errors are
    printed and swallowed.

    NOTE(review): ``cursor.execute`` sends the file as a single query; a script holding
    several statements presumably needs PyMySQL multi-statement support — confirm.
    """
    try:
        dml = read_sql_file(path)
        # Reject a missing/empty script up front instead of passing None to execute().
        if dml is None or not dml.strip():
            raise ValueError(f"no SQL could be read from '{path}'")
        cursor.execute(dml)
        connection.commit()
        print(f"Data is loaded into Table - '{databaseName}.{tableName}' successfully")
    except Exception as e:
        print(f"There was an error loading into Table '{databaseName}.{tableName}' --> {e}")

In [5]:
# Explicit read schemas for the source JSON files, one StructType per entity.
# The third StructField argument is `nullable`.

# Users Schema
users_schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("user_name", StringType(), False),
    StructField("email", StringType(), False),
    StructField("phone", StringType(), False),
    StructField("date_of_birth", DateType(), False),  #format is YYYY-MM-DD
    StructField("address", StringType(), True),
    StructField("sign_up_date", TimestampType(), False),
    StructField("account_closed_date", TimestampType(), True)  ## This will be the date on which user Deletes his account in Loan App
])

# KYC Schema
kyc_schema = StructType([
    StructField("kyc_id", IntegerType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("document_type", StringType(), False),    # Aadhar, PAN etc
    StructField("document_number", StringType(), False),
    StructField("kyc_status", StringType(), False),       # Success, Fail, Pending
    StructField("submitted_at", TimestampType(), False)
])

# Available Loan Offers Schema
loan_offers_schema = StructType([
    StructField("loan_id", IntegerType(), False),
    StructField("loan_amount", IntegerType(), True),
    StructField("tenure", IntegerType(), True),           # multiplied by 30 days downstream, i.e. treated as months
    StructField("interest_rate", DoubleType(), True),
    # DecimalType() defaults to precision 10 / scale 0, which rounds the amount to whole
    # units. Keep two decimals, like transactions.amount, so balances retain their cents.
    StructField("total_amount", DecimalType(10, 2), True),
    StructField("offer_status", StringType(), True)          # Expired, Active
])

# Loans Schema
user_loans_schema = StructType([
    StructField("loan_id", IntegerType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("status", StringType(), False),
    StructField("applied_date", TimestampType(), False),
    StructField("approved_date", TimestampType(), True),
])


# Transactions Schema -- All the loan repayments done by users will be coming as Transactions data
transactions_schema = StructType([
    StructField("transaction_id", IntegerType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("loan_id", IntegerType(), False),
    StructField("amount", DecimalType(10,2), False),
    StructField("transaction_type", StringType(), False),  # Credit, Debit (Credit in case of Loan Provided, Debit in Case of Loan Repayment)
    StructField("transaction_date", TimestampType(), False)
])


# Resolutions Schema
resolutions_schema = StructType([
    StructField("executive_id", IntegerType(), False),
    StructField("defaulter_id", IntegerType(), False),
    StructField("executive_name", StringType(), False),
    StructField("contact_number", StringType(), False),
    StructField("is_resolved", StringType(), False),     # Yes/No
    StructField("assigned_at", TimestampType(), False),
    StructField("updated_at", TimestampType(), False)
])

In [9]:
# Start (or reuse) the Spark session. spark.jars puts the MySQL Connector/J jar on the
# classpath for the JDBC reads/writes done in later cells.
spark_builder = SparkSession.builder.appName("Loan_Applications_Data")
spark = spark_builder.config('spark.jars', 'mysql-connector-j-8.0.33.jar').getOrCreate()

# Extract data from respective source files
input_path = 'D://Work_Items/Projects/FairMoney_Draft/src/data'
users = f"{input_path}/users.json"
kyc = f"{input_path}/kyc.json"
loan_offers = f"{input_path}/loan_offers.json"
user_loans = f"{input_path}/loans.json"
transactions = f"{input_path}/transactions.json"
resolutions = f"{input_path}/resolutions.json"

# Each source file paired with its read schema, in extraction order.
source_files = (
    (users, users_schema),
    (kyc, kyc_schema),
    (loan_offers, loan_offers_schema),
    (user_loans, user_loans_schema),
    (transactions, transactions_schema),
    (resolutions, resolutions_schema),
)
users_df, kyc_df, loan_offers_df, user_loans_df, transactions_df, resolutions_df = [
    exrtactor(spark, source_path, source_schema)
    for source_path, source_schema in source_files
]

Reading D://Work_Items/Projects/FairMoney_Draft/src/data/users.json data......
Successfully read D://Work_Items/Projects/FairMoney_Draft/src/data/users.json file data
Reading D://Work_Items/Projects/FairMoney_Draft/src/data/kyc.json data......
Successfully read D://Work_Items/Projects/FairMoney_Draft/src/data/kyc.json file data
Reading D://Work_Items/Projects/FairMoney_Draft/src/data/loan_offers.json data......
Successfully read D://Work_Items/Projects/FairMoney_Draft/src/data/loan_offers.json file data
Reading D://Work_Items/Projects/FairMoney_Draft/src/data/loans.json data......
Successfully read D://Work_Items/Projects/FairMoney_Draft/src/data/loans.json file data
Reading D://Work_Items/Projects/FairMoney_Draft/src/data/transactions.json data......
Successfully read D://Work_Items/Projects/FairMoney_Draft/src/data/transactions.json file data
Reading D://Work_Items/Projects/FairMoney_Draft/src/data/resolutions.json data......
Successfully read D://Work_Items/Projects/FairMoney_Draft/

In [8]:
# Open the PyMySQL session used for DDL here and for the staging-table upserts in later cells.
# `or (None, None)` also covers a connect_mysql() that returns a bare None on failure,
# which would otherwise crash the unpacking before the check below could run.
connection, cursor = connect_mysql() or (None, None)
if connection is None or cursor is None:
    # Every later cell needs this session; stop here rather than fail obscurely later.
    raise RuntimeError("MySQL DB Connection is not established")

database_name = 'fairmoney_db'
create_database(connection, cursor, database_name)

# One DDL script per table, executed in this (insertion) order.
files_path = 'D://Work_Items/Projects/FairMoney_Draft/src/sql/ddl'
ddl_paths = {
        'users': f'{files_path}/users.sql',
        'user_kyc': f'{files_path}/user_kyc.sql',
        'loan_offers': f'{files_path}/loan_offers.sql',
        'user_loans': f'{files_path}/user_loans.sql',
        'transactions': f'{files_path}/transactions.sql',
        'loan_balances': f'{files_path}/loan_balances.sql',
        'defaulters': f'{files_path}/defaulters.sql',
        'resolutions': f'{files_path}/resolutions.sql'
    }

for table_name, ddl_path in ddl_paths.items():
    create_table(connection, cursor, ddl_path, database_name, table_name)

Database 'fairmoney_db' is Created if Not Exist
Table 'fairmoney_db.users' is created if not exists
Table 'fairmoney_db.user_kyc' is created if not exists
Table 'fairmoney_db.loan_offers' is created if not exists
Table 'fairmoney_db.user_loans' is created if not exists
Table 'fairmoney_db.transactions' is created if not exists
Table 'fairmoney_db.loan_balances' is created if not exists
Table 'fairmoney_db.defaulters' is created if not exists
Table 'fairmoney_db.resolutions' is created if not exists


In [None]:
# users_df.show()
# kyc_df.show()
# loan_offers_df.show()
# user_loans_df.show()
# transactions_df.show()
# resolutions_df.show()

In [10]:
# JDBC settings for Spark's reads/writes against MySQL. Host and credentials are taken
# from mySqlConfig so the PyMySQL session and Spark's JDBC connections cannot drift apart.
mysql_url = f"jdbc:mysql://{mySqlConfig['host']}:3306/fairmoney_db"
mysql_properties = {
    "user": mySqlConfig['user'],
    "password": mySqlConfig['password'],
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Users Pipeline 

In [None]:
# users_df.show()

# Stamp every incoming user with the load time and derive account_status from
# account_closed_date (no close date -> "Active", otherwise "Deleted").
users_updated = (users_df
    .withColumn("updated_at", F.current_timestamp())  # No need for F.lit()
    .withColumn("account_status",
                F.when(F.col("account_closed_date").isNull(), F.lit("Active"))
                 .otherwise(F.lit("Deleted")))
    .select("user_id", "user_name", "email", "phone", "date_of_birth", "address", "account_status", "sign_up_date", "account_closed_date", "updated_at")
)

## Upsert Logic --> New Users will be Inserted and Existing Users details will be updated

existing_table_df = (
    spark.read
    .jdbc(mysql_url, table = "users", properties = mysql_properties)
)
existing_user_ids_df = existing_table_df.select("user_id")

# New users --> incoming rows whose user_id is NOT in MySQL yet
new_users_df = users_updated.join(
    existing_user_ids_df,
    on="user_id",
    how="left_anti"
)

# Existing users --> incoming rows whose user_id IS already in MySQL.
# left_semi keeps the INCOMING values (and only users_updated's columns); these rows
# replace the stored ones below. Taking the stored side would re-insert the old values.
existing_users_df = users_updated.join(
    existing_user_ids_df,
    on="user_id",
    how="left_semi"
)

# new_users_df.show()
# existing_users_df.show()

# Use a stage table to hold the updated rows of existing users
cursor.execute("CREATE DATABASE IF NOT EXISTS stage_layer;")
cursor.execute("CREATE TABLE IF NOT EXISTS stage_layer.users_staging LIKE fairmoney_db.users;")
cursor.execute("TRUNCATE TABLE stage_layer.users_staging")

try:
    existing_users_df.write.jdbc(
        url=mysql_url,
        table="stage_layer.users_staging",
        mode="append",  # staging was truncated above, so it holds only this batch
        properties=mysql_properties
    )
except Exception as e:
    print(e)

# Replace the stored rows of existing users with the staged rows in one transaction.
# PyMySQL does not autocommit by default: without the commit these statements stay
# pending on this session (and may hold locks while Spark appends below on its own connection).
try:
    cursor.execute("DELETE FROM fairmoney_db.users WHERE user_id IN (SELECT user_id FROM stage_layer.users_staging);")
    cursor.execute("INSERT INTO fairmoney_db.users SELECT * FROM stage_layer.users_staging;")
    connection.commit()
except Exception:
    connection.rollback()
    raise

# Insert New Users into the Table
try:
    new_users_df.write.jdbc(
        url=mysql_url,
        table="users",
        mode="append",  # Insert only new users
        properties=mysql_properties
    )
except Exception as e:
    print(e)


# Users KYC Pipeline

In [None]:
# user_kyc_df.show()

user_kyc_updated = kyc_df.withColumn("updated_at", F.current_timestamp())

## Upsert Logic --> New user_kyc will be Inserted and Existing user_kyc details will be updated
# NOTE(review): rows are matched by user_id, so all stored KYC rows of a user are replaced
# by that user's incoming rows. Confirm this (rather than matching on kyc_id) is intended.

existing_kyc_table_df = (
    spark.read
    .jdbc(mysql_url, table = "user_kyc", properties = mysql_properties)
)
existing_kyc_user_ids_df = existing_kyc_table_df.select("user_id")

# New user_kyc --> incoming rows for users with no KYC rows in MySQL yet
new_user_kyc_df = user_kyc_updated.join(
    existing_kyc_user_ids_df,
    on="user_id",
    how="left_anti"
)

# Existing user_kyc --> incoming rows for users that already have KYC rows in MySQL.
# left_semi keeps the INCOMING values and never multiplies rows, even when a user
# has several stored KYC documents.
existing_user_kyc_df = user_kyc_updated.join(
    existing_kyc_user_ids_df,
    on="user_id",
    how="left_semi"
)

# new_user_kyc_df.show()
# existing_user_kyc_df.show()

# Use a stage table to hold the updated user_kyc rows
cursor.execute("CREATE DATABASE IF NOT EXISTS stage_layer;")
cursor.execute("CREATE TABLE IF NOT EXISTS stage_layer.user_kyc_staging LIKE fairmoney_db.user_kyc;")
cursor.execute("TRUNCATE TABLE stage_layer.user_kyc_staging")

try:
    existing_user_kyc_df.write.jdbc(
        url=mysql_url,
        table="stage_layer.user_kyc_staging",
        mode="append",  # staging was truncated above, so it holds only this batch
        properties=mysql_properties
    )
except Exception as e:
    print(e)

# Replace the stored user_kyc rows with the staged rows in one transaction
# (PyMySQL does not autocommit by default, hence the explicit commit/rollback).
try:
    cursor.execute("DELETE FROM fairmoney_db.user_kyc WHERE user_id IN (SELECT user_id FROM stage_layer.user_kyc_staging);")
    cursor.execute("INSERT INTO fairmoney_db.user_kyc SELECT * FROM stage_layer.user_kyc_staging;")
    connection.commit()
except Exception:
    connection.rollback()
    raise

# Insert New user_kyc into the Table
try:
    new_user_kyc_df.write.jdbc(
        url=mysql_url,
        table="user_kyc",
        mode="append",  # Insert only new user_kyc
        properties=mysql_properties
    )
except Exception as e:
    print(e)


# Loan Offers Pipeline 

In [None]:
loan_offers_updated = loan_offers_df.withColumn("updated_at", F.current_timestamp())

existing_loan_offers_table_df = (
    spark.read
    .jdbc(mysql_url, table = "loan_offers", properties = mysql_properties)
)
existing_loan_ids_df = existing_loan_offers_table_df.select("loan_id")

# New loan offers --> incoming offers whose loan_id is NOT in MySQL yet
new_loan_offers_df = loan_offers_updated.join(
    existing_loan_ids_df,
    on="loan_id",
    how="left_anti"
)

# new_loan_offers_df.show()

# Existing loan offers --> INCOMING values for offers already in MySQL; they replace the stored rows
existing_loan_offers_df = loan_offers_updated.join(
    existing_loan_ids_df,
    on="loan_id",
    how="left_semi"
)

# existing_loan_offers_df.show()

cursor.execute("CREATE DATABASE IF NOT EXISTS stage_layer;")
cursor.execute("CREATE TABLE IF NOT EXISTS stage_layer.loan_offers_staging LIKE fairmoney_db.loan_offers;")
cursor.execute("TRUNCATE TABLE stage_layer.loan_offers_staging")


try:
    existing_loan_offers_df.write.jdbc(
        url=mysql_url,
        table="stage_layer.loan_offers_staging",
        mode="append",  # staging was truncated above, so it holds only this batch
        properties=mysql_properties
    )
except Exception as e:
    print(e)

# Replace the stored offers with the staged rows in one transaction
# (PyMySQL does not autocommit by default, hence the explicit commit/rollback).
try:
    cursor.execute("DELETE FROM fairmoney_db.loan_offers WHERE loan_id IN (SELECT loan_id FROM stage_layer.loan_offers_staging);")
    cursor.execute("INSERT INTO fairmoney_db.loan_offers SELECT * FROM stage_layer.loan_offers_staging;")
    connection.commit()
except Exception:
    connection.rollback()
    raise

try:
    new_loan_offers_df.write.jdbc(
        url=mysql_url,
        table="loan_offers",
        mode="append",  # Insert only new loan offers
        properties=mysql_properties
    )
except Exception as e:
    print(e)
	


# User Loans

In [None]:
user_loans_updated = user_loans_df.withColumn("updated_at", F.current_timestamp())

# Split the incoming loans into new vs existing by (loan_id, user_id) BEFORE anything is
# written, so every incoming row is written exactly once.
existing_user_loans_table_df = (
    spark.read
    .jdbc(mysql_url, table = "user_loans", properties = mysql_properties)
)
existing_loan_keys_df = existing_user_loans_table_df.select("loan_id", "user_id")

# New user loans --> (loan_id, user_id) pairs that are NOT in MySQL yet
new_user_loans_df = user_loans_updated.join(
    existing_loan_keys_df,
    on=["loan_id", "user_id"],
    how="left_anti"
)

# new_user_loans_df.show()

# Existing user loans --> INCOMING values for pairs already in MySQL; they replace the stored rows
existing_user_loans_df = user_loans_updated.join(
    existing_loan_keys_df,
    on=["loan_id", "user_id"],
    how="left_semi"
)

#existing_user_loans_df.show()

cursor.execute("CREATE DATABASE IF NOT EXISTS stage_layer;")
cursor.execute("CREATE TABLE IF NOT EXISTS stage_layer.user_loans_staging LIKE fairmoney_db.user_loans;")
cursor.execute("TRUNCATE TABLE stage_layer.user_loans_staging;")

try:
    existing_user_loans_df.write.jdbc(
        url=mysql_url,
        table="stage_layer.user_loans_staging",
        mode="append",  # staging was truncated above, so it holds only this batch
        properties=mysql_properties
    )
except Exception as e:
    print(e)

# Replace the stored rows with the staged ones in one transaction. Match on BOTH key
# columns: deleting by loan_id alone would also remove other users' rows for that
# loan_id, which are not in staging and would never be re-inserted.
# FK checks are disabled only for this PyMySQL session and are always restored.
cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
try:
    cursor.execute(
        "DELETE ul FROM fairmoney_db.user_loans AS ul "
        "JOIN stage_layer.user_loans_staging AS s "
        "ON ul.loan_id = s.loan_id AND ul.user_id = s.user_id;"
    )
    cursor.execute("INSERT INTO fairmoney_db.user_loans SELECT * FROM stage_layer.user_loans_staging;")
    connection.commit()
except Exception:
    connection.rollback()
    raise
finally:
    cursor.execute("SET FOREIGN_KEY_CHECKS=1;")


try:
    new_user_loans_df.write.jdbc(
        url=mysql_url,
        table="user_loans",
        mode="append",  # Insert only new user loans
        properties=mysql_properties
    )
except Exception as e:
    print(e)

# Transactions

In [9]:
# Append only transactions whose transaction_id is not in MySQL yet, so re-running the
# notebook does not duplicate repayments (or, if transaction_id is a key, fail the write).
existing_transaction_ids_df = (
    spark.read
    .jdbc(mysql_url, table = "transactions", properties = mysql_properties)
    .select("transaction_id")
)
new_transactions_df = transactions_df.join(
    existing_transaction_ids_df,
    on="transaction_id",
    how="left_anti"
)

try:
    new_transactions_df.write.jdbc(
        url=mysql_url,
        table="transactions",
        mode="append",  # Insert only new transactions
        properties=mysql_properties
    )
except Exception as e:
    print(e)

# Loan Balances

In [None]:
# Both inputs feed several joins/aggregations below; cache them once.
transactions_df.cache().count()
user_loans_df.cache().count()

loan_balances_existing_df = (spark.read
                             .jdbc(mysql_url, table = "loan_balances", properties = mysql_properties)
)


# Step 1: Identify New Loans -- approved/active user loans that have no loan_balances
# row yet, priced from their loan offer.
new_loans_df = (user_loans_df.alias("ul")
                .filter(F.col("ul.status").isin(["Approved", "Active"]))
                .join(loan_balances_existing_df.alias("lb"),
                      on="loan_id",
                      how="left_anti"  # Select loans that are NOT in loan_balances
                      )
                .join(loan_offers_df.alias("lo"),
                      on="loan_id",
                      how="inner"
                      )
                .select(
                    F.col("ul.loan_id"),
                    F.col("ul.user_id"),
                    F.col("lo.total_amount").alias("loan_amount"),
                    F.date_add(F.col("ul.approved_date"), F.col("lo.tenure") * 30).alias("due_date"),  # Approximate month as 30 days
                    F.col("lo.total_amount").alias("remaining_balance")
                    )
)

# new_loans_df.show()

# Step 2: Compute Credit Transactions (Loan Disbursement)
credit_transactions = (transactions_df.filter(F.col("transaction_type") == "Credit")
                       .groupBy("loan_id")
                       .agg(F.sum(F.col("amount").cast("decimal(10,2)")).alias("total_credit"))
                       )

new_loans_df = new_loans_df.join(credit_transactions, on="loan_id", how="left").fillna({"total_credit": 0})

# Compute Debit Transactions (Loan Repayments)
debit_transactions = (transactions_df.filter(F.col("transaction_type") == "Debit")
                      .groupBy("loan_id")
                      .agg(F.sum(F.col("amount").cast("decimal(10,2)")).alias("total_debit"))
                      )

new_loans_df = new_loans_df.join(debit_transactions, on="loan_id", how="left").fillna({"total_debit": 0})

# Step 3: Compute Remaining Balance & Days Past Due

new_loans_df = (new_loans_df.withColumn("remaining_balance", F.col("loan_amount") - F.col("total_debit"))
                .withColumn("days_past_due", F.datediff(F.current_date(), F.col("due_date")))
                .withColumn("updated_at", F.current_timestamp())
                )

# Only loans that are actually overdue (and not overpaid) keep a positive days_past_due.
new_loans_df = (new_loans_df.withColumn("days_past_due", F.when((F.col("days_past_due") >= 0) & (F.col("remaining_balance") >= 0), F.col("days_past_due"))
                                        .otherwise(0))
                                        )

# Step 4: Merge New Loans with Existing Loan Balances.
# mode="overwrite" drops (or truncates) loan_balances BEFORE Spark evaluates the DataFrame
# being written, and that DataFrame lazily re-reads loan_balances over JDBC. Materialise it
# first; otherwise the existing rows are read back from the emptied table and lost.
final_loan_balances_df = loan_balances_existing_df.unionByName(new_loans_df).localCheckpoint(eager=True)

# NOTE(review): FOREIGN_KEY_CHECKS is a per-session variable; setting it on the PyMySQL
# cursor does not affect the separate JDBC connection Spark opens for this write.
cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
try:
    final_loan_balances_df.write.jdbc(
        url=mysql_url,
        table="loan_balances",
        mode="overwrite",
        properties=mysql_properties
    )
except Exception as e:
    print(e)

cursor.execute("SET FOREIGN_KEY_CHECKS=1;")


# Defaulters and Resolution

In [11]:
# Step 1: Identify Defaulters
# loan_balances is overwritten at the end of this cell. Snapshot it eagerly so the
# DataFrames derived from it are not lazily re-read after Spark has already
# dropped/recreated the table for that overwrite.
loan_balances_df = (spark.read
                        .jdbc(mysql_url,
                              table = "loan_balances",
                              properties = mysql_properties)
                        .localCheckpoint(eager=True)
                    )

# A defaulter is a loan that is past due and still has an outstanding balance.
defaulters_df = (loan_balances_df.filter((F.col("days_past_due") > 0) & (F.col("remaining_balance") > 0))
                 .select(F.col("user_id").alias("defaulter_id"),
                         "loan_id",
                         "loan_amount",
                         "remaining_balance",
                         "days_past_due",
                         "updated_at"
                         )
                 )

# NOTE(review): FOREIGN_KEY_CHECKS is a per-session variable; setting it on the PyMySQL
# cursor does not affect the separate JDBC connections Spark opens for the writes below.
cursor.execute("SET FOREIGN_KEY_CHECKS=0;")

try:
    defaulters_df.write.jdbc(
        url=mysql_url,
        table="defaulters",
        mode="overwrite",
        properties=mysql_properties
    )
except Exception as e:
    print(e)

cursor.execute("SET FOREIGN_KEY_CHECKS=1;")


# defaulters_df.show()
# resolutions_df.show()

# Resolved defaulters: defaulters that have a resolution marked "Yes". distinct() keeps
# one row per (defaulter_id, loan_id) even when a defaulter has several resolutions,
# so the left join below cannot duplicate loan_balances rows.
resolved_defaulters_df = (defaulters_df
                          .join(resolutions_df.filter(F.col("is_resolved") == "Yes"),
                                on="defaulter_id",
                                how="inner")
                          .select("defaulter_id", "loan_id")
                          .distinct()
                          )
# resolved_defaulters_df.show()

# Update loan_balances for resolved defaulters, setting their remaining_balance to 0

if resolved_defaulters_df.count() != 0:
    # Rebuild each original loan_balances column explicitly, in the original order.
    # A withColumn() replacement loses the "lb" qualifier and would be dropped by select("lb.*").
    remaining_balance_type = loan_balances_df.schema["remaining_balance"].dataType
    settled_balance = (F.when(F.col("rd.loan_id").isNotNull(), F.lit(0))  # If defaulter is found, set balance to 0
                        .otherwise(F.col("lb.remaining_balance"))
                        .cast(remaining_balance_type)  # keep the column's original type
                        .alias("remaining_balance"))
    new_loan_balances_df = (loan_balances_df.alias("lb")
                            .join(resolved_defaulters_df.alias("rd"),
                                  (F.col("lb.loan_id") == F.col("rd.loan_id")) & (F.col("lb.user_id") == F.col("rd.defaulter_id")),
                                  "left")
                            .select(*[settled_balance if column_name == "remaining_balance"
                                      else F.col(f"lb.`{column_name}`")
                                      for column_name in loan_balances_df.columns])
                            )
    cursor.execute("SET FOREIGN_KEY_CHECKS=0;")

    try:
        new_loan_balances_df.write.jdbc(
            url=mysql_url,
            table="loan_balances",
            mode="overwrite",
            properties=mysql_properties
        )
    except Exception as e:
        print(e)
    cursor.execute("SET FOREIGN_KEY_CHECKS=1;")
        
