Skip to content

Commit

Permalink
Move SSE client and server to commons
Browse files Browse the repository at this point in the history
  • Loading branch information
iszulcdeepsense committed May 13, 2024
1 parent 3cc9296 commit 40bfcb3
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 61 deletions.
52 changes: 14 additions & 38 deletions image_builder/image_builder/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import json
import queue
from threading import Thread
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict, Field

from image_builder.config import Config
Expand All @@ -18,6 +15,7 @@
from racetrack_commons.api.asgi.asgi_server import serve_asgi_app
from racetrack_commons.api.asgi.fastapi import create_fastapi
from racetrack_commons.api.metrics import setup_metrics_endpoint
from racetrack_commons.api.server_sent_events import stream_result_with_heartbeat
from racetrack_commons.plugin.engine import PluginEngine
from racetrack_client.log.logs import get_logger

Expand Down Expand Up @@ -164,37 +162,15 @@ def _build_server_sent_events(payload: BuildPayloadModel):
deployment_id = payload.deployment_id
build_flags = payload.build_flags

result_channel = queue.Queue(maxsize=0)

def _runner():
try:
image_names, logs, error = build_job_image(
config, manifest, git_credentials, secret_build_env, tag,
build_context, deployment_id, plugin_engine, build_flags,
)
result_channel.put(json.dumps({
'result': {
'image_names': image_names,
'logs': logs,
'error': error,
},
}))
except BaseException as e:
result_channel.put(json.dumps({
'error': str(e),
}))

Thread(target=_runner, daemon=True).start()

def sse_generator():
while True:
try:
event: str = result_channel.get(block=True, timeout=60)
yield f'event: result\ndata: {event}\n\n'
result_channel.task_done()
logger.debug('server sent events streaming done')
return
except queue.Empty:
yield f'event: keepalive_heartbeat\n\n'

return StreamingResponse(sse_generator(), media_type="text/event-stream")
def _result_runner() -> Dict:
image_names, logs, error = build_job_image(
config, manifest, git_credentials, secret_build_env, tag,
build_context, deployment_id, plugin_engine, build_flags,
)
return {
'image_names': image_names,
'logs': logs,
'error': error,
}

return stream_result_with_heartbeat(_result_runner)
22 changes: 4 additions & 18 deletions lifecycle/lifecycle/deployer/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Optional, Dict

import backoff
import httpx

from lifecycle.config import Config
from lifecycle.infrastructure.infra_target import determine_infrastructure_name
Expand All @@ -21,7 +20,7 @@
from racetrack_commons.deploy.image import get_job_image
from racetrack_commons.entities.dto import DeploymentDto, DeploymentStatus
from racetrack_commons.plugin.engine import PluginEngine
from racetrack_commons.api.server_sent_events import validate_streaming_response, extract_response_dict
from racetrack_commons.api.server_sent_events import make_sse_request

logger = get_logger(__name__)

Expand Down Expand Up @@ -58,26 +57,13 @@ def _send_image_build_request(
logger.info(f'building a job by image-builder, deployment ID: {deployment.id}')
# see `image_builder.api._setup_api_endpoints`
payload = _build_image_request_payload(manifest, git_credentials, secret_build_env, tag, build_context, deployment, build_flags)
response_buffer = ''
with httpx.Client(timeout=3600) as client:
with client.stream('POST', f'{config.image_builder_url}/api/v1/build/sse', json=payload) as stream_response:
for line in stream_response.iter_lines():
if line.strip() == 'event: keepalive_heartbeat':
logger.debug(f'building in progress: keepalive heartbeat for deployment {deployment.id}')
else:
response_buffer += line + '\n'

result: Dict = make_sse_request('POST', f'{config.image_builder_url}/api/v1/build/sse', payload)
logger.debug(f'image-builder finished building a job, deployment ID: {deployment.id}')
validate_streaming_response(stream_response)
response = extract_response_dict(response_buffer)
error = response.get('error')
if error:
raise RuntimeError(f'image-builder error: {error}')
build_logs: str = response['result']['logs']
build_logs: str = result['logs']
image_name = get_job_image(config.docker_registry, config.docker_registry_namespace, manifest.name, tag)
save_deployment_build_logs(deployment.id, build_logs)
save_deployment_image_name(deployment.id, image_name)
error: str = response['result']['error']
error: str = result['error']
if error:
raise RuntimeError(error)
logger.info(f'job image {image_name} has been built, deployment ID: {deployment.id}')
Expand Down
72 changes: 67 additions & 5 deletions racetrack_commons/racetrack_commons/api/server_sent_events.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,75 @@
import json
from typing import Dict
from threading import Thread
from typing import Dict, Callable
import queue

from httpx import Response
import httpx
from fastapi.responses import StreamingResponse

RESULT_EVENT = 'event: result\n'
from racetrack_client.log.logs import get_logger

logger = get_logger(__name__)

EVENT_RESULT = 'event: result\n'
EVENT_HEARTBEAT = 'event: keepalive_heartbeat'
DATA_MESSAGE = 'data: '


def validate_streaming_response(response: Response):
def stream_result_with_heartbeat(result_runner: Callable[[], Dict]):
"""
Return result dict in SSE (Server-Sent Events) response, streaming heartbeat events to keep the connection alive
"""
result_channel = queue.Queue(maxsize=0)

def _runner():
try:
result = result_runner()
result_channel.put(json.dumps({
'result': result,
}))
except BaseException as e:
result_channel.put(json.dumps({
'error': str(e),
}))

Thread(target=_runner, daemon=True).start()

def sse_generator():
while True:
try:
event_data: str = result_channel.get(block=True, timeout=60)
yield f'{EVENT_RESULT}data: {event_data}\n\n'
result_channel.task_done()
return
except queue.Empty:
yield f'{EVENT_HEARTBEAT}\n\n'

return StreamingResponse(sse_generator(), media_type="text/event-stream")


def make_sse_request(
method: str,
url: str,
payload: Dict,
):
response_buffer = ''
with httpx.Client(timeout=3600) as client:
with client.stream(method.upper(), url, json=payload) as stream_response:
for line in stream_response.iter_lines():
if line.strip() == 'event: keepalive_heartbeat':
logger.debug(f'keepalive heartbeat for {method} {url}')
else:
response_buffer += line + '\n'

validate_streaming_response(stream_response)
response = extract_response_dict(response_buffer)
error = response.get('error')
if error:
raise RuntimeError(f'Streaming Response error: {error}')
return response['result']


def validate_streaming_response(response: httpx.Response):
if response.is_success:
return
message = f'HTTP error "{response.status_code} {response.reason_phrase}" ' \
Expand All @@ -16,7 +78,7 @@ def validate_streaming_response(response: Response):


def extract_response_dict(response_text: str) -> Dict:
prefix = RESULT_EVENT + DATA_MESSAGE
prefix = EVENT_RESULT + DATA_MESSAGE
last_occurrence = response_text.find(prefix)
if last_occurrence == -1:
raise ValueError('could not find result event in the SSE response')
Expand Down

0 comments on commit 40bfcb3

Please sign in to comment.