Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 50 additions & 20 deletions src/routers/occupancy.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import Annotated, Union, cast
import ast
import json
from datetime import datetime, timezone
from typing import Annotated, Any, Union, cast

from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import ValidationError

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import text, func
from sqlalchemy import func, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from ..database import get_db
Expand Down Expand Up @@ -50,6 +48,23 @@ def _confidence_level(confidence: float) -> ConfidenceLevel | None:
return ConfidenceLevel.very_low


def _to_utc_naive(value: datetime) -> datetime:
if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
return value

return value.astimezone(timezone.utc).replace(tzinfo=None)


def _is_newer_or_same_observation(
incoming_observed_at: datetime,
current_zone_updated_at: datetime | None,
) -> bool:
if current_zone_updated_at is None:
return True

return _to_utc_naive(incoming_observed_at) >= _to_utc_naive(current_zone_updated_at)


def _serialize_obs(obs: OccupancyObservation, db: Session) -> OccupancyObservationResponse:
return OccupancyObservationResponse(
observation_id=obs.observation_id,
Expand Down Expand Up @@ -315,12 +330,13 @@ async def create_observation(
zone_occupancy_updated_at = cast(datetime | None, zone.occupancy_updated_at)
current_user_id = cast(int | None, current_user.user_id)

capacity = body.capacity if body.capacity is not None else zone.capacity
zone_capacity = cast(int, zone.capacity)

if body.occupied > capacity:
body.occupied = capacity
capacity = body.capacity if body.capacity is not None else zone_capacity
occupied = min(body.occupied, capacity)
confidence = body.confidence

cl = _confidence_level(body.confidence)
cl = _confidence_level(confidence)
now = datetime.now(timezone.utc)

# Если источник прислал стабильный source_ref, то повторный запрос
Expand All @@ -342,8 +358,8 @@ async def create_observation(
source_type=body.source_type,
source_ref=body.source_ref,
capacity=capacity,
occupied=body.occupied,
confidence=body.confidence,
occupied=occupied,
confidence=confidence,
confidence_level=cl,
observed_at=body.observed_at,
ingested_at=now,
Expand All @@ -356,26 +372,40 @@ async def create_observation(
obs.camera_id = zone_camera_id
obs.partner_id = zone_partner_id
obs.capacity = capacity
obs.occupied = body.occupied
obs.confidence = body.confidence
obs.occupied = occupied
obs.confidence = confidence
obs.confidence_level = cl
obs.observed_at = body.observed_at
obs.ingested_at = now
obs.metadata_json = body.metadata

if obs.created_by_user_id is None:
obs.created_by_user_id = current_user.user_id
obs.created_by_user_id = current_user_id

# Обновляем денормализованные поля parking_zones только если пришло
# самое свежее наблюдение или наблюдение с тем же временем.
if zone_occupancy_updated_at is None or body.observed_at >= zone_occupancy_updated_at:
zone.occupied = body.occupied
zone.confidence = body.confidence
#
# Важно: body.observed_at может быть timezone-aware из-за "Z" в ISO-строке,
# а parking_zones.occupancy_updated_at в БД — TIMESTAMP без timezone.
# Поэтому напрямую их не сравниваем.
if _is_newer_or_same_observation(body.observed_at, zone_occupancy_updated_at):
zone.occupied = occupied
zone.confidence = confidence
zone.confidence_level = cl
zone.occupancy_updated_at = body.observed_at
zone.occupancy_updated_at = _to_utc_naive(body.observed_at)

db.commit()
db.refresh(obs)
try:
db.commit()
db.refresh(obs)
except SQLAlchemyError as exc:
db.rollback()
raise HTTPException(
status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"error_description": "Occupancy payload could not be saved",
"details": exc.__class__.__name__,
},
)

return {"observation_id": obs.observation_id}

Expand Down
Loading