In [0]:
# Pipeline locations used by the cells below.
# Directory that the CSV reader loads from and that the archive step lists.
source_dir = "/Volumes/incremental_load/default/orders_data/source/"
# Prefix for archived files; a file name is appended directly to it, so the trailing "/" matters.
target_dir = "/Volumes/incremental_load/default/orders_data/archive/"
# Table name handed to saveAsTable when the batch is staged.
stage_table = "incremental_load.default.orders_stage"

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType

# Reuse the active Spark session, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# The schema is declared explicitly instead of being inferred from the files.
# Every column is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("order_num", IntegerType()),
        ("tracking_num", StringType()),
        ("pck_recieved_date", DateType()),
        ("package_deliver_date", DateType()),
        ("status", StringType()),
        ("address", StringType()),
        ("last_update_timestamp", TimestampType()),
    ]
])

# Load the CSV files under source_dir with the schema above; files are read with the
# header option enabled and timestamps are parsed as "yyyy-MM-dd HH:mm:ss".
df = (
    spark.read
    .option("header", "true")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .schema(schema)
    .csv(source_dir)
)

# Preview the loaded rows with full (untruncated) column values.
df.show(truncate=False)

In [0]:
# Stage the current batch as a Delta table; "overwrite" mode replaces whatever the
# stage table held before.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(stage_table)
)

In [0]:
# Archive the source files: list the source directory and move each file into target_dir.
# NOTE(review): this listing is taken separately from the CSV read in the earlier cell, so a
# file that lands in source_dir between the two would be archived without having been
# staged — confirm that uploads cannot overlap with a run.
files = dbutils.fs.ls(source_dir)

for file in files:
    # Leave directory entries in place: their path ends in "/", so deriving a name by
    # splitting the path yields an empty string (the target would collapse to target_dir
    # itself), and dbutils.fs.mv does not move directories without recurse=True.
    if file.isDir():
        continue

    src_path = file.path

    # FileInfo.name is the bare file name, so no manual path splitting is needed.
    target_path = target_dir + file.name

    # Move the file out of the source directory into the archive.
    dbutils.fs.mv(src_path, target_path)