In [1]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

# Load the shared .env file so the connection settings below can be read
# from the process environment.
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

# Postgres connection details; each value is None when its variable is unset.
postgres_host = os.environ.get('POSTGRES_CONTAINER_NAME')
postgres_dw_db = os.environ.get('POSTGRES_DW_DB')
postgres_user = os.environ.get('POSTGRES_USER')
postgres_password = os.environ.get('POSTGRES_PASSWORD')

In [2]:
# Create (or reuse) a local SparkContext and register the PostgreSQL JDBC
# driver jar through the `spark.jars` setting.
sparkcontext = pyspark.SparkContext.getOrCreate(
    conf=(
        pyspark.SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    )
)
sparkcontext.setLogLevel("WARN")

# Obtain the session through the builder instead of calling the SparkSession
# constructor directly: getOrCreate() returns the already-active session when
# this cell is re-run, and otherwise creates one on top of the context above.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# DDL-style schema string for the purchases dataset, one entry per column
purchases_schema = ", ".join(
    [
        "order_id int",
        "customer_id int",
        "product_id int",
        "quantity int",
        "price float",
    ]
)


In [3]:

# Sample purchase rows, one tuple per order, in the column order declared by
# purchases_schema: (order_id, customer_id, product_id, quantity, price)
purchases_data = [
    (101, 1, 1, 2, 19.99),
    (102, 2, 2, 1, 9.99),
    (103, 3, 3, 1, 15.99),
    (104, 1, 4, 1, 5.99),
    (105, 2, 5, 3, 12.99),
    (106, 3, 6, 2, 9.99),
    (107, 4, 7, 1, 11.99),
    (108, 1, 8, 2, 14.99),
    (109, 2, 9, 1, 9.99),
    (110, 3, 10, 1, 19.99),
]

# Build the purchases DataFrame from the in-memory rows
purchases_df = spark.createDataFrame(
    data=purchases_data,
    schema=purchases_schema,
)

# Sample customer rows: (customer_id, name, email)
customers_data = [
    (1, "John Doe", "johndoe@example.com"),
    (2, "Jane Smith", "janesmith@example.com"),
    (3, "Bob Johnson", "bobjohnson@example.com"),
    (4, "Sue Lee", "suelee@example.com"),
]

# DDL-style schema string matching the tuples above
customers_schema = "customer_id int, name string, email string"

# Build the customers DataFrame from the in-memory rows
customers_df = spark.createDataFrame(
    data=customers_data,
    schema=customers_schema,
)

# Sample product rows: (product_id, name, price)
products_data = [
    (1, "Product A", 19.99),
    (2, "Product B", 9.99),
    (3, "Product C", 15.99),
    (4, "Product D", 5.99),
    (5, "Product E", 12.99),
    (6, "Product F", 9.99),
    (7, "Product G", 11.99),
    (8, "Product H", 14.99),
    (9, "Product I", 9.99),
    (10, "Product J", 19.99),
]

# DDL-style schema string matching the tuples above
products_schema = "product_id int, name string, price float"

# Build the products DataFrame from the in-memory rows
products_df = spark.createDataFrame(
    data=products_data,
    schema=products_schema,
)

In [4]:
# Join strategy settings for this session: a threshold of -1 switches off
# automatic broadcast joins, and sort-merge is flagged as the preferred strategy.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

# Attach customer details, then product details, to every purchase.
# Note: the result carries two `name` and two `price` columns (see output).
merged_df = (
    purchases_df
    .join(customers_df, on="customer_id", how="inner")
    .join(products_df, on="product_id", how="inner")
)
merged_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         7|          4|     107|       1|11.99|    Sue Lee|  suelee@example.com|Product G|11.99|
|         8|        

In [5]:
# Rebuild the join in this cell rather than selecting from the previous
# `merged_df`. The select below disambiguates the duplicated `name` / `price`
# columns by referencing them through their source DataFrames; once
# `merged_df` has been overwritten with the renamed projection those
# references no longer resolve against it, so selecting from `merged_df`
# itself made the cell fail when run a second time.
merged_df = (
    purchases_df.join(customers_df, "customer_id")
    .join(products_df, "product_id")
    .select(
        "order_id",
        "customer_id",
        "product_id",
        "quantity",
        purchases_df.price.alias("purchase_price"),
        customers_df.name.alias("customer_name"),
        "email",
        products_df.name.alias("product_name"),
        products_df.price.alias("product_price"),
    )
)

merged_df.show()


+--------+-----------+----------+--------+--------------+-------------+--------------------+------------+-------------+
|order_id|customer_id|product_id|quantity|purchase_price|customer_name|               email|product_name|product_price|
+--------+-----------+----------+--------+--------------+-------------+--------------------+------------+-------------+
|     101|          1|         1|       2|         19.99|     John Doe| johndoe@example.com|   Product A|        19.99|
|     102|          2|         2|       1|          9.99|   Jane Smith|janesmith@example...|   Product B|         9.99|
|     103|          3|         3|       1|         15.99|  Bob Johnson|bobjohnson@exampl...|   Product C|        15.99|
|     104|          1|         4|       1|          5.99|     John Doe| johndoe@example.com|   Product D|         5.99|
|     105|          2|         5|       3|         12.99|   Jane Smith|janesmith@example...|   Product E|        12.99|
|     106|          3|         6|       

In [6]:
# JDBC endpoint for the warehouse database, built from the values read from
# the environment earlier in the notebook.
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_dw_db}'

# Connection properties passed to Spark's JDBC reader and writer.
jdbc_properties = dict(
    user=postgres_user,
    password=postgres_password,
    driver='org.postgresql.Driver',
    stringtype='unspecified',
)

In [7]:
# Persist the merged result to Postgres table public.real3 in "overwrite" mode.
(
    merged_df.write
    .mode("overwrite")
    .jdbc(url=jdbc_url, table='public.real3', properties=jdbc_properties)
)



In [9]:
# Read public.real3 back over JDBC and display it to check the write.
(
    spark.read
    .jdbc(url=jdbc_url, table='public.real3', properties=jdbc_properties)
    .show()
)


+--------+-----------+----------+--------+--------------+-------------+--------------------+------------+-------------+
|order_id|customer_id|product_id|quantity|purchase_price|customer_name|               email|product_name|product_price|
+--------+-----------+----------+--------+--------------+-------------+--------------------+------------+-------------+
|     101|          1|         1|       2|         19.99|     John Doe| johndoe@example.com|   Product A|        19.99|
|     102|          2|         2|       1|          9.99|   Jane Smith|janesmith@example...|   Product B|         9.99|
|     103|          3|         3|       1|         15.99|  Bob Johnson|bobjohnson@exampl...|   Product C|        15.99|
|     104|          1|         4|       1|          5.99|     John Doe| johndoe@example.com|   Product D|         5.99|
|     105|          2|         5|       3|         12.99|   Jane Smith|janesmith@example...|   Product E|        12.99|
|     106|          3|         6|       