Skip to content

Commit

Permalink
[WIP] Group tx events
Browse files Browse the repository at this point in the history
  • Loading branch information
heppu committed May 27, 2024
1 parent 665f422 commit b25e3ea
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
24 changes: 12 additions & 12 deletions api/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ const BufferSize = 50

type Broadcaster struct {
s EventStream
head *eventBuffer

Check failure on line 20 in api/broadcaster.go

View workflow job for this annotation

GitHub Actions / build

undefined: eventBuffer
clientsMu sync.Mutex
nextID uint64
clients map[uint64]member
headIndex uint8
head [BufferSize][]byte
retryTimeout time.Duration
done chan struct{}
}

type member struct {
ch chan<- []byte
filter Filter
Expand All @@ -42,6 +40,7 @@ func NewBroadcaster(s EventStream, retryTimeout time.Duration) *Broadcaster {
s: s,
clients: make(map[uint64]member),
retryTimeout: retryTimeout,
head: newEventBuffer(BufferSize),

Check failure on line 43 in api/broadcaster.go

View workflow job for this annotation

GitHub Actions / build

undefined: newEventBuffer
done: make(chan struct{}),
}
}
Expand All @@ -51,18 +50,13 @@ func (b *Broadcaster) Subscribe(f Filter, prefill bool) (data <-chan []byte, uns
defer b.clientsMu.Unlock()

id := b.nextID
ch := make(chan []byte, len(b.head)+2)
ch := make(chan []byte, BufferSize+5) // +5 to avoid unnecessary blocking on broadcast
b.clients[id] = member{ch: ch, filter: f}
b.nextID++
slog.Info("client subscribed", slog.Uint64("id", id))

if prefill {
for i := 1; i <= len(b.head); i++ {
idx := (b.headIndex + uint8(i)) % uint8(len(b.head))
if b.head[idx] != nil {
ch <- b.head[idx]
}
}
b.head.writeAllToCh(ch)
}

return ch, func() {
Expand Down Expand Up @@ -100,10 +94,11 @@ func (b *Broadcaster) broadcast(e model.Event) {

b.clientsMu.Lock()
defer b.clientsMu.Unlock()

b.head[b.headIndex] = data

Check failure on line 98 in api/broadcaster.go

View workflow job for this annotation

GitHub Actions / build

b.headIndex undefined (type *Broadcaster has no field or method headIndex)
b.headIndex = (b.headIndex + 1) % uint8(len(b.head))

Check failure on line 99 in api/broadcaster.go

View workflow job for this annotation

GitHub Actions / build

b.headIndex undefined (type *Broadcaster has no field or method headIndex)
blocked := make([]uint64, 0, len(b.clients))

blocked := make([]uint64, 0, len(b.clients))
for id, c := range b.clients {
if c.filter(e) {
select {
Expand Down Expand Up @@ -133,7 +128,12 @@ func (b *Broadcaster) Stop() error {
}

func writeEvent(w io.Writer, e model.Event) error {
fmt.Fprintf(w, "event: %s\ndata: ", templates.EventTXRow)
eType := e.TxID
if e.State == model.StateSubmitted {
eType = templates.EventTXRow
}

fmt.Fprintf(w, "event: %s\ndata: ", eType)
if err := templates.Row(e).Render(context.Background(), w); err != nil {
return fmt.Errorf("failed render html: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion api/templates/index.templ
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ templ Table(events []model.Event, query url.Values) {
}

templ Row(e model.Event) {
<div class="tr" hx-get={ "/tx/" + e.TxID } hx-trigger="click" hx-target="#table" hx-swap="outerHTML">
<div id={ e.TxID } class="tr" hx-get={ "/tx/" + e.TxID } hx-trigger="click" sse-swap={ e.TxID } hx-swap="outerHTML">
<div class="left">
<div class="td">
<div class="mobile-label">State</div>
Expand Down
20 changes: 18 additions & 2 deletions api/templates/index_templ.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b25e3ea

Please sign in to comment.