In [128]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext 

spark = SparkSession.builder \
    .appName("PySpark MySQL Connection") \
    .config("spark.jars", "/Users/iris/Downloads/mysql-connector-java-8.0.23.jar") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
    

In [129]:
spark.stop()

In [40]:
# spark.conf.set("spark.executor.heartbeatInterval", "200000") \
#     .set("spark.network.timeout", "300000")

In [41]:
# CDW_SAPP_CUSTOMER.JSON: This file has the existing customer details.
# CDW_SAPP_CREDITCARD.JSON: This file contains all credit card transaction information.
# CDW_SAPP_BRANCH.JSON: Each branch’s information and details are recorded in this file. 

In [125]:
path_customer = './data/cdw_sapp_custmer.json'
path_credit = './data/cdw_sapp_credit.json'
path_branch = './data/cdw_sapp_branch.json'
loan_endpoint = 'https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json'

In [43]:
branch = spark.read.json(path_branch)
credit = spark.read.json(path_credit)
customer = spark.read.json(path_customer)


In [44]:
branch.show(3)

+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|    Warren Street|     11419|2018-04-18T16:51:...|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
only showing top 3 rows



In [45]:
credit.show(3)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
only showing top 3 rows



In [46]:
customer.show(3)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States|WDunham@example.co

In [47]:
from pyspark.sql.functions import format_string, substring, lpad, concat, initcap, lower, concat_ws
from pyspark.sql.types import IntegerType, TimestampType, StringType

In [48]:
customer.printSchema()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



In [49]:
# CUST_PHONE	Change the format of phone number to (XXX)XXX-XXXX
# STREET_NAME,APT_NO	Concatenate Apartment no and Street name of customer's Residence with comma as a seperator (Street, Apartment)
# LAST_NAME	Convert the Last Name in Title Case
# MIDDLE_NAME	Convert the middle name in lower case
# FIRST_NAME	Convert the Name to Title Case

In [50]:
# CUST_PHONE	VARCHAR
# FULL_STREET_ADDRESS	VARCHAR
# LAST_NAME	VARCHAR
# MIDDLE_NAME	VARCHAR
# FIRST_NAME	VARCHAR
# SSN	INT
# CUST_CITY	VARCHAR
# CUST_STATE	VARCHAR
# CUST_COUNTRY	VARCHAR
# CUST_ZIP	INT
# CUST_EMAIL	VARCHAR
# LAST_UPDATED	TIMESTAMP
# Credit_card_no	VARCHAR

In [51]:
# initcap
# lower
# format_string
from pyspark.sql.functions import col, concat, lit
# df = df.withColumn("column_join", concat(col("column_1"), lit("-"), col("column_2"), lit("-"), col("column_3")))
customer_df = customer \
              .withColumn("FIRST_NAME",
                          initcap(customer["FIRST_NAME"])) \
              .withColumn("MIDDLE_NAME",
                          lower(customer["MIDDLE_NAME"])) \
              .withColumn("LAST_NAME",
                          initcap(customer["LAST_NAME"])) \
              .withColumn("CUST_PHONE",
                          format_string("(000)%s-%s", substring("CUST_PHONE", 1, 3), substring("CUST_PHONE", 4, 7))) \
              .withColumn('FULL_STREET_ADDRESS', 
                          concat(customer['APT_NO'], lit(','), customer['STREET_NAME'])) \
              .withColumn('SSN', 
                          customer['SSN'].cast(IntegerType())) \
              .withColumn('CUST_ZIP', 
                          customer['CUST_ZIP'].cast(IntegerType())) \
              .withColumn('LAST_UPDATED', 
                          customer['LAST_UPDATED'].cast(TimestampType()))


In [52]:
customer_df.show(3)

+------+----------------+------------+-------------+-------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+-----------------+--------------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|   CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME| FULL_STREET_ADDRESS|
+------+----------------+------------+-------------+-------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+-----------------+--------------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|(000)123-7818|        MS|   39120|      Alec|   Hooper|2018-04-21 09:49:02|         wm|123456100|Main Street North|656,Main Street N...|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|(000)123-8933|        CT|    6109|      Etta|   Holman|2018-04-21 09:49:02|    brendan|12345

In [53]:
columns_to_drop = ['STREET_NAME','APT_NO']
customer_df = customer_df.drop(*columns_to_drop)
customer_df.show()


+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|   CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States| AHooper@example.com|(000)123-7818|        MS|   39120|      Alec|   Hooper|2018-04-21 09:49:02|         wm|123456100|656,Main Street N...|
|4210653310102868|Wethersfield|United States| EHolman@example.com|(000)123-8933|        CT|    6109|      Etta|   Holman|2018-04-21 09:49:02|    brendan|123453023|   829,Redwood Drive|
|4210653310116272|     Huntley|United States| WDunham@example.com|(000)124-

In [54]:
customer_df.printSchema()

root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = false)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: integer (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



In [55]:
branch.show()

+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|        14th Street|     19406|2018-04-18T16:51:...|
|         Paters

In [56]:
# BRANCH_ZIP	If the source value is null load default (99999) value else Direct move
# BRANCH_PHONE	Change the format of phone number to (XXX)XXX-XXXX

# BRANCH_ZIP	INT
# BRANCH_NAME	VARCHAR
# BRANCH_STREET	VARCHAR
# BRANCH_CITY	VARCHAR
# BRANCH_STATE	VARCHAR
# BRANCH_CODE	INT
# LAST_UPDATED	TIMESTAMP
# BRANCH_PHONE	VARCHAR

branch_df = branch \
            .withColumn("BRANCH_CODE",
                        branch["BRANCH_CODE"].cast(IntegerType())) \
            .withColumn("BRANCH_ZIP",
                        branch["BRANCH_ZIP"].cast(IntegerType())) \
            .withColumn("LAST_UPDATED",
                        branch["LAST_UPDATED"].cast(TimestampType())) \
            .withColumn("BRANCH_PHONE",
                        format_string("(%s)%s-%s", substring("BRANCH_PHONE", 1, 3), substring("BRANCH_PHONE", 4, 3), substring("BRANCH_PHONE", 7, 4)))
branch_df.na.fill(value=99999, subset=["BRANCH_ZIP"])


DataFrame[BRANCH_CITY: string, BRANCH_CODE: int, BRANCH_NAME: string, BRANCH_PHONE: string, BRANCH_STATE: string, BRANCH_STREET: string, BRANCH_ZIP: int, LAST_UPDATED: timestamp]

In [57]:
branch_df.show(3)

+-----------------+-----------+------------+-------------+------------+-----------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-----------------+----------+-------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|     Bridle Court|     55044|2018-04-18 13:51:47|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|Washington Street|     60142|2018-04-18 13:51:47|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|    Warren Street|     11419|2018-04-18 13:51:47|
+-----------------+-----------+------------+-------------+------------+-----------------+----------+-------------------+
only showing top 3 rows



In [58]:
branch_df.printSchema()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: integer (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = false)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: integer (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [59]:
credit.show(3)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
only showing top 3 rows



In [60]:
credit.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)



In [61]:
# CUST_CC_NO	VARCHAR +
# CUST_SSN	INT +
# BRANCH_CODE	INT +
# TRANSACTION_TYPE	VARCHAR
# TRANSACTION_VALUE	DOUBLE +
# TRANSACTION_ID	INT +

# CREDIT_CARD_NO to CUST_CC_NO +

from pyspark.sql.types import DoubleType

credit_df = credit \
            .withColumn("CUST_CC_NO",
                        credit["CREDIT_CARD_NO"]) \
            .withColumn("CUST_SSN",
                        credit["CUST_SSN"].cast(IntegerType())) \
            .withColumn("DAY",
                        credit["DAY"].cast(StringType())) \
            .withColumn("MONTH",
                        credit["MONTH"].cast(StringType())) \
            .withColumn("YEAR",
                        credit["YEAR"].cast(StringType())) \
            .withColumn("BRANCH_CODE",
                        credit["BRANCH_CODE"].cast(IntegerType())) \
            .withColumn("TRANSACTION_ID",
                        credit["TRANSACTION_ID"].cast(IntegerType())) \
            .withColumn("TRANSACTION_VALUE",
                        credit["TRANSACTION_VALUE"].cast(DoubleType()))


In [62]:
credit_df.show(3)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|      CUST_CC_NO|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|4210653349028689|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|4210653349028689|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|4210653349028689|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+
only showing top 3 rows



In [63]:
# DAY, MONTH, YEAR	Convert DAY, MONTH, and YEAR into a TIMEID (YYYYMMDD
# TIMEID 	VARCHAR

# lpad - df.select(lpad(df.s, 6, '#').alias('s')).collect()
# [Row(s='##abcd')]

credit_df = credit_df \
            .withColumn("DAY",
                        lpad(credit_df["DAY"], 2, "0")) \
            .withColumn("MONTH",
                        lpad(credit_df["MONTH"], 2, "0"))

credit_df = credit_df.withColumn("TIMEID", concat(credit_df['YEAR'], credit_df['MONTH'], credit_df['DAY']))
    

In [64]:
credit_df.show(2)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+--------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|      CUST_CC_NO|  TIMEID|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+--------+
|        114|4210653349028689|123459988| 14|   02|             1|       Education|             78.9|2018|4210653349028689|20180214|
|         35|4210653349028689|123459988| 20|   03|             2|   Entertainment|            14.24|2018|4210653349028689|20180320|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+--------+
only showing top 2 rows



In [65]:
columns_to_drop = ['CREDIT_CARD_NO']
credit_df = credit_df.drop(*columns_to_drop)
credit_df = credit_df.withColumn("TIMEID", credit_df["TIMEID"].cast(StringType()))


In [66]:
credit_df.show(2)

+-----------+---------+---+-----+--------------+----------------+-----------------+----+----------------+--------+
|BRANCH_CODE| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|      CUST_CC_NO|  TIMEID|
+-----------+---------+---+-----+--------------+----------------+-----------------+----+----------------+--------+
|        114|123459988| 14|   02|             1|       Education|             78.9|2018|4210653349028689|20180214|
|         35|123459988| 20|   03|             2|   Entertainment|            14.24|2018|4210653349028689|20180320|
+-----------+---------+---+-----+--------------+----------------+-----------------+----+----------------+--------+
only showing top 2 rows



In [67]:
credit_df.printSchema()

root
 |-- BRANCH_CODE: integer (nullable = true)
 |-- CUST_SSN: integer (nullable = true)
 |-- DAY: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- TRANSACTION_ID: integer (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- CUST_CC_NO: string (nullable = true)
 |-- TIMEID: string (nullable = true)



In [None]:
import os 
file_path = "./secret.yaml"  # Replace with the actual path

# Read the file and extract credentials
credentials = {}
with open(file_path, 'r') as file:
    for line in file:
        key, value = line.strip().split(': ')
        credentials[key] = value

# Access the credentials
mypass = credentials.get('MYPASS')
mylogin = credentials.get('MYLOGIN')

In [92]:
# creditcard_capstone database
import mysql.connector

db_connection = mysql.connector.connect(user=mylogin, password=mypass)
db_cursor = db_connection.cursor()

db_cursor.execute("CREATE DATABASE IF NOT EXISTS creditcard_capstone;")

db_cursor.execute("USE creditcard_capstone;")

In [None]:
# Set the JDBC URL and table name
url='jdbc:mysql://localhost:3306/creditcard_capstone'

# Path to your MySQL JDBC connector JAR
mysql_jar_path = "/Users/iris/Downloads/mysql-connector-java-8.0.23.jar"

# Set the properties including user and password
properties = {
    "user": mylogin,
    "password": mypass,
    "driver": "com.mysql.jdbc.Driver"
}

# Write DataFrame to MySQL
branch_df.write.jdbc(url=url, table="CDW_SAPP_BRANCH", mode="overwrite", properties=properties)
credit_df.write.jdbc(url = url, table="CDW_SAPP_CREDIT_CARD", mode="overwrite", properties=properties)
customer_df.write.jdbc(url = url, table="CDW_SAPP_CUSTOMER", mode="overwrite", properties=properties)

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
                                                                                

In [None]:
spark.stop()

In [None]:
# Find the Spark Installation Directory:
# If you installed Apache Spark using a package manager or manually, you need to locate the installation directory. It's commonly found in /usr/local/spark.

# Navigate to the jars Directory:
# Inside the Spark installation directory, you'll find a directory named jars. Navigate to this directory.

# Place the JDBC Connector JAR File:
# Copy the MySQL JDBC connector JAR file (e.g., mysql-connector-java-x.y.z.jar) into this jars directory.

# Restart Spark Application:
# If you have a running Spark application, stop and restart it to apply the changes.

In [None]:
# DataFrame Transformations: select() filter() groupby() orderBy() dropDuplicates() withColumnRenamed()
# sort as transformation: sortedDF = originalDF.sort("column_name")

# DataFrame Actions: printSchema() show() count() columns() describe()
# sort as action: originalDF.sort("column_name").show()

In [31]:
# os.environ['MYSQL_User'] = 'root'
# os.environ['MYSQL_Password'] = 'root'
# os.environ['MYSQL_Driver'] = 'com.mysql.jdbc.Driver'

In [32]:
# import os
# # Set the properties including user and password
# properties = {
#     "user": os.getenv('MYSQL_User'),
#     "password": os.getenv('MYSQL_Password'),
#     "driver": os.getenv('MYSQL_Driver')
# }