In [18]:
import matplotlib as mp
import pandas as pd
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Namespaced alias, imported after the * import so nothing can shadow it
from pyspark.sql import functions as F

In [19]:
# Create the Spark session for this notebook (getOrCreate reuses one if it already exists)
spark = (
    SparkSession.builder
    .appName('SBA 350')
    .getOrCreate()
)

# Load the customer dataset. Spark infers the schema for JSON sources by default,
# so no header/inferSchema options are needed (those are typically for CSV files).
customer_spark = spark.read.json("cdw_sapp_customer.json")

In [20]:
# Load the branch and credit-card transaction datasets (schema inferred from JSON)
branch_spark, credit_spark = (
    spark.read.json(source_path)
    for source_path in ("cdw_sapp_branch.json", "cdw_sapp_credit.json")
)

In [17]:
# Inspect the column types Spark inferred for the credit transactions.
# credit_spark.show()
credit_spark.dtypes
# Inferred schema, kept for reference when outputs are cleared:
# [('BRANCH_CODE', 'bigint'),
#  ('CREDIT_CARD_NO', 'string'),
#  ('CUST_SSN', 'bigint'),
#  ('DAY', 'bigint'),
#  ('MONTH', 'bigint'),
#  ('TRANSACTION_ID', 'bigint'),
#  ('TRANSACTION_TYPE', 'string'),
#  ('TRANSACTION_VALUE', 'double'),
#  ('YEAR', 'bigint')]

[('BRANCH_CODE', 'bigint'),
 ('CREDIT_CARD_NO', 'string'),
 ('CUST_SSN', 'bigint'),
 ('DAY', 'bigint'),
 ('MONTH', 'bigint'),
 ('TRANSACTION_ID', 'bigint'),
 ('TRANSACTION_TYPE', 'string'),
 ('TRANSACTION_VALUE', 'double'),
 ('YEAR', 'bigint')]

In [16]:
# Inspect the column types Spark inferred for the branches.
# branch_spark.show()
branch_spark.dtypes
# Inferred schema, kept for reference when outputs are cleared.
# BRANCH_ZIP is bigint, so a null-zip default must be numeric (see tran_branch_zip).
# [('BRANCH_CITY', 'string'),
#  ('BRANCH_CODE', 'bigint'),
#  ('BRANCH_NAME', 'string'),
#  ('BRANCH_PHONE', 'string'),
#  ('BRANCH_STATE', 'string'),
#  ('BRANCH_STREET', 'string'),
#  ('BRANCH_ZIP', 'bigint'),
#  ('LAST_UPDATED', 'string')]

[('BRANCH_CITY', 'string'),
 ('BRANCH_CODE', 'bigint'),
 ('BRANCH_NAME', 'string'),
 ('BRANCH_PHONE', 'string'),
 ('BRANCH_STATE', 'string'),
 ('BRANCH_STREET', 'string'),
 ('BRANCH_ZIP', 'bigint'),
 ('LAST_UPDATED', 'string')]

In [15]:
# Inspect the column types Spark inferred for the customers.
# The dtypes expression is live (it was commented out while the cell still showed
# its output, so that output could not be reproduced on Restart & Run All).
# customer_spark.show()
customer_spark.dtypes
# Inferred schema, kept for reference when outputs are cleared.
# Note CUST_PHONE is bigint and CUST_ZIP is a string (zips such as '06109' keep their leading zero).
# [('APT_NO', 'string'),
#  ('CREDIT_CARD_NO', 'string'),
#  ('CUST_CITY', 'string'),
#  ('CUST_COUNTRY', 'string'),
#  ('CUST_EMAIL', 'string'),
#  ('CUST_PHONE', 'bigint'),
#  ('CUST_STATE', 'string'),
#  ('CUST_ZIP', 'string'),
#  ('FIRST_NAME', 'string'),
#  ('LAST_NAME', 'string'),
#  ('LAST_UPDATED', 'string'),
#  ('MIDDLE_NAME', 'string'),
#  ('SSN', 'bigint'),
#  ('STREET_NAME', 'string')]

[('APT_NO', 'string'),
 ('CREDIT_CARD_NO', 'string'),
 ('CUST_CITY', 'string'),
 ('CUST_COUNTRY', 'string'),
 ('CUST_EMAIL', 'string'),
 ('CUST_PHONE', 'bigint'),
 ('CUST_STATE', 'string'),
 ('CUST_ZIP', 'string'),
 ('FIRST_NAME', 'string'),
 ('LAST_NAME', 'string'),
 ('LAST_UPDATED', 'string'),
 ('MIDDLE_NAME', 'string'),
 ('SSN', 'bigint'),
 ('STREET_NAME', 'string')]

In [21]:

# tan_customer = customer_spark['FIRST_NAME', 'MIDDLE_NAME', 'LAST_NAME', 'STREET_NAME','APT_NO', 'CUST_PHONE']
# customer_spark.createOrReplaceTempView('customer')


def tran_cust_title_case(df, column_name):
    """Return a new DataFrame with ``column_name`` converted to title case via Spark's ``initcap``."""
    titled = initcap(col(column_name))
    return df.withColumn(column_name, titled)
def tran_cust_lower_case(df, column_name):
    """Return a new DataFrame with ``column_name`` converted to lower case via Spark's ``lower``."""
    lowered = lower(col(column_name))
    return df.withColumn(column_name, lowered)
def concat_cust_street_apt(df, col1, col2):
    """Add a FULL_STREET_ADDRESS column joining ``col1`` and ``col2`` with a comma.

    Spark's ``concat_ws`` skips null inputs, so if one part is missing the result
    is just the other part, with no dangling comma. The source columns are left
    in place; callers drop them if they are no longer wanted.
    """
    address_parts = [col(col1), col(col2)]
    return df.withColumn('FULL_STREET_ADDRESS', concat_ws(',', *address_parts))
# NOTE: CUST_PHONE values are only 7 digits (no area code), so they cannot be put
# into (XXX)XXX-XXXX form on their own; they are passed through as digits below.
def tran_phone_num(df, column_name):
    """Reformat a phone column as ``(XXX)XXX-XXXX``.

    The value is cast to string and stripped of every non-digit character first,
    so numeric (bigint) columns and already-punctuated strings are both accepted.
    Only values with exactly 10 digits are formatted. Anything else is returned as
    its bare digit string rather than being sliced into a malformed number; for
    example, the 7-digit 1237818 would otherwise become '(123)781-8'. Nulls stay null.

    Args:
        df: Spark DataFrame containing ``column_name``.
        column_name: name of the phone column; it is replaced in the result.

    Returns:
        A new DataFrame in which ``column_name`` is a string column.
    """
    digits = regexp_replace(col(column_name).cast('string'), r'[^0-9]', '')
    formatted = concat(lit('('), substring(digits, 1, 3),
                       lit(')'), substring(digits, 4, 3),
                       lit('-'), substring(digits, 7, 4))
    # Guard on length so short/long numbers are not silently mangled
    return df.withColumn(column_name,
                         when(length(digits) == 10, formatted).otherwise(digits))
# Default a missing branch zip to 99999; non-null zips are moved through unchanged
def tran_branch_zip(df):
    """Return a new DataFrame whose null BRANCH_ZIP values are replaced with 99999."""
    zip_or_default = coalesce(col('BRANCH_ZIP'), lit(99999))
    return df.withColumn('BRANCH_ZIP', zip_or_default)

# Customer phones have no area code (7 digits). As a stop-gap, borrow digits from the
# phone of a branch in the customer's state. The new phone is:
#   - the customer phone's first 2 characters,
#   - then characters 3-5 of the branch phone (substr(3, 3) = 3 chars starting at
#     position 3, so both ends are included),
#   - then the rest of the customer phone.

# Collapse the branches to ONE row per state before joining. Joining on state directly
# repeats each customer once per branch in that state. min() over a struct compares
# its first field first, so this deterministically picks the lowest BRANCH_CODE's phone.
state_branch_phone = (
    branch_spark
    .groupBy('BRANCH_STATE')
    .agg(F.min(F.struct('BRANCH_CODE', 'BRANCH_PHONE')).alias('FIRST_BRANCH'))
    .select('BRANCH_STATE', F.col('FIRST_BRANCH.BRANCH_PHONE').alias('BRANCH_PHONE'))
)

# The left join keeps customers whose state has no branch (their BRANCH_PHONE is null)
custJoinbranch = customer_spark.join(
    state_branch_phone,
    customer_spark['CUST_STATE'] == state_branch_phone['BRANCH_STATE'],
    'left',
)

# concat() is null if any piece is null, so fall back to the unmodified phone (as a
# string) for customers without a matching branch instead of wiping their phone out.
customer_fix = custJoinbranch.withColumn(
    'CUST_PHONE',
    F.coalesce(
        F.concat(
            F.col('CUST_PHONE').substr(1, 2),
            F.col('BRANCH_PHONE').substr(3, 3),
            F.col('CUST_PHONE').substr(3, 7),
        ),
        F.col('CUST_PHONE').cast('string'),
    ),
)

# Combine the day, month and year columns into a single TIMEID column
def tran_to_timeid(df, day, month, year, as_date=True):
    """Add a TIMEID column built from separate day/month/year columns.

    The columns are concatenated as ``yyyyMMdd`` and parsed with ``to_date``.
    Month and day are left-padded with '0' first (2018/2/14 -> '20180214').

    Args:
        df: Spark DataFrame containing the three columns.
        day: name of the day-of-month column.
        month: name of the month column.
        year: name of the year column.
        as_date: when True (the default, original behaviour), TIMEID is a Spark date
            column. When False, TIMEID is the 'YYYYMMDD' string described in the
            mapping notes. That string is formatted from the parsed date, so it is null
            exactly where the date form would be null.

    Returns:
        A new DataFrame with TIMEID added; the day/month/year columns are kept.
    """
    # lpad(col, 2, '0') pads to width 2 with zeros. Without it, 2018/2/14 would
    # concat to '2018214', which does not fit the 8-character yyyyMMdd layout.
    date_string = concat(
        col(year),
        lpad(col(month), 2, '0'),
        lpad(col(day), 2, '0'))
    timeid = to_date(date_string, 'yyyyMMdd')
    if not as_date:
        timeid = date_format(timeid, 'yyyyMMdd')
    return df.withColumn('TIMEID', timeid)

In [22]:
# Apply the customer mapping-document transformations to the extracted customer df,
# producing a new df (customer_spark itself is left untouched).
tran_cust_spark = (
    customer_spark
    .transform(tran_cust_title_case, 'FIRST_NAME')
    .transform(tran_cust_lower_case, 'MIDDLE_NAME')
    .transform(tran_cust_title_case, 'LAST_NAME')
    # APT_NO and STREET_NAME are merged into FULL_STREET_ADDRESS, then dropped
    .transform(concat_cust_street_apt, 'APT_NO', 'STREET_NAME')
    .drop('STREET_NAME', 'APT_NO')
    .transform(tran_phone_num, 'CUST_PHONE')
)

# Before / after comparison
customer_spark.show()
tran_cust_spark.show()


+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [23]:
# Apply the branch mapping-document transformations to the extracted branch df:
#   - BRANCH_ZIP: load the default 99999 when the source value is null, else direct move
#   - BRANCH_PHONE: change the format of the phone number to (XXX)XXX-XXXX
tran_branch_spark = (
    branch_spark
    .transform(tran_branch_zip)
    .transform(tran_phone_num, 'BRANCH_PHONE')
)

# Before / after comparison
branch_spark.show()
tran_branch_spark.show()



+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|        14th Street|     19406|2018-04-18T16:51:...|
|         Paters

In [25]:
# Setting Spark's time parser policy to LEGACY stopped to_date from raising a parse error (unparseable dates then come back as null instead). Left disabled here; uncomment if the error reappears.
# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [26]:
# Apply the credit mapping-document transformation to the extracted credit df:
# DAY, MONTH and YEAR are combined into TIMEID (YYYYMMDD) and then dropped.
# NOTE(review): on "is there an easier way?" -- Spark SQL 3.0+ provides
# make_date(year, month, day), which builds the date without the string
# concat/parse round-trip; worth evaluating against the Spark version in use.
tran_credit_spark = (
    credit_spark
    .transform(tran_to_timeid, 'DAY', 'MONTH', 'YEAR')
    .drop('DAY', 'MONTH', 'YEAR')
)

# Before / after comparison (first 100 rows of the transformed df)
credit_spark.show()
tran_credit_spark.show(100)


+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|
|        164|4210653349028689|123459988| 28|    5|             6|       Education|             6.89|2018|
|        119|4210653349028689|123459988| 19|  

In [35]:
# Compare the number of distinct customers (one row kept per CUST_SSN)
# with the total number of transactions.
distinct_credit = tran_credit_spark.dropDuplicates(subset=['CUST_SSN'])
# distinct_credit.show()
for row_count in (distinct_credit.count(), tran_credit_spark.count()):
    print(row_count)

952
46694


cdw_sapp_custmer.json (filename typo fixed; loaded as cdw_sapp_customer.json) → target table CDW_SAPP_CUSTOMER; the target field (column) names are the same as the source's.

SSN (int): dm (direct move: the value is copied unchanged; "dm" below means the same)

FIRST_NAME(varchar): convert the name to title case

MIDDLE_NAME(varchar): convert the name to lower case

LAST_NAME(varchar): convert the name to title case

CREDIT_CARD_NO(varchar): dm

STREET_NAME, APT_NO (varchar): concatenate the apartment number and street name of the customer's residence, with a comma as the separator

CUST_CITY(varchar): dm

CUST_STATE(varchar): dm

CUST_COUNTRY(varchar): dm

CUST_ZIP(int): dm

CUST_PHONE(varchar): change the format of phone number to (XXX)XXX-XXXX

CUST_EMAIL(varchar): dm

LAST_UPDATED(timestamp): dm