# Configure Spark Session
Import the required libraries and configure a Spark session with Delta Lake support.

In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook with Delta Lake enabled:
#   - spark.sql.extensions registers Delta's SQL extension
#   - spark.sql.catalog.spark_catalog swaps the default catalog for Delta's
# NOTE(review): getOrCreate() returns an existing session when one is already
# running (e.g. on Databricks); these settings are static, so confirm the
# runtime already has Delta configured in that case.
spark = (
    SparkSession.builder
    .appName("Parquet to Delta Stream")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Define Source Stream Configuration
Set up the streaming source that reads Parquet files from the external storage location.

In [None]:
# Define Source Stream Configuration

# Root of the ADLS Gen2 container used by this notebook
storage_root = "abfss://external-location@databricksdevstgacc.dfs.core.windows.net/"

# Set the source path for the parquet files
source_path = storage_root + "parquet_stream/"

# Keep stream state OUTSIDE source_path. Spark's Parquet file source lists
# every file under the source path, including subdirectories, so schema or
# checkpoint files stored there would be treated as input data (and they are
# not Parquet files).
# NOTE(review): confirm this sibling folder is covered by the same external
# location / storage credential as source_path.
stream_state_root = storage_root + "_stream_state/parquet_stream/"

# Not read by the plain parquet stream source; kept for compatibility with
# other cells and for a possible Auto Loader (cloudFiles.schemaLocation) setup.
schema_location = stream_state_root + "schema/"

# Used by the writeStream query as its checkpoint directory.
checkpoint_location = stream_state_root + "checkpoint/"

# File-based streams need an explicit schema, so infer it once with a batch
# read. mergeSchema is applied here because it only affects schema inference:
# it reconciles the schemas of all files instead of taking a single file's.
# Raises an AnalysisException if source_path contains no parquet files yet.
sample_schema = (
    spark.read
    .option("mergeSchema", "true")
    .parquet(source_path)
    .schema
)

# Stream new parquet files using the inferred schema. schemaLocation and
# checkpointLocation are not options of this reader, so they are not set
# here; the checkpoint is configured on the writer.
parquet_stream = (
    spark.readStream
    .format("parquet")
    .schema(sample_schema)
    .load(source_path)
)

# Display the schema of the streaming DataFrame
parquet_stream.printSchema()

# Create Streaming Query
Define the streaming query that appends the Parquet stream to Delta, using the checkpoint location defined in the previous cell and a 10-second processing-time trigger.

In [None]:
# Create Streaming Query

# Append the parquet stream to the Delta table, evolving the table schema when
# new columns appear and micro-batching every 10 seconds.
# toTable() resolves the three-part name through the catalog; start(name)
# would instead treat the string as a filesystem path and write Delta files
# to a directory literally named "dev_catalog.default.transactions_stream_delta".
streaming_query = (
    parquet_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")  # allow new source columns to evolve the table schema
    .option("checkpointLocation", checkpoint_location)
    .trigger(processingTime="10 seconds")
    .toTable("dev_catalog.default.transactions_stream_delta")
)

# Uncomment to block this cell until the query stops; while commented out the
# cell returns immediately and the query keeps running in the background.
# streaming_query.awaitTermination()