Importing all the required libraries

In [1]:
import os
import pandas as pd
# Point PySpark at the local JDK before the SparkSession is created. A raw string keeps the
# Windows backslash literal ("\j" is not a valid escape and raises a SyntaxWarning on 3.12+).
os.environ['JAVA_HOME'] = r"C:\jdk"
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType,VarcharType,TimestampType,StringType

Starting the spark session

In [2]:
# Local Spark session on all available cores; getOrCreate() reuses a session if one already exists.
spark = (
    SparkSession.builder
    .appName("CreditCardSystemApp")
    .master("local[*]")
    .getOrCreate()
)


Reading the customer data to dataframe and printing the schema

In [3]:
# Load the raw customer extract (schema inferred from the JSON), then show its schema and a sample.
df_with_schema = spark.read.format("json").load("cdw_sapp_custmer.json")
df_with_schema.printSchema()
df_with_schema.show(n=5)

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-----

Creating a new dataframe according to the mapping requirements:\
1.Convert CUST_ZIP to integer\
2.Convert SSN to integer\
3.Capitalize the first character of FIRST_NAME with the `initcap` function\
4.Convert MIDDLE_NAME to all lowercase\
5.Capitalize the first character of LAST_NAME with the `initcap` function\
6.Convert the LAST_UPDATED field to timestamp\
7.Concatenate APT_NO and STREET_NAME, separated by a literal comma, into FULL_STREET_ADDRESS and drop the individual columns\
8.Format the CUST_PHONE column as (949)xxx-xxxx

In [4]:
# Customer table per the mapping document. Columns are referenced with func.col so each step
# resolves against the DataFrame it is applied to, not against the original df_with_schema.
df_customer_withschema = (
    df_with_schema
    # 1-2. Numeric casts.
    .withColumn('CUST_ZIP', func.col('CUST_ZIP').cast('Int'))
    .withColumn('SSN', func.col('SSN').cast('Int'))
    # 3-5. Name casing: initcap for first/last name, lowercase middle name.
    .withColumn('FIRST_NAME', func.initcap(func.col('FIRST_NAME')))
    .withColumn('MIDDLE_NAME', func.lower(func.col('MIDDLE_NAME')))
    .withColumn('LAST_NAME', func.initcap(func.col('LAST_NAME')))
    # 6. String -> timestamp.
    .withColumn('LAST_UPDATED', func.col('LAST_UPDATED').cast('timestamp'))
    # 7. "APT_NO,STREET_NAME". concat_ws skips NULL inputs. Plain concat returns NULL when any
    #    input is NULL, which would discard the street name for customers without an APT_NO.
    #    (If both inputs are NULL, concat_ws yields an empty string.)
    .withColumn('FULL_STREET_ADDRESS',
                func.concat_ws(',', func.col('APT_NO'), func.col('STREET_NAME')))
    .drop('APT_NO', 'STREET_NAME')
    # 8. "(949)XXX-XXXX": fixed 949 prefix plus the first 3 and next 4 digits of the stored
    #    number (CUST_PHONE is read as a long, so it is cast to string before slicing).
    .withColumn('CUST_PHONE', func.concat(
        func.lit("(949)"),
        func.substring(func.col('CUST_PHONE').cast('string'), 1, 3),
        func.lit("-"),
        func.substring(func.col('CUST_PHONE').cast('string'), 4, 4),
    ))
)

             
             

In [5]:
# Check the transformed customer schema: CUST_ZIP and SSN should now be integer, LAST_UPDATED a
# timestamp, and APT_NO/STREET_NAME replaced by FULL_STREET_ADDRESS.
df_customer_withschema.printSchema()

root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: integer (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



In [6]:
# Preview three transformed customer rows (long values truncated by default).
df_customer_withschema.show(n=3)

+----------------+------------+-------------+-------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|   CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+-------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States|AHooper@example.com|(949)123-7818|        MS|   39120|      Alec|   Hooper|2018-04-21 12:49:02|         wm|123456100|656,Main Street N...|
|4210653310102868|Wethersfield|United States|EHolman@example.com|(949)123-8933|        CT|    6109|      Etta|   Holman|2018-04-21 12:49:02|    brendan|123453023|   829,Redwood Drive|
|4210653310116272|     Huntley|United States|WDunham@example.com|(949)124-3018| 

In [7]:
# Ordered column names of the transformed customer DataFrame.
df_customer_withschema.schema.names

['CREDIT_CARD_NO',
 'CUST_CITY',
 'CUST_COUNTRY',
 'CUST_EMAIL',
 'CUST_PHONE',
 'CUST_STATE',
 'CUST_ZIP',
 'FIRST_NAME',
 'LAST_NAME',
 'LAST_UPDATED',
 'MIDDLE_NAME',
 'SSN',
 'FULL_STREET_ADDRESS']

Reading data from Branch details file and printing Schema

In [8]:
# Load the raw branch extract (schema inferred from the JSON), then show its schema and a sample.
df_branch = spark.read.format("json").load("cdw_sapp_branch.json")
df_branch.printSchema()
df_branch.show(n=5)

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank| 

Creating a new dataframe according to the mapping requirements:\
1.Convert BRANCH_ZIP to integer, inserting 99999 where it is null\
2.Convert the LAST_UPDATED field to timestamp\
3.Format the BRANCH_PHONE column as (XXX)xxx-xxxx



In [9]:
# Branch table per the mapping document:
#   1. BRANCH_ZIP -> int, substituting 99999 for NULL. The default is an integer literal so both
#      CASE branches are numeric. The previous string '99999' mixed a string branch with a bigint
#      column and depended on implicit type coercion.
#   2. LAST_UPDATED -> timestamp.
#   3. BRANCH_PHONE -> "(XXX)XXX-XXXX" from characters 1-3, 4-6 and 7-10 of the stored
#      digit string (e.g. 1234565276).
df_branch_withschema = (
    df_branch
    .withColumn(
        'BRANCH_ZIP',
        func.when(func.col('BRANCH_ZIP').isNull(), func.lit(99999))
            .otherwise(func.col('BRANCH_ZIP'))
            .cast('int'),
    )
    .withColumn('LAST_UPDATED', func.col('LAST_UPDATED').cast('timestamp'))
    .withColumn('BRANCH_PHONE', func.concat(
        func.lit("("),
        func.substring(func.col('BRANCH_PHONE'), 1, 3),
        func.lit(")"),
        func.substring(func.col('BRANCH_PHONE'), 4, 3),
        func.lit("-"),
        func.substring(func.col('BRANCH_PHONE'), 7, 4),
    ))
)
   

In [10]:
# Ordered column names of the transformed branch DataFrame.
df_branch_withschema.schema.names

['BRANCH_CITY',
 'BRANCH_CODE',
 'BRANCH_NAME',
 'BRANCH_PHONE',
 'BRANCH_STATE',
 'BRANCH_STREET',
 'BRANCH_ZIP',
 'LAST_UPDATED']

In [11]:
# Check the transformed branch schema: BRANCH_ZIP should now be integer and LAST_UPDATED a timestamp.
df_branch_withschema.printSchema()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: integer (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [12]:
# Preview five transformed branch rows, including the reformatted BRANCH_PHONE.
df_branch_withschema.show(n=5)

+-----------------+-----------+------------+-------------+------------+-----------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-----------------+----------+-------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|     Bridle Court|     55044|2018-04-18 16:51:47|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|Washington Street|     60142|2018-04-18 16:51:47|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|    Warren Street|     11419|2018-04-18 16:51:47|
|       Middleburg|          4|Example Bank|(123)466-3064|          FL| Cleveland Street|     32068|2018-04-18 16:51:47|
|    KingOfPrussia|          5|Example Bank|(123)484-9701|          PA|      14th Street|     19406|2018-04-18 16:51:47|
+-----------------+-----------+-

In [13]:
# Raw BRANCH_ZIP values as read from JSON (inferred as long), before any defaulting.
df_branch.select(func.col('BRANCH_ZIP')).show(n=20)

+----------+
|BRANCH_ZIP|
+----------+
|     55044|
|     60142|
|     11419|
|     32068|
|     19406|
|      7501|
|     14534|
|      6109|
|     44070|
|      8844|
|     48071|
|     32765|
|     17050|
|     11803|
|     42001|
|     19438|
|     20901|
|     55337|
|     98444|
|     17013|
+----------+
only showing top 20 rows



In [14]:
# Rows whose raw BRANCH_ZIP is missing, i.e. the rows the 99999 default would apply to.
df_branch.where(func.col('BRANCH_ZIP').isNull()).show()

+-----------+-----------+-----------+------------+------------+-------------+----------+------------+
|BRANCH_CITY|BRANCH_CODE|BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|BRANCH_STREET|BRANCH_ZIP|LAST_UPDATED|
+-----------+-----------+-----------+------------+------------+-------------+----------+------------+
+-----------+-----------+-----------+------------+------------+-------------+----------+------------+



In [15]:
# Distinct raw BRANCH_ZIP values (up to 150 rows shown).
df_branch.select('BRANCH_ZIP').dropDuplicates().show(n=150)

+----------+
|BRANCH_ZIP|
+----------+
|     48047|
|     17201|
|     29576|
|     98908|
|     48867|
|     11419|
|     48178|
|     30101|
|     91740|
|     50010|
|      7111|
|     17013|
|     19438|
|     53045|
|     54601|
|     11510|
|     32068|
|      7740|
|     11756|
|     55337|
|     23112|
|     17325|
|     32765|
|     11530|
|     46530|
|     28173|
|     11001|
|     55044|
|     14534|
|     44512|
|     43512|
|     11791|
|     30741|
|     33442|
|     19380|
|     33414|
|     34990|
|     33594|
|      6511|
|     34711|
|     77904|
|     36330|
|     90278|
|     19406|
|     52804|
|     32708|
|     38655|
|     60091|
|     27284|
|     29680|
|     53066|
|     48239|
|     49418|
|     44663|
|     55311|
|     60103|
|     47274|
|     44070|
|     52722|
|     12601|
|     11803|
|     29550|
|     44805|
|     20901|
|     75088|
|      7866|
|     60148|
|     27103|
|     44224|
|     17543|
|     30052|
|     39759|
|     53151|
|     95993|

Reading transaction details from the cdw_sapp_credit JSON file and printing the schema

In [16]:
# Load the raw transaction extract (schema inferred from the JSON), then show its schema and a sample.
df_creditcard = spark.read.format("json").load("cdw_sapp_credit.json")
df_creditcard.printSchema()
df_creditcard.show(n=5)

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             5

Formatting DAY and MONTH as 2 digits by left-padding single-digit values with 0

In [17]:
# Zero-pad DAY and MONTH to two characters so that YEAR||MONTH||DAY later forms a YYYYMMDD TIMEID.
# Both columns are read as bigint. They are cast to string first so every branch of the
# expression is a string. The previous SQL CASE mixed a string branch (lpad) with a bigint
# branch (ELSE DAY), which relies on non-ANSI implicit coercion to come out as a string. It also
# used the newer `len` alias rather than `length`.
def zero_pad_two_digits(col_name):
    """Return column `col_name` as a string, left-padded with '0' when it is one character long.

    Values of two or more characters are returned unchanged (a bare lpad would truncate them),
    and NULL stays NULL.
    """
    as_text = func.col(col_name).cast('string')
    return func.when(func.length(as_text) == 1, func.lpad(as_text, 2, '0')).otherwise(as_text)


df_formatted = (
    df_creditcard
    .withColumn("DAY", zero_pad_two_digits("DAY"))
    .withColumn("MONTH", zero_pad_two_digits("MONTH"))
)

Creating a new dataframe according to the mapping requirements:\
1.Convert BRANCH_CODE to integer\
2.Convert CREDIT_CARD_NO to varchar and rename the column to CUST_CC_NO\
3.Convert CUST_SSN to integer\
4.Concatenate YEAR, MONTH and DAY into the column TIMEID (YYYYMMDD) and drop the individual columns


In [18]:
# Transaction table per the mapping document:
#   - BRANCH_CODE and CUST_SSN cast to int in place;
#   - CREDIT_CARD_NO copied to CUST_CC_NO (varchar) and the old column dropped;
#   - YEAR, MONTH, DAY concatenated into TIMEID, then dropped.
# CUST_CC_NO and TIMEID end up as the last two columns.
df_creditcard_withschema = (
    df_formatted
    .withColumn('BRANCH_CODE', func.col('BRANCH_CODE').cast('int'))
    .withColumn('CUST_CC_NO', func.col('CREDIT_CARD_NO').cast('VARCHAR(30)'))
    .drop('CREDIT_CARD_NO')
    .withColumn('CUST_SSN', func.col('CUST_SSN').cast('int'))
    .withColumn('TIMEID', func.concat(func.col('YEAR'), func.col('MONTH'), func.col('DAY')))
    .drop('YEAR', 'MONTH', 'DAY')
)
                       


In [19]:
# Preview five transformed transactions, including the combined TIMEID.
df_creditcard_withschema.show(n=5)

+-----------+---------+--------------+----------------+-----------------+----------------+--------+
|BRANCH_CODE| CUST_SSN|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|      CUST_CC_NO|  TIMEID|
+-----------+---------+--------------+----------------+-----------------+----------------+--------+
|        114|123459988|             1|       Education|             78.9|4210653349028689|20180214|
|         35|123459988|             2|   Entertainment|            14.24|4210653349028689|20180320|
|        160|123459988|             3|         Grocery|             56.7|4210653349028689|20180708|
|        114|123459988|             4|   Entertainment|            59.73|4210653349028689|20180419|
|         93|123459988|             5|             Gas|             3.59|4210653349028689|20181010|
+-----------+---------+--------------+----------------+-----------------+----------------+--------+
only showing top 5 rows



In [20]:
# (column, type) pairs for the transaction DataFrame. BRANCH_CODE and CUST_SSN should be int,
# and CUST_CC_NO/TIMEID strings.
df_creditcard_withschema.dtypes

[('BRANCH_CODE', 'int'),
 ('CUST_SSN', 'int'),
 ('TRANSACTION_ID', 'bigint'),
 ('TRANSACTION_TYPE', 'string'),
 ('TRANSACTION_VALUE', 'double'),
 ('CUST_CC_NO', 'string'),
 ('TIMEID', 'string')]

In [21]:
# First 20 transformed customer rows.
df_customer_withschema.show(n=20)

+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|   CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States| AHooper@example.com|(949)123-7818|        MS|   39120|      Alec|   Hooper|2018-04-21 12:49:02|         wm|123456100|656,Main Street N...|
|4210653310102868|Wethersfield|United States| EHolman@example.com|(949)123-8933|        CT|    6109|      Etta|   Holman|2018-04-21 12:49:02|    brendan|123453023|   829,Redwood Drive|
|4210653310116272|     Huntley|United States| WDunham@example.com|(949)124-

In [22]:
# First 20 transformed branch rows.
df_branch_withschema.show(n=20)

+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|       Bridle Court|     55044|2018-04-18 16:51:47|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|  Washington Street|     60142|2018-04-18 16:51:47|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|      Warren Street|     11419|2018-04-18 16:51:47|
|       Middleburg|          4|Example Bank|(123)466-3064|          FL|   Cleveland Street|     32068|2018-04-18 16:51:47|
|    KingOfPrussia|          5|Example Bank|(123)484-9701|          PA|        14th Street|     19406|2018-04-18 16:51:47|
|         Paters

In [23]:
# First 20 transformed transaction rows.
df_creditcard_withschema.show(n=20)

+-----------+---------+--------------+----------------+-----------------+----------------+--------+
|BRANCH_CODE| CUST_SSN|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|      CUST_CC_NO|  TIMEID|
+-----------+---------+--------------+----------------+-----------------+----------------+--------+
|        114|123459988|             1|       Education|             78.9|4210653349028689|20180214|
|         35|123459988|             2|   Entertainment|            14.24|4210653349028689|20180320|
|        160|123459988|             3|         Grocery|             56.7|4210653349028689|20180708|
|        114|123459988|             4|   Entertainment|            59.73|4210653349028689|20180419|
|         93|123459988|             5|             Gas|             3.59|4210653349028689|20181010|
|        164|123459988|             6|       Education|             6.89|4210653349028689|20180528|
|        119|123459988|             7|   Entertainment|            43.39|4210653349028689|20180519|


Writing df_customer_withschema dataframe to database creditcard_capstone  with the table name CDW_SAPP_CUSTOMER

In [25]:
# Persist the customer table to MySQL (database creditcard_capstone). "overwrite" replaces any
# existing data. createTableColumnTypes supplies the MySQL column types Spark uses when it
# creates the table.
# Credentials are read from MYSQL_USER / MYSQL_PASSWORD instead of being hardcoded. The fallbacks
# are the previous local-sandbox values so the notebook still runs as before. Do not rely on them
# for any shared database.
db_user = os.environ.get("MYSQL_USER", "root")
db_password = os.environ.get("MYSQL_PASSWORD", "password")

(
    df_customer_withschema.write.format("jdbc")
    .mode("overwrite")
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
    .option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER")
    .option("user", db_user)
    .option("password", db_password)
    .option(
        "createTableColumnTypes",
        "SSN INT, FIRST_NAME VARCHAR(50), MIDDLE_NAME VARCHAR(50), LAST_NAME VARCHAR(50), "
        "CREDIT_CARD_NO VARCHAR(50), FULL_STREET_ADDRESS VARCHAR(200), CUST_CITY VARCHAR(50), "
        "CUST_STATE VARCHAR(50), CUST_COUNTRY VARCHAR(50), CUST_ZIP INT, CUST_PHONE VARCHAR(50), "
        "CUST_EMAIL VARCHAR(100), LAST_UPDATED TIMESTAMP",
    )
    .save()
)


Writing df_branch_withschema dataframe to database creditcard_capstone  with the table name CDW_SAPP_BRANCH

In [26]:
# Persist the branch table to MySQL (database creditcard_capstone). "overwrite" replaces any
# existing data. createTableColumnTypes supplies the MySQL column types Spark uses when it
# creates the table.
# Credentials are read from MYSQL_USER / MYSQL_PASSWORD instead of being hardcoded. The fallbacks
# are the previous local-sandbox values so the notebook still runs as before. Do not rely on them
# for any shared database.
db_user = os.environ.get("MYSQL_USER", "root")
db_password = os.environ.get("MYSQL_PASSWORD", "password")

(
    df_branch_withschema.write.format("jdbc")
    .mode("overwrite")
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
    .option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH")
    .option("user", db_user)
    .option("password", db_password)
    .option(
        "createTableColumnTypes",
        "BRANCH_CODE INT, BRANCH_NAME VARCHAR(50), BRANCH_STREET VARCHAR(100), "
        "BRANCH_CITY VARCHAR(50), BRANCH_STATE VARCHAR(50), BRANCH_ZIP INT, "
        "BRANCH_PHONE VARCHAR(50), LAST_UPDATED TIMESTAMP",
    )
    .save()
)


Writing df_creditcard_withschema dataframe to database creditcard_capstone  with the table name CDW_SAPP_CREDIT_CARD

In [27]:
# Persist the transaction table to MySQL (database creditcard_capstone). "overwrite" replaces any
# existing data. createTableColumnTypes supplies the MySQL column types Spark uses when it
# creates the table.
# Credentials are read from MYSQL_USER / MYSQL_PASSWORD instead of being hardcoded. The fallbacks
# are the previous local-sandbox values so the notebook still runs as before. Do not rely on them
# for any shared database.
db_user = os.environ.get("MYSQL_USER", "root")
db_password = os.environ.get("MYSQL_PASSWORD", "password")

(
    df_creditcard_withschema.write.format("jdbc")
    .mode("overwrite")
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
    .option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD")
    .option("user", db_user)
    .option("password", db_password)
    .option(
        "createTableColumnTypes",
        "CUST_CC_NO VARCHAR(50), TIMEID VARCHAR(100), CUST_SSN INT, BRANCH_CODE INT, "
        "TRANSACTION_TYPE VARCHAR(50), TRANSACTION_VALUE DOUBLE, TRANSACTION_ID INT",
    )
    .save()
)
