Skip to content

Commit

Permalink
Use server sent events to keep connection alive
Browse files Browse the repository at this point in the history
  • Loading branch information
iszulcdeepsense committed May 10, 2024
1 parent a63780e commit 3cc9296
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 11 deletions.
1 change: 0 additions & 1 deletion dashboard/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
httpx==0.24.0
markdown==3.4.1
54 changes: 53 additions & 1 deletion image_builder/image_builder/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import json
import queue
from threading import Thread
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, FastAPI
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, ConfigDict, Field

from image_builder.config import Config
Expand All @@ -16,6 +19,9 @@
from racetrack_commons.api.asgi.fastapi import create_fastapi
from racetrack_commons.api.metrics import setup_metrics_endpoint
from racetrack_commons.plugin.engine import PluginEngine
from racetrack_client.log.logs import get_logger

logger = get_logger(__name__)


def run_api_server():
Expand Down Expand Up @@ -146,3 +152,49 @@ def _build(payload: BuildPayloadModel):
'logs': logs,
'error': error,
}

@api.post('/build/sse', response_model=BuildingResultModel)
def _build_server_sent_events(payload: BuildPayloadModel):
"""Stream events of building a Job image from a manifest"""
manifest = load_manifest_from_dict(payload.manifest)
git_credentials = load_credentials_from_dict(payload.git_credentials.model_dump() if payload.git_credentials else None)
tag = payload.tag
secret_build_env = payload.secret_build_env or {}
build_context = payload.build_context
deployment_id = payload.deployment_id
build_flags = payload.build_flags

result_channel = queue.Queue(maxsize=0)

def _runner():
try:
image_names, logs, error = build_job_image(
config, manifest, git_credentials, secret_build_env, tag,
build_context, deployment_id, plugin_engine, build_flags,
)
result_channel.put(json.dumps({
'result': {
'image_names': image_names,
'logs': logs,
'error': error,
},
}))
except BaseException as e:
result_channel.put(json.dumps({
'error': str(e),
}))

Thread(target=_runner, daemon=True).start()

def sse_generator():
while True:
try:
event: str = result_channel.get(block=True, timeout=60)
yield f'event: result\ndata: {event}\n\n'
result_channel.task_done()
logger.debug('server sent events streaming done')
return
except queue.Empty:
yield f'event: keepalive_heartbeat\n\n'

return StreamingResponse(sse_generator(), media_type="text/event-stream")
29 changes: 20 additions & 9 deletions lifecycle/lifecycle/deployer/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional, Dict

import backoff
import httpx

from lifecycle.config import Config
from lifecycle.infrastructure.infra_target import determine_infrastructure_name
Expand All @@ -10,7 +11,7 @@
from lifecycle.job.deployment import create_deployment, save_deployment_build_logs, save_deployment_image_name, save_deployment_result, save_deployment_phase
from racetrack_client.client_config.client_config import Credentials
from racetrack_client.client.env import SecretVars
from racetrack_client.utils.request import parse_response_object, Requests, RequestError
from racetrack_client.utils.request import Requests, RequestError
from racetrack_client.utils.datamodel import datamodel_to_dict
from racetrack_client.utils.time import now
from racetrack_client.log.context_error import wrap_context
Expand All @@ -20,6 +21,7 @@
from racetrack_commons.deploy.image import get_job_image
from racetrack_commons.entities.dto import DeploymentDto, DeploymentStatus
from racetrack_commons.plugin.engine import PluginEngine
from racetrack_commons.api.server_sent_events import validate_streaming_response, extract_response_dict

logger = get_logger(__name__)

Expand Down Expand Up @@ -55,18 +57,27 @@ def _send_image_build_request(
"""
logger.info(f'building a job by image-builder, deployment ID: {deployment.id}')
# see `image_builder.api._setup_api_endpoints`
r = Requests.post(
f'{config.image_builder_url}/api/v1/build',
json=_build_image_request_payload(manifest, git_credentials, secret_build_env, tag, build_context, deployment, build_flags),
timeout=3600,
)
payload = _build_image_request_payload(manifest, git_credentials, secret_build_env, tag, build_context, deployment, build_flags)
response_buffer = ''
with httpx.Client(timeout=3600) as client:
with client.stream('POST', f'{config.image_builder_url}/api/v1/build/sse', json=payload) as stream_response:
for line in stream_response.iter_lines():
if line.strip() == 'event: keepalive_heartbeat':
logger.debug(f'building in progress: keepalive heartbeat for deployment {deployment.id}')
else:
response_buffer += line + '\n'

logger.debug(f'image-builder finished building a job, deployment ID: {deployment.id}')
response = parse_response_object(r, 'Image builder API error')
build_logs: str = response['logs']
validate_streaming_response(stream_response)
response = extract_response_dict(response_buffer)
error = response.get('error')
if error:
raise RuntimeError(f'image-builder error: {error}')
build_logs: str = response['result']['logs']
image_name = get_job_image(config.docker_registry, config.docker_registry_namespace, manifest.name, tag)
save_deployment_build_logs(deployment.id, build_logs)
save_deployment_image_name(deployment.id, image_name)
error: str = response['error']
error: str = response['result']['error']
if error:
raise RuntimeError(error)
logger.info(f'job image {image_name} has been built, deployment ID: {deployment.id}')
Expand Down
26 changes: 26 additions & 0 deletions racetrack_commons/racetrack_commons/api/server_sent_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import json
from typing import Dict

from httpx import Response

RESULT_EVENT = 'event: result\n'
DATA_MESSAGE = 'data: '


def validate_streaming_response(response: Response):
if response.is_success:
return
message = f'HTTP error "{response.status_code} {response.reason_phrase}" ' \
f'for url {response.request.method} {response.url}'
raise RuntimeError(message)


def extract_response_dict(response_text: str) -> Dict:
prefix = RESULT_EVENT + DATA_MESSAGE
last_occurrence = response_text.find(prefix)
if last_occurrence == -1:
raise ValueError('could not find result event in the SSE response')

remainder = response_text[last_occurrence + len(prefix):]
json_dict = json.loads(remainder)
return json_dict

0 comments on commit 3cc9296

Please sign in to comment.