Create two streams

In [0]:
from pyspark.sql.functions import rand

# Set the Spark SQL shuffle partition count for this session to 200.
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Synthetic ad-impression stream built on the "rate" source:
# 500 rows per second on a single partition. The source's `value` column
# is used as the ad id and its `timestamp` column as the impression time.
impressions = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", "500")
    .option("numPartitions", "1")
    .load()
    .selectExpr(
        "value AS adId",
        "timestamp AS impressionTime",
    )
)

# Synthetic click stream, also built on a 500 rows/second "rate" source.
clicks = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", "500")
    .option("numPartitions", "1")
    .load()
    # Keep a random ~10% of rows: 10 out of 100 impressions result in a click.
    .where((rand() * 100).cast("integer") < 10)
    # Shift the id down by 50 so the click for a given adId comes from a later
    # row than the impression with the same id (assumes both rate streams
    # start at roughly the same time -- TODO confirm).
    .selectExpr(
        "(value - 50) AS adId",
        "timestamp AS clickTime",
    )
    # Drop the rows whose shifted id is zero or negative.
    .where("adId > 0")
)

In [0]:
# Render the impression stream live; the query is registered as "impressions".
display(
    impressions,
    streamName="impressions",
)


adId,impressionTime
0,2025-10-17T14:10:45.978Z
1,2025-10-17T14:10:45.980Z
2,2025-10-17T14:10:45.982Z
3,2025-10-17T14:10:45.984Z
4,2025-10-17T14:10:45.986Z
5,2025-10-17T14:10:45.988Z
6,2025-10-17T14:10:45.990Z
7,2025-10-17T14:10:45.992Z
8,2025-10-17T14:10:45.994Z
9,2025-10-17T14:10:45.996Z


In [0]:
# Render the click stream live; the query is registered as "clicks".
display(
    clicks,
    streamName="clicks",
)

adId,clickTime
17,2025-10-17T14:10:50.723Z
51,2025-10-17T14:10:50.791Z
55,2025-10-17T14:10:50.799Z
56,2025-10-17T14:10:50.801Z
60,2025-10-17T14:10:50.809Z
77,2025-10-17T14:10:50.843Z
81,2025-10-17T14:10:50.851Z
102,2025-10-17T14:10:50.893Z
127,2025-10-17T14:10:50.943Z
132,2025-10-17T14:10:50.953Z


## Stream Inner Join without Watermark

This is the same as joining two batch DataFrames.

In [0]:
# Stream-stream inner join on adId with no watermark or time bound.
# Nothing tells the engine when old rows can no longer match, so the join
# state is never cleared.
# The keyword must be spelled `streamName` (it was misspelled before); this is
# what names the query "joined", matching the other display() cells and the
# name printed by the cleanup loop at the end of the notebook.
display(impressions.join(clicks, "adId"), streamName="joined")

adId,impressionTime,clickTime
92,2025-10-17T13:57:09.668Z,2025-10-17T13:57:10.028Z
78,2025-10-17T13:57:09.640Z,2025-10-17T13:57:10.000Z
20,2025-10-17T13:57:09.524Z,2025-10-17T13:57:09.884Z
70,2025-10-17T13:57:09.624Z,2025-10-17T13:57:09.984Z
99,2025-10-17T13:57:09.682Z,2025-10-17T13:57:10.042Z
40,2025-10-17T13:57:09.564Z,2025-10-17T13:57:09.924Z
53,2025-10-17T13:57:09.590Z,2025-10-17T13:57:09.950Z


## Stream Inner join with watermarking

To avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and therefore can be cleared from the state.

In [0]:
from pyspark.sql.functions import expr

# Define watermarks.
# The id columns are renamed so the two sides of the join have distinct
# column names (impressionAdId vs. clickAdId).

# Impressions: event-time column `impressionTime`, 10 second watermark delay.
impressionsWithWatermark = (
    impressions
    .selectExpr(
        "adId AS impressionAdId",
        "impressionTime",
    )
    .withWatermark("impressionTime", "10 seconds")
)

# Clicks: event-time column `clickTime`, 20 second watermark delay.
clicksWithWatermark = (
    clicks
    .selectExpr(
        "adId AS clickAdId",
        "clickTime",
    )
    .withWatermark("clickTime", "20 seconds")
)

In [0]:
# Inner join with watermark: the only join condition is equality of the ad ids.
adIdMatchCondition = expr(" clickAdId = impressionAdId")

innerJoinWithWatermark = impressionsWithWatermark.join(
    clicksWithWatermark,
    adIdMatchCondition,
)

display(innerJoinWithWatermark, streamName="innerJoinWithWatermark")

impressionAdId,impressionTime,clickAdId,clickTime
94,2025-10-17T14:10:09.638Z,94,2025-10-17T14:10:09.944Z
39,2025-10-17T14:10:09.528Z,39,2025-10-17T14:10:09.834Z
71,2025-10-17T14:10:09.592Z,71,2025-10-17T14:10:09.898Z
6,2025-10-17T14:10:09.462Z,6,2025-10-17T14:10:09.768Z
58,2025-10-17T14:10:09.566Z,58,2025-10-17T14:10:09.872Z
79,2025-10-17T14:10:09.608Z,79,2025-10-17T14:10:09.914Z
33,2025-10-17T14:10:09.516Z,33,2025-10-17T14:10:09.822Z
12,2025-10-17T14:10:09.474Z,12,2025-10-17T14:10:09.780Z
49,2025-10-17T14:10:09.548Z,49,2025-10-17T14:10:09.854Z
35,2025-10-17T14:10:09.520Z,35,2025-10-17T14:10:09.826Z


## Stream Join with Watermark and Event-Time Conditions

This enables Structured Streaming to perform full state cleanup, which is needed for long-running streaming processes.

In [0]:
# Inner join with watermark + time conditions. Required for full state cleanup.
# A click matches an impression only when the ad ids are equal and the click
# happens at or after the impression and no more than 10 seconds after it.
adIdAndTimeCondition = expr(""" clickAdId = impressionAdId AND
         clickTime >= impressionTime AND
         clickTime <= impressionTime + interval 10 seconds""")

innerJoinWithTimeConditions = impressionsWithWatermark.join(
    clicksWithWatermark,
    adIdAndTimeCondition,
)

display(
    innerJoinWithTimeConditions,
    streamName="innerJoinWithWatermarkAndTimeConditions",
)

impressionAdId,impressionTime,clickAdId,clickTime
19,2025-10-17T14:16:56.723Z,19,2025-10-17T14:16:57.056Z
77,2025-10-17T14:16:56.839Z,77,2025-10-17T14:16:57.172Z
31,2025-10-17T14:16:56.747Z,31,2025-10-17T14:16:57.080Z
71,2025-10-17T14:16:56.827Z,71,2025-10-17T14:16:57.160Z
48,2025-10-17T14:16:56.781Z,48,2025-10-17T14:16:57.114Z
37,2025-10-17T14:16:56.759Z,37,2025-10-17T14:16:57.092Z
12,2025-10-17T14:16:56.709Z,12,2025-10-17T14:16:57.042Z
73,2025-10-17T14:16:56.831Z,73,2025-10-17T14:16:57.164Z
20,2025-10-17T14:16:56.725Z,20,2025-10-17T14:16:57.058Z
70,2025-10-17T14:16:56.825Z,70,2025-10-17T14:16:57.158Z


In [0]:
# Clean up: print the name of every active streaming query and stop it.
for active_query in spark.streams.active:
    print(active_query.name)
    active_query.stop()