In [1]:
from pyspark.sql import SparkSession
import requests
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.functions import substring
import seaborn as sns

In [3]:
spark = SparkSession.builder.getOrCreate()


def _read_json_rows(path):
    """Read the JSON file at ``path`` with Spark and return its rows as an RDD."""
    return spark.read.json(path).rdd


# Raw inputs, one RDD of Row objects per source file.
branch_rdd = _read_json_rows("CDW_SAPP_BRANCH.JSON")
credit_rdd = _read_json_rows("CDW_SAPP_CREDIT.JSON")
customer_rdd = _read_json_rows("CDW_SAPP_CUSTMER.JSON")

In [None]:
def _format_branch_phone(raw_phone):
    """Format a branch phone value as ``(AAA)BBB-CCCC``.

    The value is coerced to ``str`` before slicing, so a phone number that
    was loaded as a numeric type is formatted instead of raising
    ``TypeError``. ``None`` is passed through unchanged so a missing phone
    stays null in the resulting DataFrame.
    """
    if raw_phone is None:
        return None
    digits = str(raw_phone)
    return "(" + digits[:3] + ")" + digits[3:6] + "-" + digits[6:]


def _transform_branch_row(row):
    """Convert one raw branch row into the tuple used to build the branch DataFrame.

    Field order matches the column list passed to ``toDF`` below.
    """
    return (
        int(row["BRANCH_CODE"]),
        str(row["BRANCH_NAME"]),
        str(row["BRANCH_STREET"]),
        str(row["BRANCH_CITY"]),
        str(row["BRANCH_STATE"]),
        # A missing zip code is replaced with the placeholder 99999.
        int(row["BRANCH_ZIP"]) if row["BRANCH_ZIP"] is not None else 99999,
        _format_branch_phone(row["BRANCH_PHONE"]),
    )


branch_transformed_rdd = branch_rdd.map(_transform_branch_row)

branch_transformed_df = branch_transformed_rdd.toDF(["BRANCH_CODE", "BRANCH_NAME", "BRANCH_STREET", "BRANCH_CITY", "BRANCH_STATE", "BRANCH_ZIP", "BRANCH_PHONE"])

branch_transformed_df.show()

In [None]:
_credit_column_names = [
    "CREDIT_CARD_NO",
    "TIMEID",
    "CUST_SSN",
    "BRANCH_CODE",
    "TRANSACTION_TYPE",
    "TRANSACTION_VALUE",
    "TRANSACTION_ID",
]


def _credit_record(row):
    """Build the output tuple for one raw credit transaction row.

    Field order matches ``_credit_column_names``.
    """
    card_number = str(row["CREDIT_CARD_NO"])
    # TIMEID is YEAR followed by MONTH and DAY, each left-padded with zeros
    # to two characters.
    time_id = "".join([
        str(row["YEAR"]),
        str(row["MONTH"]).zfill(2),
        str(row["DAY"]).zfill(2),
    ])
    customer_ssn = int(row["CUST_SSN"])
    branch_code = int(row["BRANCH_CODE"])
    transaction_type = str(row["TRANSACTION_TYPE"])
    transaction_value = float(row["TRANSACTION_VALUE"])
    transaction_id = int(row["TRANSACTION_ID"])
    return (
        card_number,
        time_id,
        customer_ssn,
        branch_code,
        transaction_type,
        transaction_value,
        transaction_id,
    )


credit_transformed_rdd = credit_rdd.map(_credit_record)

credit_transformed_df = credit_transformed_rdd.toDF(_credit_column_names)

credit_transformed_df.show()

In [None]:
def _format_customer_phone(raw_phone):
    """Format a customer phone value as ``(AAA)BBB-CCCC``.

    Values shorter than 10 characters are right-padded with ``"X"`` so the
    result always has the full shape; a 7-character input such as
    ``1237818`` becomes ``(123)781-8XXX``. Values that already have 10 or
    more characters are formatted as-is, with no extra padding appended.
    ``None`` is passed through unchanged so a missing phone stays null
    instead of being formatted from the text ``"None"``.
    """
    if raw_phone is None:
        return None
    digits = str(raw_phone).ljust(10, "X")
    return "(" + digits[:3] + ")" + digits[3:6] + "-" + digits[6:]


def _transform_customer_row(row):
    """Convert one raw customer row into the tuple used to build the customer DataFrame.

    Field order matches the column list passed to ``toDF`` below.
    """
    return (
        int(row["SSN"]),
        str(row["FIRST_NAME"]).title(),
        str(row["MIDDLE_NAME"]).lower(),
        str(row["LAST_NAME"]).title(),
        str(row["CREDIT_CARD_NO"]),
        # Street name first, then apartment number, separated by ", ".
        str(row["STREET_NAME"]) + ", " + str(row["APT_NO"]),
        str(row["CUST_CITY"]),
        str(row["CUST_STATE"]),
        str(row["CUST_COUNTRY"]),
        int(row["CUST_ZIP"]),
        _format_customer_phone(row["CUST_PHONE"]),
        str(row["CUST_EMAIL"]),
        row["LAST_UPDATED"],
    )


customer_transformed_rdd = customer_rdd.map(_transform_customer_row)

customer_transformed_df = customer_transformed_rdd.toDF(["SSN", "FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "CREDIT_CARD_NO","FULL_STREET_ADDRESS", "CUST_CITY", "CUST_STATE", "CUST_COUNTRY","CUST_ZIP", "CUST_PHONE", "CUST_EMAIL", "LAST_UPDATED"])

customer_transformed_df.show()