# Approach 1 - using `saveAsTable`

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema for the crime CSV files: three string columns.
CRIME_SCHEMA = StructType(
    [
        StructField("code", StringType()),
        StructField("region", StringType()),
        StructField("category", StringType()),
    ]
)

# saveAsTable is a *batch* DataFrameWriter API. A streaming DataFrame has no
# .saveAsTable() (and no .output()), and spark.readStream is a property, not a
# callable. So this approach reads the CSV files as a one-off batch and appends
# the filtered rows to the Delta table. The streaming equivalent
# (writeStream ... toTable) is shown in Approach 2.
input_df = (
    spark.read.format("csv")
    .schema(CRIME_SCHEMA)
    .load("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data_new/")
)

# Keep only the Downtown incidents.
result_df = input_df.filter(col("region") == "Downtown")

# Append (rather than overwrite) so repeated runs add to the existing table.
(
    result_df.write.format("delta")
    .mode("append")
    .saveAsTable("quickstart_catalog.quickstart_schema.crime_delta")
)

# Approach 2 - using Spark Structured Streaming

## Writing to a Delta Lake path

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
 
# Schema of the raw crime CSV files, declared up front and handed to the
# streaming reader (Structured Streaming file sources expect an explicit schema).
CRIME_SCHEMA = StructType(
    [
        StructField("code", StringType()),
        StructField("region", StringType()),
        StructField("category", StringType()),
    ]
)

# Source: CSV files arriving in the input folder.
input_df = spark.readStream.format("csv").schema(CRIME_SCHEMA).load(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/input/"
)

# Only the Downtown incidents are kept.
result_df = input_df.filter(col("region") == "Downtown")

# Sink: Delta files written to a plain storage path (no table registered).
# availableNow processes the data currently available and then stops, so
# awaitTermination() returns once that backlog has been written.
(
    result_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option(
        "checkpointLocation",
        "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/checkpoint",
    )
    .option(
        "path",
        "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/output/",
    )
    .start()
    .awaitTermination()
)

## Writing to a Delta table - using the `availableNow` trigger

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType


def ingest_data(
    source_path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data_new/input/",
    checkpoint_path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data_new/checkpoint",
    table_name="quickstart_catalog.quickstart_schema.crime_delta",
):
    """Append Downtown crime records from CSV files to a Delta table via Structured Streaming.

    Starts one ``availableNow``-triggered streaming query. The query reads the
    CSV files under ``source_path`` that this checkpoint has not processed yet,
    keeps rows whose ``region`` is ``"Downtown"``, and appends them to
    ``table_name``. The call blocks until the query stops on its own.

    Args:
        source_path: Folder of CSV files with columns ``code``, ``region`` and
            ``category`` (all read as strings).
        checkpoint_path: Checkpoint folder owned by this query alone. Reusing a
            checkpoint for another streaming query with a different source or
            sink is not supported by Structured Streaming.
        table_name: Fully-qualified (catalog.schema.table) name of the Delta
            table to append to.
    """
    schema = StructType(
        [
            StructField("code", StringType()),
            StructField("region", StringType()),
            StructField("category", StringType()),
        ]
    )

    input_df = spark.readStream.format("csv").schema(schema).load(source_path)

    result_df = input_df.filter(col("region") == "Downtown")

    # The default checkpoint is deliberately NOT the crime_data/checkpoint folder
    # used by the path-based Approach 2 query. That query has a different source
    # and sink, and a checkpoint must belong to exactly one query.
    # NOTE: a fresh checkpoint re-reads every file in source_path, so rows
    # ingested earlier under the shared checkpoint would be appended again.
    (
        result_df.writeStream.format("delta")
        .outputMode("append")
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_path)
        .toTable(table_name)
        .awaitTermination()
    )

In [0]:
# Run the availableNow ingestion once. ingest_data() waits (awaitTermination)
# until the streaming query has finished and stopped.
ingest_data()

In [0]:
%sql
-- Show the current contents of the crime_delta table (the sink written by ingest_data).
SELECT *
FROM quickstart_catalog.quickstart_schema.crime_delta;

## Writing to a Delta table - using the `processingTime` trigger (not supported on serverless compute)