In [0]:
%sql
-- List the external locations that are configured, to confirm the storage paths used below are registered.
SHOW EXTERNAL LOCATIONS;

In [0]:
%sql
-- Show the first three objects under the lineitems folder, authenticating with the named storage credential.
LIST 'abfss://warehouse@dbxdl.dfs.core.windows.net/lineitems/' WITH (CREDENTIAL `dbxdl-storage-account-creds`) LIMIT 3;


In [0]:
%sql
-- Create the working catalog; IF NOT EXISTS makes this cell safe to re-run.
CREATE CATALOG IF NOT EXISTS MASTERCLASS;

In [0]:
%sql
-- Make MASTERCLASS the current catalog so later cells can use schema-qualified names only.
USE CATALOG MASTERCLASS;

In [0]:
%sql
-- Create the BRONZE schema (if missing) with its managed tables stored under the bronze/ folder of the delta container.
CREATE SCHEMA IF NOT EXISTS BRONZE
 MANAGED LOCATION "abfss://delta@dbxdl.dfs.core.windows.net/bronze/";

In [0]:
%sql
-- Landing table with a single VARIANT column holding one source row per record.
-- OR REPLACE means re-running this cell replaces the existing table definition.
CREATE OR REPLACE TABLE BRONZE.RAW_LINEITEMS (
  V VARIANT
) USING DELTA;

In [0]:
%sql
-- Query the Parquet files in place (no table needed) to inspect five sample rows before loading.
SELECT * FROM PARQUET.`abfss://warehouse@dbxdl.dfs.core.windows.net/lineitems/*` LIMIT 5;

In [0]:
%sql
-- Load the Parquet files into the single VARIANT column V: every source row is packed
-- into a struct, serialised to JSON text, and parsed back as a VARIANT value.
-- NOTE(review): 'singleVariantColumn' is documented as taking a target column name (not 'true')
-- and is described for text-based readers such as JSON; confirm it has any effect with
-- FILEFORMAT = PARQUET, since the SELECT above already builds the VARIANT explicitly.
COPY INTO BRONZE.RAW_LINEITEMS
FROM (
  SELECT parse_json(to_json(struct(*))) AS V 
  FROM 'abfss://warehouse@dbxdl.dfs.core.windows.net/lineitems/*'
)
FILEFORMAT = PARQUET
FORMAT_OPTIONS ('singleVariantColumn' = 'true');

In [0]:
%sql

-- Row count of the landing table, as a quick check of what the load produced.
SELECT COUNT(*) FROM BRONZE.RAW_LINEITEMS;

In [0]:
%sql
-- Project each line-item field out of the VARIANT payload by JSON path and show three rows.
-- No target type is passed to variant_get, so every output column is itself a VARIANT.
SELECT
  variant_get(li.V, '$.L_ORDERKEY')      AS order_key,
  variant_get(li.V, '$.L_PARTKEY')       AS part_key,
  variant_get(li.V, '$.L_SUPPKEY')       AS supp_key,
  variant_get(li.V, '$.L_LINENUMBER')    AS line_number,
  variant_get(li.V, '$.L_QUANTITY')      AS quantity,
  variant_get(li.V, '$.L_EXTENDEDPRICE') AS extended_price,
  variant_get(li.V, '$.L_DISCOUNT')      AS discount,
  variant_get(li.V, '$.L_TAX')           AS tax,
  variant_get(li.V, '$.L_RETURNFLAG')    AS return_flag,
  variant_get(li.V, '$.L_LINESTATUS')    AS line_status,
  variant_get(li.V, '$.L_SHIPDATE')      AS ship_date,
  variant_get(li.V, '$.L_COMMITDATE')    AS commit_date,
  variant_get(li.V, '$.L_RECEIPTDATE')   AS receipt_date,
  variant_get(li.V, '$.L_SHIPINSTRUCT')  AS ship_instruct,
  variant_get(li.V, '$.L_SHIPMODE')      AS ship_mode,
  variant_get(li.V, '$.L_COMMENT')       AS comment
FROM BRONZE.RAW_LINEITEMS AS li
LIMIT 3;

# No DataFrame‑level substitute (and why)

* There is no DataFrameWriter.copyInto() or similar; the only Spark‑side APIs are write.mode(...).save(...) / saveAsTable(...), which in append mode write every input row again and therefore duplicate rows on re‑runs.
* You could hand‑roll idempotency by writing input_file_name() into a “manifest” column and merging on it, but that just recreates what COPY INTO already does for you inside the Delta log. 

# SQL Serverless Photon Engine only SQL

In [0]:
%sql
USE CATALOG MASTERCLASS;
USE SCHEMA BRONZE;

-- String-typed landing table for the line-item files, plus load metadata.
-- FILE_NAME and FILE_LAST_MODIFIED are part of the table because the MERGE examples
-- in this notebook match on t.FILE_NAME; without the column that condition cannot resolve.
-- Writers that do not supply these two columns simply leave them NULL.
-- IF NOT EXISTS leaves an already-existing LINEITEMS table untouched (it does not add columns to it).
CREATE TABLE IF NOT EXISTS LINEITEMS (
  PK_ID                BIGINT,
  L_ORDERKEY           STRING,
  L_PARTKEY            STRING,
  L_SUPPKEY            STRING,
  L_LINENUMBER         STRING,
  L_QUANTITY           STRING,
  L_EXTENDEDPRICE      STRING,
  L_DISCOUNT           STRING,
  L_TAX                STRING,
  L_RETURNFLAG         STRING,
  L_LINESTATUS         STRING,
  L_SHIPDATE           STRING,
  L_COMMITDATE         STRING,
  L_RECEIPTDATE        STRING,
  L_SHIPINSTRUCT       STRING,
  L_SHIPMODE           STRING,
  L_COMMENT            STRING,
  FILE_NAME            STRING,     -- path of the source file the row came from
  FILE_LAST_MODIFIED   TIMESTAMP,  -- modification time of that source file
  INSERTED_DATE        TIMESTAMP
)
USING DELTA;

In [0]:
from pyspark.sql.functions import input_file_name, current_timestamp, monotonically_increasing_id, col

# Load every Parquet file under the lineitems folder (direct abfss path) as one batch DataFrame.
df = spark.read.parquet("abfss://warehouse@dbxdl.dfs.core.windows.net/lineitems/*")

# Cast each source column to STRING under its original name, then append the
# synthetic key and the load timestamp.
# monotonically_increasing_id() yields unique but non-consecutive ids that are
# not stable from one run to the next.
string_columns = [col(name).cast("string").alias(name) for name in df.columns]
stringified = df.select(
    *string_columns,
    monotonically_increasing_id().alias("PK_ID"),
    current_timestamp().alias("INSERTED_DATE"),
)

In [0]:
# Preview five rows of the stringified frame before writing it to the table.
display(stringified.limit(5))

In [0]:
# Append the stringified rows to the LINEITEMS Delta table (resolved against the
# current catalog/schema). Append mode adds the rows again on every run of this cell.
lineitems_writer = stringified.write.format("delta").mode("append")
lineitems_writer.saveAsTable("LINEITEMS")

Delta’s Python API doesn’t let you call .mode("merge") on the DataFrameWriter, but you can run a Delta MERGE (upsert) through DeltaTable.merge(). Note that DeltaTable.merge() is about upserting rows, not about “only ingesting new files” the way Snowflake’s COPY INTO … FORCE=FALSE works. The closest equivalent in Databricks is Auto Loader, because it keeps its own file‑tracking checkpoint and by default processes each file only once.

# Upsert

In [0]:
from delta.tables import DeltaTable
# pyspark.sql.functions has no `input_file_modification_time`; importing it raises
# ImportError and stops the whole cell, so only names that exist are imported here.
from pyspark.sql.functions import col, input_file_name, current_timestamp, monotonically_increasing_id

# 1️⃣ Read & prepare your incoming data
df = (
    spark.read.parquet("abfss://warehouse@dbxdl.dfs.core.windows.net/lineitems/*")
)

# Every source column is cast to STRING under its own name; PK_ID and
# INSERTED_DATE are appended as load metadata.
stringified = df.select(
    *[col(c).cast("string").alias(c) for c in df.columns],
    monotonically_increasing_id().alias("PK_ID"),
    current_timestamp().alias("INSERTED_DATE")
)

# 2️⃣ Reference the target Delta table (name resolved against the current catalog/schema)
delta_table = DeltaTable.forName(spark, "LINEITEMS")

# 3️⃣ Merge: match on whatever business keys make sense,
#    here we’re using FILE_NAME + PK_ID as an example.
#    The example is left disabled: `stringified` as built above carries no FILE_NAME
#    column (unless the Parquet files themselves contain one), so the condition below
#    needs that column on both sides before it can run.
# (delta_table.alias("t")
#   .merge(
#      source = stringified.alias("s"),
#      condition = "t.FILE_NAME = s.FILE_NAME AND t.PK_ID = s.PK_ID"
#   )
#   .whenMatchedUpdateAll()      # update every column in the target from source
#   .whenNotMatchedInsertAll()   # insert any new rows
#   .execute()
# )

# Autoloader

When you define an Auto Loader source and then kick off a streaming write (either via .start() or as part of a Databricks Job), you’re launching a long-running Structured Streaming query inside your Spark cluster. It isn’t “sent off” to some external queue—Spark itself tracks file progress and runs the work in the background of that cluster until you stop it.

In an interactive notebook, calling .start() gives you back a StreamingQuery object. The driver will continue scheduling micro-batches in the background, and you can go on to run other cells or check query.status. But if you detach or idle-timeout that notebook session, the cluster will eventually shut down and your stream stops.

	•	As a Job: the recommended production pattern is to wrap your Autoloader code in a notebook (or .py file) and then create a Databricks Job that runs it. A job cluster stays alive for the duration of the streaming query (up to whatever timeout you configure), so your Autoloader will keep ingesting new files without you having to keep a notebook tab open.

	2.	Cluster uptime
	•	Streaming queries only run while the cluster is up. If you’re using a job cluster, it spins up at job start and only terminates if the job fails or completes.
	•	If you tried to run it on your interactive workspace cluster and leave the notebook running, auto-termination or idle timeouts will eventually kill it.
	3.	Monitoring & failure recovery
	•	Because Autoloader checkpoints file progress to the checkpointLocation, if the stream dies (cluster restart, transient error), when you restart the same query pointing at the same checkpoint it will resume exactly where it left off—never reprocessing old files.
	•	You can monitor the job from the Databricks Jobs UI (micro-batch durations, backpressure alerts, etc.).

⸻

So, do you need to keep a notebook open?
	•	No—you should deploy it as a Databricks Job (or Delta Live Table pipeline) for reliable background execution.
	•	If you’re just experimenting, you can run it in an interactive notebook, but be aware the stream stops as soon as that cluster shuts down.

In [0]:
from delta.tables import DeltaTable
# pyspark.sql.functions has no `input_file_modification_time` (importing it raises
# ImportError); file path and modification time come from the `_metadata` column instead.
from pyspark.sql.functions import col, current_timestamp, monotonically_increasing_id

# 1️⃣ Define the streaming source with Auto Loader
# NOTE(review): this reads dbfs:/mnt/data_stage, while the batch cells in this notebook read
# abfss://warehouse@dbxdl.dfs.core.windows.net/lineitems/ — confirm which location is intended.
streaming_df = (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "parquet")
         .option("cloudFiles.schemaLocation", "/mnt/checkpoints/lineitems/schema")      # for schema inference
         .load("dbfs:/mnt/data_stage")                                                # your mounted path
)

# 2️⃣ Enrich with your synthetic PK + file metadata.
# A single select directly on the source keeps the hidden `_metadata` column resolvable.
# NOTE(review): monotonically_increasing_id() is not stable across runs or micro-batches,
# so PK_ID is a weak merge key — confirm a real business key is not available.
enriched = streaming_df.select(
    "*",
    monotonically_increasing_id().alias("PK_ID"),
    col("_metadata.file_path").alias("FILE_NAME"),
    col("_metadata.file_modification_time").alias("FILE_LAST_MODIFIED"),
    current_timestamp().alias("INSERTED_DATE"),
)


# 3️⃣ In each micro-batch, MERGE the new rows into your target Delta table
def upsert_batch(batch_df, batch_id):
    """Upsert one micro-batch into the LINEITEMS Delta table.

    Rows are matched on FILE_NAME + PK_ID; matches are updated from the batch and
    everything else is inserted. The target table must expose a FILE_NAME column
    for the merge condition to resolve.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Micro-batch id passed in by foreachBatch (unused).
    """
    # Use the session attached to the batch rather than the notebook-global `spark`,
    # so the function does not depend on a global being visible where it executes.
    delta_tbl = DeltaTable.forName(batch_df.sparkSession, "LINEITEMS")
    (delta_tbl.alias("t")
         .merge(
            source    = batch_df.alias("s"),
            condition = "t.FILE_NAME = s.FILE_NAME AND t.PK_ID = s.PK_ID"
         )
         .whenMatchedUpdateAll()
         .whenNotMatchedInsertAll()
         .execute()
    )


# 4️⃣ Wire it all together in a streaming query.
# foreachBatch is the sink here, so no .format(...) is set on the writer.
# The handle is kept so the stream can be inspected (query.status) or stopped (query.stop()).
query = (
  enriched.writeStream
          .foreachBatch(upsert_batch)
          .option("checkpointLocation", "/mnt/checkpoints/lineitems/merge")
          .outputMode("update")           # or "append"; the checkpoint tracks which files were already processed
          .start()
)