In [17]:
from pyspark.sql.functions import col, to_timestamp, lower, year, month, round
from pyspark.sql import DataFrame

# =============================================================================
# 1. DEFINE TRANSFORMATION FUNCTIONS
# =============================================================================

def transform_orders(df: DataFrame) -> DataFrame:
    """Cleans and transforms the orders data.

    Converts the five order date columns with ``to_timestamp``, lower-cases
    ``order_status``, adds ``purchase_year`` / ``purchase_month`` derived from
    the purchase timestamp, and drops rows where ``order_id``, ``customer_id``
    or ``order_purchase_timestamp`` is null.
    """
    timestamp_fields = (
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    )
    required_fields = ["order_id", "customer_id", "order_purchase_timestamp"]

    # Convert each date column in place, in the same order as listed above.
    parsed = df
    for field in timestamp_fields:
        parsed = parsed.withColumn(field, to_timestamp(col(field)))

    normalized = parsed.withColumn("order_status", lower(col("order_status")))

    # The derived columns read the already-converted purchase timestamp.
    with_year = normalized.withColumn("purchase_year", year(col("order_purchase_timestamp")))
    with_month = with_year.withColumn("purchase_month", month(col("order_purchase_timestamp")))

    return with_month.na.drop(subset=required_fields)

def transform_order_items(df: DataFrame) -> DataFrame:
    """Cleans and transforms order items data.

    Converts ``shipping_limit_date`` with ``to_timestamp``, rounds ``price``
    and ``freight_value`` to two decimals, and casts ``order_item_id`` to
    integer.

    The monetary columns are cast to ``double`` rather than ``float``: a
    32-bit float cannot hold typical two-decimal amounts exactly enough, so
    values such as 58.9 surface as 58.900001525878906 once they are widened
    for sums or comparisons.
    """
    return (
        df.withColumn("shipping_limit_date", to_timestamp(col("shipping_limit_date")))
        # ``round`` here is the Spark column function imported at file level.
        .withColumn("price", round(col("price").cast("double"), 2))
        .withColumn("freight_value", round(col("freight_value").cast("double"), 2))
        .withColumn("order_item_id", col("order_item_id").cast("integer"))
    )

def transform_payments(df: DataFrame) -> DataFrame:
    """Cleans and transforms payments data.

    Casts ``payment_sequential`` and ``payment_installments`` to integer and
    rounds ``payment_value`` to two decimals.

    ``payment_value`` is cast to ``double`` rather than ``float``: a 32-bit
    float introduces visible artefacts in two-decimal amounts once they are
    widened for aggregation (for example 58.9 becomes 58.900001525878906).
    """
    return (
        df.withColumn("payment_sequential", col("payment_sequential").cast("integer"))
        .withColumn("payment_installments", col("payment_installments").cast("integer"))
        # ``round`` here is the Spark column function imported at file level.
        .withColumn("payment_value", round(col("payment_value").cast("double"), 2))
    )

def transform_reviews(df: DataFrame) -> DataFrame:
    """Cleans and transforms reviews data.

    Converts the two review date columns with ``to_timestamp``, casts
    ``review_score`` to integer, and drops rows where ``order_id`` or
    ``review_score`` is null after the cast.
    """
    with_dates = df
    for date_field in ("review_creation_date", "review_answer_timestamp"):
        with_dates = with_dates.withColumn(date_field, to_timestamp(col(date_field)))

    scored = with_dates.withColumn("review_score", col("review_score").cast("integer"))
    return scored.na.drop(subset=["order_id", "review_score"])

def transform_products(df: DataFrame) -> DataFrame:
    """Cleans and transforms products data.

    Renames the two misspelled ``*_lenght`` columns to ``*_length``.
    """
    corrected_names = {
        "product_name_lenght": "product_name_length",
        "product_description_lenght": "product_description_length",
    }

    renamed = df
    for misspelled, corrected in corrected_names.items():
        renamed = renamed.withColumnRenamed(misspelled, corrected)
    return renamed

def passthrough_transform(df: DataFrame) -> DataFrame:
    """Returns ``df`` unchanged; the default for data that needs no transformation."""
    untouched = df
    return untouched

# =============================================================================
# 2. DETECT AVAILABLE PATHS
# =============================================================================

print("🔍 Detecting available paths...")

# Candidate base paths for the Bronze layer, tried in order. The last entry is
# the empty string, i.e. "the file name on its own is already a valid path".
bronze_paths_to_try = [
    "abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/",
    "/LH_Bronze_AY.Lakehouse/Files/",
    "LH_Bronze_AY.Lakehouse/Files/",
    "Files/",
    ""
]

# Probe each candidate by reading the orders file; the first one that yields
# at least one row becomes the base path.
bronze_base_path = None
for path in bronze_paths_to_try:
    test_file = f"{path}olist_orders_dataset.csv"
    print(f"Testing read path: {test_file}")
    try:
        test_df = spark.read.format("csv").option("header", "true").load(test_file)
        # One row is enough to prove the path is readable; no full count needed.
        if test_df.head(1):
            bronze_base_path = path
            print(f"✅ Found working Bronze path: {bronze_base_path}")
            break
    except Exception as e:
        # Keep probing, but show why this candidate was rejected.
        print(f"❌ Failed: {test_file} - {str(e)[:100]}")

# Compare against None explicitly: the empty-string candidate is a valid
# result and must not be mistaken for "nothing found".
if bronze_base_path is None:
    print("❌ Could not find Bronze layer files.")
    # Stop the run; raising SystemExit directly avoids relying on the
    # interactive-only exit() helper.
    raise SystemExit(1)

# =============================================================================
# 3. CREATE CONFIGURATION
# =============================================================================

# Maps each dataset key to its Bronze CSV file, the Silver table to create and
# the transformation function to apply.
job_config = {
    dataset: {"bronze": bronze_file, "table": silver_table, "func": transform}
    for dataset, bronze_file, silver_table, transform in (
        ("orders", "olist_orders_dataset.csv", "silver_orders", transform_orders),
        ("customers", "olist_customers_dataset.csv", "silver_customers", passthrough_transform),
        ("order_items", "olist_order_items_dataset.csv", "silver_order_items", transform_order_items),
        ("products", "olist_products_dataset.csv", "silver_products", transform_products),
        ("sellers", "olist_sellers_dataset.csv", "silver_sellers", passthrough_transform),
        ("payments", "olist_order_payments_dataset.csv", "silver_order_payments", transform_payments),
        ("reviews", "olist_order_reviews_dataset.csv", "silver_order_reviews", transform_reviews),
        ("geolocation", "olist_geolocation_dataset.csv", "silver_geolocation", passthrough_transform),
        ("translations", "product_category_name_translation.csv", "silver_product_translations", passthrough_transform),
    )
}

# =============================================================================
# 4. RUN THE TRANSFORMATION JOB - CREATE TABLES
# =============================================================================

print(f"\n🚀 Starting table transformation job...")
print(f"📁 Reading from: {bronze_base_path}")
print(f"🏗️  Creating tables in: LH_Silver_AY")

successful_transformations = 0

# Process every configured dataset independently: a failure in one dataset is
# reported and the loop moves on to the next.
for key, config in job_config.items():
    bronze_filename = config["bronze"]
    table_name = config["table"]
    transform_function = config["func"]

    read_path = f"{bronze_base_path}{bronze_filename}"
    qualified_table = f"LH_Silver_AY.{table_name}"

    try:
        print(f"\n📊 Processing '{key}': {bronze_filename} -> {table_name}")

        # Step 1: Read the CSV file
        bronze_df = spark.read \
            .format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("multiline", "true") \
            .option("escape", '"') \
            .load(read_path)

        row_count = bronze_df.count()
        if row_count == 0:
            print(f"⚠️  WARNING: '{key}' appears to be empty, skipping...")
            continue

        print(f"   📈 Loaded {row_count} rows")

        # Step 2: Apply transformation
        print(f"   🔄 Applying transformation: {transform_function.__name__}")
        transformed_df = transform_function(bronze_df)

        # Step 3: Clean column names (lower-case; spaces, dashes and dots
        # become underscores so the names are table friendly)
        clean_columns = [
            c.lower().replace(" ", "_").replace("-", "_").replace(".", "_")
            for c in transformed_df.columns
        ]
        final_df = transformed_df.toDF(*clean_columns)

        # Step 4: Write the table with a Delta overwrite.
        # This replaces the previous "DROP TABLE then CREATE TABLE AS SELECT
        # from a temp view" sequence: a Delta overwrite swaps the contents in
        # one commit, so a failed write no longer leaves the table missing,
        # and there is no temp view to clean up afterwards.
        print(f"   🏗️  Creating table: LH_Silver_AY.{table_name}")
        final_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(qualified_table)

        # Step 5: Verify table creation by counting the rows actually stored
        result_count = spark.table(qualified_table).count()

        print(f"   ✅ Successfully created table '{table_name}' with {result_count} rows")
        successful_transformations += 1

    except Exception as e:
        # Top-level boundary for one dataset: report and carry on with the rest.
        print(f"   ❌ ERROR processing '{key}': {str(e)}")
        print(f"      Read path: {read_path}")
        continue

print(f"\n🎉 Table transformation job completed!")
print(f"📈 Successfully created {successful_transformations}/{len(job_config)} tables")

# =============================================================================
# 5. DISPLAY TABLE INFORMATION
# =============================================================================

if successful_transformations > 0:
    print(f"\n📋 Created tables in LH_Silver_AY:")

    for config in job_config.values():
        table_name = config["table"]
        try:
            # Read the schema from the table itself: one lookup gives both the
            # column list and a handle for the row count, and the list holds
            # only real columns (no extra metadata rows).
            table_df = spark.table(f"LH_Silver_AY.{table_name}")
            column_names = table_df.columns
            row_count = table_df.count()

            print(f"\n🗃️  Table: {table_name}")
            print(f"   📊 Rows: {row_count}")
            print(f"   📝 Columns: {len(column_names)}")

            # Show first few column names
            print(f"   🏷️  Sample columns: {', '.join(column_names[:5])}")

        except Exception as e:
            print(f"   ❌ Could not get info for {table_name}: {str(e)}")

    # Only claim that everything is available when every configured table
    # was actually created in this run.
    if successful_transformations == len(job_config):
        print(f"\n✨ All tables are now available in LH_Silver_AY lakehouse!")
    else:
        print(f"\n⚠️  Only {successful_transformations}/{len(job_config)} tables were created in this run; see the messages above.")
    print(f"💡 You can now query them using: SELECT * FROM LH_Silver_AY.table_name")

# =============================================================================
# 6. SAMPLE QUERIES
# =============================================================================

# Print a few example statements the user can run against the new tables.
print("\n📄 Sample queries to test your new tables:")
for sample_query in (
    "SELECT * FROM LH_Silver_AY.silver_orders LIMIT 10",
    "SELECT COUNT(*) FROM LH_Silver_AY.silver_customers",
    "SELECT * FROM LH_Silver_AY.silver_products WHERE product_category_name IS NOT NULL LIMIT 5",
    "SHOW TABLES IN LH_Silver_AY",
):
    print(f"   • {sample_query}")

StatementMeta(, 683d243f-7cc7-4b06-af7e-979352f7fb30, 19, Finished, Available, Finished)

🔍 Detecting available paths...
Testing read path: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/olist_orders_dataset.csv
✅ Found working Bronze path: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/

🚀 Starting table transformation job...
📁 Reading from: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/
🏗️  Creating tables in: LH_Silver_AY

📊 Processing 'orders': olist_orders_dataset.csv -> silver_orders
   📈 Loaded 99441 rows
   🔄 Applying transformation: transform_orders
   🏗️  Creating table: LH_Silver_AY.silver_orders
   ✅ Successfully created table 'silver_orders' with 99441 rows

📊 Processing 'customers': olist_customers_dataset.csv -> silver_customers
   📈 Loaded 99441 rows
   🔄 Applying transformation: passthrough_transform
   🏗️  Creating table: LH_Silver_AY.silver_customers
   ✅ Successfully created table 'silver_customers' with 99441 rows

📊 Processing 'order_items': olist_ord

In [14]:
from pyspark.sql.functions import col, to_timestamp, lower, year, month, round
from pyspark.sql import DataFrame

# =============================================================================
# 1. DEFINE TRANSFORMATION FUNCTIONS
# =============================================================================

def transform_orders(df: DataFrame) -> DataFrame:
    """Cleans and transforms the orders data.

    Converts the five order date columns with ``to_timestamp``, lower-cases
    ``order_status``, adds ``purchase_year`` / ``purchase_month`` derived from
    the purchase timestamp, and drops rows where ``order_id``, ``customer_id``
    or ``order_purchase_timestamp`` is null.
    """
    timestamp_fields = (
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    )
    required_fields = ["order_id", "customer_id", "order_purchase_timestamp"]

    # Convert each date column in place, in the same order as listed above.
    parsed = df
    for field in timestamp_fields:
        parsed = parsed.withColumn(field, to_timestamp(col(field)))

    normalized = parsed.withColumn("order_status", lower(col("order_status")))

    # The derived columns read the already-converted purchase timestamp.
    with_year = normalized.withColumn("purchase_year", year(col("order_purchase_timestamp")))
    with_month = with_year.withColumn("purchase_month", month(col("order_purchase_timestamp")))

    return with_month.na.drop(subset=required_fields)

def transform_order_items(df: DataFrame) -> DataFrame:
    """Cleans and transforms order items data.

    Converts ``shipping_limit_date`` with ``to_timestamp``, rounds ``price``
    and ``freight_value`` to two decimals, and casts ``order_item_id`` to
    integer.

    The monetary columns are cast to ``double`` rather than ``float``: a
    32-bit float cannot hold typical two-decimal amounts exactly enough, so
    values such as 58.9 surface as 58.900001525878906 once they are widened
    for sums or comparisons.
    """
    return (
        df.withColumn("shipping_limit_date", to_timestamp(col("shipping_limit_date")))
        # ``round`` here is the Spark column function imported at file level.
        .withColumn("price", round(col("price").cast("double"), 2))
        .withColumn("freight_value", round(col("freight_value").cast("double"), 2))
        .withColumn("order_item_id", col("order_item_id").cast("integer"))
    )

def transform_payments(df: DataFrame) -> DataFrame:
    """Cleans and transforms payments data.

    Casts ``payment_sequential`` and ``payment_installments`` to integer and
    rounds ``payment_value`` to two decimals.

    ``payment_value`` is cast to ``double`` rather than ``float``: a 32-bit
    float introduces visible artefacts in two-decimal amounts once they are
    widened for aggregation (for example 58.9 becomes 58.900001525878906).
    """
    return (
        df.withColumn("payment_sequential", col("payment_sequential").cast("integer"))
        .withColumn("payment_installments", col("payment_installments").cast("integer"))
        # ``round`` here is the Spark column function imported at file level.
        .withColumn("payment_value", round(col("payment_value").cast("double"), 2))
    )

def transform_reviews(df: DataFrame) -> DataFrame:
    """Cleans and transforms reviews data.

    Converts the two review date columns with ``to_timestamp``, casts
    ``review_score`` to integer, and drops rows where ``order_id`` or
    ``review_score`` is null after the cast.
    """
    with_dates = df
    for date_field in ("review_creation_date", "review_answer_timestamp"):
        with_dates = with_dates.withColumn(date_field, to_timestamp(col(date_field)))

    scored = with_dates.withColumn("review_score", col("review_score").cast("integer"))
    return scored.na.drop(subset=["order_id", "review_score"])

def transform_products(df: DataFrame) -> DataFrame:
    """Cleans and transforms products data.

    Renames the two misspelled ``*_lenght`` columns to ``*_length``.
    """
    corrected_names = {
        "product_name_lenght": "product_name_length",
        "product_description_lenght": "product_description_length",
    }

    renamed = df
    for misspelled, corrected in corrected_names.items():
        renamed = renamed.withColumnRenamed(misspelled, corrected)
    return renamed

def passthrough_transform(df: DataFrame) -> DataFrame:
    """Returns ``df`` unchanged; the default for data that needs no transformation."""
    untouched = df
    return untouched

# =============================================================================
# 2. DETECT AVAILABLE PATHS AND LAKEHOUSES
# =============================================================================

print("🔍 Detecting available paths and lakehouses...")

# Try to list what is available at the root. This is purely informational,
# so any failure (including dbutils not being defined) is reported and ignored.
try:
    print("Root directories:")
    # Print the entries; calling ls() alone would discard the listing.
    for root_entry in dbutils.fs.ls("/"):
        print(f"  {root_entry.name}")
except Exception:
    print("Cannot access root with dbutils.fs")

# Candidate base paths for the Bronze layer, tried in order. The last entry is
# the empty string, i.e. "the file name on its own is already a valid path".
bronze_paths_to_try = [
    "abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/",
    "/LH_Bronze_AY.Lakehouse/Files/",
    "LH_Bronze_AY.Lakehouse/Files/",
    "Files/",
    ""
]

# Candidate base paths for the Silver layer, tried in order.
silver_paths_to_try = [
    "abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Silver_AY.Lakehouse/Files/",
    "/LH_Silver_AY.Lakehouse/Files/",
    "LH_Silver_AY.Lakehouse/Files/",
    "../LH_Silver_AY.Lakehouse/Files/",
    "Files/"
]

# Probe each Bronze candidate by reading the orders file; the first one that
# yields at least one row becomes the base path.
bronze_base_path = None
for path in bronze_paths_to_try:
    test_file = f"{path}olist_orders_dataset.csv"
    print(f"Testing read path: {test_file}")
    try:
        test_df = spark.read.format("csv").option("header", "true").load(test_file)
        # One row is enough to prove the path is readable; no full count needed.
        if test_df.head(1):
            bronze_base_path = path
            print(f"✅ Found working Bronze path: {bronze_base_path}")
            break
    except Exception as e:
        print(f"❌ Failed: {test_file} - {str(e)[:100]}")

# Compare against None explicitly: the empty-string candidate is a valid
# result and must not be mistaken for "nothing found".
if bronze_base_path is None:
    print("❌ Could not find Bronze layer files. Please check the file paths.")
    print("Available files in current directory:")
    try:
        for file_info in dbutils.fs.ls("."):
            print(f"  {file_info.name}")
    except Exception as e:
        # The listing is only a diagnostic aid; say why it is missing.
        print(f"  (could not list current directory: {str(e)[:100]})")
    # Stop the run; raising SystemExit directly avoids relying on the
    # interactive-only exit() helper.
    raise SystemExit(1)

# Test write path: the first Silver candidate that accepts a tiny CSV write
# becomes the base path for the job's output.
silver_base_path = None
for path in silver_paths_to_try:
    test_write_path = f"{path}test_folder/"
    print(f"Testing write path: {test_write_path}")

    try:
        # Create a simple test dataframe and write it to the candidate path
        test_data = spark.createDataFrame([("test",)], ["col1"])
        test_data.coalesce(1).write.mode("overwrite").format("csv").option("header", "true").save(test_write_path)
    except Exception as e:
        print(f"❌ Failed: {test_write_path} - {str(e)[:100]}")
        continue

    # Clean up the probe output. A cleanup failure does not invalidate the
    # path, but it is reported so a leftover test_folder is not a surprise.
    try:
        dbutils.fs.rm(test_write_path, True)
    except Exception as cleanup_error:
        print(f"⚠️  Could not remove test folder {test_write_path}: {str(cleanup_error)[:100]}")

    silver_base_path = path
    print(f"✅ Found working Silver path: {silver_base_path}")
    break

# Compare against None explicitly so only "no candidate worked" triggers the
# fallback.
if silver_base_path is None:
    print("❌ Could not access Silver layer for writing. Using Bronze path as fallback.")
    silver_base_path = bronze_base_path

# =============================================================================
# 3. CREATE CONFIGURATION
# =============================================================================

# Maps each dataset key to its Bronze CSV file, the Silver output name (without
# extension) and the transformation function to apply.
job_config = {
    dataset: {"bronze": bronze_file, "silver": silver_name, "func": transform}
    for dataset, bronze_file, silver_name, transform in (
        ("orders", "olist_orders_dataset.csv", "silver_orders", transform_orders),
        ("customers", "olist_customers_dataset.csv", "silver_customers", passthrough_transform),
        ("order_items", "olist_order_items_dataset.csv", "silver_order_items", transform_order_items),
        ("products", "olist_products_dataset.csv", "silver_products", transform_products),
        ("sellers", "olist_sellers_dataset.csv", "silver_sellers", passthrough_transform),
        ("payments", "olist_order_payments_dataset.csv", "silver_order_payments", transform_payments),
        ("reviews", "olist_order_reviews_dataset.csv", "silver_order_reviews", transform_reviews),
        ("geolocation", "olist_geolocation_dataset.csv", "silver_geolocation", passthrough_transform),
        ("translations", "product_category_name_translation.csv", "silver_product_translations", passthrough_transform),
    )
}

# =============================================================================
# 4. RUN THE TRANSFORMATION JOB
# =============================================================================

print(f"\n🚀 Starting CSV transformation job...")
print(f"📁 Reading from: {bronze_base_path}")
print(f"📁 Writing to: {silver_base_path}")

successful_transformations = 0
# Names of the outputs that were actually written, so the final summary lists
# only those instead of every configured dataset.
written_outputs = []

# Process every configured dataset independently: a failure in one dataset is
# reported and the loop moves on to the next.
for key, config in job_config.items():
    bronze_filename = config["bronze"]
    silver_filename = config["silver"]
    transform_function = config["func"]

    read_path = f"{bronze_base_path}{bronze_filename}"
    write_path = f"{silver_base_path}{silver_filename}.csv"

    try:
        print(f"\n📊 Processing '{key}': {bronze_filename} -> {silver_filename}.csv")

        # Step 1: Read the CSV file
        bronze_df = spark.read \
            .format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("multiline", "true") \
            .option("escape", '"') \
            .load(read_path)

        row_count = bronze_df.count()
        if row_count == 0:
            print(f"⚠️  WARNING: '{key}' appears to be empty, skipping...")
            continue

        print(f"   📈 Loaded {row_count} rows")

        # Step 2: Apply transformation
        print(f"   🔄 Applying transformation: {transform_function.__name__}")
        transformed_df = transform_function(bronze_df)

        # Step 3: Clean column names (lower-case, spaces to underscores)
        final_df = transformed_df.toDF(*(c.lower().replace(" ", "_") for c in transformed_df.columns))

        # Step 4: Write the output. coalesce(1) gives a single part file, but
        # Spark writes it inside a directory located at write_path.
        print(f"   💾 Writing to Silver layer...")
        final_df.coalesce(1) \
            .write \
            .mode("overwrite") \
            .format("csv") \
            .option("header", "true") \
            .save(write_path)

        # row_count is the number of input rows; some transformations drop
        # rows with null keys, so it is reported as "read", not "written".
        print(f"   ✅ Successfully processed '{key}' - {row_count} rows read, output written to {silver_filename}.csv")
        successful_transformations += 1
        written_outputs.append(silver_filename)

    except Exception as e:
        # Top-level boundary for one dataset: report and carry on with the rest.
        print(f"   ❌ ERROR processing '{key}': {str(e)}")
        print(f"      Read path: {read_path}")
        print(f"      Write path: {write_path}")
        continue

print(f"\n🎉 Transformation job completed!")
print(f"📈 Successfully processed {successful_transformations}/{len(job_config)} files")

if successful_transformations > 0:
    # Report the path actually used (it may be the Bronze fallback) and only
    # the outputs that were really written.
    print(f"\n📋 Transformed CSV files are now available under {silver_base_path}:")
    for output_name in written_outputs:
        print(f"   • {output_name}.csv")

StatementMeta(, 683d243f-7cc7-4b06-af7e-979352f7fb30, 16, Finished, Available, Finished)

🔍 Detecting available paths and lakehouses...
Root directories:
Cannot access root with dbutils.fs
Testing read path: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/olist_orders_dataset.csv
✅ Found working Bronze path: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/
Testing write path: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Silver_AY.Lakehouse/Files/test_folder/
✅ Found working Silver path: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Silver_AY.Lakehouse/Files/

🚀 Starting CSV transformation job...
📁 Reading from: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Bronze_AY.Lakehouse/Files/
📁 Writing to: abfss://Alex_Olist@onelake.dfs.fabric.microsoft.com/LH_Silver_AY.Lakehouse/Files/

📊 Processing 'orders': olist_orders_dataset.csv -> silver_orders.csv
   📈 Loaded 99441 rows
   🔄 Applying transformation: transform_orders
   💾 Writing to Silver layer...
   ✅ Successfully processed 'orders'