In [306]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when,count,isnan,month
import pyspark.sql.functions as F

## Initialization

In [39]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [40]:
postgres_host = os.getenv('POSTGRES_CONTAINER_NAME')
postgres_port = os.getenv('POSTGRES_PORT')
postgres_dw_db = os.getenv('POSTGRES_DW_DB')
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

In [41]:
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local[4]')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [35]:
spark

## UDF

In [34]:
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType

# Create a sample dataframe
df = spark.createDataFrame([(1, "apple"), (2, "banana"), (3, "orange"), 
                            (4, "apple"), (5, "banana"), (6, "orange"),
                            (7, "apple"), (8, "banana"), (9, "orange"),
                            (10, "apple")], ["id", "fruit"])

In [7]:
# Define the UDF logic
def string_length(s):
    return len(s)

# Define the Python UDF
string_length_udf = udf(string_length, IntegerType())

# Apply the Python UDF and display the result
df1 = df.withColumn("length", string_length_udf(col("fruit")))
df1.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



In [8]:
import pandas as pd

# Define the Pandas UDF
@pandas_udf(IntegerType())
def string_length_pandas_udf(s: pd.Series) -> pd.Series:
    return s.str.len()
    
# Apply the Pandas UDF and display the result
df2 = df.withColumn("length", string_length_pandas_udf(col("fruit")))
df2.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



## Join

In [6]:
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [7]:
# define schema for purchases dataset
purchases_schema = "order_id int, customer_id int, product_id int, quantity int, price float"

# create purchases dataframe
purchases_data = [
    (101, 1, 1, 2, 19.99),
    (102, 2, 2, 1, 9.99),
    (103, 3, 3, 1, 15.99),
    (104, 1, 4, 1, 5.99),
    (105, 2, 5, 3, 12.99),
    (106, 3, 6, 2, 9.99),
    (107, 4, 7, 1, 11.99),
    (108, 1, 8, 2, 14.99),
    (109, 2, 9, 1, 9.99),
    (110, 3, 10, 1, 19.99)
]
purchases_df = spark.createDataFrame(purchases_data, schema=purchases_schema)

# define schema for customers dataset
customers_schema = "customer_id int, name string, email string"

# create customers dataframe
customers_data = [
    (1, "John Doe", "johndoe@example.com"),
    (2, "Jane Smith", "janesmith@example.com"),
    (3, "Bob Johnson", "bobjohnson@example.com"),
    (4, "Sue Lee", "suelee@example.com")
]
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

# define schema for products dataset
products_schema = "product_id int, name string, price float"

# create products dataframe
products_data = [
    (1, "Product A", 19.99),
    (2, "Product B", 9.99),
    (3, "Product C", 15.99),
    (4, "Product D", 5.99),
    (5, "Product E", 12.99),
    (6, "Product F", 9.99),
    (7, "Product G", 11.99),
    (8, "Product H", 14.99),
    (9, "Product I", 9.99),
    (10, "Product J", 19.99)
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

In [8]:

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# perform sort merge join
merged_df = (
    purchases_df
    .join(customers_df, "customer_id")
    .join(products_df, "product_id")
)

In [9]:
merged_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         9|          2|     109|       1| 9.99| Jane Smith|janesmith@example...|Product I| 9.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         8|          1|     108|       2|14.99|   John Doe| johndoe@example.com|Product H|14.99|
|         7|        

In [15]:
merged_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         9|          2|     109|       1| 9.99| Jane Smith|janesmith@example...|Product I| 9.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         8|          1|     108|       2|14.99|   John Doe| johndoe@example.com|Product H|14.99|
|         7|        

In [11]:
from pyspark.sql.functions import broadcast

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")

# perform broadcast hash join
broadcast_df = purchases_df.join(broadcast(customers_df), "customer_id").join(broadcast(products_df), "product_id")
broadcast_df.show(5)

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
only showing top 5 rows



In [12]:
broadcast_df.show(1)

+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|    name|              email|     name|price|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|         1|          1|     101|       2|19.99|John Doe|johndoe@example.com|Product A|19.99|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
only showing top 1 row



## Cache & Persist

In [16]:
broadcast_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         7|          4|     107|       1|11.99|    Sue Lee|  suelee@example.com|Product G|11.99|
|         8|        

In [17]:
# cache the purchases DataFrame
# broadcast_df.cache()
broadcast_df.unpersist()

DataFrame[product_id: int, customer_id: int, order_id: int, quantity: int, price: float, name: string, email: string, name: string, price: float]

In [18]:
from pyspark.sql.functions import col

# calculate the total purchase amount for each store using the cached DataFrame
store_purchase_totals = (
    purchases_df
    .withColumn("total_price",col("quantity")*col("price"))
    .groupBy("customer_id")
    .agg({"total_price":"sum"}).alias("total_purchase_amount")
)

In [19]:
# persist the store_purchase_totals DataFrame to disk
store_purchase_totals.persist(pyspark.StorageLevel.DISK_ONLY)
store_purchase_totals.persist(pyspark.StorageLevel.MEMORY_ONLY)


DataFrame[customer_id: int, sum(total_price): double]

In [20]:
# print the results
store_purchase_totals.show()

+-----------+------------------+
|customer_id|  sum(total_price)|
+-----------+------------------+
|          1| 75.94999885559082|
|          3|55.959999084472656|
|          4|11.989999771118164|
|          2| 58.95000076293945|
+-----------+------------------+



In [21]:
# print the results
store_purchase_totals.show()

+-----------+------------------+
|customer_id|  sum(total_price)|
+-----------+------------------+
|          1| 75.94999885559082|
|          3|55.959999084472656|
|          4|11.989999771118164|
|          2| 58.95000076293945|
+-----------+------------------+



In [17]:
# unpersist the store_purchase_totals DataFrame to free up memory
store_purchase_totals.unpersist()

DataFrame[customer_id: int, sum(total_price): double]

Balik lagi ke collab

# JDBC

In [415]:
jdbc_url = f'jdbc:postgresql://{postgres_host}:{postgres_port}/{postgres_dw_db}'
jdbc_properties = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    'stringtype': 'unspecified'
}

In [416]:
retail_df = spark.read.jdbc(
    jdbc_url,
    'public.retail',
    properties=jdbc_properties
)

In [417]:
retail_df.show(5)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   571408|    22972|CHILDREN'S SPACEB...|       1| 2011-10-17|     1.65|     15012|United Kingdom|
|   571408|    22970|LONDON BUS COFFEE...|       1| 2011-10-17|     2.55|     15012|United Kingdom|
|   571408|    22748|POPPY'S PLAYHOUSE...|       1| 2011-10-17|      2.1|     15012|United Kingdom|
|   571408|    23199|    JUMBO BAG APPLES|       1| 2011-10-17|     2.08|     15012|United Kingdom|
|   571408|   51014C|FEATHER PEN,COAL ...|      12| 2011-10-17|     0.39|     15012|United Kingdom|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
only showing top 5 rows



In [418]:
retail_df.printSchema()

root
 |-- invoiceno: string (nullable = true)
 |-- stockcode: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- invoicedate: date (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- customerid: string (nullable = true)
 |-- country: string (nullable = true)



In [419]:
retail_df.summary().toPandas()

Unnamed: 0,summary,invoiceno,stockcode,description,quantity,unitprice,customerid,country
0,count,541909,541909,540455,541909.0,541909.0,406829.0,541909
1,mean,559965.752026781,27623.240210938104,20713.0,9.55224954743324,4.611113626085855,15287.690570239583,
2,stddev,13428.417280789448,16799.737628427607,,218.0811578502368,96.75985306117953,1713.6003033216114,
3,min,536365,10002,4 PURPLE FLOCK DINNER CANDLES,-80995.0,-11062.06,12346.0,Australia
4,25%,547906.0,21929.0,20713.0,1.0,1.25,13953.0,
5,50%,560688.0,22569.0,20713.0,3.0,2.08,15152.0,
6,75%,571841.0,23165.0,20713.0,10.0,4.13,16791.0,
7,max,C581569,m,wrongly sold sets,80995.0,38970.0,18287.0,Unspecified


In [420]:
retail_df = retail_df.withColumn('trxvalue', F.round(F.col('unitprice')*F.col('quantity'),2))

In [421]:
retail_df.createOrReplaceTempView('retail')

In [422]:
# I don't want to speculate about the data and making transformation based on assumptions only.
# Thus, for safety measures, I will exclude data based on outliers of quantity and unitprice.
# Also, excluding NULL rows on customerid, invoiceno, stockcode.

In [423]:
# unitprice
retail_df = spark.sql('''
    WITH 
    q1 AS (
        SELECT 
        MAX(unitprice) AS quartile1_break
        FROM ( 
            SELECT *,
            NTILE(4) OVER (ORDER BY unitprice) AS unitprice_quartile
            FROM retail ) AS quartiles
        WHERE unitprice_quartile IN (1)
        GROUP BY unitprice_quartile
    ),
    q3 AS (
        SELECT 
        MAX(unitprice) AS quartile3_break
        FROM ( 
            SELECT *,
            NTILE(4) OVER (ORDER BY unitprice) AS unitprice_quartile
            FROM retail ) AS quartiles
        WHERE unitprice_quartile IN (3)
        GROUP BY unitprice_quartile
    ),
    iqr AS (
        SELECT *, q3.quartile3_break - q1.quartile1_break AS IQR
        FROM q1, q3
    )
    SELECT 
    retail.* 
    FROM retail, iqr
    WHERE unitprice BETWEEN iqr.quartile1_break-1.5*iqr.IQR AND iqr.quartile3_break+1.5*iqr.IQR
    AND unitprice != 0
''')

In [424]:
retail_df.createOrReplaceTempView('retail')

In [425]:
# quantity
retail_df = spark.sql('''
    WITH 
    q1 AS (
        SELECT 
        MAX(quantity) AS quartile1_break
        FROM ( 
            SELECT *,
            NTILE(4) OVER (ORDER BY quantity) AS quantity_quartile
            FROM retail ) AS quartiles
        WHERE quantity_quartile IN (1)
        GROUP BY quantity_quartile
    ),
    q3 AS (
        SELECT 
        MAX(quantity) AS quartile3_break
        FROM ( 
            SELECT *,
            NTILE(4) OVER (ORDER BY quantity) AS quantity_quartile
            FROM retail ) AS quartiles
        WHERE quantity_quartile IN (3)
        GROUP BY quantity_quartile
    ),
    iqr AS (
        SELECT *, q3.quartile3_break - q1.quartile1_break AS IQR
        FROM q1, q3
    )
    SELECT 
    retail.* 
    FROM retail, iqr
    WHERE quantity BETWEEN iqr.quartile1_break-1.5*iqr.IQR AND iqr.quartile3_break+1.5*iqr.IQR
    AND quantity > 0
''')

In [426]:
retail_df = retail_df.na.drop(subset=['customerid','invoiceno','stockcode'])
retail_df.summary().toPandas()

Unnamed: 0,summary,invoiceno,stockcode,description,quantity,unitprice,customerid,country,trxvalue
0,count,349841.0,349841,349841,349841.0,349841.0,349841.0,349841,349841.0
1,mean,560771.0449632833,27297.921951718177,,7.322272117904991,2.3854106694163124,15323.480824145829,,13.05017742345948
2,stddev,13128.4467064054,16174.389084558114,,6.74119462603656,1.8450473165319192,1708.7909104390285,,13.647388647460373
3,min,536365.0,10002,4 PURPLE FLOCK DINNER CANDLES,1.0,0.001,12347.0,Australia,0.0
4,25%,549294.0,21933.0,,2.0,1.25,13995.0,,3.95
5,50%,562101.0,22601.0,,5.0,1.69,15237.0,,10.2
6,75%,572283.0,23194.0,,12.0,2.95,16817.0,,17.34
7,max,581587.0,POST,ZINC WIRE SWEETHEART LETTER TRAY,28.0,8.29,18287.0,Unspecified,214.2


In [427]:
retail_df.createOrReplaceTempView('retail')

In [431]:
# Descriptive Analytics
spark.sql('''
    SELECT 
    DISTINCT(YEAR(invoicedate)) AS year,
    COUNT(DISTINCT(MONTH(invoicedate))) AS n_month,
    COUNT(DISTINCT(customerid)) AS total_users,
    COUNT(DISTINCT(invoiceno)) AS total_transaction, 
    ROUND(SUM(trxvalue),2) AS total_transaction_value,
    ROUND(SUM(trxvalue)/COUNT(DISTINCT(invoiceno)),2) AS value_per_transaction,
    ROUND(SUM(trxvalue)/COUNT(DISTINCT(MONTH(invoicedate))),2) AS value_per_month
    FROM retail
    GROUP BY year
''').write.jdbc(url=jdbc_url, table='public.descriptive', mode="overwrite", properties=jdbc_properties)

In [432]:
# Top 10 Countries with The Highest Transaction Value
spark.sql('''
    SELECT 
    country,
    ROUND(SUM(trxvalue),2) AS total_transaction_value
    FROM retail
    GROUP BY country
    ORDER BY total_transaction_value DESC
    LIMIT 10
''').write.jdbc(url=jdbc_url, table='public.top10countries', mode="overwrite", properties=jdbc_properties)

In [433]:
# Top 10 Countries with Highest YoY % Incremental in Dec 2011
spark.sql('''
WITH 
data AS (
    SELECT 
    DISTINCT(YEAR(invoicedate)) AS year,
    MONTH(invoicedate) AS month,
    country AS country,
    ROUND(SUM(unitprice),2) AS total_transaction_value
    FROM retail
    GROUP BY country, year, month
    HAVING month==12    
    ORDER BY country, year
    )

SELECT
    country,
    ROUND(MAX(
        CASE
            WHEN prev_year_revenue IS NOT NULL
            THEN (total_transaction_value - prev_year_revenue) * 100.0 / prev_year_revenue
            ELSE NULL
        END
    ),2) AS YoY_percent_incremental
FROM (
    SELECT
        country,
        year,
        total_transaction_value,
        LAG(total_transaction_value) OVER (PARTITION BY country ORDER BY year) AS prev_year_revenue
    FROM data
) AS RevenueWithPrevious
GROUP BY country
HAVING YoY_percent_incremental > 0
ORDER BY YoY_percent_incremental DESC
LIMIT 10
''').write.jdbc(url=jdbc_url, table='public.top10countries_incremental', mode="overwrite", properties=jdbc_properties)

In [434]:
# Churn rate per quarter
spark.sql('''
WITH 
quarters AS (
    SELECT DISTINCT
        customerid,
        CASE
            WHEN EXTRACT(MONTH FROM invoicedate) BETWEEN 1 AND 3 THEN CONCAT(EXTRACT(YEAR FROM invoicedate), '-Q1')
            WHEN EXTRACT(MONTH FROM invoicedate) BETWEEN 4 AND 6 THEN CONCAT(EXTRACT(YEAR FROM invoicedate), '-Q2')
            WHEN EXTRACT(MONTH FROM invoicedate) BETWEEN 7 AND 9 THEN CONCAT(EXTRACT(YEAR FROM invoicedate), '-Q3')
            ELSE CONCAT(EXTRACT(YEAR FROM invoicedate), '-Q4')
        END AS quarter
    FROM retail
    ORDER BY customerid
),
userid AS (
    SELECT DISTINCT 
        customerid
    FROM retail
),
period AS (
    SELECT DISTINCT
        quarter
    FROM Quarters
),
cross AS (
    SELECT * FROM userid CROSS JOIN period
),
joined AS (
    SELECT 
        c.customerid, 
        c.quarter AS sort_q, 
        q.quarter AS active_q 
    FROM cross c
    LEFT JOIN quarters q ON c.customerid = q.customerid 
    AND c.quarter = q.quarter
),
record AS (
    SELECT 
        *,
        LEAD(active_q) OVER (PARTITION BY customerid ORDER BY sort_q) AS next_q
    FROM joined
),
almost AS (
    SELECT 
        customerid, sort_q, active_q, next_q 
    FROM record
    WHERE active_q IS NOT NULL
    ORDER BY customerid, active_q
),
active AS (
    SELECT 
        sort_q, COUNT(DISTINCT(customerid)) AS active 
    FROM almost
    WHERE active_q IS NOT NULL
    GROUP BY sort_q
    ORDER BY sort_q
),
churned AS (
    SELECT
        active_q, COUNT(DISTINCT(customerid)) AS churned
    FROM almost
    WHERE next_q IS NULL
    GROUP BY active_q
    ORDER BY active_q
)
SELECT
    a.sort_q AS quarter,
    COALESCE(c.churned, 0) AS churned_customers,
    COALESCE(a.active, 0) AS total_active_customers,
    CASE
        WHEN COALESCE(a.active, 0) = 0 THEN 0
        ELSE COALESCE(c.churned, 0) * 100.0 / COALESCE(a.active, 0)
    END AS churn_rate
FROM active a
LEFT JOIN churned c
ON a.sort_q = c.active_q
ORDER BY quarter;
''').write.jdbc(url=jdbc_url, table='public.churn_rate_quarterly', mode="overwrite", properties=jdbc_properties)

In [27]:
(
    retail_df
    .limit(10)
    .write
    .mode("append")
    .option("truncate", "true")
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
)

In [28]:
(
    spark
    .read
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
    .show()
)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2| 2010-12-01|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6| 2010-12-01|     4.25|     17850|United Kingdom|
