In [1]:
#!pip install pyspark

In [2]:
#Import all necessary libraries and packages

import pyspark
import pyspark.sql.functions as funct
from pyspark.sql import SparkSession
import pandas as pd
import credentials as cred 

In [3]:
#Creating a shortcut to the filepath for the source file for branches
customer_filepath = r"C:\Users\chito\Developer\Capstone_350\Raw_Data\cdw_sapp_customer.json"

In [4]:
#This creates the new Sparksession
spark = SparkSession.builder.appName("customer_clean").getOrCreate()
#spark = SparkSession.builder.master("local").appName("test").getOrCreate()


In [5]:
#Google Colab
#Creating a shortcut to the filepath for the source file for branches
#customer_filepath = "/content/cdw_sapp_customer.json"

In [6]:
#The raw data json file contains multiline records that output an error when the spark.read.json function is used to attempt to load the data thus use the option("multiline", True)

customer_df = spark.read.option("multiLine", True).json(customer_filepath)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# Data Exploration


In [7]:
#Showing the first 10 rows
customer_df.show(10) #A quick review of the initial 10 rows indicates that the CUST_PHONE column is missing digits for a US based system that uses 10 digits including area code for telephone numbers.

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [8]:
customer_df.printSchema()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



**As the columns will need to be reordered according to the mapping document, it makes sense to place the columns in the order presecribed in the mapping document. <span style="font-size: larger;">A special note here**
</span>. In this analysis, the assumption is that the APT_NO column actually represensents the number portion of a street address since it's unlikely that all customers would live in an apartment. Later in the transformation section of the customer dataframe, placing the APT_NO adjacent to the STREET_NAME column will be the basis of the analysis.


In [9]:
customer_df = customer_df.select('CREDIT_CARD_NO', 'SSN', 'CUST_EMAIL', 'CUST_PHONE','FIRST_NAME','MIDDLE_NAME', 'LAST_NAME', 'APT_NO', 'STREET_NAME','CUST_CITY', 'CUST_ZIP', 'CUST_STATE', 'CUST_COUNTRY', 'LAST_UPDATED')

customer_df.show(5)

+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|         CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|   CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100|AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|     Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023|EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive|Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|4210653310116272|123454487|WDunham@example.com|   1243018|    Wilber|

In [10]:
customer_df.describe().show()

+-------+--------------------+--------------------+--------------------+------------------+----------+-----------+---------+------------------+-----------+---------+------------------+----------+-------------+--------------------+
|summary|      CREDIT_CARD_NO|                 SSN|          CUST_EMAIL|        CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|            APT_NO|STREET_NAME|CUST_CITY|          CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+-------+--------------------+--------------------+--------------------+------------------+----------+-----------+---------+------------------+-----------+---------+------------------+----------+-------------+--------------------+
|  count|                 952|                 952|                 952|               952|       952|        952|      952|               952|        952|      952|               952|       952|          952|                 952|
|   mean|4.210653353718597...|1.2345552588130252E8|                NULL|1239

### Again, as indicated above, the CUST_PHONE column shows the value to be only 7 digits rather than 10 digits thus requiring transformation. Similarly, credit card numbers are 16 digits, social security numbers are 9 digits, US states use the 2 letter postal abbreviation, and zip codes are 5 digits.

In [11]:
customer_df.explain()

== Physical Plan ==
*(1) Project [CREDIT_CARD_NO#1, SSN#12L, CUST_EMAIL#4, CUST_PHONE#5L, FIRST_NAME#8, MIDDLE_NAME#11, LAST_NAME#9, APT_NO#0, STREET_NAME#13, CUST_CITY#2, CUST_ZIP#7, CUST_STATE#6, CUST_COUNTRY#3, LAST_UPDATED#10]
+- FileScan json [APT_NO#0,CREDIT_CARD_NO#1,CUST_CITY#2,CUST_COUNTRY#3,CUST_EMAIL#4,CUST_PHONE#5L,CUST_STATE#6,CUST_ZIP#7,FIRST_NAME#8,LAST_NAME#9,LAST_UPDATED#10,MIDDLE_NAME#11,SSN#12L,STREET_NAME#13] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/chito/Developer/Capstone_350/Raw_Data/cdw_sapp_customer..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<APT_NO:string,CREDIT_CARD_NO:string,CUST_CITY:string,CUST_COUNTRY:string,CUST_EMAIL:string...




In [12]:
#Count of customer records
customer_df.count()

952

In [13]:
print(customer_df.distinct().count()) # Should be 952 unique customers
customer_df.select(funct.countDistinct("CREDIT_CARD_NO")).show() #952 unique Credit Card #s for each customer
customer_df.select(funct.countDistinct("CUST_EMAIL")).show() #928 unique customer emails so the data should be examined for null or missing values
customer_df.select(funct.countDistinct("CUST_PHONE")).show() #901 unique customer phone numbers. This could be because of area code issues which will be engaged in the data transformation section
customer_df.select(funct.countDistinct("SSN")).show()

952
+------------------------------+
|count(DISTINCT CREDIT_CARD_NO)|
+------------------------------+
|                           952|
+------------------------------+

+--------------------------+
|count(DISTINCT CUST_EMAIL)|
+--------------------------+
|                       928|
+--------------------------+

+--------------------------+
|count(DISTINCT CUST_PHONE)|
+--------------------------+
|                       901|
+--------------------------+

+-------------------+
|count(DISTINCT SSN)|
+-------------------+
|                952|
+-------------------+



In [14]:
# Find count of NA or missing values for each column
from pyspark.sql.functions import col
na_counts = {col_name: customer_df.filter(col(col_name).isNull() | (col(col_name) == "")).count() for col_name in customer_df.columns}

# Print the counts of missing values per column
for column, count in na_counts.items():
    print(f"Column {column} has {count} missing values")

Column CREDIT_CARD_NO has 0 missing values
Column SSN has 0 missing values
Column CUST_EMAIL has 0 missing values
Column CUST_PHONE has 0 missing values
Column FIRST_NAME has 0 missing values
Column MIDDLE_NAME has 0 missing values
Column LAST_NAME has 0 missing values
Column APT_NO has 0 missing values
Column STREET_NAME has 0 missing values
Column CUST_CITY has 0 missing values
Column CUST_ZIP has 0 missing values
Column CUST_STATE has 0 missing values
Column CUST_COUNTRY has 0 missing values
Column LAST_UPDATED has 0 missing values


In [15]:
#Since there are no null or missing values, the CUST_EMAIL column needs to be inspected for duplicate values
# Display rows with duplicate CUST_EMAIL values
customer_df.groupBy("CUST_EMAIL").count().where("count > 1").show(24) # of duplicate emails (952-928 = 24). These will need transformation.
#The duplicate emails are exhibited below:

+--------------------+-----+
|          CUST_EMAIL|count|
+--------------------+-----+
|    TLiu@example.com|    2|
|  MSwain@example.com|    2|
| CGamble@example.com|    2|
|   RHood@example.com|    2|
| ETruong@example.com|    2|
|JGodfrey@example.com|    2|
|   BYork@example.com|    2|
| EArthur@example.com|    2|
|CSpencer@example.com|    2|
|   LHaas@example.com|    2|
| KBonner@example.com|    2|
|  LCombs@example.com|    2|
| DDorsey@example.com|    2|
|  PMoore@example.com|    2|
|  SBraun@example.com|    2|
| BGuerra@example.com|    2|
| SCrouch@example.com|    2|
| RDeleon@example.com|    2|
| SHoover@example.com|    2|
|DCrowley@example.com|    2|
|  JEmery@example.com|    2|
|  MHatch@example.com|    2|
|SPadgett@example.com|    2|
|   EOdom@example.com|    2|
+--------------------+-----+



In [16]:
# check that all emails in the CUST_EMAIL column have the correct format with alphanumeric "characters@.example.com" using Regex for pyspark



# Define the regular expression pattern for valid emails
email_pattern = "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.example\.com$"

# Filter the DataFrame to select rows where the CUST_EMAIL column does not match the pattern
invalid_emails_df = customer_df.filter(~funct.col("CUST_EMAIL").rlike(email_pattern))

# Count the number of rows with invalid emails
invalid_email_count = invalid_emails_df.count()

# Print the count of invalid emails
print(f"Number of rows with invalid emails: {invalid_email_count}")

# Show the rows with invalid emails
invalid_emails_df.show(5)


Number of rows with invalid emails: 952
+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|         CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|   CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100|AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|     Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023|EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive|Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|4210653310116272|123454487|WD

#### **All customer emails have the correct format. The issue is with the duplicated emails where individuals share the same first name first inititial and the same last name. There are 24 duplicate emails in the dataframe. 952-24 = 928 as the count of count(DISTINCT CUST_EMAIL) showed 928. However, we will need to account for this when transforming the data. The methodology will use the customer middle name, MIDDLE_NAME, as a way to distinguish between duplicate emails since it appears that the email format reflects: first_initial_last_name@example.com. Transformation will only occur for those duplicates and not each emai in the data frame**

____________________________________________________________________________________________________________________________________________
### **Data Transformation**

In [17]:
#Adding space between customer city names that have multiple words therby creating space to appropriate decipher the customer city name, e.g., "El Paso", "King of Prussia"
from pyspark.sql.functions import regexp_replace

#Using regexp_replace, the text is searched through regex functionality to search for a lowercase letter followed by an uppercase letter. The two characters are then separated by a white space.
customer_df = customer_df.withColumn("CUST_CITY", regexp_replace(customer_df["CUST_CITY"], r"([a-z])([A-Z])", r"$1 $2"))
customer_df.show(10)


+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|      Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive| Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|4210653310116272|123454487| WDunham@example.com|   1243018|

### Transforming the duplicate emails with the use of the customer MIDDLE_NAME field to differentiate duplicates. This seems the best method to avoid duplicate emails from what are almost certainly unique individuals not sharing a household with the other person.

In [18]:

from pyspark.sql.window import Window
from pyspark.sql import functions as funct  # Using your alias

w = Window.partitionBy('CUST_EMAIL').orderBy('MIDDLE_NAME')
ranked_df = customer_df.withColumn('rank', funct.rank().over(w))

# Update emails for duplicates
ranked_df = (
    ranked_df.withColumn(
        'CUST_EMAIL',  # Correct the column name to 'CUST_EMAIL'
        funct.when(
            funct.col('rank') > 1,
            funct.concat(
                funct.substring('FIRST_NAME', 1, 1),
                funct.substring('MIDDLE_NAME', 1, 1),
                funct.col('LAST_NAME'),                   # Full last name (no slicing)
                funct.lit('@example.com'),
            ),
        ).otherwise(funct.col('CUST_EMAIL')),  # Reference correct original email column
    )
    .drop('rank')
)

ranked_df.show(10)


+----------------+---------+--------------------+----------+----------+-----------+---------+------+---------------+----------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|    STREET_NAME|       CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+---------+------+---------------+----------------+--------+----------+-------------+--------------------+
|4210653388841064|123458478|AAguilar@example.com|   1238016|     April|     Damien|  Aguilar|   163|    York Street|            Stow|   44224|        OH|United States|2018-04-21T12:49:...|
|4210653370850340|123456849|  AArias@example.com|   1237253|     Adele|  Constance|    Arias|    13|  Edgewood Road|          Oxnard|   93035|        CA|United States|2018-04-21T12:49:...|
|4210653379318909|123454439|  AAvila@example.com|   124

In [19]:
from pyspark.sql.functions import col

# Perform a join on a unique identifier (e.g., CUST_ID)
joined_df = customer_df.alias("original").join(ranked_df.alias("updated"), on="SSN")

# Filter to show only rows where the email was changed
corrected_duplicates_df = joined_df.filter(
    col("original.CUST_EMAIL") != col("updated.CUST_EMAIL")
).select(
    col("original.SSN"),
    col("original.FIRST_NAME"),
    col("original.MIDDLE_NAME"),
    col("original.LAST_NAME"),
    col("original.CUST_EMAIL").alias("original_email"),
    col("updated.CUST_EMAIL").alias("updated_email"),
)

corrected_duplicates_df.show(24)


+---------+----------+-----------+---------+--------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|      original_email|       updated_email|
+---------+----------+-----------+---------+--------------------+--------------------+
|123452454|    Bertie|   Mercedes|     York|   BYork@example.com|  BMYork@example.com|
|123458724|     Tonya|   Porfirio|      Liu|    TLiu@example.com|   TPLiu@example.com|
|123459742|        Jo|     Sylvia|  Godfrey|JGodfrey@example.com|JSGodfrey@example...|
|123454597|   Jillian|    Normand|    Emery|  JEmery@example.com| JNEmery@example.com|
|123451762|   Enrique|      Tasha|   Arthur| EArthur@example.com|ETArthur@example.com|
|123451070|   Rosanne|      Tania|   Deleon| RDeleon@example.com|RTDeleon@example.com|
|123458390| Sylvester|      Wyatt|  Padgett|SPadgett@example.com|SWPadgett@example...|
|123458353|    Susana|   Kendrick|   Crouch| SCrouch@example.com|SKCrouch@example.com|
|123457826|     Eliza|     Krista|   Truong

In [20]:
from pyspark.sql.functions import col, coalesce

# Join on SSN, prioritize updated emails
updated_customer_df = customer_df.alias("original").join(
    ranked_df.alias("updated"), on="SSN", how="left"
).select(
    # Include all original columns except the original CUST_EMAIL
    *[col("original." + c) for c in customer_df.columns if c != "CUST_EMAIL"],
    # Select the updated or original CUST_EMAIL column
    coalesce(col("updated.CUST_EMAIL"), col("original.CUST_EMAIL")).alias("CUST_EMAIL")
)

# Display the updated dataframe
updated_customer_df.show()


+----------------+---------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+--------------------+
|  CREDIT_CARD_NO|      SSN|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|          CUST_EMAIL|
+----------------+---------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+--------------------+
|4210653310061055|123456100|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|      Natchez|   39120|        MS|United States|2018-04-21T12:49:...| AHooper@example.com|
|4210653310102868|123453023|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive| Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...| EHolman@example.com|
|4210653310116272|123454487|   1243018|    Wilber|   Ezequie

In [21]:
#Reordering columns again as per the mapping document
updated_customer_df = updated_customer_df.select('CREDIT_CARD_NO', 'SSN', 'CUST_EMAIL', 'CUST_PHONE','FIRST_NAME','MIDDLE_NAME', 'LAST_NAME', 'APT_NO', 'STREET_NAME','CUST_CITY', 'CUST_ZIP', 'CUST_STATE', 'CUST_COUNTRY', 'LAST_UPDATED')

updated_customer_df.show(10)

+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|      Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive| Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|4210653310116272|123454487| WDunham@example.com|   1243018|

In [22]:
updated_customer_df.select(funct.countDistinct("CUST_EMAIL")).show()

+--------------------------+
|count(DISTINCT CUST_EMAIL)|
+--------------------------+
|                       952|
+--------------------------+



### After transformation, the updated data frame shows 952 distinct customer emails. The dataframe has been updated accordingly. It is now updated_customer_df.

##### **The mapping document requires that the First and Last Names be entered in "Title" case. I will begin this transformation with the LAST_NAME column. However, I did notice that some of the last names with an "Mc" prefix do not have the first letter after the prefix beginning with a capital letter, as in "McKinney". First, I will transform these records to this format: "Mc[A-Z]"**

In [23]:
Last_Name_McC = updated_customer_df.filter(funct.col("LAST_NAME").startswith("Mc"))
total_count = Last_Name_McC.count()
print(total_count)
Last_Name_McC.show()


26
+----------------+---------+--------------------+----------+----------+-----------+----------+------+-----------------+----------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME| LAST_NAME|APT_NO|      STREET_NAME|       CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+----------+------+-----------------+----------------+--------+----------+-------------+--------------------+
|4210653311015303|123454339|HMckinney@example...|   1238165|     Homer|      Henry|  Mckinney|   611|      East Avenue|         El Paso|   79930|        TX|United States|2018-04-21T12:49:...|
|4210653312838058|123452805| RMcneil@example.com|   1237290|     Reyes|    Chasity|    Mcneil|   850|    School Street|        Paterson|   07501|        NJ|United States|2018-04-21T12:49:...|
|4210653313088465|123455975|CMcdermot

In [24]:


# Apply a user defined function to capitalize the second and third letters
def fix_mc_name(name):
    if name and len(name) >= 3 and name.startswith("Mc"):
        return f"Mc{name[2].upper()}{name[3:].lower()}"
    return name

fix_mc_name_udf = funct.udf(fix_mc_name)

# Update the original DataFrame
updated_customer_df = updated_customer_df.withColumn(
    "LAST_NAME", fix_mc_name_udf(funct.col("LAST_NAME"))
)

# Display the updated DataFrame to verify changes
updated_customer_df.show()


+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|      Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive| Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|4210653310116272|123454487| WDunham@example.com|   1243018|

In [25]:
Last_Name_McC = updated_customer_df.filter(funct.col("LAST_NAME").startswith("Mc"))
total_count = Last_Name_McC.count()
print(total_count)
Last_Name_McC.show()
#The last names have been adjusted to reflect the correct format to Mc[A-Z]

26
+----------------+---------+--------------------+----------+----------+-----------+----------+------+-----------------+----------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME| LAST_NAME|APT_NO|      STREET_NAME|       CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+----------+------+-----------------+----------------+--------+----------+-------------+--------------------+
|4210653311015303|123454339|HMckinney@example...|   1238165|     Homer|      Henry|  McKinney|   611|      East Avenue|         El Paso|   79930|        TX|United States|2018-04-21T12:49:...|
|4210653312838058|123452805| RMcneil@example.com|   1237290|     Reyes|    Chasity|    McNeil|   850|    School Street|        Paterson|   07501|        NJ|United States|2018-04-21T12:49:...|
|4210653313088465|123455975|CMcdermot

#### Now, let us inititiate the transformation to title case for both the first and last names as per the mapping document.

In [26]:
first_name_filter = updated_customer_df.filter(~funct.col('FIRST_NAME').rlike('^[A-Z][a-z]+$'))
last_name_filter = updated_customer_df.filter(~funct.col('LAST_NAME').rlike('^[A-Z][a-z]+$'))

# ^[A-Z] checks to see if there is a single capital letter at the start of the string.
# [a-z]+$ checks to see if there are only lower case letters after the first letter
# and those letters also go on till the end of the string. Since this is using a ~ operation, the Mc[A-Z] format names I transformed will be identified in the filter. However, none of these have to be transformed. Likewise, no first names need to be transformed to reflect title case.
first_name_filter.select('FIRST_NAME').show()
last_name_filter.select('LAST_NAME').show(26)

+----------+
|FIRST_NAME|
+----------+
+----------+

+----------+
| LAST_NAME|
+----------+
|  McKinney|
|    McNeil|
| McDermott|
|   McCarty|
|  McKinley|
|     McKay|
|  McMullen|
|  McIntosh|
|McLaughlin|
|  McIntosh|
| McDermott|
|     McKee|
|  McKnight|
| McFarland|
|    McCann|
|McCullough|
|   McElroy|
|     McGee|
|McWilliams|
| McConnell|
|   McGuire|
| McConnell|
|  McCauley|
|   McClain|
|     McCoy|
|   McClain|
+----------+



##### No additional transformation of first and last names is required, all First and Last Names are in title case. The middle name must now be converted to lowercase as required by the mapping document.


In [27]:
from pyspark.sql.functions import lower

# Convert MIDDLE_NAME to lowercase
updated_customer_df = updated_customer_df.withColumn("MIDDLE_NAME", lower(funct.col("MIDDLE_NAME")))

# Display updated DataFrame
updated_customer_df.show()


+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+-------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         wm|   Hooper|   656|Main Street North|      Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    brendan|   Holman|   829|    Redwood Drive| Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|4210653310116272|123454487| WDunham@example.com|   1243018|

#### Previously discussed, I will merge the APT_NO and STREET_NAME columns into a new column called FULL_STREET_ADDRESS.

In [28]:
from pyspark.sql.functions import concat, lit

# Concatenate APT_NO and STREET_NAME, adding a space in between
updated_customer_df = updated_customer_df.withColumn(
    "FULL_STREET_ADDRESS",
    concat(funct.col("APT_NO"), lit(" "), funct.col("STREET_NAME"))
)

updated_customer_df = updated_customer_df.drop("APT_NO", "STREET_NAME")

# Display updated DataFrame
updated_customer_df.show()


+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+--------------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED| FULL_STREET_ADDRESS|
+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+--------------------+--------------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21T12:49:...|656 Main Street N...|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    brendan|   Holman| Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|   829 Redwood Drive|
|4210653310116272|123454487| WDunham@example.com|   1243018|    Wilber|   ezequi

**This transformation requires that the LAST_UPDATED column be in a timestamp format as per the mapping document**

In [29]:
from pyspark.sql import functions as funct


updated_customer_df = updated_customer_df.withColumn('LAST_UPDATED', funct.to_timestamp('LAST_UPDATED', 'yyyy-MM-dd\'T\'HH:mm:ss.SSSXXX'))
updated_customer_df.show()

+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|
+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    brendan|   Holman| Wethersfield|   06109|        CT|United States|2018-04-21 11:49:02|   829 Redwood Drive|
|4210653310116272|123454487| WDunham@example.com|   1243018|    Wilber|   ezequiel|  

### The area code for each customer is missing. The mapping document requires that the customer phone number be entered as a 10 digiit format of (xxx)xxx-xxxx. However, all the customer phone numbers are missing an area code. The challenge then becomes finding a way to map the location of each customer's home address to a viable area code. Very few resources exist for matching area codes to zip codes or city/town jurisdictions. I found a resource; however, that had an easy crosswalk from zip code and city/town to area codes that didnn't require multiple transformation steps. This dataset is in a CSV format that I will match to the existing updated_customer df to add area code values to each customer phone number. 

##### **NOTE: There are some zip codes that have more than 1 area code attached. For simplicity and ease of processing, the first area code listed will be utilized. Finding exact area codes would require signficant time resources and geocoding for exactly precise matches well beyond the intent of this ETL Capstone**

In [30]:
#area_code_df = spark.read.csv(r"C:\Users\chito\Developer\Capstone_350\Raw_Data\ZipData.csv", header=False, inferSchema=True)
#area_code_df = spark.read.csv("/content/ZipData.csv", header=False, inferSchema=True)#Google Colab code
area_code_df = spark.read.csv("ZipData.csv", header=False, inferSchema=True)
area_code_df = area_code_df.toDF('CUST_ZIP', 'CUST_STATE', 'CUST_CITY', 'CUST_COUNTY', 'AREA_CODE', 'TIME_ZONE', 'DST')
area_code_df.show()
area_code_df.count() # There are 42407 records in the area code dataframe.

+--------+----------+-----------+-----------+---------+----------+---+
|CUST_ZIP|CUST_STATE|  CUST_CITY|CUST_COUNTY|AREA_CODE| TIME_ZONE|DST|
+--------+----------+-----------+-----------+---------+----------+---+
|   00501|        NY| Holtsville|    Suffolk|  631/934| UTC -5:00|  Y|
|   00544|        NY| Holtsville|    Suffolk|  631/934| UTC -5:00|  Y|
|   00601|        PR|   Adjuntas|   Adjuntas|  787/939|UTC -04:00|  N|
|   00602|        PR|     Aguada|     Aguada|  787/939|UTC -04:00|  N|
|   00603|        PR|  Aguadilla|  Aguadilla|  787/939|UTC -04:00|  N|
|   00604|        PR|  Aguadilla|  Aguadilla|  787/939|UTC -04:00|  N|
|   00605|        PR|  Aguadilla|  Aguadilla|  787/939|UTC -04:00|  N|
|   00606|        PR|    Maricao|    Maricao|  787/939|UTC -04:00|  N|
|   00610|        PR|     Anasco|     Anasco|  787/939|UTC -04:00|  N|
|   00611|        PR|    Angeles|     Utuado|  787/939|UTC -04:00|  N|
|   00612|        PR|    Arecibo|    Arecibo|  787/939|UTC -04:00|  N|
|   00

42407

In [31]:
# Assuming 'updated_customer_df' is the left DataFrame and 'area_code_df' is the right DataFrame
joined_df = updated_customer_df.join(area_code_df,
                                  on=updated_customer_df['CUST_ZIP'] == area_code_df['CUST_ZIP'],
                                  how='left') \
                             .select(updated_customer_df["*"],
                                     area_code_df["CUST_CITY"].alias("right_CUST_CITY"),
                                     area_code_df["CUST_STATE"].alias("right_CUST_STATE"),
                                     area_code_df["CUST_COUNTY"].alias("right_CUST_COUNTY"),
                                     area_code_df["AREA_CODE"].alias("right_AREA_CODE"))

joined_df.show()
joined_df.count()
joined_df.printSchema()

+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|right_CUST_CITY|right_CUST_STATE|right_CUST_COUNTY|right_AREA_CODE|
+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|        Natchez|              MS|            Adams|        601/769|


### Unfortunately, after the join the number of records increased from 952 to 979. I'm assuming there were duplicate values on the zip code since that is what was used to join the dataframes. I need to correct for this now.

In [32]:
duplicate_zip_counts = area_code_df.groupBy('CUST_ZIP').count().filter("count > 1")
duplicate_zip_counts.show()

+--------+-----+
|CUST_ZIP|count|
+--------+-----+
|   08648|    4|
|   77339|    2|
|   06063|    3|
|   60491|    2|
|   84118|    4|
|   90094|    2|
|   39218|    2|
|   84660|    2|
|   14227|    3|
|   74549|    2|
|   31027|    2|
|   56678|    3|
|   78641|    2|
|   02494|    2|
|   84511|    2|
|   00803|    2|
|   23847|    2|
|   38002|    2|
|   41135|    4|
|   02125|    2|
+--------+-----+
only showing top 20 rows



In [33]:
from pyspark.sql.functions import split, col

# Split the 'AREA_CODE' column by '/' and extract the first element

joined_df = joined_df.withColumn('right_AREA_CODE', split(col('right_AREA_CODE'), '/')[0])
joined_df.show()

+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|right_CUST_CITY|right_CUST_STATE|right_CUST_COUNTY|right_AREA_CODE|
+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|        Natchez|              MS|            Adams|            601|


In [34]:
from pyspark.sql.functions import concat, lit

# Concatenate AREA_CODE and CUST_PHONE in the desired format 
joined_df = joined_df.withColumn('FORMATTED_PHONE',
                                 concat(lit('('),
                                        col('right_AREA_CODE'), lit(')'),
                                        col('CUST_PHONE').substr(1, 3), lit('-'),
                                        col('CUST_PHONE').substr(4, 4)))

joined_df.show()

+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+---------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|right_CUST_CITY|right_CUST_STATE|right_CUST_COUNTY|right_AREA_CODE|FORMATTED_PHONE|
+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+---------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|        Natchez|    

In [35]:
# Replace CUST_PHONE with FORMATTED_PHONE
joined_df = joined_df.drop('CUST_PHONE').withColumnRenamed('FORMATTED_PHONE', 'CUST_PHONE')

joined_df.show()

+----------------+---------+--------------------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+-------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|right_CUST_CITY|right_CUST_STATE|right_CUST_COUNTY|right_AREA_CODE|   CUST_PHONE|
+----------------+---------+--------------------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+---------------+----------------+-----------------+---------------+-------------+
|4210653310061055|123456100| AHooper@example.com|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|        Natchez|              MS|            Adams|            601|(60

In [36]:
# List of columns to drop
columns_to_drop = ["right_CUST_CITY", "right_CUST_STATE", "right_CUST_COUNTY", "right_AREA_CODE"]
# Drop the columns
joined_df = joined_df.drop(*columns_to_drop)

joined_df.show()
joined_df.describe().show()

+----------------+---------+--------------------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+-------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|   CUST_PHONE|
+----------------+---------+--------------------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+-------------+
|4210653310061055|123456100| AHooper@example.com|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|(601)123-7818|
|4210653310102868|123453023| EHolman@example.com|      Etta|    brendan|   Holman| Wethersfield|   06109|        CT|United States|2018-04-21 11:49:02|   829 Redwood Drive|(860)123-8933|
|4210653310116272|123454487| WDunham@example.com|    Wilber|   ezequie

In [37]:
from pyspark.sql.functions import col

updated_customer_df = updated_customer_df.select(col("*"), "CUST_PHONE")
updated_customer_df.show()

+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+----------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|CUST_PHONE|
+----------------+---------+--------------------+----------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+----------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|   1237818|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    brendan|   Holman| Wethersfield|   06109|        CT|United States|2018-04-21 11:49:02|   829 Redwood Drive|   1238933|
|4210653310116272|123454487| W

In [38]:

# Select all columns from joined_df, including the renamed 'CUST_PHONE'
final_df = joined_df.select(col("*"))
final_df.show()

+----------------+---------+--------------------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+-------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|FIRST_NAME|MIDDLE_NAME|LAST_NAME|    CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|       LAST_UPDATED| FULL_STREET_ADDRESS|   CUST_PHONE|
+----------------+---------+--------------------+----------+-----------+---------+-------------+--------+----------+-------------+-------------------+--------------------+-------------+
|4210653310061055|123456100| AHooper@example.com|      Alec|         wm|   Hooper|      Natchez|   39120|        MS|United States|2018-04-21 11:49:02|656 Main Street N...|(601)123-7818|
|4210653310102868|123453023| EHolman@example.com|      Etta|    brendan|   Holman| Wethersfield|   06109|        CT|United States|2018-04-21 11:49:02|   829 Redwood Drive|(860)123-8933|
|4210653310116272|123454487| WDunham@example.com|    Wilber|   ezequie

In [39]:
final_df.printSchema()

root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)



In [40]:
final_df = final_df.select('SSN','FIRST_NAME','MIDDLE_NAME','LAST_NAME','CREDIT_CARD_NO', 'FULL_STREET_ADDRESS', 'CUST_CITY', 'CUST_STATE', 'CUST_COUNTRY', 'CUST_ZIP', 'CUST_PHONE', 'CUST_EMAIL', 'LAST_UPDATED')

In [41]:
final_df.show()

+---------+----------+-----------+---------+----------------+--------------------+-------------+----------+-------------+--------+-------------+--------------------+-------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  CREDIT_CARD_NO| FULL_STREET_ADDRESS|    CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+-------------+----------+-------------+--------+-------------+--------------------+-------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|656 Main Street N...|      Natchez|        MS|United States|   39120|(601)123-7818| AHooper@example.com|2018-04-21 11:49:02|
|123453023|      Etta|    brendan|   Holman|4210653310102868|   829 Redwood Drive| Wethersfield|        CT|United States|   06109|(860)123-8933| EHolman@example.com|2018-04-21 11:49:02|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|683 12th 

In [42]:
final_df.count()
final_df.show(979)

+---------+----------+-----------+-------------+----------------+--------------------+-------------------+----------+-------------+--------+-------------+--------------------+-------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|    LAST_NAME|  CREDIT_CARD_NO| FULL_STREET_ADDRESS|          CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+---------+----------+-----------+-------------+----------------+--------------------+-------------------+----------+-------------+--------+-------------+--------------------+-------------------+
|123456100|      Alec|         wm|       Hooper|4210653310061055|656 Main Street N...|            Natchez|        MS|United States|   39120|(601)123-7818| AHooper@example.com|2018-04-21 11:49:02|
|123453023|      Etta|    brendan|       Holman|4210653310102868|   829 Redwood Drive|       Wethersfield|        CT|United States|   06109|(860)123-8933| EHolman@example.com|2018-04-21 11:49:02|
|123454487|    Wilbe

In [43]:
from pyspark.sql.functions import count

grouped_df = final_df.groupBy('CREDIT_CARD_NO').agg(count('*').alias('num_records'))
grouped_df.show(979)

+----------------+-----------+
|  CREDIT_CARD_NO|num_records|
+----------------+-----------+
|4210653373626291|          1|
|4210653383230786|          1|
|4210653328645639|          1|
|4210653374779886|          1|
|4210653385559404|          1|
|4210653319826232|          1|
|4210653330396787|          1|
|4210653349028689|          1|
|4210653354436037|          1|
|4210653313922436|          1|
|4210653317141342|          1|
|4210653380077193|          1|
|4210653382394208|          1|
|4210653385932779|          1|
|4210653316211933|          1|
|4210653327599222|          1|
|4210653342532702|          1|
|4210653378239100|          1|
|4210653393389344|          1|
|4210653397435570|          1|
|4210653316625296|          1|
|4210653343509148|          1|
|4210653315669217|          1|
|4210653321728527|          4|
|4210653347187692|          1|
|4210653379290313|          1|
|4210653396887042|          1|
|4210653325318749|          1|
|4210653333257054|          1|
|4210653

In [44]:
duplicate_credit_card_df = grouped_df.filter(col('num_records') > 1)
duplicate_credit_card_df.show(30)

+----------------+-----------+
|  CREDIT_CARD_NO|num_records|
+----------------+-----------+
|4210653321728527|          4|
|4210653389719729|          2|
|4210653352282525|          2|
|4210653389005263|          2|
|4210653342740065|          2|
|4210653313338885|          2|
|4210653344121779|          2|
|4210653369088511|          2|
|4210653315377872|          2|
|4210653347095962|          2|
|4210653388606505|          2|
|4210653361724866|          2|
|4210653384046762|          2|
|4210653355020087|          2|
|4210653334888971|          4|
|4210653379430189|          2|
|4210653327270061|          2|
|4210653371353793|          2|
|4210653366885341|          2|
|4210653314812374|          2|
|4210653331576279|          2|
|4210653344416975|          2|
|4210653328441999|          2|
+----------------+-----------+



In [45]:
duplicate_records_df = duplicate_credit_card_df.join(final_df, on='CREDIT_CARD_NO', how='inner')
duplicate_records_df.show(30)
#verifying the duplicates by Credit Card no and dropping the duplicates

+----------------+-----------+---------+----------+-----------+-------------+--------------------+----------+----------+-------------+--------+-------------+--------------------+-------------------+
|  CREDIT_CARD_NO|num_records|      SSN|FIRST_NAME|MIDDLE_NAME|    LAST_NAME| FULL_STREET_ADDRESS| CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+----------------+-----------+---------+----------+-----------+-------------+--------------------+----------+----------+-------------+--------+-------------+--------------------+-------------------+
|4210653321728527|          4|123452980|      Cora|      myrna|      Bullock|    41 School Street|Wellington|        FL|United States|   33414|(561)124-1537|CBullock@example.com|2018-04-21 11:49:02|
|4210653321728527|          4|123452980|      Cora|      myrna|      Bullock|    41 School Street|Wellington|        FL|United States|   33414|(561)124-1537|CBullock@example.com|2018-04-21 11:49:02|
|4210

In [46]:
final_customer_dedup_df = final_df.dropDuplicates(['CREDIT_CARD_NO'])
final_customer_dedup_df.count()
final_customer_dedup_df.describe().show()
final_customer_dedup_df.show()
#Once again, we the final deduplicated data frame shows 952 records

+-------+--------------------+----------+-----------+---------+--------------------+-------------------+---------+----------+-------------+------------------+-------------+--------------------+
|summary|                 SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|      CREDIT_CARD_NO|FULL_STREET_ADDRESS|CUST_CITY|CUST_STATE| CUST_COUNTRY|          CUST_ZIP|   CUST_PHONE|          CUST_EMAIL|
+-------+--------------------+----------+-----------+---------+--------------------+-------------------+---------+----------+-------------+------------------+-------------+--------------------+
|  count|                 952|       952|        952|      952|                 952|                952|      952|       952|          952|               952|          952|                 952|
|   mean|1.2345552588130252E8|      NULL|       NULL|     NULL|4.210653353718597...|               NULL|     NULL|      NULL|         NULL|36312.616596638654|         NULL|                NULL|
| stddev|  2561.1858044909427|

### **Writing the final_customer_dedup_df directly to the creditcard_capstone DB is key. Below, you will notice several code blocks in which I tried roundabout methods of loading data. TRANSFORMING INTO A PANDAS DATAFRAME AND THEN READING TO JSON FILES CREATES AN INFINITE NUMBER OF ISSUES. DO NOT ATTEMPT THIS APPROACH!! 

In [47]:
final_customer_dedup_df.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER ") \
  .option("user", cred.user) \
  .option("password", cred.password) \
  .save()


#### For the sake of verification, I copied the final_customer_dedup_df to a pandas dataframe to review the set up of the columns and format of the table to ensure the mysql table matches.

In [48]:
pandas_df = final_customer_dedup_df.toPandas()
pandas_df.head()

Unnamed: 0,SSN,FIRST_NAME,MIDDLE_NAME,LAST_NAME,CREDIT_CARD_NO,FULL_STREET_ADDRESS,CUST_CITY,CUST_STATE,CUST_COUNTRY,CUST_ZIP,CUST_PHONE,CUST_EMAIL,LAST_UPDATED
0,123456100,Alec,wm,Hooper,4210653310061055,656 Main Street North,Natchez,MS,United States,39120,(601)123-7818,AHooper@example.com,2018-04-21 11:49:02
1,123453023,Etta,brendan,Holman,4210653310102868,829 Redwood Drive,Wethersfield,CT,United States,6109,(860)123-8933,EHolman@example.com,2018-04-21 11:49:02
2,123454487,Wilber,ezequiel,Dunham,4210653310116272,683 12th Street East,Huntley,IL,United States,60142,(847)124-3018,WDunham@example.com,2018-04-21 11:49:02
3,123459758,Eugenio,trina,Hardy,4210653310195948,253 Country Club Road,New Berlin,WI,United States,53151,(262)124-3215,EHardy@example.com,2018-04-21 11:49:02
4,123454431,Wilfred,may,Ayers,4210653310356919,301 Madison Street,El Paso,TX,United States,79930,(915)124-2074,WAyers@example.com,2018-04-21 11:49:02


In [49]:
!pip list

Package                   Version
------------------------- -----------
asttokens                 2.4.1
attrs                     23.2.0
blinker                   1.8.2
branca                    0.7.2
certifi                   2024.2.2
cffi                      1.16.0
charset-normalizer        3.3.2
click                     8.1.7
colorama                  0.4.6
comm                      0.2.2
contourpy                 1.2.1
cryptography              42.0.8
cycler                    0.12.1
dash                      2.17.0
dash-core-components      2.0.0
dash-html-components      2.0.0
dash-table                5.0.0
debugpy                   1.8.1
decorator                 5.1.1
executing                 2.0.1
fastjsonschema            2.19.1
findspark                 2.0.1
Flask                     3.0.3
folium                    0.17.0
fonttools                 4.51.0
greenlet                  3.0.3
grpcio                    1.64.1
grpcio-tools              1.64.1
idna               