diff --git a/api/broadcaster.go b/api/broadcaster.go index 3456991..bd15518 100644 --- a/api/broadcaster.go +++ b/api/broadcaster.go @@ -17,15 +17,13 @@ const BufferSize = 50 type Broadcaster struct { s EventStream + head *eventBuffer clientsMu sync.Mutex nextID uint64 clients map[uint64]member - headIndex uint8 - head [BufferSize][]byte retryTimeout time.Duration done chan struct{} } - type member struct { ch chan<- []byte filter Filter @@ -42,6 +40,7 @@ func NewBroadcaster(s EventStream, retryTimeout time.Duration) *Broadcaster { s: s, clients: make(map[uint64]member), retryTimeout: retryTimeout, + head: newEventBuffer(BufferSize), done: make(chan struct{}), } } @@ -51,18 +50,13 @@ func (b *Broadcaster) Subscribe(f Filter, prefill bool) (data <-chan []byte, uns defer b.clientsMu.Unlock() id := b.nextID - ch := make(chan []byte, len(b.head)+2) + ch := make(chan []byte, BufferSize+5) // +5 to avoid unnecessary blocking on broadcast b.clients[id] = member{ch: ch, filter: f} b.nextID++ slog.Info("client subscribed", slog.Uint64("id", id)) if prefill { - for i := 1; i <= len(b.head); i++ { - idx := (b.headIndex + uint8(i)) % uint8(len(b.head)) - if b.head[idx] != nil { - ch <- b.head[idx] - } - } + b.head.writeAllToCh(ch) } return ch, func() { @@ -100,10 +94,11 @@ func (b *Broadcaster) broadcast(e model.Event) { b.clientsMu.Lock() defer b.clientsMu.Unlock() + b.head[b.headIndex] = data b.headIndex = (b.headIndex + 1) % uint8(len(b.head)) - blocked := make([]uint64, 0, len(b.clients)) + blocked := make([]uint64, 0, len(b.clients)) for id, c := range b.clients { if c.filter(e) { select { @@ -133,7 +128,12 @@ func (b *Broadcaster) Stop() error { } func writeEvent(w io.Writer, e model.Event) error { - fmt.Fprintf(w, "event: %s\ndata: ", templates.EventTXRow) + eType := e.TxID + if e.State == model.StateSubmitted { + eType = templates.EventTXRow + } + + fmt.Fprintf(w, "event: %s\ndata: ", eType) if err := templates.Row(e).Render(context.Background(), w); err != nil { return fmt.Errorf("failed render html: %w", err) } diff --git a/api/templates/index.templ b/api/templates/index.templ index 5b42ed5..8883224 100644 --- a/api/templates/index.templ +++ b/api/templates/index.templ @@ -99,7 +99,7 @@ templ Table(events []model.Event, query url.Values) { } templ Row(e model.Event) { -
+
State
diff --git a/api/templates/index_templ.go b/api/templates/index_templ.go index 344b164..01bba2a 100644 --- a/api/templates/index_templ.go +++ b/api/templates/index_templ.go @@ -315,7 +315,15 @@ func Row(e model.Event) templ.Component { templ_7745c5c3_Var13 = templ.NopComponent } ctx = templ.ClearChildren(ctx) - _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("
State
") + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("\" hx-trigger=\"click\" sse-swap=\"") + if templ_7745c5c3_Err != nil { + return templ_7745c5c3_Err + } + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(e.TxID)) + if templ_7745c5c3_Err != nil { + return templ_7745c5c3_Err + } + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("\" hx-swap=\"outerHTML\">
State
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err }