In [None]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

## Initialization

In [None]:
# Load the .env file located one directory above the kernel's working directory into
# os.environ; the boolean returned by load_dotenv is left as the cell output.
dotenv_path = Path('..', '.env')
load_dotenv(dotenv_path)

In [None]:
# Postgres connection settings from the environment; any variable that is unset becomes None.
(
    postgres_host,
    postgres_db,
    postgres_user,
    postgres_password,
) = (
    os.getenv(var_name)
    for var_name in (
        'POSTGRES_CONTAINER_NAME',
        'POSTGRES_DB',
        'POSTGRES_USER',
        'POSTGRES_PASSWORD',
    )
)

In [None]:
# Start (or, on re-runs, reuse) a SparkContext named 'Dibimbing' with the PostgreSQL
# JDBC driver jar on the classpath.
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')  # 'local' runs Spark in-process with a single worker thread
        .set("spark.jars", "/spark-scripts/postgresql-42.6.0.jar")
    ))
sparkcontext.setLogLevel("WARN")

# Wrap the context created above in a SparkSession (the DataFrame / SQL entry point).
# Pass the context directly: SparkContext.getOrCreate is a classmethod, so calling it on
# the instance only looked up this same active context a second time.
spark = pyspark.sql.SparkSession(sparkcontext)

In [None]:
# Show the active SparkSession as this cell's output (rendered via its notebook repr).
spark

## UDF

In [None]:
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType

# Sample DataFrame: ids 1..10 whose fruit names cycle apple -> banana -> orange.
fruit_cycle = ("apple", "banana", "orange")
df = spark.createDataFrame(
    [(i, fruit_cycle[(i - 1) % len(fruit_cycle)]) for i in range(1, 11)],
    schema=["id", "fruit"],
)

In [None]:
# Define the UDF logic
def string_length(s):
    """Return the number of characters in ``s``, or None when ``s`` is None.

    Spark hands SQL NULL values to Python UDFs as ``None``; without this guard
    ``len(None)`` would raise TypeError and fail the whole Spark task.
    """
    if s is None:
        return None
    return len(s)

# Wrap the plain Python function as a row-at-a-time Spark UDF returning IntegerType.
string_length_udf = udf(string_length, returnType=IntegerType())

# Add a "length" column computed by the UDF from "fruit", then print the result.
length_col = string_length_udf(col("fruit"))
df1 = df.withColumn("length", length_col)
df1.show()

In [None]:
import pandas as pd

# Vectorized equivalent: the function receives a whole pandas Series per batch
# instead of one value per call.
@pandas_udf(IntegerType())
def string_length_pandas_udf(s: pd.Series) -> pd.Series:
    """Return the character length of every string in the batch ``s``."""
    lengths = s.str.len()
    return lengths

# Add the same "length" column, this time via the pandas UDF, and print the result.
fruit_lengths = string_length_pandas_udf(col("fruit"))
df2 = df.withColumn("length", fruit_lengths)
df2.show()

## Join

In [None]:
# Disable Adaptive Query Execution so the join strategies configured below are the ones
# actually executed (AQE may otherwise re-plan joins at runtime).
spark.conf.set(key="spark.sql.adaptive.enabled", value="false")

In [None]:
# Small in-memory tables used by the join and caching demos below.
# `price` is declared as double (64-bit), matching Python's float. With the 32-bit `float`
# type, values such as 19.99 cannot be stored closely enough, so products and sums of
# prices print with visible rounding noise.

# define schema for purchases dataset
purchases_schema = "order_id int, customer_id int, product_id int, quantity int, price double"

# create purchases dataframe
purchases_data = [
    (101, 1, 1, 2, 19.99),
    (102, 2, 2, 1, 9.99),
    (103, 3, 3, 1, 15.99),
    (104, 1, 4, 1, 5.99),
    (105, 2, 5, 3, 12.99),
    (106, 3, 6, 2, 9.99),
    (107, 4, 7, 1, 11.99),
    (108, 1, 8, 2, 14.99),
    (109, 2, 9, 1, 9.99),
    (110, 3, 10, 1, 19.99)
]
purchases_df = spark.createDataFrame(purchases_data, schema=purchases_schema)

# define schema for customers dataset
customers_schema = "customer_id int, name string, email string"

# create customers dataframe
customers_data = [
    (1, "John Doe", "johndoe@example.com"),
    (2, "Jane Smith", "janesmith@example.com"),
    (3, "Bob Johnson", "bobjohnson@example.com"),
    (4, "Sue Lee", "suelee@example.com")
]
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

# define schema for products dataset
products_schema = "product_id int, name string, price double"

# create products dataframe
products_data = [
    (1, "Product A", 19.99),
    (2, "Product B", 9.99),
    (3, "Product C", 15.99),
    (4, "Product D", 5.99),
    (5, "Product E", 12.99),
    (6, "Product F", 9.99),
    (7, "Product G", 11.99),
    (8, "Product H", 14.99),
    (9, "Product I", 9.99),
    (10, "Product J", 19.99)
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

In [None]:

# Force a sort-merge join: prefer SMJ and turn automatic broadcasting off (-1 disables it).
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Join purchases -> customers -> products on their shared key columns.
# Note: customers_df and products_df both have a `name` column, and purchases_df and
# products_df both have `price`, so merged_df carries two `name` and two `price` columns.
# Rename or qualify them before selecting by name.
with_customers = purchases_df.join(customers_df, on="customer_id")
merged_df = with_customers.join(products_df, on="product_id")
merged_df.show()

In [None]:
from pyspark.sql.functions import broadcast

# Allow broadcasting again: stop preferring sort-merge and raise the auto-broadcast size
# limit to 1,000,000,000 bytes. These session settings stay in effect for later cells.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")

# Broadcast hash join: the broadcast() hints mark customers_df and products_df as the
# sides to broadcast.
broadcast_df = (
    purchases_df
    .join(broadcast(customers_df), "customer_id")
    .join(broadcast(products_df), "product_id")
)
broadcast_df.show()

## Cache & Persist

In [None]:
# Preview the purchases DataFrame before caching it (show's defaults spelled out).
purchases_df.show(n=20, truncate=True)

In [None]:
# Mark purchases_df for caching. cache() is lazy: nothing is stored until an action
# (e.g. show() or count()) runs. Queries after that reuse the cached data.
# Release it with purchases_df.unpersist() once it is no longer needed.
purchases_df.cache()

In [None]:
from pyspark.sql.functions import col, sum as spark_sum

# Total purchase amount (sum of quantity * price) for each customer, computed from the
# cached purchases_df.
# The alias has to be applied to the aggregate column itself. Calling .alias() on the
# aggregated DataFrame only names the DataFrame and leaves the column as "sum(total_price)".
store_purchase_totals = (
    purchases_df
    .withColumn("total_price", col("quantity") * col("price"))
    .groupBy("customer_id")
    .agg(spark_sum("total_price").alias("total_purchase_amount"))
)

In [None]:
# Persist store_purchase_totals with the DISK_ONLY storage level (kept on disk, not in
# memory). Like cache(), this only takes effect when the next action runs.
store_purchase_totals.persist(storageLevel=pyspark.StorageLevel.DISK_ONLY)

In [None]:
# Run the aggregation (materializing the persisted data) and print the per-customer totals.
store_purchase_totals.show(n=20, truncate=True)

In [None]:
# Drop the persisted blocks of store_purchase_totals. It was persisted with DISK_ONLY,
# so this frees local disk space rather than memory.
store_purchase_totals.unpersist()

Balik lagi ke Colab (back to the Colab notebook).

## JDBC

In [None]:
# Fail fast with a readable message when the environment / .env file did not provide the
# connection settings. Otherwise the URL would contain the text "None" and the JDBC call
# would fail later with an obscure driver error.
_missing_settings = [
    env_name
    for env_name, value in (
        ('POSTGRES_CONTAINER_NAME', postgres_host),
        ('POSTGRES_DB', postgres_db),
        ('POSTGRES_USER', postgres_user),
        ('POSTGRES_PASSWORD', postgres_password),
    )
    if value is None
]
if _missing_settings:
    raise ValueError(f"Missing Postgres settings in environment: {', '.join(_missing_settings)}")

# No port is given in the URL, so the PostgreSQL driver's default port is used.
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_db}'
jdbc_properties = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    # Send string parameters untyped so the server infers their type instead of forcing varchar.
    'stringtype': 'unspecified'
}

In [None]:
# Load the whole public.retail table into a DataFrame over JDBC.
retail_df = spark.read.jdbc(
    url=jdbc_url,
    table='public.retail',
    properties=jdbc_properties,
)

In [None]:
# Preview 5 rows of the retail table.
retail_df.show(n=5)

In [None]:
# Write 10 rows of retail_df (no ordering, so which rows is not guaranteed) into
# public.sample_retail.
# append mode adds another 10 rows every time this cell runs. The JDBC "truncate" option
# only has an effect with mode("overwrite"), so it is not set here. To replace the table
# contents on each run while keeping the existing table definition, use
# .mode("overwrite").option("truncate", "true") instead.
(
    retail_df
    .limit(10)
    .write
    .mode("append")
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
)