In [0]:
# Load the cleaned sales table from the catalog into a Spark DataFrame.
source_table = "sales_file_catalog.default.sales_20_days_clean"
df = spark.table(source_table)

In [0]:
# Render the loaded table with the notebook's rich display.
display(df)

In [0]:
from pyspark.sql.functions import col
from datetime import datetime

# Export the sales table as CSV, one folder per distinct date, laid out as
# <base_path>/<year>/<month>/<day>_csv.

# Load source table
df = spark.table("sales_file_catalog.default.sales_20_days_clean")

# Collect the distinct dates on the driver so they can be looped over.
unique_dates = df.select("date").distinct().collect()

# Base output path
base_path = "/Volumes/sales_file_catalog/default/output_sales/sales_data"

# Loop through each unique date
for row in unique_dates:
    raw_date = row["date"]  # could be string or DateType

    # A null date cannot be mapped to a year/month/day folder, and the
    # equality filter below would never match null rows anyway, so skip it
    # explicitly instead of crashing on `None.strftime`.
    if raw_date is None:
        print(" Skipped rows with null date")
        continue

    # String dates are parsed as day-month-year; date/datetime objects
    # already support strftime and are used as-is.
    if isinstance(raw_date, str):
        raw_date = datetime.strptime(raw_date, "%d-%m-%Y")

    year = raw_date.strftime("%Y")
    month = raw_date.strftime("%m")
    day = raw_date.strftime("%d")

    # Construct directory path; the CSV export lives next to it with a
    # "_csv" suffix.
    output_path = f"{base_path}/{year}/{month}/{day}"
    csv_path = output_path + "_csv"

    # Filter on the original column value (not the parsed datetime) so the
    # comparison matches the column's stored type.
    filtered_df = df.filter(col("date") == row["date"])

    # NOTE(review): written without a header row, so column names are not
    # stored in the CSV files; confirm whether downstream readers need them.
    filtered_df.write.format("csv").mode("overwrite").save(csv_path)

    # Report the format and location that were actually written.
    print(f" Saved CSV files to:{csv_path}")

In [0]:
from pyspark.sql.functions import col
from datetime import datetime

# Write the sales table out in Delta format, one folder per distinct date,
# laid out as <base_path>/<year>/<month>/<day>.

# Source table
df = spark.table("sales_file_catalog.default.sales_20_days_clean")

# Root folder for the year/month/day hierarchy
base_path = "/Volumes/sales_file_catalog/default/output_sales/sales_data"

# Distinct dates, collected to the driver so they can be iterated
unique_dates = df.select("date").distinct().collect()

for row in unique_dates:
    date_value = row["date"]

    # The value's string form is parsed as an ISO (year-month-day) date.
    date_obj = datetime.strptime(str(date_value), "%Y-%m-%d")

    # Split the formatted date into its folder components.
    year, month, day = date_obj.strftime("%Y %m %d").split(" ")
    output_path = "/".join([base_path, year, month, day])

    # Keep only the rows belonging to this date.
    df_filtered = df.filter(col("date") == date_value)

    # Overwrite any previous export for this date.
    (
        df_filtered.write
        .format("delta")
        .mode("overwrite")
        .save(output_path)
    )

    print(f" Saved: {output_path}")


In [0]:
#Import required libraries
from pyspark.sql.functions import col
from datetime import datetime
import time

# Write the sales table out in Delta format, one folder per distinct date
# (<base_path>/<year>/<month>/<day>), and collect a per-date run log with
# status, record count and duration that is displayed at the end.

# Initialize log list
log_data = []

# Load source table
df = spark.table("sales_file_catalog.default.sales_20_days_clean")

# Get distinct dates from the dataset
unique_dates = df.select("date").distinct().collect()

# Set base output path
base_path = "/Volumes/sales_file_catalog/default/output_sales/sales_data"

# Fixed layout of the run log. Declaring it up front avoids schema inference
# on the log dicts, which fails when the log is empty or when a column holds
# only None values (e.g. every date failed), and would otherwise make the
# "error" column appear only on runs that had a failure.
log_fields = [
    ("date", "string"),
    ("status", "string"),
    ("output_path", "string"),
    ("records", "long"),
    ("duration_sec", "double"),
    ("error", "string"),
]
log_schema = ", ".join(f"`{name}` {dtype}" for name, dtype in log_fields)

# Loop through each date and save data partition-wise
for row in unique_dates:
    date_value = row["date"]

    # Initialize log entry with date
    log_entry = {"date": str(date_value)}

    try:
        # Start timing
        start_time = time.time()

        # Parse date and build folder path
        date_obj = datetime.strptime(str(date_value), "%Y-%m-%d")
        year = date_obj.strftime("%Y")
        month = date_obj.strftime("%m")
        day = date_obj.strftime("%d")
        output_path = f"{base_path}/{year}/{month}/{day}"

        # Filter data for this date
        df_filtered = df.filter(col("date") == date_value)

        # Count records (this is a separate Spark action from the write below)
        record_count = df_filtered.count()

        # Save to Delta
        df_filtered.write.format("delta").mode("overwrite").save(output_path)

        # Log duration and status
        duration = round(time.time() - start_time, 2)
        log_entry.update({
            "status": "SUCCESS",
            "output_path": output_path,
            "records": record_count,
            "duration_sec": duration
        })

    except Exception as e:
        # A failure on one date is recorded in the log instead of aborting
        # the remaining dates.
        log_entry.update({
            "status": "ERROR",
            "output_path": None,
            "records": None,
            "duration_sec": None,
            "error": str(e)
        })

    # Append log entry to list
    log_data.append(log_entry)

# Convert log to Spark DataFrame. Each entry is flattened to a tuple in
# schema order; keys missing from an entry (e.g. "error" on success) become
# null.
log_rows = [
    tuple(entry.get(name) for name, _ in log_fields)
    for entry in log_data
]
log_spark_df = spark.createDataFrame(log_rows, schema=log_schema)

# Display logs
display(log_spark_df)
