In [1]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

## Initialization

In [2]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
postgres_host = os.getenv('POSTGRES_CONTAINER_NAME')
postgres_dw_db = os.getenv('POSTGRES_DW_DB')
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

In [4]:
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [5]:
spark

## Basic

In [6]:
schema = "invoice_no long, stock_code string, description string, quantity int, invoice_date string, unit_price float, customer_id int, country string"

df = spark.read.csv("/resources/data/online-retail-dataset.csv", header=True, schema=schema)
df = df.withColumn("invoice_date", to_timestamp(df["invoice_date"], "M/d/yyyy H:mm"))

In [7]:
# == Viewing Data ==
# Display top 5 rows
df.show(5)

# Display the schema of the DataFrame
df.printSchema()

# List all column names
df.columns

+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|    536365|    85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|      2.55|      17850|United Kingdom|
|    536365|     71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|
|    536365|    84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|      2.75|      17850|United Kingdom|
|    536365|    84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|
|    536365|    84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------

['invoice_no',
 'stock_code',
 'description',
 'quantity',
 'invoice_date',
 'unit_price',
 'customer_id',
 'country']

In [8]:
# == Selecting Columns ==
# Select specific columns
df.select("invoice_no", "stock_code").show()

# Using alias to rename a column
df.select(df.invoice_no.alias("invoice_number")).show()

+----------+----------+
|invoice_no|stock_code|
+----------+----------+
|    536365|    85123A|
|    536365|     71053|
|    536365|    84406B|
|    536365|    84029G|
|    536365|    84029E|
|    536365|     22752|
|    536365|     21730|
|    536366|     22633|
|    536366|     22632|
|    536367|     84879|
|    536367|     22745|
|    536367|     22748|
|    536367|     22749|
|    536367|     22310|
|    536367|     84969|
|    536367|     22623|
|    536367|     22622|
|    536367|     21754|
|    536367|     21755|
|    536367|     21777|
+----------+----------+
only showing top 20 rows

+--------------+
|invoice_number|
+--------------+
|        536365|
|        536365|
|        536365|
|        536365|
|        536365|
|        536365|
|        536365|
|        536366|
|        536366|
|        536367|
|        536367|
|        536367|
|        536367|
|        536367|
|        536367|
|        536367|
|        536367|
|        536367|
|        536367|
|        536367|
+------

In [9]:
# == Filtering Rows (WHERE) ==
# Filter rows using a condition
df.filter(df.unit_price > 5).show()

# Filter using multiple conditions
df.filter((df.unit_price > 5) & (df.customer_id == "16503")).show()


+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|    536365|     22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|      7.65|      17850|United Kingdom|
|    536367|     22622|BOX OF VINTAGE AL...|       2|2010-12-01 08:34:00|      9.95|      13047|United Kingdom|
|    536367|     21754|HOME BUILDING BLO...|       3|2010-12-01 08:34:00|      5.95|      13047|United Kingdom|
|    536367|     21755|LOVE BUILDING BLO...|       3|2010-12-01 08:34:00|      5.95|      13047|United Kingdom|
|    536367|     21777|RECIPE BOX WITH M...|       4|2010-12-01 08:34:00|      7.95|      13047|United Kingdom|
|    536367|     48187| DOORMAT NEW ENGLAND|       4|2010-12-01 08:34:00|      7.95|      13047|United K

In [10]:
# Cell 6: Sorting Rows (ORDER BY)
# Sort by a single column
df.orderBy("invoice_date").show()

# Sort by multiple columns
df.orderBy(df.invoice_no.asc(), df.invoice_date.desc()).show()


+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|    536365|     22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|      7.65|      17850|United Kingdom|
|    536365|    84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|      2.75|      17850|United Kingdom|
|    536365|     21730|GLASS STAR FROSTE...|       6|2010-12-01 08:26:00|      4.25|      17850|United Kingdom|
|    536365|    84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|
|    536365|    85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|      2.55|      17850|United Kingdom|
|    536365|     71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|      3.39|      17850|United K

In [11]:
# Cell 7: Aggregations (GROUP BY)
# Group by and aggregate using count, sum, avg, etc.
df.groupBy("customer_id").count().show()
df.groupBy("customer_id").sum("unit_price").show()
df.groupBy("customer_id").agg({"unit_price": "mean", "unit_price": "max"}).show()


+-----------+-----+
|customer_id|count|
+-----------+-----+
|      17420|   30|
|      16861|    8|
|      16503|   86|
|      15727|  302|
|      17389|  224|
|      15447|    9|
|      14450|   40|
|      13623|   86|
|      13285|  187|
|      16339|   20|
|      14570|   29|
|      16386|   88|
|      18024|   22|
|      12940|  103|
|      17679|   30|
|      16574|   28|
|      15957|   42|
|      13832|    4|
|      15619|    3|
|      15790|   35|
+-----------+-----+
only showing top 20 rows

+-----------+------------------+
|customer_id|   sum(unit_price)|
+-----------+------------------+
|      17420| 116.4399995803833|
|      16861| 18.72999981045723|
|      16503|361.07999685406685|
|      15727|  980.939994186163|
|      17389|1273.5699914097786|
|      15447| 28.09000015258789|
|      14450|111.61999799311161|
|      13623|  333.639996945858|
|      13285|399.21999901533127|
|      16339| 105.1499981880188|
|      14570| 97.78999862074852|
|      16386|145.98999929055572|

In [12]:
# == CASE WHEN (Conditional Statements) ==
# Create a new column with conditional values
df = df.withColumn("price_status", when(df.unit_price > 7, "High").otherwise("Low"))
df.show()


+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+------------+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|price_status|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+------------+
|    536365|    85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|      2.55|      17850|United Kingdom|         Low|
|    536365|     71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|         Low|
|    536365|    84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|      2.75|      17850|United Kingdom|         Low|
|    536365|    84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|         Low|
|    536365|    84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|         Low|


In [13]:
# == Handling Missing Data == 
# Drop rows with any nulls
df = df.na.drop()

# Fill nulls in a specific column with a value
df = df.na.fill({"description": "no description", "unit_price": 0})


## UDF

In [14]:
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType

# Create a sample dataframe
df = spark.createDataFrame([(1, "apple"), (2, "banana"), (3, "orange"), 
                            (4, "apple"), (5, "banana"), (6, "orange"),
                            (7, "apple"), (8, "banana"), (9, "orange"),
                            (10, "apple")], ["id", "fruit"])

In [15]:
# Define the UDF logic
def string_length(s):
    return len(s)

# Define the Python UDF
string_length_udf = udf(string_length, IntegerType())

# Apply the Python UDF and display the result
df1 = df.withColumn("length", string_length_udf(col("fruit")))
df1.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



In [16]:
import pandas as pd

# Define the Pandas UDF
@pandas_udf(IntegerType())
def string_length_pandas_udf(s: pd.Series) -> pd.Series:
    return s.str.len()
    
# Apply the Pandas UDF and display the result
df2 = df.withColumn("length", string_length_pandas_udf(col("fruit")))
df2.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



## Join

In [17]:
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [18]:
# define schema for purchases dataset
purchases_schema = "order_id int, customer_id int, product_id int, quantity int, price float"

# create purchases dataframe
purchases_data = [
    (101, 1, 1, 2, 19.99), # Worker 1
    (102, 2, 2, 1, 9.99), # Worker 1
    (103, 3, 3, 1, 15.99), # Worker 1
    (104, 1, 4, 1, 5.99), # Worker 2
    (105, 2, 5, 3, 12.99), # Worker 2
    (106, 3, 6, 2, 9.99), # Worker 2
    (107, 4, 7, 1, 11.99), # Worker 2
    (108, 1, 8, 2, 14.99), # Worker 3
    (109, 2, 9, 1, 9.99), # Worker 3
    (110, 3, 10, 1, 19.99) # Worker 3
]
purchases_df = spark.createDataFrame(purchases_data, schema=purchases_schema)
# 10.000 row

# define schema for customers dataset
customers_schema = "customer_id int, name string, email string"

# create customers dataframe
customers_data = [
    (1, "John Doe", "johndoe@example.com"), # Worker 1
    (2, "Jane Smith", "janesmith@example.com"), # Worker 2
    (3, "Bob Johnson", "bobjohnson@example.com"), # Worker 3
    (4, "Sue Lee", "suelee@example.com"), # Worker 3
]

customers_df = spark.createDataFrame(customers_data, schema=customers_schema)
# 1000 row

# define schema for products dataset
products_schema = "product_id int, name string, price float"

# create products dataframe
products_data = [
    (1, "Product A", 19.99),
    (2, "Product B", 9.99),
    (3, "Product C", 15.99),
    (4, "Product D", 5.99),
    (5, "Product E", 12.99),
    (6, "Product F", 9.99),
    (7, "Product G", 11.99),
    (8, "Product H", 14.99),
    (9, "Product I", 9.99),
    (10, "Product J", 19.99)
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

In [19]:

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# perform sort merge join
merged_df = (
    purchases_df
    .join(customers_df, "customer_id")
    .join(products_df, "product_id")
)

In [20]:
merged_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         9|          2|     109|       1| 9.99| Jane Smith|janesmith@example...|Product I| 9.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         8|          1|     108|       2|14.99|   John Doe| johndoe@example.com|Product H|14.99|
|         7|        

In [21]:
merged_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         9|          2|     109|       1| 9.99| Jane Smith|janesmith@example...|Product I| 9.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         8|          1|     108|       2|14.99|   John Doe| johndoe@example.com|Product H|14.99|
|         7|        

In [22]:
from pyspark.sql.functions import broadcast

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")

# perform broadcast hash join
broadcast_df = purchases_df.join(broadcast(customers_df), "customer_id").join(broadcast(products_df), "product_id")
broadcast_df.show(5)

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
only showing top 5 rows



In [23]:
broadcast_df.show(1)

+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|    name|              email|     name|price|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|         1|          1|     101|       2|19.99|John Doe|johndoe@example.com|Product A|19.99|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
only showing top 1 row



## Cache & Persist

In [24]:
broadcast_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         7|          4|     107|       1|11.99|    Sue Lee|  suelee@example.com|Product G|11.99|
|         8|        

In [25]:
# cache the purchases DataFrame
# broadcast_df.cache()
broadcast_df.unpersist()

DataFrame[product_id: int, customer_id: int, order_id: int, quantity: int, price: float, name: string, email: string, name: string, price: float]

In [26]:
from pyspark.sql.functions import col

# calculate the total purchase amount for each store using the cached DataFrame
store_purchase_totals = (
    purchases_df
    .withColumn("total_price",col("quantity")*col("price"))
    .groupBy("customer_id")
    .agg({"total_price":"sum"}).alias("total_purchase_amount")
)

In [27]:
# persist the store_purchase_totals DataFrame to disk
# store_purchase_totals.persist(pyspark.StorageLevel.DISK_ONLY)
store_purchase_totals.persist(pyspark.StorageLevel.MEMORY_ONLY)


DataFrame[customer_id: int, sum(total_price): double]

In [28]:
# print the results
store_purchase_totals.show()

+-----------+------------------+
|customer_id|  sum(total_price)|
+-----------+------------------+
|          1| 75.94999885559082|
|          3|55.959999084472656|
|          4|11.989999771118164|
|          2| 58.95000076293945|
+-----------+------------------+



In [29]:
# print the results
store_purchase_totals.show()

+-----------+------------------+
|customer_id|  sum(total_price)|
+-----------+------------------+
|          1| 75.94999885559082|
|          3|55.959999084472656|
|          4|11.989999771118164|
|          2| 58.95000076293945|
+-----------+------------------+



In [30]:
# unpersist the store_purchase_totals DataFrame to free up memory
store_purchase_totals.unpersist()

DataFrame[customer_id: int, sum(total_price): double]

Balik lagi ke collab

# JDBC

In [None]:
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_dw_db}'
jdbc_properties = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    'stringtype': 'unspecified'
}

In [None]:
retail_df = spark.read.jdbc(
    jdbc_url,
    'public.retail',
    properties=jdbc_properties
)

In [None]:
retail_df.show(5)

In [None]:
(
    retail_df
    .limit(10)
    .write
    .mode("append")
    .option("truncate", "true")
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
)

In [None]:
(
    spark
    .read
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
    .show()
)