## 1. Extract data with PySpark

In [1]:
%pip install SQLAlchemy pymysql pyspark mysql-connector-python

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
import urllib.request
import os
import base64

In [3]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Fintech Credit Risk Analytics") \
    .config("spark.jars", "./libs/mysql-connector-j-9.0.0.jar") \
    .getOrCreate()

In [4]:
# Load scv file
data_path = "../Data/loan.csv"
loan_data = spark.read.csv(data_path, header=True, inferSchema=True)

In [5]:
# Inspect the data
loan_data.printSchema()
loan_data.show(5)

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: strin

## 2. Clean data with PySpark

In [6]:
# Remove duplicates
loan_data.dropDuplicates()

DataFrame[id: string, member_id: string, loan_amnt: int, funded_amnt: int, funded_amnt_inv: double, term: string, int_rate: double, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: string, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string, url: string, desc: string, purpose: string, title: string, zip_code: string, addr_state: string, dti: string, delinq_2yrs: string, earliest_cr_line: string, inq_last_6mths: string, mths_since_last_delinq: string, mths_since_last_record: string, open_acc: string, pub_rec: string, revol_bal: string, revol_util: string, total_acc: string, initial_list_status: string, out_prncp: string, out_prncp_inv: string, total_pymnt: string, total_pymnt_inv: string, total_rec_prncp: string, total_rec_int: string, total_rec_late_fee: string, recoveries: string, collection_recovery_fee: string, last_pymnt_d: string, last_pymnt_amnt: string, next_pymnt_d

In [7]:
# Handle missing values
transformed_loan_data = loan_data.na.drop(subset=["loan_amnt", "term", "int_rate", "installment", "grade", "loan_status"])

## 3. Load data into MySQL

In [8]:
# Retrieve the password and decode if it's Base64-encoded
encoded_password = os.getenv("MYSQL_PASSWORD")
if encoded_password:
    decoded_password = base64.b64decode(encoded_password).decode()
    print("Password retrieved successfully.")
else:
    print("Failed to retrieve MYSQL_PASSWORD.")


Password retrieved successfully.


In [9]:
# Database connection properties
jdbc_url = "jdbc:mysql://localhost:3306/lending_club_db"
table_name = "loans"
properties = {
    "user": "root",
    "password": decoded_password,
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Write DataFrame to MySQL
transformed_loan_data.write.jdbc(url=jdbc_url, table=table_name, mode='overwrite', properties=properties)