In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_date, to_timestamp

# Staging table plus the volume folders the job reads from and archives to
stage_table = "incremental_load.default.orders_stage"
source_dir = "/Volumes/incremental_load/default/orders_data/source/"
target_dir = "/Volumes/incremental_load/default/orders_data/archive/"

# Explicit CSV schema, built column by column in file order. Every column is
# nullable. The two date columns and the timestamp column are declared as
# strings here because they are parsed into Date/Timestamp after the read.
schema = (
    StructType()
    .add("order_num", IntegerType(), True)
    .add("tracking_num", StringType(), True)
    .add("pck_recieved_date", StringType(), True)      # parsed to Date later
    .add("package_deliver_date", StringType(), True)   # parsed to Date later
    .add("status", StringType(), True)
    .add("address", StringType(), True)
    .add("last_update_timestamp", StringType(), True)  # parsed to Timestamp later
)

# Snapshot the source directory BEFORE reading it. Both the load and the
# archive step work from this single listing, so a file that lands while the
# job is running is never archived without having been loaded; it simply
# stays in the source folder for the next run.
# Directory entries are skipped: they are not CSV files, and deriving a
# target name from a path ending in "/" would produce an empty file name.
files = [entry for entry in dbutils.fs.ls(source_dir) if not entry.isDir()]
source_paths = [entry.path for entry in files]

# Read exactly the snapshotted files with the explicit schema.
if source_paths:
    df = spark.read.csv(source_paths, header=True, schema=schema)
else:
    # No input files: build an empty frame with the same schema so the
    # staging table is still overwritten (with zero rows), which is what
    # reading an empty source directory did before.
    df = spark.createDataFrame([], schema)

# Parse the string columns into proper date/timestamp types.
# Values that do not match the pattern become NULL under Spark's default
# (non-ANSI) settings — NOTE(review): confirm the cluster's ANSI mode.
df = (df
      .withColumn("pck_recieved_date", to_date("pck_recieved_date", "dd-MM-yyyy"))
      .withColumn("package_deliver_date", to_date("package_deliver_date", "dd-MM-yyyy"))
      .withColumn("last_update_timestamp", to_timestamp("last_update_timestamp", "dd-MM-yyyy HH:mm"))
)

# Replace the staging table contents with this batch.
df.write.format("delta").mode("overwrite").saveAsTable(stage_table)

# Archive only the files that were part of this batch. This runs after the
# write, so a failed write leaves every file in place for a retry.
for entry in files:
    dbutils.fs.mv(entry.path, target_dir + entry.name)
