Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix event stream concurrency and state machine #6776

Merged
merged 12 commits into from
Dec 18, 2023
Merged
9 changes: 8 additions & 1 deletion pkg/console/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import (
"context"
"net/http"
"sync"
"time"

"github.com/gorilla/mux"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/middleware"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/random"
"go.thethings.network/lorawan-stack/v3/pkg/ratelimit"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand All @@ -40,6 +43,9 @@ import (
const (
authorizationProtocolPrefix = "ttn.lorawan.v3.header.authorization.bearer."
protocolV1 = "ttn.lorawan.v3.console.internal.events.v1"

pingPeriod = time.Minute
pingJitter = 0.1
)

// Component is the interface of the component to the events API handler.
Expand Down Expand Up @@ -95,7 +101,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
}
defer conn.Close(websocket.StatusNormalClosure, "main task closed")

ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

var wg sync.WaitGroup
Expand All @@ -108,6 +114,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
"console_events_mux": makeMuxTask(m, cancel),
"console_events_read": makeReadTask(conn, m, rateLimit, cancel),
"console_events_write": makeWriteTask(conn, m, cancel),
"console_events_ping": makePingTask(conn, cancel, random.Jitter(pingPeriod, pingJitter)),
} {
wg.Add(1)
h.component.StartTask(&task.Config{
Expand Down
7 changes: 6 additions & 1 deletion pkg/console/internal/events/eventsmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package eventsmux
import (
"context"

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
)
Expand Down Expand Up @@ -54,7 +56,7 @@ func (m *mux) Responses() <-chan protocol.Response {

// Run implements Interface.
func (m *mux) Run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer func() { cancel(err) }()
subs := m.createSubs(ctx, cancel)
defer subs.Close()
Expand All @@ -63,6 +65,9 @@ func (m *mux) Run(ctx context.Context) (err error) {
case <-ctx.Done():
return ctx.Err()
case req := <-m.requestCh:
if err := rights.RequireAuthenticated(ctx); err != nil {
return err
}
var resp protocol.Response
switch req := req.(type) {
case *protocol.SubscribeRequest:
Expand Down
4 changes: 4 additions & 0 deletions pkg/console/internal/events/eventsmux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func TestMux(t *testing.T) { // nolint:gocyclo
unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
}),
})
ctx = rights.NewContextWithAuthInfo(ctx, &ttnpb.AuthInfoResponse{
UniversalRights: ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
IsAdmin: true,
})

subs := &mockSubscriptions{
ctx: ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
Expand Down Expand Up @@ -141,7 +142,7 @@ func (s *subscriptions) Subscribe(
return err
}
ch := make(chan events.Event, channelSize(tail))
ctx, cancel := context.WithCancelCause(s.ctx)
ctx, cancel := errorcontext.New(s.ctx)
defer func() {
if err != nil {
cancel(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand Down Expand Up @@ -143,7 +144,7 @@ func runTestSubscriptions(
_, historical := subscriber.(interface{ historical() })

a, ctx := test.New(t)
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

timeout := test.Delay << 3
Expand Down
19 changes: 19 additions & 0 deletions pkg/console/internal/events/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"io"
"time"

"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
Expand Down Expand Up @@ -79,3 +80,21 @@ func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(erro
}
}
}

func makePingTask(conn *websocket.Conn, cancel func(error), period time.Duration) func(context.Context) error {
return func(ctx context.Context) (err error) {
ticker := time.NewTicker(period)
defer ticker.Stop()
defer func() { cancel(err) }()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := conn.Ping(ctx); err != nil {
return err
}
}
}
}
}
29 changes: 24 additions & 5 deletions pkg/webui/console/lib/events/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { ingestError } from '@ttn-lw/lib/errors/utils'

import { createNetworkErrorEvent, createUnknownErrorEvent } from './definitions'

export const defineSyntheticEvent = name => data => ({
Expand All @@ -23,13 +25,30 @@ export const defineSyntheticEvent = name => data => ({
data,
})

export const createSyntheticEventFromError = error => {
const convertError = error => {
if (error instanceof Error) {
const errorString = error.toString()
if (error.message === 'network error' || error.message === 'Error in body stream') {
return createNetworkErrorEvent({ error: errorString })
return {
...error,
message: error.message,
name: error.name,
// The stack is omitted intentionally, as it is not relevant for a user.
}
}
return error
}

return createUnknownErrorEvent({ error: errorString })
export const createSyntheticEventFromError = error => {
if (error instanceof Error) {
if (
error.name === 'ConnectionError' ||
error.name === 'ConnectionClosedError' ||
error.name === 'ConnectionTimeoutError'
) {
return createNetworkErrorEvent({ error: convertError(error) })
} else if (error.name === 'ProtocolError') {
ingestError(error.error)
return createUnknownErrorEvent({ error: convertError(error) })
}
return createUnknownErrorEvent({ error: convertError(error) })
}
}
Loading
Loading