In [None]:
from notebookutils import mssparkutils
from pyspark.sql.functions import lit
from functools import reduce
from datetime import datetime

# Step 1: Set paths
# input_path is the container that is scanned for raw CSV files; every run
# writes its result into a new timestamped folder underneath output_base.
input_path = "abfss://raw-data@datapipelineprojectsa123.dfs.core.windows.net/"
output_base = "abfss://clean-data@datapipelineprojectsa123.dfs.core.windows.net/"

# Step 2: List all CSV files using mssparkutils
# The suffix test is case-sensitive, so only paths ending in lowercase ".csv"
# are picked up.
files = mssparkutils.fs.ls(input_path)
csv_files = [f.path for f in files if f.path.endswith(".csv")]

# Step 3: Define expected columns
# This list fixes both the set and the order of the columns in the output.
expected_columns = ["OrderID", "OrderDate", "CustomerName", "Region", "Product", "Category", "Quantity", "UnitPrice", "Amount"]
dfs = []

# Step 4: Load, align, and clean each file
# A failure on one file is reported and the loop moves on, so a single bad
# file does not abort the whole run (deliberate best-effort).
for path in csv_files:
    try:
        # Column types are inferred separately for every file.
        df = spark.read.option("header", True).option("inferSchema", True).csv(path)

        # Skip empty files
        if not df.columns:
            print(f"⚠️ Skipping empty file: {path}")
            continue

        # Header names are compared exactly (case-sensitive) against
        # expected_columns.
        present_columns = set(df.columns)
        missing_columns = [name for name in expected_columns if name not in present_columns]
        if missing_columns:
            # The padded columns are entirely null, and Step 5 calls dropna(),
            # which removes every row containing a null. Surface that here
            # instead of losing the file's rows silently.
            # NOTE(review): confirm whether dropping these rows is intended.
            print(f"⚠️ {path} is missing columns {missing_columns}; its rows will be dropped by the null filter")

        # Pad missing columns with nulls and reorder in ONE projection.
        # Calling withColumn repeatedly in a loop adds a projection per call
        # and can produce very large query plans; a single select avoids that.
        df = df.select(*[
            df[name] if name in present_columns else lit(None).alias(name)
            for name in expected_columns
        ])
        dfs.append(df)

    except Exception as e:
        print(f"❌ Failed to read {path}: {e}")

# Step 5: Combine and write
if dfs:
    # Every frame already has exactly expected_columns, so the by-name union
    # lines them up column for column.
    final_df = reduce(lambda a, b: a.unionByName(b), dfs)

    # Drop rows that contain a null in any column, then exact duplicate rows.
    final_df_clean = final_df.dropna().dropDuplicates()

    # The folder name has minute resolution and uses the naive local clock.
    # Combined with mode("overwrite"), a second run within the same minute
    # replaces the output of the first.
    folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M")
    output_path = f"{output_base}{folder_name}/"
    final_df_clean.write.mode("overwrite").option("header", True).csv(output_path)
    print(f"✅ Write complete: {output_path}")

else:
    print("⚠️ No valid files found to process.")
