From 99e63337261089c455dc23a8417db20470c3ebe6 Mon Sep 17 00:00:00 2001 From: Marco Burro Date: Mon, 27 Oct 2025 14:05:42 +0100 Subject: [PATCH 1/9] =?UTF-8?q?=E2=9C=A8=20Resume=20build=20log=20stream?= =?UTF-8?q?=20if=20interrupted?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements-tests.txt | 1 + src/fastapi_cloud_cli/commands/deploy.py | 78 +++-- src/fastapi_cloud_cli/utils/api.py | 145 +++++++++ tests/test_api_client.py | 369 +++++++++++++++++++++++ tests/test_cli_deploy.py | 251 ++++++++++++++- tests/utils.py | 5 + 6 files changed, 792 insertions(+), 57 deletions(-) create mode 100644 tests/test_api_client.py diff --git a/requirements-tests.txt b/requirements-tests.txt index ec32801..759f6c2 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -5,3 +5,4 @@ coverage[toml] >=6.2,<8.0 mypy ==1.14.1 ruff ==0.13.0 respx ==0.22.0 +time-machine ==2.19.0 diff --git a/src/fastapi_cloud_cli/commands/deploy.py b/src/fastapi_cloud_cli/commands/deploy.py index 996d374..78cc0a7 100644 --- a/src/fastapi_cloud_cli/commands/deploy.py +++ b/src/fastapi_cloud_cli/commands/deploy.py @@ -1,5 +1,4 @@ import contextlib -import json import logging import subprocess import tempfile @@ -7,7 +6,7 @@ from enum import Enum from itertools import cycle from pathlib import Path -from typing import Any, Dict, Generator, List, Optional, Union +from typing import Any, Dict, List, Optional, Union import fastar import rignore @@ -20,7 +19,7 @@ from typing_extensions import Annotated from fastapi_cloud_cli.commands.login import login -from fastapi_cloud_cli.utils.api import APIClient +from fastapi_cloud_cli.utils.api import APIClient, BuildLogError, BuildLogType from fastapi_cloud_cli.utils.apps import AppConfig, get_app_config, write_app_config from fastapi_cloud_cli.utils.auth import is_logged_in from fastapi_cloud_cli.utils.cli import get_rich_toolkit, handle_http_errors @@ -239,21 +238,11 @@ def _get_apps(team_id: str) -> List[AppResponse]: return [model_validate(AppResponse, app) for app in data] -def _stream_build_logs(deployment_id: str) -> Generator[str, None, None]: - with APIClient() as client: - with client.stream( - "GET", f"/deployments/{deployment_id}/build-logs", timeout=60 - ) as response: - response.raise_for_status() - - yield from response.iter_lines() - - WAITING_MESSAGES = [ "🚀 Preparing for liftoff! Almost there...", "👹 Sneaking past the dependency gremlins... Don't wake them up!", "🤏 Squishing code into a tiny digital sandwich. Nom nom nom.", - "📉 Server space running low. Time to delete those cat videos?", + "🐱 Removing cat videos from our servers to free up space.", "🐢 Uploading at blazing speeds of 1 byte per hour. Patience, young padawan.", "🔌 Connecting to server... Please stand by while we argue with the firewall.", "💥 Oops! We've angered the Python God. Sacrificing a rubber duck to appease it.", @@ -364,43 +353,50 @@ def _wait_for_deployment( with toolkit.progress( next(messages), inline_logs=True, lines_to_show=20 ) as progress: - with handle_http_errors(progress=progress): - for line in _stream_build_logs(deployment.id): - time_elapsed = time.monotonic() - started_at + with APIClient() as client: + try: + for log in client.stream_build_logs(deployment.id): + time_elapsed = time.monotonic() - started_at - data = json.loads(line) + if log.type == BuildLogType.message and log.message: + progress.log(Text.from_ansi(log.message.rstrip())) - if "message" in data: - progress.log(Text.from_ansi(data["message"].rstrip())) + if log.type == BuildLogType.complete: + progress.log("") + progress.log( + f"🐔 Ready the chicken! Your app is ready at [link={deployment.url}]{deployment.url}[/link]" + ) - if data.get("type") == "complete": - progress.log("") - progress.log( - f"🐔 Ready the chicken! Your app is ready at [link={deployment.url}]{deployment.url}[/link]" - ) + progress.log("") - progress.log("") + progress.log( + f"You can also check the app logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" + ) - progress.log( - f"You can also check the app logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" - ) + break - break + if log.type == BuildLogType.failed: + progress.log("") + progress.log( + f"😔 Oh no! Something went wrong. Check out the logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" + ) + raise typer.Exit(1) - if data.get("type") == "failed": - progress.log("") - progress.log( - f"😔 Oh no! Something went wrong. Check out the logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" - ) - raise typer.Exit(1) + if time_elapsed > 30: + messages = cycle(LONG_WAIT_MESSAGES) - if time_elapsed > 30: - messages = cycle(LONG_WAIT_MESSAGES) # pragma: no cover + if (time.monotonic() - last_message_changed_at) > 2: + progress.title = next(messages) - if (time.monotonic() - last_message_changed_at) > 2: - progress.title = next(messages) # pragma: no cover + last_message_changed_at = time.monotonic() - last_message_changed_at = time.monotonic() # pragma: no cover + except BuildLogError as e: + logger.error("Build log streaming failed: %s", e) + toolkit.print_line() + toolkit.print( + f"⚠️ Unable to stream build logs. Check the dashboard for status: [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" + ) + raise typer.Exit(1) from e class SignupToWaitingList(BaseModel): diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index fe11561..4210387 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -1,9 +1,95 @@ +import logging +import time +from contextlib import AbstractContextManager, contextmanager +from datetime import timedelta +from enum import Enum +from typing import Generator, Optional + import httpx +from pydantic import BaseModel, ValidationError from fastapi_cloud_cli import __version__ from fastapi_cloud_cli.config import Settings from fastapi_cloud_cli.utils.auth import get_auth_token +logger = logging.getLogger(__name__) + +BUILD_LOG_MAX_RETRIES = 3 +BUILD_LOG_TIMEOUT = timedelta(minutes=5) + + +class BuildLogError(Exception): ... + + +class BuildLogType(str, Enum): + message = "message" + complete = "complete" + failed = "failed" + timeout = "timeout" # Request closed, reconnect to continue + heartbeat = "heartbeat" # Keepalive signal when no new logs + + +class BuildLogLine(BaseModel): + type: BuildLogType + message: str | None = None + id: str | None = None + + +@contextmanager +def attempt(attempt_number: int) -> Generator[None, None, None]: + def _backoff() -> None: + backoff_seconds = min(2**attempt_number, 30) + logger.debug( + "Retrying in %ds (attempt %d)", + backoff_seconds, + attempt_number, + ) + time.sleep(backoff_seconds) + + try: + yield + + except ( + httpx.TimeoutException, + httpx.NetworkError, + httpx.RemoteProtocolError, + ) as error: + logger.debug("Network error (will retry): %s", error) + + _backoff() + + except httpx.HTTPStatusError as error: + if error.response.status_code >= 500: + logger.debug( + "Server error %d (will retry): %s", + error.response.status_code, + error, + ) + _backoff() + else: + # Try to get response text, but handle streaming responses gracefully + try: + error_detail = error.response.text + except Exception: + error_detail = "(response body unavailable)" + raise BuildLogError( + f"HTTP {error.response.status_code}: {error_detail}" + ) from error + + +def attempts( + total_attempts: int = 3, timeout: timedelta = timedelta(minutes=5) +) -> Generator[AbstractContextManager[None], None, None]: + start = time.monotonic() + + for attempt_number in range(total_attempts): + if time.monotonic() - start > timeout.total_seconds(): + raise TimeoutError( + "Build log streaming timed out after %ds", timeout.total_seconds() + ) + + yield attempt(attempt_number) + class APIClient(httpx.Client): def __init__(self) -> None: @@ -19,3 +105,62 @@ def __init__(self) -> None: "User-Agent": f"fastapi-cloud-cli/{__version__}", }, ) + + def stream_build_logs( + self, deployment_id: str + ) -> Generator[BuildLogLine, None, None]: + last_id = None + + for attempt in attempts(BUILD_LOG_MAX_RETRIES, BUILD_LOG_TIMEOUT): + with attempt: + while True: + params = {"last_id": last_id} if last_id else None + + with self.stream( + "GET", + f"/deployments/{deployment_id}/build-logs", + timeout=60, + params=params, + ) as response: + response.raise_for_status() + + for line in response.iter_lines(): + if not line or not line.strip(): + continue + + if log_line := self._parse_log_line(line): + if log_line.id: + last_id = log_line.id + + if log_line.type == BuildLogType.message: + yield log_line + + if log_line.type in ( + BuildLogType.complete, + BuildLogType.failed, + ): + yield log_line + + return + + if log_line.type == BuildLogType.timeout: + logger.debug("Received timeout; reconnecting") + break # Breaks for loop to reconnect + + else: # Only triggered if the for loop is not broken + logger.debug( + "Connection closed by server unexpectedly; attempting to reconnect" + ) + break + + time.sleep(0.5) + + # Exhausted retries without getting any response + raise BuildLogError(f"Failed after {BUILD_LOG_MAX_RETRIES} attempts") + + def _parse_log_line(self, line: str) -> Optional[BuildLogLine]: + try: + return BuildLogLine.model_validate_json(line) + except ValidationError as e: + logger.debug("Skipping malformed log: %s (error: %s)", line[:100], e) + return None diff --git a/tests/test_api_client.py b/tests/test_api_client.py new file mode 100644 index 0000000..4e7c3dd --- /dev/null +++ b/tests/test_api_client.py @@ -0,0 +1,369 @@ +from datetime import timedelta +from unittest.mock import patch + +import httpx +import pytest +import respx +from httpx import Response +from time_machine import TimeMachineFixture + +from fastapi_cloud_cli.config import Settings +from fastapi_cloud_cli.utils.api import ( + BUILD_LOG_MAX_RETRIES, + APIClient, + BuildLogError, + BuildLogType, +) +from tests.utils import build_logs_response + +settings = Settings.get() + + +@pytest.fixture +def client() -> httpx.Client: + """Create an HTTP client for testing.""" + return APIClient() + + +@pytest.fixture +def deployment_id() -> str: + return "test-deployment-123" + + +api_mock = respx.mock(base_url=settings.base_api_url) + + +@pytest.fixture +def logs_route(deployment_id: str) -> respx.Route: + return api_mock.get(f"/deployments/{deployment_id}/build-logs") + + +@api_mock +def test_stream_build_logs_successful( + logs_route: respx.Route, + client: APIClient, + deployment_id: str, +) -> None: + logs_route.mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "message", "message": "Building...", "id": "1"}, + {"type": "message", "message": "Done!", "id": "2"}, + {"type": "complete", "id": "3"}, + ), + ) + ) + + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 3 + + assert logs[0].type == BuildLogType.message + assert logs[0].message == "Building..." + + assert logs[1].type == BuildLogType.message + assert logs[1].message == "Done!" + + assert logs[2].type == BuildLogType.complete + + +@api_mock +def test_stream_build_logs_failed( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + logs_route.mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "message", "message": "Error occurred", "id": "1"}, + {"type": "failed", "id": "2"}, + ), + ) + ) + + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 2 + assert logs[0].type == BuildLogType.message + assert logs[1].type == BuildLogType.failed + + +@pytest.mark.parametrize("terminal_type", [BuildLogType.complete, BuildLogType.failed]) +@api_mock +def test_stream_build_logs_stop_after_terminal_state( + logs_route: respx.Route, + client: APIClient, + terminal_type: BuildLogType, + deployment_id: str, +) -> None: + logs_route.mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "message", "message": "Step 1", "id": "1"}, + {"type": terminal_type, "id": "2"}, + {"type": "message", "message": "This should not appear", "id": "3"}, + ), + ) + ) + + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 2 + assert logs[0].type == BuildLogType.message + assert logs[1].type == terminal_type + + +@api_mock +def test_stream_build_logs_internal_messages_are_skipped( + logs_route: respx.Route, + client: APIClient, + deployment_id: str, +) -> None: + logs_route.mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": BuildLogType.heartbeat, "id": "1"}, + {"type": "message", "message": "Continuing...", "id": "2"}, + {"type": "complete", "id": "3"}, + ), + ) + ) + + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 2 + assert logs[0].type == BuildLogType.message + assert logs[1].type == BuildLogType.complete + + +@api_mock +def test_stream_build_logs_malformed_json_is_skipped( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + content = "\n".join( + [ + '{"type": "message", "message": "Valid", "id": "1"}', + "not valid json", + '{"type": "complete", "id": "2"}', + ] + ) + + logs_route.mock(return_value=Response(200, content=content)) + + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 2 + assert logs[0].type == BuildLogType.message + assert logs[1].type == BuildLogType.complete + + +@api_mock +def test_stream_build_logs_unknown_log_type_is_skipped( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + logs_route.mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "unknown_future_type", "id": "1"}, + {"type": "message", "message": "Valid", "id": "2"}, + {"type": "complete", "id": "3"}, + ), + ) + ) + + logs = list(client.stream_build_logs(deployment_id)) + + # Unknown type should be filtered out + assert len(logs) == 2 + assert logs[0].type == BuildLogType.message + assert logs[1].type == BuildLogType.complete + + +@pytest.mark.parametrize( + "network_error", + [httpx.NetworkError, httpx.TimeoutException, httpx.RemoteProtocolError], +) +@api_mock +def test_stream_build_logs_network_error_retry( + logs_route: respx.Route, + client: APIClient, + network_error: Exception, + deployment_id: str, +) -> None: + # First call fails, second succeeds + logs_route.side_effect = [ + network_error, + network_error, + Response( + 200, + content=build_logs_response( + {"type": "message", "message": "Success after retry", "id": "1"}, + {"type": "complete", "id": "2"}, + ), + ), + ] + + with patch("time.sleep"): + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 2 + assert logs[0].message == "Success after retry" + + +@api_mock +def test_stream_build_logs_server_error_retry( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + logs_route.side_effect = [ + Response(500, text="Internal Server Error"), + Response( + 200, + content=build_logs_response( + {"type": "complete", "id": "1"}, + ), + ), + ] + + with patch("time.sleep"): + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 1 + assert logs[0].type == BuildLogType.complete + + +@api_mock +def test_stream_build_logs_client_error_raises_immediately( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + logs_route.mock(return_value=Response(404, text="Not Found")) + + with pytest.raises(BuildLogError, match="HTTP 404"): + list(client.stream_build_logs(deployment_id)) + + +@api_mock +def test_stream_build_logs_max_retries_exceeded( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + logs_route.side_effect = httpx.NetworkError("Connection failed") + + with patch("time.sleep"): + with pytest.raises( + BuildLogError, match=f"Failed after {BUILD_LOG_MAX_RETRIES} attempts" + ): + list(client.stream_build_logs(deployment_id)) + + +@api_mock +def test_stream_build_logs_empty_lines_are_skipped( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + content = "\n".join( + [ + "", + '{"type": "message", "message": "Valid", "id": "1"}', + " ", + '{"type": "complete", "id": "2"}', + "", + ] + ) + + logs_route.mock(return_value=Response(200, content=content)) + + logs = list(client.stream_build_logs(deployment_id)) + + assert len(logs) == 2 + assert logs[0].type == BuildLogType.message + assert logs[1].type == BuildLogType.complete + + +@respx.mock(base_url=settings.base_api_url) +def test_stream_build_logs_continue_after_timeout( + respx_mock: respx.MockRouter, + client: APIClient, + deployment_id: str, +) -> None: + for id, last_id in enumerate([None, "1", "2"], start=1): + params = {"last_id": last_id} if last_id else {} + message = f"message {id}" + + respx_mock.get( + f"/deployments/{deployment_id}/build-logs", params__eq=params + ).mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "message", "message": message, "id": str(id)}, + {"type": "timeout"}, + ), + ) + ) + + respx_mock.get( + f"/deployments/{deployment_id}/build-logs", params__eq={"last_id": "3"} + ).mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "message", "message": "message 4", "id": "4"}, + {"type": "complete", "id": "5"}, + ), + ) + ) + + logs = client.stream_build_logs(deployment_id) + + with patch("time.sleep"): + assert next(logs).message == "message 1" + assert next(logs).message == "message 2" + assert next(logs).message == "message 3" + assert next(logs).message == "message 4" + assert next(logs).type == BuildLogType.complete + + +@api_mock +def test_stream_build_logs_connection_closed_without_complete_failed_or_timeout( + logs_route: respx.Route, client: APIClient, deployment_id: str +) -> None: + logs_route.mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "message", "message": "hello", "id": "1"}, + ), + ) + ) + + logs = client.stream_build_logs(deployment_id) + + with pytest.raises(BuildLogError, match="Failed after"): + for _ in range(BUILD_LOG_MAX_RETRIES + 1): + next(logs) + + +@api_mock +@pytest.mark.time_machine("2025-11-01 13:00:00", tick=False) +def test_stream_build_logs_retry_timeout( + logs_route: respx.Route, + client: APIClient, + time_machine: TimeMachineFixture, + deployment_id: str, +) -> None: + def responses(request: httpx.Request, route: respx.Route) -> Response: + time_machine.shift(timedelta(hours=1)) + + return Response( + 200, + content=build_logs_response( + {"type": "message", "message": "First", "id": "1"}, + ), + ) + + logs_route.mock(side_effect=responses) + + with pytest.raises(TimeoutError, match="timed out"): + list(client.stream_build_logs(deployment_id)) diff --git a/tests/test_cli_deploy.py b/tests/test_cli_deploy.py index d1d6ad8..038b743 100644 --- a/tests/test_cli_deploy.py +++ b/tests/test_cli_deploy.py @@ -1,19 +1,22 @@ import random import string +from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Optional from unittest.mock import patch +import httpx import pytest import respx from click.testing import Result from httpx import Response +from time_machine import TimeMachineFixture from typer.testing import CliRunner from fastapi_cloud_cli.cli import app from fastapi_cloud_cli.config import Settings from tests.conftest import ConfiguredApp -from tests.utils import Keys, changing_dir +from tests.utils import Keys, build_logs_response, changing_dir runner = CliRunner() settings = Settings.get() @@ -430,9 +433,10 @@ def test_exits_successfully_when_deployment_is_done( respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( return_value=Response( 200, - json={ - "message": "Hello, world!", - }, + content=build_logs_response( + {"type": "message", "message": "Building...", "id": "1"}, + {"type": "complete"}, + ), ) ) @@ -483,10 +487,11 @@ def test_exits_successfully_when_deployment_is_done_when_app_is_configured( respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( return_value=Response( 200, - json={ - "message": "All good!", - "type": "complete", - }, + content=build_logs_response( + {"type": "message", "message": "Building...", "id": "1"}, + {"type": "message", "message": "All good!", "id": "2"}, + {"type": "complete"}, + ), ) ) @@ -545,10 +550,7 @@ def test_exits_with_error_when_deployment_fails_to_build( respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( return_value=Response( 200, - json={ - "message": "Build failed", - "type": "failed", - }, + json={"type": "failed"}, ) ) @@ -600,10 +602,7 @@ def test_shows_error_when_deployment_build_fails( respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( return_value=Response( 200, - json={ - "type": "failed", - "message": "Build failed", - }, + json={"type": "failed"}, ) ) @@ -787,3 +786,223 @@ def test_shows_no_apps_found_message_when_team_has_no_apps( "No apps found in this team. You can create a new app instead." in result.output ) + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_handles_build_log_streaming_error( + logged_in_cli: None, tmp_path: Path, respx_mock: respx.MockRouter +) -> None: + """Test that BuildLogError is caught and shows dashboard link (lines 384, 387-392).""" + app_data = _get_random_app() + team_data = _get_random_team() + app_id = app_data["id"] + team_id = team_data["id"] + deployment_data = _get_random_deployment(app_id=app_id) + + config_path = tmp_path / ".fastapicloud" / "cloud.json" + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}') + + respx_mock.get(f"/apps/{app_id}").mock(return_value=Response(200, json=app_data)) + respx_mock.post(f"/apps/{app_id}/deployments/").mock( + return_value=Response(201, json=deployment_data) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload").mock( + return_value=Response( + 200, json={"url": "http://test.com", "fields": {"key": "value"}} + ) + ) + respx_mock.post("http://test.com", data={"key": "value"}).mock( + return_value=Response(200) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload-complete").mock( + return_value=Response(200) + ) + + respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( + return_value=Response(422, text="Error") + ) + + with changing_dir(tmp_path): + result = runner.invoke(app, ["deploy"]) + + assert result.exit_code == 1 + assert "Unable to stream build logs" in result.output + assert deployment_data["dashboard_url"] in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_shows_error_message_when_build_log_streaming_fails( + logged_in_cli: None, + tmp_path: Path, + respx_mock: respx.MockRouter, +) -> None: + app_data = _get_random_app() + team_data = _get_random_team() + app_id = app_data["id"] + team_id = team_data["id"] + deployment_data = _get_random_deployment(app_id=app_id) + + config_path = tmp_path / ".fastapicloud" / "cloud.json" + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}') + + respx_mock.get(f"/apps/{app_id}").mock(return_value=Response(200, json=app_data)) + respx_mock.post(f"/apps/{app_id}/deployments/").mock( + return_value=Response(201, json=deployment_data) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload").mock( + return_value=Response( + 200, json={"url": "http://test.com", "fields": {"key": "value"}} + ) + ) + respx_mock.post("http://test.com", data={"key": "value"}).mock( + return_value=Response(200) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload-complete").mock( + return_value=Response(200) + ) + + respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( + return_value=Response(500, text="Internal Server Error") + ) + + with changing_dir(tmp_path), patch("time.sleep"): + result = runner.invoke(app, ["deploy"]) + + assert "Unable to stream build logs" in result.output + assert deployment_data["dashboard_url"] in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +@pytest.mark.time_machine(datetime(2025, 11, 1), tick=False) +@patch("fastapi_cloud_cli.commands.deploy.WAITING_MESSAGES", ["short wait message"]) +def test_short_wait_messages( + logged_in_cli: None, + tmp_path: Path, + respx_mock: respx.MockRouter, + time_machine: TimeMachineFixture, +) -> None: + app_data = _get_random_app() + team_data = _get_random_team() + app_id = app_data["id"] + team_id = team_data["id"] + deployment_data = _get_random_deployment(app_id=app_id) + + config_path = tmp_path / ".fastapicloud" / "cloud.json" + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}') + + respx_mock.get(f"/apps/{app_id}").mock(return_value=Response(200, json=app_data)) + respx_mock.post(f"/apps/{app_id}/deployments/").mock( + return_value=Response(201, json=deployment_data) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload").mock( + return_value=Response( + 200, json={"url": "http://test.com", "fields": {"key": "value"}} + ) + ) + respx_mock.post("http://test.com", data={"key": "value"}).mock( + return_value=Response(200) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload-complete").mock( + return_value=Response(200) + ) + + def build_logs_handler(request: httpx.Request, route: respx.Route) -> Response: + if route.call_count <= 2: + time_machine.shift(timedelta(seconds=3)) + return Response( + 200, + content=build_logs_response( + { + "type": "message", + "message": f"Step {route.call_count}", + "id": str(route.call_count), + }, + {"type": "timeout"}, + ), + ) + else: + return Response( + 200, + content=build_logs_response( + {"type": "complete"}, + ), + ) + + respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( + side_effect=build_logs_handler + ) + + with changing_dir(tmp_path), patch("time.sleep"): + result = runner.invoke(app, ["deploy"]) + + assert "short wait message" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +@pytest.mark.time_machine(datetime(2025, 11, 1), tick=False) +@patch("fastapi_cloud_cli.commands.deploy.LONG_WAIT_MESSAGES", ["long wait message"]) +def test_long_wait_messages( + logged_in_cli: None, + tmp_path: Path, + respx_mock: respx.MockRouter, + time_machine: TimeMachineFixture, +) -> None: + app_data = _get_random_app() + team_data = _get_random_team() + app_id = app_data["id"] + team_id = team_data["id"] + deployment_data = _get_random_deployment(app_id=app_id) + + config_path = tmp_path / ".fastapicloud" / "cloud.json" + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}') + + respx_mock.get(f"/apps/{app_id}").mock(return_value=Response(200, json=app_data)) + respx_mock.post(f"/apps/{app_id}/deployments/").mock( + return_value=Response(201, json=deployment_data) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload").mock( + return_value=Response( + 200, json={"url": "http://test.com", "fields": {"key": "value"}} + ) + ) + respx_mock.post("http://test.com", data={"key": "value"}).mock( + return_value=Response(200) + ) + respx_mock.post(f"/deployments/{deployment_data['id']}/upload-complete").mock( + return_value=Response(200) + ) + + def build_logs_handler(request: httpx.Request, route: respx.Route) -> Response: + if route.call_count <= 2: + time_machine.shift(timedelta(seconds=35)) + return Response( + 200, + content=build_logs_response( + { + "type": "message", + "message": f"Step {route.call_count}", + "id": str(route.call_count), + }, + {"type": "timeout"}, + ), + ) + else: + return Response( + 200, + content=build_logs_response( + {"type": "complete"}, + ), + ) + + respx_mock.get(f"/deployments/{deployment_data['id']}/build-logs").mock( + side_effect=build_logs_handler + ) + + with changing_dir(tmp_path), patch("time.sleep"): + result = runner.invoke(app, ["deploy"]) + + assert "long wait message" in result.output diff --git a/tests/utils.py b/tests/utils.py index c0d2e32..2901a83 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,6 +16,11 @@ def changing_dir(directory: Union[str, Path]) -> Generator[None, None, None]: os.chdir(initial_dir) +def build_logs_response(*logs: dict[str, Any]) -> str: + """Helper to create NDJSON build logs response.""" + return "\n".join(json.dumps(log) for log in logs) + + class Keys: RIGHT_ARROW = "\x1b[C" DOWN_ARROW = "\x1b[B" From 4aa3d169a969ed07895ee765cc7eb228c65e5610 Mon Sep 17 00:00:00 2001 From: Marco Burro Date: Mon, 10 Nov 2025 10:00:28 +0100 Subject: [PATCH 2/9] Downgrade `time-machine` module --- requirements-tests.txt | 2 +- tests/test_api_client.py | 3 ++- tests/test_cli_deploy.py | 7 ++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/requirements-tests.txt b/requirements-tests.txt index 759f6c2..88d7f43 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -5,4 +5,4 @@ coverage[toml] >=6.2,<8.0 mypy ==1.14.1 ruff ==0.13.0 respx ==0.22.0 -time-machine ==2.19.0 +time-machine ==2.15.0 diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 4e7c3dd..204e2c0 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -346,13 +346,14 @@ def test_stream_build_logs_connection_closed_without_complete_failed_or_timeout( @api_mock -@pytest.mark.time_machine("2025-11-01 13:00:00", tick=False) def test_stream_build_logs_retry_timeout( logs_route: respx.Route, client: APIClient, time_machine: TimeMachineFixture, deployment_id: str, ) -> None: + time_machine.move_to("2025-11-01 13:00:00", tick=False) + def responses(request: httpx.Request, route: respx.Route) -> Response: time_machine.shift(timedelta(hours=1)) diff --git a/tests/test_cli_deploy.py b/tests/test_cli_deploy.py index 038b743..d67544d 100644 --- a/tests/test_cli_deploy.py +++ b/tests/test_cli_deploy.py @@ -1,6 +1,6 @@ import random import string -from datetime import datetime, timedelta +from datetime import timedelta from pathlib import Path from typing import Dict, Optional from unittest.mock import patch @@ -875,7 +875,6 @@ def test_shows_error_message_when_build_log_streaming_fails( @pytest.mark.respx(base_url=settings.base_api_url) -@pytest.mark.time_machine(datetime(2025, 11, 1), tick=False) @patch("fastapi_cloud_cli.commands.deploy.WAITING_MESSAGES", ["short wait message"]) def test_short_wait_messages( logged_in_cli: None, @@ -883,6 +882,7 @@ def test_short_wait_messages( respx_mock: respx.MockRouter, time_machine: TimeMachineFixture, ) -> None: + time_machine.move_to("2025-11-01 13:00:00", tick=False) app_data = _get_random_app() team_data = _get_random_team() app_id = app_data["id"] @@ -942,7 +942,6 @@ def build_logs_handler(request: httpx.Request, route: respx.Route) -> Response: @pytest.mark.respx(base_url=settings.base_api_url) -@pytest.mark.time_machine(datetime(2025, 11, 1), tick=False) @patch("fastapi_cloud_cli.commands.deploy.LONG_WAIT_MESSAGES", ["long wait message"]) def test_long_wait_messages( logged_in_cli: None, @@ -950,6 +949,8 @@ def test_long_wait_messages( respx_mock: respx.MockRouter, time_machine: TimeMachineFixture, ) -> None: + time_machine.move_to("2025-11-01 13:00:00", tick=False) + app_data = _get_random_app() team_data = _get_random_team() app_id = app_data["id"] From e1b88f4885ee29b80209105f0fcaac7ad793bc48 Mon Sep 17 00:00:00 2001 From: Marco Burro Date: Mon, 10 Nov 2025 10:03:25 +0100 Subject: [PATCH 3/9] Fix type annotations --- src/fastapi_cloud_cli/utils/api.py | 10 +++++----- tests/utils.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index 4210387..fdb6774 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -1,9 +1,9 @@ import logging import time -from contextlib import AbstractContextManager, contextmanager +from contextlib import contextmanager from datetime import timedelta from enum import Enum -from typing import Generator, Optional +from typing import ContextManager, Generator, Optional import httpx from pydantic import BaseModel, ValidationError @@ -31,8 +31,8 @@ class BuildLogType(str, Enum): class BuildLogLine(BaseModel): type: BuildLogType - message: str | None = None - id: str | None = None + message: Optional[str] = None + id: Optional[str] = None @contextmanager @@ -79,7 +79,7 @@ def _backoff() -> None: def attempts( total_attempts: int = 3, timeout: timedelta = timedelta(minutes=5) -) -> Generator[AbstractContextManager[None], None, None]: +) -> Generator[ContextManager[None], None, None]: start = time.monotonic() for attempt_number in range(total_attempts): diff --git a/tests/utils.py b/tests/utils.py index 2901a83..562e994 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,7 +16,7 @@ def changing_dir(directory: Union[str, Path]) -> Generator[None, None, None]: os.chdir(initial_dir) -def build_logs_response(*logs: dict[str, Any]) -> str: +def build_logs_response(*logs: Dict[str, Any]) -> str: """Helper to create NDJSON build logs response.""" return "\n".join(json.dumps(log) for log in logs) From f8a796fb9679b9a7c775d57f20cd225b5c783bce Mon Sep 17 00:00:00 2001 From: Marco Burro Date: Wed, 19 Nov 2025 12:00:47 +0000 Subject: [PATCH 4/9] Refactor --- src/fastapi_cloud_cli/commands/deploy.py | 69 ++++++++++++------------ 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/src/fastapi_cloud_cli/commands/deploy.py b/src/fastapi_cloud_cli/commands/deploy.py index 78cc0a7..5069c1f 100644 --- a/src/fastapi_cloud_cli/commands/deploy.py +++ b/src/fastapi_cloud_cli/commands/deploy.py @@ -352,51 +352,50 @@ def _wait_for_deployment( with toolkit.progress( next(messages), inline_logs=True, lines_to_show=20 - ) as progress: - with APIClient() as client: - try: - for log in client.stream_build_logs(deployment.id): - time_elapsed = time.monotonic() - started_at + ) as progress, APIClient() as client: + try: + for log in client.stream_build_logs(deployment.id): + time_elapsed = time.monotonic() - started_at - if log.type == BuildLogType.message and log.message: - progress.log(Text.from_ansi(log.message.rstrip())) + if log.type == BuildLogType.message and log.message: + progress.log(Text.from_ansi(log.message.rstrip())) - if log.type == BuildLogType.complete: - progress.log("") - progress.log( - f"🐔 Ready the chicken! Your app is ready at [link={deployment.url}]{deployment.url}[/link]" - ) + if log.type == BuildLogType.complete: + progress.log("") + progress.log( + f"🐔 Ready the chicken! Your app is ready at [link={deployment.url}]{deployment.url}[/link]" + ) - progress.log("") + progress.log("") - progress.log( - f"You can also check the app logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" - ) + progress.log( + f"You can also check the app logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" + ) - break + break - if log.type == BuildLogType.failed: - progress.log("") - progress.log( - f"😔 Oh no! Something went wrong. Check out the logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" - ) - raise typer.Exit(1) + if log.type == BuildLogType.failed: + progress.log("") + progress.log( + f"😔 Oh no! Something went wrong. Check out the logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" + ) + raise typer.Exit(1) - if time_elapsed > 30: - messages = cycle(LONG_WAIT_MESSAGES) + if time_elapsed > 30: + messages = cycle(LONG_WAIT_MESSAGES) - if (time.monotonic() - last_message_changed_at) > 2: - progress.title = next(messages) + if (time.monotonic() - last_message_changed_at) > 2: + progress.title = next(messages) - last_message_changed_at = time.monotonic() + last_message_changed_at = time.monotonic() - except BuildLogError as e: - logger.error("Build log streaming failed: %s", e) - toolkit.print_line() - toolkit.print( - f"⚠️ Unable to stream build logs. Check the dashboard for status: [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" - ) - raise typer.Exit(1) from e + except BuildLogError as e: + logger.error("Build log streaming failed: %s", e) + toolkit.print_line() + toolkit.print( + f"⚠️ Unable to stream build logs. Check the dashboard for status: [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" + ) + raise typer.Exit(1) from e class SignupToWaitingList(BaseModel): From 7b545bafddf71df134adfd2ee4a480eac77e312a Mon Sep 17 00:00:00 2001 From: Marco Burro Date: Wed, 19 Nov 2025 12:02:01 +0000 Subject: [PATCH 5/9] Add type for build log message --- src/fastapi_cloud_cli/commands/deploy.py | 8 ++-- src/fastapi_cloud_cli/utils/api.py | 41 ++++++++++---------- tests/test_api_client.py | 49 ++++++++++++------------ 3 files changed, 50 insertions(+), 48 deletions(-) diff --git a/src/fastapi_cloud_cli/commands/deploy.py b/src/fastapi_cloud_cli/commands/deploy.py index 5069c1f..d0e4b0f 100644 --- a/src/fastapi_cloud_cli/commands/deploy.py +++ b/src/fastapi_cloud_cli/commands/deploy.py @@ -19,7 +19,7 @@ from typing_extensions import Annotated from fastapi_cloud_cli.commands.login import login -from fastapi_cloud_cli.utils.api import APIClient, BuildLogError, BuildLogType +from fastapi_cloud_cli.utils.api import APIClient, BuildLogError from fastapi_cloud_cli.utils.apps import AppConfig, get_app_config, write_app_config from fastapi_cloud_cli.utils.auth import is_logged_in from fastapi_cloud_cli.utils.cli import get_rich_toolkit, handle_http_errors @@ -357,10 +357,10 @@ def _wait_for_deployment( for log in client.stream_build_logs(deployment.id): time_elapsed = time.monotonic() - started_at - if log.type == BuildLogType.message and log.message: + if log.type == "message": progress.log(Text.from_ansi(log.message.rstrip())) - if log.type == BuildLogType.complete: + if log.type == "complete": progress.log("") progress.log( f"🐔 Ready the chicken! Your app is ready at [link={deployment.url}]{deployment.url}[/link]" @@ -374,7 +374,7 @@ def _wait_for_deployment( break - if log.type == BuildLogType.failed: + if log.type == "failed": progress.log("") progress.log( f"😔 Oh no! Something went wrong. Check out the logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]" diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index fdb6774..f14a1ad 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -2,11 +2,11 @@ import time from contextlib import contextmanager from datetime import timedelta -from enum import Enum -from typing import ContextManager, Generator, Optional +from typing import ContextManager, Generator, Literal, Optional, Union import httpx -from pydantic import BaseModel, ValidationError +from pydantic import BaseModel, Field, TypeAdapter, ValidationError +from typing_extensions import Annotated from fastapi_cloud_cli import __version__ from fastapi_cloud_cli.config import Settings @@ -18,23 +18,27 @@ BUILD_LOG_TIMEOUT = timedelta(minutes=5) -class BuildLogError(Exception): ... +class BuildLogError(Exception): + pass -class BuildLogType(str, Enum): - message = "message" - complete = "complete" - failed = "failed" - timeout = "timeout" # Request closed, reconnect to continue - heartbeat = "heartbeat" # Keepalive signal when no new logs +class BuildLogLineGeneric(BaseModel): + type: Literal["complete", "failed", "timeout", "heartbeat"] + id: Optional[str] = None -class BuildLogLine(BaseModel): - type: BuildLogType - message: Optional[str] = None +class BuildLogLineMessage(BaseModel): + type: Literal["message"] = "message" + message: str id: Optional[str] = None +BuildLogLine = Union[BuildLogLineMessage, BuildLogLineGeneric] +BuildLogAdapter = TypeAdapter[BuildLogLine]( + Annotated[BuildLogLine, Field(discriminator="type")] +) + + @contextmanager def attempt(attempt_number: int) -> Generator[None, None, None]: def _backoff() -> None: @@ -132,18 +136,15 @@ def stream_build_logs( if log_line.id: last_id = log_line.id - if log_line.type == BuildLogType.message: + if log_line.type == "message": yield log_line - if log_line.type in ( - BuildLogType.complete, - BuildLogType.failed, - ): + if log_line.type in ("complete", "failed"): yield log_line return - if log_line.type == BuildLogType.timeout: + if log_line.type == "timeout": logger.debug("Received timeout; reconnecting") break # Breaks for loop to reconnect @@ -160,7 +161,7 @@ def stream_build_logs( def _parse_log_line(self, line: str) -> Optional[BuildLogLine]: try: - return BuildLogLine.model_validate_json(line) + return BuildLogAdapter.validate_json(line) except ValidationError as e: logger.debug("Skipping malformed log: %s (error: %s)", line[:100], e) return None diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 204e2c0..5e94940 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -12,7 +12,7 @@ BUILD_LOG_MAX_RETRIES, APIClient, BuildLogError, - BuildLogType, + BuildLogLineMessage, ) from tests.utils import build_logs_response @@ -59,13 +59,13 @@ def test_stream_build_logs_successful( assert len(logs) == 3 - assert logs[0].type == BuildLogType.message + assert logs[0].type == "message" assert logs[0].message == "Building..." - assert logs[1].type == BuildLogType.message + assert logs[1].type == "message" assert logs[1].message == "Done!" - assert logs[2].type == BuildLogType.complete + assert logs[2].type == "complete" @api_mock @@ -85,16 +85,16 @@ def test_stream_build_logs_failed( logs = list(client.stream_build_logs(deployment_id)) assert len(logs) == 2 - assert logs[0].type == BuildLogType.message - assert logs[1].type == BuildLogType.failed + assert logs[0].type == "message" + assert logs[1].type == "failed" -@pytest.mark.parametrize("terminal_type", [BuildLogType.complete, BuildLogType.failed]) +@pytest.mark.parametrize("terminal_type", ["complete", "failed"]) @api_mock def test_stream_build_logs_stop_after_terminal_state( logs_route: respx.Route, client: APIClient, - terminal_type: BuildLogType, + terminal_type: str, deployment_id: str, ) -> None: logs_route.mock( @@ -111,7 +111,7 @@ def test_stream_build_logs_stop_after_terminal_state( logs = list(client.stream_build_logs(deployment_id)) assert len(logs) == 2 - assert logs[0].type == BuildLogType.message + assert logs[0].type == "message" assert logs[1].type == terminal_type @@ -125,7 +125,7 @@ def test_stream_build_logs_internal_messages_are_skipped( return_value=Response( 200, content=build_logs_response( - {"type": BuildLogType.heartbeat, "id": "1"}, + {"type": "heartbeat", "id": "1"}, {"type": "message", "message": "Continuing...", "id": "2"}, {"type": "complete", "id": "3"}, ), @@ -135,8 +135,8 @@ def test_stream_build_logs_internal_messages_are_skipped( logs = list(client.stream_build_logs(deployment_id)) assert len(logs) == 2 - assert logs[0].type == BuildLogType.message - assert logs[1].type == BuildLogType.complete + assert logs[0].type == "message" + assert logs[1].type == "complete" @api_mock @@ -156,8 +156,8 @@ def test_stream_build_logs_malformed_json_is_skipped( logs = list(client.stream_build_logs(deployment_id)) assert len(logs) == 2 - assert logs[0].type == BuildLogType.message - assert logs[1].type == BuildLogType.complete + assert logs[0].type == "message" + assert logs[1].type == "complete" @api_mock @@ -179,8 +179,8 @@ def test_stream_build_logs_unknown_log_type_is_skipped( # Unknown type should be filtered out assert len(logs) == 2 - assert logs[0].type == BuildLogType.message - assert logs[1].type == BuildLogType.complete + assert logs[0].type == "message" + assert logs[1].type == "complete" @pytest.mark.parametrize( @@ -211,6 +211,7 @@ def test_stream_build_logs_network_error_retry( logs = list(client.stream_build_logs(deployment_id)) assert len(logs) == 2 + assert logs[0].type == "message" assert logs[0].message == "Success after retry" @@ -232,7 +233,7 @@ def test_stream_build_logs_server_error_retry( logs = list(client.stream_build_logs(deployment_id)) assert len(logs) == 1 - assert logs[0].type == BuildLogType.complete + assert logs[0].type == "complete" @api_mock @@ -277,8 +278,8 @@ def test_stream_build_logs_empty_lines_are_skipped( logs = list(client.stream_build_logs(deployment_id)) assert len(logs) == 2 - assert logs[0].type == BuildLogType.message - assert logs[1].type == BuildLogType.complete + assert logs[0].type == "message" + assert logs[1].type == "complete" @respx.mock(base_url=settings.base_api_url) @@ -318,11 +319,11 @@ def test_stream_build_logs_continue_after_timeout( logs = client.stream_build_logs(deployment_id) with patch("time.sleep"): - assert next(logs).message == "message 1" - assert next(logs).message == "message 2" - assert next(logs).message == "message 3" - assert next(logs).message == "message 4" - assert next(logs).type == BuildLogType.complete + assert next(logs) == BuildLogLineMessage(message="message 1", id="1") + assert next(logs) == BuildLogLineMessage(message="message 2", id="2") + assert next(logs) == BuildLogLineMessage(message="message 3", id="3") + assert next(logs) == BuildLogLineMessage(message="message 4", id="4") + assert next(logs).type == "complete" @api_mock From 14a8501051f966622d41a916f1d8078dabefd867 Mon Sep 17 00:00:00 2001 From: Marco Burro Date: Wed, 19 Nov 2025 16:26:14 +0000 Subject: [PATCH 6/9] Move attempts logic to decorator --- src/fastapi_cloud_cli/commands/deploy.py | 4 +- src/fastapi_cloud_cli/utils/api.py | 116 ++++++++++++++--------- tests/test_api_client.py | 7 +- 3 files changed, 77 insertions(+), 50 deletions(-) diff --git a/src/fastapi_cloud_cli/commands/deploy.py b/src/fastapi_cloud_cli/commands/deploy.py index d0e4b0f..dddd713 100644 --- a/src/fastapi_cloud_cli/commands/deploy.py +++ b/src/fastapi_cloud_cli/commands/deploy.py @@ -19,7 +19,7 @@ from typing_extensions import Annotated from fastapi_cloud_cli.commands.login import login -from fastapi_cloud_cli.utils.api import APIClient, BuildLogError +from fastapi_cloud_cli.utils.api import APIClient, BuildLogError, TooManyRetriesError from fastapi_cloud_cli.utils.apps import AppConfig, get_app_config, write_app_config from fastapi_cloud_cli.utils.auth import is_logged_in from fastapi_cloud_cli.utils.cli import get_rich_toolkit, handle_http_errors @@ -389,7 +389,7 @@ def _wait_for_deployment( last_message_changed_at = time.monotonic() - except BuildLogError as e: + except (BuildLogError, TooManyRetriesError) as e: logger.error("Build log streaming failed: %s", e) toolkit.print_line() toolkit.print( diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index f14a1ad..7692f83 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -2,11 +2,19 @@ import time from contextlib import contextmanager from datetime import timedelta -from typing import ContextManager, Generator, Literal, Optional, Union +from functools import wraps +from typing import ( + Callable, + Generator, + Literal, + Optional, + TypeVar, + Union, +) import httpx from pydantic import BaseModel, Field, TypeAdapter, ValidationError -from typing_extensions import Annotated +from typing_extensions import Annotated, ParamSpec from fastapi_cloud_cli import __version__ from fastapi_cloud_cli.config import Settings @@ -22,6 +30,10 @@ class BuildLogError(Exception): pass +class TooManyRetriesError(Exception): + pass + + class BuildLogLineGeneric(BaseModel): type: Literal["complete", "failed", "timeout", "heartbeat"] id: Optional[str] = None @@ -81,18 +93,39 @@ def _backoff() -> None: ) from error +P = ParamSpec("P") +T = TypeVar("T") + + def attempts( total_attempts: int = 3, timeout: timedelta = timedelta(minutes=5) -) -> Generator[ContextManager[None], None, None]: - start = time.monotonic() +) -> Callable[ + [Callable[P, Generator[T, None, None]]], Callable[P, Generator[T, None, None]] +]: + def decorator( + func: Callable[P, Generator[T, None, None]], + ) -> Callable[P, Generator[T, None, None]]: + @wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> Generator[T, None, None]: + start = time.monotonic() - for attempt_number in range(total_attempts): - if time.monotonic() - start > timeout.total_seconds(): - raise TimeoutError( - "Build log streaming timed out after %ds", timeout.total_seconds() - ) + for attempt_number in range(total_attempts): + if time.monotonic() - start > timeout.total_seconds(): + raise TimeoutError( + "Build log streaming timed out after %ds", + timeout.total_seconds(), + ) + + with attempt(attempt_number): + yield from func(*args, **kwargs) + # If we get here without exception, the generator completed successfully + return + + raise TooManyRetriesError(f"Failed after {total_attempts} attempts") - yield attempt(attempt_number) + return wrapper + + return decorator class APIClient(httpx.Client): @@ -110,54 +143,47 @@ def __init__(self) -> None: }, ) + @attempts(BUILD_LOG_MAX_RETRIES, BUILD_LOG_TIMEOUT) def stream_build_logs( self, deployment_id: str ) -> Generator[BuildLogLine, None, None]: last_id = None - for attempt in attempts(BUILD_LOG_MAX_RETRIES, BUILD_LOG_TIMEOUT): - with attempt: - while True: - params = {"last_id": last_id} if last_id else None - - with self.stream( - "GET", - f"/deployments/{deployment_id}/build-logs", - timeout=60, - params=params, - ) as response: - response.raise_for_status() - - for line in response.iter_lines(): - if not line or not line.strip(): - continue + while True: + params = {"last_id": last_id} if last_id else None - if log_line := self._parse_log_line(line): - if log_line.id: - last_id = log_line.id + with self.stream( + "GET", + f"/deployments/{deployment_id}/build-logs", + timeout=60, + params=params, + ) as response: + response.raise_for_status() - if log_line.type == "message": - yield log_line + for line in response.iter_lines(): + if not line or not line.strip(): + continue - if log_line.type in ("complete", "failed"): - yield log_line + if log_line := self._parse_log_line(line): + if log_line.id: + last_id = log_line.id - return + if log_line.type == "message": + yield log_line - if log_line.type == "timeout": - logger.debug("Received timeout; reconnecting") - break # Breaks for loop to reconnect + if log_line.type in ("complete", "failed"): + yield log_line + return - else: # Only triggered if the for loop is not broken - logger.debug( - "Connection closed by server unexpectedly; attempting to reconnect" - ) - break + if log_line.type == "timeout": + logger.debug("Received timeout; reconnecting") + break # Breaks for loop to reconnect + else: + logger.debug("Connection closed by server unexpectedly; will retry") - time.sleep(0.5) + raise httpx.NetworkError("Connection closed without terminal state") - # Exhausted retries without getting any response - raise BuildLogError(f"Failed after {BUILD_LOG_MAX_RETRIES} attempts") + time.sleep(0.5) def _parse_log_line(self, line: str) -> Optional[BuildLogLine]: try: diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 5e94940..6a25d3f 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -13,6 +13,7 @@ APIClient, BuildLogError, BuildLogLineMessage, + TooManyRetriesError, ) from tests.utils import build_logs_response @@ -254,7 +255,7 @@ def test_stream_build_logs_max_retries_exceeded( with patch("time.sleep"): with pytest.raises( - BuildLogError, match=f"Failed after {BUILD_LOG_MAX_RETRIES} attempts" + TooManyRetriesError, match=f"Failed after {BUILD_LOG_MAX_RETRIES} attempts" ): list(client.stream_build_logs(deployment_id)) @@ -341,7 +342,7 @@ def test_stream_build_logs_connection_closed_without_complete_failed_or_timeout( logs = client.stream_build_logs(deployment_id) - with pytest.raises(BuildLogError, match="Failed after"): + with patch("time.sleep"), pytest.raises(TooManyRetriesError, match="Failed after"): for _ in range(BUILD_LOG_MAX_RETRIES + 1): next(logs) @@ -367,5 +368,5 @@ def responses(request: httpx.Request, route: respx.Route) -> Response: logs_route.mock(side_effect=responses) - with pytest.raises(TimeoutError, match="timed out"): + with patch("time.sleep"), pytest.raises(TimeoutError, match="timed out"): list(client.stream_build_logs(deployment_id)) From c2c351973b278cea195d447d3cce86f936aec393 Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Thu, 20 Nov 2025 10:26:58 +0000 Subject: [PATCH 7/9] Use compat layer for pydantic 1 --- src/fastapi_cloud_cli/utils/api.py | 3 ++- src/fastapi_cloud_cli/utils/pydantic_compat.py | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index 7692f83..af94931 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -13,12 +13,13 @@ ) import httpx -from pydantic import BaseModel, Field, TypeAdapter, ValidationError +from pydantic import BaseModel, Field, ValidationError from typing_extensions import Annotated, ParamSpec from fastapi_cloud_cli import __version__ from fastapi_cloud_cli.config import Settings from fastapi_cloud_cli.utils.auth import get_auth_token +from fastapi_cloud_cli.utils.pydantic_compat import TypeAdapter logger = logging.getLogger(__name__) diff --git a/src/fastapi_cloud_cli/utils/pydantic_compat.py b/src/fastapi_cloud_cli/utils/pydantic_compat.py index 0e2a41e..0e9d9a5 100644 --- a/src/fastapi_cloud_cli/utils/pydantic_compat.py +++ b/src/fastapi_cloud_cli/utils/pydantic_compat.py @@ -61,3 +61,12 @@ def validate_python(self, value: Any) -> T: from pydantic import parse_obj_as return parse_obj_as(self.type_, value) # type: ignore[no-any-return, unused-ignore] + + def validate_json(self, value: str) -> T: + """Validate a JSON string against the type.""" + if PYDANTIC_V2: + return self._adapter.validate_json(value) # type: ignore[no-any-return, union-attr, unused-ignore] + else: + from pydantic import parse_raw_as + + return parse_raw_as(self.type_, value) # type: ignore[no-any-return, unused-ignore, operator] From aabd439f16e985f9eae74adebdbe4cfe0fb3e2d3 Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Thu, 20 Nov 2025 10:38:37 +0000 Subject: [PATCH 8/9] Ignore things that can't be typed :') --- src/fastapi_cloud_cli/utils/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index af94931..ff62208 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -48,7 +48,7 @@ class BuildLogLineMessage(BaseModel): BuildLogLine = Union[BuildLogLineMessage, BuildLogLineGeneric] BuildLogAdapter = TypeAdapter[BuildLogLine]( - Annotated[BuildLogLine, Field(discriminator="type")] + Annotated[BuildLogLine, Field(discriminator="type")] # type: ignore ) From 649ae30dae1ba1b53f8ed37710257e714d4cf9b6 Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Thu, 20 Nov 2025 10:43:09 +0000 Subject: [PATCH 9/9] Fix error --- src/fastapi_cloud_cli/utils/api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index ff62208..fe494c2 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -1,3 +1,4 @@ +import json import logging import time from contextlib import contextmanager @@ -189,6 +190,6 @@ def stream_build_logs( def _parse_log_line(self, line: str) -> Optional[BuildLogLine]: try: return BuildLogAdapter.validate_json(line) - except ValidationError as e: + except (ValidationError, json.JSONDecodeError) as e: logger.debug("Skipping malformed log: %s (error: %s)", line[:100], e) return None