Skip to content

Commit

Permalink
Feat/group tx (#30)
Browse files Browse the repository at this point in the history
* Create type for state

* Update mock store

* Group tx events

* Fix tests

* Remove unnecessary mutex
  • Loading branch information
heppu committed May 27, 2024
1 parent 6cdaab6 commit 601992e
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 125 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
jobs:
build:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/checkout@v4
- uses: elisa-actions/setup-go-and-mage@v1
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ on:
jobs:
build:
runs-on: ubuntu-latest
timeout-minutes: 20
permissions:
pull-requests: write
steps:
Expand Down
38 changes: 19 additions & 19 deletions api/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ import (
"github.com/gevulotnetwork/devnet-explorer/model"
)

const BufferSize = 50
const BufferSize = 100

type Broadcaster struct {
s EventStream
clientsMu sync.Mutex
nextID uint64
clients map[uint64]member
headIndex uint8
head [BufferSize][]byte
s EventStream
head *eventBuffer

clientsMu sync.Mutex
clients map[uint64]member
nextID uint64

retryTimeout time.Duration
done chan struct{}
}

type member struct {
ch chan<- []byte
filter Filter
Expand All @@ -42,6 +42,7 @@ func NewBroadcaster(s EventStream, retryTimeout time.Duration) *Broadcaster {
s: s,
clients: make(map[uint64]member),
retryTimeout: retryTimeout,
head: newEventBuffer(BufferSize),
done: make(chan struct{}),
}
}
Expand All @@ -51,18 +52,13 @@ func (b *Broadcaster) Subscribe(f Filter, prefill bool) (data <-chan []byte, uns
defer b.clientsMu.Unlock()

id := b.nextID
ch := make(chan []byte, len(b.head)+2)
ch := make(chan []byte, BufferSize+5) // +5 to avoid unnecessary blocking on broadcast
b.clients[id] = member{ch: ch, filter: f}
b.nextID++
slog.Info("client subscribed", slog.Uint64("id", id))

if prefill {
for i := 1; i <= len(b.head); i++ {
idx := (b.headIndex + uint8(i)) % uint8(len(b.head))
if b.head[idx] != nil {
ch <- b.head[idx]
}
}
b.head.writeAllToCh(ch)
}

return ch, func() {
Expand Down Expand Up @@ -100,10 +96,9 @@ func (b *Broadcaster) broadcast(e model.Event) {

b.clientsMu.Lock()
defer b.clientsMu.Unlock()
b.head[b.headIndex] = data
b.headIndex = (b.headIndex + 1) % uint8(len(b.head))
blocked := make([]uint64, 0, len(b.clients))

b.head.add(e, data)
blocked := make([]uint64, 0, len(b.clients))
for id, c := range b.clients {
if c.filter(e) {
select {
Expand Down Expand Up @@ -133,7 +128,12 @@ func (b *Broadcaster) Stop() error {
}

func writeEvent(w io.Writer, e model.Event) error {
fmt.Fprintf(w, "event: %s\ndata: ", templates.EventTXRow)
eType := e.TxID
if e.State == model.StateSubmitted {
eType = templates.EventTXRow
}

fmt.Fprintf(w, "event: %s\ndata: ", eType)
if err := templates.Row(e).Render(context.Background(), w); err != nil {
return fmt.Errorf("failed render html: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions api/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api_test

import (
"fmt"
"testing"
"time"

Expand All @@ -22,7 +23,7 @@ func TestBroadcasterOneClient(t *testing.T) {
eg.Go(b.Run)

ch, unsubscribe := b.Subscribe(api.NoFilter, true)
s.events <- model.Event{}
s.events <- model.Event{State: model.StateProving}
select {
case <-ch:
case <-time.After(time.Second):
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestBroadcasterBuffer(t *testing.T) {

const numOfEvents = api.BufferSize + 2
for i := 0; i < numOfEvents; i++ {
s.events <- model.Event{}
s.events <- model.Event{TxID: fmt.Sprint(i), State: model.StateProving}
}

// Give server some time to buffer events.
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestBroadcasterStuckClient(t *testing.T) {

<-ready
for i := 0; i < numOfEvents; i++ {
s.events <- model.Event{}
s.events <- model.Event{State: model.StateProving}
}

<-done
Expand Down Expand Up @@ -167,7 +168,7 @@ func TestBroadcasterRetry(t *testing.T) {

<-ready
for i := 0; i < numOfEvents; i++ {
s.events <- model.Event{}
s.events <- model.Event{State: model.StateProving}
}

<-done
Expand Down
59 changes: 59 additions & 0 deletions api/event_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package api

import (
"bytes"

"github.com/gevulotnetwork/devnet-explorer/api/templates"
"github.com/gevulotnetwork/devnet-explorer/model"
)

type eventBuffer struct {
headIndex int
head []eventData
headMap map[string]header
}

type eventData struct {
txID string
data []byte
}

type header struct {
state model.State
index int
}

func newEventBuffer(size uint) *eventBuffer {
return &eventBuffer{
head: make([]eventData, size),
headMap: make(map[string]header, size),
}
}

func (b *eventBuffer) add(e model.Event, data []byte) {
data = bytes.Replace(data, []byte("event: "+e.TxID), []byte("event: "+templates.EventTXRow), 1)
old, ok := b.headMap[e.TxID]
if !ok {
delete(b.headMap, b.head[b.headIndex].txID)
b.head[b.headIndex] = eventData{txID: e.TxID, data: data}
b.headMap[e.TxID] = header{state: e.State, index: b.headIndex}
b.headIndex = (b.headIndex + 1) % len(b.head)
return
}

if e.State < old.state {
return
}

b.head[old.index] = eventData{txID: e.TxID, data: data}
b.headMap[e.TxID] = header{state: e.State, index: old.index}
}

func (b *eventBuffer) writeAllToCh(ch chan<- []byte) {
for i := 1; i <= len(b.head); i++ {
data := b.head[(b.headIndex+i)%len(b.head)].data
if data != nil {
ch <- data
}
}
}
3 changes: 2 additions & 1 deletion api/server_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api_test

import (
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestServerMultipleClients(t *testing.T) {
time.Sleep(time.Second * 1)

for i := 0; i < numOfEvents; i++ {
s.events <- model.Event{}
s.events <- model.Event{TxID: fmt.Sprint(i), State: model.StateProving}
}

// Wait that at least half of the clients receive all events.
Expand Down
8 changes: 4 additions & 4 deletions api/templates/index.templ
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ templ Table(events []model.Event, query url.Values) {
}

templ Row(e model.Event) {
<div class="tr" hx-get={ "/tx/" + e.TxID } hx-trigger="click" hx-target="#table" hx-swap="outerHTML">
<div id={ e.TxID } class="tr" hx-get={ "/tx/" + e.TxID } hx-trigger="click" sse-swap={ e.TxID } hx-swap="outerHTML">
<div class="left">
<div class="td">
<div class="mobile-label">State</div>
<div>
<span class={ "tag", strings.ToLower(e.State) }>{ e.State }</span>
<span class={ "tag", strings.ToLower(e.State.String()) }>{ e.State.String() }</span>
</div>
</div>
<div class="td">
Expand Down Expand Up @@ -148,7 +148,7 @@ templ TxInfo(tx model.TxInfo) {
<div id="tx-info-blocks">
<div id="tx-top">
<div id="tx-state-block">
<div id="tx-current-state">{ tx.State }</div>
<div id="tx-current-state">{ tx.State.String() }</div>
<div id="tx-duration">{ formatDuration(tx.Duration) }</div>
</div>
@TxIDBlock(tx.TxID, "Transaction ID")
Expand Down Expand Up @@ -194,7 +194,7 @@ templ TxLogEvent(e model.TxLogEvent) {
<div class="tx-log-state">
<div class="mobile-label">State</div>
<div>
<span class={ "tag", strings.ToLower(e.State) }>{ e.State }</span>
<span class={ "tag", strings.ToLower(e.State.String()) }>{ e.State.String() }</span>
</div>
</div>
<div class="tx-log-id-wrap">
Expand Down
36 changes: 26 additions & 10 deletions api/templates/index_templ.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 13 additions & 13 deletions integrationtests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,27 @@ func getStats(t *testing.T) {

func receiveFirstEvent(t *testing.T) {
events := sseClient(t, "tx-row")
notify(t, `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`)
notify(t, `{"state": "submitted","tx_id": "0","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`)

select {
case e := <-events:
expected := `<div class="tr" hx-get="/tx/1234" hx-trigger="click" hx-target="#table" hx-swap="outerHTML"><div class="left"><div class="td"><div class="mobile-label">State</div><div><span class="tag submitted">submitted</span></div></div><div class="td"><div class="mobile-label">Transaction ID</div><div>1234</div></div></div><div class="right"><div class="td"><div class="mobile-label">Prover ID</div><div><span>5678</span></div></div><div class="td"><div class="mobile-label">Time</div><div><span class="datetime">03:04 PM, 02/01/06</span></div></div></div><div class="end"><span class="arrow">→</span></div></div>`
require.Contains(t, string(e.Data), expected)
expected := `<div id="0" class="tr" hx-get="/tx/0" hx-trigger="click" sse-swap="0" hx-swap="outerHTML"><div class="left"><div class="td"><div class="mobile-label">State</div><div><span class="tag submitted">submitted</span></div></div><div class="td"><div class="mobile-label">Transaction ID</div><div>0</div></div></div><div class="right"><div class="td"><div class="mobile-label">Prover ID</div><div><span>5678</span></div></div><div class="td"><div class="mobile-label">Time</div><div><span class="datetime">03:04 PM, 02/01/06</span></div></div></div><div class="end"><span class="arrow">→</span></div></div>`
require.Equal(t, expected, string(e.Data))
case <-time.After(time.Second * 5):
t.Fatal("timeout")
}
}

func receiveEventsFromBuffer(t *testing.T) {
txs := []string{
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "1","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "2","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "3","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "4","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "5","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "6","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "7","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
`{"state": "submitted","tx_id": "8","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`,
}

notify(t, txs...)
Expand All @@ -103,8 +103,8 @@ func receiveEventsFromBuffer(t *testing.T) {
for i := 0; i < expectedEvents; i++ {
select {
case e := <-events:
expected := `<div class="tr" hx-get="/tx/1234" hx-trigger="click" hx-target="#table" hx-swap="outerHTML"><div class="left"><div class="td"><div class="mobile-label">State</div><div><span class="tag submitted">submitted</span></div></div><div class="td"><div class="mobile-label">Transaction ID</div><div>1234</div></div></div><div class="right"><div class="td"><div class="mobile-label">Prover ID</div><div><span>5678</span></div></div><div class="td"><div class="mobile-label">Time</div><div><span class="datetime">03:04 PM, 02/01/06</span></div></div></div><div class="end"><span class="arrow">→</span></div></div>`
assert.Contains(t, string(e.Data), expected)
expected := `<div id="%d" class="tr" hx-get="/tx/%d" hx-trigger="click" sse-swap="%d" hx-swap="outerHTML"><div class="left"><div class="td"><div class="mobile-label">State</div><div><span class="tag submitted">submitted</span></div></div><div class="td"><div class="mobile-label">Transaction ID</div><div>%d</div></div></div><div class="right"><div class="td"><div class="mobile-label">Prover ID</div><div><span>5678</span></div></div><div class="td"><div class="mobile-label">Time</div><div><span class="datetime">03:04 PM, 02/01/06</span></div></div></div><div class="end"><span class="arrow">→</span></div></div>`
assert.Equal(t, fmt.Sprintf(expected, i, i, i, i), string(e.Data))
case <-time.After(time.Second * 5):
t.Fatal("timeout")
}
Expand Down
Loading

0 comments on commit 601992e

Please sign in to comment.