diff --git a/pkg/console/internal/events/events.go b/pkg/console/internal/events/events.go index 3422b160dc..e1f66c33db 100644 --- a/pkg/console/internal/events/events.go +++ b/pkg/console/internal/events/events.go @@ -19,6 +19,7 @@ import ( "context" "net/http" "sync" + "time" "github.com/gorilla/mux" "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" @@ -26,8 +27,10 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/middleware" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/random" "go.thethings.network/lorawan-stack/v3/pkg/ratelimit" "go.thethings.network/lorawan-stack/v3/pkg/task" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" @@ -40,6 +43,9 @@ import ( const ( authorizationProtocolPrefix = "ttn.lorawan.v3.header.authorization.bearer." protocolV1 = "ttn.lorawan.v3.console.internal.events.v1" + + pingPeriod = time.Minute + pingJitter = 0.1 ) // Component is the interface of the component to the events API handler. @@ -95,7 +101,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { } defer conn.Close(websocket.StatusNormalClosure, "main task closed") - ctx, cancel := context.WithCancelCause(ctx) + ctx, cancel := errorcontext.New(ctx) defer cancel(nil) var wg sync.WaitGroup @@ -108,6 +114,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { "console_events_mux": makeMuxTask(m, cancel), "console_events_read": makeReadTask(conn, m, rateLimit, cancel), "console_events_write": makeWriteTask(conn, m, cancel), + "console_events_ping": makePingTask(conn, cancel, random.Jitter(pingPeriod, pingJitter)), } { wg.Add(1) h.component.StartTask(&task.Config{ diff --git a/pkg/console/internal/events/eventsmux/mux.go b/pkg/console/internal/events/eventsmux/mux.go index e0874f9c51..8c5c2af3ba 100644 --- a/pkg/console/internal/events/eventsmux/mux.go +++ b/pkg/console/internal/events/eventsmux/mux.go @@ -18,8 +18,10 @@ package eventsmux import ( "context" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/log" ) @@ -54,7 +56,7 @@ func (m *mux) Responses() <-chan protocol.Response { // Run implements Interface. func (m *mux) Run(ctx context.Context) (err error) { - ctx, cancel := context.WithCancelCause(ctx) + ctx, cancel := errorcontext.New(ctx) defer func() { cancel(err) }() subs := m.createSubs(ctx, cancel) defer subs.Close() @@ -63,6 +65,9 @@ func (m *mux) Run(ctx context.Context) (err error) { case <-ctx.Done(): return ctx.Err() case req := <-m.requestCh: + if err := rights.RequireAuthenticated(ctx); err != nil { + return err + } var resp protocol.Response switch req := req.(type) { case *protocol.SubscribeRequest: diff --git a/pkg/console/internal/events/eventsmux/mux_test.go b/pkg/console/internal/events/eventsmux/mux_test.go index de220f52fa..895da2db1d 100644 --- a/pkg/console/internal/events/eventsmux/mux_test.go +++ b/pkg/console/internal/events/eventsmux/mux_test.go @@ -125,6 +125,10 @@ func TestMux(t *testing.T) { // nolint:gocyclo unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL), }), }) + ctx = rights.NewContextWithAuthInfo(ctx, &ttnpb.AuthInfoResponse{ + UniversalRights: ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL), + IsAdmin: true, + }) subs := &mockSubscriptions{ ctx: ctx, diff --git a/pkg/console/internal/events/subscriptions/subscriptions.go b/pkg/console/internal/events/subscriptions/subscriptions.go index 0a099fffdd..faea7a4b73 100644 --- a/pkg/console/internal/events/subscriptions/subscriptions.go +++ b/pkg/console/internal/events/subscriptions/subscriptions.go @@ -22,6 +22,7 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" "go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/errors" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/log" @@ -141,7 +142,7 @@ func (s *subscriptions) Subscribe( return err } ch := make(chan events.Event, channelSize(tail)) - ctx, cancel := context.WithCancelCause(s.ctx) + ctx, cancel := errorcontext.New(s.ctx) defer func() { if err != nil { cancel(err) diff --git a/pkg/console/internal/events/subscriptions/subscriptions_test.go b/pkg/console/internal/events/subscriptions/subscriptions_test.go index 10aaf738bb..a397789e1c 100644 --- a/pkg/console/internal/events/subscriptions/subscriptions_test.go +++ b/pkg/console/internal/events/subscriptions/subscriptions_test.go @@ -22,6 +22,7 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/task" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" @@ -143,7 +144,7 @@ func runTestSubscriptions( _, historical := subscriber.(interface{ historical() }) a, ctx := test.New(t) - ctx, cancel := context.WithCancelCause(ctx) + ctx, cancel := errorcontext.New(ctx) defer cancel(nil) timeout := test.Delay << 3 diff --git a/pkg/console/internal/events/tasks.go b/pkg/console/internal/events/tasks.go index af7024a5b6..298cb1f3f6 100644 --- a/pkg/console/internal/events/tasks.go +++ b/pkg/console/internal/events/tasks.go @@ -18,6 +18,7 @@ import ( "context" "errors" "io" + "time" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" @@ -79,3 +80,21 @@ func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(erro } } } + +func makePingTask(conn *websocket.Conn, cancel func(error), period time.Duration) func(context.Context) error { + return func(ctx context.Context) (err error) { + ticker := time.NewTicker(period) + defer ticker.Stop() + defer func() { cancel(err) }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := conn.Ping(ctx); err != nil { + return err + } + } + } + } +} diff --git a/pkg/webui/console/lib/events/utils.js b/pkg/webui/console/lib/events/utils.js index fa1d34436b..57ed39e7d7 100644 --- a/pkg/webui/console/lib/events/utils.js +++ b/pkg/webui/console/lib/events/utils.js @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +import { ingestError } from '@ttn-lw/lib/errors/utils' + import { createNetworkErrorEvent, createUnknownErrorEvent } from './definitions' export const defineSyntheticEvent = name => data => ({ @@ -23,13 +25,30 @@ export const defineSyntheticEvent = name => data => ({ data, }) -export const createSyntheticEventFromError = error => { +const convertError = error => { if (error instanceof Error) { - const errorString = error.toString() - if (error.message === 'network error' || error.message === 'Error in body stream') { - return createNetworkErrorEvent({ error: errorString }) + return { + ...error, + message: error.message, + name: error.name, + // The stack is omitted intentionally, as it is not relevant for a user. } + } + return error +} - return createUnknownErrorEvent({ error: errorString }) +export const createSyntheticEventFromError = error => { + if (error instanceof Error) { + if ( + error.name === 'ConnectionError' || + error.name === 'ConnectionClosedError' || + error.name === 'ConnectionTimeoutError' + ) { + return createNetworkErrorEvent({ error: convertError(error) }) + } else if (error.name === 'ProtocolError') { + ingestError(error.error) + return createUnknownErrorEvent({ error: convertError(error) }) + } + return createUnknownErrorEvent({ error: convertError(error) }) } } diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index 513fb8ef0b..635602bf77 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -19,7 +19,7 @@ import CONNECTION_STATUS from '@console/constants/connection-status' import EVENT_TAIL from '@console/constants/event-tail' import { getCombinedDeviceId } from '@ttn-lw/lib/selectors/id' -import { isUnauthenticatedError, isNetworkError, isTimeoutError } from '@ttn-lw/lib/errors/utils' +import { TokenError } from '@ttn-lw/lib/errors/custom-errors' import { SET_CONNECTION_STATUS, setStatusChecking } from '@ttn-lw/lib/store/actions/status' import { selectIsOnlineStatus } from '@ttn-lw/lib/store/selectors/status' @@ -29,7 +29,6 @@ import { createStartEventsStreamFailureActionType, createStartEventsStreamSuccessActionType, createEventStreamClosedActionType, - createGetEventMessageFailureActionType, createGetEventMessageSuccessActionType, createSetEventsFilterActionType, getEventMessageSuccess, @@ -65,7 +64,6 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const START_EVENTS_FAILURE = createStartEventsStreamFailureActionType(reducerName) const STOP_EVENTS = createStopEventsStreamActionType(reducerName) const EVENT_STREAM_CLOSED = createEventStreamClosedActionType(reducerName) - const GET_EVENT_MESSAGE_FAILURE = createGetEventMessageFailureActionType(reducerName) const GET_EVENT_MESSAGE_SUCCESS = createGetEventMessageSuccessActionType(reducerName) const SET_EVENT_FILTER = createSetEventsFilterActionType(reducerName) const startEventsSuccess = startEventsStreamSuccess(reducerName) @@ -93,8 +91,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { }, validate: ({ getState, action = {} }, allow, reject) => { if (!action.id) { - reject() - return + return reject() } const id = typeof action.id === 'object' ? getCombinedDeviceId(action.id) : action.id @@ -106,11 +103,10 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const connected = status === CONNECTION_STATUS.CONNECTED const connecting = status === CONNECTION_STATUS.CONNECTING if (connected || connecting || !isOnline) { - reject() - return + return reject() } - allow(action) + return allow(action) }, process: async ({ getState, action }, dispatch) => { const { id, silent } = action @@ -131,22 +127,18 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const filterRegExp = Boolean(filter) ? filter.filterRegExp : undefined try { - channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after) + const listeners = { + message: message => dispatch(getEventSuccess(id, message)), + error: error => dispatch(getEventFailure(id, error)), + close: wasClientRequest => dispatch(closeEvents(id, { silent: wasClientRequest })), + } + channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after, listeners) dispatch(startEventsSuccess(id, { silent })) - - channel.on('message', message => dispatch(getEventSuccess(id, message))) - channel.on('error', error => dispatch(getEventFailure(id, error))) - channel.on('close', wasClientRequest => { - dispatch(closeEvents(id, { silent: wasClientRequest })) - channel = null - }) + channel.open() } catch (error) { - if (isUnauthenticatedError(error)) { + if (error instanceof TokenError) { // The user is no longer authenticated; reinitiate the auth flow // by refreshing the page. - // NOTE: As a result of the WebSocket refactor, the error shape is - // now very unspecific and authentication errors like before are - // not thrown anymore. This should be addressed eventually. window.location.reload() } else { dispatch(startEventsFailure(id, error)) @@ -158,8 +150,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { type: [STOP_EVENTS, START_EVENTS_FAILURE], validate: ({ getState, action = {} }, allow, reject) => { if (!action.id) { - reject() - return + return reject() } const id = typeof action.id === 'object' ? getCombinedDeviceId(action.id) : action.id @@ -169,40 +160,36 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const connected = status === CONNECTION_STATUS.CONNECTED const connecting = status === CONNECTION_STATUS.CONNECTING if (!connected && !connecting) { - reject() - return + return reject() } - allow(action) + return allow(action) }, process: async ({ action }, dispatch, done) => { if (action.type === START_EVENTS_FAILURE) { - if (action?.error?.message === 'timeout') { - // Set the connection status to `checking` to trigger connection checks - // and detect possible offline state. - dispatch(setStatusChecking()) + // Set the connection status to `checking` to trigger connection checks + // and detect possible offline state. + dispatch(setStatusChecking()) - // In case of a network error, the connection could not be closed - // since the network connection is disrupted. We can regard this - // as equivalent to a closed connection. - return done() - } + // In case of a network error, the connection could not be closed + // since the network connection is disrupted. We can regard this + // as equivalent to a closed connection. + return done() } if (action.type === STOP_EVENTS && Boolean(channel)) { // Close the connection if it wasn't closed already. await channel.close() } - done() + return done() }, }), createLogic({ - type: [GET_EVENT_MESSAGE_FAILURE, EVENT_STREAM_CLOSED], + type: EVENT_STREAM_CLOSED, cancelType: [START_EVENTS_SUCCESS, GET_EVENT_MESSAGE_SUCCESS, STOP_EVENTS], warnTimeout: 0, validate: ({ getState, action = {} }, allow, reject) => { if (!action.id) { - reject() - return + return reject() } const id = typeof action.id === 'object' ? getCombinedDeviceId(action.id) : action.id @@ -211,11 +198,11 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const status = selectEntityEventsStatus(getState(), id) const connected = status === CONNECTION_STATUS.CONNECTED const interrupted = selectEntityEventsInterrupted(getState(), id) - if (!connected && interrupted) { - reject() + if (!connected || interrupted) { + return reject() } - allow(action) + return allow(action) }, process: ({ getState, action }, dispatch, done) => { const isOnline = selectIsOnlineStatus(getState()) @@ -234,11 +221,11 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { dispatch(startEvents(action.id)) } else { clearInterval(reconnector) - done() + return done() } }, 5000) } else { - done() + return done() } }, }), @@ -274,27 +261,18 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { // If the app went offline, close the event stream. } - done() + return done() }, }), createLogic({ type: SET_EVENT_FILTER, debounce: 250, process: async ({ action }, dispatch, done) => { - if (channel) { - try { - await channel.close() - } catch (error) { - if (isNetworkError(error) || isTimeoutError(action.payload)) { - dispatch(setStatusChecking()) - } else { - throw error - } - } finally { - dispatch(startEvents(action.id, { silent: true })) - } + if (Boolean(channel)) { + await channel.close() + dispatch(startEvents(action.id, { silent: true })) } - done() + return done() }, }), ] diff --git a/pkg/webui/console/store/reducers/events.js b/pkg/webui/console/store/reducers/events.js index e16d23bfc0..dc0e6b6022 100644 --- a/pkg/webui/console/store/reducers/events.js +++ b/pkg/webui/console/store/reducers/events.js @@ -150,13 +150,12 @@ const createNamedEventReducer = (reducerName = '') => { : state.events), error: action.error, status: CONNECTION_STATUS.DISCONNECTED, + interrupted: true, } case GET_EVENT_FAILURE: return { ...state, ...addEvent(state, createSyntheticEventFromError(action.error)), - status: CONNECTION_STATUS.DISCONNECTED, - interrupted: true, } case PAUSE_EVENTS: return { diff --git a/sdk/js/src/api/stream/shared.js b/sdk/js/src/api/stream/shared.js index 8d1b8b18ff..e5765f20f3 100644 --- a/sdk/js/src/api/stream/shared.js +++ b/sdk/js/src/api/stream/shared.js @@ -19,7 +19,6 @@ export const notify = (listener, ...args) => { } export const EVENTS = Object.freeze({ - OPEN: 'open', MESSAGE: 'message', ERROR: 'error', CLOSE: 'close', @@ -31,3 +30,35 @@ export const MESSAGE_TYPES = Object.freeze({ PUBLISH: 'publish', ERROR: 'error', }) + +export const INITIAL_LISTENERS = Object.freeze( + Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}), +) + +export const newQueuedListeners = listeners => { + const queue = [] + let open = false + const queuedListeners = Object.values(EVENTS).reduce( + (acc, curr) => ({ + ...acc, + [curr]: (...args) => { + if (open) { + notify(listeners[curr], ...args) + } else { + queue.push([curr, args]) + } + }, + }), + {}, + ) + return [ + () => { + open = true + for (const [event, args] of queue) { + notify(listeners[event], ...args) + } + queue.splice(0, queue.length) + }, + queuedListeners, + ] +} diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index ca38abb5a0..bca8a83b18 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -15,91 +15,252 @@ import traverse from 'traverse' import Token from '../../util/token' -import { warn } from '../../../../../pkg/webui/lib/log' -import { notify, EVENTS, MESSAGE_TYPES } from './shared' +import { notify, newQueuedListeners, EVENTS, MESSAGE_TYPES, INITIAL_LISTENERS } from './shared' -const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}) +export class ConnectionError extends Error { + constructor(message) { + super(message) + this.name = 'ConnectionError' + } +} -const store = () => { - const connections = {} +export class ConnectionClosedError extends ConnectionError { + constructor(message, code) { + super(message) + this.name = 'ConnectionClosedError' + this.code = code + } +} - return { - getInstance: url => traverse(connections).get([url, 'instance']), - setInstance: (url, instance) => { - traverse(connections).set([url, 'instance'], instance) - return instance +export class ConnectionTimeoutError extends ConnectionError { + constructor(message) { + super(message) + this.name = 'ConnectionTimeoutError' + } +} + +export class ProtocolError extends Error { + constructor(error) { + super(error.message) + this.name = 'ProtocolError' + this.error = error + } +} + +const newSubscription = ( + unsubscribe, + originalListeners, + resolveSubscribe, + rejectSubscribe, + resolveClose, +) => { + let closeRequested = false + const [open, listeners] = newQueuedListeners(originalListeners) + const externalSubscription = { + open, + close: () => { + closeRequested = true + return unsubscribe() }, - deleteInstance: url => { - if (url in connections) { - delete connections[url] - } + } + return { + onError: err => { + notify(listeners[EVENTS.ERROR], err) + + rejectSubscribe(err) }, - getSubscriptions: url => Object.values(traverse(connections).get([url, 'subscriptions'] || {})), - getSubscription: (url, sid) => traverse(connections).get([url, 'subscriptions', sid]) || null, - setSubscription: (url, sid, subscription) => { - const subs = traverse(connections).get([url, 'subscriptions']) || {} - subs[sid] = subscription - traverse(connections).set([url, 'subscriptions'], subs) - return subs[sid] + onClose: closeEvent => { + notify(listeners[EVENTS.CLOSE], closeRequested) + + rejectSubscribe(new ConnectionClosedError('WebSocket connection closed', closeEvent.code)) + resolveClose() }, - markSubscriptionClosing: (url, sid) => { - if (traverse(connections).has([url, 'subscriptions', sid])) { - traverse(connections).set([url, 'subscriptions', sid, 'closeRequested'], true) + onMessage: dataParsed => { + if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { + resolveSubscribe(externalSubscription) + } + + if (dataParsed.type === MESSAGE_TYPES.ERROR) { + const err = new ProtocolError(dataParsed.error) + notify(listeners[EVENTS.ERROR], err) + + rejectSubscribe(err) + } + + if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { + notify(listeners[EVENTS.MESSAGE], dataParsed.event) + } + + if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { + notify(listeners[EVENTS.CLOSE], closeRequested) + + resolveClose() } }, - getSubscriptionCount: url => { - const subs = traverse(connections).get([url, 'subscriptions']) - return subs ? Object.keys(subs).length : 0 - }, - deleteSubscription: (url, sid) => { - const subscriptions = traverse(connections).get([url, 'subscriptions']) - if (subscriptions && subscriptions[sid]) { + } +} + +const newInstance = (wsInstance, onClose) => { + const subscriptions = {} + + // Broadcast connection errors to all subscriptions. + wsInstance.addEventListener('error', () => { + const err = new ConnectionError('WebSocket connection error') + for (const subscription of Object.values(subscriptions)) { + subscription.onError(err) + } + }) + + // Broadcast connection closure to all subscriptions. + wsInstance.addEventListener('close', closeEvent => { + // TODO: Handle close event codes. + // https://github.com/TheThingsNetwork/lorawan-stack/issues/6752 + for (const subscription of Object.values(subscriptions)) { + subscription.onClose(closeEvent) + } + onClose() + }) + + // Broadcast messages to the correct subscription. + wsInstance.addEventListener('message', ({ data }) => { + const dataParsed = JSON.parse(data) + const sid = dataParsed.id + const subscription = traverse(subscriptions).get([sid]) || null + + if (!subscription) { + return + } + + subscription.onMessage(dataParsed) + + if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { + delete subscriptions[sid] + } + }) + + return { + subscribe: (sid, subscribePayload, unsubscribePayload, listeners, resolve, reject) => { + if (sid in subscriptions) { + throw new Error(`Subscription with ID ${sid} already exists`) + } + + // The `unsubscribed` promise is used in order to guarantee that calls to the `close` method + // of the subscription finish _after_ the closure events have been emitted. Callers can expect + // that after `close` resolves, no further events will be emitted. + let resolveClose = null + const unsubscribed = new Promise(resolve => { + resolveClose = resolve + }) + let unsubscribeCalled = false + const unsubscribe = () => { + if (!unsubscribeCalled) { + unsubscribeCalled = true + + if (wsInstance.state === WebSocket.open) { + wsInstance.send(unsubscribePayload) + } + } + return unsubscribed + } + + const subscription = newSubscription(unsubscribe, listeners, resolve, reject, resolveClose) + subscriptions[sid] = subscription + + if (wsInstance.readyState === WebSocket.OPEN) { + // If the WebSocket connection is already open, only add the subscription. + wsInstance.send(subscribePayload) + } else if (wsInstance.readyState === WebSocket.CONNECTING) { + // Otherwise wait for the connection to open and then add the subscription. + const onOpen = () => { + wsInstance.send(subscribePayload) + wsInstance.removeEventListener('open', onOpen) + } + wsInstance.addEventListener('open', onOpen) + } else { delete subscriptions[sid] + throw new Error('WebSocket connection is closed') } }, } } -const state = store() +const newStore = () => { + const connections = {} + return { + getInstance: url => traverse(connections).get([url]), + setInstance: (url, wsInstance) => + traverse(connections).set( + [url], + newInstance(wsInstance, () => delete connections[url]), + ), + } +} + +const state = newStore() /** * Opens a new stream. * + * Implementation guarantees: + * - No events will be sent to the listeners before the `open` function is called. + * - No events will be sent to the listeners after the `close` function is called. + * - The `close` function will resolve after all events have been sent to the listeners. + * - The `close` function does not throw any errors. + * - The `close` function can be called multiple times. + * - The `open` function can be called multiple times. + * - The `open` function does not throw any errors, as long as the event listeners do not throw. + * - No `message` event will follow an `error` event. + * - No `message` event will follow a `close` event. + * - No `error` event will follow a `close` event. + * - No `error` event will follow another `error` event. + * - No `close` event will follow another `close` event. + * * @async * @param {object} payload - - The body of the initial request. * @param {string} baseUrl - The stream baseUrl. + * @param {object} listeners - The listeners object. * @param {string} endpoint - The stream endpoint. - * @param {number} timeout - The timeout for the stream. + * @param {number} timeout - The connection timeout for the stream. * * @example * (async () => { * const stream = await stream( * { identifiers: [{ application_ids: { application_id: 'my-app' }}]}, * 'http://localhost:8080', - * '/api/v3', + * { + * message: ({ data }) => console.log('received data', JSON.parse(data)), + * error: error => console.log(error), + * close: wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server'), + * }, * ) * - * // Add listeners to the stream. - * stream - * .on('open', () => console.log('conn opened')) - * .on('message', ({ data }) => console.log('received data', JSON.parse(data))) - * .on('error', error => console.log(error)) - * .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server')) + * // Start the stream in order to start dispatching events. + * stream.open() * - * // Close the stream after 20 s. + * // Close the stream after 20 s. * setTimeout(() => stream.close(), 20000) * })() * - * @returns {object} The stream subscription object with the `on` function for - * attaching listeners and the `close` function to close the stream. + * @returns {object} The stream subscription object the `open` function to start sending events to the listeners and + * the `close` function to close the stream. */ export default async ( payload, baseUrl, + listeners, endpoint = '/console/internal/events/', timeout = 10000, ) => { + for (const eventName of Object.keys(listeners)) { + if (!Object.values(EVENTS).includes(eventName)) { + throw new Error( + `${eventName} event is not supported. Should be one of: message, error or close`, + ) + } + } + const filledListeners = { ...INITIAL_LISTENERS, ...listeners } + const subscriptionId = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER) const subscriptionPayload = JSON.stringify({ type: MESSAGE_TYPES.SUBSCRIBE, @@ -111,177 +272,48 @@ export default async ( id: subscriptionId, }) const url = baseUrl + endpoint - let wsInstance = state.getInstance(url) - - await Promise.race([ - new Promise(async (resolve, reject) => { - // Add the new subscription to the subscriptions object. - // Also add the resolver functions to the subscription object to be able - // to resolve the promise after the subscription confirmation message. - if (state.getSubscription(url, subscriptionId) !== null) { - reject(new Error('Subscription with the same ID already exists')) - } - state.setSubscription(url, subscriptionId, { - ...initialListeners, - url, - resolve, - reject, - closeRequested: false, - }) - - try { - const token = new Token().get() - const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token - const baseUrlParsed = baseUrl.replace('http', 'ws') - - // Open up the WebSocket connection if it doesn't exist. - if (!wsInstance) { - wsInstance = state.setInstance( - url, - new WebSocket(`${baseUrlParsed}${endpoint}`, [ - 'ttn.lorawan.v3.console.internal.events.v1', - `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, - ]), - ) - - // Broadcast connection errors to all listeners. - wsInstance.addEventListener('error', () => { - const err = new Error('Error in WebSocket connection') - const subscriptions = state.getSubscriptions(url) - for (const s of subscriptions) { - notify(s[EVENTS.ERROR], err) - // The error is an error event, but we should only throw proper errors. - // It has an optional error code that we could use to map to a proper error. - // However, the error codes are optional and not always used. - s.reject(err) - } - }) - - // Event listener for 'close' - wsInstance.addEventListener('close', closeEvent => { - // TODO: Handle close event codes. - // https://github.com/TheThingsNetwork/lorawan-stack/issues/6752 - const subscriptions = state.getSubscriptions(url) - const wasClean = closeEvent?.wasClean ?? false - - for (const s of subscriptions) { - notify(s[EVENTS.CLOSE], wasClean) - if (wasClean) { - s.resolve() - } else { - s.reject( - new Error( - `WebSocket connection closed unexpectedly with code ${closeEvent.code}`, - ), - ) - } - } - - state.deleteInstance(url) - }) - - // After the WebSocket connection is open, add the event listeners. - // Wait for the subscription confirmation message before resolving. - wsInstance.addEventListener('message', ({ data }) => { - const dataParsed = JSON.parse(data) - const sid = dataParsed.id - const subscription = state.getSubscription(url, sid) - - if (!subscription) { - warn('Message received for closed or unknown subscription with ID', sid) - - return - } - - if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { - notify(subscription[EVENTS.OPEN]) - // Resolve the promise after the subscription confirmation message. - subscription.resolve() - } - - if (dataParsed.type === MESSAGE_TYPES.ERROR) { - notify(subscription[EVENTS.ERROR], dataParsed) - } - - if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { - notify(subscription[EVENTS.MESSAGE], dataParsed.event) - } - - if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { - notify(subscription[EVENTS.CLOSE], subscription.closeRequested) - // Remove the subscription - state.deleteSubscription(url, sid) - if (state.getSubscriptionCount(url) === 0) { - wsInstance.close() - state.deleteInstance(url) - } - } - }) - } - - if (wsInstance.readyState === WebSocket.OPEN) { - // If the WebSocket connection is already open, only add the subscription. - wsInstance.send(subscriptionPayload) - } else if (wsInstance.readyState === WebSocket.CONNECTING) { - // Otherwise wait for the connection to open and then add the subscription. - const onOpen = () => { - wsInstance.send(subscriptionPayload) - wsInstance.removeEventListener('open', onOpen) - } - wsInstance.addEventListener('open', onOpen) - } else { - reject(new Error('WebSocket connection is closed')) - } - } catch (error) { - const err = error instanceof Error ? error : new Error(error) - const subscriptions = state.getSubscriptions(url) - for (const s of subscriptions) { - notify(s[EVENTS.ERROR], err) - s.reject(err) - } - } - }), - new Promise((resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)), - ]) - - // Return an observer object with the `on` and `close` functions for - // the current subscription. - return { - on(eventName, callback) { - if (!Object.values(EVENTS).includes(eventName)) { - throw new Error( - `${eventName} event is not supported. Should be one of: open, message, error or close`, - ) - } - const subscription = state.getSubscription(url, subscriptionId) - subscription[eventName] = callback + const token = new Token().get() + const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token + const baseUrlParsed = baseUrl.replace('http', 'ws') - return this - }, - close: () => { - if ( - !wsInstance || - wsInstance.readyState === WebSocket.CLOSED || - wsInstance.readyState === WebSocket.CLOSING - ) { - warn('WebSocket was already closed') - return Promise.resolve() - } + const subscribe = new Promise((resolve, reject) => { + let instance = state.getInstance(url) + // Open up the WebSocket connection if it doesn't exist. + if (!instance) { + instance = state.setInstance( + url, + new WebSocket(`${baseUrlParsed}${endpoint}`, [ + 'ttn.lorawan.v3.console.internal.events.v1', + `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, + ]), + ) + } - state.markSubscriptionClosing(url, subscriptionId) - wsInstance.send(unsubscribePayload) + // Add the new subscription to the subscriptions object. + // Also add the resolver functions to the subscription object to be able + // to resolve the promise after the subscription confirmation message. + instance.subscribe( + subscriptionId, + subscriptionPayload, + unsubscribePayload, + filledListeners, + resolve, + reject, + ) + }) - // Wait for the server to confirm the unsubscribe. - return new Promise(resolve => { - const onMessage = ({ data }) => { - const { type, id } = JSON.parse(data) - if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) { - resolve() - } - wsInstance.removeEventListener('message', onMessage) - } - wsInstance.addEventListener('message', onMessage) - }) - }, + try { + return await Promise.race([ + subscribe, + new Promise((_resolve, reject) => + setTimeout(() => reject(new ConnectionTimeoutError('Timed out')), timeout), + ), + ]) + } catch (err) { + subscribe.then( + subscription => subscription.close(), + () => {}, + ) + throw err } } diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStreams.js b/sdk/js/src/api/stream/subscribeToWebSocketStreams.js new file mode 100644 index 0000000000..87a73dd53f --- /dev/null +++ b/sdk/js/src/api/stream/subscribeToWebSocketStreams.js @@ -0,0 +1,105 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { EVENTS, INITIAL_LISTENERS, notify } from './shared' +import subscribeToWebSocketStream from './subscribeToWebSocketStream' + +/* + * Subscribe to an event stream with multiple base URLs. + * Semantically equivalent to subscribeToWebSocketStream, but guarantees uniform stream closure and unaffected + * event emission in case of failure of one or more streams. + */ +export default async ( + payload, + baseUrls, + listeners, + endpoint = '/console/internal/events/', + timeout = 10000, +) => { + if (!(baseUrls instanceof Array) || baseUrls.length === 0) { + throw new Error('Cannot subscribe to events without base URLs') + } + if (baseUrls.length === 1) { + return subscribeToWebSocketStream(payload, baseUrls[0], listeners, endpoint, timeout) + } + + for (const eventName of Object.keys(listeners)) { + if (!Object.values(EVENTS).includes(eventName)) { + throw new Error( + `${eventName} event is not supported. Should be one of: message, error or close`, + ) + } + } + const filledListeners = { ...INITIAL_LISTENERS, ...listeners } + + // Interweaving multiple streams has the side effect of making the standard event listener + // state machine look erratic externally - only certain streams may fail, while others may continue + // to work. Upper layers which use the stream must not be exposed to this detail - once one + // stream fails, we need to ensure that all streams are closed and no stray events are emitted. + // + // The standard state machine guarantees that once an error or close event is emitted, no further + // message events will be emitted. It is also guaranteed that once an error event is emitted, a close + // event will follow. + let [closeAll, hadError, hadClose] = [() => {}, false, false] + const uniformListeners = { + [EVENTS.MESSAGE]: (...params) => { + if (hadClose || hadError) { + return + } + notify(filledListeners[EVENTS.MESSAGE], ...params) + }, + [EVENTS.ERROR]: (...params) => { + if (hadClose || hadError) { + return + } + hadError = true + notify(filledListeners[EVENTS.ERROR], ...params) + }, + [EVENTS.CLOSE]: (...params) => { + if (hadClose) { + return + } + hadClose = true + notify(filledListeners[EVENTS.CLOSE], ...params) + closeAll() + }, + } + + const pendingStreams = baseUrls.map(baseUrl => + subscribeToWebSocketStream(payload, baseUrl, uniformListeners, endpoint, timeout), + ) + try { + const streams = await Promise.all(pendingStreams) + closeAll = () => Promise.all(streams.map(stream => stream.close())) + return { + open: () => streams.forEach(stream => stream.open()), + close: closeAll, + } + } catch (error) { + // Ensure that if only some streams fail, the successful ones are closed. + await Promise.all( + pendingStreams.map(async pendingStream => { + try { + const stream = await pendingStream + await stream.close() + } catch { + // Only the pending stream promise may throw, as `close` does not throw. + // Although multiple streams may fail, we will rethrow only the first error + // and ignore the rest, as they are not really actionable. + } + }), + ) + throw error + } +} diff --git a/sdk/js/src/service/applications.js b/sdk/js/src/service/applications.js index 42cde4598e..7ad00cfc22 100644 --- a/sdk/js/src/service/applications.js +++ b/sdk/js/src/service/applications.js @@ -15,8 +15,7 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream' -import combineStreams from '../util/combine-streams' +import subscribeToWebSocketStreams from '../api/stream/subscribeToWebSocketStreams' import { STACK_COMPONENTS_MAP } from '../util/constants' import Devices from './devices' @@ -217,7 +216,7 @@ class Applications { // Events Stream - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(id => ({ application_ids: { application_id: id }, @@ -235,11 +234,8 @@ class Applications { const baseUrls = new Set( distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - - const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) - // Combine all stream sources to one subscription generator. - return combineStreams(streams) + return subscribeToWebSocketStreams(payload, [...baseUrls], listeners) } } diff --git a/sdk/js/src/service/devices/index.js b/sdk/js/src/service/devices/index.js index 5b8e0021d4..a3a975a1d2 100644 --- a/sdk/js/src/service/devices/index.js +++ b/sdk/js/src/service/devices/index.js @@ -19,12 +19,11 @@ import traverse from 'traverse' import { notify, EVENTS } from '../../api/stream/shared' import Marshaler from '../../util/marshaler' -import subscribeToWebSocketStream from '../../api/stream/subscribeToWebSocketStream' +import subscribeToWebSocketStreams from '../../api/stream/subscribeToWebSocketStreams' import deviceEntityMap from '../../../generated/device-entity-map.json' import DownlinkQueue from '../downlink-queue' import { STACK_COMPONENTS_MAP } from '../../util/constants' import DeviceClaim from '../claim' -import combineStreams from '../../util/combine-streams' import Repository from './repository' import { splitSetPaths, splitGetPaths, makeRequests } from './split' @@ -680,7 +679,7 @@ class Devices { // Events Stream - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(ids => ({ device_ids: ids, @@ -698,11 +697,8 @@ class Devices { const baseUrls = new Set( distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - - const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) - // Combine all stream sources to one subscription generator. - return combineStreams(streams) + return subscribeToWebSocketStreams(payload, [...baseUrls], listeners) } async simulateUplink(applicationId, deviceId, uplink) { diff --git a/sdk/js/src/service/gateways.js b/sdk/js/src/service/gateways.js index 41f5d820a7..2f16e0fd03 100644 --- a/sdk/js/src/service/gateways.js +++ b/sdk/js/src/service/gateways.js @@ -15,9 +15,8 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream' +import subscribeToWebSocketStreams from '../api/stream/subscribeToWebSocketStreams' import { STACK_COMPONENTS_MAP } from '../util/constants' -import combineStreams from '../util/combine-streams' import ApiKeys from './api-keys' import Collaborators from './collaborators' @@ -240,7 +239,7 @@ class Gateways { // Events Stream - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(id => ({ gateway_ids: { gateway_id: id }, @@ -261,11 +260,8 @@ class Gateways { const baseUrls = new Set( distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - - const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) - // Combine all stream sources to one subscription generator. - return combineStreams(streams) + return subscribeToWebSocketStreams(payload, [...baseUrls], listeners) } // Gateway Configuration Server. diff --git a/sdk/js/src/service/organizations.js b/sdk/js/src/service/organizations.js index 5469acd031..e63022185e 100644 --- a/sdk/js/src/service/organizations.js +++ b/sdk/js/src/service/organizations.js @@ -156,7 +156,7 @@ class Organizations { // Events stream. - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(id => ({ organization_ids: { organization_id: id }, @@ -168,7 +168,7 @@ class Organizations { const baseUrl = this._stackConfig.getComponentUrlByName(STACK_COMPONENTS_MAP.is) - return subscribeToWebSocketStream(payload, baseUrl) + return subscribeToWebSocketStream(payload, baseUrl, listeners) } } diff --git a/sdk/js/src/util/combine-streams.js b/sdk/js/src/util/combine-streams.js deleted file mode 100644 index 75e20855b5..0000000000 --- a/sdk/js/src/util/combine-streams.js +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * Combines multiple streams into a single subscription provider. - * - * @param {Array} streams - An array of (async) stream functions. - * @returns {object} The stream subscription object with the `on` function for - * attaching listeners and the `close` function to close the stream. - */ -const combinedStream = async streams => { - if (!(streams instanceof Array) || streams.length === 0) { - throw new Error('Cannot combine streams with invalid stream array.') - } else if (streams.length === 1) { - return streams[0] - } - - const subscribers = await Promise.all(streams) - - return { - on: (eventName, callback) => { - for (const subscriber of subscribers) { - subscriber.on(eventName, callback) - } - }, - close: () => { - for (const subscriber of subscribers) { - subscriber.close() - } - }, - } -} - -export default combinedStream