Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 28 additions & 33 deletions events/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,34 @@ import (

// Publish events using tm buses to clients. Waits on context
// shutdown signals to exit.
func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus pubsub.Bus) (err error) {

func Publish(ctx context.Context, client tmclient.Client, name string, bus pubsub.Bus) (err error) {
const (
queuesz = 100
queuesz = 1000
)
var (
txname = name + "-tx"
blkname = name + "-blk"
blkHeaderName = name + "-blk-hdr"
)

txch, err := tmbus.Subscribe(ctx, txname, txQuery().String(), queuesz)
if err != nil {
return err
}
defer func() {
err = tmbus.UnsubscribeAll(ctx, txname)
}()
tmbus := client.(tmclient.EventsClient)

blkch, err := tmbus.Subscribe(ctx, blkname, blkQuery().String(), queuesz)
blkch, err := tmbus.Subscribe(ctx, blkHeaderName, blkHeaderQuery().String(), queuesz)
if err != nil {
return err
}
defer func() {
err = tmbus.UnsubscribeAll(ctx, blkname)
err = tmbus.UnsubscribeAll(ctx, blkHeaderName)
}()

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return publishEvents(ctx, txch, bus)
})

g.Go(func() error {
return publishEvents(ctx, blkch, bus)
return publishEvents(ctx, client, blkch, bus)
})

return g.Wait()
}

func publishEvents(ctx context.Context, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error {
func publishEvents(ctx context.Context, client tmclient.Client, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error {
var err error

loop:
Expand All @@ -70,30 +58,37 @@ loop:
case <-ctx.Done():
break loop
case ed := <-ch:
// nolint: gocritic
switch evt := ed.Data.(type) {
case tmtmtypes.EventDataTx:
if !evt.Result.IsOK() {
continue
}
processEvents(bus, evt.Result.GetEvents())
case tmtmtypes.EventDataNewBlockHeader:
processEvents(bus, evt.ResultEndBlock.GetEvents())
processBlock(ctx, bus, client, evt.Header.Height)
}
}
}

return err
}

func processEvents(bus pubsub.Bus, events []abci.Event) {
for _, ev := range events {
if mev, ok := processEvent(ev); ok {
if err := bus.Publish(mev); err != nil {
bus.Close()
return
}
func processBlock(ctx context.Context, bus pubsub.Bus, client tmclient.Client, height int64) {
blkResults, err := client.BlockResults(ctx, &height)
if err != nil {
return
}

for _, tx := range blkResults.TxsResults {
if tx == nil {
continue
}

for _, ev := range tx.Events {
if mev, ok := processEvent(ev); ok {
if err := bus.Publish(mev); err != nil {
bus.Close()
return
}
continue
}
}
}
}

Expand Down
15 changes: 10 additions & 5 deletions events/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
tmtypes "github.com/tendermint/tendermint/types"
)

func txQuery() pubsub.Query {
return tmquery.MustParse(
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx))
}
// func txQuery() pubsub.Query {
// return tmquery.MustParse(
// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx))
// }
//
// func blkQuery() pubsub.Query {
// return tmquery.MustParse(
// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlock))
// }

func blkQuery() pubsub.Query {
func blkHeaderQuery() pubsub.Query {
return tmquery.MustParse(
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader))
}
Loading