In [0]:
# define the year of photoes for table names
photo_year="2024"

# schema of the output
schema= "meadowbank_prod"

# define output of this model's name and location, ex. md24v6 then final table will be 
process_name="md24_tws"

# Volume root path
volume_root = "/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/2024/"

# designated folder for animal output images
wildlife_output_folder="/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/"

vehicle_output_folder="/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/"

human_output_folder="/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Human/"

# includes timestamp
manifest_timestamp_table=f"{schema}.remote_cameras.manifest_timestamp_{photo_year}"

# model output table includes model output from both models
model_output_table=f"{schema}.remote_cameras.{process_name}_megadetector_v5a_v6byolo9c_results_{photo_year}"

# manifest final output table for final_category to classify images
animal_confidence_threshold=0.5
vehicle_confidence_threshold=0.3
human_confidence_threshold=0.3

In [0]:
#import model output table
df=spark.sql(f"select * from {model_output_table}")

In [0]:
df.show(5,truncate=False)

+---------------------------------------------------------------------------------------------------------------+--------+---------+----------------------------+---------+--------------------------+
|image_path                                                                                                     |category|conf_md5a|bbox_md5a                   |conf_md9c|bbox_md9c                 |
+---------------------------------------------------------------------------------------------------------------+--------+---------+----------------------------+---------+--------------------------+
|/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/2024/FirstPart/Camera 1/2023-12-19/RCNX0348.JPG|2       |0.331    |[0.459, 0.675, 0.918, 0.605]|NULL     |NULL                      |
|/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/2024/FirstPart/Camera 1/2023-12-19/RCNX0604.JPG|3       |0.41     |[0.5, 0.342, 1.0, 0.644]    |NULL     |NULL                      |
|/Vol

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import regexp_replace

df_ts = spark.sql(f"select * from {manifest_timestamp_table}")

df_ts = df_ts.withColumn(
    "TM_flag",
    F.when(F.col("timestamp").endswith("0:00"), "T").otherwise("M")
)

df_ts = df_ts.withColumn(
    "image_path",
    F.regexp_replace("path", r"^dbfs:", "")).select("image_path","TM_flag","timestamp")

In [0]:
import pyspark.sql.functions as F

#threshold 0.5 was used here 
df_very_confident = df.join(df_ts, on="image_path", how="inner").filter(F.col("TM_flag") != "T")\
    .filter((F.col("category") == "1") & ((F.col("conf_md5a") >= animal_confidence_threshold) & (F.col("conf_md9c") >= animal_confidence_threshold)))

# identify all the images taken by camera 4
df_very_confident = df_very_confident.withColumn(
    "camera_folder",
    F.regexp_extract(F.col("image_path"), r"(Camera \d+)", 1)
)

# exclude all tagged images from camera 4
df_very_confident_no_camera4 = df_very_confident.filter(~F.col("image_path").contains("Camera 4"))

print("number of distinct images is: " + str(df_very_confident_no_camera4.select(F.countDistinct("image_path")).collect()[0][0]))

number of distinct images is: 5221


In [0]:
# some images may contain multiple animals there fore has more than one row, only 5221 distinct images
df_very_confident_no_camera4.count()

22098

In [0]:
from pyspark.sql import functions as F

#use megadetector v6b-yolo9c to draw bounding boxes because it should have higher precision than v5a
df_output=df_very_confident_no_camera4.select("image_path","category","conf_md9c","bbox_md9c")

df_dedup = df_output.dropDuplicates()

df_animal = (
    df_dedup
        .groupBy("image_path","category")   # keep metadata
        .agg(
            F.collect_list("conf_md9c").alias("conf"),
            F.collect_list("bbox_md9c").alias("bbox"),
        )
)

df_animal.show(truncate=False)


+-------------------------------------------------------------------------------------------------------------------------------------+--------+---------------------+-----------------------------------------------------------------------------------------+
|image_path                                                                                                                           |category|conf                 |bbox                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------+--------+---------------------+-----------------------------------------------------------------------------------------+
|/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/2024/FirstPart/Camera 1/2024-05-20/101RECNX/RCNX3526.JPG             |1       |[0.516]              |[[0.99, 0.675, 0.02, 0.142]]                                   

In [0]:
from PIL import Image, ImageDraw, ImageFont

#1. draw bounding boxes on single image
def draw_boxes(image_path, conf_list, bbox_list, output_path):
    img = Image.open(image_path).convert("RGB")
    draw = ImageDraw.Draw(img)
    w, h = img.size

    # Load font safely
    try:
        font = ImageFont.truetype("arial.ttf", 20)
    except:
        font = ImageFont.load_default()

    def text_size(text):
        try:
            # Modern Pillow (>=8)
            bbox = draw.textbbox((0, 0), text, font=font)
            return bbox[2] - bbox[0], bbox[3] - bbox[1]
        except:
            return font.getsize(text)

    # Draw boxes
    for conf, b in zip(conf_list, bbox_list):
        x, y, bw, bh = b
        
        x1 = int(x * w)
        y1 = int(y * h)
        x2 = int((x + bw) * w)
        y2 = int((y + bh) * h)

        draw.rectangle([x1, y1, x2, y2], outline="red", width=3)

        label = f"{conf:.3f}"
        tw, th = text_size(label)

        draw.rectangle([x1, y1 - th, x1 + tw, y1], fill="red")
        draw.text((x1, y1 - th), label, fill="white", font=font)

    img.save(output_path)


In [0]:
import os
# 2. Process entire df_animal and write images to output_root
def process_dataframe(df, output_folder):
    """
    Loop through df, read image_path/conf/bbox, 
    and save annotated images into output_folder.
    """

    # Ensure root folder exists
    dbutils.fs.mkdirs(output_folder)

    # Collect data to driver (use Pandas UDF for very large datasets)
    rows = df.select("image_path", "conf", "bbox").collect()

    print(f"Processing {len(rows)} images...")
    
    for r in rows:
        image_path = r.image_path
        conf_list = r.conf
        bbox_list = r.bbox

        # Extract filename
        filename = os.path.basename(image_path)
        out_file = filename.replace(".JPG", "_annotated.jpg")

        # Full DBFS paths
        input_path = f"{image_path}"
        output_path = f"{output_folder}{out_file}"

        try:
            draw_boxes(input_path, conf_list, bbox_list, output_path)
            print("Saved:", output_path)
        except Exception as e:
            print("Error processing", image_path, " â†’ ", e)

    print("All done!")


In [0]:
#if folder path exists
dbutils.fs.mkdirs(wildlife_output_folder)

True

In [0]:
#process vehicle images
process_dataframe(df_animal,animal_output_folder)

Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX3526_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX7928_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX8118_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX0076_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX3615_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX3632_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX4216_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Wildlife/RCNX0003_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_c

In [0]:
import pyspark.sql.functions as F

#threshold 0.3 was used here 
df_vehicle = df.join(df_ts, on="image_path", how="inner").filter(F.col("TM_flag") != "T")\
    .filter((F.col("category") == "3") & ((F.col("conf_md5a") >= vehicle_confidence_threshold) & (F.col("conf_md9c") >= vehicle_confidence_threshold)))

print("number of distinct images is: " + str(df_vehicle.select(F.countDistinct("image_path")).collect()[0][0]))

number of distinct images is: 391742


In [0]:
from pyspark.sql import functions as F

#use megadetector v6b-yolo9c to draw bounding boxes because it should have higher precision than v5a
df_output=df_vehicle.select("image_path","category","conf_md9c","bbox_md9c")

df_dedup = df_output.dropDuplicates()

df_vehicle = (
    df_dedup
        .groupBy("image_path","category")   # keep metadata
        .agg(
            F.collect_list("conf_md9c").alias("conf"),
            F.collect_list("bbox_md9c").alias("bbox"),
        )
)

df_vehicle.show(truncate=False)


+------------------------------------------------------------------------------------------------------------------------+--------+-------+------------------------------+
|image_path                                                                                                              |category|conf   |bbox                          |
+------------------------------------------------------------------------------------------------------------------------+--------+-------+------------------------------+
|/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/2024/FirstPart/Camera 1/2023-12-19/RCNX0359.JPG         |3       |[0.367]|[[0.869, 0.808, 0.262, 0.298]]|
|/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/2024/FirstPart/Camera 1/2024-01-16/RCNX0153.JPG         |3       |[0.838]|[[0.764, 0.865, 0.205, 0.223]]|
|/Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/2024/FirstPart/Camera 1/2024-01-16/RCNX0155.JPG         |3       |[0.901]|[[0.85

In [0]:
#process vehicle images
process_dataframe(df_vehicle,vehicle_output_folder)

Processing 391742 images...
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX0359_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX0153_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX0155_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX2064_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX2112_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX2114_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX2135_annotated.jpg
Saved: /Volumes/meadowbank_prod/remote_cameras/mbk_remote_cameras_blob/results/2024_TWS/Vehicle/RCNX2153_annotated.jpg
Saved: /Volumes/mead

In [0]:
import pyspark.sql.functions as F

#threshold 0.3 was used here 
df_human = df.join(df_ts, on="image_path", how="inner").filter(F.col("TM_flag") != "T")\
    .filter((F.col("category") == "2") & ((F.col("conf_md5a") >= human_confidence_threshold) & (F.col("conf_md9c") >= human_confidence_threshold)))

print("number of distinct images is: " + str(df_human.select(F.countDistinct("image_path")).collect()[0][0]))

In [0]:
from pyspark.sql import functions as F

#use megadetector v6b-yolo9c to draw bounding boxes because it should have higher precision than v5a
df_output=df_human.select("image_path","category","conf_md9c","bbox_md9c")

df_dedup = df_output.dropDuplicates()

df_human = (
    df_dedup
        .groupBy("image_path","category")   # keep metadata
        .agg(
            F.collect_list("conf_md9c").alias("conf"),
            F.collect_list("bbox_md9c").alias("bbox"),
        )
)

df_human.show(truncate=False)


In [0]:
#process human images
process_dataframe(df_human,human_output_folder)