In [1]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

In [2]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
postgres_host = os.getenv('POSTGRES_CONTAINER_NAME')
postgres_dw_db = os.getenv('POSTGRES_DW_DB')
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

In [4]:
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [5]:
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_dw_db}'
jdbc_properties = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    'stringtype': 'unspecified'
}

In [6]:
retail_df = spark.read.jdbc(
    jdbc_url,
    'public.retail',
    properties=jdbc_properties
)

In [7]:
retail_df.printSchema()

root
 |-- invoiceno: string (nullable = true)
 |-- stockcode: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- invoicedate: date (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- customerid: string (nullable = true)
 |-- country: string (nullable = true)



In [8]:
retail_df.show(10)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2| 2010-12-01|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6| 2010-12-01|     4.25|     17850|United Kingdom|


In [13]:
retail_df = retail_df.withColumn("total_price", retail_df["unitprice"] * retail_df["quantity"])
retail_df.show(10)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+------------------+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|       total_price|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|15.299999999999999|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|             20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|     17850|United Kingdom|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|     17850|United Kingdom|             20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|     17850|United Kingdom|             20.34|
|   536365|    22752|SET 7 BABUSHKA NE...|      

In [20]:
from pyspark.sql import functions as F

max_date = retail_df.agg(F.max("invoicedate")).collect()[0][0]

rfm_df = retail_df.groupBy("customerid").agg(
    (F.datediff(F.lit(max_date), F.max("invoicedate"))).alias("Recency"), 
    F.count("invoiceno").alias("Frequency"),                               
    F.sum(F.col("quantity") * F.col("unitprice")).alias("Monetary")       
)

rfm_df.show()


+----------+-------+---------+------------------+
|customerid|Recency|Frequency|          Monetary|
+----------+-------+---------+------------------+
|     16250|    261|       24|389.44000000000005|
|     15574|    177|      168| 702.2500000000001|
|     15555|     12|      925| 4758.199999999989|
|     15271|      7|      275| 2485.820000000002|
|     17714|    320|       10|             153.0|
|     17686|      7|      286| 5739.460000000007|
|     13865|     58|       30|            501.56|
|     14157|     19|       49| 400.4300000000002|
|     13610|     12|      228|1115.4300000000005|
|     13772|     33|      177|           1132.13|
|     13282|     18|       40|           1047.84|
|     12394|     63|       27|           1272.48|
|     16320|    172|       56|           1038.46|
|     13192|     95|       63| 911.9400000000002|
|     14887|     79|        6|            1862.0|
|     17506|     70|       16|294.28999999999996|
|     17427|     71|        2|             100.8|


In [21]:
(
    rfm_df
    .write
    .mode("append")
    .option("truncate", "true")
    .jdbc(
        jdbc_url,
        'public.rfm_retail',
        properties=jdbc_properties
    )
)


In [22]:
(
    spark
    .read
    .jdbc(
        jdbc_url,
        'public.rfm_retail',
        properties=jdbc_properties
    )
    .show()
)

+----------+-------+---------+------------------+
|customerid|Recency|Frequency|          Monetary|
+----------+-------+---------+------------------+
|     16250|    261|       24|389.44000000000005|
|     15574|    177|      168| 702.2500000000001|
|     15555|     12|      925| 4758.199999999989|
|     15271|      7|      275| 2485.820000000002|
|     17714|    320|       10|             153.0|
|     17686|      7|      286| 5739.460000000007|
|     13865|     58|       30|            501.56|
|     14157|     19|       49| 400.4300000000002|
|     13610|     12|      228|1115.4300000000005|
|     13772|     33|      177|           1132.13|
|     13282|     18|       40|           1047.84|
|     12394|     63|       27|           1272.48|
|     16320|    172|       56|           1038.46|
|     13192|     95|       63| 911.9400000000002|
|     14887|     79|        6|            1862.0|
|     17506|     70|       16|294.28999999999996|
|     17427|     71|        2|             100.8|
