From 6c22b5731e230fea7bf6429ca5b00d26f3fa8cb2 Mon Sep 17 00:00:00 2001 From: Henri Koski Date: Mon, 27 May 2024 20:43:23 +0300 Subject: [PATCH] Group tx events --- api/broadcaster.go | 40 ++++++++++----------- api/event_buffer.go | 70 ++++++++++++++++++++++++++++++++++++ api/templates/index.templ | 2 +- api/templates/index_templ.go | 20 +++++++++-- model/model.go | 7 ++-- store/mock/store.go | 2 +- 6 files changed, 115 insertions(+), 26 deletions(-) create mode 100644 api/event_buffer.go diff --git a/api/broadcaster.go b/api/broadcaster.go index 3456991..9630e99 100644 --- a/api/broadcaster.go +++ b/api/broadcaster.go @@ -13,19 +13,19 @@ import ( "github.com/gevulotnetwork/devnet-explorer/model" ) -const BufferSize = 50 +const BufferSize = 100 type Broadcaster struct { - s EventStream - clientsMu sync.Mutex - nextID uint64 - clients map[uint64]member - headIndex uint8 - head [BufferSize][]byte + s EventStream + head *eventBuffer + + clientsMu sync.Mutex + clients map[uint64]member + nextID uint64 + retryTimeout time.Duration done chan struct{} } - type member struct { ch chan<- []byte filter Filter @@ -42,6 +42,7 @@ func NewBroadcaster(s EventStream, retryTimeout time.Duration) *Broadcaster { s: s, clients: make(map[uint64]member), retryTimeout: retryTimeout, + head: newEventBuffer(BufferSize), done: make(chan struct{}), } } @@ -51,18 +52,13 @@ func (b *Broadcaster) Subscribe(f Filter, prefill bool) (data <-chan []byte, uns defer b.clientsMu.Unlock() id := b.nextID - ch := make(chan []byte, len(b.head)+2) + ch := make(chan []byte, BufferSize+5) // +5 to avoid unnecessary blocking on broadcast b.clients[id] = member{ch: ch, filter: f} b.nextID++ slog.Info("client subscribed", slog.Uint64("id", id)) if prefill { - for i := 1; i <= len(b.head); i++ { - idx := (b.headIndex + uint8(i)) % uint8(len(b.head)) - if b.head[idx] != nil { - ch <- b.head[idx] - } - } + b.head.writeAllToCh(ch) } return ch, func() { @@ -96,14 +92,13 @@ func (b *Broadcaster) broadcast(e model.Event) { slog.Error("failed write event into buffer", slog.Any("error", err)) return } + data := buf.Bytes() + b.head.add(e, data) + blocked := make([]uint64, 0, len(b.clients)) b.clientsMu.Lock() defer b.clientsMu.Unlock() - b.head[b.headIndex] = data - b.headIndex = (b.headIndex + 1) % uint8(len(b.head)) - blocked := make([]uint64, 0, len(b.clients)) - for id, c := range b.clients { if c.filter(e) { select { @@ -133,7 +128,12 @@ func (b *Broadcaster) Stop() error { } func writeEvent(w io.Writer, e model.Event) error { - fmt.Fprintf(w, "event: %s\ndata: ", templates.EventTXRow) + eType := e.TxID + if e.State == model.StateSubmitted { + eType = templates.EventTXRow + } + + fmt.Fprintf(w, "event: %s\ndata: ", eType) if err := templates.Row(e).Render(context.Background(), w); err != nil { return fmt.Errorf("failed render html: %w", err) } diff --git a/api/event_buffer.go b/api/event_buffer.go new file mode 100644 index 0000000..9fdc1c2 --- /dev/null +++ b/api/event_buffer.go @@ -0,0 +1,70 @@ +package api + +import ( + "bytes" + "sync" + + "github.com/gevulotnetwork/devnet-explorer/api/templates" + "github.com/gevulotnetwork/devnet-explorer/model" +) + +type eventBuffer struct { + headMu sync.Mutex + headIndex int + head []eventData + headMap map[string]header +} + +type eventData struct { + txID string + data []byte +} + +type header struct { + state model.State + index int +} + +func newEventBuffer(size uint) *eventBuffer { + return &eventBuffer{ + head: make([]eventData, size), + headMap: make(map[string]header, size), + } +} + +func (b *eventBuffer) add(e model.Event, data []byte) { + b.headMu.Lock() + defer b.headMu.Unlock() + + data = bytes.Replace(data, []byte("event: "+e.TxID), []byte("event: "+templates.EventTXRow), 1) + old, ok := b.headMap[e.TxID] + if !ok { + delete(b.headMap, b.head[b.headIndex].txID) + b.head[b.headIndex] = eventData{txID: e.TxID, data: data} + b.headMap[e.TxID] = header{state: e.State, index: b.headIndex} + b.headIndex = (b.headIndex + 1) % len(b.head) + return + } + + if e.State.LessThan(old.state) { + return + } + + b.head[old.index] = eventData{txID: e.TxID, data: data} + b.headMap[e.TxID] = header{state: e.State, index: old.index} +} + +func (b *eventBuffer) writeAllToCh(ch chan<- []byte) { + b.headMu.Lock() + index := b.headIndex + headCopy := make([]eventData, len(b.head)) + copy(headCopy, b.head) + b.headMu.Unlock() + + for i := 1; i <= len(headCopy); i++ { + idx := (index + i) % len(headCopy) + if headCopy[idx].data != nil { + ch <- headCopy[idx].data + } + } +} diff --git a/api/templates/index.templ b/api/templates/index.templ index 5b42ed5..8883224 100644 --- a/api/templates/index.templ +++ b/api/templates/index.templ @@ -99,7 +99,7 @@ templ Table(events []model.Event, query url.Values) { } templ Row(e model.Event) { -
+
State
diff --git a/api/templates/index_templ.go b/api/templates/index_templ.go index 344b164..01bba2a 100644 --- a/api/templates/index_templ.go +++ b/api/templates/index_templ.go @@ -315,7 +315,15 @@ func Row(e model.Event) templ.Component { templ_7745c5c3_Var13 = templ.NopComponent } ctx = templ.ClearChildren(ctx) - _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("
State
") + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("\" hx-trigger=\"click\" sse-swap=\"") + if templ_7745c5c3_Err != nil { + return templ_7745c5c3_Err + } + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(e.TxID)) + if templ_7745c5c3_Err != nil { + return templ_7745c5c3_Err + } + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("\" hx-swap=\"outerHTML\">
State
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/model/model.go b/model/model.go index bf50489..abf67ae 100644 --- a/model/model.go +++ b/model/model.go @@ -95,7 +95,8 @@ func SupportedStatsRanges() []StatsRange { type State interface { String() string - state() + LessThan(State) bool + state() state } type state uint8 @@ -122,7 +123,9 @@ func (s state) String() string { } } -func (s state) state() {} +func (s state) state() state { return s } + +func (s state) LessThan(ss State) bool { return s < ss.state() } func (s *state) Scan(value interface{}) error { stateStr, ok := value.(string) diff --git a/store/mock/store.go b/store/mock/store.go index 8d68c91..c828b9c 100644 --- a/store/mock/store.go +++ b/store/mock/store.go @@ -15,7 +15,7 @@ import ( ) const ( - Parallelism = 10 + Parallelism = 20 ) type Store struct {