Skip to content

Commit

Permalink
[ML] AIOps: Reduce rerenders when streaming analysis results. (#182793)
Browse files Browse the repository at this point in the history
  • Loading branch information
walterra committed May 14, 2024
1 parent 382ee2d commit 7026199
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export const ProgressControls: FC<PropsWithChildren<ProgressControlProps>> = (pr
runAnalysisDisabled = false,
} = props;

const progressOutput = Math.round(progress * 100);

const { euiTheme } = useEuiTheme();
const runningProgressBarStyles = useAnimatedProgressBarBackground(euiTheme.colors.success);
const analysisCompleteStyle = { display: 'none' };
Expand Down Expand Up @@ -147,7 +149,7 @@ export const ProgressControls: FC<PropsWithChildren<ProgressControlProps>> = (pr
data-test-subj="aiopsProgressTitleMessage"
id="xpack.aiops.progressTitle"
defaultMessage="Progress: {progress}% — {progressMessage}"
values={{ progress: Math.round(progress * 100), progressMessage }}
values={{ progress: progressOutput, progressMessage }}
/>
</EuiText>
</EuiFlexItem>
Expand All @@ -156,7 +158,7 @@ export const ProgressControls: FC<PropsWithChildren<ProgressControlProps>> = (pr
aria-label={i18n.translate('xpack.aiops.progressAriaLabel', {
defaultMessage: 'Progress',
})}
value={Math.round(progress * 100)}
value={progressOutput}
max={100}
size="m"
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ export const initialState: StreamState = {

export function streamReducer(
state: StreamState,
action: AiopsLogRateAnalysisApiAction | AiopsLogRateAnalysisApiAction[]
action: AiopsLogRateAnalysisApiAction
): StreamState {
if (Array.isArray(action)) {
return action.reduce(streamReducer, state);
}

switch (action.type) {
case API_ACTION_NAME.ADD_SIGNIFICANT_ITEMS:
return { ...state, significantItems: [...state.significantItems, ...action.payload] };
Expand Down
6 changes: 4 additions & 2 deletions x-pack/packages/ml/response_stream/client/fetch_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export async function* fetchStream<B extends object, R extends Reducer<any, any>
body?: B,
ndjson = true,
headers?: HttpFetchOptions['headers']
): AsyncGenerator<[GeneratorError, ReducerAction<R> | Array<ReducerAction<R>> | undefined]> {
): AsyncGenerator<[GeneratorError, ReducerAction<R> | undefined]> {
let stream: Readonly<Response> | undefined;

try {
Expand Down Expand Up @@ -112,7 +112,9 @@ export async function* fetchStream<B extends object, R extends Reducer<any, any>
: parts
) as Array<ReducerAction<R>>;

yield [null, actions];
for (const action of actions) {
yield [null, action];
}
} catch (error) {
if (error.name !== 'AbortError') {
yield [error.toString(), undefined];
Expand Down
1 change: 1 addition & 0 deletions x-pack/packages/ml/response_stream/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
* 2.0.
*/

export { fetchStream } from './fetch_stream';
export { useFetchStream } from './use_fetch_stream';
12 changes: 3 additions & 9 deletions x-pack/packages/ml/response_stream/client/string_reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,14 @@

import type { Reducer, ReducerAction, ReducerState } from 'react';

type StringReducerPayload = string | string[] | undefined;
type StringReducerPayload = string | undefined;
export type StringReducer = Reducer<string, StringReducerPayload>;

/**
* The `stringReducer` is provided to handle plain string based streams with `streamFactory()`.
*
* @param state - The current state, being the string fetched so far.
* @param payload — The state update can be a plain string, an array of strings or `undefined`.
* * An array of strings will be joined without a delimiter and added to the current string.
* In combination with `useFetchStream`'s buffering this allows to do bulk updates
* within the reducer without triggering a React/DOM update on every stream chunk.
* * `undefined` can be used to reset the state to an empty string, for example, when a
* UI has the option to trigger a refetch of a stream.
*
* @param payload — The state update can be a plain string to be added or `undefined` to reset the state.
* @returns The updated state, a string that combines the previous string and the payload.
*/
export function stringReducer(
Expand All @@ -31,5 +25,5 @@ export function stringReducer(
return '';
}

return `${state}${Array.isArray(payload) ? payload.join('') : payload}`;
return `${state}${payload}`;
}
71 changes: 57 additions & 14 deletions x-pack/packages/ml/response_stream/client/use_fetch_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@

import {
useEffect,
useReducer,
useRef,
useState,
type Reducer,
type ReducerAction,
type ReducerState,
type ReducerAction,
} from 'react';
import useThrottle from 'react-use/lib/useThrottle';

import type { HttpSetup, HttpFetchOptions } from '@kbn/core/public';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';

import { fetchStream } from './fetch_stream';
import { stringReducer, type StringReducer } from './string_reducer';

const DATA_THROTTLE_MS = 100;

// This pattern with a dual ternary allows us to default to StringReducer
// and if a custom reducer is supplied fall back to that one instead.
// The complexity in here allows us to create a simpler API surface where
Expand Down Expand Up @@ -57,6 +57,7 @@ function isReducerOptions<T>(arg: unknown): arg is CustomReducer<T> {
* @param apiVersion Optional API version.
* @param body Optional API request body.
* @param customReducer Optional custom reducer and initial state.
* @param headers Optional headers.
* @returns An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<B extends object, R extends Reducer<any, any>>(
Expand All @@ -75,11 +76,41 @@ export function useFetchStream<B extends object, R extends Reducer<any, any>>(
? customReducer
: ({ reducer: stringReducer, initialState: '' } as FetchStreamCustomReducer<R>);

const [data, dispatch] = useReducer(
reducerWithFallback.reducer,
reducerWithFallback.initialState
);
const dataThrottled = useThrottle(data, 100);
// We used `useReducer` in previous iterations of this hook, but it caused
// a lot of unnecessary re-renders even in combination with `useThrottle`.
// We're now using `dataRef` to allow updates outside of the render cycle.
// When the stream is running, we'll update `data` with the `dataRef` value
// periodically.
const [data, setData] = useState(reducerWithFallback.initialState);
const dataRef = useRef(reducerWithFallback.initialState);

// This effect is used to throttle the data updates while the stream is running.
// It will update the `data` state with the current `dataRef` value every 100ms.
useEffect(() => {
// We cannot check against `isRunning` in the `setTimeout` callback, because
// we would check against a stale value. Instead, we use a mutable
// object to keep track of the current state of the effect.
const effectState = { isActive: true };

if (isRunning) {
setData(dataRef.current);

function updateData() {
setTimeout(() => {
setData(dataRef.current);
if (effectState.isActive) {
updateData();
}
}, DATA_THROTTLE_MS);
}

updateData();
}

return () => {
effectState.isActive = false;
};
}, [isRunning]);

const abortCtrl = useRef(new AbortController());

Expand All @@ -99,7 +130,7 @@ export function useFetchStream<B extends object, R extends Reducer<any, any>>(

abortCtrl.current = new AbortController();

for await (const [fetchStreamError, actions] of fetchStream<B, CustomReducer<R>>(
for await (const [fetchStreamError, action] of fetchStream<B, CustomReducer<R>>(
http,
endpoint,
apiVersion,
Expand All @@ -110,14 +141,26 @@ export function useFetchStream<B extends object, R extends Reducer<any, any>>(
)) {
if (fetchStreamError !== null) {
addError(fetchStreamError);
} else if (Array.isArray(actions) && actions.length > 0) {
dispatch(actions as ReducerAction<CustomReducer<R>>);
} else if (action) {
dataRef.current = reducerWithFallback.reducer(dataRef.current, action) as ReducerState<
CustomReducer<R>
>;
}
}

setIsRunning(false);
};

// This custom dispatch function allows us to update the `dataRef` value and will
// then trigger an update of `data` right away as we don't want to have the
// throttling in place for these types of updates.
const dispatch = (action: ReducerAction<FetchStreamCustomReducer<R>['reducer']>) => {
dataRef.current = reducerWithFallback.reducer(dataRef.current, action) as ReducerState<
CustomReducer<R>
>;
setData(dataRef.current);
};

const cancel = () => {
abortCtrl.current.abort();
setIsCancelled(true);
Expand All @@ -131,10 +174,10 @@ export function useFetchStream<B extends object, R extends Reducer<any, any>>(

return {
cancel,
// To avoid a race condition where the stream already ended but `useThrottle` would
// yet have to trigger another update within the throttling interval, we'll return
// To avoid a race condition where the stream already ended but the throttling would
// yet have to trigger another update within the interval, we'll return
// the unthrottled data once the stream is complete.
data: isRunning ? dataThrottled : data,
data: isRunning ? data : dataRef.current,
dispatch,
errors,
isCancelled,
Expand Down
1 change: 1 addition & 0 deletions x-pack/packages/ml/response_stream/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
export {
streamFactory,
type StreamFactoryReturnType,
type StreamResponseWithHeaders,
type UncompressedResponseStream,
} from './stream_factory';
50 changes: 20 additions & 30 deletions x-pack/packages/ml/response_stream/server/stream_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,25 @@ export class UncompressedResponseStream extends Stream.PassThrough {}

const DELIMITER = '\n';

type StreamType = 'string' | 'ndjson';
type StreamTypeUnion = string | object;
type StreamType<T extends StreamTypeUnion> = T extends string
? string
: T extends object
? T
: never;

export interface StreamResponseWithHeaders {
body: zlib.Gzip | UncompressedResponseStream;
headers?: ResponseHeaders;
}

export interface StreamFactoryReturnType<T = unknown> {
export interface StreamFactoryReturnType<T extends StreamTypeUnion> {
DELIMITER: string;
end: () => void;
push: (d: T, drain?: boolean) => void;
responseWithHeaders: {
body: zlib.Gzip | UncompressedResponseStream;
headers?: ResponseHeaders;
};
push: (d: StreamType<T>, drain?: boolean) => void;
responseWithHeaders: StreamResponseWithHeaders;
}

/**
* Overload to set up a string based response stream with support
* for gzip compression depending on provided request headers.
*
* @param headers - Request headers.
* @param logger - Kibana logger.
* @param compressOverride - Optional flag to override header based compression setting.
* @param flushFix - Adds an attribute with a random string payload to overcome buffer flushing with certain proxy configurations.
*
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = string>(
headers: Headers,
logger: Logger,
compressOverride?: boolean,
flushFix?: boolean
): StreamFactoryReturnType<T>;
/**
* Sets up a response stream with support for gzip compression depending on provided
* request headers. Any non-string data pushed to the stream will be streamed as NDJSON.
Expand All @@ -66,13 +56,13 @@ export function streamFactory<T = string>(
*
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = unknown>(
export function streamFactory<T extends StreamTypeUnion>(
headers: Headers,
logger: Logger,
compressOverride: boolean = true,
flushFix: boolean = false
): StreamFactoryReturnType<T> {
let streamType: StreamType;
let streamType: 'string' | 'ndjson';
const isCompressed = compressOverride && acceptCompression(headers);
const flushPayload = flushFix
? crypto.randomBytes(FLUSH_PAYLOAD_SIZE).toString('hex')
Expand All @@ -82,7 +72,7 @@ export function streamFactory<T = unknown>(
const stream = isCompressed ? zlib.createGzip() : new UncompressedResponseStream();

// If waiting for draining of the stream, items will be added to this buffer.
const backPressureBuffer: T[] = [];
const backPressureBuffer: Array<StreamType<T>> = [];

// Flag will be set when the "drain" listener is active so we can avoid setting multiple listeners.
let waitForDrain = false;
Expand Down Expand Up @@ -120,7 +110,7 @@ export function streamFactory<T = unknown>(
}
}

function push(d: T, drain = false) {
function push(d: StreamType<T>, drain = false) {
logDebugMessage(
`Push to stream. Current backPressure buffer size: ${backPressureBuffer.length}, drain flag: ${drain}`
);
Expand All @@ -144,7 +134,7 @@ export function streamFactory<T = unknown>(
function repeat() {
if (!tryToEnd) {
if (responseSizeSinceLastKeepAlive < FLUSH_PAYLOAD_SIZE) {
push({ flushPayload } as unknown as T);
push({ flushPayload, type: 'flushPayload' } as StreamType<T>);
}
responseSizeSinceLastKeepAlive = 0;
setTimeout(repeat, FLUSH_KEEP_ALIVE_INTERVAL_MS);
Expand Down Expand Up @@ -222,7 +212,7 @@ export function streamFactory<T = unknown>(
}
}

const responseWithHeaders: StreamFactoryReturnType['responseWithHeaders'] = {
const responseWithHeaders: StreamResponseWithHeaders = {
body: stream,
headers: {
...(isCompressed ? { 'content-encoding': 'gzip' } : {}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { KibanaRequest, ResponseHeaders } from '@kbn/core-http-server';
import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain';
import type { AnalyticsServiceSetup } from '@kbn/core-analytics-server';
import { ExecuteConnectorRequestBody, Message, Replacements } from '@kbn/elastic-assistant-common';
import { StreamFactoryReturnType } from '@kbn/ml-response-stream/server';
import { StreamResponseWithHeaders } from '@kbn/ml-response-stream/server';
import { AnonymizationFieldResponse } from '@kbn/elastic-assistant-common/impl/schemas/anonymization_fields/bulk_crud_anonymization_fields_route.gen';
import { ResponseBody } from '../types';
import type { AssistantTool } from '../../../types';
Expand Down Expand Up @@ -51,7 +51,7 @@ export interface StaticReturnType {
headers: ResponseHeaders;
}
export type AgentExecutorResponse<T extends boolean> = T extends true
? StreamFactoryReturnType['responseWithHeaders']
? StreamResponseWithHeaders
: StaticReturnType;

export type AgentExecutor<T extends boolean> = (
Expand Down
Loading

0 comments on commit 7026199

Please sign in to comment.