In [1]:
# Environment bootstrap: locate Spark, pin the Python interpreter, start the session.
import os
import sys

import findspark

# findspark.init() adds the Spark installation's Python libraries to sys.path,
# so it has to run *before* pyspark is imported; calling it afterwards (as the
# notebook did originally) cannot affect which pyspark package gets loaded.
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Point both PySpark interpreter settings at the interpreter running this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Reuse an already-running session if there is one, otherwise create it.
spark = SparkSession.builder.appName('Capstone').getOrCreate()

In [47]:
# Load the raw customer extract. No schema is passed, so Spark derives the
# column types from the JSON records themselves.
df_cust_raw = spark.read.format("json").load("cdw_sapp_custmer.json")
df_cust_raw.printSchema()
df_cust_raw.show()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+---

In [63]:
from pyspark.sql.functions import col, initcap, lit, lower, upper

# Customer clean-up, step 1: MIDDLE_NAME is rewritten in lower case.
# withColumn with an existing column name replaces that column in place.
df_middle = df_cust_raw.withColumn(
    'MIDDLE_NAME',
    lower(df_cust_raw['MIDDLE_NAME']),
)
df_middle.show()

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [4]:
# Customer clean-up, step 2: FIRST_NAME is rewritten in upper case.
df_first = df_middle.withColumn(
    'FIRST_NAME',
    upper(df_middle['FIRST_NAME']),
)
df_first.show(n=5)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      ALEC|   Hooper|2018-04-21T12:49:...|         wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      ETTA|   Holman|2018-04-21T12:49:...|    brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States|WDunham@example.co

In [5]:
# Customer clean-up, step 3: LAST_NAME is rewritten in upper case.
df_last = df_first.withColumn(
    'LAST_NAME',
    upper(df_first['LAST_NAME']),
)
df_last.show(n=5)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      ALEC|   HOOPER|2018-04-21T12:49:...|         wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      ETTA|   HOLMAN|2018-04-21T12:49:...|    brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States|WDunham@example.co

In [6]:
from pyspark.sql.functions import concat, concat_ws

# Customer clean-up, step 4: merge street and apartment number into a single
# FULL_STREET_ADDRESS column of the form "<STREET_NAME>,<APT_NO>" ...
df_address = df_last.withColumn(
    'FULL_STREET_ADDRESS',
    concat_ws(',', df_last['STREET_NAME'], df_last['APT_NO']),
)
# ... then remove the two source columns that the new column supersedes.
df_addr = df_address.drop('APT_NO', 'STREET_NAME')
df_addr.show(n=5)

+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      ALEC|   HOOPER|2018-04-21T12:49:...|         wm|123456100|Main Street North...|
|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      ETTA|   HOLMAN|2018-04-21T12:49:...|    brendan|123453023|   Redwood Drive,829|
|4210653310116272|     Huntley|United States|WDunham@example.com|   1243018|        IL|   

In [92]:
# Customer clean-up, step 5: format CUST_PHONE as "(777)XXX-XXXX".
# The source numbers are only 7 digits long, so a fixed '777' area code is
# prepended to make the phone number 10 digits long.
#
# substr(startPos, length) is what the Column slice syntax col[a:b] expands to:
# the second argument is a LENGTH, not an end index, and positions are 1-based
# (a start position of 0 is treated like the first character).
df_cust_clean = df_addr.withColumn(
    'CUST_PHONE',
    concat(
        lit('(777)'),
        col('CUST_PHONE').substr(0, 3),  # first three digits
        lit('-'),
        col('CUST_PHONE').substr(4, 6),  # from the 4th digit on, at most 6 characters
    ),
)
df_cust_clean.show(n=5)

+----------------+------------+-------------+-------------------+-------------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|   CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+-------------------+-------------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States|AHooper@example.com|(777)123-7818|        MS|   39120|      ALEC|   HOOPER|2018-04-21T12:49:...|         wm|123456100|Main Street North...|
|4210653310102868|Wethersfield|United States|EHolman@example.com|(777)123-8933|        CT|   06109|      ETTA|   HOLMAN|2018-04-21T12:49:...|    brendan|123453023|   Redwood Drive,829|
|4210653310116272|     Huntley|United States|WDunham@example.com|(777)124-3

In [95]:
# Load the raw credit-card transaction extract; the schema is derived from the JSON.
df_credit_raw = spark.read.format("json").load("cdw_sapp_credit.json")
df_credit_raw.printSchema()
df_credit_raw.show(n=5)

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             5

In [59]:
from pyspark.sql.functions import lpad

# Transaction clean-up, step 1: add MONTH_PADDED, the month left-padded with
# '0' to two characters (2 -> "02"), as a building block for TIMEID.
df_padded = df_credit_raw.withColumn(
    'MONTH_PADDED',
    lpad(df_credit_raw['MONTH'], 2, '0'),
)
df_padded.show(n=5)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+------------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|MONTH_PADDED|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+------------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|          02|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|          03|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|          07|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|          04|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|          10|
+-----------+----------------+---------+---+----

In [60]:
# Transaction clean-up, step 2: add DAY_PADDED, the day left-padded with '0'
# to two characters (8 -> "08").
df_all_padded = df_padded.withColumn(
    'DAY_PADDED',
    lpad(df_padded['DAY'], 2, '0'),
)
df_all_padded.show(n=5)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+------------+----------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|MONTH_PADDED|DAY_PADDED|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+------------+----------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|          02|        14|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|          03|        20|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|          07|        08|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|          04|        19|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|        

In [106]:
# Transaction clean-up, step 3: TIMEID is YEAR followed by the zero-padded
# month and day, concatenated without separators (e.g. 2018, "02", "14" -> "20180214").
df_year = df_all_padded.withColumn(
    'TIMEID',
    concat(*[col(part) for part in ('YEAR', 'MONTH_PADDED', 'DAY_PADDED')]),
)
# The individual date parts and the padding helpers are no longer needed.
df_credit_card_clean = df_year.drop(
    'YEAR', 'MONTH', 'DAY', 'MONTH_PADDED', 'DAY_PADDED',
)
df_credit_card_clean.show()

+-----------+----------------+---------+--------------+----------------+-----------------+--------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|  TIMEID|
+-----------+----------------+---------+--------------+----------------+-----------------+--------+
|        114|4210653349028689|123459988|             1|       Education|             78.9|20180214|
|         35|4210653349028689|123459988|             2|   Entertainment|            14.24|20180320|
|        160|4210653349028689|123459988|             3|         Grocery|             56.7|20180708|
|        114|4210653349028689|123459988|             4|   Entertainment|            59.73|20180419|
|         93|4210653349028689|123459988|             5|             Gas|             3.59|20181010|
|        164|4210653349028689|123459988|             6|       Education|             6.89|20180528|
|        119|4210653349028689|123459988|             7|   Entertainment|            43.39|20180519|


In [93]:
# Load the raw branch extract; the schema is derived from the JSON.
df_branch_raw = spark.read.format("json").load("cdw_sapp_branch.json")
df_branch_raw.printSchema()
df_branch_raw.show(n=5)

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank| 

In [94]:
from pyspark.sql.functions import when

# Branch clean-up, step 1: substitute a default for missing ZIP codes and keep
# every non-null value as is.
# The default is the integer 0 (the Python literal 00000 is just 0).
# NOTE(review): BRANCH_ZIP is read as a long, so a five-character "00000" or
# ZIP codes with a leading zero would need a string column - confirm against
# the target table definition.
branch_zip = df_branch_raw['BRANCH_ZIP']
df_zip = df_branch_raw.withColumn(
    "BRANCH_ZIP",
    when(branch_zip.isNull(), 0).otherwise(branch_zip),
)
df_zip.show()

+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|        14th Street|     19406|2018-04-18T16:51:...|
|         Paters

In [103]:
# Branch clean-up, step 2: format BRANCH_PHONE as "(XXX)XXX-XXXX".
#
# substr(startPos, length) is what the Column slice syntax col[a:b] expands to:
# the second argument is a LENGTH, not an end index, and positions are 1-based
# (a start position of 0 is treated like the first character).
df_branch_clean = df_zip.withColumn(
    'BRANCH_PHONE',
    concat(
        lit('('),
        col('BRANCH_PHONE').substr(0, 3),  # area code: first three digits
        lit(')'),
        col('BRANCH_PHONE').substr(4, 3),  # next three digits
        lit('-'),
        col('BRANCH_PHONE').substr(7, 9),  # from the 7th digit on, at most 9 characters
    ),
)
df_branch_clean.show(n=5)

+-----------------+-----------+------------+-------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|    Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|(123)466-3064|          FL| Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|(123)484-9701|          PA|      14th Street|     19406|2018-04-18T16:51:...|
+-----------------+-----

In [None]:
# Read the database credentials from a local file kept outside the notebook.
# Expected layout: line 1 = user name, line 2 = password.
with open(r"C:\Users\izumr\Desktop\Work\data-eng-project\Console_App\hidden.txt", 'r') as f:
    # Strip only the line terminators: a trailing newline left on the password
    # would be sent to the database as part of it, while other whitespace in a
    # credential is preserved. The with-block closes the file on exit.
    hidden_ls = [line.rstrip('\r\n') for line in f]

if len(hidden_ls) < 2:
    raise ValueError(
        "credentials file must contain the user name on line 1 and the password on line 2"
    )

usr = hidden_ls[0]
pwd = hidden_ls[1]

In [110]:
# Append the cleaned customer rows to the CDW_SAPP_CUSTOMER table over JDBC.
# Mode "append" adds rows to the table, so re-running this cell inserts the
# same data again.
(
    df_cust_clean.write
    .format("jdbc")
    .mode("append")
    .options(
        url="jdbc:mysql://localhost:3306/creditcard_capstone",
        dbtable="CDW_SAPP_CUSTOMER",
        user=usr,
        password=pwd,
    )
    .save()
)

In [107]:
# Append the cleaned transaction rows to the CDW_SAPP_CREDIT_CARD table over JDBC.
# Mode "append" adds rows to the table, so re-running this cell inserts the
# same data again.
(
    df_credit_card_clean.write
    .format("jdbc")
    .mode("append")
    .options(
        url="jdbc:mysql://localhost:3306/creditcard_capstone",
        dbtable="CDW_SAPP_CREDIT_CARD",
        user=usr,
        password=pwd,
    )
    .save()
)

In [108]:
# Append the cleaned branch rows to the CDW_SAPP_BRANCH table over JDBC.
# Mode "append" adds rows to the table, so re-running this cell inserts the
# same data again.
(
    df_branch_clean.write
    .format("jdbc")
    .mode("append")
    .options(
        url="jdbc:mysql://localhost:3306/creditcard_capstone",
        dbtable="CDW_SAPP_BRANCH",
        user=usr,
        password=pwd,
    )
    .save()
)