In [33]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

In [2]:
## Initialization

In [34]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [35]:
postgres_host = os.getenv('POSTGRES_CONTAINER_NAME')
postgres_db = os.getenv('POSTGRES_DB')
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

In [36]:
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [37]:
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_db}'
jdbc_properties = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    'stringtype': 'unspecified'
}

In [38]:
retail_df = spark.read.jdbc(
    jdbc_url,
    'public.retail',
    properties=jdbc_properties
)

In [39]:
retail_df.show(10)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2| 2010-12-01|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6| 2010-12-01|     4.25|     17850|United Kingdom|


In [9]:
# Simple aggregation (example: count by stock code)
aggregated_df = retail_df.groupBy("stockcode").count()

# Output (example: write to console and CSV)
aggregated_df.show()

+---------+-----+
|stockcode|count|
+---------+-----+
|    22728|  810|
|    21889|  607|
|   90210B|    7|
|    21259|  296|
|    21894|  135|
|    21452|  200|
|    22121|  141|
|    90022|   21|
|    21249|  119|
|    90143|   22|
|    84881|    8|
|    21248|   68|
|    22254|   61|
|    20868|   46|
|    21331|    8|
|   90197B|   27|
|    22596|  274|
|   90026D|    3|
|   90177A|   10|
|   84899F|    1|
+---------+-----+
only showing top 20 rows



In [13]:
# Churn and return (or re-engagement) analysis is essential for understanding customer behavior over time. 
# Using the given data, we can perform such an analysis by observing purchase patterns of each customer.

In [26]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assuming data is loaded into a DataFrame named retail_df
retail_df = retail_df.withColumn("invoicedate", F.to_date("invoicedate", "yyyy-MM-dd"))

# Calculate the maximum purchase date for each customer
max_purchase_date = retail_df.groupBy("customerid").agg(F.max("invoicedate").alias("last_purchase_date"))

# Let's assume a customer who hasn't made a purchase in the last 30 days is considered churned
churned_customers = max_purchase_date.filter(F.datediff(F.current_date(), "last_purchase_date") > 30)


In [28]:
print(retail_df.dtypes)

[('invoiceno', 'string'), ('stockcode', 'string'), ('description', 'string'), ('quantity', 'int'), ('invoicedate', 'date'), ('unitprice', 'double'), ('customerid', 'string'), ('country', 'string')]


In [27]:
# For return analysis, we would observe these churned customers in the subsequent period. 
# This involves more complex logic such as observing their next purchase date after the churn date, etc.

# For this example, let's just count the number of churned customers
churned_count = churned_customers.count()

print(f"Number of churned customers: {churned_count}")

Number of churned customers: 4373


In [30]:
# Aggregation
# To get the total quantity of each product sold:

from pyspark.sql.functions import sum

aggregated_df = retail_df.groupBy("stockcode").agg(sum("quantity").alias("total_quantity"))
aggregated_df.show()


+---------+--------------+
|stockcode|total_quantity|
+---------+--------------+
|    20868|           458|
|    22728|          5323|
|    21889|          6377|
|   90197B|            35|
|    21249|           724|
|    22121|           305|
|    21259|          1117|
|    21248|           186|
|    22254|           657|
|    21452|           789|
|    21894|           557|
|    84881|             0|
|    22596|          3477|
|   90026D|             3|
|    90143|            31|
|   90177A|             8|
|    90022|            20|
|   84899F|            -5|
|   85132b|             9|
|    21331|            12|
+---------+--------------+
only showing top 20 rows



In [32]:
# rank
# We want to rank each product based on the quantity sold:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

windowSpec = Window.orderBy(retail_df["quantity"].desc())

ranked_df = retail_df.withColumn("rank", rank().over(windowSpec))
ranked_df.show()

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+----+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|rank|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+----+
|   581483|    23843|PAPER CRAFT , LIT...|   80995| 2011-12-09|     2.08|     16446|United Kingdom|   1|
|   541431|    23166|MEDIUM CERAMIC TO...|   74215| 2011-01-18|     1.04|     12346|United Kingdom|   2|
|   578841|    84826|ASSTD DESIGN 3D P...|   12540| 2011-11-25|      0.0|     13256|United Kingdom|   3|
|   542504|    37413|                null|    5568| 2011-01-28|      0.0|      null|United Kingdom|   4|
|   573008|    84077|WORLD WAR 2 GLIDE...|    4800| 2011-10-27|     0.21|     12901|United Kingdom|   5|
|   554868|    22197|SMALL POPCORN HOLDER|    4300| 2011-05-27|     0.72|     13135|United Kingdom|   6|
|   556231|   85123A|                   ?|    4000| 201