In [0]:
import pandas as pd
from datetime import datetime, timedelta

# Directory that the streaming reader below loads from (passed to .load()).
input_path = "/tmp/watermark_input"
# Create the input directory up front. `dbutils` is not imported in this
# notebook; presumably it is the global supplied by the Databricks runtime.
dbutils.fs.mkdirs(input_path)

In [0]:
# First batch of sample events: two rows whose event times are one minute ago
# and "now" (wall-clock time when this cell runs).
data1 = pd.DataFrame({
    "id": [1, 2],
    "value": ["A", "B"],
    "event_time": [datetime.now() - timedelta(minutes=1), datetime.now()]
})
# Write the batch into the stream's input directory so the reader can pick it
# up. Without this the frame is built but never reaches the stream. The path
# uses the same /dbfs prefix as the data2.csv write elsewhere in this notebook
# (presumably the local-file view of input_path; confirm on the target cluster).
data1.to_csv("/dbfs/tmp/watermark_input/data1.csv", index=False)

In [0]:
# Second batch of sample events: ids 3 and 4, with event times two minutes ago
# and "now" (wall-clock time when this cell runs).
data2 = pd.DataFrame(
    dict(
        id=[3, 4],
        value=["C", "D"],
        event_time=[datetime.now() - timedelta(minutes=2), datetime.now()],
    )
)
# Write the batch as a header-bearing CSV, without the pandas index column.
data2.to_csv("/dbfs/tmp/watermark_input/data2.csv", index=False)


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Explicit schema for the CSV input files: an integer id, a string value and
# the event timestamp. All three columns are nullable.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("value", StringType(), True),
    StructField("event_time", TimestampType(), True)
])

In [0]:
# Streaming read of the CSV files under input_path through the "cloudFiles"
# source. The files carry a header row and are parsed with the explicit
# schema rather than an inferred one.
streaming_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(schema)
    .load(input_path)
)

In [0]:
# Running count of events per distinct "value", with a 1-minute watermark
# declared on event_time.
# NOTE(review): the grouping key contains neither event_time nor a window over
# it, so the watermark presumably cannot expire any aggregation state here.
# Confirm whether a window(event_time, ...) grouping was intended.
agg_df = (
    streaming_df
    .withWatermark("event_time", "1 minute")
    .groupBy("value")
    .count()
)

In [0]:
# Start the streaming query that writes the per-value counts to the Delta
# table demo_streaming_advanced.sales.
# The Delta streaming sink accepts only the "append" and "complete" output
# modes; "update" is rejected when the query starts. agg_df is an aggregation,
# so "complete" is valid: each micro-batch rewrites the table with the full,
# current set of counts.
query = (
    agg_df.writeStream
    .format("delta")
    .outputMode("complete")
    # Checkpoint directory the query uses to track progress across restarts.
    .option("checkpointLocation", "/tmp/watermark_checkpoint")
    .toTable("demo_streaming_advanced.sales")
)