Skip to content

Commit

Permalink
Deployment getting stuck at finalizing the build phase (#461)
Browse files Browse the repository at this point in the history
* Add metric measuring unfinished deployments

* Use raw ASGI middlewares

* Set interactive mode to docker compose containers

* set request timeout to 1h

* Add server sent events test with Fastapi & httpx

* Move httpx to racetrack commons

* Use server sent events to keep connection alive

* Move SSE client and server to commons

* Add changelog notice
  • Loading branch information
iszulcdeepsense committed May 15, 2024
1 parent 342a9f8 commit 5cfdd90
Show file tree
Hide file tree
Showing 14 changed files with 429 additions and 47 deletions.
1 change: 0 additions & 1 deletion dashboard/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
httpx==0.24.0
markdown==3.4.1
4 changes: 4 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ services:
DOCKER_HOST: tcp://docker-engine:2375
CONFIG_FILE: /etc/racetrack/image-builder/config.yaml
LIFECYCLE_AUTH_TOKEN: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzZWVkIjoiZGVhNzYzMDAtN2JhYy00ODRjLTkyOTYtNWQxMGNkOTM3YTU4Iiwic3ViamVjdCI6ImltYWdlLWJ1aWxkZXIiLCJzdWJqZWN0X3R5cGUiOiJpbnRlcm5hbCIsInNjb3BlcyI6WyJmdWxsX2FjY2VzcyJdfQ.ND3wDeK58L5T1jIYcuArQ5O3M0Ez3_pCAEi5NXD_hLY
stdin_open: true
tty: true

dashboard:
container_name: dashboard
Expand All @@ -110,6 +112,8 @@ services:
EXTERNAL_GRAFANA_URL: 'http://127.0.0.1:3100'
AUTH_REQUIRED: 'true'
SITE_NAME: ''
stdin_open: true
tty: true

pub:
container_name: pub
Expand Down
4 changes: 4 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- "Connection refused" error is now retriable in async job calls (will be retried automatically).
([#459](https://github.com/TheRacetrack/racetrack/issues/459))

### Fixed
- Long deployments (over 20 minutes) no longer get stuck in the final stage.
([#448](https://github.com/TheRacetrack/racetrack/issues/448))

## [2.29.2] - 2024-04-30
Expand Down
28 changes: 28 additions & 0 deletions image_builder/image_builder/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
from racetrack_commons.api.asgi.asgi_server import serve_asgi_app
from racetrack_commons.api.asgi.fastapi import create_fastapi
from racetrack_commons.api.metrics import setup_metrics_endpoint
from racetrack_commons.api.server_sent_events import stream_result_with_heartbeat
from racetrack_commons.plugin.engine import PluginEngine
from racetrack_client.log.logs import get_logger

logger = get_logger(__name__)


def run_api_server():
Expand Down Expand Up @@ -146,3 +150,27 @@ def _build(payload: BuildPayloadModel):
'logs': logs,
'error': error,
}

@api.post('/build/sse', response_model=BuildingResultModel)
def _build_server_sent_events(payload: BuildPayloadModel):
"""Stream events of building a Job image from a manifest"""
manifest = load_manifest_from_dict(payload.manifest)
git_credentials = load_credentials_from_dict(payload.git_credentials.model_dump() if payload.git_credentials else None)
tag = payload.tag
secret_build_env = payload.secret_build_env or {}
build_context = payload.build_context
deployment_id = payload.deployment_id
build_flags = payload.build_flags

def _result_runner() -> Dict:
image_names, logs, error = build_job_image(
config, manifest, git_credentials, secret_build_env, tag,
build_context, deployment_id, plugin_engine, build_flags,
)
return {
'image_names': image_names,
'logs': logs,
'error': error,
}

return stream_result_with_heartbeat(_result_runner)
14 changes: 6 additions & 8 deletions lifecycle/lifecycle/deployer/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from lifecycle.job.deployment import create_deployment, save_deployment_build_logs, save_deployment_image_name, save_deployment_result, save_deployment_phase
from racetrack_client.client_config.client_config import Credentials
from racetrack_client.client.env import SecretVars
from racetrack_client.utils.request import parse_response_object, Requests, RequestError
from racetrack_client.utils.request import Requests, RequestError
from racetrack_client.utils.datamodel import datamodel_to_dict
from racetrack_client.utils.time import now
from racetrack_client.log.context_error import wrap_context
Expand All @@ -20,6 +20,7 @@
from racetrack_commons.deploy.image import get_job_image
from racetrack_commons.entities.dto import DeploymentDto, DeploymentStatus
from racetrack_commons.plugin.engine import PluginEngine
from racetrack_commons.api.server_sent_events import make_sse_request

logger = get_logger(__name__)

Expand Down Expand Up @@ -55,17 +56,14 @@ def _send_image_build_request(
"""
logger.info(f'building a job by image-builder, deployment ID: {deployment.id}')
# see `image_builder.api._setup_api_endpoints`
r = Requests.post(
f'{config.image_builder_url}/api/v1/build',
json=_build_image_request_payload(manifest, git_credentials, secret_build_env, tag, build_context, deployment, build_flags),
)
payload = _build_image_request_payload(manifest, git_credentials, secret_build_env, tag, build_context, deployment, build_flags)
result: Dict = make_sse_request('POST', f'{config.image_builder_url}/api/v1/build/sse', payload)
logger.debug(f'image-builder finished building a job, deployment ID: {deployment.id}')
response = parse_response_object(r, 'Image builder API error')
build_logs: str = response['logs']
build_logs: str = result['logs']
image_name = get_job_image(config.docker_registry, config.docker_registry_namespace, manifest.name, tag)
save_deployment_build_logs(deployment.id, build_logs)
save_deployment_image_name(deployment.id, image_name)
error: str = response['error']
error: str = result['error']
if error:
raise RuntimeError(error)
logger.info(f'job image {image_name} has been built, deployment ID: {deployment.id}')
Expand Down
11 changes: 9 additions & 2 deletions lifecycle/lifecycle/deployer/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from lifecycle.job.audit import AuditLogger
from lifecycle.job.deployment import create_deployment, save_deployment_result
from lifecycle.job.models_registry import job_exists, find_deleted_job
from lifecycle.server.metrics import metric_done_job_deployments

logger = get_logger(__name__)

Expand Down Expand Up @@ -99,8 +100,12 @@ def deploy_job_in_background(
Schedule deployment of a job in background
:return: deployment ID
"""
infra_target = determine_infrastructure_name(config, plugin_engine, manifest)
deployment = create_deployment(manifest, username, infra_target)
try:
infra_target = determine_infrastructure_name(config, plugin_engine, manifest)
deployment = create_deployment(manifest, username, infra_target)
except BaseException as e:
metric_done_job_deployments.inc()
raise e
logger.info(f'starting deployment {deployment.id} in background')
args = (config, manifest, git_credentials, secret_vars, deployment,
build_context, force, plugin_engine, auth_subject, build_flags)
Expand Down Expand Up @@ -138,6 +143,8 @@ def deploy_job_saving_result(
job_name=deployment.job_name,
job_version=deployment.job_version,
)
finally:
metric_done_job_deployments.inc()


def _protect_job_overwriting(manifest: Manifest, force: bool):
Expand Down
9 changes: 8 additions & 1 deletion lifecycle/lifecycle/server/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@

from lifecycle.server.db_status import database_status

metric_requested_job_deployments = Counter('requested_job_deployments', 'Number of requests to deploy job')
metric_requested_job_deployments = Counter(
'requested_job_deployments',
'Number of started job deployments',
)
metric_done_job_deployments = Counter(
'done_job_deployments',
'Number of finished job deployments (processed or failed)',
)
metric_deployed_job = Counter('deployed_job', 'Number of Jobs deployed successfully')
metric_metrics_scrapes = Counter('metrics_scrapes', 'Number of Prometheus metrics scrapes')

Expand Down
73 changes: 73 additions & 0 deletions lifecycle/tests/sse/test_server_sent_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio

import backoff
import httpx
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from racetrack_client.log.logs import configure_logs
from racetrack_commons.api.asgi.asgi_server import serve_asgi_in_background
from racetrack_commons.socket import free_tcp_port


def test_server_sent_events():
app = FastAPI()

def sse_generator():
for num in range(3):
yield f'data: {{"progress": {num}}}\n\n'

@app.get("/sse")
def sse_endpoint():
return StreamingResponse(sse_generator(), media_type="text/event-stream")

@app.get("/ready")
def ready_endpoint():
return

async def test_async():
configure_logs()
port = free_tcp_port()
with serve_asgi_in_background(app, port):
_wait_until_server_ready(port)
_test_sse_client_get(port)
_test_sse_client_stream(port)

asyncio.run(test_async())


def _test_sse_client_get(port: int):
response = httpx.get(f'http://127.0.0.1:{port}/sse')
assert response.status_code == 200
assert response.text == '''data: {"progress": 0}
data: {"progress": 1}
data: {"progress": 2}
'''


def _test_sse_client_stream(port: int):
lines = []
with httpx.Client(timeout=10) as client:
with client.stream('GET', f'http://127.0.0.1:{port}/sse') as stream_response:
for line in stream_response.iter_lines():
lines.append(line)

assert lines == [
'data: {"progress": 0}',
'',
'data: {"progress": 1}',
'',
'data: {"progress": 2}',
'',
]
stream_response.raise_for_status()
assert stream_response.status_code == 200


@backoff.on_exception(backoff.fibo, httpx.RequestError, max_time=5, jitter=None)
def _wait_until_server_ready(port: int):
response = httpx.get(f'http://127.0.0.1:{port}/ready')
assert response.status_code == 200
36 changes: 31 additions & 5 deletions racetrack_commons/racetrack_commons/api/asgi/access_log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time

from fastapi import FastAPI, Request, Response
from starlette.types import ASGIApp, Receive, Scope, Send

from racetrack_client.log.logs import get_logger
from racetrack_commons.api.asgi.asgi_server import HIDDEN_ACCESS_LOGS
Expand All @@ -27,10 +28,28 @@ def enable_request_access_log(fastapi_app: FastAPI):
tracing_header = get_tracing_header_name()
caller_header = get_caller_header_name()

@fastapi_app.middleware('http')
async def access_log_on_receive(request: Request, call_next) -> Response:
tracing_id = request.headers.get(tracing_header)
caller_name = request.headers.get(caller_header)
fastapi_app.add_middleware(RequestAccessLogMiddleware, tracing_header=tracing_header, caller_header=caller_header)


class RequestAccessLogMiddleware:
def __init__(
self,
app: ASGIApp,
tracing_header: str = '',
caller_header: str = '',
) -> None:
self.app: ASGIApp = app
self.tracing_header: str = tracing_header
self.caller_header: str = caller_header

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return

request = Request(scope=scope)
tracing_id = request.headers.get(self.tracing_header)
caller_name = request.headers.get(self.caller_header)
uri = request.url.replace(scheme='', netloc='')
request_logger = RequestTracingLogger(logger, {
'tracing_id': tracing_id,
Expand All @@ -39,7 +58,8 @@ async def access_log_on_receive(request: Request, call_next) -> Response:
message = f'{request.method} {uri}'
if message not in HIDDEN_REQUEST_LOGS:
request_logger.debug(f'Request: {message}')
return await call_next(request)

await self.app(scope, receive, send)


def enable_response_access_log(fastapi_app: FastAPI):
Expand Down Expand Up @@ -68,6 +88,12 @@ async def access_log(request: Request, call_next) -> Response:
metric_request_duration.observe(time.time() - start_time)
metric_requests_done.inc()

if await request.is_disconnected():
method = request.method
uri = request.url.replace(scheme='', netloc='')
logger.error(f"Request cancelled by the client: {method} {uri}")
return Response(status_code=204) # No Content

method = request.method
uri = request.url.replace(scheme='', netloc='')
response_code = response.status_code
Expand Down
42 changes: 35 additions & 7 deletions racetrack_commons/racetrack_commons/api/asgi/error_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations
import json
import sys
from typing import Any

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.types import ASGIApp, Receive, Scope, Send

from racetrack_client.log.errors import EntityNotFound, AlreadyExists, ValidationError
from racetrack_commons.api.metrics import metric_internal_server_errors
Expand Down Expand Up @@ -67,18 +70,43 @@ def default_error_handler(request: Request, error: Exception):
content={'error': error_message, 'type': error_type},
)

@api.middleware('http')
async def catch_all_exceptions_middleware(request: Request, call_next):
api.add_middleware(ErrorHandlerMiddleware)


class ErrorHandlerMiddleware:
def __init__(
self,
app: ASGIApp,
) -> None:
self.app: ASGIApp = app

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
try:
return await call_next(request)
await self.app(scope, receive, send)
except BaseException as error:
metric_internal_server_errors.inc()
request = Request(scope=scope)
log_request_exception_with_tracing(request, error)
error_message, error_type = _upack_error_message(error)
return JSONResponse(
status_code=500,
content={'error': error_message, 'type': error_type},
)
json_content = {'error': error_message, 'type': error_type}
await send_json_content(send, 500, json_content)


async def send_json_content(send: Send, status_code: int, json_content: Any):
await send({
"type": "http.response.start",
"status": status_code,
"headers": [
[b"content-type", b"application/json"],
],
})
await send({
"type": "http.response.body",
"body": json.dumps(json_content).encode(),
})


def _upack_error_message(e: BaseException) -> tuple[str, str]:
Expand Down

0 comments on commit 5cfdd90

Please sign in to comment.