In [0]:
from pyspark.sql import SparkSession
# NOTE: keep the star import ABOVE the datetime import so that nothing it
# exports can shadow `datetime` / `timedelta` used by the cells below.
from pyspark.sql.functions import *
from datetime import datetime, timedelta
import os
import random

# Create sample sales data
#
# Builds a list of synthetic order dicts and turns it into the Spark
# DataFrame `df` that the rest of the notebook writes to Delta.
NUM_RECORDS = 10_000   # number of synthetic orders to generate
RANDOM_SEED = 42       # fixed seed so Restart & Run All yields the same data

countries = ['USA', 'UK', 'Germany', 'France', 'Japan']
products = ['Laptop', 'Phone', 'Tablet', 'Monitor', 'Keyboard']

# Private, seeded generator: reproducible output without touching the
# global `random` state that other cells may use.
rng = random.Random(RANDOM_SEED)

# Generate NUM_RECORDS records across different dates.
# Day offsets 0..365 from 2024-01-01 cover 2024-01-01 through 2024-12-31
# (2024 is a leap year).
data = []
for i in range(NUM_RECORDS):
    date = (datetime(2024, 1, 1) + timedelta(days=rng.randint(0, 365))).strftime('%Y-%m-%d')
    price_val = rng.uniform(50, 2000)
    data.append({
        'order_id': i,
        'order_date': date,  # kept as an ISO 'YYYY-MM-DD' string
        'country': rng.choice(countries),
        'product': rng.choice(products),
        'quantity': rng.randint(1, 10),
        # Round to cents via formatting; avoids the builtin `round`, which the
        # star import from pyspark.sql.functions may shadow.
        'price': float(f"{price_val:.2f}"),
        'customer_id': rng.randint(1000, 9999)
    })

# Create DataFrame (`spark` is the session provided by the notebook runtime)
df = spark.createDataFrame(data)
# Uncomment to inspect the generated data:
#print(f"Created {df.count()} records")
#df.show(10)

In [0]:
# Azure Storage configuration
storage_account_name = "shared5005"
container_name = "rawdata"

# Never paste the storage account key into the notebook: it would be saved
# (and shared) with the file. Supply it through the AZURE_STORAGE_ACCOUNT_KEY
# environment variable instead (a secret manager / secret scope is better
# still where available). Falls back to an empty string when unset.
account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "")

# Use abfss:// scheme for ADLS Gen2
delta_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/sales_data"

In [0]:

# Smoke-test the storage connection by writing the sample data to
# `delta_path` as a Delta table partitioned by country.
print(f"Testing connection to: {delta_path}")

try:
    (
        df.write
        .format("delta")
        .mode("overwrite")  # replaces whatever is currently at delta_path
        # Storage account key passed as a per-write option.
        .option(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", account_key)
        .partitionBy("country")
        # Ask the writer to cap each output file at 1,000 records.
        .option("maxRecordsPerFile", "1000")
        .save(delta_path)
    )

    print(f"✅ Delta table successfully written to {delta_path}")

except Exception as e:
    # Best-effort connectivity check: report the failure and let the
    # notebook continue instead of aborting the run.
    print(f"❌ Error writing Delta: {e}")

In [0]:
# Rewrite the table so that each country partition ends up in a single file.
#
# repartition("country") shuffles all rows of a country into the same Spark
# partition, so each country directory is written by exactly one task. The
# former trailing .coalesce(1) was redundant: it produced the same file layout
# but forced the entire write through a single task.
(
    df.repartition("country")
    .write
    .format("delta")
    .mode("overwrite")  # replaces the table written by the previous cell
    .option(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", account_key)
    .partitionBy("country")
    .save(delta_path)
)

In [0]:
# Read the Delta table back
df2 = (
    spark.read
    .format("delta")
    .option(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", account_key)
    .load(delta_path)
)

# Row count per country partition, sorted by country name
print("\n📊 Record counts by country:")
df2.groupBy("country").count().orderBy("country").show()

In [0]:
# Remove data files that are no longer referenced by the current table
# version (left behind by the earlier overwrite).
# This will probably fail on serverless, but worth trying
try:
    # Imported here so a missing/unsupported Delta Python API is reported by
    # the handler below instead of breaking the cell.
    from delta.tables import DeltaTable

    delta_table = DeltaTable.forPath(spark, delta_path)

    # Disable retention check (allows immediate deletion).
    # NOTE(review): this is a session-wide setting and is not restored
    # afterwards — confirm that is intended.
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

    print("🧹 Running VACUUM to clean old files...")

    # VACUUM with 0 hours retention (delete everything not in current version)
    delta_table.vacuum(0)

    print("✅ VACUUM completed - old files deleted!")

except Exception as e:
    # Best-effort: report the failure and point at the alternatives.
    print(f"❌ VACUUM failed (expected on serverless): {e}")
    print("\nUse Option 1 (delete and recreate) or Option 3 (classic compute)")