This program uses PySpark SQL to extract customer, branch, and credit card data from the provided JSON files.

In [1]:
# Import libraries 
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import * 
import matplotlib as mpl
import matplotlib.pyplot as plt

In [2]:
# Build (or reuse) the local single-core Spark session used by this notebook
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('CreditCardSystems')
    .getOrCreate()
)

# Load each source JSON file into its own DataFrame
df_branch = spark.read.json('cdw_sapp_branch.json')
df_credit = spark.read.json('cdw_sapp_credit.json')
df_customer = spark.read.json('cdw_sapp_customer.json')

# Expose the DataFrames to Spark SQL under short view names
df_credit.createOrReplaceTempView(name="credit")
df_customer.createOrReplaceTempView(name="customer")
df_branch.createOrReplaceTempView(name="branch")

# 1. Functional Requirements - Load Credit Card Database (SQL)

<b>Data Extraction and Transformation with Python and PySpark. </b><br>
For “Credit Card System,” create a Python and PySpark SQL program to read/extract the following JSON files according to the specifications found in the mapping document.
1. CDW_SAPP_BRANCH.JSON <br>
2. CDW_SAPP_CREDITCARD.JSON <br>
3. CDW_SAPP_CUSTOMER.JSON <br>
Note: Data Engineers will be required to transform the data based on the requirements found in the Mapping Document.
Hint: [You can use PYSQL “select statement query” or simple Pyspark RDD].

In [None]:
# Same setup as the first load cell of the notebook, repeated here:
# getOrCreate() hands back the running session when one already exists.
session_builder = SparkSession.builder.master('local[1]').appName('CreditCardSystems')
spark = session_builder.getOrCreate()

# Read the branch, credit and customer JSON files into DataFrames
df_branch = spark.read.json('cdw_sapp_branch.json')
df_credit = spark.read.json('cdw_sapp_credit.json')
df_customer = spark.read.json('cdw_sapp_customer.json')

# (Re-)register every DataFrame as a SQL temporary view
df_credit.createOrReplaceTempView("credit")
df_customer.createOrReplaceTempView("customer")
df_branch.createOrReplaceTempView("branch")

In [None]:
# Adjust the customer table according to the mapping document:
# first and last name become Title Case, middle name becomes lower case.
df_customer = (
    df_customer
    .withColumn("FIRST_NAME", initcap(col("FIRST_NAME")))
    .withColumn("MIDDLE_NAME", lower(col("MIDDLE_NAME")))
    .withColumn("LAST_NAME", initcap(col("LAST_NAME")))
)

# Preview the three re-cased name columns
name_columns = ["FIRST_NAME", "MIDDLE_NAME", "LAST_NAME"]
df_customer.select(*name_columns).show(10)

In [None]:
# Concatenate the street name and apartment number of the customer's residence,
# comma-separated, in the order the mapping note asks for: (Street, Apartment).
# concat_ws skips NULL parts, so a missing APT_NO no longer turns the whole
# address into NULL (plain concat returns NULL if any input is NULL).
# The cast keeps this working whether APT_NO was inferred as a number or a string.
df_customer = df_customer.withColumn(
    "FULL_STREET_ADDRESS",
    concat_ws(",", df_customer["STREET_NAME"], df_customer["APT_NO"].cast("string")),
)
df_customer.select("FULL_STREET_ADDRESS").show(10)

In [None]:
# Change the format of phone number to XXX-XXXX
df_customer.select("CUST_PHONE").show(10)

In [None]:
# Grab the first the 3 digits 
df_customer = df_customer.withColumn('PHONE_1', split(df_customer['CUST_PHONE'], "\d{4}$"))
df_customer.select("PHONE_1").show(10)

In [None]:
# Grab the last four digits 
df_customer = df_customer.withColumn('PHONE_2', split(df_customer['CUST_PHONE'], "^\d{3}"))
df_customer.select("PHONE_2").show(10)

In [None]:
# Concat the first 3 and last 4 digits with - 
df_customer = df_customer.withColumn("CUST_PHONE_FORMATED", concat(df_customer["PHONE_1"], lit("-") , df_customer["PHONE_2"]))
df_customer.select("CUST_PHONE_FORMATED").show(10)

In [None]:
udf1 = udf(lambda x,y : x+y,ArrayType(StringType()))
df_customer = df_customer.withColumn("CUST_PHONE_FORMATED",udf1('PHONE_1','PHONE_2'))
df_customer.select("CUST_PHONE_FORMATED").show(10)

In [None]:
# Adjust branch table according to the mapping document
# TODO: not implemented yet - if zipcode is null then load 000000
# The two commented-out lines below are leftover scratch work, kept for reference.
#df_customer = df_customer.withColumn("FULL_STREET_ADDRESS", concat)
#df_branch.select("BRANCH_ZIP").show(10)

<b>Data loading into Database </b><br>
Once PySpark has read the data from the JSON files, use Python, PySpark, and Python modules to load the data into an RDBMS (SQL) by performing
the following: <br>
a) Create a Database in SQL(MariaDB), named “creditcard_capstone.” <br>
b) Create a Python and Pyspark Program to load/write the “Credit Card System Data” into RDBMS(creditcard_capstone). <br>
Tables should be created by the following names in RDBMS: <br>
CDW_SAPP_BRANCH <br>
CDW_SAPP_CREDIT_CARD <br>
CDW_SAPP_CUSTOMER <br>

In [None]:
# Create the table CDW_SAPP_BRANCH
import getpass
import os

# Connection settings are read from the environment so that no secret is stored
# in the notebook. URL and user fall back to the local development defaults;
# the password is prompted for when CREDITCARD_DB_PASSWORD is not set.
jdbc_url = os.environ.get("CREDITCARD_DB_URL", "jdbc:mysql://localhost:3306/creditcard_capstone")
jdbc_user = os.environ.get("CREDITCARD_DB_USER", "root")
jdbc_password = os.environ.get("CREDITCARD_DB_PASSWORD") or getpass.getpass("Database password: ")

(
    df_branch.write.format("jdbc")
    .mode("append")  # NOTE: re-running this cell appends the same rows again
    .option("url", jdbc_url)
    .option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .save()
)

In [None]:
# Create the table CDW_SAPP_CREDIT_CARD
import getpass
import os

# Connection settings are read from the environment so that no secret is stored
# in the notebook. URL and user fall back to the local development defaults;
# the password is prompted for when CREDITCARD_DB_PASSWORD is not set.
jdbc_url = os.environ.get("CREDITCARD_DB_URL", "jdbc:mysql://localhost:3306/creditcard_capstone")
jdbc_user = os.environ.get("CREDITCARD_DB_USER", "root")
jdbc_password = os.environ.get("CREDITCARD_DB_PASSWORD") or getpass.getpass("Database password: ")

(
    df_credit.write.format("jdbc")
    .mode("append")  # NOTE: re-running this cell appends the same rows again
    .option("url", jdbc_url)
    .option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .save()
)

In [None]:
# Create the table CDW_SAPP_CUSTOMER
import getpass
import os

# Connection settings are read from the environment so that no secret is stored
# in the notebook. URL and user fall back to the local development defaults;
# the password is prompted for when CREDITCARD_DB_PASSWORD is not set.
jdbc_url = os.environ.get("CREDITCARD_DB_URL", "jdbc:mysql://localhost:3306/creditcard_capstone")
jdbc_user = os.environ.get("CREDITCARD_DB_USER", "root")
jdbc_password = os.environ.get("CREDITCARD_DB_PASSWORD") or getpass.getpass("Database password: ")

(
    df_customer.write.format("jdbc")
    .mode("append")  # NOTE: re-running this cell appends the same rows again
    .option("url", jdbc_url)
    .option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .save()
)

# 2. Functional Requirements - Application Front-End
Once data is loaded into the database, we need a front-end (console) to see/display data. For that, create a console-based Python program to satisfy System Requirements 2 (2.1 and 2.2).

<b> Req-2.1 Transaction Details Module </b><br>
1) Used to display the transactions made by customers living in a given zip code for a given month and year. Order by day in
descending order. <br>
2) Used to display the number and total values of transactions for a given type.<br>
3) Used to display the number and total values of transactions for branches in a given state.<br>

# 2.1.1 Display the transactions made by customers living in a given zip code for a given month and year, ordered by day in descending order

In [None]:
# Input for month, year and zipcode
Month = 8 # Holds the input value for month     | 8
Year = 2018  # Holds the input value for year   | 2018
Zipcode = 39120 # Holds the input value for zipcode | 39120

# The credit view supplies TRANSACTION_TYPE, TRANSACTION_VALUE, DAY, MONTH and YEAR;
# the customer view supplies CUST_ZIP.
# The views are joined on CREDIT_CARD_NO. Without a join condition
# "FROM credit, customer" is a cross join that pairs every matching transaction
# with every customer in the zip code, whoever actually made it.

sel = "SELECT customer.CUST_ZIP, credit.DAY, credit.MONTH, credit.YEAR, credit.TRANSACTION_TYPE, credit.TRANSACTION_VALUE"
frm = " FROM credit JOIN customer ON credit.CREDIT_CARD_NO = customer.CREDIT_CARD_NO"
where = " WHERE credit.YEAR = " + str(Year) + " AND credit.MONTH = " + str(Month) + " AND customer.CUST_ZIP = " + str(Zipcode)
ordr = " ORDER BY credit.DAY DESC" # Latest day of the month first

sqlCredit = spark.sql(sel + frm + where + ordr)
sqlCredit.show(20)


# 2.1.2 Display the number and total values of transactions for a given type

In [None]:
# Input for a given transaction type
transact_type = "Bills"

# Report the number of transactions and their total value for the chosen type
# (one aggregated row instead of a listing of the individual transactions).
sel = "SELECT TRANSACTION_TYPE, COUNT(*) AS TRANSACTION_COUNT, ROUND(SUM(TRANSACTION_VALUE), 2) AS TOTAL_VALUE"
frm = " FROM credit"
where = " WHERE TRANSACTION_TYPE = " + "\""+ transact_type + "\""
grp_by = " GROUP BY TRANSACTION_TYPE"
sqlCredit = spark.sql(sel + frm + where + grp_by)
sqlCredit.show(20)

# 2.1.3 Display the number and total values of transactions for branches in a given state

In [None]:
# Register the DataFrame as a SQL temporary view
df_branch.createOrReplaceTempView("branch")

# Input for a given state
state = "TX"

# Report the number of transactions and their total value for all branches in
# the chosen state. Each transaction is matched to its branch through
# BRANCH_CODE; without a join condition "FROM branch, credit" is a cross join
# that pairs every transaction with every branch of the state.
# NOTE(review): assumes the branch data also carries a BRANCH_CODE column - confirm.
sel = "SELECT branch.BRANCH_STATE, COUNT(credit.TRANSACTION_ID) AS TRANSACTION_COUNT, ROUND(SUM(credit.TRANSACTION_VALUE), 2) AS TOTAL_VALUE"
frm = " FROM branch JOIN credit ON branch.BRANCH_CODE = credit.BRANCH_CODE"
where = " WHERE branch.BRANCH_STATE = " + "\""+ state + "\""
grp_by = " GROUP BY branch.BRANCH_STATE"
sqlCredit = spark.sql(sel + frm + where + grp_by)
sqlCredit.show(20)

<b>Req-2.2 Customer Details </b><br>
1) Used to check the existing account details of a customer.<br>
2) Used to modify the existing account details of a customer.<br>
3) Used to generate a monthly bill for a credit card number for a given month and year. <br>
4) Used to display the transactions made by a customer between two dates. Order by year, month, and day in descending order. <br>

# 2.2.1 Check the existing account details of a customer


In [None]:
# Input customer name
first_name = "Alec"
last_name = "Hooper"

# Account columns to display for the matching customer(s)
account_columns = [
    "APT_NO", "CREDIT_CARD_NO", "CUST_CITY", "CUST_COUNTRY",
    "CUST_EMAIL", "CUST_PHONE", "CUST_STATE", "CUST_ZIP",
]

# Filter with column comparisons instead of a hand-assembled SQL string: the
# names are passed as values, so quotes inside a name cannot break the query
# (the previous string also had no space before AND).
sqlCredit = (
    spark.table("customer")
    .where((col("FIRST_NAME") == first_name) & (col("LAST_NAME") == last_name))
    .select(*account_columns)
)
sqlCredit.show(20)


# 2.2.2 Modify the existing account details of a customer


In [None]:
# TODO: requirement 2.2.2 (modify the existing account details of a customer) is not implemented yet.
# The statement below is only a placeholder: it targets a database named "inventory"
# and sets database properties; it does not update any customer row.
# ALTER DATABASE inventory SET DBPROPERTIES ('Edited-by' = 'John', 'Edit-date' = '01/01/2001');

# 2.2.3 Generate a monthly bill for a credit card number for a given month and year


In [None]:
# Input for month and year bill
month_bill = 8
year_bill = 2018
card_number = 4210653310061055

# Total of all transactions on the card for the requested month and year.
# Every non-aggregated column in the SELECT list is also listed in GROUP BY, and
# the clause starts with a space so it does not fuse with the year value
# (previously the query text ended in "... YEAR = 2018GROUP BY CREDIT_CARD_NO").
sel = "SELECT credit.CREDIT_CARD_NO, credit.YEAR, credit.MONTH, ROUND(SUM(credit.TRANSACTION_VALUE), 2) AS TOTAL_BILL"
frm = " FROM credit"
where = " WHERE CREDIT_CARD_NO = " + str(card_number) + " AND MONTH = " + str(month_bill) + " AND YEAR = " + str(year_bill)
grp_by = " GROUP BY credit.CREDIT_CARD_NO, credit.YEAR, credit.MONTH"

sql_bill = spark.sql(sel + frm + where + grp_by)
sql_bill.show(20)



# 2.2.4 Display the transactions made by a customer between two dates. Order by year, month, and day in descending order.

In [None]:
# Input for the two dates
# TODO: only the day of the month is used as the range, and the query is not yet
# restricted to a single customer; extend the inputs to full dates plus a
# customer identifier to cover the whole requirement.
Day_1 = 8
Day_2 = 14

sel = "SELECT YEAR, MONTH, DAY, TRANSACTION_VALUE"
frm = " FROM credit"
where = " WHERE DAY BETWEEN " + str(Day_1) + " AND " + str(Day_2)
# DESC applies to one sort key at a time, so it is repeated for year, month and
# day; "YEAR, MONTH, DAY DESC" sorted year and month in ascending order.
ordr_by = " ORDER BY YEAR DESC, MONTH DESC, DAY DESC"

sql_two_date = spark.sql(sel + frm + where + ordr_by )
sql_two_date.show(20)


# 3 - Functional Requirements - Data analysis and Visualization

After data is loaded into the database, users can make changes from the front end, and they can also view data from the front end. Now, the business analyst team wants to analyze and visualize the data according to the below requirements.


# 3.1 Find and plot which transaction type has a high rate of transactions

In [3]:
# Prepare the data for the bar graph of requirement 3.1:
#   x = transaction type
#   y = transactions per type
# toPandas() collects the whole Spark DataFrame into a pandas DataFrame on the driver.
df_credit_pd = df_credit.toPandas()
# Bare last expression: the notebook renders the pandas DataFrame as the cell output
df_credit_pd

Unnamed: 0,BRANCH_CODE,CREDIT_CARD_NO,CUST_SSN,DAY,MONTH,TRANSACTION_ID,TRANSACTION_TYPE,TRANSACTION_VALUE,YEAR
0,114,4210653349028689,123459988,14,2,1,Education,78.90,2018
1,35,4210653349028689,123459988,20,3,2,Entertainment,14.24,2018
2,160,4210653349028689,123459988,8,7,3,Grocery,56.70,2018
3,114,4210653349028689,123459988,19,4,4,Entertainment,59.73,2018
4,93,4210653349028689,123459988,10,10,5,Gas,3.59,2018
...,...,...,...,...,...,...,...,...,...
46689,49,4210653344660822,123451007,12,9,46690,Gas,66.20,2018
46690,168,4210653344660822,123451007,5,2,46691,Grocery,100.13,2018
46691,104,4210653344660822,123451007,16,12,46692,Test,35.83,2018
46692,32,4210653344660822,123451007,15,1,46693,Entertainment,90.99,2018


In [None]:
# Count how many transactions were made for each transaction type.
# Requirement 3.1 and the plot's y-axis ("Number of transactions") are about the
# number of transactions, not the sum of their values, so use the group size.
# Sorted so that the busiest type comes first.
df_transact = (
    df_credit_pd
    .groupby('TRANSACTION_TYPE')
    .size()
    .sort_values(ascending=False)
)


In [None]:
# Draw the per-type values as a bar chart on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(10, 6))
df_transact.plot(kind='bar', ax=ax)

# Label the axes and title the chart so the figure stands on its own
ax.set_xlabel('Transaction Type')
ax.set_ylabel('Number of transactions')
ax.set_title('Transaction Rate For Every Transaction Type')

plt.show()


# 3.2 Find and plot which state has a high number of customers
# 3.3 Find and plot the sum of all transactions for each customer, and which customer has the highest transaction amount (hint: use CUST_SSN)



# 4. Functional Requirements - LOAN Application Dataset

1. Create a Python program to GET (consume) data from the above API endpoint for the loan application dataset. <br>
2. Find the status code of the above API endpoint. <br>
3. Once Python reads data from the API, utilize PySpark to load data into RDBMS(SQL). The table name should be CDW-SAPP_loan_application in the database.

# 5 - Functional Requirements - Data Analysis and Visualization for Loan Application

1. Find and plot the percentage of applications approved for self-employed applicants. <br>
2. Find the percentage of rejection for married male applicants. <br>
3. Find and plot the top three months with the largest transaction data.<br>
4. Find and plot which branch processed the highest total dollar value of healthcare transactions.