In [10]:
from dotenv import load_dotenv
load_dotenv()
import os

import pyspark
from pyspark.sql import SparkSession #Importing the Libraries
# Start the Spark session used by every cell below. getOrCreate() hands back
# the session already running in this kernel, if any, rather than a second one.
spark = (
    SparkSession.builder
    .appName('Capstone.com')
    .getOrCreate()
)

In [2]:
# Extract: read each raw JSON source file into its own DataFrame
# (df1 <- branch file, df2 <- credit-card file, df3 <- customer file),
# then preview the first five rows of each.
df1, df2, df3 = (
    spark.read.json(path)
    for path in ("CDW_SAPP_BRANCH.JSON", "CDW_SAPP_CREDITCARD.JSON", "CDW_SAPP_CUSTOMER.JSON")
)
for raw_df in (df1, df2, df3):
    raw_df.show(5)

+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|    Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL| Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|      14th Street|     19406|2018-04-18T16:51:...|
+-----------------+-----------+-

In [3]:
#df1.printSchema()
#df2.printSchema()
#df3.printSchema()

In [4]:
# Transform
#Note: Data Engineers will be required to transform the data based on the requirements found in the Mapping Document.
#Mapping Document:https://docs.google.com/spreadsheets/d/1t8UxBrUV6dxx0pM1VIIGZpSf4IKbzjdJ/edit#gid=672931242
# Hint: you can use either a PySpark SQL "SELECT" query or simple PySpark RDD operations.
from pyspark.sql.functions import *

In [5]:
# Transform branch data:
#   - missing BRANCH_ZIP values are replaced with 99999
#   - BRANCH_PHONE is reformatted from a 10-digit value to (XXX)XXX-XXXX
# The phone pattern is anchored (^...$) so values that are not exactly 10 digits
# pass through unchanged instead of being partially reformatted
# (11 digits would otherwise become "(123)456-78901"). It is a raw string so
# "\d" is not parsed as an invalid Python escape sequence.
# NOTE(review): na.fill with an int value only fills numeric columns; if
# BRANCH_ZIP is inferred as a string column the fill is silently skipped --
# confirm the inferred schema.
df1 = df1.na.fill(99999, ["BRANCH_ZIP"])
df1 = df1.select(
    "BRANCH_CODE",
    "BRANCH_NAME",
    "BRANCH_STREET",
    "BRANCH_CITY",
    "BRANCH_STATE",
    "BRANCH_ZIP",
    regexp_replace("BRANCH_PHONE", r"^(\d{3})(\d{3})(\d{4})$", "($1)$2-$3").alias("BRANCH_PHONE"),
    "LAST_UPDATED",
)

In [6]:
# Transform credit-card data: fold the separate YEAR / MONTH / DAY columns into
# one TIMEID string -- YEAR followed by MONTH and DAY each left-padded with '0'
# to two characters (YYYYMMDD for four-digit years). All other selected columns
# pass through unchanged.
df2_timeid = concat(
    "YEAR",
    lpad("MONTH", 2, '0'),
    lpad("DAY", 2, '0'),
).alias("TIMEID")

df2_trailing_cols = [
    "CUST_SSN",
    "BRANCH_CODE",
    "TRANSACTION_TYPE",
    "TRANSACTION_VALUE",
    "TRANSACTION_ID",
]

df2 = df2.select("CREDIT_CARD_NO", df2_timeid, *df2_trailing_cols)

In [7]:
# Transform customer data:
#   - FIRST_NAME and LAST_NAME are title-cased (initcap); MIDDLE_NAME is lower-cased.
#   - STREET_NAME and APT_NO are joined with "," into FULL_STREET_ADDRESS.
#     concat_ws is used rather than concat because concat returns NULL when any
#     input is NULL, which would wipe out the street name of every customer
#     without an APT_NO; concat_ws simply skips NULL inputs.
#   - CUST_PHONE is reformatted from a 7-digit value to XXX-XXXX. The pattern is
#     anchored (^...$) so values of any other length pass through unchanged
#     instead of being partially reformatted (10 digits would otherwise become
#     "123-4567890"), and it is a raw string so "\d" is not parsed as an
#     invalid Python escape sequence.
# NOTE(review): "Credit_card_no" is cased differently from the CREDIT_CARD_NO
# column selected from df2. Spark resolves column names case-insensitively by
# default (spark.sql.caseSensitive=false) -- confirm which spelling the output
# table is meant to use.
df3 = df3.select(
    "SSN",
    initcap(col("FIRST_NAME")).alias("FIRST_NAME"),
    lower(col("MIDDLE_NAME")).alias("MIDDLE_NAME"),
    initcap(col("LAST_NAME")).alias("LAST_NAME"),
    "Credit_card_no",
    concat_ws(",", "STREET_NAME", "APT_NO").alias("FULL_STREET_ADDRESS"),
    "CUST_CITY",
    "CUST_STATE",
    "CUST_COUNTRY",
    "CUST_ZIP",
    regexp_replace("CUST_PHONE", r"^(\d{3})(\d{4})$", "$1-$2").alias("CUST_PHONE"),
    "CUST_EMAIL",
    "LAST_UPDATED",
)

In [8]:
## Data loading into Database
# Once PySpark has read the data from the JSON files, use Python, PySpark, and supporting Python modules to load it into the RDBMS (MySQL) as follows:

In [12]:
# Load the transformed branch DataFrame (df1) into MySQL table
# creditcard_capstone.CDW_SAPP_BRANCH over JDBC.
# Connection settings come from the MYSQL_HOST / MYSQL_USER / MYSQL_PASS
# environment variables (populated by load_dotenv() at the top of the notebook).
# os.environ[...] raises a KeyError naming the missing variable; os.getenv would
# silently return None and yield a URL like "jdbc:mysql://None:3306/..." or a
# null credential, which fails later with an obscure JDBC error.
# mode("ignore"): if the table already exists nothing is written, so re-running
# this cell does NOT refresh the data.
# NOTE(review): com.mysql.jdbc.Driver is the legacy class name; MySQL
# Connector/J 8+ uses com.mysql.cj.jdbc.Driver -- confirm against the connector
# jar on the Spark classpath.
(
    df1.write.format("jdbc")
    .mode("ignore")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://{}:3306/creditcard_capstone".format(os.environ["MYSQL_HOST"]))
    .option("dbtable", "CDW_SAPP_BRANCH")
    .option("user", os.environ["MYSQL_USER"])
    .option("password", os.environ["MYSQL_PASS"])
    .save()
)


In [13]:
# Load the transformed credit-card DataFrame (df2) into MySQL table
# creditcard_capstone.CDW_SAPP_CREDIT_CARD over JDBC.
# Connection settings come from the MYSQL_HOST / MYSQL_USER / MYSQL_PASS
# environment variables (populated by load_dotenv() at the top of the notebook).
# os.environ[...] raises a KeyError naming the missing variable; os.getenv would
# silently return None and yield a URL like "jdbc:mysql://None:3306/..." or a
# null credential, which fails later with an obscure JDBC error.
# mode("ignore"): if the table already exists nothing is written, so re-running
# this cell does NOT refresh the data.
# NOTE(review): com.mysql.jdbc.Driver is the legacy class name; MySQL
# Connector/J 8+ uses com.mysql.cj.jdbc.Driver -- confirm against the connector
# jar on the Spark classpath.
(
    df2.write.format("jdbc")
    .mode("ignore")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://{}:3306/creditcard_capstone".format(os.environ["MYSQL_HOST"]))
    .option("dbtable", "CDW_SAPP_CREDIT_CARD")
    .option("user", os.environ["MYSQL_USER"])
    .option("password", os.environ["MYSQL_PASS"])
    .save()
)

In [14]:
# Load the transformed customer DataFrame (df3) into MySQL table
# creditcard_capstone.CDW_SAPP_CUSTOMER over JDBC.
# Connection settings come from the MYSQL_HOST / MYSQL_USER / MYSQL_PASS
# environment variables (populated by load_dotenv() at the top of the notebook).
# os.environ[...] raises a KeyError naming the missing variable; os.getenv would
# silently return None and yield a URL like "jdbc:mysql://None:3306/..." or a
# null credential, which fails later with an obscure JDBC error.
# mode("ignore"): if the table already exists nothing is written, so re-running
# this cell does NOT refresh the data.
# NOTE(review): com.mysql.jdbc.Driver is the legacy class name; MySQL
# Connector/J 8+ uses com.mysql.cj.jdbc.Driver -- confirm against the connector
# jar on the Spark classpath.
(
    df3.write.format("jdbc")
    .mode("ignore")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://{}:3306/creditcard_capstone".format(os.environ["MYSQL_HOST"]))
    .option("dbtable", "CDW_SAPP_CUSTOMER")
    .option("user", os.environ["MYSQL_USER"])
    .option("password", os.environ["MYSQL_PASS"])
    .save()
)

In [15]:
#