Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event stream reconnection fixes #6777

Merged
9 changes: 8 additions & 1 deletion pkg/console/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import (
"context"
"net/http"
"sync"
"time"

"github.com/gorilla/mux"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/middleware"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/random"
"go.thethings.network/lorawan-stack/v3/pkg/ratelimit"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand All @@ -40,6 +43,9 @@ import (
const (
authorizationProtocolPrefix = "ttn.lorawan.v3.header.authorization.bearer."
protocolV1 = "ttn.lorawan.v3.console.internal.events.v1"

pingPeriod = time.Minute
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
pingJitter = 0.1
)

// Component is the interface of the component to the events API handler.
Expand Down Expand Up @@ -95,7 +101,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
}
defer conn.Close(websocket.StatusNormalClosure, "main task closed")

ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

var wg sync.WaitGroup
Expand All @@ -108,6 +114,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
"console_events_mux": makeMuxTask(m, cancel),
"console_events_read": makeReadTask(conn, m, rateLimit, cancel),
"console_events_write": makeWriteTask(conn, m, cancel),
"console_events_ping": makePingTask(conn, cancel, random.Jitter(pingPeriod, pingJitter)),
} {
wg.Add(1)
h.component.StartTask(&task.Config{
Expand Down
7 changes: 6 additions & 1 deletion pkg/console/internal/events/eventsmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package eventsmux
import (
"context"

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
)
Expand Down Expand Up @@ -54,7 +56,7 @@ func (m *mux) Responses() <-chan protocol.Response {

// Run implements Interface.
func (m *mux) Run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer func() { cancel(err) }()
subs := m.createSubs(ctx, cancel)
defer subs.Close()
Expand All @@ -63,6 +65,9 @@ func (m *mux) Run(ctx context.Context) (err error) {
case <-ctx.Done():
return ctx.Err()
case req := <-m.requestCh:
if err := rights.RequireAuthenticated(ctx); err != nil {
return err
}
var resp protocol.Response
switch req := req.(type) {
case *protocol.SubscribeRequest:
Expand Down
4 changes: 4 additions & 0 deletions pkg/console/internal/events/eventsmux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func TestMux(t *testing.T) { // nolint:gocyclo
unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
}),
})
ctx = rights.NewContextWithAuthInfo(ctx, &ttnpb.AuthInfoResponse{
UniversalRights: ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
IsAdmin: true,
})

subs := &mockSubscriptions{
ctx: ctx,
Expand Down
3 changes: 2 additions & 1 deletion pkg/console/internal/events/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
Expand Down Expand Up @@ -141,7 +142,7 @@ func (s *subscriptions) Subscribe(
return err
}
ch := make(chan events.Event, channelSize(tail))
ctx, cancel := context.WithCancelCause(s.ctx)
ctx, cancel := errorcontext.New(s.ctx)
defer func() {
if err != nil {
cancel(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand Down Expand Up @@ -143,7 +144,7 @@ func runTestSubscriptions(
_, historical := subscriber.(interface{ historical() })

a, ctx := test.New(t)
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

timeout := test.Delay << 3
Expand Down
19 changes: 19 additions & 0 deletions pkg/console/internal/events/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"io"
"time"

"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
Expand Down Expand Up @@ -79,3 +80,21 @@ func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(erro
}
}
}

func makePingTask(conn *websocket.Conn, cancel func(error), period time.Duration) func(context.Context) error {
return func(ctx context.Context) (err error) {
ticker := time.NewTicker(period)
defer ticker.Stop()
defer func() { cancel(err) }()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := conn.Ping(ctx); err != nil {
return err
}
}
}
}
}
26 changes: 21 additions & 5 deletions pkg/webui/console/lib/events/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,29 @@ export const defineSyntheticEvent = name => data => ({
data,
})

export const createSyntheticEventFromError = error => {
const convertError = error => {
if (error instanceof Error) {
const errorString = error.toString()
if (error.message === 'network error' || error.message === 'Error in body stream') {
return createNetworkErrorEvent({ error: errorString })
return {
...error,
message: error.message,
name: error.name,
// The stack is omitted intentionally, as it is not relevant for a user.
}
}
return error
}

return createUnknownErrorEvent({ error: errorString })
export const createSyntheticEventFromError = error => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not find a good way to import the actual types in order to do instanceof from the SDK, so I used the name instead. I am open to suggestions if we can do that.

Copy link
Member

@kschiffer kschiffer Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could create all relevant error classes in JS and then create a mapping to do sth like:

class ConnectionError extends Error {
  constructor(err) {
    super(err)
    this.name = err.name
    this.code = err.code
    this.message = err.message
    // Add any other property specific to the error type
    
  }
}

const classMap = {
  "ConnectionError": ConnectionError,}
// In the error ingestion the error would then be instantiated like this
catch (err) {
  if (isContractualError(err)) { // some form of check that the error is a well-known error
    throw new classMap[err.name]
  }
  throw new Error("An unknown error occurred")
}
// Then in the err util it could be checked like this
if (error instanceof classMap["ConnectionError"]) {
  
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was that if we do that, we don't guarantee that the type caught is really the intended type - if two files in the JS SDK (accidentally) decide to define a ConnectionError, and we use name to identify name, we may mix them up, while the type would correctly be identified via instanceof.

I think this is more goldplating at the moment, and if we end up with such a situation we should just create an explicit errors package in the SDK in order to avoid duplication or conflicts.

if (error instanceof Error) {
if (
error.name === 'ConnectionError' ||
error.name === 'ConnectionClosedError' ||
error.name === 'ConnectionTimeoutError'
) {
return createNetworkErrorEvent({ error: convertError(error) })
} else if (error.name === 'ProtocolError') {
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
return createUnknownErrorEvent({ error: convertError(error) })
}
return createUnknownErrorEvent({ error: convertError(error) })
}
}
61 changes: 22 additions & 39 deletions pkg/webui/console/store/middleware/logics/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import CONNECTION_STATUS from '@console/constants/connection-status'
import EVENT_TAIL from '@console/constants/event-tail'

import { getCombinedDeviceId } from '@ttn-lw/lib/selectors/id'
import { isUnauthenticatedError, isNetworkError, isTimeoutError } from '@ttn-lw/lib/errors/utils'
import { TokenError } from '@ttn-lw/lib/errors/custom-errors'
import { SET_CONNECTION_STATUS, setStatusChecking } from '@ttn-lw/lib/store/actions/status'
import { selectIsOnlineStatus } from '@ttn-lw/lib/store/selectors/status'

Expand All @@ -29,7 +29,6 @@ import {
createStartEventsStreamFailureActionType,
createStartEventsStreamSuccessActionType,
createEventStreamClosedActionType,
createGetEventMessageFailureActionType,
createGetEventMessageSuccessActionType,
createSetEventsFilterActionType,
getEventMessageSuccess,
Expand Down Expand Up @@ -65,7 +64,6 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
const START_EVENTS_FAILURE = createStartEventsStreamFailureActionType(reducerName)
const STOP_EVENTS = createStopEventsStreamActionType(reducerName)
const EVENT_STREAM_CLOSED = createEventStreamClosedActionType(reducerName)
const GET_EVENT_MESSAGE_FAILURE = createGetEventMessageFailureActionType(reducerName)
const GET_EVENT_MESSAGE_SUCCESS = createGetEventMessageSuccessActionType(reducerName)
const SET_EVENT_FILTER = createSetEventsFilterActionType(reducerName)
const startEventsSuccess = startEventsStreamSuccess(reducerName)
Expand Down Expand Up @@ -131,22 +129,18 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
const filterRegExp = Boolean(filter) ? filter.filterRegExp : undefined

try {
channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after)
const listeners = {
message: message => dispatch(getEventSuccess(id, message)),
error: error => dispatch(getEventFailure(id, error)),
close: wasClientRequest => dispatch(closeEvents(id, { silent: wasClientRequest })),
}
channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after, listeners)
dispatch(startEventsSuccess(id, { silent }))

channel.on('message', message => dispatch(getEventSuccess(id, message)))
channel.on('error', error => dispatch(getEventFailure(id, error)))
channel.on('close', wasClientRequest => {
dispatch(closeEvents(id, { silent: wasClientRequest }))
channel = null
})
channel.open()
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
} catch (error) {
if (isUnauthenticatedError(error)) {
if (error instanceof TokenError) {
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
// The user is no longer authenticated; reinitiate the auth flow
// by refreshing the page.
// NOTE: As a result of the WebSocket refactor, the error shape is
// now very unspecific and authentication errors like before are
// not thrown anymore. This should be addressed eventually.
window.location.reload()
} else {
dispatch(startEventsFailure(id, error))
Expand Down Expand Up @@ -177,16 +171,14 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
},
process: async ({ action }, dispatch, done) => {
if (action.type === START_EVENTS_FAILURE) {
if (action?.error?.message === 'timeout') {
// Set the connection status to `checking` to trigger connection checks
// and detect possible offline state.
dispatch(setStatusChecking())
// Set the connection status to `checking` to trigger connection checks
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
// and detect possible offline state.
dispatch(setStatusChecking())

// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
if (action.type === STOP_EVENTS && Boolean(channel)) {
// Close the connection if it wasn't closed already.
Expand All @@ -196,7 +188,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
},
}),
createLogic({
type: [GET_EVENT_MESSAGE_FAILURE, EVENT_STREAM_CLOSED],
type: EVENT_STREAM_CLOSED,
cancelType: [START_EVENTS_SUCCESS, GET_EVENT_MESSAGE_SUCCESS, STOP_EVENTS],
warnTimeout: 0,
validate: ({ getState, action = {} }, allow, reject) => {
Expand All @@ -211,8 +203,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
const status = selectEntityEventsStatus(getState(), id)
const connected = status === CONNECTION_STATUS.CONNECTED
const interrupted = selectEntityEventsInterrupted(getState(), id)
if (!connected && interrupted) {
if (!connected || interrupted) {
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
reject()
return
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
}

allow(action)
Expand Down Expand Up @@ -279,20 +272,10 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
}),
createLogic({
type: SET_EVENT_FILTER,
debounce: 250,
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
process: async ({ action }, dispatch, done) => {
if (channel) {
try {
await channel.close()
} catch (error) {
if (isNetworkError(error) || isTimeoutError(action.payload)) {
dispatch(setStatusChecking())
} else {
throw error
}
} finally {
dispatch(startEvents(action.id, { silent: true }))
}
if (Boolean(channel)) {
await channel.close()
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
dispatch(startEvents(action.id, { silent: true }))
}
done()
},
Expand Down
3 changes: 1 addition & 2 deletions pkg/webui/console/store/reducers/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,12 @@ const createNamedEventReducer = (reducerName = '') => {
: state.events),
error: action.error,
status: CONNECTION_STATUS.DISCONNECTED,
interrupted: true,
}
case GET_EVENT_FAILURE:
return {
...state,
...addEvent(state, createSyntheticEventFromError(action.error)),
status: CONNECTION_STATUS.DISCONNECTED,
interrupted: true,
}
case PAUSE_EVENTS:
return {
Expand Down
33 changes: 32 additions & 1 deletion sdk/js/src/api/stream/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ export const notify = (listener, ...args) => {
}

export const EVENTS = Object.freeze({
OPEN: 'open',
MESSAGE: 'message',
ERROR: 'error',
CLOSE: 'close',
Expand All @@ -31,3 +30,35 @@ export const MESSAGE_TYPES = Object.freeze({
PUBLISH: 'publish',
ERROR: 'error',
})

export const INITIAL_LISTENERS = Object.freeze(
Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}),
)

export const newQueuedListeners = listeners => {
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
const queue = []
let open = false
const queuedListeners = Object.values(EVENTS).reduce(
(acc, curr) => ({
...acc,
[curr]: (...args) => {
if (open) {
notify(listeners[curr], ...args)
} else {
queue.push([curr, args])
}
},
}),
{},
)
return [
() => {
open = true
for (const [event, args] of queue) {
notify(listeners[event], ...args)
}
queue.splice(0, queue.length)
},
queuedListeners,
]
}