In [21]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fun
from pyspark.sql.types import IntegerType

In [22]:
# Read the JDBC connection settings from database.txt, one value per line in the
# order: url, driver, user, password. The `with` block guarantees the file handle
# is closed once the lines are read, even if reading fails.
with open('database.txt') as file:
    lines = file.readlines()

if len(lines) < 4:
    raise ValueError('database.txt must contain 4 lines: url, driver, user, password')

# rstrip() removes the trailing newline (and any trailing whitespace) from each value.
url = lines[0].rstrip()
driver = lines[1].rstrip()
user = lines[2].rstrip()
password = lines[3].rstrip()

In [23]:
# Obtain the Spark session for this ETL, reusing one already running in the
# kernel if present (getOrCreate semantics).
spark = (
    SparkSession.builder
    .appName('CreditCardSystem')
    .getOrCreate()
)

In [24]:
#Load JSON
# Raw extracts: branches, credit-card transactions, customers.
# NOTE(review): 'custmer' is presumably the real on-disk file name — verify before "fixing" it.
brnch_df = spark.read.format('json').load('cdw_sapp_branch.json')
crdt_df = spark.read.format('json').load('cdw_sapp_credit.json')
cstmr_df = spark.read.format('json').load('cdw_sapp_custmer.json')

In [25]:
#Modify Customer Table to Guideline Standards
# - FULL_STREET_ADDRESS: "STREET_NAME, APT_NO". concat_ws skips null inputs, so a
#   customer with a missing APT_NO keeps their street; plain concat would return
#   null for the whole address if any part is null.
# - CUST_PHONE: reformatted as XXX-XXXX from its first 7 characters
#   (Spark's substring position is 1-based).
# - CUST_ZIP: cast to int. NOTE(review): this drops leading zeros (e.g. a CT zip
#   comes out as 6109) — confirm the guidelines really require an integer zip.
# - LAST_UPDATED: parsed from string to timestamp.
# STREET_NAME and APT_NO are dropped once merged into FULL_STREET_ADDRESS.
cstmr_df = (
    cstmr_df
    .withColumn('FULL_STREET_ADDRESS', fun.concat_ws(', ', 'STREET_NAME', 'APT_NO'))
    .withColumn('CUST_PHONE', fun.concat(fun.substring('CUST_PHONE', 1, 3),
                                         fun.lit('-'),
                                         fun.substring('CUST_PHONE', 4, 4)))
    .withColumn('CUST_ZIP', fun.col('CUST_ZIP').cast(IntegerType()))
    .withColumn('LAST_UPDATED', fun.to_timestamp('LAST_UPDATED'))
    .drop('STREET_NAME', 'APT_NO')
)

# NOTE(review): this preview prints SSN and CREDIT_CARD_NO — redact before sharing.
cstmr_df.show()

+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States| AHooper@example.com|  123-7818|        MS|   39120|      Alec|   Hooper|2018-04-21 09:49:02|         Wm|123456100|Main Street North...|
|4210653310102868|Wethersfield|United States| EHolman@example.com|  123-8933|        CT|    6109|      Etta|   Holman|2018-04-21 09:49:02|    Brendan|123453023|  Redwood Drive, 829|
|4210653310116272|     Huntley|United States| WDunham@example.com|  124-3018|        IL|  

In [26]:
#Modify Branch Table to Guideline Standards
# BRANCH_PHONE becomes (XXX)XXX-XXXX built from its first 10 characters
# (assumes a 10-digit phone string — TODO confirm); Spark's substring is 1-based.
# LAST_UPDATED is parsed from string to timestamp.
brnch_df = (
    brnch_df
    .withColumn(
        'BRANCH_PHONE',
        fun.concat(
            fun.lit('('), fun.substring('BRANCH_PHONE', 1, 3), fun.lit(')'),
            fun.substring('BRANCH_PHONE', 4, 3),
            fun.lit('-'),
            fun.substring('BRANCH_PHONE', 7, 4),
        ),
    )
    .withColumn('LAST_UPDATED', fun.to_timestamp('LAST_UPDATED'))
)

brnch_df.show()

+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|       Bridle Court|     55044|2018-04-18 13:51:47|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|  Washington Street|     60142|2018-04-18 13:51:47|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|      Warren Street|     11419|2018-04-18 13:51:47|
|       Middleburg|          4|Example Bank|(123)466-3064|          FL|   Cleveland Street|     32068|2018-04-18 13:51:47|
|    KingOfPrussia|          5|Example Bank|(123)484-9701|          PA|        14th Street|     19406|2018-04-18 13:51:47|
|         Paters

In [27]:
#Modify Credit Card Table to Guideline Standards
# Fold the separate YEAR/MONTH/DAY columns into one TIMEID timestamp (midnight of
# that date), then discard the now-redundant date parts.
crdt_df = (
    crdt_df
    .withColumn('TIMEID', fun.to_timestamp(fun.expr('make_date(YEAR, MONTH, DAY)')))
    .drop('YEAR', 'MONTH', 'DAY')
)

# NOTE(review): this preview prints CREDIT_CARD_NO and CUST_SSN — redact before sharing.
crdt_df.show()

+-----------+----------------+---------+--------------+----------------+-----------------+-------------------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|             TIMEID|
+-----------+----------------+---------+--------------+----------------+-----------------+-------------------+
|        114|4210653349028689|123459988|             1|       Education|             78.9|2018-02-14 00:00:00|
|         35|4210653349028689|123459988|             2|   Entertainment|            14.24|2018-03-20 00:00:00|
|        160|4210653349028689|123459988|             3|         Grocery|             56.7|2018-07-08 00:00:00|
|        114|4210653349028689|123459988|             4|   Entertainment|            59.73|2018-04-19 00:00:00|
|         93|4210653349028689|123459988|             5|             Gas|             3.59|2018-10-10 00:00:00|
|        164|4210653349028689|123459988|             6|       Education|             6.89|2018-05-28 00:00:00|
|

In [28]:
#load branch dataframe to mariadb
# Overwrite mode: any existing data in CDW_SAPP_BRANCH is replaced.
(
    brnch_df.write
    .format('jdbc')
    .mode('overwrite')
    .option('url', url)
    .option('driver', driver)
    .option('dbtable', 'CDW_SAPP_BRANCH')
    .option('user', user)
    .option('password', password)
    .save()
)

In [29]:
#load credit card dataframe to mariadb
# Overwrite mode: any existing data in CDW_SAPP_CREDIT_CARD is replaced.
(
    crdt_df.write
    .format('jdbc')
    .mode('overwrite')
    .option('url', url)
    .option('driver', driver)
    .option('dbtable', 'CDW_SAPP_CREDIT_CARD')
    .option('user', user)
    .option('password', password)
    .save()
)

In [30]:
#load customer dataframe to mariadb
# Overwrite mode: any existing data in CDW_SAPP_CUSTOMER is replaced.
(
    cstmr_df.write
    .format('jdbc')
    .mode('overwrite')
    .option('url', url)
    .option('driver', driver)
    .option('dbtable', 'CDW_SAPP_CUSTOMER')
    .option('user', user)
    .option('password', password)
    .save()
)