In [1]:
import json
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, max as col_max

In [2]:
def get_bookmark(stage: str, base_url: str = "", session=None) -> str:
    """Return the last ingested timestamp recorded for ``stage``.

    The bookmark is a single-row JSON dataset stored at
    ``<base_url>/data/<stage>/bookmark/`` (the layout ``set_bookmark`` writes).

    Args:
        stage: Pipeline stage name, e.g. ``"bronze"``.
        base_url: HDFS root such as ``"hdfs://hadoop-namenode:8020"``. Defaults to
            the notebook-global ``hdfs_url``. Pass it explicitly when calling from a
            section that has rebound ``hdfs_url`` to a value already ending in ``/data``.
        session: SparkSession used for the read. Defaults to the notebook-global ``spark``.

    Returns:
        The stored ``lastUpdate`` value as a string, or ``""`` when no bookmark
        could be read (e.g. first run). Callers treat ``""`` as "load everything".
    """
    root = base_url or hdfs_url
    reader = session if session is not None else spark
    bookmark_path = f"{root}/data/{stage}/bookmark/"
    try:
        row = reader.read.json(bookmark_path).first()
        # An empty dataset yields None; a row without the field raises and is handled below.
        last_update = row["lastUpdate"] if row is not None else None
    except Exception as e:
        # Best-effort: a missing bookmark means "first run".
        # NOTE(review): this also turns transient HDFS/read errors into a full reload,
        # which the append-mode bronze writes would then duplicate.
        print(e)
        return ""
    return "" if last_update is None else str(last_update)


def set_bookmark(stage: str, last_update: str, base_url: str = "", session=None) -> None:
    """Persist ``last_update`` as the bookmark for ``stage``.

    Writes a single-row JSON dataset ``{"lastUpdate": last_update}`` to
    ``<base_url>/data/<stage>/bookmark/``, replacing any previous bookmark.

    Args:
        stage: Pipeline stage name, e.g. ``"bronze"``.
        last_update: Timestamp string to record.
        base_url: HDFS root such as ``"hdfs://hadoop-namenode:8020"``. Defaults to
            the notebook-global ``hdfs_url``.
        session: SparkSession used for the write. Defaults to the notebook-global ``spark``.
    """
    root = base_url or hdfs_url
    active = session if session is not None else spark
    bookmark_path = f"{root}/data/{stage}/bookmark/"
    payload = json.dumps({"lastUpdate": last_update})
    bookmark_df = active.read.json(active.sparkContext.parallelize([payload]))
    # One row -> one partition, so the bookmark lands in a single part file.
    # NOTE(review): "overwrite" removes the old bookmark before writing the new one;
    # if the write fails, get_bookmark finds nothing and the next run reloads everything.
    bookmark_df.coalesce(1).write.mode("overwrite").json(bookmark_path)

# Ingestion (Bronze)

In [186]:
# Ingestion session. The PostgreSQL JDBC jar is added to the classpath so that
# spark.read.jdbc can load "org.postgresql.Driver".
ingest_builder = SparkSession.builder.appName("Ingestion")
ingest_builder = ingest_builder.config("spark.jars", "/spark/jars/postgresql-jdbc.jar")
spark = ingest_builder.getOrCreate()

In [187]:
# Source database (JDBC) and HDFS namenode root used by the ingestion cells.
jdbc_url = "jdbc:postgresql://postgres:5432/retail"
hdfs_url = "hdfs://hadoop-namenode:8020"

# Credentials come from the environment, with local-dev fallbacks.
# NOTE(review): the fallback password is committed in source; prefer requiring DB_PASSWORD.
db_username = os.environ.get("DB_USERNAME", "sparkuser")
db_password = os.environ.get("DB_PASSWORD", "sparkpassword")

# Properties passed to spark.read.jdbc.
connection_params = dict(
    user=db_username,
    password=db_password,
    driver="org.postgresql.Driver",
)

In [188]:
# Resume point for incremental ingestion; get_bookmark returns "" when none could be read.
bookmark = get_bookmark(stage="bronze")
bookmark

                                                                                

'2025-05-25 09:52:44'

In [189]:
def _incremental_source(table: str, ts_column: str, since: str) -> str:
    """Build the ``table`` argument for spark.read.jdbc.

    With a bookmark, returns an aliased subquery that keeps only rows whose
    ``ts_column`` is later than ``since``. Without one, returns the bare table name
    so the whole table is read.
    """
    if not since:
        return table
    return f"(SELECT * FROM {table} WHERE {ts_column} > '{since}') AS {table}"


orders_query = _incremental_source("orders", "ordertime", bookmark)
orders_hist_query = _incremental_source("ordershistory", "updatedat", bookmark)

In [190]:
# orders table: append this run's increment to bronze, partitioned by order date.
orders_df = spark.read.jdbc(url=jdbc_url, table=orders_query, properties=connection_params)
(
    orders_df
    .withColumn("update", to_date(col("ordertime")))
    .write.mode("append")
    .partitionBy("update")
    .parquet(f"{hdfs_url}/data/bronze/orders")
)

# ordershistory table. spark.read.jdbc is lazy: without persist(), every later action
# on ordershistory_df (count, max for the bookmark) re-runs the query against the live
# table and can see rows that were never written here. Caching is best-effort:
# evicted partitions are recomputed from the source.
ordershistory_df = spark.read.jdbc(
    url=jdbc_url, table=orders_hist_query, properties=connection_params
).persist()

(
    ordershistory_df
    .withColumn("update", to_date(col("updatedat")))
    .write.mode("append")
    .partitionBy("update")
    .parquet(f"{hdfs_url}/data/bronze/ordershistory")
)

                                                                                

In [191]:
# Number of ordershistory rows in this run's increment.
# NOTE: unless ordershistory_df is cached, this action re-executes the JDBC query,
# so the count may differ from what was written to bronze above.
ordershistory_df.count()

                                                                                

12511

In [192]:
# Derive the new bookmark from what actually landed in bronze rather than from
# ordershistory_df. That DataFrame is a lazy JDBC read, so another action on it
# re-queries Postgres and could include rows inserted after the write above. That
# would advance the bookmark past rows that were never ingested.
bronze_history = spark.read.parquet(f"{hdfs_url}/data/bronze/ordershistory")
last_update = bronze_history.select(col_max("updatedat")).first()[0]
if last_update:
    # Keep microseconds. With whole seconds, the next run's `updatedat > bookmark`
    # would re-append rows from the final fractional second.
    last_update = last_update.strftime("%Y-%m-%d %H:%M:%S.%f")
    print(last_update)
    set_bookmark("bronze", last_update)

2026-02-09 13:37:44


                                                                                

In [193]:
# Release the Ingestion SparkSession; the next section creates its own with getOrCreate().
spark.stop()

# Transformation (Silver)

In [194]:
# Transformation session: reads and writes only parquet on HDFS, so no JDBC jar is configured.
spark = (
    SparkSession.builder
    .appName("Transformation")
    .getOrCreate()
)

In [195]:
# NOTE: this rebinds `hdfs_url` to the `/data` root; the Ingestion section bound it to
# the bare namenode URL. get_bookmark/set_bookmark build their paths by appending
# `/data/...` to the global `hdfs_url`, so calling them after this cell would target
# `/data/data/...`.
hdfs_url = "hdfs://hadoop-namenode:8020/data"

In [196]:
# Expose each bronze parquet dataset as a temp view under its own table name.
for table_name in ("orders", "ordershistory"):
    spark.read.parquet(f"{hdfs_url}/bronze/{table_name}").createOrReplaceTempView(table_name)

                                                                                

In [197]:
# Silver: every bronze history event joined to its order, excluding INTRANSIT events.
# The string literal uses single quotes. Double quotes are parsed as an identifier
# when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
# `!=` also drops rows whose status is NULL.
# NOTE(review): bronze is append-only, so a re-ingested increment shows up here as
# duplicate rows.
joined_df = spark.sql(
    """
        SELECT
            o.orderid,
            ordertime,
            branch,
            historyid,
            status,
            updatedat,
            oh.update
        FROM orders AS o
        INNER JOIN ordershistory AS oh
            ON o.orderid = oh.orderid
        WHERE
            status != 'INTRANSIT'
    """
)

In [198]:
# Rebuild the silver layer. Under Spark's default (static) partition-overwrite mode,
# "overwrite" replaces the previous allorders output.
silver_path = f"{hdfs_url}/silver/allorders"
joined_df.write.partitionBy("update").mode("overwrite").parquet(silver_path)

                                                                                

In [199]:

# Release the Transformation SparkSession before the Serving section starts its own.
spark.stop()

# Serving (Gold)

In [211]:
from pyspark.sql.functions import unix_timestamp


In [212]:
# Serving session: reads the silver parquet and writes gold CSV summaries.
serving_builder = SparkSession.builder.appName("Serving")
spark = serving_builder.getOrCreate()

24/06/16 17:27:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [213]:
# NOTE: `/data` root again (the Ingestion section used the bare namenode URL);
# the silver/gold paths below are built relative to it.
hdfs_url = "hdfs://hadoop-namenode:8020/data"

In [214]:
# Register the silver dataset as the `allorders` view used by the SQL below.
silver_orders = spark.read.parquet(f"{hdfs_url}/silver/allorders")
silver_orders.createOrReplaceTempView("allorders")

24/06/16 17:27:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/06/16 17:28:01 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

In [216]:
# One row per (NEW event, DELIVERED event) pair of the same order, with the elapsed
# hours from order time to delivery.
# deliverytime is computed from the source columns rather than the `start`/`end`
# aliases of the same SELECT. Referencing those needs lateral-column-alias support
# (Spark >= 3.4), and END is a SQL keyword. The aliases are backticked for the same
# reason.
# NOTE(review): an order with several NEW or DELIVERED events produces several rows.
spark.sql(
    """
    SELECT
        t1.orderid,
        t1.historyid,
        t1.branch,
        t1.ordertime AS `start`,
        t2.updatedat AS `end`,
        (unix_timestamp(t2.updatedat) - unix_timestamp(t1.ordertime)) / 3600 AS deliverytime
    FROM (SELECT * FROM allorders WHERE status = 'NEW') AS t1
    INNER JOIN (SELECT * FROM allorders WHERE status = 'DELIVERED') AS t2
        ON t1.orderid = t2.orderid
    """
).createOrReplaceTempView("allorders_pivot")

In [217]:
# Average delivery time (hours) per branch, fastest branch first.
avg_delivery_sql = """
    SELECT
        branch,
        AVG(deliverytime) AS avg_deliverytime
    FROM allorders_pivot
    GROUP BY branch
    ORDER BY avg_deliverytime ASC
"""
davg_df = spark.sql(avg_delivery_sql)
davg_df.show()



+--------------------+------------------+
|              branch|  avg_deliverytime|
+--------------------+------------------+
|    Harbor View Mall| 2343.730908532842|
|    Sunset Park Mall| 2354.078234086242|
|    Riverfront Plaza| 2440.241782637509|
|Springfield Town ...| 2446.142755035738|
|      Oakwood Square|2461.1467191601046|
| Grand Avenue Center| 2512.957992688601|
|         Metro Plaza| 2552.348954183267|
|Pinecrest Shoppin...|2565.4971057884236|
|       Lakeside Mall|  2579.23463648834|
|   City Central Mall|2637.5592878592875|
+--------------------+------------------+



                                                                                

In [218]:
# Publish the per-branch averages to the gold layer as CSV with a header row.
(
    davg_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{hdfs_url}/gold/deliveriesavg")
)

                                                                                

In [219]:
# Number of completed deliveries per branch, busiest branch first.
delivery_count_sql = """
    SELECT
        branch,
        COUNT(*) AS deliveries_count
    FROM allorders_pivot
    GROUP BY branch
    ORDER BY deliveries_count DESC
"""
dcnt_df = spark.sql(delivery_count_sql)
dcnt_df.show()



+--------------------+----------------+
|              branch|deliveries_count|
+--------------------+----------------+
|   City Central Mall|            1554|
|         Metro Plaza|            1004|
| Grand Avenue Center|            1003|
|    Harbor View Mall|             543|
|Springfield Town ...|             513|
|      Oakwood Square|             508|
|    Riverfront Plaza|             503|
|Pinecrest Shoppin...|             501|
|    Sunset Park Mall|             487|
|       Lakeside Mall|             486|
+--------------------+----------------+



                                                                                

In [220]:
# Publish the per-branch delivery counts to the gold layer as CSV with a header row.
(
    dcnt_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{hdfs_url}/gold/deliveriescount")
)

                                                                                

In [221]:
# Release the Serving SparkSession; this is the last stage of the pipeline.
spark.stop()