In [1]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

## Initialization

In [2]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
postgres_host = os.getenv('POSTGRES_CONTAINER_NAME')
postgres_db = os.getenv('POSTGRES_DB')
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

In [4]:
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [5]:
spark

## UDF

In [6]:
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType

# Create a sample dataframe
df = spark.createDataFrame([(1, "apple"), (2, "banana"), (3, "orange"), 
                            (4, "apple"), (5, "banana"), (6, "orange"),
                            (7, "apple"), (8, "banana"), (9, "orange"),
                            (10, "apple")], ["id", "fruit"])

In [7]:
# Define the UDF logic
def string_length(s):
    return len(s)

# Define the Python UDF
string_length_udf = udf(string_length, IntegerType())

# Apply the Python UDF and display the result
df1 = df.withColumn("length", string_length_udf(col("fruit")))
df1.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



In [8]:
import pandas as pd

# Define the Pandas UDF
@pandas_udf(IntegerType())
def string_length_pandas_udf(s: pd.Series) -> pd.Series:
    return s.str.len()
    
# Apply the Pandas UDF and display the result
df2 = df.withColumn("length", string_length_pandas_udf(col("fruit")))
df2.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



## Join

In [6]:
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [7]:
# define schema for purchases dataset
purchases_schema = "order_id int, customer_id int, product_id int, quantity int, price float"

# create purchases dataframe
purchases_data = [
    (101, 1, 1, 2, 19.99),
    (102, 2, 2, 1, 9.99),
    (103, 3, 3, 1, 15.99),
    (104, 1, 4, 1, 5.99),
    (105, 2, 5, 3, 12.99),
    (106, 3, 6, 2, 9.99),
    (107, 4, 7, 1, 11.99),
    (108, 1, 8, 2, 14.99),
    (109, 2, 9, 1, 9.99),
    (110, 3, 10, 1, 19.99)
]
purchases_df = spark.createDataFrame(purchases_data, schema=purchases_schema)

# define schema for customers dataset
customers_schema = "customer_id int, name string, email string"

# create customers dataframe
customers_data = [
    (1, "John Doe", "johndoe@example.com"),
    (2, "Jane Smith", "janesmith@example.com"),
    (3, "Bob Johnson", "bobjohnson@example.com"),
    (4, "Sue Lee", "suelee@example.com")
]
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

# define schema for products dataset
products_schema = "product_id int, name string, price float"

# create products dataframe
products_data = [
    (1, "Product A", 19.99),
    (2, "Product B", 9.99),
    (3, "Product C", 15.99),
    (4, "Product D", 5.99),
    (5, "Product E", 12.99),
    (6, "Product F", 9.99),
    (7, "Product G", 11.99),
    (8, "Product H", 14.99),
    (9, "Product I", 9.99),
    (10, "Product J", 19.99)
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

In [8]:

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# perform sort merge join
merged_df = (
    purchases_df
    .join(customers_df, "customer_id")
    .join(products_df, "product_id")
)
merged_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         9|          2|     109|       1| 9.99| Jane Smith|janesmith@example...|Product I| 9.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         8|          1|     108|       2|14.99|   John Doe| johndoe@example.com|Product H|14.99|
|         7|        

In [9]:
from pyspark.sql.functions import broadcast

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")

# perform broadcast hash join
broadcast_df = purchases_df.join(broadcast(customers_df), "customer_id").join(broadcast(products_df), "product_id")
broadcast_df.show(5)

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
only showing top 5 rows



In [10]:
broadcast_df.show(1)

+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|    name|              email|     name|price|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|         1|          1|     101|       2|19.99|John Doe|johndoe@example.com|Product A|19.99|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
only showing top 1 row



## Cache & Persist

In [11]:
broadcast_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         7|          4|     107|       1|11.99|    Sue Lee|  suelee@example.com|Product G|11.99|
|         8|        

In [10]:
# cache the purchases DataFrame
# broadcast_df.cache()
broadcast_df.unpersist()

DataFrame[product_id: int, customer_id: int, order_id: int, quantity: int, price: float, name: string, email: string, name: string, price: float]

In [12]:
from pyspark.sql.functions import col

# calculate the total purchase amount for each store using the cached DataFrame
store_purchase_totals = (
    purchases_df
    .withColumn("total_price",col("quantity")*col("price"))
    .groupBy("customer_id")
    .agg({"total_price":"sum"}).alias("total_purchase_amount")
)

In [13]:
# persist the store_purchase_totals DataFrame to disk
store_purchase_totals.persist(pyspark.StorageLevel.DISK_ONLY)
store_purchase_totals.persist(pyspark.StorageLevel.MEMORY_ONLY)


DataFrame[customer_id: int, sum(total_price): double]

In [14]:
# print the results
store_purchase_totals.show()

+-----------+------------------+
|customer_id|  sum(total_price)|
+-----------+------------------+
|          1| 75.94999885559082|
|          3|55.959999084472656|
|          4|11.989999771118164|
|          2| 58.95000076293945|
+-----------+------------------+



In [17]:
# unpersist the store_purchase_totals DataFrame to free up memory
store_purchase_totals.unpersist()

DataFrame[customer_id: int, sum(total_price): double]

Balik lagi ke collab

# JDBC

In [5]:
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_db}'
jdbc_properties = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    'stringtype': 'unspecified'
}

In [6]:
retail_df = spark.read.jdbc(
    jdbc_url,
    'public.retail',
    properties=jdbc_properties
)

In [7]:
retail_df.show(5)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
only showing top 5 rows



In [18]:
(
    retail_df
    .limit(10)
    .write
    .mode("append")
    .option("truncate", "true")
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
)

In [19]:
retail_df.printSchema()

root
 |-- invoiceno: string (nullable = true)
 |-- stockcode: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- invoicedate: date (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- customerid: string (nullable = true)
 |-- country: string (nullable = true)



In [20]:
retail_df.createOrReplaceTempView('retail')
spark.sql('''
    SELECT
        country,
        ROUND(SUM(quantity * unitprice), 2) AS total_spendings,
        RANK() OVER(ORDER BY SUM(quantity * unitprice) DESC) AS ranking
    FROM
        retail
    GROUP BY
        country
    ORDER BY
        total_spendings DESC
''').show()

+---------------+---------------+-------+
|        country|total_spendings|ranking|
+---------------+---------------+-------+
| United Kingdom|     8187806.36|      1|
|    Netherlands|      284661.54|      2|
|           EIRE|      263276.82|      3|
|        Germany|      221698.21|      4|
|         France|       197403.9|      5|
|      Australia|      137077.27|      6|
|    Switzerland|       56385.35|      7|
|          Spain|       54774.58|      8|
|        Belgium|       40910.96|      9|
|         Sweden|       36595.91|     10|
|          Japan|       35340.62|     11|
|         Norway|       35163.46|     12|
|       Portugal|       29367.02|     13|
|        Finland|       22326.74|     14|
|Channel Islands|       20086.29|     15|
|        Denmark|       18768.14|     16|
|          Italy|       16890.51|     17|
|         Cyprus|       12946.29|     18|
|        Austria|       10154.32|     19|
|      Hong Kong|       10117.04|     20|
+---------------+---------------+-

In [21]:
retail_df.createOrReplaceTempView('retail')
spark.sql('''
    SELECT
        customerid,
        ROUND(SUM(quantity * unitprice), 2) AS total_spendings,
        RANK() OVER(ORDER BY SUM(quantity * unitprice) DESC) AS ranking
    FROM
        retail
    WHERE
        customerid IS NOT NULL
    GROUP BY
        customerid
    ORDER BY
        total_spendings DESC
    LIMIT 10
''').show()

+----------+---------------+-------+
|customerid|total_spendings|ranking|
+----------+---------------+-------+
|     14646|      279489.02|      1|
|     18102|      256438.49|      2|
|     17450|      187482.17|      3|
|     14911|      132572.62|      4|
|     12415|      123725.45|      5|
|     14156|      113384.14|      6|
|     17511|       88125.38|      7|
|     16684|       65892.08|      8|
|     13694|        62653.1|      9|
|     15311|       59419.34|     10|
+----------+---------------+-------+



In [22]:
retail_df.createOrReplaceTempView('retail')
spark.sql('''
    SELECT
        customerid,
        COUNT(DISTINCT invoiceno) AS times_shopped,
        RANK() OVER(ORDER BY COUNT(DISTINCT invoiceno) DESC) AS ranking
    FROM
        retail
    WHERE
        customerid IS NOT NULL
    GROUP BY
        customerid
    ORDER BY
        times_shopped DESC
    LIMIT 10
''').show()

+----------+-------------+-------+
|customerid|times_shopped|ranking|
+----------+-------------+-------+
|     14911|          248|      1|
|     12748|          224|      2|
|     17841|          169|      3|
|     14606|          128|      4|
|     15311|          118|      5|
|     13089|          118|      5|
|     12971|           89|      7|
|     14527|           86|      8|
|     13408|           81|      9|
|     14646|           77|     10|
+----------+-------------+-------+



In [23]:
retail_df.createOrReplaceTempView('retail')
spark.sql('''
    WITH ranked_items AS (
        SELECT
            country,
            description AS item_name,
            SUM(quantity) AS numbers_sold,
            RANK() OVER(PARTITION BY country ORDER BY SUM(quantity) DESC) AS ranking
        FROM
            retail
        WHERE
            customerid IS NOT NULL
        GROUP BY
            country,
            description
        ORDER BY
            country
    )

    SELECT
        country,
        item_name,
        numbers_sold,
        ranking
    FROM
        ranked_items
    WHERE
        ranking IN (1,2)
''').show()

+---------------+--------------------+------------+-------+
|        country|           item_name|numbers_sold|ranking|
+---------------+--------------------+------------+-------+
|      Australia|MINI PAINT SET VI...|        2916|      1|
|      Australia|  RABBIT NIGHT LIGHT|        1884|      2|
|        Austria|SET 12 KIDS COLOU...|         288|      1|
|        Austria|  MINI JIGSAW PURDEY|         240|      2|
|        Bahrain|ICE CREAM SUNDAE ...|          96|      1|
|        Bahrain| DOUGHNUT LIP GLOSS |          60|      2|
|        Belgium|ROUND SNACK BOXES...|         420|      2|
|        Belgium|PACK OF 72 RETROS...|         480|      1|
|         Brazil|GREEN REGENCY TEA...|          24|      1|
|         Brazil|SET/3 RED GINGHAM...|          24|      1|
|         Brazil|SMALL HEART FLOWE...|          24|      1|
|         Brazil|SET OF 6 SPICE TI...|          24|      1|
|         Brazil|ROSES REGENCY TEA...|          24|      1|
|         Brazil|PINK REGENCY TEAC...|  

In [24]:
retail_df.createOrReplaceTempView('retail')
spark.sql('''
    WITH ranked_items AS (
        SELECT
            DATE_PART('month', invoicedate) AS month_number,
            description AS item_name,
            SUM(quantity) AS numbers_sold,
            RANK() OVER(PARTITION BY DATE_PART('month', invoicedate) ORDER BY SUM(quantity) DESC) AS ranking
        FROM
            retail
        WHERE
            customerid IS NOT NULL
        GROUP BY
            DATE_PART('month', invoicedate),
            description
        ORDER BY
            month_number
    )

    SELECT
        month_number,
        item_name,
        numbers_sold,
        ranking
    FROM
        ranked_items
    WHERE
        ranking IN (1,2)
''').show()

+------------+--------------------+------------+-------+
|month_number|           item_name|numbers_sold|ranking|
+------------+--------------------+------------+-------+
|           1|WHITE HANGING HEA...|        5456|      1|
|           1|PACK OF 72 RETROS...|        3327|      2|
|           2|EMPIRE DESIGN ROS...|        3986|      1|
|           2|GROW A FLYTRAP OR...|        3795|      2|
|           3|JUMBO BAG RED RET...|        4923|      1|
|           3|WORLD WAR 2 GLIDE...|        3888|      2|
|           4|WORLD WAR 2 GLIDE...|       10128|      1|
|           4|PACK OF 72 RETROS...|        2768|      2|
|           5|SMALL POPCORN HOLDER|        6737|      1|
|           5|PACK OF 60 PINK P...|        3949|      2|
|           6|ASSORTED COLOURS ...|        3096|      2|
|           6|JUMBO BAG RED RET...|        3523|      1|
|           7|GIRLS ALPHABET IR...|        4896|      1|
|           7|WORLD WAR 2 GLIDE...|        3552|      2|
|           8|      POPCORN HOL

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 50836)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/