diff --git a/events/publish.go b/events/publish.go index a39f7b3e6c..ab459e79d6 100644 --- a/events/publish.go +++ b/events/publish.go @@ -22,46 +22,34 @@ import ( // Publish events using tm buses to clients. Waits on context // shutdown signals to exit. -func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus pubsub.Bus) (err error) { - +func Publish(ctx context.Context, client tmclient.Client, name string, bus pubsub.Bus) (err error) { const ( - queuesz = 100 + queuesz = 1000 ) var ( - txname = name + "-tx" - blkname = name + "-blk" + blkHeaderName = name + "-blk-hdr" ) - txch, err := tmbus.Subscribe(ctx, txname, txQuery().String(), queuesz) - if err != nil { - return err - } - defer func() { - err = tmbus.UnsubscribeAll(ctx, txname) - }() + tmbus := client.(tmclient.EventsClient) - blkch, err := tmbus.Subscribe(ctx, blkname, blkQuery().String(), queuesz) + blkch, err := tmbus.Subscribe(ctx, blkHeaderName, blkHeaderQuery().String(), queuesz) if err != nil { return err } defer func() { - err = tmbus.UnsubscribeAll(ctx, blkname) + err = tmbus.UnsubscribeAll(ctx, blkHeaderName) }() g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return publishEvents(ctx, txch, bus) - }) - - g.Go(func() error { - return publishEvents(ctx, blkch, bus) + return publishEvents(ctx, client, blkch, bus) }) return g.Wait() } -func publishEvents(ctx context.Context, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error { +func publishEvents(ctx context.Context, client tmclient.Client, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error { var err error loop: @@ -70,14 +58,10 @@ loop: case <-ctx.Done(): break loop case ed := <-ch: + // nolint: gocritic switch evt := ed.Data.(type) { - case tmtmtypes.EventDataTx: - if !evt.Result.IsOK() { - continue - } - processEvents(bus, evt.Result.GetEvents()) case tmtmtypes.EventDataNewBlockHeader: - processEvents(bus, evt.ResultEndBlock.GetEvents()) + processBlock(ctx, bus, client, evt.Header.Height) } } } @@ -85,15 +69,26 @@ loop: return err } -func processEvents(bus pubsub.Bus, events []abci.Event) { - for _, ev := range events { - if mev, ok := processEvent(ev); ok { - if err := bus.Publish(mev); err != nil { - bus.Close() - return - } +func processBlock(ctx context.Context, bus pubsub.Bus, client tmclient.Client, height int64) { + blkResults, err := client.BlockResults(ctx, &height) + if err != nil { + return + } + + for _, tx := range blkResults.TxsResults { + if tx == nil { continue } + + for _, ev := range tx.Events { + if mev, ok := processEvent(ev); ok { + if err := bus.Publish(mev); err != nil { + bus.Close() + return + } + continue + } + } } } diff --git a/events/query.go b/events/query.go index 249b842eaa..a932547104 100644 --- a/events/query.go +++ b/events/query.go @@ -8,12 +8,17 @@ import ( tmtypes "github.com/tendermint/tendermint/types" ) -func txQuery() pubsub.Query { - return tmquery.MustParse( - fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx)) -} +// func txQuery() pubsub.Query { +// return tmquery.MustParse( +// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx)) +// } +// +// func blkQuery() pubsub.Query { +// return tmquery.MustParse( +// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlock)) +// } -func blkQuery() pubsub.Query { +func blkHeaderQuery() pubsub.Query { return tmquery.MustParse( fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader)) }