Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event stream reconnection fixes #6777

Merged
9 changes: 8 additions & 1 deletion pkg/console/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import (
"context"
"net/http"
"sync"
"time"

"github.com/gorilla/mux"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/middleware"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/random"
"go.thethings.network/lorawan-stack/v3/pkg/ratelimit"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand All @@ -40,6 +43,9 @@ import (
const (
authorizationProtocolPrefix = "ttn.lorawan.v3.header.authorization.bearer."
protocolV1 = "ttn.lorawan.v3.console.internal.events.v1"

pingPeriod = time.Minute
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
pingJitter = 0.1
)

// Component is the interface of the component to the events API handler.
Expand Down Expand Up @@ -95,7 +101,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
}
defer conn.Close(websocket.StatusNormalClosure, "main task closed")

ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

var wg sync.WaitGroup
Expand All @@ -108,6 +114,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
"console_events_mux": makeMuxTask(m, cancel),
"console_events_read": makeReadTask(conn, m, rateLimit, cancel),
"console_events_write": makeWriteTask(conn, m, cancel),
"console_events_ping": makePingTask(conn, cancel, random.Jitter(pingPeriod, pingJitter)),
} {
wg.Add(1)
h.component.StartTask(&task.Config{
Expand Down
7 changes: 6 additions & 1 deletion pkg/console/internal/events/eventsmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package eventsmux
import (
"context"

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
)
Expand Down Expand Up @@ -54,7 +56,7 @@ func (m *mux) Responses() <-chan protocol.Response {

// Run implements Interface.
func (m *mux) Run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer func() { cancel(err) }()
subs := m.createSubs(ctx, cancel)
defer subs.Close()
Expand All @@ -63,6 +65,9 @@ func (m *mux) Run(ctx context.Context) (err error) {
case <-ctx.Done():
return ctx.Err()
case req := <-m.requestCh:
if err := rights.RequireAuthenticated(ctx); err != nil {
return err
}
var resp protocol.Response
switch req := req.(type) {
case *protocol.SubscribeRequest:
Expand Down
4 changes: 4 additions & 0 deletions pkg/console/internal/events/eventsmux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func TestMux(t *testing.T) { // nolint:gocyclo
unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
}),
})
ctx = rights.NewContextWithAuthInfo(ctx, &ttnpb.AuthInfoResponse{
UniversalRights: ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
IsAdmin: true,
})

subs := &mockSubscriptions{
ctx: ctx,
Expand Down
3 changes: 2 additions & 1 deletion pkg/console/internal/events/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
Expand Down Expand Up @@ -141,7 +142,7 @@ func (s *subscriptions) Subscribe(
return err
}
ch := make(chan events.Event, channelSize(tail))
ctx, cancel := context.WithCancelCause(s.ctx)
ctx, cancel := errorcontext.New(s.ctx)
defer func() {
if err != nil {
cancel(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand Down Expand Up @@ -143,7 +144,7 @@ func runTestSubscriptions(
_, historical := subscriber.(interface{ historical() })

a, ctx := test.New(t)
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

timeout := test.Delay << 3
Expand Down
19 changes: 19 additions & 0 deletions pkg/console/internal/events/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"io"
"time"

"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
Expand Down Expand Up @@ -79,3 +80,21 @@ func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(erro
}
}
}

func makePingTask(conn *websocket.Conn, cancel func(error), period time.Duration) func(context.Context) error {
return func(ctx context.Context) (err error) {
ticker := time.NewTicker(period)
defer ticker.Stop()
defer func() { cancel(err) }()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := conn.Ping(ctx); err != nil {
return err
}
}
}
}
}
29 changes: 24 additions & 5 deletions pkg/webui/console/lib/events/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { ingestError } from '@ttn-lw/lib/errors/utils'

import { createNetworkErrorEvent, createUnknownErrorEvent } from './definitions'

export const defineSyntheticEvent = name => data => ({
Expand All @@ -23,13 +25,30 @@ export const defineSyntheticEvent = name => data => ({
data,
})

export const createSyntheticEventFromError = error => {
const convertError = error => {
if (error instanceof Error) {
const errorString = error.toString()
if (error.message === 'network error' || error.message === 'Error in body stream') {
return createNetworkErrorEvent({ error: errorString })
return {
...error,
message: error.message,
name: error.name,
// The stack is omitted intentionally, as it is not relevant for a user.
}
}
return error
}

return createUnknownErrorEvent({ error: errorString })
export const createSyntheticEventFromError = error => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not find a good way to import the actual types in order to do instanceof from the SDK, so I used the name instead. I am open to suggestions if we can do that.

Copy link
Member

@kschiffer kschiffer Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could create all relevant error classes in JS and then create a mapping to do sth like:

class ConnectionError extends Error {
  constructor(err) {
    super(err)
    this.name = err.name
    this.code = err.code
    this.message = err.message
    // Add any other property specific to the error type
    
  }
}

const classMap = {
  "ConnectionError": ConnectionError,}
// In the error ingestion the error would then be instantiated like this
catch (err) {
  if (isContractualError(err)) { // some form of check that the error is a well-known error
    throw new classMap[err.name]
  }
  throw new Error("An unknown error occurred")
}
// Then in the err util it could be checked like this
if (error instanceof classMap["ConnectionError"]) {
  
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was that if we do that, we don't guarantee that the type caught is really the intended type - if two files in the JS SDK (accidentally) decide to define a ConnectionError, and we use name to identify name, we may mix them up, while the type would correctly be identified via instanceof.

I think this is more goldplating at the moment, and if we end up with such a situation we should just create an explicit errors package in the SDK in order to avoid duplication or conflicts.

if (error instanceof Error) {
if (
error.name === 'ConnectionError' ||
error.name === 'ConnectionClosedError' ||
error.name === 'ConnectionTimeoutError'
) {
return createNetworkErrorEvent({ error: convertError(error) })
} else if (error.name === 'ProtocolError') {
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
ingestError(error.error)
return createUnknownErrorEvent({ error: convertError(error) })
}
return createUnknownErrorEvent({ error: convertError(error) })
}
}
Loading
Loading