In [5]:
# 1.1 Functional Requirements - Load Credit Card Database(SQL)
def display_customer_table():
    """Read the customer JSON file, format it for display, and print every row.

    Display-only formatting (the JSON file itself is not modified):
      * CUST_PHONE is prefixed with "337" and then grouped as (XXX)XXX-XXXX.
      * ADDRESS is built as "<APT_NO>, <STREET_NAME>".
      * FIRST_NAME and LAST_NAME are title-cased; MIDDLE_NAME is lower-cased.

    Errors are printed rather than raised, and the SparkSession is always stopped.
    """
    # pyspark code to read customer json file
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import initcap, lower, concat, lit, regexp_replace

    phone_pattern = "(\\d{3})(\\d{3})(\\d{4})"

    # Bound before the try so the finally clause cannot raise NameError
    # (and hide the real error) when the session itself fails to start.
    spark = None
    try:
        spark = SparkSession.builder.appName("Read_Customer_JSON").getOrCreate()

        json_file_path = r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_custmer.json"
        raw_df = spark.read.json(json_file_path)

        # Prepend the area code first so the regex below can match ten digits
        # (the source numbers appear to be 7-digit local numbers).
        with_area_code_df = raw_df.withColumn("CUST_PHONE", concat(lit("337"), raw_df["CUST_PHONE"]))

        formatted_df = (
            with_area_code_df
            .withColumn("ADDRESS", concat(with_area_code_df["APT_NO"], lit(", "), with_area_code_df["STREET_NAME"]))
            .withColumn("CUST_PHONE", regexp_replace(with_area_code_df["CUST_PHONE"].cast("string"), phone_pattern, "($1)$2-$3"))
            .withColumn("FIRST_NAME", initcap("FIRST_NAME"))
            .withColumn("MIDDLE_NAME", lower("MIDDLE_NAME"))
            .withColumn("LAST_NAME", initcap("LAST_NAME"))
        )

        # show(n) with n = row count prints every row instead of Spark's default 20.
        formatted_df.select(
            "FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "SSN", "CREDIT_CARD_NO", "ADDRESS",
            "CUST_CITY", "CUST_STATE", "CUST_COUNTRY", "CUST_ZIP", "CUST_PHONE",
            "CUST_EMAIL", "LAST_UPDATED",
        ).show(formatted_df.count(), truncate=False)
    except Exception as e:
        print(e)
    finally:
        if spark is not None:
            spark.stop()
        

In [2]:
def display_branch_table():
    """Read the branch JSON file, format phone numbers, and print every row.

    BRANCH_PHONE is regrouped as (XXX)XXX-XXXX wherever it contains ten
    consecutive digits; the JSON file itself is not modified. Errors are printed
    rather than raised, and the SparkSession is always stopped.
    """
    # pyspark to read branch json file
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import regexp_replace

    # Bound before the try so the finally clause is safe if the session never starts.
    spark = None
    try:
        spark = SparkSession.builder.appName("Read_branch_JSON").getOrCreate()

        json_file_path = r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_branch.json"
        branch_df = spark.read.json(json_file_path)

        formatted_df = branch_df.withColumn(
            "BRANCH_PHONE",
            regexp_replace(branch_df["BRANCH_PHONE"].cast("string"), "(\\d{3})(\\d{3})(\\d{4})", "($1)$2-$3"),
        )

        # show(n) with n = row count prints every row instead of Spark's default 20.
        formatted_df.select(
            "BRANCH_CODE", "BRANCH_NAME", "BRANCH_STREET", "BRANCH_CITY",
            "BRANCH_STATE", "BRANCH_ZIP", "BRANCH_PHONE", "LAST_UPDATED",
        ).show(branch_df.count(), truncate=False)
    except Exception as e:
        print(e)
    finally:
        if spark is not None:
            spark.stop()

In [4]:
def display_credit_table():
    """Read the credit-card transaction JSON file and print every row.

    Session creation and the file read are inside the try block. A missing or
    unreadable file is therefore reported like any other error, and the
    SparkSession is still stopped.
    """
    # pyspark to read credit json file
    from pyspark.sql import SparkSession

    spark = None  # so the finally clause is safe if the session never starts
    try:
        spark = SparkSession.builder.appName("Read_credit_JSON").getOrCreate()

        json_file_path = r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json"
        credit_df = spark.read.json(json_file_path)

        # show(n) with n = row count prints every row instead of Spark's default 20.
        credit_df.select(
            "TRANSACTION_ID", "DAY", "MONTH", "YEAR", "CREDIT_CARD_NO", "CUST_SSN",
            "BRANCH_CODE", "TRANSACTION_TYPE", "TRANSACTION_VALUE",
        ).show(credit_df.count(), truncate=False)
    except Exception as e:
        print(e)
    finally:
        if spark is not None:
            spark.stop()


In [12]:
# 1.2 Data Loading into Database
def create_database_in_mysql():
    """Create the ``creditcard_capstone`` MySQL database if it does not already exist.

    Connects to the MySQL server on localhost. Credentials come from the
    MYSQL_USER / MYSQL_PASSWORD environment variables, falling back to
    root / "password". Errors are printed rather than raised, and the cursor
    and connection are closed only if they were actually opened.
    """
    import os
    import mysql.connector

    # Bound before the try so the finally clause cannot raise NameError
    # when connect() fails.
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(
            host="localhost",
            user=os.environ.get("MYSQL_USER", "root"),
            password=os.environ.get("MYSQL_PASSWORD", "password"),
        )

        # Execute a SQL query to create the database
        cursor = conn.cursor()
        cursor.execute("CREATE DATABASE IF NOT EXISTS `creditcard_capstone`")
        conn.commit()
        print("Database created successfully")
    except Exception as e:
        print(f"Error creating database: {e}")
    finally:
        # Close the MySQL cursor and connection
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

In [41]:
def create_table_in_mysql():
    """Load the three capstone JSON files and write them to MySQL over JDBC.

    Tables are written with SaveMode "overwrite" into ``creditcard_capstone``:
      * CDW_SAPP_CREDIT_CARD  <- cdw_sapp_credit.json, unchanged.
      * CDW_SAPP_CUSTOMER     <- cdw_sapp_custmer.json. CUST_PHONE is prefixed with
        "337" and grouped as (XXX)XXX-XXXX; FIRST_NAME/LAST_NAME are title-cased
        and MIDDLE_NAME is lower-cased.
      * CDW_SAPP_BRANCH       <- cdw_sapp_branch.json. BRANCH_PHONE is grouped as
        (XXX)XXX-XXXX.

    MySQL credentials come from the MYSQL_USER / MYSQL_PASSWORD environment
    variables, falling back to root / "password". Errors are printed rather
    than raised, and the SparkSession is always stopped.
    """
    import os
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import initcap, lower, concat, lit, regexp_replace

    phone_pattern = "(\\d{3})(\\d{3})(\\d{4})"
    mysql_jar = r"E:\soft\Spark\spark-3.5.0-bin-hadoop3\jars\mysql-connector-j-8.3.0.jar"

    spark = None  # so the finally clause is safe if the session never starts
    try:
        spark = SparkSession.builder.appName("CreateTableInMySQL").config("spark.jars", mysql_jar).getOrCreate()

        # Define the JDBC URL for the MySQL database
        jdbc_url = "jdbc:mysql://localhost:3306/creditcard_capstone"
        jdbc_user = os.environ.get("MYSQL_USER", "root")
        jdbc_password = os.environ.get("MYSQL_PASSWORD", "password")

        customer_raw = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_custmer.json")
        branch_raw = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_branch.json")
        credit_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json")

        # Prepend the area code first so the regex can match ten digits.
        customer_with_area_code = customer_raw.withColumn("CUST_PHONE", concat(lit("337"), customer_raw["CUST_PHONE"]))
        customer_df = (
            customer_with_area_code
            .withColumn("CUST_PHONE", regexp_replace(customer_with_area_code["CUST_PHONE"].cast("string"), phone_pattern, "($1)$2-$3"))
            .withColumn("FIRST_NAME", initcap("FIRST_NAME"))
            .withColumn("MIDDLE_NAME", lower("MIDDLE_NAME"))
            .withColumn("LAST_NAME", initcap("LAST_NAME"))
        )
        branch_df = branch_raw.withColumn(
            "BRANCH_PHONE",
            regexp_replace(branch_raw["BRANCH_PHONE"].cast("string"), phone_pattern, "($1)$2-$3"),
        )

        # Write the child (credit-card) table first. In overwrite mode Spark's JDBC
        # writer drops and recreates each table unless the "truncate" option is set.
        # Once modify_tables() has added foreign keys from CDW_SAPP_CREDIT_CARD,
        # MySQL refuses to drop the referenced customer/branch tables while that
        # child table still exists.
        for frame, table in (
            (credit_df, "CDW_SAPP_CREDIT_CARD"),
            (customer_df, "CDW_SAPP_CUSTOMER"),
            (branch_df, "CDW_SAPP_BRANCH"),
        ):
            (
                frame.write.format("jdbc")
                .option("url", jdbc_url)
                .option("dbtable", table)
                .option("user", jdbc_user)
                .option("password", jdbc_password)
                .mode("overwrite")
                .save()
            )
    except Exception as e:
        print(f"Error creating table: {e}")
    finally:
        # Close the Spark session
        if spark is not None:
            spark.stop()

In [43]:
# Load the JSON files into MySQL (the target database is created by create_database_in_mysql()).
create_table_in_mysql()

In [61]:
# Modify the tables: add primary keys and set up the foreign-key relationships between them
def modify_tables():
    """Add primary keys, an index, and foreign keys to the capstone MySQL tables.

    Changes applied in ``creditcard_capstone``:
      * cdw_sapp_customer: CREDIT_CARD_NO -> VARCHAR(255), PRIMARY KEY (SSN),
        index ``credit_card_index`` on CREDIT_CARD_NO.
      * cdw_sapp_branch: PRIMARY KEY (BRANCH_CODE).
      * cdw_sapp_credit_card: CREDIT_CARD_NO -> VARCHAR(255),
        PRIMARY KEY (TRANSACTION_ID), foreign keys to the branch table
        (BRANCH_CODE) and the customer table (CREDIT_CARD_NO).

    Not idempotent: re-running after success fails because the keys already
    exist. Credentials come from MYSQL_USER / MYSQL_PASSWORD, falling back to
    root / "password". Errors are printed rather than raised; the cursor and
    connection are closed only if they were actually opened.
    """
    import os
    import mysql.connector

    # Executed in order: the credit-card foreign keys need the customer index
    # and the branch primary key to exist first.
    # CREDIT_CARD_NO is converted to VARCHAR presumably because the JDBC writer
    # created it as TEXT, which MySQL cannot key/index without a prefix length.
    # NOTE(review): credit_card_fk references a non-unique index; InnoDB allows
    # this but it is non-standard -- confirm this is intended.
    statements = (
        """
        ALTER TABLE cdw_sapp_customer
        MODIFY COLUMN CREDIT_CARD_NO VARCHAR(255),
        ADD PRIMARY KEY (SSN),
        ADD INDEX credit_card_index (CREDIT_CARD_NO);
        """,
        """
        ALTER TABLE cdw_sapp_branch
        ADD PRIMARY KEY (BRANCH_CODE)
        """,
        """
        ALTER TABLE cdw_sapp_credit_card
        MODIFY COLUMN CREDIT_CARD_NO VARCHAR(255),
        ADD PRIMARY KEY (TRANSACTION_ID),
        ADD CONSTRAINT `branch_fk` FOREIGN KEY (`BRANCH_CODE`) REFERENCES `CDW_SAPP_BRANCH` (`BRANCH_CODE`),
        ADD CONSTRAINT `credit_card_fk` FOREIGN KEY (CREDIT_CARD_NO) REFERENCES `CDW_SAPP_CUSTOMER`(CREDIT_CARD_NO)
        """,
    )

    # Bound before the try so the finally clause cannot raise NameError
    # when connect() fails.
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(
            host="localhost",
            user=os.environ.get("MYSQL_USER", "root"),
            password=os.environ.get("MYSQL_PASSWORD", "password"),
            database="creditcard_capstone",
        )
        cursor = conn.cursor()

        for statement in statements:
            cursor.execute(statement)
            conn.commit()
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close the MySQL cursor and connection
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

In [11]:
# 2.1 Transaction Details Module
# Prompt the user for a zip code, provide contextual cues for valid input, and verify it is in the correct format.
# 1 a. Prompt the user for a zip code
def prompt_user_zipcode():
    """Prompt until the user enters a 5-digit zip code, and return it as an int.

    Only ASCII digits 0-9 are accepted. ``str.isdigit()`` alone also accepts
    characters such as superscript digits ("²"), which ``int()`` rejects; that
    would end the prompt loop with an error instead of re-prompting.

    Note: the int return drops leading zeros (e.g. "06109" -> 6109).

    Returns:
        The zip code as an int, or None (after printing the error) if reading
        input fails, e.g. on EOF.
    """
    try:
        while True:
            zipcode = input("Enter your zipcode (Enter 5-digit number): ")
            if len(zipcode) == 5 and zipcode.isascii() and zipcode.isdigit():
                return int(zipcode)
            print("Invalid zipcode. Please enter a 5-digit number.")
    except Exception as e:
        print(e)

# b. Prompt the user for a month and year
def prompt_user_month():
    """Prompt until the user enters a month number from 1 to 12, and return it as an int.

    Only ASCII digits 0-9 are accepted, so the ``int()`` conversion below cannot
    raise. ``str.isdigit()`` alone also accepts characters such as "²" that
    ``int()`` rejects.

    Returns:
        The month as an int in the range 1-12.

    Errors raised by ``input()`` itself (e.g. EOFError) propagate to the caller.
    """
    while True:
        month_input = input("Enter your month (Enter number 1 - 12): ")
        if not (month_input.isascii() and month_input.isdigit()):
            print("Invalid Input. Please enter a number between 1 and 12.")
        elif 1 <= int(month_input) <= 12:
            return int(month_input)
        else:
            print("Invalid month. Please enter a number between 1 and 12.")

def prompt_user_year():
    """Prompt until the user enters a 4-digit year, and return it as an int.

    Only ASCII digits 0-9 are accepted. ``str.isdigit()`` alone also accepts
    characters such as superscript digits that ``int()`` rejects, which would
    end the prompt loop with an error instead of re-prompting.

    Returns:
        The year as an int, or None (after printing the error) if reading input
        fails, e.g. on EOF.
    """
    try:
        while True:
            year_input = input("Enter your year (Enter 4-digit number): ")
            if len(year_input) == 4 and year_input.isascii() and year_input.isdigit():
                return int(year_input)
            print("Invalid year. Please enter a 4-digit number.")
    except Exception as e:
        print(e)


In [4]:
# c. Query the data using Spark SQL based on the user-supplied zip code, month, and year
def query_transaction_data_spark(zipcode, month, year):
    """Show transactions at branches in a given zip code for a given month and year.

    Joins the branch and credit-card JSON files on BRANCH_CODE, filters on
    BRANCH_ZIP, MONTH and YEAR, and prints the result with ``show()`` (Spark's
    default of at most 20 rows).

    Args:
        zipcode: branch zip code, as an int or a numeric string.
        month: transaction month, as an int or a numeric string.
        year: transaction year, as an int or a numeric string.

    Errors, including arguments that are not integers, are printed rather than
    raised. The SparkSession is always stopped.
    """
    from pyspark.sql import SparkSession

    spark = None  # so the finally clause is safe if we fail before the session starts
    try:
        # Coerce to int so only numeric literals are formatted into the SQL text
        # below; arbitrary strings can no longer inject SQL.
        zipcode, month, year = int(zipcode), int(month), int(year)

        spark = SparkSession.builder.appName("QueryTransactionData").getOrCreate()

        # Load JSON data into DataFrames
        branch_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_branch.json")
        transaction_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json")

        # Query the data using Spark SQL
        branch_df.createOrReplaceTempView("branch")
        transaction_df.createOrReplaceTempView("transaction")

        query = f"""
            SELECT branch.BRANCH_CODE, branch.BRANCH_ZIP, transaction.DAY, transaction.MONTH, transaction.YEAR, branch.BRANCH_NAME, branch.BRANCH_STREET, branch.BRANCH_CITY, branch.BRANCH_STATE, branch.BRANCH_PHONE, transaction.CREDIT_CARD_NO, transaction.CUST_SSN, transaction.TRANSACTION_ID, transaction.TRANSACTION_TYPE, transaction.TRANSACTION_VALUE, branch.LAST_UPDATED
            FROM branch 
            JOIN transaction 
            Using (BRANCH_CODE) 
            WHERE branch.BRANCH_ZIP = {zipcode} 
            AND transaction.MONTH = {month} 
            AND transaction.YEAR = {year}
        """
        result_df = spark.sql(query)

        # Show the result DataFrame
        result_df.show()
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    finally:
        # Stop the Spark session
        if spark is not None:
            spark.stop()

In [11]:
# d. Sort the transactions by day (descending)
def sort_transaction_by_day():
    """Print every credit-card transaction, ordered by DAY in descending order.

    Session creation and the file read are inside the try block. A missing or
    unreadable file is therefore reported like any other error, and the
    SparkSession is still stopped.
    """
    # pyspark to read credit json file
    from pyspark.sql import SparkSession

    spark = None  # so the finally clause is safe if the session never starts
    try:
        spark = SparkSession.builder.appName("Read_credit_JSON").getOrCreate()

        json_file_path = r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json"
        credit_df = spark.read.json(json_file_path)

        # show(n) with n = row count prints every row instead of Spark's default 20.
        (
            credit_df
            .select("TRANSACTION_ID", "DAY", "MONTH", "YEAR", "CREDIT_CARD_NO", "CUST_SSN",
                    "BRANCH_CODE", "TRANSACTION_TYPE", "TRANSACTION_VALUE")
            .orderBy("DAY", ascending=False)
            .show(credit_df.count(), truncate=False)
        )
    except Exception as e:
        print(e)
    finally:
        if spark is not None:
            spark.stop()

In [20]:
# 2. Display the number and total value of transactions of a given type
def transaction_number_value(type):
    """Show the number and total value of transactions of one TRANSACTION_TYPE.

    Args:
        type: transaction type to summarise (e.g. "Healthcare"). It is matched
            exactly, so case matters. The name shadows the ``type`` builtin inside
            this function; it is kept for backward compatibility with keyword callers.

    Prints a table with columns TRANSACTION_TYPE, Number (count of transactions)
    and Total (sum of TRANSACTION_VALUE rounded to 2 places): one row, or no
    rows if the type does not occur. Errors are printed rather than raised, and
    the SparkSession is always stopped.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = None  # so the finally clause is safe if the session never starts
    try:
        spark = SparkSession.builder.appName("trasaction_JSON").getOrCreate()

        json_file_path = r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json"
        credit_df = spark.read.json(json_file_path)

        # Compare as a Column value instead of formatting ``type`` into SQL text,
        # so user input cannot inject SQL. Filtering before grouping selects the
        # same rows as a HAVING on the group key.
        summary_df = (
            credit_df
            .where(F.col("TRANSACTION_TYPE") == type)
            .groupBy("TRANSACTION_TYPE")
            .agg(
                F.count("TRANSACTION_TYPE").alias("Number"),
                F.round(F.sum("TRANSACTION_VALUE"), 2).alias("Total"),
            )
        )
        # At most one group remains, so the default show() limit is enough;
        # counting the whole transaction file first is unnecessary.
        summary_df.show(truncate=False)
    except Exception as e:
        print(e)
    finally:
        if spark is not None:
            spark.stop()


In [22]:
def prompt_user_type():
    """Ask for a transaction type until the answer is purely alphabetic, then return it.

    Answers containing digits, spaces or punctuation are rejected with a message
    and the user is asked again. The text is returned exactly as typed (no case
    normalisation).

    Returns:
        The accepted string, or None (after printing the error) if reading input
        fails, e.g. on EOF.
    """
    question = "Enter Transaction Type (Healthcare, Automotive, etc.): "
    try:
        answer = input(question)
        while not answer.isalpha():
            print("Invalid input. Please enter string only.")
            answer = input(question)
        return answer
    except Exception as e:
        print(e)

In [5]:
# 3. Display the number and total value of transactions for branches in a given state
def transaction_branch_number_value_state(state):
    """Show the number and total value of transactions made at branches in ``state``.

    Joins the branch and credit-card JSON files on BRANCH_CODE, keeps branches
    whose BRANCH_STATE equals ``state`` (exact match), and prints one row with
    columns BRANCH_STATE, Number (transaction count) and Total (sum of
    TRANSACTION_VALUE rounded to 2 places).

    Args:
        state: two-letter state code, e.g. "NY".

    Errors are printed rather than raised, and the SparkSession is always stopped.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = None  # so the finally clause is safe if the session never starts
    try:
        spark = SparkSession.builder.appName("trasaction_JSON").getOrCreate()

        credit_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json")
        branch_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_branch.json")

        # Use the DataFrame API so ``state`` is compared as a value and never
        # spliced into SQL text (prevents SQL injection). Filtering branches
        # before the join gives the same rows as a WHERE after it.
        state_branches_df = branch_df.where(F.col("BRANCH_STATE") == state)
        result_df = (
            state_branches_df
            .join(credit_df, on="BRANCH_CODE")
            .groupBy(state_branches_df["BRANCH_STATE"])
            .agg(
                F.count("TRANSACTION_TYPE").alias("Number"),
                F.round(F.sum("TRANSACTION_VALUE"), 2).alias("Total"),
            )
        )

        # Show the result DataFrame
        result_df.show()
    except Exception as e:
        print(e)
    finally:
        if spark is not None:
            spark.stop()

In [16]:
def prompt_user_state():
    """Ask for a two-letter state code until a valid one is given; return it upper-cased.

    Non-alphabetic answers and alphabetic answers of the wrong length are
    rejected with distinct messages, and the user is asked again.

    Returns:
        The state code in upper case (e.g. "tx" -> "TX"), or None (after printing
        the error) if reading input fails, e.g. on EOF.
    """
    try:
        while True:
            entry = input("Enter State (LA, CA, etc.): ")
            if not entry.isalpha():
                print("Invalid input. Please enter string only.")
                continue
            if len(entry) != 2:
                print("Invalid input. Please enter 2-letters for the state only.")
                continue
            return entry.upper()
    except Exception as e:
        print(e)

In [4]:
# 2.2 Customer Details Module
# 1. Check the existing account details of a customer
def check_cust_detail(SSN):
    """Show account details and transactions for customers whose SSN ends in ``SSN``.

    Customers are matched on the last four characters of SSN, then joined to
    their transactions (on CREDIT_CARD_NO) and to the branch of each
    transaction (on BRANCH_CODE). The output is ordered by YEAR, MONTH, DAY and
    printed with ``show()`` (Spark's default of at most 20 rows).

    Args:
        SSN: last four digits of the customer's SSN, as a string or int.

    Errors are printed rather than raised, and the SparkSession is always stopped.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = None  # so the finally clause is safe if the session never starts
    try:
        spark = SparkSession.builder.appName("check customer detail").getOrCreate()

        credit_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json")
        customer_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_custmer.json")
        branch_df = spark.read.json(r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_branch.json")

        # Filter customers with a Column comparison instead of formatting SSN into
        # the SQL text, so user input cannot alter the query (SQL injection).
        matching_customers_df = customer_df.where(F.expr("RIGHT(SSN, 4)") == str(SSN))

        credit_df.createOrReplaceTempView("credit")
        matching_customers_df.createOrReplaceTempView("customer")
        branch_df.createOrReplaceTempView("branch")

        query = """
            SELECT customer.SSN, 
            customer.FIRST_NAME, 
            customer.MIDDLE_NAME, 
            customer.LAST_NAME, 
            customer.CREDIT_CARD_NO, 
            customer.APT_NO, 
            customer.STREET_NAME, 
            customer.CUST_CITY, 
            customer.CUST_STATE, 
            customer.CUST_ZIP, 
            customer.CUST_COUNTRY, 
            customer.CUST_EMAIL, 
            customer.CUST_PHONE,
            branch.BRANCH_NAME,
            credit.DAY,
            credit.MONTH,
            credit.YEAR,
            credit.TRANSACTION_TYPE,
            credit.TRANSACTION_VALUE
            FROM customer
            JOIN credit
            Using (CREDIT_CARD_NO)
            JOIN branch
            Using (BRANCH_CODE)
            ORDER BY credit.YEAR, credit.MONTH, credit.DAY
            """

        result_df = spark.sql(query)
        # Show the result DataFrame
        result_df.show()
    except Exception as e:
        print(e)
    finally:
        if spark is not None:
            spark.stop()

In [5]:
def prompt_user_SSN():
    """Prompt until the user enters exactly four ASCII digits, and return them as a string.

    The value stays a string so leading zeros (e.g. "0042") are preserved.
    Only ASCII 0-9 are accepted: ``str.isnumeric()`` also accepts characters
    such as "½" or CJK numerals, which can never match a stored SSN.

    Returns:
        The four-digit string, or None (after printing the error) if reading
        input fails, e.g. on EOF.
    """
    try:
        while True:
            SSN = input("Enter Customer last 4 SSN: ")
            if not (SSN.isascii() and SSN.isdigit()):
                print("Invalid input. Please enter number only.")
            elif len(SSN) != 4:
                print("Invalid input. Please enter 4-digit number only.")
            else:
                return SSN
    except Exception as e:
        print(e)

In [50]:
# 2) Modify existing account details of a customer
def modify_cust_detail(column, value, SSN):
    """Update one column for the customer(s) whose SSN ends in ``SSN``, then re-export.

    If ``column`` is a known customer column and ``check_cust_SSN(SSN)`` finds a
    match, the row(s) are updated in MySQL. In every case where the queries
    succeed, the whole ``cdw_sapp_customer`` table is then written back to the
    customer JSON file as JSON Lines (one object per line).

    Args:
        column: customer column to change (case-insensitive). Must be one of the
            exported columns listed below; otherwise nothing is updated.
        value: new value for the column, sent as a bound query parameter.
        SSN: last four digits of the customer's SSN.

    Credentials come from MYSQL_USER / MYSQL_PASSWORD, falling back to
    root / "password". Errors are printed rather than raised; the cursor and
    connection are closed only if they were actually opened.
    """
    import json
    import os
    import mysql.connector

    # Columns exported to JSON. This doubles as the whitelist of updatable
    # columns, because an identifier cannot be passed as a bound parameter.
    columns = (
        "FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "SSN", "CREDIT_CARD_NO", "APT_NO",
        "STREET_NAME", "CUST_CITY", "CUST_STATE", "CUST_COUNTRY", "CUST_ZIP",
        "CUST_PHONE", "CUST_EMAIL", "LAST_UPDATED",
    )
    json_path = r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_custmer.json"

    # Bound before the try so the finally clause cannot raise NameError.
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(
            host="localhost",
            user=os.environ.get("MYSQL_USER", "root"),
            password=os.environ.get("MYSQL_PASSWORD", "password"),
            database="creditcard_capstone",
        )
        cursor = conn.cursor()

        column_name = str(column).strip().upper()
        if column_name not in columns:
            print(f"Invalid column: {column}")
        elif check_cust_SSN(SSN):
            # NOTE(review): RIGHT(SSN, 4) can match several customers that share
            # the same last four digits; all of them are updated.
            cursor.execute(
                f"UPDATE cdw_sapp_customer SET `{column_name}` = %s WHERE RIGHT(SSN, 4) = %s",
                (value, str(SSN)),
            )
            conn.commit()
        else:
            print("Invalid Customer SSN.")

        cursor.execute(f"SELECT {', '.join(columns)} FROM cdw_sapp_customer")
        data = [dict(zip(columns, row)) for row in cursor.fetchall()]

        # Write JSON Lines: spark.read.json reads one JSON object per line by
        # default, and check_cust_SSN parses the file line by line. A single
        # indented JSON array would break both readers.
        # NOTE(review): these rows come from MySQL, where create_table_in_mysql()
        # stored CUST_PHONE already formatted and names re-cased. After this
        # export, display_customer_table() will prefix "337" to an already
        # formatted phone -- confirm whether raw values should be exported.
        with open(json_path, "w") as file:
            for row_dict in data:
                # default=str covers MySQL types json cannot encode (e.g. datetime, Decimal).
                file.write(json.dumps(row_dict, default=str) + "\n")
    except Exception as e:
        print(e)
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

In [28]:
def check_cust_SSN(SSN, json_file_path=r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_custmer.json"):
    """Return True if any customer in a JSON Lines file has an SSN ending in ``SSN``.

    Args:
        SSN: last four digits to look for. It is compared as a string against the
            last four characters of each record's "SSN" value.
        json_file_path: customer file with one JSON object per line. Defaults to
            the project's customer file.

    Returns:
        True on the first match, otherwise False. If the file cannot be opened or
        a line is not valid JSON, the error is printed and False is returned.
    """
    import json

    target = str(SSN)
    try:
        # The with-block closes the file; no explicit close is needed. An explicit
        # close in a finally clause would raise NameError when open() itself fails.
        with open(json_file_path) as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines, e.g. a trailing newline
                customer_data = json.loads(line)
                if str(customer_data["SSN"])[-4:] == target:
                    return True
        return False
    except Exception as e:
        print(e)
        return False

In [None]:
# 3. Generate the monthly bill for a credit card number for a given month and year
def generate_bill(credit_card_no, month, year):
    """Print the monthly bill for one credit card: its transactions and their total.

    Keeps only transactions whose CREDIT_CARD_NO, MONTH and YEAR match the
    arguments, prints them ordered by DAY, then prints the total of
    TRANSACTION_VALUE rounded to 2 places (null if there are no transactions).

    Args:
        credit_card_no: card number. It is compared as a string, so an int or a
            str both work.
        month: billing month (1-12); anything ``int()`` accepts.
        year: billing year; anything ``int()`` accepts.

    Errors are printed rather than raised, and the SparkSession is always stopped.
    """
    # pyspark to read credit json file
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = None  # so the finally clause is safe if the session never starts
    try:
        spark = SparkSession.builder.appName("Read_credit_JSON").getOrCreate()

        json_file_path = r"D:\CAP 405 Data Analytics - Capstone Project\cdw_sapp_credit.json"
        credit_df = spark.read.json(json_file_path)

        # Restrict to the requested card and billing period. Casting the card
        # column to string makes the match independent of how the JSON reader
        # inferred its type.
        bill_df = (
            credit_df
            .where(F.col("CREDIT_CARD_NO").cast("string") == str(credit_card_no))
            .where(F.col("MONTH") == int(month))
            .where(F.col("YEAR") == int(year))
            .select("TRANSACTION_ID", "DAY", "MONTH", "YEAR", "CREDIT_CARD_NO", "CUST_SSN",
                    "BRANCH_CODE", "TRANSACTION_TYPE", "TRANSACTION_VALUE")
            .orderBy("DAY")
        )

        # show(n) with n = row count prints every line item instead of Spark's default 20.
        bill_df.show(bill_df.count(), truncate=False)
        bill_df.agg(
            F.round(F.sum("TRANSACTION_VALUE"), 2).alias("TOTAL_AMOUNT")
        ).show(truncate=False)
    except Exception as e:
        print(e)
    finally:
        # Stop the SparkSession
        if spark is not None:
            spark.stop()

In [75]:
# Print the formatted customer table (all rows).
display_customer_table()


+----------+-----------+---------+---------+----------------+------------------------+-----------------+----------+-------------+--------+-------------+--------------------------+-----------------------------+
|FIRST_NAME|MIDDLE_NAME|LAST_NAME|SSN      |CREDIT_CARD_NO  |ADDRESS                 |CUST_CITY        |CUST_STATE|CUST_COUNTRY |CUST_ZIP|CUST_PHONE   |CUST_EMAIL                |LAST_UPDATED                 |
+----------+-----------+---------+---------+----------------+------------------------+-----------------+----------+-------------+--------+-------------+--------------------------+-----------------------------+
|Alec      |wm         |Hooper   |123456100|4210653310061055|656, Main Street North  |Natchez          |MS        |United States|39120   |(337)123-7818|AHooper@example.com       |2018-04-21T12:49:02.000-04:00|
|Alec      |brendan    |Hooper   |123453023|4210653310102868|656, Redwood Drive      |Wethersfield     |CT        |United States|06109   |(337)123-7818|EHolman@

In [6]:
# Prompt for the last four SSN digits, then show that customer's details and transactions.
check_cust_detail(prompt_user_SSN())

+---+----------+-----------+---------+--------------+------+-----------+---------+----------+--------+------------+----------+----------+-----------+---+-----+----+----------------+-----------------+
|SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|CREDIT_CARD_NO|APT_NO|STREET_NAME|CUST_CITY|CUST_STATE|CUST_ZIP|CUST_COUNTRY|CUST_EMAIL|CUST_PHONE|BRANCH_NAME|DAY|MONTH|YEAR|TRANSACTION_TYPE|TRANSACTION_VALUE|
+---+----------+-----------+---------+--------------+------+-----------+---------+----------+--------+------------+----------+----------+-----------+---+-----+----+----------------+-----------------+
+---+----------+-----------+---------+--------------+------+-----------+---------+----------+--------+------------+----------+----------+-----------+---+-----+----+----------------+-----------------+

