### Checking ID for Incremental Processing

In [1]:
import os
from pyspark.sql.utils import AnalysisException

# Checkpoint for incremental processing: the id stored by the previous run.
# It stays None when the control table cannot be read or contains no rows.
lastFileCreatedId = None

# Delta location of the control table (Lakehouse tables live under Tables/).
path = "Tables/dbo/controlId"

try:
    df = spark.read.format("delta").load(path)

    # Only the first row returned is consulted.
    row = df.select("lastFileCreatedId").limit(1).collect()
    if row:
        lastFileCreatedId = row[0]["lastFileCreatedId"]
except AnalysisException as exc:
    # Best effort: keep the None default, but report why the checkpoint was
    # not loaded so a full re-export does not happen without any trace.
    print(f"Could not read checkpoint from {path!r}; continuing without one: {exc}")


StatementMeta(, 40c493b3-2f9f-476c-a8c4-5e36e3cb0f55, 3, Finished, Available, Finished)

In [2]:
# Load the source table; when a checkpoint from an earlier run exists, keep
# only the rows whose `_c0` value is greater than it.
df = spark.read.format("delta").load("Tables/dbo/amazon")
if lastFileCreatedId is not None:
    # The column is referenced through the DataFrame itself, so no import of
    # pyspark.sql.functions.col is required for this cell to run.
    # NOTE(review): the checkpoint is produced from the `key` column in the
    # export cell, while this filter compares `_c0` - confirm both columns
    # carry the same id.
    df = df.filter(df["_c0"] > lastFileCreatedId)

StatementMeta(, 40c493b3-2f9f-476c-a8c4-5e36e3cb0f55, 4, Finished, Available, Finished)

In [3]:
import os

# Directory that receives one text file per exported review.
output_dir = "/lakehouse/default/Files/AmazonReviews"

# Create the directory (and any missing parents); open() below would raise
# FileNotFoundError if it did not exist.
os.makedirs(output_dir, exist_ok=True)

# Pull the rows to the driver, ordered by key, so the last one processed
# holds the highest key (assumes df is small enough to collect).
rows = df.select("reviewText", "key").orderBy("key").collect()

# Key of the last review written; stays None when there is nothing to export.
lastId = None

for row in rows:
    review_text = row["reviewText"]
    idx = row["key"]

    # Left-pad the key with zeros to a minimum width of 4 characters.
    file_id = str(idx).zfill(4)

    file_path = os.path.join(output_dir, f"Review{file_id}.txt")

    # A missing or empty review still produces an (empty) file.
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(review_text if review_text else "")

    lastId = idx

StatementMeta(, 40c493b3-2f9f-476c-a8c4-5e36e3cb0f55, 5, Finished, Available, Finished)

In [4]:
from pyspark.sql import Row

# Persist the checkpoint only when this run actually exported something.
# With no exported rows lastId is None: Spark cannot infer a column type from
# a lone None value, and overwriting would discard the existing checkpoint.
if lastId is not None:
    # Single-row DataFrame holding lastId as lastFileCreatedId.
    control_df = spark.createDataFrame([Row(lastFileCreatedId=lastId)])

    # Replace the contents of Tables/dbo/controlId with the new checkpoint.
    control_df.write.format("delta").mode("overwrite").save("Tables/dbo/controlId")


StatementMeta(, 40c493b3-2f9f-476c-a8c4-5e36e3cb0f55, 6, Finished, Available, Finished)