Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"cryptography>=42.0",
"numpy>=1.24",
"requests>=2.31",
]

Expand Down
7 changes: 6 additions & 1 deletion src/beeutil/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from . import secrets
from . import embeddings, recordings, secrets
from .embeddings import DimensionMismatchError, EmbeddingsError
from .image_cache import (
disable_image_collection,
disable_stereo_collection,
Expand All @@ -9,6 +10,7 @@
purge_data,
upload_to_s3,
)
from .recordings import RecordingsError
from .secrets import DecryptionError, SecretsError, SecretsNetworkError, SecretsNotFoundError

__all__ = [
Expand All @@ -17,5 +19,8 @@
'list_contents', 'upload_to_s3',
'secrets',
'SecretsError', 'DecryptionError', 'SecretsNetworkError', 'SecretsNotFoundError',
'embeddings', 'recordings',
'EmbeddingsError', 'DimensionMismatchError',
'RecordingsError',
]

2 changes: 2 additions & 0 deletions src/beeutil/_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Base URL of the odc-api HTTP service: loopback address, port 5000.
ODC_HOST = 'http://127.0.0.1:5000'
# Common URL prefix for odc-api endpoints; callers append the endpoint path.
ODC_API_BASE = f'{ODC_HOST}/api/1'
171 changes: 171 additions & 0 deletions src/beeutil/embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""Scene embeddings: query, compare, and match."""

from __future__ import annotations

import numpy as np
import requests
from typing_extensions import NotRequired, TypedDict

from ._constants import ODC_API_BASE


class QueryEmbedding(TypedDict):
    """A labelled reference vector that frame embeddings are compared against."""

    # Name copied into every Match produced from this query.
    label: str
    # Vector compared with a frame's embedding via cosine similarity.
    embedding: list[float]
    # Optional per-query minimum score; when absent, the default threshold
    # supplied by the caller of find_matches is used instead.
    threshold: NotRequired[float]


class FrameEmbedding(TypedDict):
    """One scene-embedding record, as returned by ``list_embeddings``."""

    # Embedding vector for the frame. Note the plural key name, unlike
    # QueryEmbedding's singular ``embedding``.
    embeddings: list[float]
    # Frame timestamp; compared against the ``since_ms`` cursor (Unix ms).
    timestamp_ms: int
    # Presumably capture latitude/longitude in degrees — TODO confirm with odc-api.
    lat: float
    lon: float
    # Presumably the file name of the source image — TODO confirm with odc-api.
    image_name: str


class Match(TypedDict):
    """A query embedding whose score reached its threshold for one frame."""

    # Label of the QueryEmbedding that matched.
    label: str
    # Cosine similarity between the frame vector and the query vector.
    score: float
    # The remaining fields are copied unchanged from the matched FrameEmbedding.
    timestamp_ms: int
    lat: float
    lon: float
    image_name: str

# Timeout, in seconds, passed to every requests.get() call in this module.
TIMEOUT = 10


class EmbeddingsError(Exception):
    """Base exception for embeddings operations.

    Raised directly when an odc-api request cannot be completed, answers with
    a non-200 status, or returns a payload of an unexpected shape.
    """


class DimensionMismatchError(EmbeddingsError):
    """Vectors have incompatible dimensions.

    Raised by ``cosine_similarity`` when its two inputs differ in length.
    """


def list_embeddings(
    since_ms: int | None = None,
    until_ms: int | None = None,
) -> list[FrameEmbedding]:
    """Fetch scene embeddings from odc-api, optionally bounded in time.

    Args:
        since_ms: Sent as the ``since`` query parameter. ``None`` leaves the
            parameter out of the request (requests drops ``None`` values).
        until_ms: Sent as the ``until`` query parameter, with the same
            ``None`` handling.

    Returns:
        The decoded JSON list exactly as received; individual items are not
        validated against ``FrameEmbedding``.

    Raises:
        EmbeddingsError: If odc-api cannot be reached, answers with a non-200
            status, returns invalid JSON, or the payload is not a list.
    """
    endpoint = f'{ODC_API_BASE}/embeddings'
    time_window = {'since': since_ms, 'until': until_ms}

    try:
        response = requests.get(endpoint, params=time_window, timeout=TIMEOUT)
    except requests.RequestException as exc:
        raise EmbeddingsError(f'Failed to reach odc-api: {exc}') from exc

    status = response.status_code
    if status != 200:
        raise EmbeddingsError(f'odc-api error {status}: {response.text}')

    try:
        payload = response.json()
    except ValueError as exc:
        raise EmbeddingsError('Invalid JSON response') from exc

    if isinstance(payload, list):
        return payload

    raise EmbeddingsError(f'Expected list, got {type(payload).__name__}')


def load_query_embeddings(plugin_name: str) -> list[QueryEmbedding]:
    """Load query embeddings from the plugin data store.

    Args:
        plugin_name: Name of the plugin whose data store is queried. It is
            interpolated into the request path as-is.

    Returns:
        The list stored under the ``queryEmbeddings`` key of the JSON
        response. Items are returned as received; they are not validated
        against ``QueryEmbedding``.

    Raises:
        EmbeddingsError: If odc-api cannot be reached, answers with a non-200
            status, returns invalid JSON, or the payload is not a JSON object
            holding a ``queryEmbeddings`` list.
    """
    try:
        resp = requests.get(
            f'{ODC_API_BASE}/plugin/dataStore/{plugin_name}/queryEmbeddings',
            timeout=TIMEOUT,
        )
    except requests.RequestException as e:
        raise EmbeddingsError(f'Failed to reach odc-api: {e}') from e

    if resp.status_code != 200:
        raise EmbeddingsError(
            f'odc-api error {resp.status_code}: {resp.text}',
        )

    try:
        data = resp.json()
    except ValueError as e:
        raise EmbeddingsError('Invalid JSON response') from e

    # Valid JSON need not be an object (it may be ``null``, a list, a number);
    # guard the lookup so such payloads raise EmbeddingsError rather than
    # leaking an AttributeError to the caller.
    items = data.get('queryEmbeddings') if isinstance(data, dict) else None
    if not isinstance(items, list):
        raise EmbeddingsError('Response missing queryEmbeddings list')

    return items



def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity between two vectors.

    Inputs do not need to be pre-normalized: the dot product is divided by
    the product of the two Euclidean norms.

    Args:
        a: First vector.
        b: Second vector; must have the same length as ``a``.

    Returns:
        The cosine of the angle between ``a`` and ``b``. When either vector
        has zero magnitude (including two empty vectors) the similarity is
        undefined and 0.0 is returned.

    Raises:
        DimensionMismatchError: If the vectors differ in length.
    """
    if len(a) != len(b):
        raise DimensionMismatchError(
            f'Vector dimensions do not match: {len(a)} vs {len(b)}',
        )
    a_arr = np.asarray(a, dtype=float)
    b_arr = np.asarray(b, dtype=float)
    denominator = float(np.linalg.norm(a_arr) * np.linalg.norm(b_arr))
    # Dividing by a zero norm would yield NaN plus a NumPy RuntimeWarning;
    # report "no similarity" instead so threshold comparisons stay meaningful.
    if denominator == 0.0:
        return 0.0
    return float(np.dot(a_arr, b_arr) / denominator)


def find_matches(
    frame_embedding: FrameEmbedding,
    query_embeddings: list[QueryEmbedding],
    default_threshold: float,
) -> list[Match]:
    """Score one frame against every query embedding.

    Args:
        frame_embedding: Frame whose ``embeddings`` vector is compared.
        query_embeddings: Queries to compare against, in order.
        default_threshold: Minimum score for queries without their own
            ``threshold`` entry.

    Returns:
        One ``Match`` per query whose cosine similarity is greater than or
        equal to its threshold, in the order of ``query_embeddings``.
    """
    frame_vector = frame_embedding['embeddings']
    hits: list[Match] = []

    for query in query_embeddings:
        cutoff = query.get('threshold', default_threshold)
        similarity = cosine_similarity(frame_vector, query['embedding'])

        # Keep the comparison in its ">=" form: a NaN similarity must be
        # treated as "no match", which "<" would not guarantee.
        if not (similarity >= cutoff):
            continue

        hit: Match = {
            'label': query['label'],
            'score': similarity,
            'timestamp_ms': frame_embedding['timestamp_ms'],
            'lat': frame_embedding['lat'],
            'lon': frame_embedding['lon'],
            'image_name': frame_embedding['image_name'],
        }
        hits.append(hit)

    return hits


def fetch_and_match(
    since_ms: int,
    query_embeddings: list[QueryEmbedding],
    default_threshold: float,
) -> tuple[list[Match], int]:
    """Fetch embeddings from ``since_ms`` onward and match them all.

    Args:
        since_ms: Inclusive lower bound (Unix ms). Pass the previous cursor
            + 1 to avoid reprocessing the last frame.
        query_embeddings: Vectors to match against.
        default_threshold: Minimum cosine similarity for a match when a query
            carries no threshold of its own.

    Returns:
        ``(matches, last_timestamp_ms)``. The second element is the largest
        ``timestamp_ms`` seen, never lower than ``since_ms``; it advances
        even when no frame matched and equals ``since_ms`` when no frames
        were returned.
    """
    collected: list[Match] = []
    cursor = since_ms

    # An empty result skips the loop and yields ([], since_ms).
    for frame in list_embeddings(since_ms=since_ms):
        frame_ts = frame['timestamp_ms']
        if frame_ts > cursor:
            cursor = frame_ts
        collected += find_matches(frame, query_embeddings, default_threshold)

    return collected, cursor
41 changes: 41 additions & 0 deletions src/beeutil/recordings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Recordings: query video files from the device."""

from __future__ import annotations

import requests

from ._constants import ODC_API_BASE

# Timeout, in seconds, passed to the requests.get() call in this module.
TIMEOUT = 10


class RecordingsError(Exception):
    """Error querying recordings.

    Raised when the odc-api request cannot be completed, answers with a
    non-200 status, or returns a payload without a ``files`` list.
    """


def get_video_paths_by_timerange(start_ms: int, end_ms: int) -> list[str]:
    """Ask odc-api for the video files recorded within a time range.

    Args:
        start_ms: Start of the range, placed in the request path.
        end_ms: End of the range, placed in the request path.

    Returns:
        The ``files`` list from the JSON response, expected to hold absolute
        video file paths. Entries are returned as received, unvalidated.

    Raises:
        RecordingsError: If odc-api cannot be reached, answers with a non-200
            status, returns invalid JSON, or the payload is not an object
            holding a ``files`` list.
    """
    endpoint = (
        f'{ODC_API_BASE}/recordings/video/query-by-timestamp-ms'
        f'/{start_ms}/{end_ms}'
    )

    try:
        response = requests.get(endpoint, timeout=TIMEOUT)
    except requests.RequestException as exc:
        raise RecordingsError(f'Failed to reach odc-api: {exc}') from exc

    status = response.status_code
    if status != 200:
        raise RecordingsError(f'odc-api error {status}: {response.text}')

    try:
        payload = response.json()
    except ValueError as exc:
        raise RecordingsError('Invalid JSON response from odc-api') from exc

    # Only a JSON object can carry the "files" key; anything else is rejected.
    paths = None
    if isinstance(payload, dict):
        paths = payload.get('files')

    if isinstance(paths, list):
        return paths

    raise RecordingsError('Response missing files list')
Loading
Loading