In [1]:


import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType,
    TimestampType, DoubleType
)
from pyspark.sql.functions import col




In [2]:

BASE_DIR = "/home/pazzoti/de-upskilling/real-time-pipeline-pyspark-postgresql"

DATA_DIR = os.path.join(BASE_DIR, "data", "raw")
CONFIG_PATH = os.path.join(BASE_DIR, "config", "postgres_connection.txt")

print("BASE_DIR:", BASE_DIR)
print("DATA_DIR:", DATA_DIR)
print("CONFIG_PATH:", CONFIG_PATH)

assert os.path.exists(DATA_DIR), " data/raw folder not found"
assert os.path.exists(CONFIG_PATH), " postgres_connection.txt not found"





BASE_DIR: /home/pazzoti/de-upskilling/real-time-pipeline-pyspark-postgresql
DATA_DIR: /home/pazzoti/de-upskilling/real-time-pipeline-pyspark-postgresql/data/raw
CONFIG_PATH: /home/pazzoti/de-upskilling/real-time-pipeline-pyspark-postgresql/config/postgres_connection.txt


In [3]:
config = {}

with open(CONFIG_PATH) as f:
    for line in f:
        if "=" in line:
            k, v = line.strip().split("=")
            config[k] = v

host = config["host"]
port = config["port"]
database = config["database"]
user = config["user"]
password = config["password"]

print("âœ… DB Config Loaded:")
print(host, port, database, user)





âœ… DB Config Loaded:
localhost 5432 ecommerce_db postgres


In [4]:
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("action", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("price", DoubleType(), True),
    StructField("category", StringType(), True),
])





In [5]:
spark = SparkSession.builder \
    .appName("RealTimeEcommercePipeline") \
    .config("spark.sql.adaptive.enabled", "false") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()



print("âœ… Spark started")





Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/28 18:25:00 WARN Utils: Your hostname, Patricks-bot, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/28 18:25:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/pazzoti/de-upskilling/real-time-pipeline-pyspark-postgresql/venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/pazzoti/.ivy2.5.2/cache
The jars for the packages stored in: /home/pazzoti/.ivy2.5.2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ebf5bb9a-e37c-430c-867d-20220cb4c293;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in central
:: resolution report :: resolve 152ms :: artifacts dl 4ms
	:: modu

âœ… Spark started


In [6]:

df_stream = spark.readStream \
    .option("header", True) \
    .schema(schema) \
    .csv(DATA_DIR)

df_stream.printSchema()





root
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- category: string (nullable = true)



In [7]:
test_df = spark.createDataFrame(
    [(1, 101, "test", datetime.now(), 9.99, "debug")],
    ["user_id", "product_id", "action", "timestamp", "price", "category"]
)

jdbc_url = f"jdbc:postgresql://{host}:{port}/{database}"

test_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "user_events") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

print("âœ… JDBC WRITE TEST SUCCESS")


                                                                                

âœ… JDBC WRITE TEST SUCCESS


In [8]:
def write_to_postgres(batch_df, batch_id):
    print(f"ðŸš€ Writing batch {batch_id} to PostgreSQL")

    batch_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "user_events") \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()





In [9]:
CHECKPOINT_DIR = os.path.join(BASE_DIR, "checkpoints")

query = df_stream.writeStream \
    .foreachBatch(write_to_postgres) \
    .outputMode("append") \
    .option("checkpointLocation", CHECKPOINT_DIR) \
    .start()

print("âœ… Streaming started")



âœ… Streaming started


In [10]:
query.status


{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}