In [1]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()
spark = SparkSession. \
    builder. \
    appName("project - itv005880"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [2]:
!ls -lt /home/itv005880/project/

total 232
-rw-r--r-- 1 itv005880 students  32780 Sep 14 01:00 Credit_Card_Transaction_Analysis.ipynb
-rw-r--r-- 1 itv005880 students    769 Sep 14 00:19 card_holder_modified.csv
drwxr-xr-x 2 itv005880 students   4096 Sep 12 21:53 card_holder_modified
-rw-r--r-- 1 itv005880 students 175758 Sep 10 18:09 transaction.csv
-rw-r--r-- 1 itv005880 students    427 Sep 10 18:09 card_holder.csv
-rw-r--r-- 1 itv005880 students     60 Sep 10 18:09 merchant_category.csv
-rw-r--r-- 1 itv005880 students   3544 Sep 10 18:09 merchant.csv
-rw-r--r-- 1 itv005880 students    787 Sep 10 18:09 credit_card.csv


In [3]:
!hdfs dfs -mkdir /user/itv005880/project/

mkdir: `/user/itv005880/project': File exists


In [4]:
! hdfs dfs -put ./*.csv /user/itv005880/project/

put: `/user/itv005880/project/card_holder.csv': File exists
put: `/user/itv005880/project/card_holder_modified.csv': File exists
put: `/user/itv005880/project/credit_card.csv': File exists
put: `/user/itv005880/project/merchant_category.csv': File exists
put: `/user/itv005880/project/merchant.csv': File exists
put: `/user/itv005880/project/transaction.csv': File exists


In [5]:
! hdfs dfs -ls -r /user/itv005880/project/

Found 6 items
-rw-r--r--   3 itv005880 supergroup     175758 2023-09-14 00:20 /user/itv005880/project/transaction.csv
-rw-r--r--   3 itv005880 supergroup         60 2023-09-14 00:20 /user/itv005880/project/merchant_category.csv
-rw-r--r--   3 itv005880 supergroup       3544 2023-09-14 00:20 /user/itv005880/project/merchant.csv
-rw-r--r--   3 itv005880 supergroup        787 2023-09-14 00:20 /user/itv005880/project/credit_card.csv
-rw-r--r--   3 itv005880 supergroup        769 2023-09-14 00:20 /user/itv005880/project/card_holder_modified.csv
-rw-r--r--   3 itv005880 supergroup        427 2023-09-14 00:20 /user/itv005880/project/card_holder.csv


## Raw Layer

In [30]:
cc_holders_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/user/itv005880/project/card_holder_modified.csv")

In [31]:
cc_holders_df.show()

+---+----------+---------+----------+------+
| id|first_name|last_name| hire_date|gender|
+---+----------+---------+----------+------+
|  1|    Robert|  Johnson|2000-11-03|     M|
|  2|     Shane|  Shaffer|1989-08-16|     F|
|  3| Elizabeth|   Sawyer|1980-06-19|     F|
|  4|  Danielle|    Green|1975-01-02|     M|
|  5|      Sara|   Cooper|1986-03-02|     F|
|  6|      Beth|Hernandez|1975-03-19|  null|
|  7|      Sean|   Taylor|1978-02-06|     F|
|  8|   Michael|    Floyd|1999-06-19|     M|
|  9|    Laurie|     null|1995-03-06|     F|
| 10|   Matthew|Gutierrez|2011-03-19|     M|
| 11|   Brandon|   Pineda|2001-06-19|     M|
| 12|     Megan|    Price|1960-03-28|     F|
| 13|      John|   Martin|      null|     M|
| 14|      Gary|   Jacobs|2001-11-03|     M|
| 15|      Kyle|   Tucker|1988-11-03|     M|
| 16|   Crystal|    Clark|1986-11-03|     F|
| 17|   Michael|  Carroll|1975-11-03|     F|
| 18|     Malik|  Carlson|1999-11-03|     M|
| 19|     Peter|    Mckay|2001-11-03|     M|
| 20|     

In [32]:
cc_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/user/itv005880/project/credit_card.csv")

In [33]:
cc_df.show()

+----------+--------------+
|      card|id_card_holder|
+----------+--------------+
|3.51711E15|             1|
|4.76105E18|             1|
|4.86676E18|             2|
|6.75911E11|             2|
|3.00783E13|             3|
|4.26369E15|             4|
|5.84227E11|             4|
|4.27647E12|             5|
|4.26849E15|             5|
|3.58135E15|             6|
|4.15984E18|             6|
|3.51695E15|             7|
|4.53999E15|             7|
|4.83448E15|             8|
|3.00633E13|             8|
| 3.0182E13|             9|
|4.96292E18|            10|
|4.16531E18|            10|
|2.13194E14|            10|
|1.80099E14|            11|
+----------+--------------+
only showing top 20 rows



In [34]:
merchant_category_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/user/itv005880/project/merchant_category.csv")

In [35]:
merchant_category_df.show()

+---+-----------+
| id|       name|
+---+-----------+
|  1| restaurant|
|  2|coffee shop|
|  3|        bar|
|  4|        pub|
|  5| food truck|
+---+-----------+



In [36]:
merchant_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/user/itv005880/project/merchant.csv")

In [37]:
merchant_df.show()

+---+--------------------+--------------------+
| id|                name|id_merchant_category|
+---+--------------------+--------------------+
|  1|Murphy, Heath and...|                   1|
|  2|         Riggs-Adams|                   1|
|  3|Sanders, Parks an...|                   2|
|  4|      Mccarty-Thomas|                   3|
|  5|      Miller-Blevins|                   4|
|  6|     Wilson and Sons|                   1|
|  7|         Gomez-Kelly|                   4|
|  8|      Russell-Thomas|                   1|
|  9|Curry, Scott and ...|                   3|
| 10|       Herrera Group|                   1|
| 11|       Stanton Group|                   4|
| 12|Bell, Gonzalez an...|                   4|
| 13|      Giles and Sons|                   4|
| 14|        Osborne-Page|                   2|
| 15|Long, Harrell and...|                   5|
| 16|Bryant, Thomas an...|                   4|
| 17|          Bauer-Cole|                   3|
| 18|       Romero-Jordan|              

In [38]:
cc_transaction_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/user/itv005880/project/transaction.csv")

In [39]:
cc_transaction_df.show()

+----+-------------------+------+----------------+-----------+
|  id|               date|amount|            card|id_merchant|
+----+-------------------+------+----------------+-----------+
| 222|2018-01-01 21:35:10|  6.22|3561954487988605|         69|
|2045|2018-01-01 21:43:12|  3.83|5135837688671496|         85|
| 395|2018-01-01 22:41:21|  9.61| 213193946980303|         82|
|3309|2018-01-01 23:13:30| 19.03|4263694062533017|          5|
| 567|2018-01-01 23:15:10|  2.95|   4498002758300|         64|
|1683|2018-01-02 01:13:21| 11.24|4263694062533017|        127|
|2083|2018-01-02 02:06:21|  1.46|   4319653513507|         93|
|3488|2018-01-02 04:36:45|  3.36|4506405265172173|        136|
|2635|2018-01-02 05:45:43| 16.69|5297187379298983|        120|
| 432|2018-01-02 10:13:09|  8.55|5175947111814778|         70|
|2918|2018-01-02 11:28:35| 15.85| 180098539019105|         16|
|2020|2018-01-02 13:17:15|  2.64|    501879657465|         84|
|1084|2018-01-02 15:10:33| 10.52|6500236164848279|     

In [40]:
# Null Handeling
# Filling up NULLs with NA for Gender, first & last name
cc_holders_df = cc_holders_df.fillna({"first_name": "NA", "last_name": "NA", "gender": "NA"})

In [41]:
cc_holders_df.show()

+---+----------+---------+----------+------+
| id|first_name|last_name| hire_date|gender|
+---+----------+---------+----------+------+
|  1|    Robert|  Johnson|2000-11-03|     M|
|  2|     Shane|  Shaffer|1989-08-16|     F|
|  3| Elizabeth|   Sawyer|1980-06-19|     F|
|  4|  Danielle|    Green|1975-01-02|     M|
|  5|      Sara|   Cooper|1986-03-02|     F|
|  6|      Beth|Hernandez|1975-03-19|    NA|
|  7|      Sean|   Taylor|1978-02-06|     F|
|  8|   Michael|    Floyd|1999-06-19|     M|
|  9|    Laurie|       NA|1995-03-06|     F|
| 10|   Matthew|Gutierrez|2011-03-19|     M|
| 11|   Brandon|   Pineda|2001-06-19|     M|
| 12|     Megan|    Price|1960-03-28|     F|
| 13|      John|   Martin|      null|     M|
| 14|      Gary|   Jacobs|2001-11-03|     M|
| 15|      Kyle|   Tucker|1988-11-03|     M|
| 16|   Crystal|    Clark|1986-11-03|     F|
| 17|   Michael|  Carroll|1975-11-03|     F|
| 18|     Malik|  Carlson|1999-11-03|     M|
| 19|     Peter|    Mckay|2001-11-03|     M|
| 20|     

## Prepared Layer DataSet

In [42]:
cc_holders_df.createOrReplaceTempView("cc_holders")

In [43]:
from pyspark.sql.functions import *

# Join First and Last name to create full name
# Calculate age

cc_holders_df = spark.sql("""
            select  id, 
                    first_name, 
                    last_name, 
                    first_name || ' ' || last_name as full_name, 
                    hire_date, 
                    gender, 
                    ceil(datediff(current_date(),hire_date)/365) as age 
                    from cc_holders
         """)

In [44]:
cc_holders_df.show()

+---+----------+---------+-----------------+----------+------+----+
| id|first_name|last_name|        full_name| hire_date|gender| age|
+---+----------+---------+-----------------+----------+------+----+
|  1|    Robert|  Johnson|   Robert Johnson|2000-11-03|     M|  23|
|  2|     Shane|  Shaffer|    Shane Shaffer|1989-08-16|     F|  35|
|  3| Elizabeth|   Sawyer| Elizabeth Sawyer|1980-06-19|     F|  44|
|  4|  Danielle|    Green|   Danielle Green|1975-01-02|     M|  49|
|  5|      Sara|   Cooper|      Sara Cooper|1986-03-02|     F|  38|
|  6|      Beth|Hernandez|   Beth Hernandez|1975-03-19|    NA|  49|
|  7|      Sean|   Taylor|      Sean Taylor|1978-02-06|     F|  46|
|  8|   Michael|    Floyd|    Michael Floyd|1999-06-19|     M|  25|
|  9|    Laurie|       NA|        Laurie NA|1995-03-06|     F|  29|
| 10|   Matthew|Gutierrez|Matthew Gutierrez|2011-03-19|     M|  13|
| 11|   Brandon|   Pineda|   Brandon Pineda|2001-06-19|     M|  23|
| 12|     Megan|    Price|      Megan Price|1960

In [45]:
mean_age = cc_holders_df.select(mean('age')).collect()[0][0] 
cc_holders_df = cc_holders_df.fillna({"age": mean_age})

In [46]:
cc_holders_df.show()

+---+----------+---------+-----------------+----------+------+---+
| id|first_name|last_name|        full_name| hire_date|gender|age|
+---+----------+---------+-----------------+----------+------+---+
|  1|    Robert|  Johnson|   Robert Johnson|2000-11-03|     M| 23|
|  2|     Shane|  Shaffer|    Shane Shaffer|1989-08-16|     F| 35|
|  3| Elizabeth|   Sawyer| Elizabeth Sawyer|1980-06-19|     F| 44|
|  4|  Danielle|    Green|   Danielle Green|1975-01-02|     M| 49|
|  5|      Sara|   Cooper|      Sara Cooper|1986-03-02|     F| 38|
|  6|      Beth|Hernandez|   Beth Hernandez|1975-03-19|    NA| 49|
|  7|      Sean|   Taylor|      Sean Taylor|1978-02-06|     F| 46|
|  8|   Michael|    Floyd|    Michael Floyd|1999-06-19|     M| 25|
|  9|    Laurie|       NA|        Laurie NA|1995-03-06|     F| 29|
| 10|   Matthew|Gutierrez|Matthew Gutierrez|2011-03-19|     M| 13|
| 11|   Brandon|   Pineda|   Brandon Pineda|2001-06-19|     M| 23|
| 12|     Megan|    Price|      Megan Price|1960-03-28|     F|

In [47]:
cc_holders_df.createOrReplaceTempView("cc_holders")
cc_df.createOrReplaceTempView("credit_cards")
merchant_category_df.createOrReplaceTempView("merchant_category")
merchant_df.createOrReplaceTempView("merchant")
cc_transaction_df.createOrReplaceTempView("cc_transactions")

In [49]:
customer_trans_df = spark.sql("""
                select  distinct c.id as customer_id,
                        --c.first_name,
                        --c.last_name,
                        c.full_name,
                        --c.hire_date,
                        c.gender,
                        c.age,
                        t.date as transaction_dt,
                        t.amount,
                        t.card,
                        --t.id_merchant,
                        --cc.card,
                        --cc.id_card_holder,
                        --m.id,
                        m.name as merchant_name,
                        --m.id_merchant_category,
                        --mc.id,
                        mc.name as merchant_category
                from cc_transactions t 
                join cc_holders c on c.id = t.id
                join credit_cards cc on cc.id_card_holder = c.id
                join merchant m on m.id = t.id_merchant
                join merchant_category mc on mc.id = m.id_merchant_category
         """)

In [50]:
customer_trans_df.show()

+-----------+-----------------+------+---+-------------------+------+-------------------+--------------------+-----------------+
|customer_id|        full_name|gender|age|     transaction_dt|amount|               card|       merchant_name|merchant_category|
+-----------+-----------------+------+---+-------------------+------+-------------------+--------------------+-----------------+
|         20|    Kevin Spencer|     M| 21|2018-07-05 01:36:32|  5.15|      4319653513507|            Horn Ltd|      coffee shop|
|         11|   Brandon Pineda|     M| 23|2018-02-04 04:00:18|   6.9|   6500236164848279|    Russell and Sons|              pub|
|         15|      Kyle Tucker|     M| 35|2018-06-01 13:35:17|  4.48|4723783028106084756|      Perry and Sons|              bar|
|         13|      John Martin|     M| 39|2018-02-02 21:41:12|  4.29|   6500236164848279|Huerta, Keith and...|       food truck|
|         10|Matthew Gutierrez|     M| 13|2018-12-26 19:55:23|  1.45|   3561072557118696|       R

## Curated Layer

In [58]:
# Add Age Group
spark.sql(""" select c.id, c.age,
                case when age <20 then "below_20"  
                     when age >=20 and age < 30 then "20_30"  
                     when age >=30 and age < 40 then "30_40"  
                     when age >=40 and age < 50 then "40_50"  
                     when age >=50 and age < 60 then "50_60"  
                     when age >=60 then "above_60"  
                end as age_group
                from cc_holders c
         """).show()

+---+---+---------+
| id|age|age_group|
+---+---+---------+
|  1| 23|    20_30|
|  2| 35|    30_40|
|  3| 44|    40_50|
|  4| 49|    40_50|
|  5| 38|    30_40|
|  6| 49|    40_50|
|  7| 46|    40_50|
|  8| 25|    20_30|
|  9| 29|    20_30|
| 10| 13| below_20|
| 11| 23|    20_30|
| 12| 64| above_60|
| 13| 39|    30_40|
| 14| 22|    20_30|
| 15| 35|    30_40|
| 16| 37|    30_40|
| 17| 48|    40_50|
| 18| 24|    20_30|
| 19| 22|    20_30|
| 20| 21|    20_30|
+---+---+---------+
only showing top 20 rows



In [None]:
# Create Customer Segment

In [None]:
# Map Trasnactions based on Segment

In [44]:
spark.close()

AttributeError: 'SparkSession' object has no attribute 'close'