# Create a Delta table in Databricks
* Find all the files of interest for that table.
* Generate a pandas DataFrame.
* Save the table with Spark.

In [None]:
%pip install git+https://github.com/AgDMALabs-Public/ag-vision-dataops.git

In [None]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from ag_vision.pipelines import image_processing as ip

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [None]:
# REGEN = True: generate the whole table from scratch (this reads every metadata file).
# REGEN = False: only add new data (this reads metadata files only for images not yet in the table).
REGEN = True
# Name of the table this notebook reads from and overwrites.
TABLE_NAME = "use1_prod_artemis_catalog_3718194974443840.production.images_table"

In [None]:
# 1. Load the root directory
root_path = "/Volumes/use1_prod_artemis_catalog_3718194974443840/production/data/"

# Recursively list every file under the root; only the "path" column is used below.
df = (
    spark.read.format("binaryFile")
    .option("recursiveFileLookup", "true")
    .load(root_path)
)

# 2. Filter by depth AND regex
# Target layout: artemis/*/*/*/*/*/*/im*/**/*
# i.e. a specific number of folders between 'artemis' and an 'im*' folder holding the file.
img_paths_df = df.select("path").filter(
    # Keep image files only. (?i) makes the extension match case-insensitive so
    # names such as "photo.JPG" are not silently skipped.
    F.col("path").rlike(r"(?i)\.(jpg|jpeg|png|webp)$")
    # Require a *folder* whose name starts with 'im'. A plain contains("/im")
    # would also accept any file that is merely named "im...".
    & F.col("path").rlike(r"/im[^/]*/")
    # Require at least 13 '/'-separated path components; the depth check is
    # what keeps annotation images from making it in.
    & (F.size(F.split(F.col("path"), "/")) >= 13)
)

# 3. Collect
img_paths_list = [row.path for row in img_paths_df.toLocalIterator()]

print(f"Found {len(img_paths_list)} paths at the correct depth.")

In [None]:
# Load the current table contents; its file_path column is what incremental
# runs compare against. On a first run the table does not exist yet, so fall
# back to an empty frame instead of failing on the read.
if spark.catalog.tableExists(TABLE_NAME):
    old_df = spark.table(TABLE_NAME).toPandas()
else:
    old_df = pd.DataFrame(columns=["file_path"])
print(f"The Old Table len is {len(old_df)}")

In [None]:
# Remove the 'dbfs:' scheme text from every discovered path.
img_paths_list = [path.replace('dbfs:', '') for path in img_paths_list]

if REGEN:
    # Full rebuild: every discovered image is processed.
    run_list = img_paths_list
else:
    # Incremental run: keep only paths not already recorded in the table.
    known_paths = set(old_df['file_path'])
    run_list = [path for path in img_paths_list if path not in known_paths]

In [None]:
# Report how many image paths are queued for processing in this run.
print(f"The len of the run list is. {len(run_list)}")

In [None]:
# Need to generate the schema.
# It is derived from one processed sample image. When there is nothing new to
# run (incremental mode with the table already up to date), fall back to any
# discovered image so the schema can still be derived.
sample_paths = run_list[:1] or img_paths_list[:1]
sample_df = ip.generate_images_table(img_list=sample_paths,
                                      platform='db',
                                      project_index=6)

my_schema = spark.createDataFrame(sample_df).schema.simpleString()

# One row per image path to process. The explicit schema lets Spark build the
# frame even when run_list is empty, where type inference would raise.
df = spark.createDataFrame([(i,) for i in run_list], "item string")

In [None]:
def process_batch(iterator):
    """
    Turn each incoming pandas chunk of items into a chunk of image-table rows.

    Consumes an iterator of pandas DataFrames that carry an "item" column and
    yields one pandas DataFrame per chunk: the output of
    ip.generate_images_table for the chunk's items, or an empty frame with
    sample_df's columns when the chunk has no rows.
    """
    for chunk in iterator:
        paths = chunk["item"].tolist()

        if paths:
            # Non-empty chunk: build the rows for these items.
            yield ip.generate_images_table(img_list=paths,
                                           platform='db',
                                           project_index=6)
            continue

        # Empty chunk: still yield a frame, with the sample output's columns.
        yield pd.DataFrame(columns=sample_df.columns)

# Run process_batch over the item DataFrame; the result is declared to follow my_schema.
img_spark_df = df.mapInPandas(process_batch, schema=my_schema)

In [None]:
# 1. Decide whether this is an incremental run against an existing table
table_exists = spark.catalog.tableExists(TABLE_NAME)
incremental = table_exists and not REGEN

if incremental:
    # Existing rows, with the "image_path" and "error" columns removed
    old_spark_df = spark.read.table(TABLE_NAME).drop("image_path", "error")

    # Append the new results; unionByName aligns columns by name, not position
    final_spark_df = old_spark_df.unionByName(img_spark_df)
else:
    # Full regeneration, or no table yet: the new data is the whole result
    final_spark_df = img_spark_df

# 2. Write the final result back to the table, replacing data and schema
writer = final_spark_df.write.mode("overwrite").option("overwriteSchema", "true")
writer.saveAsTable(TABLE_NAME)