CAP 350 - Data Engineering - Capstone Project

This project is a Data Extraction, Transformation, load (ETL) proccess implemented using  with Python and PySpark. 
It proccess two datasets: Loan Application and a Credit Card dataset. This project use various technologies and libraries Python (Pandas, advanced modules, e.g., Matplotlib), SQL,
Apache Spark (Spark Core, Spark SQL), and Python Visualization and Analytics libraries.   

Author: Ribka Ayele
Date: 10/14/2023

In [1]:
import os
import sys
import pyspark
import requests
import pandas as pd 
import random
import secret
import matplotlib.pyplot as plt
import mysql.connector

from mysql.connector import Error
from pyspark.sql.functions import when,col, substring, concat,lit,initcap, lpad,substring, lower
from pyspark.sql.types import IntegerType,StringType,TimestampType, DoubleType
from pyspark.sql import SparkSession  #import sparksession from pyspark.sql

In [2]:
 #Create a Spark session
spark = SparkSession.builder.appName("Credit Card System").getOrCreate()

Data extraction


In [3]:
#Extract cdw_sapp_custmer, cdw_sapp_branch, and cdw_sapp_creditJSON files
#Read the Json data form my local computer 
df_cdw_sapp_custmer = spark.read.json("cdw_sapp_custmer.json")
df_cdw_sapp_custmer.printSchema() #show df_cdw_sapp_custmer schema
print(df_cdw_sapp_custmer.show()) 

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+---

In [4]:
#extract cdw_sapp_branch Json data
df_cdw_sapp_branch = spark.read.json("cdw_sapp_branch.json")
df_cdw_sapp_branch.printSchema()
print(df_cdw_sapp_branch.show(5))


root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank| 

In [5]:

#extract cdw_sapp_credit Json data
df_cdw_sapp_credit = spark.read.json("cdw_sapp_credit.json")
df_cdw_sapp_credit.printSchema()
print(df_cdw_sapp_credit.show(5))


root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             5

Data transformation (applying mapping)

In [12]:
"""  Functional Requirements - Load Credit Card Database (SQL)
Req-1.1 Data Extraction and Transformation with Python and  PySpark
Transform cdw_sapp_custme file based on the mapping document according to the 
specifications found in the mapping document; 
- """

# Map df_cdw_sapp_custmer data
#Phone number missed area code so that  generate random area code
random_area_code = (random.randint(206, 789))

  #covert area code to string  and amking sure 3 digit long
random_area_code = f"{random_area_code:03d}"

  #Apply Transformation First & last name to title case, middle name lower case 
  #phone number add area code and in (xxx)xxx-xxx format
cdw_sapp_custmer= df_cdw_sapp_custmer.withColumn("FIRST_NAME" , initcap(col("FIRST_NAME"))  #tile case
                                                    ).withColumn("LAST_NAME" , initcap(col("LAST_NAME"))
                                                    ).withColumn("MIDDLE_NAME" , lower(col("MIDDLE_NAME"))
                                                    ).withColumn("FULL_STREET_ADDRESS" , concat(col("STREET_NAME"), lit(","), ("APT_NO"))
                                                    ).withColumn("CUST_PHONE",concat(lit('('),lit(random_area_code),lit(')'),substring(col("CUST_PHONE"),1,3),lit(')'),
                                                                    substring(col("CUST_PHONE"),4,3).cast(StringType()))
                                                    ).withColumn("CUST_ZIP", lpad(col("CUST_ZIP"),5,"0"))


                                                      
cdw_sapp_custmer.show(5)



cdw_sapp_custmer = cdw_sapp_custmer.select("SSN","FIRST_NAME","MIDDLE_NAME","LAST_NAME","CREDIT_CARD_NO",
                          "FULL_STREET_ADDRESS", "CUST_CITY","CUST_STATE","CUST_COUNTRY",
                          "CUST_ZIP", "CUST_PHONE","CUST_EMAIL", "LAST_UPDATED")
  

  #convert  all colomn to VARCAR except SSN,CUST_ZIP, APT_NO (IntegerType),LAST_UPDATED TimestampType()
cdw_sapp_custmer=cdw_sapp_custmer.withColumn("SSN", col("SSN").cast(IntegerType())
                                  ).withColumn("FIRST_NAME", col("FIRST_NAME").cast(StringType())
                                  ).withColumn("MIDDLE_NAME", col("MIDDLE_NAME").cast(StringType())
                                  ).withColumn("LAST_NAME", col("LAST_NAME").cast(StringType())            
                                  ).withColumn("Credit_card_no", col("CREDIT_CARD_NO").cast(StringType())
                                  ).withColumn("FULL_STREET_ADDRESS", col("FULL_STREET_ADDRESS").cast(StringType())
                                  ).withColumn("CUST_CITY", col("CUST_CITY").cast(StringType()) 
                                  ).withColumn("CUST_STATE", col("CUST_STATE").cast(StringType())
                                  ).withColumn("CUST_COUNTRY", col("CUST_COUNTRY").cast(StringType()) 
                                  ).withColumn("CUST_ZIP", col("CUST_ZIP").cast(StringType())    
                                  ).withColumn("CUST_PHONE", col("CUST_PHONE").cast(StringType())
                                  ).withColumn("CUST_EMAIL", col("CUST_EMAIL").cast(StringType()) 
                                  ).withColumn("LAST_UPDATED", col("LAST_UPDATED").cast(TimestampType()))

  # # Drop Apt No and street number. FULL_STREET_ADDRESS includes both
  # cdw_sapp_custmer=cdw_sapp_custmer.drop("STREET_NAME","APT_NO")                       
print("cdw_sapp_custmer after mapping")                         
cdw_sapp_custmer.show(8) 


+------+----------------+------------+-------------+-------------------+------------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+--------------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|  CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME| FULL_STREET_ADDRESS|
+------+----------------+------------+-------------+-------------------+------------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+--------------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|(644)123)781|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         wm|123456100|Main Street North|Main Street North...|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|(644)123)893|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    brendan|12345

In [13]:
""" Transform cdw_sapp_branch data frame based on the mapping document"""

df_cdw_sapp_branch = df_cdw_sapp_branch.select("BRANCH_CODE","BRANCH_NAME","BRANCH_STREET","BRANCH_CITY","BRANCH_STATE",
                                                 "BRANCH_ZIP","BRANCH_PHONE","LAST_UPDATED")

  #IF BRANCH_ZIP  value is null load default (99999) value 
  #Change the format of phone number to(xxx)xxx-xxx format
  #Make sure BRANCH_ZIP  code 5 number

df_cdw_sapp_branch = df_cdw_sapp_branch.withColumn("BRANCH_ZIP", lpad(col("BRANCH_ZIP"),5,"0"))

df_cdw_sapp_branch = df_cdw_sapp_branch.withColumn("BRANCH_ZIP", when(col("BRANCH_ZIP").isNull(),'99999').otherwise(col("BRANCH_ZIP"))
                                ).withColumn("BRANCH_PHONE",concat(lit('('),substring(col("BRANCH_PHONE"),1,3),lit(')'),
                                                                  substring(col("BRANCH_PHONE"),4,3),lit('-'),
                                                                  substring(col("BRANCH_PHONE"),7,4)))                              

df_cdw_sapp_branch.show(5)

#Conver into targeted data type except Branch code, zip code and last update convert all to stringtype
 
cdw_sapp_branch = df_cdw_sapp_branch.withColumn("BRANCH_CODE", col("BRANCH_CODE").cast(IntegerType())
                                ).withColumn('BRANCH_NAME', col("BRANCH_NAME").cast(StringType())
                                  ).withColumn('BRANCH_STREET', col("BRANCH_STREET").cast(StringType())
                                  ).withColumn('BRANCH_CITY', col("BRANCH_CITY").cast(StringType()) 
                                  ).withColumn('BRANCH_STATE', col("BRANCH_STATE").cast(StringType())
                                  ).withColumn('BRANCH_ZIP', col("BRANCH_ZIP").cast(StringType())
                                  ).withColumn('BRANCH_PHONE', col("BRANCH_PHONE").cast(StringType())
                                  ).withColumn("LAST_UPDATED", col("LAST_UPDATED").cast(TimestampType()))  
cdw_sapp_branch.show(8)
cdw_sapp_branch.printSchema()
 

+-----------+------------+-----------------+-----------------+------------+----------+-------------+--------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|        LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+-------------+--------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|((12)3)4-56-5|2018-04-18T16:51:...|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|((12)3)4-61-8|2018-04-18T16:51:...|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|((12)3)4-98-5|2018-04-18T16:51:...|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|((12)3)4-66-3|2018-04-18T16:51:...|
|          5|Example Bank|      14th Street|    KingOfPrussia|          PA|     19406|((12)3)4-84-9|2018-04-18T16:51:...|
+-----------+-----------

In [14]:
"""Transform cdw_sapp_credit data"""

#use lpad function for  month and day because some of them put as one degit
# concat Year, Month and Day into a TIMEID (YYYYMMDD)
cdw_sapp_credit = df_cdw_sapp_credit.withColumn("TIMEID",concat(col("YEAR"),lpad(col("MONTH"),2,"0"),lpad(col("DAY"),2,"0")))
cdw_sapp_credit.show(5)

cdw_sapp_credit= cdw_sapp_credit.select("CREDIT_CARD_NO", "TIMEID","CUST_SSN","BRANCH_CODE","TRANSACTION_TYPE",
                                          "TRANSACTION_VALUE","TRANSACTION_ID")

  #Change CREDIT_CARD_NO column name to CUST_CC_NO and cast it VARCHAr
cdw_sapp_credit = cdw_sapp_credit.withColumn("CUST_CC_NO", col("CREDIT_CARD_NO").cast(StringType())
                            ).withColumn("TIMEID", col("TIMEID").cast(StringType())
                            ).withColumn("CUST_SSN", col("CUST_SSN").cast(IntegerType())
                            ).withColumn("BRANCH_CODE", col("BRANCH_CODE").cast(IntegerType())
                            ).withColumn("TRANSACTION_TYPE", col("TRANSACTION_TYPE").cast(StringType())
                            ).withColumn("TRANSACTION_VALUE", col("TRANSACTION_VALUE").cast(DoubleType())
                            ).withColumn("TRANSACTION_ID", col("TRANSACTION_ID").cast(IntegerType()))
cdw_sapp_credit.show(8)
  

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|  TIMEID|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|20180214|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|20180320|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|20180708|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|20180419|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|20181010|
+-----------+----------------+---------+---+-----+--------------+---------------

Data loading into Database

In [15]:
"""Req-1.2 Data loading into Database
Function Requirement 1.2 Once PySpark reads data from JSON files, and then utilizes Python,
PySpark, and Python modules to load data into RDBMS(SQL), perform thefollowing:
a) Create a Database in SQL(MySQL), named “creditcard_capstone.”
b) Create a Python and Pyspark Program to load/write the “Credit
Card System Data” into RDBMS(creditcard_capstone).
Tables should be created by the following names in RDBMS: CDW_SAPP_BRANCH,CDW_SAPP_CREDIT_CARD, CDW_SAPP_CUSTOMER"""

def db_connection():
   
    #spark = SparkSession.builder.master("local[*]").appName("creditcard_capstone").getOrCreate()
    conn = None
   
    try:
        conn = mysql.connector.connect(database='creditcard_capstone',
                                             user = secret.mysql_username,
                                             password = secret.mysql_password)
        #  user=secret.mysql_password,
        #                                      password=secret.mysql_password)
        if conn.is_connected():
            cursor = conn.cursor()
            cursor.execute("CREATE DATABASE IF NOT EXISTS creditcard_capstone")
             #Connect to MySQL database 
            print('Connected to MySQL database')

    except Error as e:
        print(e)

    finally:
        if conn is not None and conn.is_connected():
            conn.close()
db_connection()

Connected to MySQL database


In [16]:

def load_to_mysql(dataframe,table_name):
  #write a pyspark dataframe to MySQL table
 table = dataframe.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", table_name) \
  .option("user", secret.mysql_username) \
  .option("password",secret.mysql_password) \
  .save()
 return table 

cdw_sapp_custmer = load_to_mysql(cdw_sapp_custmer,"cdw_sapp_custmer")
cdw_sapp_branch = load_to_mysql(cdw_sapp_branch,"cdw_sapp_branch")
cdw_sapp_credit = load_to_mysql(cdw_sapp_credit, "cdw_sapp_credit")


In [17]:
def read_to_my_sql(table_name,spark):
  # spark = SparkSession.builder.appName("CreditCardSystem").getOrCreate()
  
    # Conncatnate the table name with the database name
    full_table_name = "creditcard_capstone." + table_name

    table = spark.read.format("jdbc").options(driver = "com.mysql.cj.jdbc.Driver",
            user =  secret.mysql_username,\
            password = secret.mysql_password,\
            url = "jdbc:mysql://localhost:3306/creditcard_capstone",\
            dbtable = full_table_name).load() #Ex:creditcard_capstone.cdw_sapp_custmer
    table.show(3)
    return table
cdw_sapp_custmer = read_to_my_sql("cdw_sapp_custmer", spark)
cdw_sapp_branch = read_to_my_sql("cdw_sapp_branch",spark )
cdw_sapp_credit = read_to_my_sql( "cdw_sapp_credit",spark)



+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+------------+-------------------+-------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  Credit_card_no| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|  CUST_PHONE|         CUST_EMAIL|       LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+------------+-------------------+-------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North...|     Natchez|        MS|United States|   39120|(760)123)781|AHooper@example.com|2018-04-21 09:49:02|
|123453023|      Etta|    brendan|   Holman|4210653310102868|   Redwood Drive,829|Wethersfield|        CT|United States|    6109|(760)123)893|EHolman@example.com|2018-04-21 09:49:02|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|12th Street East,683|   

In [18]:
cdw_sapp_custmer.createOrReplaceTempView("cdw_sapp_custmer")
cdw_sapp_branch .createOrReplaceTempView("cdw_sapp_branch")
cdw_sapp_credit.createOrReplaceTempView("cdw_sapp_credit")


 Application Front-End

Transaction Details 

In [19]:
"""2. Functional Requirements - Application Front-End
# # Once data is loaded into the database, we need a front-end (console) to see/display data. For
# # that, create a console-based Python program to satisfy System Requirements 2 (2.1 and 2.2)"""

# Get zip_code, Month, year from the user
#Example Zip_code:23112  Month:03  year:2018  Z: 52804 M: 08 y 2018

 #Create a Spark session
spark = SparkSession.builder.appName("Credit Card System").getOrCreate()

zip_code = input("Please enter the Zip code (ex:23112): ")
Month = input("Please enter the month (ex:03): ")
Year = input("Please enter the year (ex:2018): ")
                             
#Conver TIMEID to Month and Year by split or using substring slicing 
credit_Tempview = cdw_sapp_credit.withColumn("YEAR", substring(col("TIMEID"), 1,4).cast("INT")
                                              ).withColumn("MONTH", (substring(col("TIMEID"), 5,2).cast("INT"))
                                              ).withColumn("DAY", (substring(col("TIMEID"), 7,2).cast("INT")))

credit_Tempview.createOrReplaceTempView("credit_Tempview")
# Used to display the transactions made by customers living in agiven zip code for a given month and year.
#  Order by day in descending order.

customer_transaction = f""" SELECT c.FIRST_NAME,c.LAST_NAME,c.CREDIT_CARD_NO,c.FULL_STREET_ADDRESS,c.CUST_ZIP, cc.CUST_CC_NO,
       cc.TRANSACTION_TYPE, cc.TRANSACTION_VALUE FROM cdw_sapp_custmer c  INNER JOIN credit_Tempview  cc  ON c.CREDIT_CARD_NO = cc.CUST_CC_NO
      wHERE c.CUST_ZIP = '{zip_code}' AND cc.MONTH = '{Month}' AND cc.YEAR = '{Year}'ORDER BY cc.DAY DESC"""
 
result = spark.sql(customer_transaction)
result.show(5)

+----------+---------+----------------+-------------------+--------+----------------+----------------+-----------------+
|FIRST_NAME|LAST_NAME|  CREDIT_CARD_NO|FULL_STREET_ADDRESS|CUST_ZIP|      CUST_CC_NO|TRANSACTION_TYPE|TRANSACTION_VALUE|
+----------+---------+----------------+-------------------+--------+----------------+----------------+-----------------+
|     Jonah|  Andrade|4210653384836500|     Fawn Court,633|   23112|4210653384836500|         Grocery|            82.95|
|     Jonah|  Andrade|4210653384836500|     Fawn Court,633|   23112|4210653384836500|         Grocery|            82.95|
|     Jonah|  Andrade|4210653384836500|     Fawn Court,633|   23112|4210653384836500|         Grocery|            82.95|
|     Jonah|  Andrade|4210653384836500|     Fawn Court,633|   23112|4210653384836500|         Grocery|            82.95|
|     Jonah|  Andrade|4210653384836500|     Fawn Court,633|   23112|4210653384836500|         Grocery|            82.95|
+----------+---------+----------

In [22]:
#2) Used to display the number and total values of transactions for a given type.
type = input("Please enter a transactions type: ")

#use count to get total number of transaction

spark_sql= f""" SELECT COUNT(*) AS TRANSATION_TYPE, '{type}' FROM cdw_sapp_credit WHERE cdw_sapp_credit.TRANSACTION_TYPE = '{type}'"""

result = spark.sql(spark_sql)    
result.show() 

+---------------+---+
|TRANSATION_TYPE|Gas|
+---------------+---+
|         495375|Gas|
+---------------+---+



In [26]:
#3) Used to display the total number and total values of transactions for branches in a given state.

State = input("Please enter the state(ex:WA): ")

spark_sql =f""" SELECT '{State}' AS STATE, COUNT(c.TRANSACTION_ID) AS TOTAL_TRANSACTION, 
SUM(c.TRANSACTION_VALUE) AS TOTAL_TRANSACTION_VALUE 
FROM cdw_sapp_branch b JOIN cdw_sapp_credit c  ON b.BRANCH_CODE = c.BRANCH_CODE 
WHERE b.BRANCH_STATE = '{State}' """


result = spark.sql(spark_sql)    
result.show() 

+-----+-----------------+-----------------------+
|STATE|TOTAL_TRANSACTION|TOTAL_TRANSACTION_VALUE|
+-----+-----------------+-----------------------+
|   WA|         11995200|    6.255253439990447E8|
+-----+-----------------+-----------------------+



Customer Details

In [33]:
# 2.2 Customer Details Module
# Functional Requirements 2.2
# 1) Used to check the existing account details of a customer.
# 2) Used to modify the existing account details of a customer.
# 3) Used to generate a monthly bill for a credit card number for a given month and year.
# 4) Used to display the transactions made by a customer between
# two dates. Order by year, month, and day in descending order

#1) Used to check the existing account details of a customer.

SSN = input("Please enter the 9 digit of customer SSN(123456100): ")

#Check customer existing account
spark_Sql = f""" SELECT * FROM cdw_sapp_custmer WHERE '{SSN}' = SSN """
result = spark.sql(spark_Sql)
result.show()

+---------+----------+-----------+---------+----------------+--------------------+---------+----------+-------------+--------+------------+-------------------+-------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  Credit_card_no| FULL_STREET_ADDRESS|CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|  CUST_PHONE|         CUST_EMAIL|       LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+---------+----------+-------------+--------+------------+-------------------+-------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North...|  Natchez|        MS|United States|   39120|(760)123)781|AHooper@example.com|2018-04-21 09:49:02|
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North...|  Natchez|        MS|United States|   39120|(471)123)781|AHooper@example.com|2018-04-21 09:49:02|
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North...|  Natchez|        

#2) Used to modify the existing account details of a customer.

SSN = input("Please enter the 9 digit SSN(e.g 123455692): ")

#Ask user to input they would like to make
Custumer_Info = int(input("""Please enter the number you wish to
                           change:\n1,FIRST_NAME \n2,MIDDLE_NAME \n3,LAST_NAME \n4,CREDIT_CARD_NO 
                           \n5,FULL_STREET_ADDRESS \n6,CUST_CITY \n7,CUST_STATE \n8,CUST_COUNTRY 
                           \n9,CUST_ZIP \n10, CUST_PHONE: """)) 
   
update = f"UPDATE cdw_sapp_custmer SET"
if Custumer_Info == '1':
  FIRST_NAME = input("Please enter the new First Name: ")
  spark_sql += f", FIRST_NAME = '{FIRST_NAME}'"
elif Custumer_Info ==2:
  MIDDLE_NAME = input("Please enetr the Middle Name: ")
  spark_sql += f",MIDDLE_NAME = '{MIDDLE_NAME}'"
elif Custumer_Info ==3:
   LAST_NAME = input("Please enter the new Last Name: ") 
   spark_sql += f",LAST_NAME= '{LAST_NAME}'"
elif Custumer_Info ==4:
   CREDIT_CARD_NO = input("Please enter new CREDIT_CARD_NO:  ")
   spark_sql += f",CREDIT_CARD_NO= '{CREDIT_CARD_NO}'"

update += f" WHERE SSN = '{SSN}'"
spark.sql(update)
#Excute the update
print("update successful")





In [20]:
 #Create a Spark session
#spark = SparkSession.builder.appName("Credit Card System").getOrCreate()
spark_sql = "select * from cdw_sapp_credit where CREDIT_CARD_NO = '4210653349028689'"
result = spark.sql(spark_sql)
result.show()

+----------------+--------+---------+-----------+----------------+-----------------+--------------+----------------+
|  CREDIT_CARD_NO|  TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|      CUST_CC_NO|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+----------------+
|4210653349028689|20180214|123459988|        114|       Education|             78.9|             1|4210653349028689|
|4210653349028689|20180320|123459988|         35|   Entertainment|            14.24|             2|4210653349028689|
|4210653349028689|20180708|123459988|        160|         Grocery|             56.7|             3|4210653349028689|
|4210653349028689|20180419|123459988|        114|   Entertainment|            59.73|             4|4210653349028689|
|4210653349028689|20181010|123459988|         93|             Gas|             3.59|             5|4210653349028689|
|4210653349028689|20180528|123459988|        164|       Educatio

In [25]:
#3) Used to generate a monthly bill for a credit card number for a given month and year.

credits_Card_number = input("Please enter the credit card number(ex:4210653349028689): ")
Month = int(input("Please enter the month (ex:04): "))
Year = int(input("Please enter the year (ex: 2018): "))


monthly_bill = credit_Tempview.filter((credit_Tempview['MONTH']== Month) & (credit_Tempview['YEAR'] == Year) & (credit_Tempview['CREDIT_CARD_NO'] == 'credits_Card_number'))
monthly_bill.show()

+--------------+------+--------+-----------+----------------+-----------------+--------------+----------+----+-----+---+
|CREDIT_CARD_NO|TIMEID|CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|CUST_CC_NO|YEAR|MONTH|DAY|
+--------------+------+--------+-----------+----------------+-----------------+--------------+----------+----+-----+---+
+--------------+------+--------+-----------+----------------+-----------------+--------------+----------+----+-----+---+



In [None]:
#) Used to generate a monthly bill for a credit card number for a given month and year.


In [None]:
#4) Used to display the transactions made by a customer between
#two dates. Order by year, month, and day in descending order

Data Analysis and Visualization