In [0]:
# ---------------------
# --------CREATE Table
# ---------------------

# Fully qualified (catalog.schema.table) name of the demo Delta table.
table_name = "testing.bronze.t1"

# CREATE TABLE IF NOT EXISTS folds the existence check and the creation into a single
# statement: re-running the cell is a no-op when the table is already there, and there
# is no window between "check" and "create" in which another writer could interfere.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        id INT,
        name STRING
    )
    USING DELTA
""")


In [0]:
# ---------------------
# --------DROP Table
# ---------------------

from pyspark.sql.utils import AnalysisException

# Table name
table_name = "testing.bronze.t1"

# Delta session setting that normally rejects VACUUM with a retention shorter than the
# configured minimum; it is switched off only for the duration of the forced vacuum.
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"

# Step 1: Check if table exists
if spark.catalog.tableExists(table_name):
    print(f"✅ Table {table_name} exists. Proceeding to drop and clean.")

    # Step 2: Capture the storage location from DESCRIBE DETAIL *before* the drop,
    # because the catalog entry is gone afterwards.
    try:
        location_row = spark.sql(f"DESCRIBE DETAIL {table_name}").collect()[0]
        table_location = location_row["location"]
        print(f"📁 Table data location found: {table_location}")
    except Exception as e:
        # Best effort: without a location the table is still dropped, only the vacuum is skipped.
        print(f"⚠️ Failed to get table location: {e}")
        table_location = None

    # Step 3: Drop the table
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    print(f"🗑️ Dropped table: {table_name}")

    if table_location:
        # Step 4: Disable the retention safety check, remembering the session's prior value
        # (an unset key is treated as the Delta default, "true").
        previous_retention_check = spark.conf.get(retention_check_key, "true")
        spark.conf.set(retention_check_key, "false")
        print("⛔ Retention check disabled temporarily.")
        try:
            # Step 5: Force vacuum (delete files immediately)
            # NOTE(review): for Unity Catalog managed tables, path-based VACUUM on a dropped
            # table's location may be rejected — confirm this table is external.
            spark.sql(f"VACUUM delta.`{table_location}` RETAIN 0 HOURS")
            print("🧹 Vacuum complete. Obsolete files removed.")
        finally:
            # Step 6: Always restore the setting, even if VACUUM raised, so the session is
            # never left with the safety check silently disabled.
            spark.conf.set(retention_check_key, previous_retention_check)
            print(f"🔒 Retention check restored to '{previous_retention_check}'.")
else:
    print(f"ℹ️ Table {table_name} does not exist. Nothing to drop.")


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import Row

# Step 1: The next id is one past the current maximum; a missing table (or an empty
# one, whose MAX(id) is NULL) starts the sequence at 1.
current_max_id = (
    (spark.sql("SELECT MAX(id) as max_id FROM testing.bronze.t1").collect()[0]["max_id"] or 0)
    if spark.catalog.tableExists("testing.bronze.t1")
    else 0
)
next_id = current_max_id + 1
name = f"Product {next_id}"

# Step 2: Spell out the schema instead of relying on inference, so `id` is an
# explicitly non-nullable INT and `name` a nullable STRING.
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), True),
    ]
)

# Steps 3-4: Build the single-row DataFrame and append it to the table.
df = spark.createDataFrame([(next_id, name)], schema)
df.write.mode("append").saveAsTable("testing.bronze.t1")

print(f"✅ Inserted row: id={next_id}, name='{name}'")


In [0]:
%sql
-- Preview every row currently in testing.bronze.t1 (the table the insert cell appends to).
select * from testing.bronze.t1

In [0]:
# Widgets: declare each notebook parameter as a text widget with its default value.
for widget_name, default_value in (
    ("start_date", "2025-07-01"),
    ("end_date", "2025-07-10"),
    ("product_id", "BREAD"),
):
    dbutils.widgets.text(widget_name, default_value)



In [0]:
# Pull the current widget values into notebook-level variables (read in the order
# start_date, end_date, product_id).
start_date, end_date, product_id = (
    dbutils.widgets.get(widget_key) for widget_key in ("start_date", "end_date", "product_id")
)

# Echo the parameters so the run's inputs are visible in the cell output.
for label, value in (
    ("product_id", product_id),
    ("start_date", start_date),
    ("end_date", end_date),
):
    print(f"{label}: {value}")

In [0]:
from datetime import date

from pyspark.sql.functions import to_date, col, lit, length

# Widgets (re-declared so this cell also runs on its own)
dbutils.widgets.text("start_date", "2025-07-01")
dbutils.widgets.text("end_date", "2025-07-10")
dbutils.widgets.text("product_id", "BREAD")

# Get widget values
start_date = dbutils.widgets.get("start_date")
end_date = dbutils.widgets.get("end_date")
product_id = dbutils.widgets.get("product_id")


def _parse_iso_date(widget_name: str, value: str) -> date:
    """Parse a yyyy-MM-dd widget value.

    Raises:
        ValueError: with the widget name and offending value, if it is not a valid date.
    """
    try:
        return date.fromisoformat(value.strip())
    except ValueError as exc:
        raise ValueError(
            f"Widget '{widget_name}' must be a date in yyyy-MM-dd format, got {value!r}"
        ) from exc


def _sql_string_literal(value: str) -> str:
    """Quote `value` as a Spark SQL string literal (backslash-escaping `\\` and `'`)."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


# Validate parameters up front: a malformed date would otherwise make to_date(lit(...))
# yield NULL (or raise late under ANSI mode) and the job would silently select nothing.
start_day = _parse_iso_date("start_date", start_date)
end_day = _parse_iso_date("end_date", end_date)
if start_day > end_day:
    raise ValueError(f"start_date ({start_day}) must not be after end_date ({end_day})")

# Read from source
raw_df = spark.table("samples.bakehouse.sales_transactions")

# Step 1: Filter rows where `dateTime` is non-empty and matches expected date format
# Adjust the regex to match your actual format. Example below assumes `yyyy-MM-dd` or `yyyy-MM-dd HH:mm:ss`
# NOTE(review): if `dateTime` is already a TIMESTAMP column these string checks rely on an
# implicit cast to STRING — confirm the source column type.
filtered_raw_df = raw_df.filter(
    (col("dateTime").isNotNull()) &
    (length(col("dateTime")) > 0) &
    (col("dateTime").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}.*"))  # Basic format check
)

# Step 2: Safely cast to date after filtering, then keep only the requested window/product
df = (
    filtered_raw_df
    .withColumn("sale_date", to_date(col("dateTime")))
    .filter(col("sale_date").isNotNull())
    .filter(
        (col("sale_date") >= lit(start_day)) &
        (col("sale_date") <= lit(end_day)) &
        (col("product") == lit(product_id))
    )
)

# Step 3: Target table setup (creates an empty table with df's schema on first run)
target_table = "testing.bronze.pk_salestxn"
if not spark.catalog.tableExists(target_table):
    df.limit(0).write.mode("overwrite").saveAsTable(target_table)

# Step 4: Write data idempotently. Instead of appending (which duplicated the whole slice
# on every re-run), atomically replace exactly the rows this run is responsible for.
# The predicate mirrors the DataFrame filter above, so every written row satisfies it.
replace_predicate = (
    f"sale_date >= DATE '{start_day.isoformat()}' "
    f"AND sale_date <= DATE '{end_day.isoformat()}' "
    f"AND product = {_sql_string_literal(product_id)}"
)
(
    df.write
    .mode("overwrite")
    .option("replaceWhere", replace_predicate)
    .saveAsTable(target_table)
)
