In [1]:
import findspark
findspark.init()
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StringType, StructType, TimestampType, IntegerType
import os
import psycopg2
from sqlalchemy import create_engine
import pandas as pd

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

In [2]:
# SQL Schema
impression_schema = StructType() \
    .add('ImpressionID', IntegerType()) \
    .add('UserID', IntegerType()) \
    .add('SessionID', IntegerType()) \
    .add('AdID', IntegerType()) \
    .add('AdCategory', StringType()) \
    .add('ImpressionTimestamp', TimestampType()) \
    .add('Device', StringType()) \
    .add('OS', StringType()) \
    .add('Browser', StringType()) \
    .add('Location', StringType()) \
    .add('PageID', StringType()) \
    .add('Referrer', StringType()) \
    .add('ImpressionCost', FloatType()) \

click_schema = StructType() \
    .add('JoinID', IntegerType()) \
    .add('ClickTimestamp', TimestampType()) \
    .add('ClickID', StringType()) \
    .add('ClickPosition', StringType()) \
    .add('ClickCost', FloatType())

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .config("spark.jars", "postgresql-42.7.1.jar") \
    .appName("impression-click-event") \
    .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/06/24 10:11:41 WARN Utils: Your hostname, James-Razerblade resolves to a loopback address: 127.0.1.1; using 192.168.204.183 instead (on interface eth0)
24/06/24 10:11:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/limws/spark-3.5.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/limws/.ivy2/cache
The jars for the packages stored in: /home/limws/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d8837eb3-8866-4ae2-a354-dd55a1da7581;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-loggi

In [4]:
impression_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "impression-event") \
  .option("startingOffsets", "latest") \
  .load()

In [5]:
impression_json_df = impression_df.selectExpr("cast(value as string) as value")
impression_raw_df = impression_json_df.withColumn("value", from_json(impression_json_df["value"], impression_schema)).select("value.*")
impression_raw_df = impression_raw_df.replace("\"NaN\"", None)

In [6]:
click_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "click-event") \
  .option("startingOffsets", "latest") \
  .load()

In [7]:
click_json_df = click_df.selectExpr("cast(value as string) as value")
click_raw_df = click_json_df.withColumn("value", from_json(click_json_df["value"], click_schema)).select("value.*")
click_raw_df = click_raw_df.replace("\"NaN\"", None)

In [8]:
### apply watermarks on event-time columns
impressionsWithWatermark = impression_raw_df.withWatermark("ImpressionTimestamp", "1 hour").alias("impression")
clicksWithWatermark = click_raw_df.withWatermark("ClickTimestamp", "1 hour").alias("click")

### stream-stream inner join (no time constraints needed for inner join)
joined_stream = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    click.JoinID = impression.ImpressionID AND
    click.ClickTimestamp >= impression.ImpressionTimestamp AND
    click.ClickTimestamp <= impression.ImpressionTimestamp
    """),
    "leftOuter"
)

In [9]:
conn = psycopg2.connect(
    host="localhost",
    database="superset",
    user="superset",
    password="superset")

sql = """DROP TABLE IF EXISTS joined CASCADE"""
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()

In [10]:
sql = """CREATE TABLE joined (
    impressionid INT,
    userid INT,
    sessionid INT,
    adid INT,
    adcategory TEXT,
    impressiontimestamp TIMESTAMP,
    device TEXT,
    os TEXT,
    browser TEXT,
    location TEXT,
    pageid TEXT,
    referrer TEXT,
    joinid INT,
    impressioncost DECIMAL,
    clicktimestamp TIMESTAMP,
    clickid TEXT NULL,
    clickposition TEXT NULL,
    clickcost DECIMAL NULL 
);"""
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()

In [11]:
def write_to_postgresql(df,epoch_id):
    df.write \
    .format('jdbc') \
    .option("url", "jdbc:postgresql://localhost:5432/superset") \
    .option("dbtable", "joined") \
    .option("user", "superset") \
    .option("password", "superset") \
    .option("driver", "org.postgresql.Driver") \
    .mode('append') \
    .save()

In [12]:
postgresql_stream=joined_stream.writeStream \
    .foreachBatch(write_to_postgresql) \
    .start() \
    .awaitTermination()

24/06/24 10:11:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-51f8e400-b574-428e-830b-9e3342ec5c70. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/24 10:11:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/06/24 10:11:53 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/06/24 10:11:53 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/06/24 10:11:53 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/06/24 10:11:53 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known con

KeyboardInterrupt: 

24/06/24 13:19:48 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/06/24 13:19:48 WARN NetworkClient: [AdminClient clientId=adminclient-2] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/06/24 13:19:48 WARN NetworkClient: [AdminClient clientId=adminclient-2] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/06/24 13:19:48 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/06/24 13:19:48 WARN NetworkClient: [AdminClient clientId=adminclient-2] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/06/24 13:19:49 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1