In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Get (or reuse) the notebook's Spark session.
spark = (
    SparkSession.builder
    .appName("DeltaExample")
    .getOrCreate()
)

# Seed rows for the raw table, as (id, name, age) tuples.
data = [
    (1, "Alice", 29),
    (2, "Bob", 35),
    (3, "Charlie", 40),
]
columns = ["id", "name", "age"]

# Column types are inferred from the Python values.
df = spark.createDataFrame(data, schema=columns)

# Persist as a Delta table. "overwrite" replaces whatever is already stored at
# the path, so re-running this cell leaves exactly these three rows.
# NOTE(review): an overwrite is not an append-only commit; the streaming cell
# that reads this path from a checkpoint may reject it on re-runs.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/raw_data")
)


In [0]:
from delta.tables import DeltaTable

# Open the Delta table stored at the raw-data path.
delta_table = DeltaTable.forPath(spark, "/mnt/delta/raw_data")

# Batch of incoming rows, using the same column names as the raw table.
new_data = spark.createDataFrame(
    [(4, "David", 25)],
    ["id", "name", "age"],
)

# Insert-only merge keyed on id. Rows whose id already exists in the table are
# left untouched (there is no whenMatched clause). Only unseen ids are
# inserted, so re-running this cell does not duplicate id 4.
merge_builder = delta_table.alias("old").merge(
    new_data.alias("new"),
    "old.id = new.id",
)
merge_builder.whenNotMatchedInsertAll().execute()


In [0]:
# Simulate a streaming pipeline: read the raw Delta table as a stream source
# and append its rows into a second Delta table, tracking progress in the
# checkpoint directory.
#
# skipChangeCommits: the table-creation cell overwrites raw_data on every run,
# and an overwrite is not an append-only commit. Without this option, a query
# that resumes from the existing checkpoint fails on such a commit. With it,
# such commits are skipped, so their rows are NOT re-propagated to
# processed_data.
# NOTE(review): requires Delta Lake 2.3+ / DBR 12.1+; on older runtimes the
# closest option is "ignoreChanges" (which re-emits rewritten rows instead).
streaming_df = (
    spark.readStream
    .format("delta")
    .option("skipChangeCommits", "true")
    .load("/mnt/delta/raw_data")
)

# Keep a handle on the started query so it can be inspected or stopped later
# (e.g. processed_query.status, processed_query.stop()). Without it the query
# keeps running in the background with no direct reference in the notebook.
processed_query = (
    streaming_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/delta/checkpoint")
    .start("/mnt/delta/processed_data")
)

# Display the StreamingQuery as the cell's output.
processed_query


Out[3]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2735bdbf40>