From 5345e34ddc0159a8a9f15f01107fe0fe203a8704 Mon Sep 17 00:00:00 2001 From: Walter Rafelsberger Date: Wed, 22 May 2024 16:51:36 +0200 Subject: [PATCH] [ML] Adds redux toolkit example for `response_stream` to developer examples. (#182690) ## Summary Follow up to #132590. Part of #181111. This updates the developer examples for `@kbn/ml-response-stream` to include a variant with a full Redux Toolkit setup. For this case, the `@kbn/ml-response-stream` now includes a generic slice `streamSlice` that can be used. This allows the actions created to be streamed via NDJSON to be shared across server and client. Functional tests for the examples were added too. To run these tests you can use the following commands: ``` # Start the test server (can continue running) node scripts/functional_tests_server.js --config test/examples/config.js # Start a test run node scripts/functional_test_runner.js --config test/examples/config.js ``` ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [x] This was checked for breaking API changes and was [labeled appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process) --- examples/response_stream/README.md | 21 ++- examples/response_stream/common/api/index.ts | 1 + .../common/api/reducer_stream/index.ts | 68 -------- .../common/api/reducer_stream/reducer.ts | 25 +-- .../api/reducer_stream/reducer_actions.ts | 75 +++++++++ .../common/api/redux_stream/data_slice.ts | 46 +++++ .../common/api/redux_stream/options_slice.ts | 37 ++++ .../common/api/stream_state.ts | 19 +++ .../public/components/page.tsx | 2 +- .../app/components/bar_chart_race.tsx | 55 ++++++ .../get_status_message.tsx | 0 .../app/pages/page_reducer_stream/index.tsx | 81 ++++----- .../app/pages/page_redux_stream/hooks.ts | 16 ++ .../app/pages/page_redux_stream/index.tsx | 158 ++++++++++++++++++ .../app/pages/page_redux_stream/store.tsx | 36 ++++ .../pages/page_simple_string_stream/index.tsx | 10 +- examples/response_stream/public/routes.tsx | 15 +- examples/response_stream/server/plugin.ts | 7 +- .../response_stream/server/routes/index.ts | 1 + .../server/routes/reducer_stream.ts | 59 +++---- .../server/routes/redux_stream.ts | 126 ++++++++++++++ .../response_stream/server/routes/shared.ts | 29 ++++ test/examples/config.js | 1 + test/examples/response_stream/index.ts | 12 +- .../response_stream/reducer_stream.ts | 101 ++++------- test/examples/response_stream/redux_stream.ts | 52 ++++++ .../examples/response_stream/string_stream.ts | 46 +++++ .../ml/response_stream/client/constants.ts | 8 + .../ml/response_stream/client/index.ts | 1 + .../ml/response_stream/client/stream_slice.ts | 148 ++++++++++++++++ .../client/use_fetch_stream.ts | 6 +- 31 files changed, 1000 insertions(+), 262 deletions(-) create mode 100644 examples/response_stream/common/api/reducer_stream/reducer_actions.ts create mode 100644 examples/response_stream/common/api/redux_stream/data_slice.ts create mode 100644 examples/response_stream/common/api/redux_stream/options_slice.ts create mode 100644 examples/response_stream/common/api/stream_state.ts create mode 100644 examples/response_stream/public/containers/app/components/bar_chart_race.tsx rename examples/response_stream/public/containers/app/{pages/page_reducer_stream => components}/get_status_message.tsx (100%) create mode 100644 examples/response_stream/public/containers/app/pages/page_redux_stream/hooks.ts create mode 100644 examples/response_stream/public/containers/app/pages/page_redux_stream/index.tsx create mode 100644 examples/response_stream/public/containers/app/pages/page_redux_stream/store.tsx create mode 100644 examples/response_stream/server/routes/redux_stream.ts create mode 100644 examples/response_stream/server/routes/shared.ts create mode 100644 test/examples/response_stream/redux_stream.ts create mode 100644 test/examples/response_stream/string_stream.ts create mode 100644 x-pack/packages/ml/response_stream/client/constants.ts create mode 100644 x-pack/packages/ml/response_stream/client/stream_slice.ts diff --git a/examples/response_stream/README.md b/examples/response_stream/README.md index 73c8a9161b2ce9..ad3124bee805ac 100644 --- a/examples/response_stream/README.md +++ b/examples/response_stream/README.md @@ -4,11 +4,15 @@ This plugin demonstrates how to stream chunks of data to the client with just a To run Kibana with the described examples, use `yarn start --run-examples`. -The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates two use cases to get started: Streaming a raw string as well as a more complex example that streams Redux-like actions to the client which update React state via `useReducer()`. +The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates some use cases to get you started: -Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data. +- Streaming just a raw string. +- Streaming NDJSON with "old-school" redux like actions and client side state managed with `useFetchStream()`. This uses React's own `useReducer()` under the hood. +- Streaming NDJSON with actions created via Redux Toolkit's `createSlice()` to a client with a full Redux Toolkit setup. -No additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state. +Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`) or slice (`streamSlice()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data. + +Besides Redux Toolkit for its particular use case, no additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state. On the server, the simpler stream to send a string is set up like this: @@ -21,12 +25,7 @@ The request's headers get passed on to automatically identify if compression is On the client, the custom hook is used like this: ```ts -const { - errors, - start, - cancel, - data, - isRunning -} = useFetchStream('/internal/response_stream/simple_string_stream'); +const { errors, start, cancel, data, isRunning } = useFetchStream( + '/internal/response_stream/simple_string_stream' +); ``` - diff --git a/examples/response_stream/common/api/index.ts b/examples/response_stream/common/api/index.ts index 925ff844e3cfa1..c5ff141e270eb6 100644 --- a/examples/response_stream/common/api/index.ts +++ b/examples/response_stream/common/api/index.ts @@ -8,5 +8,6 @@ export const RESPONSE_STREAM_API_ENDPOINT = { REDUCER_STREAM: '/internal/response_stream/reducer_stream', + REDUX_STREAM: '/internal/response_stream/redux_stream', SIMPLE_STRING_STREAM: '/internal/response_stream/simple_string_stream', } as const; diff --git a/examples/response_stream/common/api/reducer_stream/index.ts b/examples/response_stream/common/api/reducer_stream/index.ts index 606834be2d0baa..cc5255761fbd61 100644 --- a/examples/response_stream/common/api/reducer_stream/index.ts +++ b/examples/response_stream/common/api/reducer_stream/index.ts @@ -9,71 +9,3 @@ export { reducerStreamReducer } from './reducer'; export { reducerStreamRequestBodySchema } from './request_body_schema'; export type { ReducerStreamRequestBodySchema } from './request_body_schema'; - -export const API_ACTION_NAME = { - UPDATE_PROGRESS: 'update_progress', - ADD_TO_ENTITY: 'add_to_entity', - DELETE_ENTITY: 'delete_entity', - ERROR: 'error', -} as const; -export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME]; - -interface ApiActionUpdateProgress { - type: typeof API_ACTION_NAME.UPDATE_PROGRESS; - payload: number; -} - -export function updateProgressAction(payload: number): ApiActionUpdateProgress { - return { - type: API_ACTION_NAME.UPDATE_PROGRESS, - payload, - }; -} - -interface ApiActionAddToEntity { - type: typeof API_ACTION_NAME.ADD_TO_ENTITY; - payload: { - entity: string; - value: number; - }; -} - -export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity { - return { - type: API_ACTION_NAME.ADD_TO_ENTITY, - payload: { - entity, - value, - }, - }; -} - -interface ApiActionDeleteEntity { - type: typeof API_ACTION_NAME.DELETE_ENTITY; - payload: string; -} - -export function deleteEntityAction(payload: string): ApiActionDeleteEntity { - return { - type: API_ACTION_NAME.DELETE_ENTITY, - payload, - }; -} - -interface ApiActionError { - type: typeof API_ACTION_NAME.ERROR; - payload: string; -} - -export function errorAction(payload: string): ApiActionError { - return { - type: API_ACTION_NAME.ERROR, - payload, - }; -} - -export type ReducerStreamApiAction = - | ApiActionUpdateProgress - | ApiActionAddToEntity - | ApiActionDeleteEntity - | ApiActionError; diff --git a/examples/response_stream/common/api/reducer_stream/reducer.ts b/examples/response_stream/common/api/reducer_stream/reducer.ts index 9896e760cd75b5..a793ccf55489ad 100644 --- a/examples/response_stream/common/api/reducer_stream/reducer.ts +++ b/examples/response_stream/common/api/reducer_stream/reducer.ts @@ -6,24 +6,14 @@ * Side Public License, v 1. */ -import { ReducerStreamApiAction, API_ACTION_NAME } from '.'; +import { getInitialState, type StreamState } from '../stream_state'; +import { type ReducerStreamApiAction, API_ACTION_NAME } from './reducer_actions'; export const UI_ACTION_NAME = { RESET: 'reset', } as const; export type UiActionName = typeof UI_ACTION_NAME[keyof typeof UI_ACTION_NAME]; -export interface StreamState { - errors: string[]; - progress: number; - entities: Record; -} -export const initialState: StreamState = { - errors: [], - progress: 0, - entities: {}, -}; - interface UiActionResetStream { type: typeof UI_ACTION_NAME.RESET; } @@ -34,14 +24,7 @@ export function resetStream(): UiActionResetStream { type UiAction = UiActionResetStream; export type ReducerAction = ReducerStreamApiAction | UiAction; -export function reducerStreamReducer( - state: StreamState, - action: ReducerAction | ReducerAction[] -): StreamState { - if (Array.isArray(action)) { - return action.reduce(reducerStreamReducer, state); - } - +export function reducerStreamReducer(state: StreamState, action: ReducerAction): StreamState { switch (action.type) { case API_ACTION_NAME.UPDATE_PROGRESS: return { @@ -72,7 +55,7 @@ export function reducerStreamReducer( errors: [...state.errors, action.payload], }; case UI_ACTION_NAME.RESET: - return initialState; + return getInitialState(); default: return state; } diff --git a/examples/response_stream/common/api/reducer_stream/reducer_actions.ts b/examples/response_stream/common/api/reducer_stream/reducer_actions.ts new file mode 100644 index 00000000000000..7a6f077dfdfe6c --- /dev/null +++ b/examples/response_stream/common/api/reducer_stream/reducer_actions.ts @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export const API_ACTION_NAME = { + UPDATE_PROGRESS: 'update_progress', + ADD_TO_ENTITY: 'add_to_entity', + DELETE_ENTITY: 'delete_entity', + ERROR: 'error', +} as const; +export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME]; + +interface ApiActionUpdateProgress { + type: typeof API_ACTION_NAME.UPDATE_PROGRESS; + payload: number; +} + +export function updateProgressAction(payload: number): ApiActionUpdateProgress { + return { + type: API_ACTION_NAME.UPDATE_PROGRESS, + payload, + }; +} + +interface ApiActionAddToEntity { + type: typeof API_ACTION_NAME.ADD_TO_ENTITY; + payload: { + entity: string; + value: number; + }; +} + +export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity { + return { + type: API_ACTION_NAME.ADD_TO_ENTITY, + payload: { + entity, + value, + }, + }; +} + +interface ApiActionDeleteEntity { + type: typeof API_ACTION_NAME.DELETE_ENTITY; + payload: string; +} + +export function deleteEntityAction(payload: string): ApiActionDeleteEntity { + return { + type: API_ACTION_NAME.DELETE_ENTITY, + payload, + }; +} + +interface ApiActionError { + type: typeof API_ACTION_NAME.ERROR; + payload: string; +} + +export function errorAction(payload: string): ApiActionError { + return { + type: API_ACTION_NAME.ERROR, + payload, + }; +} + +export type ReducerStreamApiAction = + | ApiActionUpdateProgress + | ApiActionAddToEntity + | ApiActionDeleteEntity + | ApiActionError; diff --git a/examples/response_stream/common/api/redux_stream/data_slice.ts b/examples/response_stream/common/api/redux_stream/data_slice.ts new file mode 100644 index 00000000000000..7008c5a17ddcbd --- /dev/null +++ b/examples/response_stream/common/api/redux_stream/data_slice.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { createSlice } from '@reduxjs/toolkit'; +import type { PayloadAction } from '@reduxjs/toolkit'; + +import { getInitialState } from '../stream_state'; + +export const dataSlice = createSlice({ + name: 'development', + initialState: getInitialState(), + reducers: { + updateProgress: (state, action: PayloadAction) => { + state.progress = action.payload; + }, + addToEntity: ( + state, + action: PayloadAction<{ + entity: string; + value: number; + }> + ) => { + const { entity, value } = action.payload; + state.entities[entity] = (state.entities[entity] || 0) + value; + }, + deleteEntity: (state, action: PayloadAction) => { + delete state.entities[action.payload]; + }, + error: (state, action: PayloadAction) => { + state.errors.push(action.payload); + }, + reset: () => { + return getInitialState(); + }, + }, +}); + +export const { updateProgress, addToEntity, deleteEntity, error, reset } = dataSlice.actions; +export type ReduxStreamApiAction = ReturnType< + typeof dataSlice.actions[keyof typeof dataSlice.actions] +>; diff --git a/examples/response_stream/common/api/redux_stream/options_slice.ts b/examples/response_stream/common/api/redux_stream/options_slice.ts new file mode 100644 index 00000000000000..6791c74d81b33d --- /dev/null +++ b/examples/response_stream/common/api/redux_stream/options_slice.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { createSlice } from '@reduxjs/toolkit'; +import type { PayloadAction } from '@reduxjs/toolkit'; + +const getInitialState = () => ({ + simulateErrors: false, + compressResponse: true, + flushFix: false, +}); + +export const optionsSlice = createSlice({ + name: 'options', + initialState: getInitialState(), + reducers: { + setSimulateErrors: (state, action: PayloadAction) => { + state.simulateErrors = action.payload; + }, + setCompressResponse: (state, action: PayloadAction) => { + state.compressResponse = action.payload; + }, + setFlushFix: (state, action: PayloadAction) => { + state.flushFix = action.payload; + }, + }, +}); + +export const { setSimulateErrors, setCompressResponse, setFlushFix } = optionsSlice.actions; +export type ReduxOptionsApiAction = ReturnType< + typeof optionsSlice.actions[keyof typeof optionsSlice.actions] +>; diff --git a/examples/response_stream/common/api/stream_state.ts b/examples/response_stream/common/api/stream_state.ts new file mode 100644 index 00000000000000..78a541515ce8e9 --- /dev/null +++ b/examples/response_stream/common/api/stream_state.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export interface StreamState { + errors: string[]; + progress: number; + entities: Record; +} + +export const getInitialState = (): StreamState => ({ + errors: [], + progress: 0, + entities: {}, +}); diff --git a/examples/response_stream/public/components/page.tsx b/examples/response_stream/public/components/page.tsx index e3138fafa71ab0..2626eb317cf17f 100644 --- a/examples/response_stream/public/components/page.tsx +++ b/examples/response_stream/public/components/page.tsx @@ -18,7 +18,7 @@ export const Page: FC> = ({ title = 'Untitled', chi <> -

{title}

+

{title}

{children} diff --git a/examples/response_stream/public/containers/app/components/bar_chart_race.tsx b/examples/response_stream/public/containers/app/components/bar_chart_race.tsx new file mode 100644 index 00000000000000..ef399f4cc050da --- /dev/null +++ b/examples/response_stream/public/containers/app/components/bar_chart_race.tsx @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import React, { type FC } from 'react'; + +import { + Chart, + Settings, + Axis, + BarSeries, + Position, + ScaleType, + LEGACY_LIGHT_THEME, +} from '@elastic/charts'; + +interface BarChartRaceProps { + entities: Record; +} + +export const BarChartRace: FC = ({ entities }) => { + return ( +
+ + + + + + { + return { + x, + y, + }; + }) + .sort((a, b) => b.y - a.y)} + /> + +
+ ); +}; diff --git a/examples/response_stream/public/containers/app/pages/page_reducer_stream/get_status_message.tsx b/examples/response_stream/public/containers/app/components/get_status_message.tsx similarity index 100% rename from examples/response_stream/public/containers/app/pages/page_reducer_stream/get_status_message.tsx rename to examples/response_stream/public/containers/app/components/get_status_message.tsx diff --git a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx index 7da5f68e792f81..a471d6f5890347 100644 --- a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx +++ b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx @@ -8,16 +8,6 @@ import React, { useEffect, useState, FC } from 'react'; -import { - Chart, - Settings, - Axis, - BarSeries, - Position, - ScaleType, - LEGACY_LIGHT_THEME, -} from '@elastic/charts'; - import { EuiBadge, EuiButton, @@ -31,8 +21,8 @@ import { import { useFetchStream } from '@kbn/ml-response-stream/client'; +import { getInitialState } from '../../../../../common/api/stream_state'; import { - initialState, resetStream, reducerStreamReducer, } from '../../../../../common/api/reducer_stream/reducer'; @@ -42,7 +32,10 @@ import { Page } from '../../../../components/page'; import { useDeps } from '../../../../hooks/use_deps'; -import { getStatusMessage } from './get_status_message'; +import { BarChartRace } from '../../components/bar_chart_race'; +import { getStatusMessage } from '../../components/get_status_message'; + +const initialState = getInitialState(); export const PageReducerStream: FC = () => { const { @@ -72,8 +65,12 @@ export const PageReducerStream: FC = () => { } }; - // TODO This approach needs to be adapted as it might miss when error messages arrive bulk. - // This is for low level errors on the stream/HTTP level. + // TODO This needs to be adapted as it might miss when error messages arrive + // in bulk, but it should be good enough for this demo. This is for low level + // errors on the HTTP level.Note this will only surface errors that happen for + // the original request. Once the stream returns data, it will not be able to + // return errors. This is why we need separate error handling for application + // level errors. useEffect(() => { if (errors.length > 0) { notifications.toasts.addDanger(errors[errors.length - 1]); @@ -91,27 +88,33 @@ export const PageReducerStream: FC = () => { const buttonLabel = isRunning ? 'Stop development' : 'Start development'; return ( - +

- This demonstrates a single endpoint with streaming support that sends Redux inspired - actions from server to client. The server and client share types of the data to be - received. The client uses a custom hook that receives stream chunks and passes them on to - `useReducer()` that acts on the Redux type actions it receives. The custom hook includes - code to buffer actions and is able to apply them in bulk so the DOM does not get hammered - with updates. Hit "Start development" to trigger the bar chart race! + This demonstrates a single endpoint with streaming support that sends old school Redux + inspired actions from server to client. The server and client share types of the data to + be received. The client uses a custom hook that receives stream chunks and passes them on + to `useReducer()` that acts on the actions it receives. The custom hook includes code to + buffer actions and is able to apply them in bulk so the DOM does not get hammered with + updates. Hit "Start development" to trigger the bar chart race!


- + {buttonLabel} - {progress}% + {progress}% @@ -119,35 +122,11 @@ export const PageReducerStream: FC = () => { -
- - - - - - { - return { - x, - y, - }; - }) - .sort((a, b) => b.y - a.y)} - /> - -
+ -

{getStatusMessage(isRunning, isCancelled, data.progress)}

+

+ {getStatusMessage(isRunning, isCancelled, progress)} +

AppDispatch = useDispatch; +export const useAppSelector: TypedUseSelectorHook = useSelector; +export const useAppStore: () => AppStore = useStore; diff --git a/examples/response_stream/public/containers/app/pages/page_redux_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_redux_stream/index.tsx new file mode 100644 index 00000000000000..aaf0a7b5dbbb7a --- /dev/null +++ b/examples/response_stream/public/containers/app/pages/page_redux_stream/index.tsx @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import React, { useEffect, useRef, FC } from 'react'; + +import { + EuiBadge, + EuiButton, + EuiCheckbox, + EuiFlexGroup, + EuiFlexItem, + EuiProgress, + EuiSpacer, + EuiText, +} from '@elastic/eui'; + +import { cancelStream, startStream } from '@kbn/ml-response-stream/client'; + +import { RESPONSE_STREAM_API_ENDPOINT } from '../../../../../common/api'; +import { + setSimulateErrors, + setCompressResponse, + setFlushFix, +} from '../../../../../common/api/redux_stream/options_slice'; +import { reset } from '../../../../../common/api/redux_stream/data_slice'; + +import { Page } from '../../../../components/page'; +import { useDeps } from '../../../../hooks/use_deps'; + +import { BarChartRace } from '../../components/bar_chart_race'; +import { getStatusMessage } from '../../components/get_status_message'; + +import { useAppDispatch, useAppSelector } from './hooks'; + +export const PageReduxStream: FC = () => { + const { + core: { http, notifications }, + } = useDeps(); + + const dispatch = useAppDispatch(); + const { isRunning, isCancelled, errors: streamErrors } = useAppSelector((s) => s.stream); + const { progress, entities, errors } = useAppSelector((s) => s.data); + const { simulateErrors, compressResponse, flushFix } = useAppSelector((s) => s.options); + + const abortCtrl = useRef(new AbortController()); + + const onClickHandler = async () => { + if (isRunning) { + abortCtrl.current.abort(); + dispatch(cancelStream()); + } else { + abortCtrl.current = new AbortController(); + dispatch(reset()); + dispatch( + startStream({ + http, + endpoint: RESPONSE_STREAM_API_ENDPOINT.REDUX_STREAM, + apiVersion: '1', + abortCtrl, + body: { compressResponse, flushFix, simulateErrors }, + }) + ); + } + }; + + // TODO This needs to be adapted as it might miss when error messages arrive + // in bulk, but it should be good enough for this demo. This is for low level + // errors on the HTTP level.Note this will only surface errors that happen for + // the original request. Once the stream returns data, it will not be able to + // return errors. This is why we need separate error handling for application + // level errors. + useEffect(() => { + if (streamErrors.length > 0) { + notifications.toasts.addDanger(streamErrors[streamErrors.length - 1]); + } + }, [streamErrors, notifications.toasts]); + + // TODO This approach needs to be adapted as it might miss when error messages arrive bulk. + // This is for errors on the application level + useEffect(() => { + if (errors.length > 0) { + notifications.toasts.addDanger(errors[errors.length - 1]); + } + }, [errors, notifications.toasts]); + + const buttonLabel = isRunning ? 'Stop development' : 'Start development'; + + return ( + + +

+ This demonstrates integration of a single endpoint with streaming support with Redux + Toolkit. The server and client share actions created via `createSlice`. The server sends a + stream of NDJSON data to the client where each line is a redux action. The client then + applies these actions to its state. The package `@kbn/ml-response-stream` exposes a slice + of the state that can be used to start and cancel the stream. The `startStream` action is + implemented as an async thunk that starts the stream and then dispatches received actions + to the store. Hit "Start development" to trigger the bar chart race! +

+
+
+ + + + {buttonLabel} + + + + + {progress}% + + + + + + + + + +

+ {getStatusMessage(isRunning, isCancelled, progress)} +

+ dispatch(setSimulateErrors(!simulateErrors))} + compressed + /> + dispatch(setCompressResponse(!compressResponse))} + compressed + /> + dispatch(setFlushFix(!flushFix))} + compressed + /> +
+
+ ); +}; diff --git a/examples/response_stream/public/containers/app/pages/page_redux_stream/store.tsx b/examples/response_stream/public/containers/app/pages/page_redux_stream/store.tsx new file mode 100644 index 00000000000000..ddeb2654932984 --- /dev/null +++ b/examples/response_stream/public/containers/app/pages/page_redux_stream/store.tsx @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import React, { type FC, type PropsWithChildren } from 'react'; +import { configureStore } from '@reduxjs/toolkit'; +import { Provider } from 'react-redux'; + +import { streamSlice } from '@kbn/ml-response-stream/client'; + +import { optionsSlice } from '../../../../../common/api/redux_stream/options_slice'; +import { dataSlice } from '../../../../../common/api/redux_stream/data_slice'; + +const reduxStore = configureStore({ + reducer: { + // State of the stream: is it running, has it errored, etc. + stream: streamSlice.reducer, + // The actual data returned by the stream. + data: dataSlice.reducer, + // Options for the stream: simulate errors, compress response, etc. + options: optionsSlice.reducer, + }, +}); + +export const ReduxStreamProvider: FC> = ({ children }) => ( + {children} +); + +// Infer the `RootState` and `AppDispatch` types from the store itself +export type AppStore = typeof reduxStore; +export type RootState = ReturnType; +export type AppDispatch = AppStore['dispatch']; diff --git a/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx index 7075656dc01671..3e33ade3814891 100644 --- a/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx +++ b/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx @@ -66,7 +66,13 @@ export const PageSimpleStringStream: FC = () => {
- + {buttonLabel} @@ -81,7 +87,7 @@ export const PageSimpleStringStream: FC = () => { /> -

{data}

+

{data}

{errors.length > 0 && ( diff --git a/examples/response_stream/public/routes.tsx b/examples/response_stream/public/routes.tsx index eafe96c320d8cc..f2ec595f1f7538 100644 --- a/examples/response_stream/public/routes.tsx +++ b/examples/response_stream/public/routes.tsx @@ -10,6 +10,8 @@ import React from 'react'; import { PLUGIN_ID, PLUGIN_NAME } from '../common/constants'; import { PageSimpleStringStream } from './containers/app/pages/page_simple_string_stream'; import { PageReducerStream } from './containers/app/pages/page_reducer_stream'; +import { PageReduxStream } from './containers/app/pages/page_redux_stream'; +import { ReduxStreamProvider } from './containers/app/pages/page_redux_stream/store'; interface RouteSectionDef { title: string; @@ -34,10 +36,19 @@ export const routes: RouteSectionDef[] = [ component: , }, { - title: 'Reducer stream', - id: 'reducer-stream', + title: 'NDJSON useReducer stream', + id: 'ndjson-usereducer-stream', component: , }, + { + title: 'NDJSON Redux Toolkit stream', + id: 'ndjson-redux-toolkit-stream', + component: ( + + + + ), + }, ], }, ]; diff --git a/examples/response_stream/server/plugin.ts b/examples/response_stream/server/plugin.ts index 6bb0c550030593..e21c3c3f59287e 100644 --- a/examples/response_stream/server/plugin.ts +++ b/examples/response_stream/server/plugin.ts @@ -9,7 +9,11 @@ import { Plugin, PluginInitializerContext, CoreSetup, CoreStart, Logger } from '@kbn/core/server'; import type { DataRequestHandlerContext } from '@kbn/data-plugin/server'; -import { defineReducerStreamRoute, defineSimpleStringStreamRoute } from './routes'; +import { + defineReducerStreamRoute, + defineReduxStreamRoute, + defineSimpleStringStreamRoute, +} from './routes'; // eslint-disable-next-line @typescript-eslint/no-empty-interface export interface ResponseStreamSetupPlugins {} @@ -29,6 +33,7 @@ export class ResponseStreamPlugin implements Plugin { void core.getStartServices().then(([_, depsStart]) => { defineReducerStreamRoute(router, this.logger); + defineReduxStreamRoute(router, this.logger); defineSimpleStringStreamRoute(router, this.logger); }); } diff --git a/examples/response_stream/server/routes/index.ts b/examples/response_stream/server/routes/index.ts index f18a1283aa0ec0..34d707ed816557 100644 --- a/examples/response_stream/server/routes/index.ts +++ b/examples/response_stream/server/routes/index.ts @@ -7,4 +7,5 @@ */ export { defineReducerStreamRoute } from './reducer_stream'; +export { defineReduxStreamRoute } from './redux_stream'; export { defineSimpleStringStreamRoute } from './single_string_stream'; diff --git a/examples/response_stream/server/routes/reducer_stream.ts b/examples/response_stream/server/routes/reducer_stream.ts index 02606b8c44756d..abdc90f28a23c1 100644 --- a/examples/response_stream/server/routes/reducer_stream.ts +++ b/examples/response_stream/server/routes/reducer_stream.ts @@ -11,14 +11,16 @@ import { streamFactory } from '@kbn/ml-response-stream/server'; import { errorAction, - reducerStreamRequestBodySchema, updateProgressAction, addToEntityAction, deleteEntityAction, ReducerStreamApiAction, -} from '../../common/api/reducer_stream'; +} from '../../common/api/reducer_stream/reducer_actions'; +import { reducerStreamRequestBodySchema } from '../../common/api/reducer_stream'; import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api'; +import { entities, getActions } from './shared'; + export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => { router.versioned .post({ @@ -64,23 +66,7 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => { request.body.flushFix ); - const entities = [ - 'kimchy', - 's1monw', - 'martijnvg', - 'jasontedor', - 'nik9000', - 'javanna', - 'rjernst', - 'jrodewig', - ]; - - const actions = [...Array(19).fill('add'), 'delete']; - - if (simulateError) { - actions.push('throw-error'); - actions.push('emit-error'); - } + const actions = getActions(simulateError); let progress = 0; @@ -101,19 +87,28 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => { const randomEntity = entities[Math.floor(Math.random() * entities.length)]; const randomAction = actions[Math.floor(Math.random() * actions.length)]; - if (randomAction === 'add') { - const randomCommits = Math.floor(Math.random() * 100); - push(addToEntityAction(randomEntity, randomCommits)); - } else if (randomAction === 'delete') { - push(deleteEntityAction(randomEntity)); - } else if (randomAction === 'throw-error') { - // Throw an error. It should not crash Kibana! - // It should be caught and logged to the Kibana server console. - throw new Error('There was a (simulated) server side error!'); - } else if (randomAction === 'emit-error') { - // Emit an error as a stream action. - push(errorAction('(Simulated) error pushed to the stream')); - return; + switch (randomAction) { + case 'add': + const randomCommits = Math.floor(Math.random() * 100); + push(addToEntityAction(randomEntity, randomCommits)); + break; + + case 'delete': + push(deleteEntityAction(randomEntity)); + break; + + case 'throw-error': + // Throw an error. It should not crash Kibana! + // It should be caught and logged to the Kibana server console. + // The stream will just stop but the client will note receive an error! + // In practice this pattern should be avoided as it will just end + // the stream without an explanation. + throw new Error('There was a (simulated) server side error!'); + + case 'emit-error': + // Emit an error as a stream action. + push(errorAction('(Simulated) error pushed to the stream')); + return; } void pushStreamUpdate(); diff --git a/examples/response_stream/server/routes/redux_stream.ts b/examples/response_stream/server/routes/redux_stream.ts new file mode 100644 index 00000000000000..bd694c531907b0 --- /dev/null +++ b/examples/response_stream/server/routes/redux_stream.ts @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import type { IRouter, Logger } from '@kbn/core/server'; +import { streamFactory } from '@kbn/ml-response-stream/server'; + +import { + updateProgress, + addToEntity, + deleteEntity, + error, + type ReduxStreamApiAction, +} from '../../common/api/redux_stream/data_slice'; +import { reducerStreamRequestBodySchema } from '../../common/api/reducer_stream'; +import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api'; + +import { entities, getActions } from './shared'; + +export const defineReduxStreamRoute = (router: IRouter, logger: Logger) => { + router.versioned + .post({ + path: RESPONSE_STREAM_API_ENDPOINT.REDUX_STREAM, + access: 'internal', + }) + .addVersion( + { + version: '1', + validate: { + request: { + body: reducerStreamRequestBodySchema, + }, + }, + }, + async (context, request, response) => { + const maxTimeoutMs = request.body.timeout ?? 250; + const simulateError = request.body.simulateErrors ?? false; + + let logMessageCounter = 1; + + function logDebugMessage(msg: string) { + logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`); + logMessageCounter++; + } + + logDebugMessage('Starting stream.'); + + let shouldStop = false; + request.events.aborted$.subscribe(() => { + logDebugMessage('aborted$ subscription trigger.'); + shouldStop = true; + }); + request.events.completed$.subscribe(() => { + logDebugMessage('completed$ subscription trigger.'); + shouldStop = true; + }); + + const { end, push, responseWithHeaders } = streamFactory( + request.headers, + logger, + request.body.compressResponse, + request.body.flushFix + ); + + const actions = getActions(simulateError); + + let progress = 0; + + async function pushStreamUpdate() { + await new Promise((resolve) => + setTimeout(resolve, Math.floor(Math.random() * maxTimeoutMs)) + ); + try { + progress++; + + if (progress > 100 || shouldStop) { + end(); + return; + } + + push(updateProgress(progress)); + + const randomEntity = entities[Math.floor(Math.random() * entities.length)]; + const randomAction = actions[Math.floor(Math.random() * actions.length)]; + + switch (randomAction) { + case 'add': + const randomCommits = Math.floor(Math.random() * 100); + push(addToEntity({ entity: randomEntity, value: randomCommits })); + break; + + case 'delete': + push(deleteEntity(randomEntity)); + break; + + case 'throw-error': + // Throw an error. It should not crash Kibana! + // It should be caught and logged to the Kibana server console. + // The stream will just stop but the client will note receive an error! + // In practice this pattern should be avoided as it will just end + // the stream without an explanation. + throw new Error('There was a (simulated) server side error!'); + + case 'emit-error': + // Emit an error as a stream action. + push(error('(Simulated) error pushed to the stream')); + return; + } + + void pushStreamUpdate(); + } catch (e) { + logger.error(e); + } + } + + // do not call this using `await` so it will run asynchronously while we return the stream already. + void pushStreamUpdate(); + + return response.ok(responseWithHeaders); + } + ); +}; diff --git a/examples/response_stream/server/routes/shared.ts b/examples/response_stream/server/routes/shared.ts new file mode 100644 index 00000000000000..72223d21e09ffc --- /dev/null +++ b/examples/response_stream/server/routes/shared.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export const entities = [ + 'kimchy', + 's1monw', + 'martijnvg', + 'jasontedor', + 'nik9000', + 'javanna', + 'rjernst', + 'jrodewig', +]; + +export const getActions = (simulateError: boolean) => { + const actions = [...Array(19).fill('add'), 'delete']; + + if (simulateError) { + actions.push('throw-error'); + actions.push('emit-error'); + } + + return actions; +}; diff --git a/test/examples/config.js b/test/examples/config.js index 77d73642afd1cb..dd8b49753dba5c 100644 --- a/test/examples/config.js +++ b/test/examples/config.js @@ -31,6 +31,7 @@ export default async function ({ readConfigFile }) { require.resolve('./unified_field_list_examples'), require.resolve('./discover_customization_examples'), require.resolve('./error_boundary'), + require.resolve('./response_stream'), ], services: { ...functionalConfig.get('services'), diff --git a/test/examples/response_stream/index.ts b/test/examples/response_stream/index.ts index b918de669819ba..2f13bf2f9f70cb 100644 --- a/test/examples/response_stream/index.ts +++ b/test/examples/response_stream/index.ts @@ -10,7 +10,17 @@ import { FtrProviderContext } from '../../functional/ftr_provider_context'; // eslint-disable-next-line import/no-default-export export default function ({ getService, getPageObjects, loadTestFile }: FtrProviderContext) { - describe('response stream', function () { + const browser = getService('browser'); + const PageObjects = getPageObjects(['common', 'header']); + + describe('response-stream', function () { + before(async () => { + await browser.setWindowSize(1300, 900); + await PageObjects.common.navigateToApp('response-stream', { insertTimestamp: false }); + }); + + loadTestFile(require.resolve('./string_stream')); loadTestFile(require.resolve('./reducer_stream')); + loadTestFile(require.resolve('./redux_stream')); }); } diff --git a/test/examples/response_stream/reducer_stream.ts b/test/examples/response_stream/reducer_stream.ts index 001fea1a144c81..4e9c0f9a7af090 100644 --- a/test/examples/response_stream/reducer_stream.ts +++ b/test/examples/response_stream/reducer_stream.ts @@ -6,84 +6,47 @@ * Side Public License, v 1. */ -import fetch from 'node-fetch'; -import { format as formatUrl } from 'url'; - import expect from '@kbn/expect'; - import { FtrProviderContext } from '../../functional/ftr_provider_context'; -import { parseStream } from './parse_stream'; - // eslint-disable-next-line import/no-default-export -export default ({ getService }: FtrProviderContext) => { - const supertest = getService('supertest'); - const config = getService('config'); - const kibanaServerUrl = formatUrl(config.get('servers.kibana')); - - describe('POST /internal/response_stream/reducer_stream', () => { - it('should return full data without streaming', async () => { - const resp = await supertest - .post('/internal/response_stream/reducer_stream') - .set('kbn-xsrf', 'kibana') - .send({ - timeout: 1, - }) - .expect(200); - - expect(Buffer.isBuffer(resp.body)).to.be(true); - - const chunks: string[] = resp.body.toString().split('\n'); - - expect(chunks.length).to.be(201); - - const lastChunk = chunks.pop(); - expect(lastChunk).to.be(''); - - let data: any[] = []; +export default function ({ getService, getPageObjects }: FtrProviderContext) { + const testSubjects = getService('testSubjects'); + const retry = getService('retry'); + + describe('useReducer stream example', () => { + it('navigates to the example', async () => { + await testSubjects.click('ndjson-usereducer-stream'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be( + 'NDJSON useReducer stream' + ); + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development did not start yet.' + ); + }); + }); - expect(() => { - data = chunks.map((c) => JSON.parse(c)); - }).not.to.throwError(); + it('starts the stream', async () => { + await testSubjects.click('responseStreamStartButton'); - data.forEach((d) => { - expect(typeof d.type).to.be('string'); + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).not.to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development is ongoing, the hype is real!' + ); }); - - const progressData = data.filter((d) => d.type === 'update_progress'); - expect(progressData.length).to.be(100); - expect(progressData[0].payload).to.be(1); - expect(progressData[progressData.length - 1].payload).to.be(100); }); - it('should return data in chunks with streaming', async () => { - const response = await fetch(`${kibanaServerUrl}/internal/response_stream/reducer_stream`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'kbn-xsrf': 'stream', - }, - body: JSON.stringify({ timeout: 1 }), + it('finishes the stream', async () => { + await retry.tryForTime(60000, async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('100%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development completed, the release got out the door!' + ); }); - - const stream = response.body; - - expect(stream).not.to.be(null); - - if (stream !== null) { - const progressData: any[] = []; - - for await (const action of parseStream(stream)) { - expect(action.type).not.to.be('error'); - if (action.type === 'update_progress') { - progressData.push(action); - } - } - - expect(progressData.length).to.be(100); - expect(progressData[0].payload).to.be(1); - expect(progressData[progressData.length - 1].payload).to.be(100); - } }); }); -}; +} diff --git a/test/examples/response_stream/redux_stream.ts b/test/examples/response_stream/redux_stream.ts new file mode 100644 index 00000000000000..6a9e73d1d35a5d --- /dev/null +++ b/test/examples/response_stream/redux_stream.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import expect from '@kbn/expect'; +import { FtrProviderContext } from '../../functional/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default function ({ getService, getPageObjects }: FtrProviderContext) { + const testSubjects = getService('testSubjects'); + const retry = getService('retry'); + + describe('redux stream example', () => { + it('navigates to the example', async () => { + await testSubjects.click('ndjson-redux-toolkit-stream'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be( + 'NDJSON Redux Toolkit stream' + ); + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development did not start yet.' + ); + }); + }); + + it('starts the stream', async () => { + await testSubjects.click('responseStreamStartButton'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).not.to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development is ongoing, the hype is real!' + ); + }); + }); + + it('finishes the stream', async () => { + await retry.tryForTime(60000, async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('100%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development completed, the release got out the door!' + ); + }); + }); + }); +} diff --git a/test/examples/response_stream/string_stream.ts b/test/examples/response_stream/string_stream.ts new file mode 100644 index 00000000000000..d522b8f5f24848 --- /dev/null +++ b/test/examples/response_stream/string_stream.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import expect from '@kbn/expect'; +import { FtrProviderContext } from '../../functional/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default function ({ getService, getPageObjects }: FtrProviderContext) { + const testSubjects = getService('testSubjects'); + const retry = getService('retry'); + + describe('string stream example', () => { + it('navigates to the example', async () => { + await testSubjects.click('simple-string-stream'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be( + 'Simple string stream' + ); + expect(await testSubjects.exists('responseStreamStartButton')).to.be(true); + expect(await testSubjects.getVisibleText('responseStreamString')).to.be(''); + }); + }); + + it('starts the stream', async () => { + await testSubjects.click('responseStreamStartButton'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamString')).not.to.be(''); + }); + }); + + it('finishes the stream', async () => { + await retry.tryForTime(60000, async () => { + expect(await testSubjects.getVisibleText('responseStreamString')).to.be( + 'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.' + ); + }); + }); + }); +} diff --git a/x-pack/packages/ml/response_stream/client/constants.ts b/x-pack/packages/ml/response_stream/client/constants.ts new file mode 100644 index 00000000000000..52d2aedaf83521 --- /dev/null +++ b/x-pack/packages/ml/response_stream/client/constants.ts @@ -0,0 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const DATA_THROTTLE_MS = 100; diff --git a/x-pack/packages/ml/response_stream/client/index.ts b/x-pack/packages/ml/response_stream/client/index.ts index a8b02cecd9cf6b..a0bc59c34deec3 100644 --- a/x-pack/packages/ml/response_stream/client/index.ts +++ b/x-pack/packages/ml/response_stream/client/index.ts @@ -6,4 +6,5 @@ */ export { fetchStream } from './fetch_stream'; +export { cancelStream, startStream, streamSlice } from './stream_slice'; export { useFetchStream } from './use_fetch_stream'; diff --git a/x-pack/packages/ml/response_stream/client/stream_slice.ts b/x-pack/packages/ml/response_stream/client/stream_slice.ts new file mode 100644 index 00000000000000..d8b7888dd4c6f0 --- /dev/null +++ b/x-pack/packages/ml/response_stream/client/stream_slice.ts @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { AnyAction, PayloadAction } from '@reduxjs/toolkit'; +import { createAsyncThunk, createSlice } from '@reduxjs/toolkit'; +import { batch } from 'react-redux'; + +import type { HttpSetup, HttpFetchOptions } from '@kbn/core/public'; +import { fetchStream } from './fetch_stream'; +import { DATA_THROTTLE_MS } from './constants'; + +/** + * Async thunk to start the stream. + */ +export const startStream = createAsyncThunk( + 'stream/start', + async ( + options: { + http: HttpSetup; + endpoint: string; + apiVersion?: string; + abortCtrl: React.MutableRefObject; + body?: any; + headers?: HttpFetchOptions['headers']; + }, + thunkApi + ) => { + const { http, endpoint, apiVersion, abortCtrl, body, headers } = options; + + const fetchState = { isActive: true }; + + // Custom buffering to avoid hammering the DOM with updates. + // We can revisit this once Kibana is on React 18. + const actionBuffer: AnyAction[] = []; + function flushBuffer(withTimeout = true) { + batch(() => { + for (const action of actionBuffer) { + thunkApi.dispatch(action); + } + }); + actionBuffer.length = 0; + + if (withTimeout) { + setTimeout(() => { + if (fetchState.isActive) { + flushBuffer(); + } + }, DATA_THROTTLE_MS); + } + } + + flushBuffer(); + + for await (const [fetchStreamError, action] of fetchStream( + http, + endpoint, + apiVersion, + abortCtrl, + body, + true, + headers + )) { + if (fetchStreamError !== null) { + actionBuffer.push(addError(fetchStreamError)); + } else if (action) { + actionBuffer.push(action); + } + } + + fetchState.isActive = false; + flushBuffer(false); + }, + { + condition: (_, { getState }) => { + // This is a bit of a hack to prevent instant restarts while the stream is running. + // The problem is that in RTK v1, async thunks cannot be made part of the slice, + // so they will not know the namespace used where they run in. We just assume + // `stream` here as the namespace, if it's a custom one, this will not work. + // RTK v2 will allow async thunks to be part of the slice, a draft PR to upgrade + // is up there: https://github.com/elastic/kibana/pull/178986 + try { + const s = getState() as { stream?: StreamState }; + + if (s.stream === undefined) { + return true; + } + + // If the stream was running, the extra reducers will also have set + // and error, so we need to prevent the stream from starting again. + if (s.stream.isRunning && s.stream.errors.length > 0) { + return false; + } + } catch (e) { + return true; + } + }, + } +); + +export interface StreamState { + errors: string[]; + isCancelled: boolean; + isRunning: boolean; +} + +function getDefaultState(): StreamState { + return { + errors: [], + isCancelled: false, + isRunning: false, + }; +} + +export const streamSlice = createSlice({ + name: 'stream', + initialState: getDefaultState(), + reducers: { + addError: (state: StreamState, action: PayloadAction) => { + state.errors.push(action.payload); + }, + cancelStream: (state: StreamState) => { + state.isCancelled = true; + state.isRunning = false; + }, + }, + extraReducers: (builder) => { + builder.addCase(startStream.pending, (state) => { + if (state.isRunning) { + state.errors.push('Instant restart while running not supported yet.'); + return; + } + + state.errors = []; + state.isCancelled = false; + state.isRunning = true; + }); + builder.addCase(startStream.fulfilled, (state) => { + state.isRunning = false; + }); + }, +}); + +// Action creators are generated for each case reducer function +export const { addError, cancelStream } = streamSlice.actions; diff --git a/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts b/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts index 55950c6ee8f774..1e0deeb7f8e26e 100644 --- a/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts +++ b/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts @@ -19,8 +19,7 @@ import { isPopulatedObject } from '@kbn/ml-is-populated-object'; import { fetchStream } from './fetch_stream'; import { stringReducer, type StringReducer } from './string_reducer'; - -const DATA_THROTTLE_MS = 100; +import { DATA_THROTTLE_MS } from './constants'; // This pattern with a dual ternary allows us to default to StringReducer // and if a custom reducer is supplied fall back to that one instead. @@ -80,7 +79,8 @@ export function useFetchStream>( // a lot of unnecessary re-renders even in combination with `useThrottle`. // We're now using `dataRef` to allow updates outside of the render cycle. // When the stream is running, we'll update `data` with the `dataRef` value - // periodically. + // periodically. This will get simpler with React 18 where we + // can make use of `useDeferredValue`. const [data, setData] = useState(reducerWithFallback.initialState); const dataRef = useRef(reducerWithFallback.initialState);