From 601992ecd8c8dc4403cf980558bf07c251a8410b Mon Sep 17 00:00:00 2001 From: Henri Koski Date: Tue, 28 May 2024 00:37:22 +0300 Subject: [PATCH] Feat/group tx (#30) * Create type for state * Update mock store * Group tx events * Fix tests * Remove unnecessary mutex --- .github/workflows/main.yml | 1 + .github/workflows/pr.yml | 1 + api/broadcaster.go | 38 ++++---- api/broadcaster_test.go | 9 +- api/event_buffer.go | 59 ++++++++++++ api/server_test.go | 3 +- api/templates/index.templ | 8 +- api/templates/index_templ.go | 36 +++++-- integrationtests/integration_test.go | 26 ++--- model/model.go | 82 +++++++++++++++- store/mock/store.go | 138 ++++++++++++++++++--------- store/pg/store.go | 51 +++++----- 12 files changed, 327 insertions(+), 125 deletions(-) create mode 100644 api/event_buffer.go diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9a4b5c3..ea8617e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -8,6 +8,7 @@ on: jobs: build: runs-on: ubuntu-latest + timeout-minutes: 20 steps: - uses: actions/checkout@v4 - uses: elisa-actions/setup-go-and-mage@v1 diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 8a3b397..2e207fe 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -9,6 +9,7 @@ on: jobs: build: runs-on: ubuntu-latest + timeout-minutes: 20 permissions: pull-requests: write steps: diff --git a/api/broadcaster.go b/api/broadcaster.go index 3456991..d2bdb5c 100644 --- a/api/broadcaster.go +++ b/api/broadcaster.go @@ -13,19 +13,19 @@ import ( "github.com/gevulotnetwork/devnet-explorer/model" ) -const BufferSize = 50 +const BufferSize = 100 type Broadcaster struct { - s EventStream - clientsMu sync.Mutex - nextID uint64 - clients map[uint64]member - headIndex uint8 - head [BufferSize][]byte + s EventStream + head *eventBuffer + + clientsMu sync.Mutex + clients map[uint64]member + nextID uint64 + retryTimeout time.Duration done chan struct{} } - type member struct { ch chan<- []byte filter Filter @@ -42,6 +42,7 @@ func NewBroadcaster(s EventStream, retryTimeout time.Duration) *Broadcaster { s: s, clients: make(map[uint64]member), retryTimeout: retryTimeout, + head: newEventBuffer(BufferSize), done: make(chan struct{}), } } @@ -51,18 +52,13 @@ func (b *Broadcaster) Subscribe(f Filter, prefill bool) (data <-chan []byte, uns defer b.clientsMu.Unlock() id := b.nextID - ch := make(chan []byte, len(b.head)+2) + ch := make(chan []byte, BufferSize+5) // +5 to avoid unnecessary blocking on broadcast b.clients[id] = member{ch: ch, filter: f} b.nextID++ slog.Info("client subscribed", slog.Uint64("id", id)) if prefill { - for i := 1; i <= len(b.head); i++ { - idx := (b.headIndex + uint8(i)) % uint8(len(b.head)) - if b.head[idx] != nil { - ch <- b.head[idx] - } - } + b.head.writeAllToCh(ch) } return ch, func() { @@ -100,10 +96,9 @@ func (b *Broadcaster) broadcast(e model.Event) { b.clientsMu.Lock() defer b.clientsMu.Unlock() - b.head[b.headIndex] = data - b.headIndex = (b.headIndex + 1) % uint8(len(b.head)) - blocked := make([]uint64, 0, len(b.clients)) + b.head.add(e, data) + blocked := make([]uint64, 0, len(b.clients)) for id, c := range b.clients { if c.filter(e) { select { @@ -133,7 +128,12 @@ func (b *Broadcaster) Stop() error { } func writeEvent(w io.Writer, e model.Event) error { - fmt.Fprintf(w, "event: %s\ndata: ", templates.EventTXRow) + eType := e.TxID + if e.State == model.StateSubmitted { + eType = templates.EventTXRow + } + + fmt.Fprintf(w, "event: %s\ndata: ", eType) if err := templates.Row(e).Render(context.Background(), w); err != nil { return fmt.Errorf("failed render html: %w", err) } diff --git a/api/broadcaster_test.go b/api/broadcaster_test.go index 5dc1ef9..8ba860e 100644 --- a/api/broadcaster_test.go +++ b/api/broadcaster_test.go @@ -1,6 +1,7 @@ package api_test import ( + "fmt" "testing" "time" @@ -22,7 +23,7 @@ func TestBroadcasterOneClient(t *testing.T) { eg.Go(b.Run) ch, unsubscribe := b.Subscribe(api.NoFilter, true) - s.events <- model.Event{} + s.events <- model.Event{State: model.StateProving} select { case <-ch: case <-time.After(time.Second): @@ -51,7 +52,7 @@ func TestBroadcasterBuffer(t *testing.T) { const numOfEvents = api.BufferSize + 2 for i := 0; i < numOfEvents; i++ { - s.events <- model.Event{} + s.events <- model.Event{TxID: fmt.Sprint(i), State: model.StateProving} } // Give server some time to buffer events. @@ -119,7 +120,7 @@ func TestBroadcasterStuckClient(t *testing.T) { <-ready for i := 0; i < numOfEvents; i++ { - s.events <- model.Event{} + s.events <- model.Event{State: model.StateProving} } <-done @@ -167,7 +168,7 @@ func TestBroadcasterRetry(t *testing.T) { <-ready for i := 0; i < numOfEvents; i++ { - s.events <- model.Event{} + s.events <- model.Event{State: model.StateProving} } <-done diff --git a/api/event_buffer.go b/api/event_buffer.go new file mode 100644 index 0000000..ab7fef8 --- /dev/null +++ b/api/event_buffer.go @@ -0,0 +1,59 @@ +package api + +import ( + "bytes" + + "github.com/gevulotnetwork/devnet-explorer/api/templates" + "github.com/gevulotnetwork/devnet-explorer/model" +) + +type eventBuffer struct { + headIndex int + head []eventData + headMap map[string]header +} + +type eventData struct { + txID string + data []byte +} + +type header struct { + state model.State + index int +} + +func newEventBuffer(size uint) *eventBuffer { + return &eventBuffer{ + head: make([]eventData, size), + headMap: make(map[string]header, size), + } +} + +func (b *eventBuffer) add(e model.Event, data []byte) { + data = bytes.Replace(data, []byte("event: "+e.TxID), []byte("event: "+templates.EventTXRow), 1) + old, ok := b.headMap[e.TxID] + if !ok { + delete(b.headMap, b.head[b.headIndex].txID) + b.head[b.headIndex] = eventData{txID: e.TxID, data: data} + b.headMap[e.TxID] = header{state: e.State, index: b.headIndex} + b.headIndex = (b.headIndex + 1) % len(b.head) + return + } + + if e.State < old.state { + return + } + + b.head[old.index] = eventData{txID: e.TxID, data: data} + b.headMap[e.TxID] = header{state: e.State, index: old.index} +} + +func (b *eventBuffer) writeAllToCh(ch chan<- []byte) { + for i := 1; i <= len(b.head); i++ { + data := b.head[(b.headIndex+i)%len(b.head)].data + if data != nil { + ch <- data + } + } +} diff --git a/api/server_test.go b/api/server_test.go index fba9103..f514061 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -1,6 +1,7 @@ package api_test import ( + "fmt" "sync" "testing" "time" @@ -43,7 +44,7 @@ func TestServerMultipleClients(t *testing.T) { time.Sleep(time.Second * 1) for i := 0; i < numOfEvents; i++ { - s.events <- model.Event{} + s.events <- model.Event{TxID: fmt.Sprint(i), State: model.StateProving} } // Wait that at least half of the clients receive all events. diff --git a/api/templates/index.templ b/api/templates/index.templ index 9cd52dc..8883224 100644 --- a/api/templates/index.templ +++ b/api/templates/index.templ @@ -99,12 +99,12 @@ templ Table(events []model.Event, query url.Values) { } templ Row(e model.Event) { -
+
State
- { e.State } + { e.State.String() }
@@ -148,7 +148,7 @@ templ TxInfo(tx model.TxInfo) {
-
{ tx.State }
+
{ tx.State.String() }
{ formatDuration(tx.Duration) }
@TxIDBlock(tx.TxID, "Transaction ID") @@ -194,7 +194,7 @@ templ TxLogEvent(e model.TxLogEvent) {
State
- { e.State } + { e.State.String() }
diff --git a/api/templates/index_templ.go b/api/templates/index_templ.go index bcf61c3..01bba2a 100644 --- a/api/templates/index_templ.go +++ b/api/templates/index_templ.go @@ -315,7 +315,15 @@ func Row(e model.Event) templ.Component { templ_7745c5c3_Var13 = templ.NopComponent } ctx = templ.ClearChildren(ctx) - _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("
State
") + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("\" hx-trigger=\"click\" sse-swap=\"") + if templ_7745c5c3_Err != nil { + return templ_7745c5c3_Err + } + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(e.TxID)) + if templ_7745c5c3_Err != nil { + return templ_7745c5c3_Err + } + _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString("\" hx-swap=\"outerHTML\">
State
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - var templ_7745c5c3_Var14 = []any{"tag", strings.ToLower(e.State)} + var templ_7745c5c3_Var14 = []any{"tag", strings.ToLower(e.State.String())} templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var14...) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err @@ -345,9 +361,9 @@ func Row(e model.Event) templ.Component { return templ_7745c5c3_Err } var templ_7745c5c3_Var15 string - templ_7745c5c3_Var15, templ_7745c5c3_Err = templ.JoinStringErrs(e.State) + templ_7745c5c3_Var15, templ_7745c5c3_Err = templ.JoinStringErrs(e.State.String()) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `templates/index.templ`, Line: 106, Col: 62} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `templates/index.templ`, Line: 106, Col: 80} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var15)) if templ_7745c5c3_Err != nil { @@ -480,9 +496,9 @@ func TxInfo(tx model.TxInfo) templ.Component { return templ_7745c5c3_Err } var templ_7745c5c3_Var22 string - templ_7745c5c3_Var22, templ_7745c5c3_Err = templ.JoinStringErrs(tx.State) + templ_7745c5c3_Var22, templ_7745c5c3_Err = templ.JoinStringErrs(tx.State.String()) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `templates/index.templ`, Line: 150, Col: 42} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `templates/index.templ`, Line: 150, Col: 51} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var22)) if templ_7745c5c3_Err != nil { @@ -650,7 +666,7 @@ func TxLogEvent(e model.TxLogEvent) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - var templ_7745c5c3_Var30 = []any{"tag", strings.ToLower(e.State)} + var templ_7745c5c3_Var30 = []any{"tag", strings.ToLower(e.State.String())} templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var30...) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err @@ -668,9 +684,9 @@ func TxLogEvent(e model.TxLogEvent) templ.Component { return templ_7745c5c3_Err } var templ_7745c5c3_Var31 string - templ_7745c5c3_Var31, templ_7745c5c3_Err = templ.JoinStringErrs(e.State) + templ_7745c5c3_Var31, templ_7745c5c3_Err = templ.JoinStringErrs(e.State.String()) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `templates/index.templ`, Line: 196, Col: 61} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `templates/index.templ`, Line: 196, Col: 79} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var31)) if templ_7745c5c3_Err != nil { diff --git a/integrationtests/integration_test.go b/integrationtests/integration_test.go index d760eab..1b3ca6b 100644 --- a/integrationtests/integration_test.go +++ b/integrationtests/integration_test.go @@ -70,12 +70,12 @@ func getStats(t *testing.T) { func receiveFirstEvent(t *testing.T) { events := sseClient(t, "tx-row") - notify(t, `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`) + notify(t, `{"state": "submitted","tx_id": "0","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`) select { case e := <-events: - expected := `
State
Transaction ID
1234
Prover ID
5678
Time
03:04 PM, 02/01/06
` - require.Contains(t, string(e.Data), expected) + expected := `
State
Transaction ID
0
Prover ID
5678
Time
03:04 PM, 02/01/06
` + require.Equal(t, expected, string(e.Data)) case <-time.After(time.Second * 5): t.Fatal("timeout") } @@ -83,14 +83,14 @@ func receiveFirstEvent(t *testing.T) { func receiveEventsFromBuffer(t *testing.T) { txs := []string{ - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, - `{"state": "submitted","tx_id": "1234","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "1","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "2","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "3","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "4","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "5","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "6","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "7","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, + `{"state": "submitted","tx_id": "8","prover_id": "5678","timestamp": "2006-01-02T15:04:05Z"}`, } notify(t, txs...) @@ -103,8 +103,8 @@ func receiveEventsFromBuffer(t *testing.T) { for i := 0; i < expectedEvents; i++ { select { case e := <-events: - expected := `
State
Transaction ID
1234
Prover ID
5678
Time
03:04 PM, 02/01/06
` - assert.Contains(t, string(e.Data), expected) + expected := `
State
Transaction ID
%d
Prover ID
5678
Time
03:04 PM, 02/01/06
` + assert.Equal(t, fmt.Sprintf(expected, i, i, i, i), string(e.Data)) case <-time.After(time.Second * 5): t.Fatal("timeout") } diff --git a/model/model.go b/model/model.go index 8a3aac2..c05d676 100644 --- a/model/model.go +++ b/model/model.go @@ -1,6 +1,8 @@ package model import ( + "database/sql/driver" + "encoding/json" "fmt" "strings" "time" @@ -18,7 +20,7 @@ type Stats struct { } type Event struct { - State string `json:"state"` + State State `json:"state"` TxID string `db:"tx_id" json:"tx_id"` ProverID string `db:"prover_id" json:"prover_id"` Tag string `json:"tag"` @@ -26,7 +28,7 @@ type Event struct { } type TxInfo struct { - State string `json:"state"` + State State `json:"state"` Duration time.Duration `json:"duration"` TxID string `json:"tx_id"` UserID string `json:"user_id"` @@ -35,7 +37,7 @@ type TxInfo struct { } type TxLogEvent struct { - State string `json:"state"` + State State `json:"state"` IDType string `json:"id_type"` ID string `json:"id"` Timestamp time.Time `json:"timestamp"` @@ -90,3 +92,77 @@ func ParseStatsRange(r string) (StatsRange, error) { func SupportedStatsRanges() []StatsRange { return []StatsRange{RangeWeek, RangeMonth, RangeHalfYear, RangeYear} } + +type State uint8 + +const ( + StateUnknown State = 0 + StateSubmitted State = 1 + StateProving State = 2 + StateVerifying State = 3 + StateComplete State = 4 +) + +func (s *State) String() string { + switch *s { + case StateSubmitted: + return "submitted" + case StateProving: + return "proving" + case StateVerifying: + return "verifying" + case StateComplete: + return "complete" + default: + return "unknown" + } +} + +func (s *State) Scan(value interface{}) error { + stateStr, ok := value.(string) + if !ok { + return fmt.Errorf("incompatible type for State: %T", value) + } + newState, err := ParseState(stateStr) + if err != nil { + return err + } + *s = newState + return nil +} + +func (s *State) Value() (driver.Value, error) { + return s.String(), nil +} + +func (s *State) MarshalJSON() ([]byte, error) { + return json.Marshal(s.String()) +} + +func (s *State) UnmarshalJSON(data []byte) error { + var stateStr string + if err := json.Unmarshal(data, &stateStr); err != nil { + return err + } + newState, err := ParseState(stateStr) + if err != nil { + return err + } + *s = newState + return nil +} + +func ParseState(r string) (State, error) { + switch strings.ToLower(r) { + case "submitted": + return StateSubmitted, nil + case "proving": + return StateProving, nil + case "verifying": + return StateVerifying, nil + case "complete": + return StateComplete, nil + default: + return StateUnknown, fmt.Errorf("invalid State string: %s", r) + } +} diff --git a/store/mock/store.go b/store/mock/store.go index 9dbe475..8fa8e87 100644 --- a/store/mock/store.go +++ b/store/mock/store.go @@ -4,6 +4,7 @@ package mock import ( "crypto/sha512" "encoding/hex" + "fmt" "math/rand" "strings" "sync" @@ -13,18 +14,25 @@ import ( _ "github.com/jackc/pgx/v5/stdlib" ) +const ( + Parallelism = 20 +) + type Store struct { - eventsMu sync.RWMutex - events []model.Event - stats model.Stats - eventsCh chan model.Event - done chan struct{} + eventsMu sync.RWMutex + events []model.Event + eventQueue [Parallelism]model.Event + eventMap map[string]model.TxInfo + stats model.Stats + eventsCh chan model.Event + done chan struct{} } func New() *Store { return &Store{ stats: model.Stats{}, eventsCh: make(chan model.Event, 1000), + eventMap: make(map[string]model.TxInfo), done: make(chan struct{}), } } @@ -44,7 +52,7 @@ func (s *Store) Stats(model.StatsRange) (model.Stats, error) { func (s *Store) Run() error { defer close(s.eventsCh) for { - e := randomEvent() + e := s.nextEvent() s.eventsMu.Lock() s.events = append(s.events, e) s.eventsMu.Unlock() @@ -78,59 +86,99 @@ func (s *Store) Search(filter string) ([]model.Event, error) { } func (s *Store) TxInfo(id string) (model.TxInfo, error) { - now := time.Now() - proverID := sha512.Sum512([]byte(time.Now().String())) - verifierID := sha512.Sum512([]byte(time.Now().String())) - completeID := sha512.Sum512([]byte(time.Now().String())) - userID := sha512.Sum512([]byte(time.Now().String())) - info := model.TxInfo{ - State: "completed", - Duration: 65*time.Minute + 12*time.Second, - TxID: id, - UserID: hex.EncodeToString(userID[:]), - ProverID: hex.EncodeToString(proverID[:]), + info, ok := s.eventMap[id] + if !ok { + return model.TxInfo{}, fmt.Errorf("tx %s not found", id) } + return info, nil +} + +func (s *Store) Stop() error { + close(s.done) + return nil +} + +func (s *Store) nextEvent() model.Event { + i := rand.Intn(Parallelism) + switch s.eventQueue[i].State { + case model.StateSubmitted: + s.eventQueue[i].State = model.StateProving + s.eventQueue[i].Timestamp = time.Now().Local() - info.Log = []model.TxLogEvent{ - { - State: "complete", + info := s.eventMap[s.eventQueue[i].TxID] + info.Log = append(s.eventMap[s.eventQueue[i].TxID].Log, model.TxLogEvent{ + State: model.StateProving, IDType: "node id", - ID: hex.EncodeToString(completeID[:]), - Timestamp: now, - }, - { - State: "verifying", + ID: s.eventQueue[i].ProverID, + Timestamp: s.eventQueue[i].Timestamp, + }) + s.eventMap[s.eventQueue[i].TxID] = info + + case model.StateProving: + s.eventQueue[i].State = model.StateVerifying + s.eventQueue[i].Timestamp = time.Now().Local() + + verifierID := sha512.Sum512([]byte(time.Now().String())) + info := s.eventMap[s.eventQueue[i].TxID] + info.Log = append(s.eventMap[s.eventQueue[i].TxID].Log, model.TxLogEvent{ + State: model.StateVerifying, IDType: "node id", ID: hex.EncodeToString(verifierID[:]), - Timestamp: now.Add(-12 * time.Minute), - }, - { - State: "proving", + Timestamp: s.eventQueue[i].Timestamp, + }) + s.eventMap[s.eventQueue[i].TxID] = info + + case model.StateVerifying: + s.eventQueue[i].State = []model.State{model.StateVerifying, model.StateVerifying, model.StateVerifying, model.StateComplete}[rand.Intn(4)] + s.eventQueue[i].Timestamp = time.Now().Local() + + info := s.eventMap[s.eventQueue[i].TxID] + id := sha512.Sum512([]byte(time.Now().String())) + log := model.TxLogEvent{ IDType: "node id", - ID: info.ProverID, - Timestamp: now.Add(-33 * time.Minute), - }, - { - State: "submitted", - IDType: "user id", - ID: info.UserID, - Timestamp: now.Add(-65 * time.Minute), - }, - } + ID: hex.EncodeToString(id[:]), + Timestamp: s.eventQueue[i].Timestamp, + } - return info, nil -} + if s.eventQueue[i].State == model.StateComplete { + info.Duration = s.eventQueue[i].Timestamp.Sub(info.Log[0].Timestamp) + info.State = model.StateComplete + log.State = model.StateComplete + } else { + log.State = model.StateVerifying + } -func (s *Store) Stop() error { - close(s.done) - return nil + info.Log = append(s.eventMap[s.eventQueue[i].TxID].Log, log) + s.eventMap[s.eventQueue[i].TxID] = info + + case model.StateUnknown, model.StateComplete: + s.eventQueue[i] = randomEvent() + + userID := sha512.Sum512([]byte(time.Now().String())) + s.eventMap[s.eventQueue[i].TxID] = model.TxInfo{ + State: model.StateProving, + TxID: s.eventQueue[i].TxID, + UserID: hex.EncodeToString(userID[:]), + ProverID: s.eventQueue[i].ProverID, + Log: []model.TxLogEvent{ + { + State: model.StateSubmitted, + IDType: "user id", + ID: hex.EncodeToString(userID[:]), + Timestamp: s.eventQueue[i].Timestamp, + }, + }, + } + } + + return s.eventQueue[i] } func randomEvent() model.Event { txID := sha512.Sum512([]byte(time.Now().String())) proverID := sha512.Sum512([]byte(time.Now().String())) return model.Event{ - State: []string{"submitted", "verifying", "proving", "complete"}[rand.Intn(4)], + State: model.StateSubmitted, Tag: []string{"starknet", "polygon", "", "", "", "", "", ""}[rand.Intn(8)], TxID: hex.EncodeToString(txID[:]), ProverID: hex.EncodeToString(proverID[:]), diff --git a/store/pg/store.go b/store/pg/store.go index 755ada4..4a9813a 100644 --- a/store/pg/store.go +++ b/store/pg/store.go @@ -313,10 +313,7 @@ func getJobDuration(txs []gevulotTransaction) time.Duration { return end.Sub(begin) } -func getState(txs []gevulotTransaction) string { - // Submitted is the default state. - state := "Submitted" - +func getState(txs []gevulotTransaction) model.State { proofs := 0 verifications := 0 @@ -328,30 +325,32 @@ func getState(txs []gevulotTransaction) string { } } - if proofs > 0 && verifications == 0 { - state = "Proving" - } else if verifications > 0 && verifications < 3 { - state = "Verifying" - } else if verifications > 2 { - state = "Complete" + switch { + case proofs > 0 && verifications == 0: + return model.StateProving + case verifications > 0 && verifications < 3: + return model.StateVerifying + case verifications > 2: + return model.StateComplete + default: + return model.StateSubmitted } - - return state } -func txLogEventsFromTxs(txs []gevulotTransaction) []model.TxLogEvent { - stateFromKind := func(k txKind) string { - switch k { - case run: - return "Submitted" - case proof: - return "Proving" - case verification: - return "Verifying" - } - return "" +func stateFromKind(k txKind) model.State { + switch k { + case run: + return model.StateSubmitted + case proof: + return model.StateProving + case verification: + return model.StateVerifying + default: + return model.StateUnknown } +} +func txLogEventsFromTxs(txs []gevulotTransaction) []model.TxLogEvent { var events []model.TxLogEvent for _, tx := range txs { e := model.TxLogEvent{ @@ -362,7 +361,7 @@ func txLogEventsFromTxs(txs []gevulotTransaction) []model.TxLogEvent { } // Special handling for the Run tx. - if e.State == "Submitted" { + if e.State == model.StateComplete { e.IDType = "user id" } events = append(events, e) @@ -376,10 +375,10 @@ func txLogEventsFromTxs(txs []gevulotTransaction) []model.TxLogEvent { // Finalize run job as complete after two verifications. verifications := 0 for _, e := range events { - if e.State == "Verifying" { + if e.State == model.StateVerifying { verifications++ if verifications > 2 { - e.State = "Complete" + e.State = model.StateComplete } } }