diff --git a/examples/response_stream/README.md b/examples/response_stream/README.md index 73c8a9161b2ce9..ad3124bee805ac 100644 --- a/examples/response_stream/README.md +++ b/examples/response_stream/README.md @@ -4,11 +4,15 @@ This plugin demonstrates how to stream chunks of data to the client with just a To run Kibana with the described examples, use `yarn start --run-examples`. -The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates two use cases to get started: Streaming a raw string as well as a more complex example that streams Redux-like actions to the client which update React state via `useReducer()`. +The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates some use cases to get you started: -Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data. +- Streaming just a raw string. +- Streaming NDJSON with "old-school" redux like actions and client side state managed with `useFetchStream()`. This uses React's own `useReducer()` under the hood. +- Streaming NDJSON with actions created via Redux Toolkit's `createSlice()` to a client with a full Redux Toolkit setup. -No additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state. +Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`) or slice (`streamSlice()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data. + +Besides Redux Toolkit for its particular use case, no additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state. On the server, the simpler stream to send a string is set up like this: @@ -21,12 +25,7 @@ The request's headers get passed on to automatically identify if compression is On the client, the custom hook is used like this: ```ts -const { - errors, - start, - cancel, - data, - isRunning -} = useFetchStream('/internal/response_stream/simple_string_stream'); +const { errors, start, cancel, data, isRunning } = useFetchStream( + '/internal/response_stream/simple_string_stream' +); ``` - diff --git a/examples/response_stream/common/api/index.ts b/examples/response_stream/common/api/index.ts index 925ff844e3cfa1..c5ff141e270eb6 100644 --- a/examples/response_stream/common/api/index.ts +++ b/examples/response_stream/common/api/index.ts @@ -8,5 +8,6 @@ export const RESPONSE_STREAM_API_ENDPOINT = { REDUCER_STREAM: '/internal/response_stream/reducer_stream', + REDUX_STREAM: '/internal/response_stream/redux_stream', SIMPLE_STRING_STREAM: '/internal/response_stream/simple_string_stream', } as const; diff --git a/examples/response_stream/common/api/reducer_stream/index.ts b/examples/response_stream/common/api/reducer_stream/index.ts index 606834be2d0baa..cc5255761fbd61 100644 --- a/examples/response_stream/common/api/reducer_stream/index.ts +++ b/examples/response_stream/common/api/reducer_stream/index.ts @@ -9,71 +9,3 @@ export { reducerStreamReducer } from './reducer'; export { reducerStreamRequestBodySchema } from './request_body_schema'; export type { ReducerStreamRequestBodySchema } from './request_body_schema'; - -export const API_ACTION_NAME = { - UPDATE_PROGRESS: 'update_progress', - ADD_TO_ENTITY: 'add_to_entity', - DELETE_ENTITY: 'delete_entity', - ERROR: 'error', -} as const; -export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME]; - -interface ApiActionUpdateProgress { - type: typeof API_ACTION_NAME.UPDATE_PROGRESS; - payload: number; -} - -export function updateProgressAction(payload: number): ApiActionUpdateProgress { - return { - type: API_ACTION_NAME.UPDATE_PROGRESS, - payload, - }; -} - -interface ApiActionAddToEntity { - type: typeof API_ACTION_NAME.ADD_TO_ENTITY; - payload: { - entity: string; - value: number; - }; -} - -export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity { - return { - type: API_ACTION_NAME.ADD_TO_ENTITY, - payload: { - entity, - value, - }, - }; -} - -interface ApiActionDeleteEntity { - type: typeof API_ACTION_NAME.DELETE_ENTITY; - payload: string; -} - -export function deleteEntityAction(payload: string): ApiActionDeleteEntity { - return { - type: API_ACTION_NAME.DELETE_ENTITY, - payload, - }; -} - -interface ApiActionError { - type: typeof API_ACTION_NAME.ERROR; - payload: string; -} - -export function errorAction(payload: string): ApiActionError { - return { - type: API_ACTION_NAME.ERROR, - payload, - }; -} - -export type ReducerStreamApiAction = - | ApiActionUpdateProgress - | ApiActionAddToEntity - | ApiActionDeleteEntity - | ApiActionError; diff --git a/examples/response_stream/common/api/reducer_stream/reducer.ts b/examples/response_stream/common/api/reducer_stream/reducer.ts index 9896e760cd75b5..a793ccf55489ad 100644 --- a/examples/response_stream/common/api/reducer_stream/reducer.ts +++ b/examples/response_stream/common/api/reducer_stream/reducer.ts @@ -6,24 +6,14 @@ * Side Public License, v 1. */ -import { ReducerStreamApiAction, API_ACTION_NAME } from '.'; +import { getInitialState, type StreamState } from '../stream_state'; +import { type ReducerStreamApiAction, API_ACTION_NAME } from './reducer_actions'; export const UI_ACTION_NAME = { RESET: 'reset', } as const; export type UiActionName = typeof UI_ACTION_NAME[keyof typeof UI_ACTION_NAME]; -export interface StreamState { - errors: string[]; - progress: number; - entities: Record; -} -export const initialState: StreamState = { - errors: [], - progress: 0, - entities: {}, -}; - interface UiActionResetStream { type: typeof UI_ACTION_NAME.RESET; } @@ -34,14 +24,7 @@ export function resetStream(): UiActionResetStream { type UiAction = UiActionResetStream; export type ReducerAction = ReducerStreamApiAction | UiAction; -export function reducerStreamReducer( - state: StreamState, - action: ReducerAction | ReducerAction[] -): StreamState { - if (Array.isArray(action)) { - return action.reduce(reducerStreamReducer, state); - } - +export function reducerStreamReducer(state: StreamState, action: ReducerAction): StreamState { switch (action.type) { case API_ACTION_NAME.UPDATE_PROGRESS: return { @@ -72,7 +55,7 @@ export function reducerStreamReducer( errors: [...state.errors, action.payload], }; case UI_ACTION_NAME.RESET: - return initialState; + return getInitialState(); default: return state; } diff --git a/examples/response_stream/common/api/reducer_stream/reducer_actions.ts b/examples/response_stream/common/api/reducer_stream/reducer_actions.ts new file mode 100644 index 00000000000000..7a6f077dfdfe6c --- /dev/null +++ b/examples/response_stream/common/api/reducer_stream/reducer_actions.ts @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export const API_ACTION_NAME = { + UPDATE_PROGRESS: 'update_progress', + ADD_TO_ENTITY: 'add_to_entity', + DELETE_ENTITY: 'delete_entity', + ERROR: 'error', +} as const; +export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME]; + +interface ApiActionUpdateProgress { + type: typeof API_ACTION_NAME.UPDATE_PROGRESS; + payload: number; +} + +export function updateProgressAction(payload: number): ApiActionUpdateProgress { + return { + type: API_ACTION_NAME.UPDATE_PROGRESS, + payload, + }; +} + +interface ApiActionAddToEntity { + type: typeof API_ACTION_NAME.ADD_TO_ENTITY; + payload: { + entity: string; + value: number; + }; +} + +export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity { + return { + type: API_ACTION_NAME.ADD_TO_ENTITY, + payload: { + entity, + value, + }, + }; +} + +interface ApiActionDeleteEntity { + type: typeof API_ACTION_NAME.DELETE_ENTITY; + payload: string; +} + +export function deleteEntityAction(payload: string): ApiActionDeleteEntity { + return { + type: API_ACTION_NAME.DELETE_ENTITY, + payload, + }; +} + +interface ApiActionError { + type: typeof API_ACTION_NAME.ERROR; + payload: string; +} + +export function errorAction(payload: string): ApiActionError { + return { + type: API_ACTION_NAME.ERROR, + payload, + }; +} + +export type ReducerStreamApiAction = + | ApiActionUpdateProgress + | ApiActionAddToEntity + | ApiActionDeleteEntity + | ApiActionError; diff --git a/examples/response_stream/common/api/redux_stream/data_slice.ts b/examples/response_stream/common/api/redux_stream/data_slice.ts new file mode 100644 index 00000000000000..7008c5a17ddcbd --- /dev/null +++ b/examples/response_stream/common/api/redux_stream/data_slice.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { createSlice } from '@reduxjs/toolkit'; +import type { PayloadAction } from '@reduxjs/toolkit'; + +import { getInitialState } from '../stream_state'; + +export const dataSlice = createSlice({ + name: 'development', + initialState: getInitialState(), + reducers: { + updateProgress: (state, action: PayloadAction) => { + state.progress = action.payload; + }, + addToEntity: ( + state, + action: PayloadAction<{ + entity: string; + value: number; + }> + ) => { + const { entity, value } = action.payload; + state.entities[entity] = (state.entities[entity] || 0) + value; + }, + deleteEntity: (state, action: PayloadAction) => { + delete state.entities[action.payload]; + }, + error: (state, action: PayloadAction) => { + state.errors.push(action.payload); + }, + reset: () => { + return getInitialState(); + }, + }, +}); + +export const { updateProgress, addToEntity, deleteEntity, error, reset } = dataSlice.actions; +export type ReduxStreamApiAction = ReturnType< + typeof dataSlice.actions[keyof typeof dataSlice.actions] +>; diff --git a/examples/response_stream/common/api/redux_stream/options_slice.ts b/examples/response_stream/common/api/redux_stream/options_slice.ts new file mode 100644 index 00000000000000..6791c74d81b33d --- /dev/null +++ b/examples/response_stream/common/api/redux_stream/options_slice.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { createSlice } from '@reduxjs/toolkit'; +import type { PayloadAction } from '@reduxjs/toolkit'; + +const getInitialState = () => ({ + simulateErrors: false, + compressResponse: true, + flushFix: false, +}); + +export const optionsSlice = createSlice({ + name: 'options', + initialState: getInitialState(), + reducers: { + setSimulateErrors: (state, action: PayloadAction) => { + state.simulateErrors = action.payload; + }, + setCompressResponse: (state, action: PayloadAction) => { + state.compressResponse = action.payload; + }, + setFlushFix: (state, action: PayloadAction) => { + state.flushFix = action.payload; + }, + }, +}); + +export const { setSimulateErrors, setCompressResponse, setFlushFix } = optionsSlice.actions; +export type ReduxOptionsApiAction = ReturnType< + typeof optionsSlice.actions[keyof typeof optionsSlice.actions] +>; diff --git a/examples/response_stream/common/api/stream_state.ts b/examples/response_stream/common/api/stream_state.ts new file mode 100644 index 00000000000000..78a541515ce8e9 --- /dev/null +++ b/examples/response_stream/common/api/stream_state.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export interface StreamState { + errors: string[]; + progress: number; + entities: Record; +} + +export const getInitialState = (): StreamState => ({ + errors: [], + progress: 0, + entities: {}, +}); diff --git a/examples/response_stream/public/components/page.tsx b/examples/response_stream/public/components/page.tsx index e3138fafa71ab0..2626eb317cf17f 100644 --- a/examples/response_stream/public/components/page.tsx +++ b/examples/response_stream/public/components/page.tsx @@ -18,7 +18,7 @@ export const Page: FC> = ({ title = 'Untitled', chi <> -

{title}

+

{title}

{children} diff --git a/examples/response_stream/public/containers/app/components/bar_chart_race.tsx b/examples/response_stream/public/containers/app/components/bar_chart_race.tsx new file mode 100644 index 00000000000000..ef399f4cc050da --- /dev/null +++ b/examples/response_stream/public/containers/app/components/bar_chart_race.tsx @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import React, { type FC } from 'react'; + +import { + Chart, + Settings, + Axis, + BarSeries, + Position, + ScaleType, + LEGACY_LIGHT_THEME, +} from '@elastic/charts'; + +interface BarChartRaceProps { + entities: Record; +} + +export const BarChartRace: FC = ({ entities }) => { + return ( +
+ + + + + + { + return { + x, + y, + }; + }) + .sort((a, b) => b.y - a.y)} + /> + +
+ ); +}; diff --git a/examples/response_stream/public/containers/app/pages/page_reducer_stream/get_status_message.tsx b/examples/response_stream/public/containers/app/components/get_status_message.tsx similarity index 100% rename from examples/response_stream/public/containers/app/pages/page_reducer_stream/get_status_message.tsx rename to examples/response_stream/public/containers/app/components/get_status_message.tsx diff --git a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx index 7da5f68e792f81..a471d6f5890347 100644 --- a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx +++ b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx @@ -8,16 +8,6 @@ import React, { useEffect, useState, FC } from 'react'; -import { - Chart, - Settings, - Axis, - BarSeries, - Position, - ScaleType, - LEGACY_LIGHT_THEME, -} from '@elastic/charts'; - import { EuiBadge, EuiButton, @@ -31,8 +21,8 @@ import { import { useFetchStream } from '@kbn/ml-response-stream/client'; +import { getInitialState } from '../../../../../common/api/stream_state'; import { - initialState, resetStream, reducerStreamReducer, } from '../../../../../common/api/reducer_stream/reducer'; @@ -42,7 +32,10 @@ import { Page } from '../../../../components/page'; import { useDeps } from '../../../../hooks/use_deps'; -import { getStatusMessage } from './get_status_message'; +import { BarChartRace } from '../../components/bar_chart_race'; +import { getStatusMessage } from '../../components/get_status_message'; + +const initialState = getInitialState(); export const PageReducerStream: FC = () => { const { @@ -72,8 +65,12 @@ export const PageReducerStream: FC = () => { } }; - // TODO This approach needs to be adapted as it might miss when error messages arrive bulk. - // This is for low level errors on the stream/HTTP level. + // TODO This needs to be adapted as it might miss when error messages arrive + // in bulk, but it should be good enough for this demo. This is for low level + // errors on the HTTP level.Note this will only surface errors that happen for + // the original request. Once the stream returns data, it will not be able to + // return errors. This is why we need separate error handling for application + // level errors. useEffect(() => { if (errors.length > 0) { notifications.toasts.addDanger(errors[errors.length - 1]); @@ -91,27 +88,33 @@ export const PageReducerStream: FC = () => { const buttonLabel = isRunning ? 'Stop development' : 'Start development'; return ( - +

- This demonstrates a single endpoint with streaming support that sends Redux inspired - actions from server to client. The server and client share types of the data to be - received. The client uses a custom hook that receives stream chunks and passes them on to - `useReducer()` that acts on the Redux type actions it receives. The custom hook includes - code to buffer actions and is able to apply them in bulk so the DOM does not get hammered - with updates. Hit "Start development" to trigger the bar chart race! + This demonstrates a single endpoint with streaming support that sends old school Redux + inspired actions from server to client. The server and client share types of the data to + be received. The client uses a custom hook that receives stream chunks and passes them on + to `useReducer()` that acts on the actions it receives. The custom hook includes code to + buffer actions and is able to apply them in bulk so the DOM does not get hammered with + updates. Hit "Start development" to trigger the bar chart race!


- + {buttonLabel} - {progress}% + {progress}% @@ -119,35 +122,11 @@ export const PageReducerStream: FC = () => { -
- - - - - - { - return { - x, - y, - }; - }) - .sort((a, b) => b.y - a.y)} - /> - -
+ -

{getStatusMessage(isRunning, isCancelled, data.progress)}

+

+ {getStatusMessage(isRunning, isCancelled, progress)} +

AppDispatch = useDispatch; +export const useAppSelector: TypedUseSelectorHook = useSelector; +export const useAppStore: () => AppStore = useStore; diff --git a/examples/response_stream/public/containers/app/pages/page_redux_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_redux_stream/index.tsx new file mode 100644 index 00000000000000..aaf0a7b5dbbb7a --- /dev/null +++ b/examples/response_stream/public/containers/app/pages/page_redux_stream/index.tsx @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import React, { useEffect, useRef, FC } from 'react'; + +import { + EuiBadge, + EuiButton, + EuiCheckbox, + EuiFlexGroup, + EuiFlexItem, + EuiProgress, + EuiSpacer, + EuiText, +} from '@elastic/eui'; + +import { cancelStream, startStream } from '@kbn/ml-response-stream/client'; + +import { RESPONSE_STREAM_API_ENDPOINT } from '../../../../../common/api'; +import { + setSimulateErrors, + setCompressResponse, + setFlushFix, +} from '../../../../../common/api/redux_stream/options_slice'; +import { reset } from '../../../../../common/api/redux_stream/data_slice'; + +import { Page } from '../../../../components/page'; +import { useDeps } from '../../../../hooks/use_deps'; + +import { BarChartRace } from '../../components/bar_chart_race'; +import { getStatusMessage } from '../../components/get_status_message'; + +import { useAppDispatch, useAppSelector } from './hooks'; + +export const PageReduxStream: FC = () => { + const { + core: { http, notifications }, + } = useDeps(); + + const dispatch = useAppDispatch(); + const { isRunning, isCancelled, errors: streamErrors } = useAppSelector((s) => s.stream); + const { progress, entities, errors } = useAppSelector((s) => s.data); + const { simulateErrors, compressResponse, flushFix } = useAppSelector((s) => s.options); + + const abortCtrl = useRef(new AbortController()); + + const onClickHandler = async () => { + if (isRunning) { + abortCtrl.current.abort(); + dispatch(cancelStream()); + } else { + abortCtrl.current = new AbortController(); + dispatch(reset()); + dispatch( + startStream({ + http, + endpoint: RESPONSE_STREAM_API_ENDPOINT.REDUX_STREAM, + apiVersion: '1', + abortCtrl, + body: { compressResponse, flushFix, simulateErrors }, + }) + ); + } + }; + + // TODO This needs to be adapted as it might miss when error messages arrive + // in bulk, but it should be good enough for this demo. This is for low level + // errors on the HTTP level.Note this will only surface errors that happen for + // the original request. Once the stream returns data, it will not be able to + // return errors. This is why we need separate error handling for application + // level errors. + useEffect(() => { + if (streamErrors.length > 0) { + notifications.toasts.addDanger(streamErrors[streamErrors.length - 1]); + } + }, [streamErrors, notifications.toasts]); + + // TODO This approach needs to be adapted as it might miss when error messages arrive bulk. + // This is for errors on the application level + useEffect(() => { + if (errors.length > 0) { + notifications.toasts.addDanger(errors[errors.length - 1]); + } + }, [errors, notifications.toasts]); + + const buttonLabel = isRunning ? 'Stop development' : 'Start development'; + + return ( + + +

+ This demonstrates integration of a single endpoint with streaming support with Redux + Toolkit. The server and client share actions created via `createSlice`. The server sends a + stream of NDJSON data to the client where each line is a redux action. The client then + applies these actions to its state. The package `@kbn/ml-response-stream` exposes a slice + of the state that can be used to start and cancel the stream. The `startStream` action is + implemented as an async thunk that starts the stream and then dispatches received actions + to the store. Hit "Start development" to trigger the bar chart race! +

+
+
+ + + + {buttonLabel} + + + + + {progress}% + + + + + + + + + +

+ {getStatusMessage(isRunning, isCancelled, progress)} +

+ dispatch(setSimulateErrors(!simulateErrors))} + compressed + /> + dispatch(setCompressResponse(!compressResponse))} + compressed + /> + dispatch(setFlushFix(!flushFix))} + compressed + /> +
+
+ ); +}; diff --git a/examples/response_stream/public/containers/app/pages/page_redux_stream/store.tsx b/examples/response_stream/public/containers/app/pages/page_redux_stream/store.tsx new file mode 100644 index 00000000000000..ddeb2654932984 --- /dev/null +++ b/examples/response_stream/public/containers/app/pages/page_redux_stream/store.tsx @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import React, { type FC, type PropsWithChildren } from 'react'; +import { configureStore } from '@reduxjs/toolkit'; +import { Provider } from 'react-redux'; + +import { streamSlice } from '@kbn/ml-response-stream/client'; + +import { optionsSlice } from '../../../../../common/api/redux_stream/options_slice'; +import { dataSlice } from '../../../../../common/api/redux_stream/data_slice'; + +const reduxStore = configureStore({ + reducer: { + // State of the stream: is it running, has it errored, etc. + stream: streamSlice.reducer, + // The actual data returned by the stream. + data: dataSlice.reducer, + // Options for the stream: simulate errors, compress response, etc. + options: optionsSlice.reducer, + }, +}); + +export const ReduxStreamProvider: FC> = ({ children }) => ( + {children} +); + +// Infer the `RootState` and `AppDispatch` types from the store itself +export type AppStore = typeof reduxStore; +export type RootState = ReturnType; +export type AppDispatch = AppStore['dispatch']; diff --git a/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx index 7075656dc01671..3e33ade3814891 100644 --- a/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx +++ b/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx @@ -66,7 +66,13 @@ export const PageSimpleStringStream: FC = () => {
- + {buttonLabel} @@ -81,7 +87,7 @@ export const PageSimpleStringStream: FC = () => { /> -

{data}

+

{data}

{errors.length > 0 && ( diff --git a/examples/response_stream/public/routes.tsx b/examples/response_stream/public/routes.tsx index eafe96c320d8cc..f2ec595f1f7538 100644 --- a/examples/response_stream/public/routes.tsx +++ b/examples/response_stream/public/routes.tsx @@ -10,6 +10,8 @@ import React from 'react'; import { PLUGIN_ID, PLUGIN_NAME } from '../common/constants'; import { PageSimpleStringStream } from './containers/app/pages/page_simple_string_stream'; import { PageReducerStream } from './containers/app/pages/page_reducer_stream'; +import { PageReduxStream } from './containers/app/pages/page_redux_stream'; +import { ReduxStreamProvider } from './containers/app/pages/page_redux_stream/store'; interface RouteSectionDef { title: string; @@ -34,10 +36,19 @@ export const routes: RouteSectionDef[] = [ component: , }, { - title: 'Reducer stream', - id: 'reducer-stream', + title: 'NDJSON useReducer stream', + id: 'ndjson-usereducer-stream', component: , }, + { + title: 'NDJSON Redux Toolkit stream', + id: 'ndjson-redux-toolkit-stream', + component: ( + + + + ), + }, ], }, ]; diff --git a/examples/response_stream/server/plugin.ts b/examples/response_stream/server/plugin.ts index 6bb0c550030593..e21c3c3f59287e 100644 --- a/examples/response_stream/server/plugin.ts +++ b/examples/response_stream/server/plugin.ts @@ -9,7 +9,11 @@ import { Plugin, PluginInitializerContext, CoreSetup, CoreStart, Logger } from '@kbn/core/server'; import type { DataRequestHandlerContext } from '@kbn/data-plugin/server'; -import { defineReducerStreamRoute, defineSimpleStringStreamRoute } from './routes'; +import { + defineReducerStreamRoute, + defineReduxStreamRoute, + defineSimpleStringStreamRoute, +} from './routes'; // eslint-disable-next-line @typescript-eslint/no-empty-interface export interface ResponseStreamSetupPlugins {} @@ -29,6 +33,7 @@ export class ResponseStreamPlugin implements Plugin { void core.getStartServices().then(([_, depsStart]) => { defineReducerStreamRoute(router, this.logger); + defineReduxStreamRoute(router, this.logger); defineSimpleStringStreamRoute(router, this.logger); }); } diff --git a/examples/response_stream/server/routes/index.ts b/examples/response_stream/server/routes/index.ts index f18a1283aa0ec0..34d707ed816557 100644 --- a/examples/response_stream/server/routes/index.ts +++ b/examples/response_stream/server/routes/index.ts @@ -7,4 +7,5 @@ */ export { defineReducerStreamRoute } from './reducer_stream'; +export { defineReduxStreamRoute } from './redux_stream'; export { defineSimpleStringStreamRoute } from './single_string_stream'; diff --git a/examples/response_stream/server/routes/reducer_stream.ts b/examples/response_stream/server/routes/reducer_stream.ts index 02606b8c44756d..abdc90f28a23c1 100644 --- a/examples/response_stream/server/routes/reducer_stream.ts +++ b/examples/response_stream/server/routes/reducer_stream.ts @@ -11,14 +11,16 @@ import { streamFactory } from '@kbn/ml-response-stream/server'; import { errorAction, - reducerStreamRequestBodySchema, updateProgressAction, addToEntityAction, deleteEntityAction, ReducerStreamApiAction, -} from '../../common/api/reducer_stream'; +} from '../../common/api/reducer_stream/reducer_actions'; +import { reducerStreamRequestBodySchema } from '../../common/api/reducer_stream'; import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api'; +import { entities, getActions } from './shared'; + export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => { router.versioned .post({ @@ -64,23 +66,7 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => { request.body.flushFix ); - const entities = [ - 'kimchy', - 's1monw', - 'martijnvg', - 'jasontedor', - 'nik9000', - 'javanna', - 'rjernst', - 'jrodewig', - ]; - - const actions = [...Array(19).fill('add'), 'delete']; - - if (simulateError) { - actions.push('throw-error'); - actions.push('emit-error'); - } + const actions = getActions(simulateError); let progress = 0; @@ -101,19 +87,28 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => { const randomEntity = entities[Math.floor(Math.random() * entities.length)]; const randomAction = actions[Math.floor(Math.random() * actions.length)]; - if (randomAction === 'add') { - const randomCommits = Math.floor(Math.random() * 100); - push(addToEntityAction(randomEntity, randomCommits)); - } else if (randomAction === 'delete') { - push(deleteEntityAction(randomEntity)); - } else if (randomAction === 'throw-error') { - // Throw an error. It should not crash Kibana! - // It should be caught and logged to the Kibana server console. - throw new Error('There was a (simulated) server side error!'); - } else if (randomAction === 'emit-error') { - // Emit an error as a stream action. - push(errorAction('(Simulated) error pushed to the stream')); - return; + switch (randomAction) { + case 'add': + const randomCommits = Math.floor(Math.random() * 100); + push(addToEntityAction(randomEntity, randomCommits)); + break; + + case 'delete': + push(deleteEntityAction(randomEntity)); + break; + + case 'throw-error': + // Throw an error. It should not crash Kibana! + // It should be caught and logged to the Kibana server console. + // The stream will just stop but the client will note receive an error! + // In practice this pattern should be avoided as it will just end + // the stream without an explanation. + throw new Error('There was a (simulated) server side error!'); + + case 'emit-error': + // Emit an error as a stream action. + push(errorAction('(Simulated) error pushed to the stream')); + return; } void pushStreamUpdate(); diff --git a/examples/response_stream/server/routes/redux_stream.ts b/examples/response_stream/server/routes/redux_stream.ts new file mode 100644 index 00000000000000..bd694c531907b0 --- /dev/null +++ b/examples/response_stream/server/routes/redux_stream.ts @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import type { IRouter, Logger } from '@kbn/core/server'; +import { streamFactory } from '@kbn/ml-response-stream/server'; + +import { + updateProgress, + addToEntity, + deleteEntity, + error, + type ReduxStreamApiAction, +} from '../../common/api/redux_stream/data_slice'; +import { reducerStreamRequestBodySchema } from '../../common/api/reducer_stream'; +import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api'; + +import { entities, getActions } from './shared'; + +export const defineReduxStreamRoute = (router: IRouter, logger: Logger) => { + router.versioned + .post({ + path: RESPONSE_STREAM_API_ENDPOINT.REDUX_STREAM, + access: 'internal', + }) + .addVersion( + { + version: '1', + validate: { + request: { + body: reducerStreamRequestBodySchema, + }, + }, + }, + async (context, request, response) => { + const maxTimeoutMs = request.body.timeout ?? 250; + const simulateError = request.body.simulateErrors ?? false; + + let logMessageCounter = 1; + + function logDebugMessage(msg: string) { + logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`); + logMessageCounter++; + } + + logDebugMessage('Starting stream.'); + + let shouldStop = false; + request.events.aborted$.subscribe(() => { + logDebugMessage('aborted$ subscription trigger.'); + shouldStop = true; + }); + request.events.completed$.subscribe(() => { + logDebugMessage('completed$ subscription trigger.'); + shouldStop = true; + }); + + const { end, push, responseWithHeaders } = streamFactory( + request.headers, + logger, + request.body.compressResponse, + request.body.flushFix + ); + + const actions = getActions(simulateError); + + let progress = 0; + + async function pushStreamUpdate() { + await new Promise((resolve) => + setTimeout(resolve, Math.floor(Math.random() * maxTimeoutMs)) + ); + try { + progress++; + + if (progress > 100 || shouldStop) { + end(); + return; + } + + push(updateProgress(progress)); + + const randomEntity = entities[Math.floor(Math.random() * entities.length)]; + const randomAction = actions[Math.floor(Math.random() * actions.length)]; + + switch (randomAction) { + case 'add': + const randomCommits = Math.floor(Math.random() * 100); + push(addToEntity({ entity: randomEntity, value: randomCommits })); + break; + + case 'delete': + push(deleteEntity(randomEntity)); + break; + + case 'throw-error': + // Throw an error. It should not crash Kibana! + // It should be caught and logged to the Kibana server console. + // The stream will just stop but the client will note receive an error! + // In practice this pattern should be avoided as it will just end + // the stream without an explanation. + throw new Error('There was a (simulated) server side error!'); + + case 'emit-error': + // Emit an error as a stream action. + push(error('(Simulated) error pushed to the stream')); + return; + } + + void pushStreamUpdate(); + } catch (e) { + logger.error(e); + } + } + + // do not call this using `await` so it will run asynchronously while we return the stream already. + void pushStreamUpdate(); + + return response.ok(responseWithHeaders); + } + ); +}; diff --git a/examples/response_stream/server/routes/shared.ts b/examples/response_stream/server/routes/shared.ts new file mode 100644 index 00000000000000..72223d21e09ffc --- /dev/null +++ b/examples/response_stream/server/routes/shared.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export const entities = [ + 'kimchy', + 's1monw', + 'martijnvg', + 'jasontedor', + 'nik9000', + 'javanna', + 'rjernst', + 'jrodewig', +]; + +export const getActions = (simulateError: boolean) => { + const actions = [...Array(19).fill('add'), 'delete']; + + if (simulateError) { + actions.push('throw-error'); + actions.push('emit-error'); + } + + return actions; +}; diff --git a/test/examples/config.js b/test/examples/config.js index 77d73642afd1cb..dd8b49753dba5c 100644 --- a/test/examples/config.js +++ b/test/examples/config.js @@ -31,6 +31,7 @@ export default async function ({ readConfigFile }) { require.resolve('./unified_field_list_examples'), require.resolve('./discover_customization_examples'), require.resolve('./error_boundary'), + require.resolve('./response_stream'), ], services: { ...functionalConfig.get('services'), diff --git a/test/examples/response_stream/index.ts b/test/examples/response_stream/index.ts index b918de669819ba..2f13bf2f9f70cb 100644 --- a/test/examples/response_stream/index.ts +++ b/test/examples/response_stream/index.ts @@ -10,7 +10,17 @@ import { FtrProviderContext } from '../../functional/ftr_provider_context'; // eslint-disable-next-line import/no-default-export export default function ({ getService, getPageObjects, loadTestFile }: FtrProviderContext) { - describe('response stream', function () { + const browser = getService('browser'); + const PageObjects = getPageObjects(['common', 'header']); + + describe('response-stream', function () { + before(async () => { + await browser.setWindowSize(1300, 900); + await PageObjects.common.navigateToApp('response-stream', { insertTimestamp: false }); + }); + + loadTestFile(require.resolve('./string_stream')); loadTestFile(require.resolve('./reducer_stream')); + loadTestFile(require.resolve('./redux_stream')); }); } diff --git a/test/examples/response_stream/reducer_stream.ts b/test/examples/response_stream/reducer_stream.ts index 001fea1a144c81..4e9c0f9a7af090 100644 --- a/test/examples/response_stream/reducer_stream.ts +++ b/test/examples/response_stream/reducer_stream.ts @@ -6,84 +6,47 @@ * Side Public License, v 1. */ -import fetch from 'node-fetch'; -import { format as formatUrl } from 'url'; - import expect from '@kbn/expect'; - import { FtrProviderContext } from '../../functional/ftr_provider_context'; -import { parseStream } from './parse_stream'; - // eslint-disable-next-line import/no-default-export -export default ({ getService }: FtrProviderContext) => { - const supertest = getService('supertest'); - const config = getService('config'); - const kibanaServerUrl = formatUrl(config.get('servers.kibana')); - - describe('POST /internal/response_stream/reducer_stream', () => { - it('should return full data without streaming', async () => { - const resp = await supertest - .post('/internal/response_stream/reducer_stream') - .set('kbn-xsrf', 'kibana') - .send({ - timeout: 1, - }) - .expect(200); - - expect(Buffer.isBuffer(resp.body)).to.be(true); - - const chunks: string[] = resp.body.toString().split('\n'); - - expect(chunks.length).to.be(201); - - const lastChunk = chunks.pop(); - expect(lastChunk).to.be(''); - - let data: any[] = []; +export default function ({ getService, getPageObjects }: FtrProviderContext) { + const testSubjects = getService('testSubjects'); + const retry = getService('retry'); + + describe('useReducer stream example', () => { + it('navigates to the example', async () => { + await testSubjects.click('ndjson-usereducer-stream'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be( + 'NDJSON useReducer stream' + ); + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development did not start yet.' + ); + }); + }); - expect(() => { - data = chunks.map((c) => JSON.parse(c)); - }).not.to.throwError(); + it('starts the stream', async () => { + await testSubjects.click('responseStreamStartButton'); - data.forEach((d) => { - expect(typeof d.type).to.be('string'); + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).not.to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development is ongoing, the hype is real!' + ); }); - - const progressData = data.filter((d) => d.type === 'update_progress'); - expect(progressData.length).to.be(100); - expect(progressData[0].payload).to.be(1); - expect(progressData[progressData.length - 1].payload).to.be(100); }); - it('should return data in chunks with streaming', async () => { - const response = await fetch(`${kibanaServerUrl}/internal/response_stream/reducer_stream`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'kbn-xsrf': 'stream', - }, - body: JSON.stringify({ timeout: 1 }), + it('finishes the stream', async () => { + await retry.tryForTime(60000, async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('100%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development completed, the release got out the door!' + ); }); - - const stream = response.body; - - expect(stream).not.to.be(null); - - if (stream !== null) { - const progressData: any[] = []; - - for await (const action of parseStream(stream)) { - expect(action.type).not.to.be('error'); - if (action.type === 'update_progress') { - progressData.push(action); - } - } - - expect(progressData.length).to.be(100); - expect(progressData[0].payload).to.be(1); - expect(progressData[progressData.length - 1].payload).to.be(100); - } }); }); -}; +} diff --git a/test/examples/response_stream/redux_stream.ts b/test/examples/response_stream/redux_stream.ts new file mode 100644 index 00000000000000..6a9e73d1d35a5d --- /dev/null +++ b/test/examples/response_stream/redux_stream.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import expect from '@kbn/expect'; +import { FtrProviderContext } from '../../functional/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default function ({ getService, getPageObjects }: FtrProviderContext) { + const testSubjects = getService('testSubjects'); + const retry = getService('retry'); + + describe('redux stream example', () => { + it('navigates to the example', async () => { + await testSubjects.click('ndjson-redux-toolkit-stream'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be( + 'NDJSON Redux Toolkit stream' + ); + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development did not start yet.' + ); + }); + }); + + it('starts the stream', async () => { + await testSubjects.click('responseStreamStartButton'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).not.to.be('0%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development is ongoing, the hype is real!' + ); + }); + }); + + it('finishes the stream', async () => { + await retry.tryForTime(60000, async () => { + expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('100%'); + expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be( + 'Development completed, the release got out the door!' + ); + }); + }); + }); +} diff --git a/test/examples/response_stream/string_stream.ts b/test/examples/response_stream/string_stream.ts new file mode 100644 index 00000000000000..d522b8f5f24848 --- /dev/null +++ b/test/examples/response_stream/string_stream.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import expect from '@kbn/expect'; +import { FtrProviderContext } from '../../functional/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default function ({ getService, getPageObjects }: FtrProviderContext) { + const testSubjects = getService('testSubjects'); + const retry = getService('retry'); + + describe('string stream example', () => { + it('navigates to the example', async () => { + await testSubjects.click('simple-string-stream'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be( + 'Simple string stream' + ); + expect(await testSubjects.exists('responseStreamStartButton')).to.be(true); + expect(await testSubjects.getVisibleText('responseStreamString')).to.be(''); + }); + }); + + it('starts the stream', async () => { + await testSubjects.click('responseStreamStartButton'); + + await retry.try(async () => { + expect(await testSubjects.getVisibleText('responseStreamString')).not.to.be(''); + }); + }); + + it('finishes the stream', async () => { + await retry.tryForTime(60000, async () => { + expect(await testSubjects.getVisibleText('responseStreamString')).to.be( + 'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.' + ); + }); + }); + }); +} diff --git a/x-pack/packages/ml/response_stream/client/constants.ts b/x-pack/packages/ml/response_stream/client/constants.ts new file mode 100644 index 00000000000000..52d2aedaf83521 --- /dev/null +++ b/x-pack/packages/ml/response_stream/client/constants.ts @@ -0,0 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const DATA_THROTTLE_MS = 100; diff --git a/x-pack/packages/ml/response_stream/client/index.ts b/x-pack/packages/ml/response_stream/client/index.ts index a8b02cecd9cf6b..a0bc59c34deec3 100644 --- a/x-pack/packages/ml/response_stream/client/index.ts +++ b/x-pack/packages/ml/response_stream/client/index.ts @@ -6,4 +6,5 @@ */ export { fetchStream } from './fetch_stream'; +export { cancelStream, startStream, streamSlice } from './stream_slice'; export { useFetchStream } from './use_fetch_stream'; diff --git a/x-pack/packages/ml/response_stream/client/stream_slice.ts b/x-pack/packages/ml/response_stream/client/stream_slice.ts new file mode 100644 index 00000000000000..d8b7888dd4c6f0 --- /dev/null +++ b/x-pack/packages/ml/response_stream/client/stream_slice.ts @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { AnyAction, PayloadAction } from '@reduxjs/toolkit'; +import { createAsyncThunk, createSlice } from '@reduxjs/toolkit'; +import { batch } from 'react-redux'; + +import type { HttpSetup, HttpFetchOptions } from '@kbn/core/public'; +import { fetchStream } from './fetch_stream'; +import { DATA_THROTTLE_MS } from './constants'; + +/** + * Async thunk to start the stream. + */ +export const startStream = createAsyncThunk( + 'stream/start', + async ( + options: { + http: HttpSetup; + endpoint: string; + apiVersion?: string; + abortCtrl: React.MutableRefObject; + body?: any; + headers?: HttpFetchOptions['headers']; + }, + thunkApi + ) => { + const { http, endpoint, apiVersion, abortCtrl, body, headers } = options; + + const fetchState = { isActive: true }; + + // Custom buffering to avoid hammering the DOM with updates. + // We can revisit this once Kibana is on React 18. + const actionBuffer: AnyAction[] = []; + function flushBuffer(withTimeout = true) { + batch(() => { + for (const action of actionBuffer) { + thunkApi.dispatch(action); + } + }); + actionBuffer.length = 0; + + if (withTimeout) { + setTimeout(() => { + if (fetchState.isActive) { + flushBuffer(); + } + }, DATA_THROTTLE_MS); + } + } + + flushBuffer(); + + for await (const [fetchStreamError, action] of fetchStream( + http, + endpoint, + apiVersion, + abortCtrl, + body, + true, + headers + )) { + if (fetchStreamError !== null) { + actionBuffer.push(addError(fetchStreamError)); + } else if (action) { + actionBuffer.push(action); + } + } + + fetchState.isActive = false; + flushBuffer(false); + }, + { + condition: (_, { getState }) => { + // This is a bit of a hack to prevent instant restarts while the stream is running. + // The problem is that in RTK v1, async thunks cannot be made part of the slice, + // so they will not know the namespace used where they run in. We just assume + // `stream` here as the namespace, if it's a custom one, this will not work. + // RTK v2 will allow async thunks to be part of the slice, a draft PR to upgrade + // is up there: https://github.com/elastic/kibana/pull/178986 + try { + const s = getState() as { stream?: StreamState }; + + if (s.stream === undefined) { + return true; + } + + // If the stream was running, the extra reducers will also have set + // and error, so we need to prevent the stream from starting again. + if (s.stream.isRunning && s.stream.errors.length > 0) { + return false; + } + } catch (e) { + return true; + } + }, + } +); + +export interface StreamState { + errors: string[]; + isCancelled: boolean; + isRunning: boolean; +} + +function getDefaultState(): StreamState { + return { + errors: [], + isCancelled: false, + isRunning: false, + }; +} + +export const streamSlice = createSlice({ + name: 'stream', + initialState: getDefaultState(), + reducers: { + addError: (state: StreamState, action: PayloadAction) => { + state.errors.push(action.payload); + }, + cancelStream: (state: StreamState) => { + state.isCancelled = true; + state.isRunning = false; + }, + }, + extraReducers: (builder) => { + builder.addCase(startStream.pending, (state) => { + if (state.isRunning) { + state.errors.push('Instant restart while running not supported yet.'); + return; + } + + state.errors = []; + state.isCancelled = false; + state.isRunning = true; + }); + builder.addCase(startStream.fulfilled, (state) => { + state.isRunning = false; + }); + }, +}); + +// Action creators are generated for each case reducer function +export const { addError, cancelStream } = streamSlice.actions; diff --git a/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts b/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts index 55950c6ee8f774..1e0deeb7f8e26e 100644 --- a/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts +++ b/x-pack/packages/ml/response_stream/client/use_fetch_stream.ts @@ -19,8 +19,7 @@ import { isPopulatedObject } from '@kbn/ml-is-populated-object'; import { fetchStream } from './fetch_stream'; import { stringReducer, type StringReducer } from './string_reducer'; - -const DATA_THROTTLE_MS = 100; +import { DATA_THROTTLE_MS } from './constants'; // This pattern with a dual ternary allows us to default to StringReducer // and if a custom reducer is supplied fall back to that one instead. @@ -80,7 +79,8 @@ export function useFetchStream>( // a lot of unnecessary re-renders even in combination with `useThrottle`. // We're now using `dataRef` to allow updates outside of the render cycle. // When the stream is running, we'll update `data` with the `dataRef` value - // periodically. + // periodically. This will get simpler with React 18 where we + // can make use of `useDeferredValue`. const [data, setData] = useState(reducerWithFallback.initialState); const dataRef = useRef(reducerWithFallback.initialState);