# Automated Video Retrieval from ROS Bag Files via Batch API

This notebook documents the full workflow for automatically retrieving short video clips
embedded in ROS bag files from a Batch API. It explains the motivation, design decisions,
implementation details, and challenges encountered during development.

## 1. Project Overview

The goal of this project was to build a reliable and scalable pipeline to automatically
download short video clips embedded in ROS bag files using the Batch API.

Each clip:
- Covers **30 seconds** of video
- Is associated with a **specific vehicle** and **camera**
- Is requested using precise **UTC timestamps**
- Is archived in a **date-based folder structure**

The final pipeline supports:
- Multiple vehicles (e.g. `mallory`, `mav`, `megalodon`, `metatron`, `morizo`)
- Multiple camera views (e.g. front-center, left-center, right-center)
- Long time windows (up to full-day retrievals)
- Execution on **Savio HPC** to improve performance and scalability


## 2. Task Description

The task consists of the following high-level steps:

1. Authenticate with the Batch API using a manually retrieved token
2. Request video clips in **30-second windows** for a given vehicle and camera
3. Poll the API until each clip is ready
4. Download the resulting ROS bag file
5. Organize clips into vehicle-specific subfolders
6. Enforce Savio storage constraints (≤ 10 GB per subfolder)
7. Archive downloaded clips into date-based folders for long-term storage

The pipeline was first prototyped locally and later migrated to Savio for faster execution.


## 3. Environment Setup and Constraints

### Local Development
- Initial testing and debugging were done on a local machine
- Performance was limited due to network speed and I/O constraints

### Savio HPC
- Migration to Savio Jupyter (HTC partition, single-core jobs)
- Significantly faster API requests and downloads
- **Storage constraint:** ~20 GB total per workspace
- **Download constraint:** subfolders capped at **10 GB**

To work within these constraints:
- Downloaded files were periodically transferred to an external SSD
- Local Savio storage was cleaned after each batch


In [1]:
import os
import datetime as dt  # the functions below use dt.datetime and dt.UTC (Python 3.11+)
from datetime import datetime, timezone
import time
import json
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import pathlib
from pathlib import Path
from zoneinfo import ZoneInfo
import random
import matplotlib.pyplot as plt

In [2]:
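API_BASE = "https://fleet-batch.api.maymobility.com"
VIDEO_URL = f"{API_BASE}/v1/video"
DL_URL = f"{API_BASE}/v1/download"
VEHICLES = ["metatron"]
CAMERA = "front-center"
DURATION_SECONDS = 30            # length of each requested clip
STEP_SECONDS = 30                # spacing between consecutive clip start times
THROTTLE_SEC = 0.2               # fixed delay between requests (rate limiting)
DAY_TZ = timezone.utc            # tzinfo used to name day folders (use ZoneInfo(...) for local days)
SKIP_ON_404_SEC = 300            # jump ahead 5 minutes after a 404
LOOKBACK_S = 5 * 24 * 3600       # 5 days, in seconds
MAX_FILES_PER_VEHICLE = None     # None means no cap
MIN_VALID_BYTES = 1_000_000      # smaller downloads are discarded as invalid
MAX_OUTPUT_BYTES = 10 * 1024**3  # 10 GiB cap per bucket folder
BUCKET_PREFIX = "download"       # bucket folders are named download1, download2, ...

OUTPUT_DIR = pathlib.Path("downloads")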

## 4. Authentication and API Access

All API requests require a valid access token.

Key characteristics:
- Tokens expire after **24 hours**
- Tokens are manually retrieved and updated
- Token is passed via the HTTP `Authorization` header

Short-lived tokens limit the damage if one leaks, but jobs that run longer than 24 hours need a manual token renewal.


In [3]:
# update every day before running; the token is read from an environment variable
# (BATCH_API_TOKEN is an assumed name) so that the credential is not stored in the notebook
token = os.environ.get("BATCH_API_TOKEN", "")
headers = {"Authorization": f"Bearer {token}"}
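
Because tokens expire after 24 hours, it can be useful to check the remaining lifetime before launching a long job. The sketch below assumes the token is a JWT whose payload carries an `exp` claim (epoch seconds). The helper name `token_seconds_left` is hypothetical, and the function only decodes the payload without verifying the signature.

```python
import base64
import json
import time


def token_seconds_left(token, now=None):
    """Return seconds until the JWT's `exp` claim (negative if already expired).

    Hypothetical helper: decodes the payload only, no signature verification.
    """
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url without padding; restore the padding before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    now = time.time() if now is None else now
    return claims["exp"] - now
```

A job could call `token_seconds_left(token)` before each vehicle and stop early when the value drops below the expected runtime.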


## 5. High-Level Pipeline Flow

For each vehicle and camera:

1. Iterate over timestamps in 30-second increments
2. Request a video clip using `/v1/video`
3. If successful, retrieve the associated filename
4. Poll `/v1/download` until the clip is ready
5. Download the ROS bag file
6. Save it into a size-limited bucket folder
7. Repeat until the desired time window is covered

This approach allows the pipeline to recover from missing data and transient API failures.
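
Step 1 of the flow above can be sketched as a small generator. The name `iter_windows` is an assumption for illustration and is not part of the original notebook.

```python
from datetime import datetime, timezone


def iter_windows(start_ts, end_ts, step_s=30, duration_s=30):
    """Yield (start, end) epoch-second pairs for clips starting in [start_ts, end_ts)."""
    ts = start_ts
    while ts < end_ts:
        yield ts, ts + duration_s
        ts += step_s


# Example: the first five minutes of 2025-01-01 UTC give ten 30-second windows.
t0 = int(datetime(2025, 1, 1, tzinfo=timezone.utc).timestamp())
windows = list(iter_windows(t0, t0 + 300))
print(len(windows))  # 10
```

Each `(start, end)` pair corresponds to one `/v1/video` request.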


## 6. Directory Structure and File Management

Downloaded files are organized as: `downloads/vehicle_name/download1/`

Each `downloadN` folder:
- Is capped at **10 GB**
- Automatically rolls over when the size limit is reached
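
The rollover rule could look roughly like the following. This is a minimal sketch under stated assumptions: `dir_size_bytes` and `pick_bucket` are hypothetical names, and the notebook's own folder helper is not shown.

```python
from pathlib import Path


def dir_size_bytes(folder):
    """Total size of all regular files under folder."""
    return sum(p.stat().st_size for p in folder.rglob("*") if p.is_file())


def pick_bucket(vehicle_dir, incoming_bytes=0, max_bytes=10 * 1024**3, prefix="download"):
    """Return the first `<prefix>N` folder that can still hold incoming_bytes."""
    n = 1
    while True:
        bucket = vehicle_dir / f"{prefix}{n}"
        # A bucket qualifies if it does not exist yet or stays under the cap.
        if not bucket.exists() or dir_size_bytes(bucket) + incoming_bytes <= max_bytes:
            bucket.mkdir(parents=True, exist_ok=True)
            return bucket
        n += 1
```

Scanning from `download1` every time is simple but rescans full folders. Caching the current bucket index would avoid that cost on large runs.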

Final archived data is later regrouped into date-based folders.


## 7. Requesting Video Clips

Video availability is queried using the `/v1/video` endpoint.

Each request specifies:
- Vehicle name
- Camera name
- Start timestamp (UTC)
- End timestamp (start + 30 seconds)

If the API returns a valid filename, the clip can be downloaded.
Otherwise, the timestamp is treated as missing data.


In [4]:
def request_video_filename(token, vehicle, camera, start_ts, duration_s=DURATION_SECONDS):
    end_ts = start_ts + duration_s
    
    headers = {"Authorization": f"Bearer {token}"}
    params = {"vehicle": vehicle, "camera": camera, "startTime": start_ts, "endTime": end_ts}
    try:
        r = requests.get(VIDEO_URL, headers=headers, params=params, timeout=(10, 120))
    except (requests.ReadTimeout, requests.ConnectionError) as e:
        when = dt.datetime.fromtimestamp(start_ts, dt.UTC).strftime("%Y-%m-%d %H:%M:%S")
        print(f"[MISS][timeout] {when}Z {vehicle} {camera}: {e}")
        return None, None
    
    code = r.status_code
    if code in (400, 404):
        body = (r.text or "").strip().replace("\n", " ")
        when = dt.datetime.fromtimestamp(start_ts, dt.UTC).strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{r.status_code}] {when}Z {vehicle} {camera}: {body[:160]}")
        
        return None, code
    
    try:
        r.raise_for_status()
    except requests.HTTPError as e:
        when = dt.datetime.fromtimestamp(start_ts, dt.UTC).strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{r.status_code}] {when}Z {vehicle} {camera}: {e}")
        return None, code

    try:
        data = r.json()
        if isinstance(data, dict) and "filename" in data:
            return data["filename"], 200
        if isinstance(data, str) and data.endswith(".bag"):
            return data, 200
    except ValueError:
        pass

    txt = (r.text or "").strip()
    if txt.endswith(".bag"):
        return txt, 200
    
    ctype = r.headers.get("Content-Type", "")
    snippet = (r.text or "")[:200].replace("\n", " ")
    print(f"[WARN] Unexpected /v1/video body (type={ctype!r}): {snippet!r}")
    return None, code

## 8. Downloading and Polling for Readiness

Video clips are not immediately available after being requested.

To handle this:
- The pipeline polls the `/v1/download` endpoint
- Exponential backoff is applied between retries (the delay grows by 1.5× per attempt, capped at 30 seconds)
- Both presigned URLs and binary streams are supported

This ensures robustness against:
- Backend processing delays
- Temporary server errors
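
To make the polling schedule concrete, the generator below reproduces the delay sequence used by the polling function (start at 3 seconds, multiply by 1.5, cap at 30). The name `backoff_delays` is hypothetical and only illustrates the arithmetic.

```python
import itertools


def backoff_delays(base_sleep=3, factor=1.5, cap=30):
    """Yield an endless sequence of sleep durations: base, base*factor, ... capped at cap."""
    delay = base_sleep
    while True:
        yield delay
        delay = min(delay * factor, cap)


first = list(itertools.islice(backoff_delays(), 8))
print(first)  # [3, 4.5, 6.75, 10.125, 15.1875, 22.78125, 30, 30]
```

With a 120-second budget this schedule allows about eight polls, ignoring request latency.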


In [5]:
def wait_for_bag_ready(token, filename, max_wait_sec=100, base_sleep=3):
    """Poll /v1/download until it answers 200 or max_wait_sec elapses."""
    headers = {"Authorization": f"Bearer {token}"}
    params  = {"filename": filename}
    deadline = time.time() + max_wait_sec
    sleep = base_sleep

    while time.time() < deadline:
        try:
            r = requests.head(DL_URL, headers=headers, params=params, timeout=15)
            code = r.status_code
            if code == 405:
                # HEAD not allowed: fall back to GET, streaming so the body is not fetched
                r = requests.get(DL_URL, headers=headers, params=params, timeout=15, stream=True)
                code = r.status_code
                r.close()
        except requests.RequestException:
            code = None

        if code == 200:
            return True

        # Not ready yet (404), server error (5xx), network failure (None), or any
        # other status: wait, then grow the delay by 1.5x up to a 30 s cap.
        time.sleep(sleep)
        sleep = min(sleep * 1.5, 30)

    return False

def _get_presigned_or_binary_response(token, filename, timeout=180):
    headers = {"Authorization": f"Bearer {token}"}
    params  = {"filename": filename}
    r = requests.get(DL_URL, headers=headers, params=params, stream=True, timeout=timeout, allow_redirects=False)

    if r.is_redirect or r.status_code in (302, 303, 307, 308):
        loc = r.headers.get("Location")
        if loc and loc.startswith(("http://", "https://")):
            r.close()
            return ("url", loc)

    ctype = (r.headers.get("Content-Type") or "").lower()
    if r.status_code == 200 and ("octet-stream" in ctype or "application/x-rosbag" in ctype or "binary" in ctype):
        prefix = r.raw.read(4096, decode_content=True)
        if _looks_like_url_bytes(prefix):
            r.close()
            presigned = prefix.decode("utf-8", errors="ignore").strip().split()[0]
            return ("url", presigned)
        r.close()
        r2 = requests.get(DL_URL, headers=headers, params=params, stream=True, timeout=timeout, allow_redirects=True)
        return ("resp", r2)

    if r.status_code == 200:
        body = r.content[:8192]
        txt = body.decode("utf-8", errors="ignore").strip()
        if txt.startswith(("http://", "https://")):
            r.close()
            return ("url", txt.split()[0])
        try:
            data = json.loads(txt)
            if isinstance(data, dict) and "url" in data:
                r.close()
                return ("url", data["url"])
        except Exception:
            pass
    r.close()
    return ("fail", f"status={r.status_code}, ctype={ctype}")

def download_bag(token, filename, out_dir=OUTPUT_DIR, final_filename: str | None = None):
    out_dir.mkdir(parents=True, exist_ok=True)
    if not wait_for_bag_ready(token, filename, max_wait_sec=120, base_sleep=3):
        return False

    kind, payload = _get_presigned_or_binary_response(token, filename, timeout=180)

    chosen_name = final_filename if final_filename else filename
    out_path = out_dir / chosen_name

    if kind == "url":
        return _download_stream(payload, out_path, timeout=300, headers=None)

    if kind == "resp":
        resp = payload
        if final_filename is None:
            suggested = _extract_filename_from_headers(resp, filename)
            out_path = out_dir / suggested

        tmp = out_path.with_suffix(out_path.suffix + ".part")
        with open(tmp, "wb") as f:
            for chunk in resp.iter_content(1 << 20):
                if chunk:
                    if len(chunk) < 1024 and _looks_like_url_bytes(chunk.strip()):
                        presigned = chunk.decode("utf-8", errors="ignore").strip().split()[0]
                        resp.close()
                        f.close()
                        try: tmp.unlink(missing_ok=True)
                        except Exception: pass
                        return _download_stream(presigned, out_path, timeout=300, headers=None)
                    f.write(chunk)
        resp.close()
        tmp.replace(out_path)
        return out_path

    print(f"[WARN] failed to download {filename}: {payload}")
    return False

## 9. Error Handling and Reliability Strategies

Several failure modes were observed during development:

### 404 Errors
- Occur when no video exists for a timestamp
- Solution: skip ahead by **5 minutes** to avoid dense retry loops

### 5xx Errors
- Transient server-side failures
- Solution: exponential backoff with retries

### Rate Limiting
- Rapid requests can trigger throttling
- Solution: fixed delay (`THROTTLE_SEC = 0.2`) and randomized retries
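
The 404 skip and the randomized retry delay can be expressed as two small functions. This is a sketch under stated assumptions: the names `next_start_ts` and `jittered_delay` are hypothetical, and "full jitter" is one common way to randomize retries, not necessarily the scheme the pipeline uses.

```python
import random


def next_start_ts(ts, code, step_s=30, skip_on_404_s=300):
    """Advance by skip_on_404_s after a 404, otherwise by the normal step."""
    return ts + (skip_on_404_s if code == 404 else step_s)


def jittered_delay(attempt, base_s=1.0, cap_s=30.0, rng=random):
    """Exponential backoff with full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return rng.uniform(0, min(cap_s, base_s * 2 ** attempt))
```

Skipping 300 seconds after a 404 replaces ten consecutive 30-second probes with a single request.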


## 10. Timezone Handling

All API timestamps are in **UTC**, while data organization was initially based on local time.

This caused misalignment when grouping clips by day.

Solution:
- Explicit timezone conversion using Python's `ZoneInfo`
- Folder boundaries are defined consistently using UTC or local time as needed

This eliminated off-by-one-day errors in archived data.
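
The off-by-one-day effect is easy to reproduce. The sketch below assumes a plausible body for the `day_bounds_utc` helper used by the whole-day download function (its original definition is not shown), and `day_string` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def day_bounds_utc(year, month, day):
    """Return (start_ts, end_ts) epoch seconds for one UTC calendar day, end exclusive."""
    start = datetime(year, month, day, tzinfo=timezone.utc)
    return int(start.timestamp()), int((start + timedelta(days=1)).timestamp())


def day_string(ts, tz):
    """Calendar date (YYYY-MM-DD) of an epoch timestamp as seen in timezone tz."""
    return datetime.fromtimestamp(ts, tz).strftime("%Y-%m-%d")


start_ts, end_ts = day_bounds_utc(2025, 1, 1)
late_clip = end_ts - 30                      # clip starting at 23:59:30 UTC
utc_minus_8 = timezone(timedelta(hours=-8))  # fixed offset, for illustration only
print(day_string(late_clip, timezone.utc))   # 2025-01-01
print(day_string(start_ts, utc_minus_8))     # 2024-12-31
```

For real local-day grouping, pass a `ZoneInfo` object for the region in place of the fixed offset, so that daylight saving time is handled.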


## 11. Full-Day and Windowed Downloads

The pipeline supports multiple retrieval modes:
- Full-day downloads
- Fixed-hour windows
- Random sampling
- Availability scanning without downloading

This flexibility allows:
- Efficient debugging
- Partial reprocessing
- Parallel execution across vehicles
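
As an illustration of the random sampling mode, the function below draws distinct clip start times aligned to the 30-second grid. `sample_slots` is a hypothetical name, and this is a sketch rather than the sampling code used in the project.

```python
import random


def sample_slots(start_ts, end_ts, k, step_s=30, seed=None):
    """Pick up to k distinct slot start times, aligned to step_s, from [start_ts, end_ts)."""
    n_slots = (end_ts - start_ts) // step_s
    rng = random.Random(seed)
    picks = rng.sample(range(n_slots), min(k, n_slots))
    return sorted(start_ts + i * step_s for i in picks)
```

A full UTC day contains 2,880 such slots, so a small `k` gives a quick availability check before a full-day download.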


In [7]:
def grab_whole_day(token: str, vehicles, camera: str,
                   year: int, month: int, day: int,
                   max_files_per_vehicle: int | None = None):
    if isinstance(vehicles, str):
        vehicles = [vehicles]

    start_ts, end_ts = day_bounds_utc(year, month, day)
    total_seconds = end_ts - start_ts
    today_start = int(dt.datetime.now(dt.UTC).replace(hour=0, minute=0, second=0, microsecond=0).timestamp())
    if start_ts >= today_start:
        raise ValueError("That day is 'today' in UTC. The batch API requires using the realtime API for the current day.")

    cap = max_files_per_vehicle
    print(f"\n=== WHOLE DAY {year:04d}-{month:02d}-{day:02d} UTC ===")
    print(f"Window: {dt.datetime.fromtimestamp(start_ts, dt.UTC):%Y-%m-%d %H:%M:%S}Z"
          f" → {dt.datetime.fromtimestamp(end_ts, dt.UTC):%Y-%m-%d %H:%M:%S}Z")

    stats = {}
    for vehicle in vehicles:
        saved = 0
        ts = start_ts
        stats[vehicle] = {"saved": 0, "miss": 0, "skip": 0}
        while ts < end_ts and (cap is None or saved < cap):
            fname, code = request_video_filename(token, vehicle, camera, ts, DURATION_SECONDS)
            when = dt.datetime.fromtimestamp(ts, dt.UTC).strftime("%Y-%m-%d %H:%M:%S")
            if fname:
                day_str = dt.datetime.fromtimestamp(ts, DAY_TZ).strftime("%Y-%m-%d")
                out_dir = out_dir_for(vehicle, day_str)
                final_name = timeslot_filename(vehicle, camera, ts, DURATION_SECONDS)
                path = download_bag(token, fname, out_dir, final_filename=final_name)

                if not path:
                    stats[vehicle]["skip"] += 1
                    print(f"[SKIP] {when}Z {vehicle}: download failed")
                else:
                    sz = path.stat().st_size
                    if sz < MIN_VALID_BYTES:
                        try: path.unlink()
                        except Exception: pass
                        stats[vehicle]["skip"] += 1
                        print(f"[WARN] {when}Z {vehicle} tiny file ({sz} bytes) – counted as SKIP")
                    else:
                        saved += 1
                        stats[vehicle]["saved"] += 1
                        cap_str = f" ({saved}/{cap})" if cap else ""
                        print(f"[OK]  {when}Z -> {_rel_out(path)} (size={sz/1e6:.1f} MB){cap_str}")
            else:
                stats[vehicle]["miss"] += 1
                print(f"[MISS] {when}Z {vehicle} (code={code})")

            ts += DURATION_SECONDS

    return stats
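
The `stats` dictionary returned above can be condensed into a per-vehicle success rate. The helper name `summarize` is an assumption for illustration.

```python
def summarize(stats):
    """Fraction of attempted slots saved per vehicle (0.0 when nothing was attempted)."""
    out = {}
    for vehicle, s in stats.items():
        attempted = s["saved"] + s["miss"] + s["skip"]
        out[vehicle] = s["saved"] / attempted if attempted else 0.0
    return out


print(summarize({"metatron": {"saved": 3, "miss": 1, "skip": 0}}))  # {'metatron': 0.75}
```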