### Backfill detection stats.

In [None]:
import boto3
from decimal import Decimal
from collections import defaultdict
from boto3.dynamodb.conditions import Key, Attr



def convert_floats_to_decimals(obj):
    """Return *obj* with every float replaced by an equivalent ``Decimal``.

    Lists and dicts are rebuilt recursively (dict keys are kept untouched,
    only values are converted). Anything that is not a float, list or dict
    is handed back unchanged.
    """
    if isinstance(obj, dict):
        return {
            key: convert_floats_to_decimals(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [convert_floats_to_decimals(element) for element in obj]
    if isinstance(obj, float):
        # Go through str() so the Decimal carries the float's short repr
        # instead of its full binary expansion.
        return Decimal(str(obj))
    return obj

# AWS region handed to the boto3 session created below.
AWS_REGION = "eu-west-1"
# Devices whose events are processed by the backfill loop, one at a time.
# NOTE(review): the IDs look like "<hardware id>__<camera name>" — confirm.
DEVICE_IDS = [
    "B8A44FE820CC__construction_ptz",
    "B8A44FCDF536__parking_lot",
    "B8A44F976508__panoramic_tree",
    "B8A44FB97C3C__panoramic_garages",
    "B8A44F9C9902__train_rails",
    "B8A44FD014E5__distorted_tree"
]

# One session / DynamoDB resource shared by both table handles.
session = boto3.Session(region_name=AWS_REGION)
dynamodb = session.resource("dynamodb")

# "events": queried per device_id and updated with the computed detection_stats.
events_table = dynamodb.Table("events")
# "event_detections": queried through its "event_timestamp_index" index.
detections_table = dynamodb.Table("event_detections")

In [None]:
import time
from decimal import Decimal, ROUND_HALF_UP

def compute_detection_stats(detections, valid_classes):
    """Aggregate per-class confidence statistics over detection records.

    Args:
        detections: Iterable of mappings. Each one may carry a
            ``"detections"`` list whose entries are mappings with
            ``"label"`` and ``"score"`` keys; records without that list
            contribute nothing.
        valid_classes: Container of labels to aggregate. Entries with any
            other label are ignored.

    Returns:
        A dict mapping every label that matched at least once to
        ``{"avg_confidence", "max_confidence", "n_frames"}``. Both
        confidences are ``Decimal`` values rounded half-up to four decimal
        places. ``n_frames`` is the number of matching detection entries,
        so a record listing the same label twice is counted twice.

    Raises:
        KeyError: If an entry lacks ``"label"``, or a matching entry lacks
            ``"score"``.
    """
    precision = Decimal("0.0001")
    stats = defaultdict(
        lambda: {
            "total_confidence": Decimal("0.0"),
            "max_confidence": Decimal("0.0"),
            "n_frames": 0,
        }
    )

    for item in detections:
        for det in item.get("detections", []):
            cls = det["label"]
            if cls not in valid_classes:
                continue

            score = det["score"]
            if not isinstance(score, Decimal):
                # Decimal cannot be added to a float, and an int that wins
                # max() has no .quantize(); normalise once at the edge.
                # str() keeps the float's short repr, not its binary expansion.
                score = Decimal(str(score))

            entry = stats[cls]
            entry["total_confidence"] += score
            entry["max_confidence"] = max(entry["max_confidence"], score)
            entry["n_frames"] += 1

    # Every entry in `stats` was created by a matching detection, so
    # n_frames >= 1 and the division below is always defined.
    return {
        cls: {
            "avg_confidence": (
                entry["total_confidence"] / Decimal(entry["n_frames"])
            ).quantize(precision, rounding=ROUND_HALF_UP),
            "max_confidence": entry["max_confidence"].quantize(
                precision, rounding=ROUND_HALF_UP
            ),
            "n_frames": entry["n_frames"],
        }
        for cls, entry in stats.items()
    }


In [None]:

# Backfill driver: for each device, page through its events that still lack
# `detection_stats`, gather the matching detection records, compute the
# per-class stats and write them back onto the event item.
for device_id in DEVICE_IDS:
    print(f"\n🔄 Processing device: {device_id}")

    # The query arguments stay the same for every page of this device; only
    # ExclusiveStartKey is attached once a continuation key is known.
    kwargs = {
        "KeyConditionExpression": Key("device_id").eq(device_id),
        "FilterExpression": Attr("detection_stats").not_exists(),
    }

    while True:
        response = events_table.query(**kwargs)

        for event in response["Items"]:
            event_timestamp = event["event_timestamp"]
            seen_classes = event.get("seen_classes", [])

            if not seen_classes:
                print(f"  ⚠️ Skipping event {event_timestamp} (empty seen_classes)")
                continue

            print(f"  🔍 Processing event at {event_timestamp}")
            detections = []
            start_time = time.time()

            # The index is keyed by event_timestamp only, so every page is
            # narrowed down to this device on the client side.
            gsi_kwargs = {
                "IndexName": "event_timestamp_index",
                "KeyConditionExpression": Key("event_timestamp").eq(event_timestamp),
            }
            while True:
                detections_response = detections_table.query(**gsi_kwargs)

                # Filter manually
                detections_page = detections_response["Items"]
                for detection in detections_page:
                    if detection.get("device_id") == device_id:
                        detections.append(detection)

                gsi_last_evaluated_key = detections_response.get("LastEvaluatedKey")
                if not gsi_last_evaluated_key:
                    break
                gsi_kwargs["ExclusiveStartKey"] = gsi_last_evaluated_key

            elapsed_time = time.time() - start_time
            print(f"    ⏱️ Detections query took {elapsed_time:.4f} seconds. Total items: {len(detections)}")

            if not detections:
                print("    ⚠️ No detections found. Skipping.")
                continue

            detection_stats = compute_detection_stats(
                detections,
                valid_classes=set(seen_classes),
            )
            print(f"    🔢 Computed detection stats: {detection_stats}")
            if not detection_stats:
                print("    ⚠️ No valid stats computed. Skipping.")
                continue

            # Best effort: a failed write is reported and the loop moves on
            # to the next event.
            try:
                events_table.update_item(
                    Key={"device_id": device_id, "event_timestamp": event_timestamp},
                    UpdateExpression="SET detection_stats = :stats",
                    ExpressionAttributeValues={":stats": detection_stats},
                )
                print(f"    ✅ detection_stats updated for {event_timestamp}")
            except Exception as e:
                print(f"    ❌ Failed to update {event_timestamp}: {e}")

        # No continuation key means this device's events are exhausted.
        last_evaluated_key = response.get("LastEvaluatedKey")
        if not last_evaluated_key:
            break
        kwargs["ExclusiveStartKey"] = last_evaluated_key
        