In [2]:
import os
from getpass import getpass

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
# Start (or reuse) the Spark session for this notebook; the application is registered as "Customer"
spark = (
    SparkSession.builder
    .appName("Customer")
    .getOrCreate()
)

In [20]:
# Read the Credit Card, Customer and Branch tables from MySQL into Spark DataFrames.
#
# Connection settings are taken from the environment so that no credentials are
# stored in the notebook:
#   CREDITCARD_DB_URL       JDBC URL   (default: the local creditcard_capstone database)
#   CREDITCARD_DB_USER      user name  (default: root)
#   CREDITCARD_DB_PASSWORD  password   (prompted for, without echo, when unset or empty)

JDBC_URL = os.environ.get("CREDITCARD_DB_URL", "jdbc:mysql://localhost:3306/creditcard_capstone")
JDBC_USER = os.environ.get("CREDITCARD_DB_USER", "root")
JDBC_PASSWORD = os.environ.get("CREDITCARD_DB_PASSWORD") or getpass(
    "MySQL password for {}: ".format(JDBC_USER)
)


def read_jdbc_table(table_name):
    """Load one database table into a Spark DataFrame over JDBC.

    Parameters:
        table_name: name of the table to read, passed as the JDBC "dbtable" option.

    Returns:
        The DataFrame produced by the JDBC reader for that table, using the
        URL, user and password configured above.
    """
    return (
        spark.read.format("jdbc")
        .option("url", JDBC_URL)
        .option("dbtable", table_name)
        .option("user", JDBC_USER)
        .option("password", JDBC_PASSWORD)
        .load()
    )


df_credit = read_jdbc_table("CDW_SAPP_CREDIT_CARD")
df_customer = read_jdbc_table("CDW_SAPP_CUSTOMER")
df_branch = read_jdbc_table("CDW_SAPP_BRANCH")

In [None]:
# Preview the first row of each source DataFrame: Credit Card, Customer and Branch

for _df in (df_credit, df_customer, df_branch):
    _df.show(1)

In [22]:
# Inner-join credit card transactions to customers on SSN and keep only the
# fields needed by the transaction queries

df_join = (
    df_credit
    .join(df_customer, on=df_credit.CUST_SSN == df_customer.SSN, how='inner')
    .select('SSN', 'CUST_ZIP', 'TIMEID', 'TRANSACTION_ID', 'TRANSACTION_VALUE')
)


In [7]:
# Sanity check: print the first row of the joined DataFrame to confirm that only the selected fields are present

df_join.show(1)

+---------+--------+--------+--------------+-----------------+
|      SSN|CUST_ZIP|  TIMEID|TRANSACTION_ID|TRANSACTION_VALUE|
+---------+--------+--------+--------------+-----------------+
|123455659|   10954|20180723|         22772|            97.57|
+---------+--------+--------+--------------+-----------------+
only showing top 1 row



In [8]:
# Register the joined DataFrame as the temporary view CC_CUST so later cells can refer to it by name

df_join.createOrReplaceTempView('CC_CUST')

In [14]:
# Example query with hard-coded input values: zip code 53066, year 2018, month 06

# data = spark.sql('select * from CC_CUST where CUST_ZIP == "53066" and substr(TIMEID,1,4) == "2018" and substr(TIMEID,5,2)=="06"')
# data.show(5)

+---------+--------+--------+--------------+-----------------+
|      SSN|CUST_ZIP|  TIMEID|TRANSACTION_ID|TRANSACTION_VALUE|
+---------+--------+--------+--------------+-----------------+
|123451357|   53066|20180606|         44947|            18.13|
|123451357|   53066|20180611|         44915|            86.64|
|123451357|   53066|20180610|         44900|            88.73|
|123451357|   53066|20180626|         44896|            13.47|
|123451357|   53066|20180617|         44868|            26.73|
+---------+--------+--------+--------------+-----------------+
only showing top 5 rows



2. Functional Requirements - Application Front-End

    2.1 Transaction Details Module

In [23]:
# 1. Used to display the transactions made by customers living in a given zip code for a given month and year.
# Order by day in descending order.

# Function
def trans_value(Zip,Year,Month):
    """Show up to 5 distinct CC_CUST transactions for a zip code, year and month.

    TIMEID is sliced by position: characters 1-4 are compared with the year,
    5-6 with the month, and 7-8 (the day) drive the descending sort.

    The filter is built from column expressions rather than by formatting the
    values into a SQL string, so the inputs are always treated as plain values
    and cannot change the query (a stray quote used to break it).

    Parameters:
        Zip:   zip code to match against CUST_ZIP.
        Year:  four-digit year, e.g. "2018".
        Month: month number; left-padded with a zero to two digits so that
               "6" and "06" select the same rows.

    Returns:
        None. The rows are printed with DataFrame.show(5).
    """
    # Normalize so that surrounding whitespace or a one-digit month does not
    # silently produce an empty result.
    Zip = str(Zip).strip()
    Year = str(Year).strip()
    Month = str(Month).strip().zfill(2)
    print("inside the function", Zip, Year, Month)

    timeid = col("TIMEID")
    data = (
        spark.table("CC_CUST")
        .where(
            (col("CUST_ZIP") == Zip)
            & (timeid.substr(1, 4) == Year)
            & (timeid.substr(5, 2) == Month)
        )
        .distinct()
        .orderBy(timeid.substr(7, 2).desc())
    )
    data.show(5)


# input() already returns a string, so no conversion is needed
Zip = input("Please Enter zipcode in the cell: ")
Year = input("Please Enter year in the cell: ")
Month = input("Please Enter month in the cell: ")
trans_value(Zip,Year,Month)


inside the function 53066 2018 06
+---------+--------+--------+--------------+-----------------+
|      SSN|CUST_ZIP|  TIMEID|TRANSACTION_ID|TRANSACTION_VALUE|
+---------+--------+--------+--------------+-----------------+
|123457562|   53066|20180628|         12436|            65.84|
|123458614|   53066|20180628|          6691|             19.2|
|123457286|   53066|20180627|         13783|            42.01|
|123451357|   53066|20180626|         44896|            13.47|
|123458614|   53066|20180625|          6697|             6.66|
+---------+--------+--------+--------------+-----------------+
only showing top 5 rows



In [24]:
# 2. Used to display the number and total values of transactions for a given type.

# This cell lists every transaction type at once: one row per type with the
# number of transactions and the sum of their values.
# Note: count and sum here are the Spark column functions brought in by the
# star import of pyspark.sql.functions, not the Python builtins.

(
    df_credit
    .select('transaction_type', 'transaction_value')
    .groupBy('transaction_type')
    .agg(count('transaction_type'), sum('transaction_value'))
    .show()
)

+----------------+-----------------------+----------------------+
|transaction_type|count(transaction_type)|sum(transaction_value)|
+----------------+-----------------------+----------------------+
|       Education|                   6638|     337980.0700000016|
|   Entertainment|                   6635|    338950.09999999945|
|      Healthcare|                   6723|    340476.19999999896|
|         Grocery|                   6549|     337051.6299999997|
|            Test|                   6683|     341310.3700000002|
|             Gas|                   6605|    336059.26000000036|
|           Bills|                   6861|     351405.2800000001|
+----------------+-----------------------+----------------------+



In [25]:
# Register the Credit Card DataFrame as the temporary view cdw_cc so it can be queried with Spark SQL

df_credit.createOrReplaceTempView('cdw_cc')

In [18]:
# Run a Spark SQL query against the cdw_cc view: for each transaction type,
# count the rows and total the transaction values, rounded to 2 decimal places

result = spark.sql('SELECT transaction_type, COUNT(*) as trans_count, \
                round(SUM(transaction_value),2) as trans_val \
                FROM cdw_cc \
                GROUP BY transaction_type')
result.show()

+----------------+-----------+---------+
|transaction_type|trans_count|trans_val|
+----------------+-----------+---------+
|       Education|       6638|337980.07|
|   Entertainment|       6635| 338950.1|
|      Healthcare|       6723| 340476.2|
|         Grocery|       6549|337051.63|
|            Test|       6683|341310.37|
|             Gas|       6605|336059.26|
|           Bills|       6861|351405.28|
+----------------+-----------+---------+



In [33]:
# 3. Used to display the total number and total values of transactions for branches in a given state.

# Inner-join transactions to branches on BRANCH_CODE. Both frames carry a
# BRANCH_CODE column, so the branch-side one is selected explicitly and aliased.
df_join_brcc = (
    df_credit
    .join(df_branch, on=df_branch.BRANCH_CODE == df_credit.BRANCH_CODE, how='inner')
    .select(
        df_branch['branch_code'].alias('Branch_code'),
        'branch_name',
        'branch_state',
        'transaction_value',
    )
)
df_join_brcc.show(n=5)

+-----------+------------+------------+-----------------+
|Branch_code| branch_name|branch_state|transaction_value|
+-----------+------------+------------+-----------------+
|         26|Example Bank|          TX|            56.12|
|         29|Example Bank|          CA|            17.03|
|         29|Example Bank|          CA|            86.18|
|         26|Example Bank|          TX|            25.01|
|         29|Example Bank|          CA|            66.37|
+-----------+------------+------------+-----------------+
only showing top 5 rows



In [34]:
# Register the joined branch/transaction DataFrame as the temporary view data_br_view

df_join_brcc.createOrReplaceTempView('data_br_view')

In [37]:
# Per-branch transaction count and total value for the branches of one state

def trans_by_branch(state):
    """Show the number and total value of transactions per branch in a state.

    Reads the data_br_view temporary view, keeps the rows whose BRANCH_STATE
    equals the given state, groups them by branch_code and prints, ordered by
    branch_code, the transaction count ("Count") and the sum of
    transaction_value rounded to 2 decimals ("Total_transaction").

    The state is compared as a literal value instead of being formatted into
    a SQL string, so user input cannot alter or break the query.

    Parameters:
        state: state value to match against BRANCH_STATE (the comparison is
               exact; surrounding whitespace is stripped first).

    Returns:
        None. The result is printed with DataFrame.show().
    """
    state = str(state).strip()
    # count, sum and round are the Spark column functions from the star import
    # of pyspark.sql.functions, not the Python builtins.
    result_state = (
        spark.table("data_br_view")
        .where(col("BRANCH_STATE") == state)
        .groupBy("branch_code")
        .agg(
            count("branch_code").alias("Count"),
            round(sum("transaction_value"), 2).alias("Total_transaction"),
        )
        .orderBy("branch_code")
    )
    result_state.show()

# input() already returns a string, so no conversion is needed
state = input("Enter State : ")
trans_by_branch(state)

+-----------+-----+-----------------+
|branch_code|Count|Total_transaction|
+-----------+-----+-----------------+
|         26|  408|         20156.75|
|         43|  414|         20605.86|
|         52|  378|         18923.41|
|         56|  398|         20544.27|
|        173|  431|         21234.44|
+-----------+-----+-----------------+

