In [0]:
import os
import time

def reset(dest_path):
    """Recursively delete ``dest_path`` via ``dbutils.fs.rm``.

    Args:
        dest_path: Path to remove, together with everything beneath it.
    """
    dbutils.fs.rm(dest_path, recurse=True)

def copy_next_n_rows(
  n,
  source_table,
  dest_path):
    """Append the next ``n`` rows of ``source_table`` to ``dest_path`` as JSON.

    "Next" is defined by the ``timestamp`` column: the maximum ``timestamp``
    already present in the JSON files under ``dest_path`` is looked up, and up
    to ``n`` source rows with a strictly greater ``timestamp`` are appended in
    ascending ``timestamp`` order. If the destination is missing or empty, the
    first ``n`` rows by ``timestamp`` are copied.

    Args:
        n: Maximum number of rows to copy in this call.
        source_table: Name of the table to read with ``spark.table``.
        dest_path: Directory the JSON output is appended to. It is inspected
            with ``os.path``/``os.listdir``, so it must be reachable as a
            local filesystem path.

    Returns:
        None. The outcome is reported with ``print``.

    NOTE(review): because the filter is a strict ``>`` and batches are cut
    with ``limit(n)``, source rows sharing the destination's maximum timestamp
    that did not make it into an earlier batch are never copied. Confirm
    timestamps are unique or add a tiebreaker column.
    """
    source_df = spark.table(source_table)

    # Find the latest timestamp already written to the destination, if any.
    last_ts = None
    if os.path.exists(dest_path) and os.listdir(dest_path):
        dest_df = spark.read.json(dest_path)
        # Only aggregate when there is at least one row to aggregate over.
        if dest_df.count() > 0:
            last_ts = dest_df.agg({"timestamp": "max"}).collect()[0][0]

    # Compare against None explicitly: a falsy but valid maximum (e.g. 0)
    # must still be used as the lower bound, otherwise the copy would start
    # over from the beginning and duplicate rows.
    if last_ts is not None:
        candidates = source_df.filter(source_df["timestamp"] > last_ts)
    else:
        candidates = source_df
    next_rows = candidates.orderBy("timestamp").limit(n)

    # Count once and reuse the number; every count() is a separate Spark job.
    row_count = next_rows.count()
    if row_count == 0:
        print("No new rows to copy.")
        return

    # Write as JSON in append mode so earlier batches are kept.
    next_rows.write.mode("append").json(dest_path)
    print(f"Copied {row_count} rows to {dest_path}")

In [0]:
# Table the rows are read from.
source_table = "rtlh_lakehouse_labs.bootcamp_oct_2025.lab2_bronze"
# Directory the JSON batches are appended to.
dest_path = "/Volumes/rtlh_lakehouse_labs/bootcamp_oct_2025/resources/data/gym_stream/"

In [0]:
# Uncomment to delete everything under dest_path so the copy restarts from the earliest rows.
#reset(dest_path)

In [0]:
# Poll forever: copy one batch, then wait. Interrupt/cancel the cell to stop.
n = 1000    # maximum number of rows requested per batch
sleep = 10  # seconds to wait between batches
while True:
  copy_next_n_rows(n, source_table, dest_path)
  # copy_next_n_rows prints the actual outcome itself; only the request is
  # reported here because fewer than n rows (or none) may have been copied.
  print(f"Requested up to {n} rows at {time.strftime('%Y-%m-%d %H:%M:%S')}")
  print(f"Sleeping for {sleep} seconds...")
  time.sleep(sleep)