In [None]:
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

from pathlib import Path
from PIL import Image
from transformers import (
    Mask2FormerImageProcessor,
    Mask2FormerForUniversalSegmentation,
)
import torch
import numpy as np
import shutil
from tqdm import tqdm
import datetime
import re

# from trailvision.data import COCOWithVideo, AeroFeatureCollection


# ---------------------------------------------------------------------------
# Run configuration: which model/run to load and which dataset to predict on.
# ---------------------------------------------------------------------------
task = "panoptic"
model_size = "base"
run_id = "polygon"

# Configuration
dataset_name = "gQg5IUvV"  # or "OdnkTZQ8"
use_projected = True  # True for PROJECTED, False for RAW

# Paths
datasets_dir = Path("/data/common/STEREOSTUDYIPSL/Datasets")
dataset_dir = datasets_dir / dataset_name
images_dir = dataset_dir / ("PROJECTED" if use_projected else "RAW")
trailvision_dir = Path("/data/common/TRAILVISION")
segmentation_dir = trailvision_dir / "segmentation"
models_dir = segmentation_dir / "models"
# Presumably 10 000 m expressed in feet (0.3048 m per ft); not used in the
# cells visible here -- TODO confirm it is still needed.
altitude_ft = 10_000 / 0.3048

# Load model from
base_model = f"facebook/mask2former-swin-{model_size}-coco-{task}"  # Base model from Hugging Face
checkpoint_dir = models_dir / task / run_id

# Save predictions to
predictions_dir = segmentation_dir / "predictions" / task / run_id / dataset_name

# Get categories from model (simplified without COCO annotations)
# You may need to adjust these categories based on your model training
categories = [
    {"id": 0, "name": "object", "isthing": 1},  # Generic object class
    {"id": 1, "name": "sky", "isthing": 0},     # Background/sky
]

# Key the mapping on each category's declared "id" rather than on its position
# in the list, so reordering `categories` or using non-contiguous ids cannot
# silently mislabel classes (this also avoids shadowing the builtin `id`).
id2label = {category["id"]: category["name"] for category in categories}

# Image processor (normalization, resizing, etc.)
# NOTE(review): resizing and rescaling are both disabled here on the stated
# assumption that they are handled manually; confirm the inference cells
# actually do so before the processor normalizes the pixels.
processor = Mask2FormerImageProcessor.from_pretrained(
    base_model,
    do_resize=False,  # We handle resizing manually
    do_rescale=False,  # We handle rescaling manually
    do_normalize=True,  # Normalizes pixel values
    do_reduce_labels=True,  # Decreases label indices by 1 (in COCO format labels start at 1)
    ignore_index=255,  # Ignore label for padding/missing annotations
)

# Load model from checkpoint
# NOTE(review): `ignore_mismatched_sizes=True` lets loading proceed when weight
# shapes differ from the checkpoint. For a fine-tuned checkpoint that is
# presumably not what is wanted at inference time -- confirm the checkpoint's
# class count matches `id2label`.
model = Mask2FormerForUniversalSegmentation.from_pretrained(
    checkpoint_dir,
    id2label=id2label,  # Our custom class mapping
    ignore_mismatched_sizes=True,  # Class numbers differs from COCO
)

model = model.to("cuda")

# Start from a clean output directory: any previous predictions for this
# task / run / dataset combination are deleted.
if predictions_dir.exists() and predictions_dir.is_dir():
    shutil.rmtree(predictions_dir)

predictions_dir.mkdir(parents=True)

print(f"Processing images from: {images_dir}")
print(f"Saving predictions to: {predictions_dir}")

In [None]:
# Options forwarded verbatim to the post-processing call below.
kwargs = dict(
    threshold=0.5,
    mask_threshold=0.5,
    overlap_mask_area_threshold=0.8,
    return_binary_maps=True,
)

# The instance-segmentation post-processor of the image processor is the one
# used to turn raw model outputs into per-image results.
post_process_segmentation = processor.post_process_instance_segmentation

In [None]:
# Helper function to parse timestamp from filename
def parse_timestamp_from_filename(filename):
    """
    Parse the timestamp encoded in an image filename.

    Expected format: YYYYMMDDHHMMSS_XX.jpg
    Example: 20250406044600_01.jpg -> 2025-04-06 04:46:00

    Args:
        filename: File name to parse (name only, without directory).

    Returns:
        A naive ``datetime.datetime`` built from the 14 leading digits, or
        ``None`` when the whole name does not follow the expected format or
        the digits do not form a valid date/time.
    """
    # fullmatch: the entire name must follow the pattern, so trailing text
    # after ".jpg" is rejected instead of being silently accepted.
    match = re.fullmatch(r'(\d{14})_\d{2}\.jpg', filename)
    if match is None:
        return None
    try:
        return datetime.datetime.strptime(match.group(1), '%Y%m%d%H%M%S')
    except ValueError:
        # 14 digits that are not a real date/time (e.g. month 13): treat the
        # name as unparseable, like any other non-matching name.
        return None

# Get all image files
image_files = sorted(images_dir.glob("*.jpg"))
print(f"Found {len(image_files)} images")

# Group images by date (video sessions): one entry per calendar day, each
# holding (path, timestamp) pairs in sorted-filename order.
videos = {}
skipped_files = []
for img_path in image_files:
    timestamp = parse_timestamp_from_filename(img_path.name)
    if timestamp is None:
        # Remember names that do not follow the expected format so they are
        # reported below instead of disappearing without a trace.
        skipped_files.append(img_path.name)
        continue
    videos.setdefault(timestamp.strftime('%Y%m%d'), []).append((img_path, timestamp))

if skipped_files:
    print(
        f"Skipped {len(skipped_files)} images with unparseable names "
        f"(first few: {skipped_files[:5]})"
    )

print(f"Grouped into {len(videos)} video sessions: {list(videos.keys())}")

In [None]:
# Process each video session: run the model on every image of the session and
# write one compressed .npz file per session into `predictions_dir`.
for date_key, images in tqdm(videos.items(), desc="Processing videos"):
    print(f"\nProcessing video session: {date_key}")

    # Get start and end times for this video session
    timestamps = [ts for _, ts in images]
    video_start = min(timestamps)
    video_stop = max(timestamps)
    video_ref = f"{video_start.strftime('%Y%m%d%H%M%S')}_{video_stop.strftime('%Y%m%d%H%M%S')}"

    # Everything written to the .npz file. Per-image results are added under
    # `mask_<i>` / `segments_info_<i>`, where <i> is the index into
    # `filenames` / `timestamps`.
    save_data = {
        'video_start': video_start.isoformat(),
        'video_stop': video_stop.isoformat(),
        'filenames': [],
        'timestamps': [],
    }

    for i, (img_path, img_time) in enumerate(tqdm(images, desc="  Images", leave=False)):
        # Load image; the context manager closes the underlying file handle
        # as soon as the pixels have been copied into the array.
        with Image.open(img_path) as img_file:
            image = np.array(img_file.convert("RGB"))

        # Prepare inputs
        # NOTE(review): `image` is the array produced from the RGB PIL image
        # and is passed as-is. If the processor was configured with
        # do_rescale=False / do_resize=False on the assumption that both are
        # done manually, neither happens in this loop -- confirm this matches
        # the preprocessing used at training time.
        inputs = processor([image], return_tensors="pt").to(model.device)

        # Run inference
        with torch.no_grad():
            outputs = model(**inputs)

        # Post-process segmentation back to the original image height/width
        segmentation = post_process_segmentation(
            outputs,
            target_sizes=[image.shape[:2]],
            **kwargs,
        )[0]

        save_data['filenames'].append(img_path.name)
        save_data['timestamps'].append(img_time.isoformat())

        # Convert the mask to a NumPy array right away so tensors do not pile
        # up (possibly on the GPU) for the whole session before being saved.
        if 'segmentation' in segmentation:
            save_data[f'mask_{i}'] = segmentation['segmentation'].cpu().numpy()
        if 'segments_info' in segmentation:
            # Stored as the string representation of the list of dicts.
            save_data[f'segments_info_{i}'] = str(segmentation['segments_info'])

    # Save predictions for this video session
    out_file = (predictions_dir / video_ref).with_suffix(".npz")
    np.savez_compressed(out_file, **save_data)
    print(f"  Saved {len(save_data['filenames'])} predictions to {out_file.name}")

print(f"\nProcessing complete! Results saved to: {predictions_dir}")