In [0]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import re

In [0]:
# Catalog and schema names; both the tracking-table name and the volume paths
# below are built from them.
catalog = "ademianczuk"
database = "ncr"
# Fully qualified name of the table that records one row per processed file.
table = f"{catalog}.{database}.csv_copy_progress"
# Directory the month_*.csv files are read from.
src_dir = f"/Volumes/{catalog}/{database}/data/csv"
# Directory the files are copied into, one file per run of the copy cell.
dst_dir = f"/Volumes/{catalog}/{database}/data/loader"

# Make sure the destination directory exists before any copy is attempted.
dbutils.fs.mkdirs(dst_dir)

In [0]:
# One-time setup of the Delta table that tracks copy progress.
# IF NOT EXISTS means re-running this cell leaves an existing table untouched.
# Columns: file name, sequence number, status string and copy timestamp.
create_progress_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table} (
  filename STRING,
  seq BIGINT,
  status STRING,
  copied_at TIMESTAMP
) USING DELTA
"""
spark.sql(create_progress_table_sql)

In [0]:
#Validate the files by name
def is_month_csv(name):
    """Return True when ``name`` starts with "month_" and ends with ".csv".

    The check is case-sensitive and looks only at the prefix and suffix;
    whatever sits between them is not inspected.
    """
    if not name.startswith("month_"):
        return False
    return name.endswith(".csv")

#Extract the sort key (the two-digit month number) from the file name
def month_key(name):
    """Return the month number embedded in ``name`` as an int.

    The name must end with "month_NN.csv" or "month-NN.csv", where NN is
    exactly two digits. Any other name yields 9999, which places it after
    every real month when the result is used as a sort key.
    """
    match = re.search(r"month[_-](\d{2})\.csv$", name)
    if match is None:
        return 9999
    return int(match.group(1))



In [0]:
#List the source directory and keep only the month_*.csv files
src_files = [f for f in dbutils.fs.ls(src_dir) if is_month_csv(f.name)]
#Order by month number. The file name breaks ties (for example several names
#that all map to the 9999 fallback key) so the order is the same on every run.
src_files = sorted(src_files, key=lambda f: (month_key(f.name), f.name))

#If no source files exist, raise an elegant error message
if not src_files:
    raise RuntimeError(f"No month_*.csv files found in {src_dir}. Have you populated your source directory with data?")

#Collect the names of the files that have already been handled.
#The copy cell logs "done" after a fresh copy and "already_present" when the
#destination file was found in place. Both must count as handled: if only
#"done" were considered, an "already_present" file would stay first in the
#pending list and every later run would re-log it without ever advancing.
handled_statuses = ["done", "already_present"]
done = (
    spark.table(table)
    .filter(col("status").isin(handled_statuses))
    .select("filename")
    .distinct()
    .collect()
)
done_set = {r.filename for r in done}

#Files still to process, in processing order; the copy cell takes the first one
pending = [f for f in src_files if f.name not in done_set]

In [0]:
if pending:
    #Each run handles exactly one file: the first entry of the pending list
    src = pending[0]
    dst = f"{dst_dir}/{src.name}"

    #Idempotent copy: probe the destination first. Any exception raised by
    #the listing call is taken to mean that the file is not there yet.
    try:
        _ = dbutils.fs.ls(dst)
    except Exception:
        exists = False
    else:
        exists = True

    if exists:
        status = "already_present"
        print(f"Already present, logging and skipping: {dst}")
    else:
        dbutils.fs.cp(src.path, dst)
        status = "done"
        print(f"Copied: {src.path}  ->  {dst}")

    #Append a row to the tracking table (csv_copy_progress).
    #The sequence number is the current row count plus one.
    seq_num = spark.table(table).count() + 1
    progress_row = Row(filename=src.name, seq=seq_num, status=status)
    df = spark.createDataFrame([progress_row])

    #The timestamp is added as a SQL expression rather than a Python value
    df = df.withColumn("copied_at", expr("current_timestamp()"))

    df.write.mode("append").saveAsTable(table)

    remaining = len(pending) - 1
    print(f"Progress: {len(done_set) + 1}/{len(src_files)} copied. {remaining} remaining.")
else:
    print(f"Nothing to do. All {len(src_files)} files already copied.")

In [0]:
# Teardown: drop the tracking table and recursively delete the destination directory.
# NOTE(review): running this cell discards the progress recorded by the cells
# above - presumably meant as a manual reset rather than part of "Run All"; confirm.
drop_progress_table_sql = f"""
DROP TABLE IF EXISTS {table}
"""
spark.sql(drop_progress_table_sql)

# The second argument enables recursive removal of the directory contents.
dbutils.fs.rm(dst_dir, True)