In [0]:
# Service-principal credentials used for OAuth access to the ADLS Gen2 account.
# The client secret is read from a Databricks secret scope so that it never
# appears in the notebook source or its revision history.
# NOTE(review): the scope and key names are placeholders — create the scope (or
# point these at an existing one) before running, and rotate the client secret
# that was previously hardcoded in this cell, since it must be considered leaked.
secret_value = dbutils.secrets.get(scope="adls-oauth", key="client-secret")
# Application (client) ID and directory (tenant) ID are identifiers, not secrets.
client_id = "ffc81883-5473-4917-978e-4ca22cd12487"
tenant_id = "048040fa-d182-4cdc-8fc0-39864acdef68"


In [0]:
# Session-level Spark settings that enable OAuth (client-credentials) access to
# the `grdsamitsa` ADLS Gen2 account. Pairs are applied in the order listed.
_abfs_oauth_conf = [
    ("fs.azure.account.auth.type.grdsamitsa.dfs.core.windows.net", "OAuth"),
    ("fs.azure.account.oauth.provider.type.grdsamitsa.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id.grdsamitsa.dfs.core.windows.net", client_id),
    ("fs.azure.account.oauth2.client.secret.grdsamitsa.dfs.core.windows.net", secret_value),
    # Token endpoint of the tenant that owns the service principal.
    ("fs.azure.account.oauth2.client.endpoint.grdsamitsa.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"),
]
for _conf_key, _conf_value in _abfs_oauth_conf:
    spark.conf.set(_conf_key, _conf_value)

In [0]:
# Databricks notebook source
# DBTITLE 1,Setup Parameters
# MAGIC %md
# MAGIC #### Set up the input path and checkpoint location for Auto Loader.
# MAGIC The paths below point at the `landing-zone` container in the `grdsamitsa` storage account; replace these with your actual Azure storage account and container names.

# In[1]:
# Root of the landing-zone container; both locations below live under it.
_landing_root = "abfss://landing-zone@grdsamitsa.dfs.core.windows.net"
# Directory that Auto Loader watches for newly arrived order files.
input_path = f"{_landing_root}/data/order-stream"
# Base directory for this pipeline's schema-tracking and streaming checkpoint state.
checkpoint_path = f"{_landing_root}/checkpoints/auto_loader_trigger_available_now/"

# DBTITLE 2,Configure and Run Auto Loader
# MAGIC %md
# MAGIC #### Configure Auto Loader with Trigger.AvailableNow
# MAGIC
# MAGIC We use `cloudFiles` as the source format.
# MAGIC The `trigger(availableNow=True)` setting is the key here. It tells the streaming query to process all files that have arrived in the `input_path` since the last run, and then terminate immediately.
# MAGIC This allows us to run the job on a schedule (e.g., every 5 minutes, every hour) without leaving a cluster running continuously.

# In[2]:
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.functions import col

# Read data from the input path using Auto Loader (the `cloudFiles` source).
# `cloudFiles.inferColumnTypes` asks Auto Loader to infer real column types for the
# CSV files; the previously used `cloudFiles.inferSchema` key is not an Auto Loader
# option, so type inference was never actually requested.
# For production you should define an explicit schema instead of inferring it.
# The inferred schema is tracked under `cloudFiles.schemaLocation`.
# NOTE(review): a schema already persisted at that location is presumably reused, so
# the new option may only take effect for a fresh schema location — confirm.
df = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("cloudFiles.schemaLocation", checkpoint_path + "schema/")
  .load(input_path)
)

# DBTITLE 3,Write to Delta Table and Terminate
# MAGIC %md
# MAGIC #### Write the data to a Delta table
# MAGIC
# MAGIC The `.trigger(availableNow=True)` is the crucial part of this code. It's the mechanism that makes the pipeline "batch-like" and cost-effective.
# MAGIC
# MAGIC * The query will find all new files that have arrived since the last successful run.
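# MAGIC * It will write the new data to the target Delta table in one or more micro-batches (the backlog can be split when rate-limit options such as `cloudFiles.maxFilesPerTrigger` apply).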
# MAGIC * Once the data is written, the query will terminate itself, allowing the job's cluster to shut down and save on compute costs.

# In[3]:
# Write the Auto Loader stream to the target Delta table and block until the
# query has finished processing everything that is currently available.
try:
    (df.writeStream
        .trigger(availableNow=True) # THIS IS THE KEY! Process the backlog, then stop.
        .option("checkpointLocation", checkpoint_path + "checkpoint/")
        .option("mergeSchema", "true")
        .toTable("dbacademy.labuser11327437_1757240457.order_df")
        .awaitTermination()
    )
except Exception as e:
    print(f"Error during streaming write: {e}")
    # You can add more robust error handling here, like logging to a separate table
    # or sending a notification.
    # Re-raise so the notebook run (and any scheduled job) is marked as failed
    # instead of silently continuing after an unsuccessful write.
    raise

# DBTITLE 4,Important Considerations
# MAGIC %md
# MAGIC #### Key things to remember for production:
# MAGIC 1.  **Scheduling:** You would set this up as a Databricks Job with a recurring schedule (e.g., every 15 minutes).
# MAGIC 2.  **Cluster Configuration:** Use a **Jobs cluster** for this notebook, as it is significantly cheaper than an All-Purpose cluster. A Jobs cluster terminates automatically when the run finishes; if you run on an All-Purpose cluster instead, configure it to autoterminate after a short period of inactivity.
# MAGIC 3.  **Permissions:** Ensure the service principal or cluster has the necessary read/write permissions for the storage path and the checkpoint location.
# MAGIC 4.  **Idempotency:** The checkpoint location is what ensures exactly-once processing, even if the job fails and needs to restart.
# MAGIC 5.  **Data Freshness:** This pattern is ideal for use cases where near-real-time (e.g., 5-15 minute latency) is acceptable. For true real-time needs, a continuous streaming query with a cluster that is always on would be necessary, but at a higher cost.


In [0]:
%sql
-- Row count of the Delta table populated by the Auto Loader stream above.
SELECT COUNT(*) FROM dbacademy.labuser11327437_1757240457.order_df;