Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions app/core/usage/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class UsageTrendBucket:
window: str
avg_used_percent: float
samples: int
reset_at: int | None = None
window_minutes: int | None = None
recorded_at: datetime | None = None


@dataclass(frozen=True)
Expand Down
90 changes: 88 additions & 2 deletions app/modules/accounts/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,21 @@ def build_account_usage_trends(
"""
# Group buckets by (account_id, window)
grouped: dict[tuple[str, str], dict[int, float]] = {}
for b in buckets:
key = (b.account_id, b.window)
secondary_schedule: dict[str, dict[int, tuple[int, int]]] = {}
for b in _effective_usage_trend_buckets(buckets):
is_weekly_primary = b.window == "primary" and usage_core.is_weekly_window_minutes(b.window_minutes)
window = "secondary" if is_weekly_primary else b.window
key = (b.account_id, window)
grouped.setdefault(key, {})[b.bucket_epoch] = b.avg_used_percent
if (
(window == "secondary" or usage_core.is_weekly_window_minutes(b.window_minutes))
and b.reset_at is not None
and b.window_minutes
):
secondary_schedule.setdefault(b.account_id, {})[b.bucket_epoch] = (
b.reset_at,
b.window_minutes,
)

# Generate the full time grid, aligned to bucket boundaries (same as SQL)
aligned_start = (since_epoch // bucket_seconds) * bucket_seconds
Expand All @@ -223,15 +235,60 @@ def build_account_usage_trends(

primary_points = _fill_trend_points(time_grid, primary_data) if primary_data else []
secondary_points = _fill_trend_points(time_grid, secondary_data) if secondary_data else []
secondary_scheduled_points = _fill_scheduled_secondary_points(
time_grid,
secondary_schedule.get(account_id, {}),
)

result[account_id] = AccountUsageTrend(
primary=primary_points,
secondary=secondary_points,
secondary_scheduled=secondary_scheduled_points,
)

return result


def _effective_usage_trend_buckets(buckets: list[UsageTrendBucket]) -> list[UsageTrendBucket]:
secondary_by_key = {
(bucket.account_id, bucket.bucket_epoch): bucket for bucket in buckets if bucket.window == "secondary"
}
weekly_primary_by_key = {
(bucket.account_id, bucket.bucket_epoch): bucket
for bucket in buckets
if bucket.window == "primary" and usage_core.is_weekly_window_minutes(bucket.window_minutes)
}
result: list[UsageTrendBucket] = []
for bucket in buckets:
key = (bucket.account_id, bucket.bucket_epoch)
weekly_primary = weekly_primary_by_key.get(key)
if bucket.window == "secondary" and weekly_primary is not None:
if usage_core.should_use_weekly_primary(
_trend_bucket_to_window_row(weekly_primary),
_trend_bucket_to_window_row(bucket),
):
continue
if bucket is weekly_primary and key in secondary_by_key:
secondary = secondary_by_key[key]
if not usage_core.should_use_weekly_primary(
_trend_bucket_to_window_row(bucket),
_trend_bucket_to_window_row(secondary),
):
continue
result.append(bucket)
return result


def _trend_bucket_to_window_row(bucket: UsageTrendBucket) -> UsageWindowRow:
return UsageWindowRow(
account_id=bucket.account_id,
used_percent=bucket.avg_used_percent,
reset_at=bucket.reset_at,
window_minutes=bucket.window_minutes,
recorded_at=bucket.recorded_at,
)


def _fill_trend_points(
time_grid: list[int],
bucket_data: dict[int, float],
Expand All @@ -252,3 +309,32 @@ def _fill_trend_points(
)
)
return points


def _fill_scheduled_secondary_points(
time_grid: list[int],
schedule_data: dict[int, tuple[int, int]],
) -> list[UsageTrendPoint]:
"""Build the ideal weekly remaining line from each sample's own reset deadline."""
points: list[UsageTrendPoint] = []
current_reset_at: int | None = None
current_window_minutes: int | None = None

for epoch in time_grid:
if epoch in schedule_data:
current_reset_at, current_window_minutes = schedule_data[epoch]

if current_reset_at is None or not current_window_minutes:
continue

window_seconds = current_window_minutes * 60
remaining_seconds = max(0, min(window_seconds, current_reset_at - epoch))
scheduled_remaining = 100.0 * remaining_seconds / window_seconds
points.append(
UsageTrendPoint(
t=datetime.fromtimestamp(epoch, tz=timezone.utc),
v=round(scheduled_remaining, 2),
)
)

return points
2 changes: 2 additions & 0 deletions app/modules/accounts/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class UsageTrendPoint(DashboardModel):
class AccountUsageTrend(DashboardModel):
primary: list[UsageTrendPoint] = Field(default_factory=list)
secondary: list[UsageTrendPoint] = Field(default_factory=list)
secondary_scheduled: list[UsageTrendPoint] = Field(default_factory=list)


class AccountUsage(DashboardModel):
Expand Down Expand Up @@ -105,3 +106,4 @@ class AccountTrendsResponse(DashboardModel):
account_id: str
primary: list[UsageTrendPoint] = Field(default_factory=list)
secondary: list[UsageTrendPoint] = Field(default_factory=list)
secondary_scheduled: list[UsageTrendPoint] = Field(default_factory=list)
1 change: 1 addition & 0 deletions app/modules/accounts/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ async def get_account_trends(self, account_id: str) -> AccountTrendsResponse | N
account_id=account_id,
primary=trend.primary if trend else [],
secondary=trend.secondary if trend else [],
secondary_scheduled=trend.secondary_scheduled if trend else [],
)

async def import_account(self, raw: bytes) -> AccountImportResponse:
Expand Down
73 changes: 64 additions & 9 deletions app/modules/usage/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections.abc import Collection
from datetime import datetime

from sqlalchemy import Integer, cast, delete, func, literal_column, or_, select, true
from sqlalchemy import Integer, and_, cast, delete, func, literal_column, or_, select, true
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.usage.types import UsageAggregateRow, UsageTrendBucket
Expand Down Expand Up @@ -263,21 +263,73 @@ async def trends_by_bucket(
conditions.append(UsageHistory.account_id == account_id)

window_expr = _normalized_window_expr()
stmt = (
base_rows = (
select(
bucket_col,
UsageHistory.account_id,
UsageHistory.id.label("usage_id"),
UsageHistory.account_id.label("account_id"),
window_expr.label("window"),
func.avg(UsageHistory.used_percent).label("avg_used_percent"),
func.count(UsageHistory.id).label("samples"),
UsageHistory.used_percent.label("used_percent"),
UsageHistory.reset_at.label("reset_at"),
UsageHistory.window_minutes.label("window_minutes"),
UsageHistory.recorded_at.label("recorded_at"),
)
.where(*conditions)
.subquery()
)

aggregate_rows = (
select(
base_rows.c.bucket_epoch,
base_rows.c.account_id,
base_rows.c.window,
func.avg(base_rows.c.used_percent).label("avg_used_percent"),
func.count(base_rows.c.usage_id).label("samples"),
)
.group_by(
bucket_col,
UsageHistory.account_id,
window_expr,
base_rows.c.bucket_epoch,
base_rows.c.account_id,
base_rows.c.window,
)
.subquery()
)

latest_rows = select(
base_rows.c.bucket_epoch,
base_rows.c.account_id,
base_rows.c.window,
base_rows.c.reset_at,
base_rows.c.window_minutes,
base_rows.c.recorded_at,
func.row_number()
.over(
partition_by=(base_rows.c.bucket_epoch, base_rows.c.account_id, base_rows.c.window),
order_by=(base_rows.c.recorded_at.desc(), base_rows.c.usage_id.desc()),
)
.label("row_number"),
).subquery()

stmt = (
select(
aggregate_rows.c.bucket_epoch,
aggregate_rows.c.account_id,
aggregate_rows.c.window,
aggregate_rows.c.avg_used_percent,
aggregate_rows.c.samples,
latest_rows.c.reset_at,
latest_rows.c.window_minutes,
latest_rows.c.recorded_at,
)
.join(
latest_rows,
and_(
latest_rows.c.bucket_epoch == aggregate_rows.c.bucket_epoch,
latest_rows.c.account_id == aggregate_rows.c.account_id,
latest_rows.c.window == aggregate_rows.c.window,
latest_rows.c.row_number == 1,
),
)
.order_by(bucket_col)
.order_by(aggregate_rows.c.bucket_epoch)
)
result = await self._session.execute(stmt)
return [
Expand All @@ -287,6 +339,9 @@ async def trends_by_bucket(
window=row.window,
avg_used_percent=float(row.avg_used_percent) if row.avg_used_percent is not None else 0.0,
samples=int(row.samples),
reset_at=int(row.reset_at) if row.reset_at is not None else None,
window_minutes=int(row.window_minutes) if row.window_minutes is not None else None,
recorded_at=row.recorded_at,
)
for row in result.all()
]
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/test_dashboard_overview.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ async def test_dashboard_overview_combines_data(async_client, db_setup):
payload = response.json()

assert payload["accounts"][0]["accountId"] == "acc_dash"
assert payload["accounts"][0]["capacityCreditsSecondary"] == pytest.approx(7560.0)
assert payload["accounts"][0]["remainingCreditsSecondary"] == pytest.approx(4536.0)
assert payload["timeframe"] == {
"key": "7d",
"windowMinutes": 10080,
Expand Down
41 changes: 40 additions & 1 deletion tests/integration/test_usage_repository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import json
from datetime import timedelta
from datetime import datetime, timedelta

import pytest
from sqlalchemy import text
Expand Down Expand Up @@ -191,3 +191,42 @@ async def test_latest_by_account_primary_query_plan_uses_normalized_window_index
plan_json = json.dumps(plan)
assert "idx_usage_window_account_latest" in plan_json or "idx_usage_window_account_time" in plan_json
assert "Seq Scan" not in plan_json


@pytest.mark.asyncio
async def test_trends_by_bucket_uses_latest_sample_window_metadata(db_setup):
recorded_at = datetime(2026, 1, 1, 12, 0, 0)
async with SessionLocal() as session:
accounts_repo = AccountsRepository(session)
repo = UsageRepository(session)
await accounts_repo.upsert(_make_account("acc1"))

await repo.add_entry(
"acc1",
10.0,
window="secondary",
reset_at=9999,
window_minutes=10080,
recorded_at=recorded_at,
)
await repo.add_entry(
"acc1",
30.0,
window="secondary",
reset_at=1111,
window_minutes=300,
recorded_at=recorded_at + timedelta(minutes=5),
)

trends = await repo.trends_by_bucket(
since=recorded_at - timedelta(minutes=1),
bucket_seconds=86400,
window="secondary",
)

assert len(trends) == 1
assert trends[0].samples == 2
assert trends[0].avg_used_percent == pytest.approx(20.0)
assert trends[0].reset_at == 1111
assert trends[0].window_minutes == 300
assert trends[0].recorded_at == recorded_at + timedelta(minutes=5)
Loading
Loading