### Data Extraction and Transformation with Python and PySpark


In [8]:
from datetime import datetime
from datetime import date
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType

spark = SparkSession.builder.appName('capstone').getOrCreate()
branches = spark.read.json("cdw_sapp_branch.json")
customers = spark.read.json("cdw_sapp_customer.json")
credits = spark.read.json("cdw_sapp_credit.json")

branchRDD = branches.rdd.map(lambda x: (x[1], x[2], x[5], x[0], x[4], x[6], '('+x[3][:3]+')'+x[3][3:6]+'-'+x[3][6:], datetime.fromisoformat(x[7])))
branches = spark.createDataFrame(data=branchRDD, schema=["BRANCH_CODE", "BRANCH_NAME", "BRANCH_STREET", "BRANCH_CITY", "BRANCH_STATE", "BRANCH_ZIP", "BRANCH_PHONE", "LAST_UPDATED"])
branches.printSchema()
customerRDD = customers.rdd.map(lambda x: (int(x[12]), x[8].title(), x[11].lower(), x[9].title(), x[1], x[13]+", "+x[0], x[2], x[6], x[3], int(x[7]), str(x[5])[:3]+"-"+str(x[5])[3:], x[4], datetime.fromisoformat(x[10])))
customers = spark.createDataFrame(data=customerRDD, schema=["SSN","FIRST_NAME","MIDDLE_NAME", "LAST_NAME", "Credit_card_no", "FULL_STREET_ADDRESS", "CUST_CITY", "CUST_STATE", "CUST_COUNTRY", "CUST_ZIP", "CUST_PHONE", "CUST_EMAIL", "LAST_UPDATED"])
customers.printSchema()
creditRDD = credits.rdd.map(lambda x: (x[1], date(x[8], x[4], x[3]).strftime('%Y%m%d'), x[2], x[0], x[6], x[7], x[5]))  
credits = spark.createDataFrame(data=creditRDD, schema=["CUST_CC_NO", "TIMEID", "CUST_SSN", "BRANCH_CODE", "TRANSACTION_TYPE", "TRANSACTION_VALUE", "TRANSACTION_ID"])
credits.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)

root
 |-- SSN: long (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- Credit_card_no: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: long (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)

root
 |-- CUST_CC_NO: string (nullable = true)
 |-- TIMEID: string (nullable = true)
 |--

### Data loading into Database

In [9]:
branches.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/credit_capstone") \
  .option("dbtable", "credit_capstone.cdw_sapp_branch") \
  .option("user", "root") \
  .option("password", "root") \
  .save()
customers.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/credit_capstone") \
  .option("dbtable", "credit_capstone.cdw_sapp_customer") \
  .option("user", "root") \
  .option("password", "root") \
  .save()
credits.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/credit_capstone") \
  .option("dbtable", "credit_capstone.cdw_sapp_credit_card") \
  .option("user", "root") \
  .option("password", "root") \
  .save()

In [31]:
spark.stop()

### Import from API

In [44]:
import requests
import json
response = requests.get("https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json")
print(response)

<Response [200]>


In [45]:
loans = response.json()
loans = spark.createDataFrame(data=loans, schema=["Application_ID", "Gender", "Married", "Dependents", "Education", "Self_Employed", "Credit_History", "Property_Area", "Income", "Application_Status"])
loans.printSchema()

root
 |-- Application_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: long (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- Credit_History: string (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Application_Status: string (nullable = true)



### Load to RDBMS

In [None]:
loans.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/credit_capstone") \
  .option("dbtable", "credit_capstone.cdw_sapp_loan_application") \
  .option("user", "root") \
  .option("password", "root") \
  .save()