**Creating the required tables**

In [1]:
%%sql
-- Raw orders table read by the streaming cells. Every column is stored as STRING.
-- Quantity and purchase_price are cast to numeric types by the streaming code that reads them.
create table if not exists test_lh.orders (
    Order_ID       string,
    Customer_Name  string,
    Product_Name   string,
    Quantity       string,
    Order_Date     string,
    purchase_price string,
    Status         string
)
using delta;

StatementMeta(, c8a4a014-fb0a-4fd9-bb90-f91ee07c0ea3, 2, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [2]:
%%sql
-- Aggregated output table, upserted by the streaming cells via MERGE keyed on
-- Customer_Name + Product_Name. The aggregates are stored as STRING because the
-- streaming code casts them to string before writing.
-- IF NOT EXISTS: an already-existing table (even with a different schema) is left untouched.
CREATE TABLE IF NOT EXISTS test_lh.orders_transformed
(
    Customer_Name STRING,
    Product_Name STRING,
    Quantities_sold STRING,
    Total_Price STRING
)
USING DELTA;

StatementMeta(, c8a4a014-fb0a-4fd9-bb90-f91ee07c0ea3, 3, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

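**Spark streaming: Delta table to Delta table**

Reads `test_lh.orders` as a stream, sums the quantity and the quantity × purchase price per customer and product, and upserts the running totals into `test_lh.orders_transformed`.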

In [None]:
# Alias Spark's sum so the Python builtin `sum` is not shadowed in the notebook's global namespace.
from pyspark.sql.functions import col, sum as spark_sum
from delta.tables import DeltaTable

# Checkpoint for this query. The path (including the "scenerio" spelling) is kept unchanged
# so an existing checkpoint and its streaming state are reused.
checkpoint_directory = "abfss://HealthCareTest@onelake.dfs.fabric.microsoft.com/test_lh.Lakehouse/Files/checkpoint_path/scenerio_1"
# Seconds to block on the running query. None blocks until the query stops.
# Set a positive number so "Run All" can continue past this cell.
await_timeout_seconds = None

orders_df = (
    spark.readStream
    .format("delta")
    .table("test_lh.orders")
)

# Source columns are STRING, so cast them before aggregating. Values that fail the cast become
# NULL and are skipped by sum.
transformed_df = (
    orders_df
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("purchase_price", col("purchase_price").cast("double"))
    .groupBy("Customer_Name", "Product_Name")
    .agg(
        spark_sum("Quantity").alias("Quantities_sold"),
        spark_sum(col("Quantity") * col("purchase_price")).alias("Total_Price"),
    )
    # The target table stores these columns as STRING.
    .withColumn("Quantities_sold", col("Quantities_sold").cast("string"))
    .withColumn("Total_Price", col("Total_Price").cast("string"))
)


def upsert_to_delta(batch_df, batch_id):
    """Merge one micro-batch of aggregates into test_lh.orders_transformed.

    Args:
        batch_df: In "update" output mode, the (Customer_Name, Product_Name) groups that
            changed in this trigger, carrying their current running totals.
        batch_id: Micro-batch id passed by foreachBatch (not used here).

    NOTE(review): the CSV streaming cell merges into the same table with its own independent
    aggregation state. If both queries run, each overwrites the other's totals for shared keys.
    """
    delta_table = DeltaTable.forName(spark, "test_lh.orders_transformed")
    (
        delta_table.alias("target")
        .merge(
            batch_df.alias("source"),
            # <=> (null-safe equality): a group whose key is NULL matches its existing row
            # instead of being inserted again on every batch.
            "target.Customer_Name <=> source.Customer_Name AND target.Product_Name <=> source.Product_Name",
        )
        # Overwrite rather than add: the source values are already running totals.
        .whenMatchedUpdate(
            set={
                "Quantities_sold": "source.Quantities_sold",
                "Total_Price": "source.Total_Price",
            }
        )
        .whenNotMatchedInsert(
            values={
                "Customer_Name": "source.Customer_Name",
                "Product_Name": "source.Product_Name",
                "Quantities_sold": "source.Quantities_sold",
                "Total_Price": "source.Total_Price",
            }
        )
        .execute()
    )


query = (
    transformed_df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", checkpoint_directory)
    .start()
)

query.awaitTermination(await_timeout_seconds)

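**Spark streaming: CSV files to Delta table**

Reads CSV files from `Files/test_poc/active` as a stream, applies the same per-customer/product aggregation, and upserts the running totals into `test_lh.orders_transformed`.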

In [8]:
# Alias Spark's sum so the Python builtin `sum` is not shadowed in the notebook's global namespace.
from pyspark.sql.functions import col, sum as spark_sum
from delta.tables import DeltaTable

input_directory = "abfss://HealthCareTest@onelake.dfs.fabric.microsoft.com/test_lh.Lakehouse/Files/test_poc/active"
# Path (including the "scenerio" spelling) kept unchanged so an existing checkpoint is reused.
checkpoint_directory = "abfss://HealthCareTest@onelake.dfs.fabric.microsoft.com/test_lh.Lakehouse/Files/checkpoint_path/scenerio_2"
# Explicit schema for the CSV stream. It mirrors the column list of test_lh.orders (all STRING).
orders_schema = "Order_ID STRING, Customer_Name STRING, Product_Name STRING, Quantity STRING, Order_Date STRING, purchase_price STRING, Status STRING"
# Seconds to block on the running query. None blocks until the query stops.
# Set a positive number so "Run All" can continue past this cell.
await_timeout_seconds = None

orders_df = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(orders_schema)
    .load(input_directory)
)

# Columns arrive as STRING, so cast them before aggregating. Values that fail the cast become
# NULL and are skipped by sum.
transformed_df = (
    orders_df
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("purchase_price", col("purchase_price").cast("double"))
    .groupBy("Customer_Name", "Product_Name")
    .agg(
        spark_sum("Quantity").alias("Quantities_sold"),
        spark_sum(col("Quantity") * col("purchase_price")).alias("Total_Price"),
    )
    # The target table stores these columns as STRING.
    .withColumn("Quantities_sold", col("Quantities_sold").cast("string"))
    .withColumn("Total_Price", col("Total_Price").cast("string"))
)


def upsert_to_delta(batch_df, batch_id):
    """Merge one micro-batch of aggregates into test_lh.orders_transformed.

    Args:
        batch_df: In "update" output mode, the (Customer_Name, Product_Name) groups that
            changed in this trigger, carrying their current running totals.
        batch_id: Micro-batch id passed by foreachBatch (not used here).

    NOTE(review): the table-to-table streaming cell merges into the same table with its own
    independent aggregation state. If both queries run, each overwrites the other's totals
    for shared keys.
    """
    delta_table = DeltaTable.forName(spark, "test_lh.orders_transformed")
    (
        delta_table.alias("target")
        .merge(
            batch_df.alias("source"),
            # <=> (null-safe equality): a group whose key is NULL matches its existing row
            # instead of being inserted again on every batch.
            "target.Customer_Name <=> source.Customer_Name AND target.Product_Name <=> source.Product_Name",
        )
        # Overwrite rather than add: the source values are already running totals.
        .whenMatchedUpdate(
            set={
                "Quantities_sold": "source.Quantities_sold",
                "Total_Price": "source.Total_Price",
            }
        )
        .whenNotMatchedInsert(
            values={
                "Customer_Name": "source.Customer_Name",
                "Product_Name": "source.Product_Name",
                "Quantities_sold": "source.Quantities_sold",
                "Total_Price": "source.Total_Price",
            }
        )
        .execute()
    )


query = (
    transformed_df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", checkpoint_directory)
    .start()
)

query.awaitTermination(await_timeout_seconds)

StatementMeta(, c8a4a014-fb0a-4fd9-bb90-f91ee07c0ea3, 56, Submitted, Running, Running)