In [None]:
# Spark Streaming

In [None]:
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
import os
import json
import pyspark.sql.functions as F
import time

# Folder the stream reads JSON files from, and folder for streaming checkpoints
source_path = "Files/streaming/source"
checkpoint_path = "Files/streaming/checkpoint"

# Schema of the incoming records: an id, a temperature reading and a timestamp
file_schema = (
    StructType()
    .add("id", StringType())
    .add("temperature", DoubleType())
    .add("timestamp", TimestampType())
)

# Target table name and its relative path under Tables/ (Lakehouse naming convention)
table_name = "temperature_stream"
table_path = "Tables/" + table_name

# Zero-row DataFrame that carries only the schema
empty_df = spark.createDataFrame([], file_schema)

# Save it as a Delta table; "overwrite" replaces the contents of an existing
# table with the same name, so re-running this cell empties the table.
empty_df.write.saveAsTable(table_name, format="delta", mode="overwrite")

print(f"Table created: {table_path}")

In [None]:
# Read the JSON files under source_path as an unbounded streaming DataFrame.
# A schema is supplied explicitly, and maxFilesPerTrigger=1 limits each
# micro-batch to at most one new file.
raw_stream_df = (
    spark.readStream
    .schema(file_schema)
    .option("maxFilesPerTrigger", 1)
    .json(source_path)
)

# Example transformation: add a processed_timestamp column holding the
# current timestamp at processing time.
transformed_stream_df = raw_stream_df.withColumn(
    "processed_timestamp", F.current_timestamp()
)

# Append the stream to the Delta table at table_path.
# The table was created from file_schema, which has no processed_timestamp
# column. mergeSchema lets Delta add the new column instead of rejecting the
# write with a schema-mismatch error.
# start() returns immediately; the query keeps running in the background and
# any failure surfaces through deltastream.status / deltastream.exception().
deltastream = (
    transformed_stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .start(table_path)
)

In [None]:
# Examine the streamed data (first 1000 rows).
# The table is referenced through table_name, the same unqualified name used
# when it was created with saveAsTable, rather than a hard-coded lakehouse
# name, so the query reads the table this notebook actually wrote.
df = spark.sql(f"SELECT * FROM {table_name} LIMIT 1000")
display(df)

In [None]:
# Check whether the streaming query started above is still active.
# As the last expression in the cell, the boolean is shown as the cell output.
deltastream.isActive

In [None]:
# Check the current status of the streaming query.
# As the last expression in the cell, the status is shown as the cell output.
deltastream.status