Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streamer wrapper function in Kraken to avoid errors being thrown after Stream Response is open #2601

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/frontend/src/components/cloud/CloudTrayMenu.vue
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ export default Vue.extend({

this.pull_tracker = new PullTracker(
() => console.log('Major Tom Install Ready'),
() => console.error('Major Tom Install Error'),
(error: string) => {
this.setOperationError('Failed to install Major Tom.', error)
},
)

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ export default Vue.extend({
tag,
},
onDownloadProgress: (progressEvent) => {
tracker.digestNewData(progressEvent)
tracker.digestNewData(progressEvent, false)
this.pull_output = tracker.pull_output
this.download_percentage = tracker.download_percentage
this.extraction_percentage = tracker.extraction_percentage
Expand Down
29 changes: 26 additions & 3 deletions core/frontend/src/utils/pull_tracker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Dictionary } from '@/types/common'
import { aggregateStreamingResponse, parseStreamingResponse } from '@/utils/streaming'

class PullTracker {
private layer_status: Dictionary<string> = {}
Expand Down Expand Up @@ -80,13 +81,12 @@ class PullTracker {
this.extraction_percentage = extraction_current / extraction_total / 0.01
}

digestNewData(progressEvent: {currentTarget: { response: string}}): void {
digestStreamFragment(fragment: string): void {
// dataChunk contains the data that have been obtained so far (the whole data so far)..
// The received data is descbribed at
// https://docker-py.readthedocs.io/en/stable/api.html#docker.api.image.ImageApiMixin.pull
const dataChunk = progressEvent?.currentTarget?.response
// As the data consists of multiple jsons, the following like is a hack to split them
const dataList = (this.left_over_data + dataChunk.replaceAll('}{', '}\n\n{')).split('\n\n')
const dataList = (this.left_over_data + fragment.replaceAll('}{', '}\n\n{')).split('\n\n')
this.left_over_data = ''

for (const line of dataList) {
Expand Down Expand Up @@ -145,5 +145,28 @@ class PullTracker {

this.updateSimplifiedProgress()
}

digestNewData(progressEvent: {currentTarget: { response: string }}, parseFragments = true): void {
let buffer = progressEvent?.currentTarget?.response

if (parseFragments) {
const result = aggregateStreamingResponse(
parseStreamingResponse(progressEvent?.currentTarget?.response),
(fragment) => {
this.onerror(fragment.error ?? `Unknown error with status ${fragment.status}`)
/** Stops aggregation */
return false
},
)

if (result === undefined) {
return
}

buffer = result
}

this.digestStreamFragment(buffer)
}
}
export default PullTracker
84 changes: 84 additions & 0 deletions core/frontend/src/utils/streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
 * One parsed fragment of a backend streaming response.
 * Mirrors the `StreamingResponse` dataclass serialized by the backend streamer.
 */
export interface StreamingResponse {
  /** Sequence number of the fragment within the stream (first fragment is 0). */
  fragment: number
  /** HTTP status code the backend attached to this fragment. */
  status: number
  /** Payload; base64-encoded on the wire, may be absent/null for error fragments. */
  data?: string
  /** Error description when the backend reports a failure for this fragment. */
  error?: string
}

/**
 * Per-fragment callback used during aggregation. Receives the fragment and the
 * data aggregated so far. The boolean's meaning depends on the role: as a
 * validator it reports whether the fragment is valid; as an invalid-fragment
 * handler it reports whether aggregation should continue.
 */
export type ConditionalHandlerFn = (fragment: StreamingResponse, buffer: string) => boolean

/**
 * Parse raw streamed data into an array of streaming response fragments.
 * @param {string} data Raw stream contents received so far
 * @param {boolean} decode If true, base64-decodes each fragment's data field
 * @returns {StreamingResponse[]} Fragments successfully parsed from the stream
 */
export function parseStreamingResponse(data: string, decode = true): StreamingResponse[] {
  // The backend joins fragments with this delimiter
  // TODO: If the error itself contains the delimiter the split will break parsing
  const chunks = data.split('|\n\n|')
  const fragments: StreamingResponse[] = []

  for (const chunk of chunks) {
    try {
      const candidate = JSON.parse(chunk) as StreamingResponse

      // Only keep well-formed fragments; 'data' may still be null (error
      // fragments carry no payload), so decoding is gated separately below.
      const isWellFormed = 'fragment' in candidate && 'data' in candidate && 'status' in candidate
      if (!isWellFormed) {
        continue
      }

      if (decode && candidate.data) {
        candidate.data = atob(candidate.data)
      }
      fragments.push(candidate)
    } catch (error) {
      if (error instanceof SyntaxError) {
        // A SyntaxError means we reached the current (still incomplete) end of the stream
        break
      }
      console.error('On decoding streaming response expected to get SyntaxError but got: ', error)
    }
  }

  return fragments
}

/** Default invalid-fragment handler: logs the fragment and aborts aggregation. */
function defaultInvalidHandler(fragment: StreamingResponse): boolean {
  console.error('Invalid fragment in streaming response: ', fragment)
  return false
}

/** Default validator: a fragment is valid unless it carries an error or a status of 400+. */
function defaultValidator(fragment: StreamingResponse): boolean {
  return fragment.status < 400 && !fragment.error
}

/**
 * Aggregate streaming response fragments into a single string, validating each one.
 * @param {StreamingResponse[]} fragments Array of streaming response fragments
 * @param {ConditionalHandlerFn} onInvalid Called for each invalid fragment; returning false
 * stops aggregation (the function then returns undefined). Defaults to logging and stopping.
 * @param {ConditionalHandlerFn} validator Decides whether a fragment is valid; by default a
 * fragment with status >= 400 or a set error field is invalid
 * @returns {string | undefined} Aggregated data, or undefined if aggregation was stopped
 */
export function aggregateStreamingResponse(
  fragments: StreamingResponse[],
  onInvalid: ConditionalHandlerFn = defaultInvalidHandler,
  validator: ConditionalHandlerFn = defaultValidator,
): string | undefined {
  let aggregated = ''

  for (const fragment of fragments) {
    const isValid = validator(fragment, aggregated)
    if (!isValid && !onInvalid(fragment, aggregated)) {
      // The invalid handler asked us to stop aggregating
      return undefined
    }
    aggregated += fragment.data ?? ''
  }

  return aggregated
}
25 changes: 22 additions & 3 deletions core/frontend/src/views/ExtensionManagerView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ import { Dictionary } from '@/types/common'
import { kraken_service } from '@/types/frontend_services'
import back_axios from '@/utils/api'
import PullTracker from '@/utils/pull_tracker'
import { aggregateStreamingResponse, parseStreamingResponse } from '@/utils/streaming'

import {
ExtensionData, InstalledExtensionData, RunningContainer, Version,
Expand Down Expand Up @@ -492,8 +493,26 @@ export default Vue.extend({
container_name: this.getContainerName(extension),
},
onDownloadProgress: (progressEvent) => {
const chunk = progressEvent.currentTarget.response
this.$set(this, 'log_output', ansi.ansi_to_html(chunk))
const result = aggregateStreamingResponse(
parseStreamingResponse(progressEvent.currentTarget.response),
(fragment, buffer) => {
// If no logs are available kraken will wait till timeout and stop the stream
if (fragment.status === 408) {
if (!buffer) {
this.$set(this, 'log_output', ansi.ansi_to_html('No Logs available'))
}
} else {
notifier.pushBackError('EXTENSIONS_LOG_FETCH_FAIL', fragment.error)
}

/** Only stops if buffer is empty */
return Boolean(buffer)
},
)

if (result) {
this.$set(this, 'log_output', ansi.ansi_to_html(result))
}
this.show_log = true
this.setLoading(extension, false)
this.$nextTick(() => {
Expand All @@ -507,7 +526,7 @@ export default Vue.extend({
output.scrollTop = output.scrollHeight
})
},
timeout: 30000,
timeout: 35000,
})
.then(() => {
this.setLoading(extension, false)
Expand Down
103 changes: 103 additions & 0 deletions core/libs/commonwealth/commonwealth/utils/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import asyncio
import base64
import json
from dataclasses import asdict, dataclass
from typing import AsyncGenerator, Optional, Tuple

from fastapi import status

from commonwealth.utils.apis import StackedHTTPException


@dataclass
class StreamingResponse:
    """One fragment of a streamed HTTP response, serialized to JSON on the wire."""

    # Sequence number of this fragment within the stream (first fragment is 0)
    fragment: int
    # HTTP status code attached to this fragment (200 for data; 408/500/etc. for errors)
    status: int
    # Base64-encoded payload; None for error and timeout fragments
    data: Optional[str] = None
    # Human-readable error description when the fragment reports a failure
    error: Optional[str] = None


def response_line(response: StreamingResponse) -> str:
    """Serialize a fragment to JSON followed by the delimiter the frontend splits on."""
    serialized = json.dumps(asdict(response))
    return f"{serialized}|\n\n|"


def streaming_timeout_exception(fragment: int) -> str:
    """Build a serialized fragment reporting that the stream timed out (HTTP 408)."""
    timeout_fragment = StreamingResponse(
        fragment=fragment,
        data=None,
        status=status.HTTP_408_REQUEST_TIMEOUT,
        error="Timeout reached",
    )
    return response_line(timeout_fragment)


def streaming_error_exception(fragment: int, error: Exception) -> str:
    """Build a serialized fragment reporting an unexpected failure (HTTP 500)."""
    failure = StreamingResponse(
        fragment=fragment,
        data=None,
        status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        error=str(error),
    )
    return response_line(failure)


def streaming_stack_exception(fragment: int, error: StackedHTTPException) -> str:
    """Build a serialized fragment carrying a StackedHTTPException's status and detail."""
    stacked = StreamingResponse(fragment=fragment, data=None, status=error.status_code, error=error.detail)
    return response_line(stacked)


def streaming_response(fragment: int, data: str | bytes) -> str:
    """Build a serialized success fragment (HTTP 200) with a base64-encoded payload.

    Base64 keeps newlines and the JSON delimiter out of the payload so the
    concatenated fragments stay splittable on the frontend.
    """
    raw = data if isinstance(data, bytes) else data.encode("utf-8")
    encoded = base64.b64encode(raw).decode()
    return response_line(StreamingResponse(fragment=fragment, data=encoded, status=status.HTTP_200_OK))


async def streamer(gen: AsyncGenerator[str | bytes, None]) -> AsyncGenerator[str, None]:
    """
    Wrap an async generator into the consistent streaming response line format.

    Each chunk is base64-encoded to avoid newline/JSON delimiter conflicts, and
    exceptions are converted into error fragments instead of propagating — so a
    failure cannot abort a StreamingResponse that is already open.
    """

    index = 0
    try:
        async for chunk in gen:
            yield streaming_response(index, chunk)
            index += 1
    except StackedHTTPException as stacked:
        # Preserve the original HTTP status and detail for stacked errors
        yield streaming_stack_exception(index, stacked)
    except Exception as unexpected:  # pylint: disable=broad-except
        yield streaming_error_exception(index, unexpected)


async def _fetch_stream(
gen: AsyncGenerator[str | bytes, None], queue: asyncio.Queue[Optional[Tuple[str | bytes | None, Exception | None]]]
) -> None:
try:
async for data in gen:
await queue.put((data, None))
except Exception as e:
await queue.put((None, e))
finally:
await queue.put((None, None))


async def timeout_streamer(gen: AsyncGenerator[str | bytes, None], timeout: int = 30) -> AsyncGenerator[str, None]:
    """
    Wrap an async generator into the consistent streaming response format with
    error handling and an additional per-item timeout: if the generator takes
    longer than `timeout` seconds to produce the next item, a 408 timeout
    fragment is emitted and the stream ends.
    Data is encoded in base64 to avoid any new line jsons conflicts.
    """

    # The producer pushes (data, error) pairs plus a final (None, None) sentinel;
    # consuming through a queue lets asyncio.wait_for bound each iteration.
    queue: asyncio.Queue[Optional[Tuple[str | bytes | None, Exception | None]]] = asyncio.Queue()
    task = asyncio.create_task(_fetch_stream(gen, queue))

    fragment = 0
    try:
        while True:
            # Wait at most `timeout` seconds for the next item from the producer
            item = await asyncio.wait_for(queue.get(), timeout=timeout)
            data, error = item if item else (None, None)
            if error:
                # Re-raise the producer's exception so the handlers below format it
                raise error
            if data is None:
                # (None, None) sentinel: the generator is exhausted
                break
            yield streaming_response(fragment, data)
            fragment += 1
    except asyncio.TimeoutError:
        yield streaming_timeout_exception(fragment)
    except StackedHTTPException as e:
        yield streaming_stack_exception(fragment, e)
    except Exception as e:
        yield streaming_error_exception(fragment, e)
    finally:
        # NOTE(review): the cancelled producer task is never awaited, so its
        # CancelledError may surface as a "task was destroyed" warning — confirm.
        task.cancel()
6 changes: 1 addition & 5 deletions core/services/kraken/kraken.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,12 @@ async def list_containers(self) -> List[DockerContainer]:
containers: List[DockerContainer] = await self.client.containers.list(filter='{"status": ["running"]}') # type: ignore
return containers

async def stream_logs(self, container_name: str, timeout: int = 30) -> AsyncGenerator[str, None]:
async def stream_logs(self, container_name: str) -> AsyncGenerator[str, None]:
containers = await self.client.containers.list(filters={"name": {container_name: True}}) # type: ignore
if not containers:
raise RuntimeError(f"Container not found: {container_name}")

start_time = asyncio.get_event_loop().time()
async for log_line in containers[0].log(stdout=True, stderr=True, follow=True, stream=True):
elapsed_time = asyncio.get_event_loop().time() - start_time
if elapsed_time > timeout:
break
yield log_line
logger.info(f"Finished streaming logs for {container_name}")

Expand Down
7 changes: 4 additions & 3 deletions core/services/kraken/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from commonwealth.utils.apis import GenericErrorHandlingRoute
from commonwealth.utils.logs import InterceptHandler, init_logger
from commonwealth.utils.streaming import streamer, timeout_streamer
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import HTMLResponse, PlainTextResponse, StreamingResponse
from fastapi_versioning import VersionedFastAPI, version
Expand Down Expand Up @@ -91,13 +92,13 @@ async def install_extension(extension: Extension) -> Any:
status_code=status.HTTP_400_BAD_REQUEST,
detail="Extension is not compatible with the current machine running BlueOS.",
)
return StreamingResponse(kraken.install_extension(extension, compatible_digest))
return StreamingResponse(streamer(kraken.install_extension(extension, compatible_digest)))


@app.post("/extension/update_to_version", status_code=status.HTTP_201_CREATED)
@version(1, 0)
async def update_extension(extension_identifier: str, new_version: str) -> Any:
return StreamingResponse(kraken.update_extension_to_version(extension_identifier, new_version))
return StreamingResponse(streamer(kraken.update_extension_to_version(extension_identifier, new_version)))


@app.post("/extension/uninstall", status_code=status.HTTP_200_OK)
Expand Down Expand Up @@ -142,7 +143,7 @@ async def list_containers() -> Any:
@app.get("/log", status_code=status.HTTP_200_OK, response_class=PlainTextResponse)
@version(1, 0)
async def log_containers(container_name: str) -> Iterable[bytes]:
return StreamingResponse(kraken.stream_logs(container_name), media_type="text/plain") # type: ignore
return StreamingResponse(timeout_streamer(kraken.stream_logs(container_name)), media_type="text/plain") # type: ignore


@app.get("/stats", status_code=status.HTTP_200_OK)
Expand Down
Loading