From ef611086a4ccadaa28ab8a3df17d3cbac65e04f8 Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Sat, 29 Mar 2025 17:38:06 -0500 Subject: [PATCH] refactor(event): parse events after querying block results fixes issue with server side of websocket closing subscription due to client not reading events fast. this happens when block has substantial amount of txs thus events count can be > 1000. this implementation now just waits for event with block header then queries block info and parses events. Signed-off-by: Artur Troian --- events/publish.go | 61 ++++++++++++++++++++++------------------------- events/query.go | 15 ++++++++---- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/events/publish.go b/events/publish.go index a39f7b3e6c..ab459e79d6 100644 --- a/events/publish.go +++ b/events/publish.go @@ -22,46 +22,34 @@ import ( // Publish events using tm buses to clients. Waits on context // shutdown signals to exit. -func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus pubsub.Bus) (err error) { - +func Publish(ctx context.Context, client tmclient.Client, name string, bus pubsub.Bus) (err error) { const ( - queuesz = 100 + queuesz = 1000 ) var ( - txname = name + "-tx" - blkname = name + "-blk" + blkHeaderName = name + "-blk-hdr" ) - txch, err := tmbus.Subscribe(ctx, txname, txQuery().String(), queuesz) - if err != nil { - return err - } - defer func() { - err = tmbus.UnsubscribeAll(ctx, txname) - }() + tmbus := client.(tmclient.EventsClient) - blkch, err := tmbus.Subscribe(ctx, blkname, blkQuery().String(), queuesz) + blkch, err := tmbus.Subscribe(ctx, blkHeaderName, blkHeaderQuery().String(), queuesz) if err != nil { return err } defer func() { - err = tmbus.UnsubscribeAll(ctx, blkname) + err = tmbus.UnsubscribeAll(ctx, blkHeaderName) }() g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return publishEvents(ctx, txch, bus) - }) - - g.Go(func() error { - return publishEvents(ctx, blkch, bus) + return publishEvents(ctx, client, blkch, bus) }) return g.Wait() } -func publishEvents(ctx context.Context, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error { +func publishEvents(ctx context.Context, client tmclient.Client, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error { var err error loop: @@ -70,14 +58,10 @@ loop: case <-ctx.Done(): break loop case ed := <-ch: + // nolint: gocritic switch evt := ed.Data.(type) { - case tmtmtypes.EventDataTx: - if !evt.Result.IsOK() { - continue - } - processEvents(bus, evt.Result.GetEvents()) case tmtmtypes.EventDataNewBlockHeader: - processEvents(bus, evt.ResultEndBlock.GetEvents()) + processBlock(ctx, bus, client, evt.Header.Height) } } } @@ -85,15 +69,26 @@ loop: return err } -func processEvents(bus pubsub.Bus, events []abci.Event) { - for _, ev := range events { - if mev, ok := processEvent(ev); ok { - if err := bus.Publish(mev); err != nil { - bus.Close() - return - } +func processBlock(ctx context.Context, bus pubsub.Bus, client tmclient.Client, height int64) { + blkResults, err := client.BlockResults(ctx, &height) + if err != nil { + return + } + + for _, tx := range blkResults.TxsResults { + if tx == nil { continue } + + for _, ev := range tx.Events { + if mev, ok := processEvent(ev); ok { + if err := bus.Publish(mev); err != nil { + bus.Close() + return + } + continue + } + } } } diff --git a/events/query.go b/events/query.go index 249b842eaa..a932547104 100644 --- a/events/query.go +++ b/events/query.go @@ -8,12 +8,17 @@ import ( tmtypes "github.com/tendermint/tendermint/types" ) -func txQuery() pubsub.Query { - return tmquery.MustParse( - fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx)) -} +// func txQuery() pubsub.Query { +// return tmquery.MustParse( +// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx)) +// } +// +// func blkQuery() pubsub.Query { +// return tmquery.MustParse( +// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlock)) +// } -func blkQuery() pubsub.Query { +func blkHeaderQuery() pubsub.Query { return tmquery.MustParse( fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader)) }