In [15]:
import pyspark
import mysql.connector as mydbconnection
from mysql.connector import Error
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from credentials import mysql_username, mysql_password

# create the SparkSession
spark = SparkSession.builder.appName('branch-pyspark').getOrCreate()

In [16]:
# Load / Read data from json file:
df_branch = spark.read.json("json_source_data/cdw_sapp_branch.json")
df_branch.show()

+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|        14th Street|     19406|2018-04-18T16:51:...|
|         Paters

In [17]:
df_branch.columns
df_branch.printSchema()
df_branch.describe().show()
df_branch.show()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

+-------+-----------+-----------------+------------+--------------------+------------+-------------+------------------+--------------------+
|summary|BRANCH_CITY|      BRANCH_CODE| BRANCH_NAME|        BRANCH_PHONE|BRANCH_STATE|BRANCH_STREET|        BRANCH_ZIP|        LAST_UPDATED|
+-------+-----------+-----------------+------------+--------------------+------------+-------------+------------------+--------------------+
|  count|        115|              115|         115|                 115|         115|          115|               115|                 115|
|   mean|       null|76.67826086956522|        null|1.2345499259478261E9|        null|

In [18]:
df_branch.select(F.countDistinct("BRANCH_PHONE")).show()

+----------------------------+
|count(DISTINCT BRANCH_PHONE)|
+----------------------------+
|                         115|
+----------------------------+



In [19]:
# find zip codes with less than 5 values
df_branch.filter(F.length(df_branch["BRANCH_ZIP"]) < 5)\
         .select('BRANCH_CITY', 'BRANCH_STATE', 'BRANCH_ZIP')\
         .show()

+------------+------------+----------+
| BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP|
+------------+------------+----------+
|    Paterson|          NJ|      7501|
|Wethersfield|          CT|      6109|
|Hillsborough|          NJ|      8844|
|     Medford|          MA|      2155|
|    Rockaway|          NJ|      7866|
|  LongBranch|          NJ|      7740|
|   Irvington|          NJ|      7111|
|    NewHaven|          CT|      6511|
|      Quincy|          MA|      2169|
+------------+------------+----------+



In [20]:
# add a 0 infront of zip codes with 4 values
df_branch = df_branch.withColumn('BRANCH_ZIP',\
                    F.when((F.length(df_branch['BRANCH_ZIP']) == 4) &
                        df_branch['BRANCH_STATE'].isin(["NJ", "CT", "MA"]),
                    F.format_string("0%s",df_branch['BRANCH_ZIP']))\
                    .otherwise(df_branch["BRANCH_ZIP"]))

In [21]:
# verify there are no more zip codes with less than 5 values
df_branch.select('BRANCH_ZIP')\
    .where(F.length(df_branch["BRANCH_ZIP"]) < 4).show()

+----------+
|BRANCH_ZIP|
+----------+
+----------+



In [22]:
df_branch = df_branch.withColumn('BRANCH_PHONE',
            F.format_string("(%s)%s-%s", 
                            F.substring(df_branch['BRANCH_PHONE'], 1, 3), 
                            F.substring(df_branch['BRANCH_PHONE'], 4, 3), 
                            F.substring(df_branch['BRANCH_PHONE'], 7, 4)))


In [23]:
df_branch = df_branch.withColumn(
    'BRANCH_CITY',
    F.regexp_replace(df_branch['BRANCH_CITY'], "(?<=.)([A-Z])", ' $1')
)

In [24]:
df_branch.columns
df_branch = df_branch.select("BRANCH_CODE", "BRANCH_NAME", "BRANCH_STREET", "BRANCH_CITY", "BRANCH_STATE", "BRANCH_ZIP", "BRANCH_PHONE", "LAST_UPDATED")
df_branch.show()

+-----------+------------+-------------------+-------------------+------------+----------+-------------+--------------------+
|BRANCH_CODE| BRANCH_NAME|      BRANCH_STREET|        BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|        LAST_UPDATED|
+-----------+------------+-------------------+-------------------+------------+----------+-------------+--------------------+
|          1|Example Bank|       Bridle Court|          Lakeville|          MN|     55044|(123)456-5276|2018-04-18T16:51:...|
|          2|Example Bank|  Washington Street|            Huntley|          IL|     60142|(123)461-8993|2018-04-18T16:51:...|
|          3|Example Bank|      Warren Street|South Richmond Hill|          NY|     11419|(123)498-5926|2018-04-18T16:51:...|
|          4|Example Bank|   Cleveland Street|         Middleburg|          FL|     32068|(123)466-3064|2018-04-18T16:51:...|
|          5|Example Bank|        14th Street|    King Of Prussia|          PA|     19406|(123)484-9701|2018-04-18T16:

In [25]:
df_branch.printSchema()
df_branch.describe().show()


root
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = false)
 |-- LAST_UPDATED: string (nullable = true)

+-------+-----------------+------------+-------------+-----------+------------+------------------+-------------+--------------------+
|summary|      BRANCH_CODE| BRANCH_NAME|BRANCH_STREET|BRANCH_CITY|BRANCH_STATE|        BRANCH_ZIP| BRANCH_PHONE|        LAST_UPDATED|
+-------+-----------------+------------+-------------+-----------+------------+------------------+-------------+--------------------+
|  count|              115|         115|          115|        115|         115|               115|          115|                 115|
|   mean|76.67826086956522|        null|         null|       null|        null|  38975.2347826087|         null

In [26]:
# convert the PySpark DataFrame to a pandas DataFrame
df_branch = df_branch.toPandas()


In [27]:
# branch tabke DDL
branch_table = (
    "CREATE TABLE IF NOT EXISTS `cdw_sapp_branch` ("
    "BRANCH_CODE INT PRIMARY KEY," 
    "BRANCH_NAME VARCHAR(55)," 
    "BRANCH_STREET VARCHAR(55)," 
    "BRANCH_CITY VARCHAR(55)," 
    "BRANCH_STATE VARCHAR(55)," 
    "BRANCH_ZIP INT DEFAULT 999999," 
    "BRANCH_PHONE VARCHAR(15)," 
    "LAST_UPDATED TIMESTAMP"
    ")"
)

In [28]:
try:
    conn = mydbconnection.connect(database='creditcard_capstone', user=mysql_username, password=mysql_password)
    cursor = conn.cursor()

    # Create the table
    print('Creating cdw_sapp_branch table....')
    cursor.execute(branch_table)
    print("cdw_sapp_branch table is created....")

    # Prepare the data for batch insertion
    data_to_insert = [tuple(row) for _, row in df_branch.iterrows()]
    sql = "INSERT INTO creditcard_capstone.cdw_sapp_branch VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"

    # Batch insert the data
    cursor.executemany(sql, data_to_insert)
    print(f"All {len(data_to_insert)} Branch Records inserted")

    # Commit the transaction
    conn.commit()
    print("Branch data fully loaded")

except Error as e:
    print("Error while connecting to MySQL", e)

finally:
    # Ensure the connection is closed
    if conn and conn.is_connected():
        cursor.close()
        conn.close()
        print("MySQL connection is closed.")

Creating cdw_sapp_branch table....
cdw_sapp_branch table is created....
Error while connecting to MySQL 1062 (23000): Duplicate entry '1' for key 'cdw_sapp_branch.PRIMARY'
MySQL connection is closed.
