
# Source:

https://www.youtube.com/watch?v=J6BSx-gNw7E&t=130s

In [12]:
# Dependencies
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from passwords import db_params

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Check Java Version
!java -version

java version "21.0.2" 2024-01-16 LTS
Java(TM) SE Runtime Environment (build 21.0.2+13-LTS-58)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.2+13-LTS-58, mixed mode, sharing)


In [3]:
# Spark config
spark = SparkSession.builder\
    .config('spark.app.name', 'ecommerce_etlpipeline')\
    .config('spark.jars', '../Resources/postgresql-42.7.1.jar')\
    .getOrCreate()

24/02/06 13:32:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Read in data locally
# df = spark.read\
#     .option('header', True)\
#     .option('inferSchema', True)\
#     .csv("../Resources/raw/ecommerce_customer_data_custom_ratios.csv")

# Define DB params
database_url = "jdbc:postgresql://localhost:5432/ecommerceDB"
properties = {
    "user": db_params['user'],
    "password": db_params['password'],
    "driver": "org.postgresql.Driver"
}

# Read in data via Java database connectivity 
df = spark.read.jdbc(url=database_url, table="customer_data", properties=properties)

In [5]:
# Create tempview for querying
df.createOrReplaceTempView('customer_data')

In [6]:
# Display table
df.show()

+-----------+-------------+----------------+-------------+--------+----------------------+--------------+------------+--------------------+-------------------+---+------+-----+
|customer_id|purchase_date|product_category|product_price|quantity|total_purchased_amount|payment_method|customer_age|            returned|      customer_name|age|gender|churn|
+-----------+-------------+----------------+-------------+--------+----------------------+--------------+------------+--------------------+-------------------+---+------+-----+
|      46251|   2020-09-08|     Electronics|           12|       3|                   740|   Credit Card|          37|0.000000000000000000|Christine Hernandez| 37|  Male|    0|
|      46251|   2022-03-05|            Home|          468|       4|                  2739|        PayPal|          37|0.000000000000000000|Christine Hernandez| 37|  Male|    0|
|      46251|   2022-05-23|            Home|          288|       2|                  3196|        PayPal|          

In [7]:
# Data Munging
try:
    customer_profile = spark.sql("""
        SELECT 
            customer_id, 
            customer_name,
            customer_age,
            gender,
            count(*) as order_count,
            sum(returned) as orders_returned,
            sum(churn) as total_churn,
            datediff(max(purchase_date), min(purchase_date)) as tenure_days,
            avg(product_price) as average_total_per_cart,
            sum(quantity) as total_qty_purchased,
            sum(total_purchased_amount) as total_spent_historic
        FROM 
            customer_data
        GROUP BY 
            customer_id, customer_name, customer_age, age, gender
        """)
    customer_shopping_profile = spark.sql("""
        SELECT 
            customer_id, 
            sum(CASE WHEN payment_method = 'Credit Card' THEN 1 ELSE 0 END) as payment_creditcard,
            sum(CASE WHEN payment_method = 'Debit' THEN 1 ELSE 0 END) as payment_debit,
            sum(CASE WHEN payment_method = 'Crypto' THEN 1 ELSE 0 END) as payment_crypto,
            sum(CASE WHEN payment_method = 'Paypal' THEN 1 ELSE 0 END) as payment_paypal,
            sum(CASE WHEN payment_method = 'Cash' THEN 1 ELSE 0 END) as payment_cash,
            sum(CASE WHEN product_category = 'Electronics' THEN 1 ELSE 0 END) as cat_electronics,
            sum(CASE WHEN product_category = 'Home' THEN 1 ELSE 0 END) as cat_home,
            sum(CASE WHEN product_category = 'Clothing' THEN 1 ELSE 0 END) as cat_clothing,
            sum(CASE WHEN product_category = 'Books' THEN 1 ELSE 0 END) as cat_books
        FROM 
            customer_data
        GROUP BY 
            customer_id, customer_name, customer_age, age, gender
        """)
    print('Data transformed successfully!')
except Exception as err:
    print('Error Encountered during transformation:' + str(err))

Data transformed successfully!


In [8]:
customer_profile.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----------+-----------------+------------+------+-----------+--------------------+-----------+-----------+----------------------+-------------------+--------------------+
|customer_id|    customer_name|customer_age|gender|order_count|     orders_returned|total_churn|tenure_days|average_total_per_cart|total_qty_purchased|total_spent_historic|
+-----------+-----------------+------------+------+-----------+--------------------+-----------+-----------+----------------------+-------------------+--------------------+
|      48644|     Marcus Eaton|          69|Female|          4|1.000000000000000000|          0|       1025|                 130.0|                  6|               15548|
|       5847|      Lauren Wood|          70|  Male|          3|1.000000000000000000|          3|       1146|     316.3333333333333|                  9|                3164|
|      31097|        Anne Frye|          25|  Male|          3|2.000000000000000000|          0|        622|    201.66666666666666|    

                                                                                

In [9]:
customer_shopping_profile.show()

+-----------+------------------+-------------+--------------+--------------+------------+---------------+--------+------------+---------+
|customer_id|payment_creditcard|payment_debit|payment_crypto|payment_paypal|payment_cash|cat_electronics|cat_home|cat_clothing|cat_books|
+-----------+------------------+-------------+--------------+--------------+------------+---------------+--------+------------+---------+
|      48644|                 1|            0|             0|             0|           1|              1|       1|           2|        0|
|       5847|                 1|            0|             0|             0|           2|              1|       2|           0|        0|
|      31097|                 1|            0|             0|             0|           1|              0|       1|           1|        1|
|      18817|                 1|            0|             0|             0|           2|              1|       2|           2|        1|
|       6814|                 3|  

In [10]:
# Load data
try:
    customer_profile.write\
        .option('mode', 'overwrite') \
        .option('user', properties['user']) \
        .option('password', properties['password']) \
        .option('driver', properties['driver']) \
        .jdbc(table= 'customer_profile',
              url=database_url)
    print('Customer Profile Data loaded successfully!')
except Exception as err:
    print('Error Encountered during transformation:' + str(err))


try:
    customer_shopping_profile.write \
        .option('mode', 'overwrite') \
        .option('user', properties['user']) \
        .option('password', properties['password']) \
        .option('driver', properties['driver']) \
        .jdbc(table= 'customer_shopping_profile',
              url=database_url)
    print('Customer Shopping Profile Data loaded successfully!')
except Exception as err:
    print('Error Encountered during transformation:' + str(err))



Customer Profile Data loaded successfully!
Customer Shopping Profile Data loaded successfully!


In [11]:
spark.stop()