Skip to content

Commit

Permalink
Group tx events
Browse files Browse the repository at this point in the history
  • Loading branch information
heppu committed May 27, 2024
1 parent 665f422 commit 6c22b57
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 26 deletions.
40 changes: 20 additions & 20 deletions api/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ import (
"github.com/gevulotnetwork/devnet-explorer/model"
)

const BufferSize = 50
const BufferSize = 100

type Broadcaster struct {
s EventStream
clientsMu sync.Mutex
nextID uint64
clients map[uint64]member
headIndex uint8
head [BufferSize][]byte
s EventStream
head *eventBuffer

clientsMu sync.Mutex
clients map[uint64]member
nextID uint64

retryTimeout time.Duration
done chan struct{}
}

type member struct {
ch chan<- []byte
filter Filter
Expand All @@ -42,6 +42,7 @@ func NewBroadcaster(s EventStream, retryTimeout time.Duration) *Broadcaster {
s: s,
clients: make(map[uint64]member),
retryTimeout: retryTimeout,
head: newEventBuffer(BufferSize),
done: make(chan struct{}),
}
}
Expand All @@ -51,18 +52,13 @@ func (b *Broadcaster) Subscribe(f Filter, prefill bool) (data <-chan []byte, uns
defer b.clientsMu.Unlock()

id := b.nextID
ch := make(chan []byte, len(b.head)+2)
ch := make(chan []byte, BufferSize+5) // +5 to avoid unnecessary blocking on broadcast
b.clients[id] = member{ch: ch, filter: f}
b.nextID++
slog.Info("client subscribed", slog.Uint64("id", id))

if prefill {
for i := 1; i <= len(b.head); i++ {
idx := (b.headIndex + uint8(i)) % uint8(len(b.head))
if b.head[idx] != nil {
ch <- b.head[idx]
}
}
b.head.writeAllToCh(ch)
}

return ch, func() {
Expand Down Expand Up @@ -96,14 +92,13 @@ func (b *Broadcaster) broadcast(e model.Event) {
slog.Error("failed write event into buffer", slog.Any("error", err))
return
}

data := buf.Bytes()
b.head.add(e, data)
blocked := make([]uint64, 0, len(b.clients))

b.clientsMu.Lock()
defer b.clientsMu.Unlock()
b.head[b.headIndex] = data
b.headIndex = (b.headIndex + 1) % uint8(len(b.head))
blocked := make([]uint64, 0, len(b.clients))

for id, c := range b.clients {
if c.filter(e) {
select {
Expand Down Expand Up @@ -133,7 +128,12 @@ func (b *Broadcaster) Stop() error {
}

func writeEvent(w io.Writer, e model.Event) error {
fmt.Fprintf(w, "event: %s\ndata: ", templates.EventTXRow)
eType := e.TxID
if e.State == model.StateSubmitted {
eType = templates.EventTXRow
}

fmt.Fprintf(w, "event: %s\ndata: ", eType)
if err := templates.Row(e).Render(context.Background(), w); err != nil {
return fmt.Errorf("failed render html: %w", err)
}
Expand Down
70 changes: 70 additions & 0 deletions api/event_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package api

import (
"bytes"
"sync"

"github.com/gevulotnetwork/devnet-explorer/api/templates"
"github.com/gevulotnetwork/devnet-explorer/model"
)

type eventBuffer struct {
headMu sync.Mutex
headIndex int
head []eventData
headMap map[string]header
}

type eventData struct {
txID string
data []byte
}

type header struct {
state model.State
index int
}

func newEventBuffer(size uint) *eventBuffer {
return &eventBuffer{
head: make([]eventData, size),
headMap: make(map[string]header, size),
}
}

func (b *eventBuffer) add(e model.Event, data []byte) {
b.headMu.Lock()
defer b.headMu.Unlock()

data = bytes.Replace(data, []byte("event: "+e.TxID), []byte("event: "+templates.EventTXRow), 1)
old, ok := b.headMap[e.TxID]
if !ok {
delete(b.headMap, b.head[b.headIndex].txID)
b.head[b.headIndex] = eventData{txID: e.TxID, data: data}
b.headMap[e.TxID] = header{state: e.State, index: b.headIndex}
b.headIndex = (b.headIndex + 1) % len(b.head)
return
}

if e.State.LessThan(old.state) {
return
}

b.head[old.index] = eventData{txID: e.TxID, data: data}
b.headMap[e.TxID] = header{state: e.State, index: old.index}
}

func (b *eventBuffer) writeAllToCh(ch chan<- []byte) {
b.headMu.Lock()
index := b.headIndex
headCopy := make([]eventData, len(b.head))
copy(headCopy, b.head)
b.headMu.Unlock()

for i := 1; i <= len(headCopy); i++ {
idx := (index + i) % len(headCopy)
if headCopy[idx].data != nil {
ch <- headCopy[idx].data
}
}
}
2 changes: 1 addition & 1 deletion api/templates/index.templ
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ templ Table(events []model.Event, query url.Values) {
}

templ Row(e model.Event) {
<div class="tr" hx-get={ "/tx/" + e.TxID } hx-trigger="click" hx-target="#table" hx-swap="outerHTML">
<div id={ e.TxID } class="tr" hx-get={ "/tx/" + e.TxID } hx-trigger="click" sse-swap={ e.TxID } hx-swap="outerHTML">
<div class="left">
<div class="td">
<div class="mobile-label">State</div>
Expand Down
20 changes: 18 additions & 2 deletions api/templates/index_templ.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func SupportedStatsRanges() []StatsRange {

type State interface {
String() string
state()
LessThan(State) bool
state() state
}

type state uint8
Expand All @@ -122,7 +123,9 @@ func (s state) String() string {
}
}

func (s state) state() {}
func (s state) state() state { return s }

func (s state) LessThan(ss State) bool { return s < ss.state() }

func (s *state) Scan(value interface{}) error {
stateStr, ok := value.(string)
Expand Down
2 changes: 1 addition & 1 deletion store/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
Parallelism = 10
Parallelism = 20
)

type Store struct {
Expand Down

0 comments on commit 6c22b57

Please sign in to comment.