Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast URI #1722

Merged
merged 4 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions sequencer/broadcast/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
)

// NewClient creates a grpc client to communicates with the Broadcast server
func NewClient(ctx context.Context, serverAddress string) (pb.BroadcastServiceClient, *grpc.ClientConn, context.CancelFunc) {
func NewClient(ctx context.Context, serverAddress string) (pb.BroadcastServiceClient, *grpc.ClientConn, context.CancelFunc, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
const maxWaitSeconds = 120
ctx, cancel := context.WithTimeout(ctx, maxWaitSeconds*time.Second)
ctx2, cancel := context.WithTimeout(ctx, maxWaitSeconds*time.Second)
log.Infof("connecting to broadcast service: %v", serverAddress)
conn, err := grpc.DialContext(ctx, serverAddress, opts...)
conn, err := grpc.DialContext(ctx2, serverAddress, opts...)
if err != nil {
log.Fatalf("failed to connect to broadcast service: %v", err)
log.Errorf("failed to connect to broadcast service: %v", err)
return nil, nil, cancel, err
}
client := pb.NewBroadcastServiceClient(conn)
log.Info("connected to broadcast service")

return client, conn, cancel
return client, conn, cancel, nil
}
34 changes: 22 additions & 12 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ func newDBManager(ctx context.Context, txPool txPool, state dbManagerStateInterf
// Start stars the dbManager routines
func (d *dbManager) Start() {
go d.loadFromPool()
go func() {
for {
// TODO: Move this to a config parameter
time.Sleep(wait * time.Second)
d.checkIfReorg()
}
}()
go d.storeProcessedTxAndDeleteFromPool()
}

Expand Down Expand Up @@ -101,24 +108,26 @@ func (d *dbManager) CreateFirstBatch(ctx context.Context, sequencerAddress commo
return processingCtx
}

// checkIfReorg checks if a reorg has happened
func (d *dbManager) checkIfReorg() {
numberOfReorgs, err := d.state.CountReorgs(d.ctx, nil)
if err != nil {
log.Error("failed to get number of reorgs: %v", err)
}

if numberOfReorgs != d.numberOfReorgs {
log.Warnf("New L2 reorg detected")
d.l2ReorgCh <- L2ReorgEvent{}
d.txsStore.Wg.Done()
}
}

// loadFromPool keeps loading transactions from the pool
func (d *dbManager) loadFromPool() {
for {
// TODO: Move this to a config parameter
time.Sleep(wait * time.Second)

numberOfReorgs, err := d.state.CountReorgs(d.ctx, nil)
if err != nil {
log.Error("failed to get number of reorgs: %v", err)
}

if numberOfReorgs != d.numberOfReorgs {
log.Warnf("New L2 reorg detected")
d.l2ReorgCh <- L2ReorgEvent{}
d.txsStore.Wg.Done()
continue
}

poolTransactions, err := d.txPool.GetNonWIPPendingTxs(d.ctx, false, 0)
if err != nil && err != pgpoolstorage.ErrNotFound {
log.Errorf("load tx from pool: %v", err)
Expand Down Expand Up @@ -174,6 +183,7 @@ func (d *dbManager) storeProcessedTxAndDeleteFromPool() {
// TODO: Finish the retry mechanism and error handling
for {
txToStore := <-d.txsStore.Ch
d.checkIfReorg()
log.Debugf("Storing tx %v", txToStore.txResponse.TxHash)
dbTx, err := d.BeginStateTransaction(d.ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (s *State) sendBatchRequestToExecutor(ctx context.Context, processBatchRequ
if caller != DiscardCallerLabel {
metrics.ExecutorProcessingTime(string(caller), elapsed)
}
log.Infof("It took %v for the executor to process the request", elapsed)
log.Infof("Batch: %d took %v to be processed by the executor ", processBatchRequest.OldBatchNum+1, elapsed)

return res, err
}
Expand Down
36 changes: 28 additions & 8 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ClientSynchronizer struct {
state stateInterface
pool poolInterface
ethTxManager ethTxManager
broadcastURI string
ctx context.Context
cancelCtx context.CancelFunc
genesis state.Genesis
Expand All @@ -52,6 +53,19 @@ func NewSynchronizer(
cfg Config) (Synchronizer, error) {
ctx, cancel := context.WithCancel(context.Background())

var broadcastURI string
if !isTrustedSequencer {
var err error
log.Debug("Getting broadcast URI")
broadcastURI, err = getBroadcastURI(ethMan)
if err != nil {
log.Errorf("error getting broadcast URI. Error: %v", err)
cancel()
return nil, err
}
log.Debug("broadcastURI ", broadcastURI)
}

return &ClientSynchronizer{
isTrustedSequencer: isTrustedSequencer,
state: st,
Expand All @@ -60,6 +74,7 @@ func NewSynchronizer(
ctx: ctx,
cancelCtx: cancel,
ethTxManager: ethTxManager,
broadcastURI: broadcastURI,
genesis: genesis,
cfg: cfg,
}, nil
Expand Down Expand Up @@ -157,7 +172,6 @@ func (s *ClientSynchronizer) Sync() error {
log.Warn("error syncing trusted state. Error: ", err)
continue
}
log.Info("Trusted state fully synchronized")
waitDuration = s.cfg.SyncInterval.Duration
}
}
Expand Down Expand Up @@ -265,25 +279,25 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {
return nil
}

log.Debug("Getting broadcast URI")
broadcastURI, err := s.getBroadcastURI()
broadcastClient, _, cancel, err := broadcast.NewClient(s.ctx, s.broadcastURI)
if err != nil {
log.Errorf("error getting broadcast URI. Error: %v", err)
log.Warn("error connecting to the broadcast. Error: ", err)
cancel()
return err
}
log.Debug("broadcastURI ", broadcastURI)
broadcastClient, _, _ := broadcast.NewClient(s.ctx, broadcastURI)

log.Info("Getting trusted state info")
lastTrustedStateBatch, err := broadcastClient.GetLastBatch(s.ctx, &emptypb.Empty{})
if err != nil {
log.Warn("error syncing trusted state. Error: ", err)
cancel()
return err
}

log.Debug("lastTrustedStateBatch.BatchNumber ", lastTrustedStateBatch.BatchNumber)
log.Debug("latestSyncedBatch ", latestSyncedBatch)
if lastTrustedStateBatch.BatchNumber < latestSyncedBatch {
cancel()
return nil
}

Expand All @@ -292,17 +306,20 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {
batchToSync, err := broadcastClient.GetBatch(s.ctx, &pb.GetBatchRequest{BatchNumber: batchNumberToSync})
if err != nil {
log.Warnf("failed to get batch %v from trusted state via broadcast. Error: %v", batchNumberToSync, err)
cancel()
return err
}

dbTx, err := s.state.BeginStateTransaction(s.ctx)
if err != nil {
log.Errorf("error creating db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
cancel()
return err
}

if err := s.processTrustedBatch(batchToSync, dbTx); err != nil {
log.Errorf("error processing trusted batch %v: %v", batchNumberToSync, err)
cancel()
err := dbTx.Rollback(s.ctx)
if err != nil {
log.Errorf("error rolling back db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
Expand All @@ -313,19 +330,22 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {

if err := dbTx.Commit(s.ctx); err != nil {
log.Errorf("error committing db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
cancel()
return err
}

batchNumberToSync++
}

cancel()
log.Info("Trusted state fully synchronized")
return nil
}

// gets the broadcast URI from trusted sequencer JSON RPC server
func (s *ClientSynchronizer) getBroadcastURI() (string, error) {
func getBroadcastURI(etherMan ethermanInterface) (string, error) {
log.Debug("getting trusted sequencer URL from smc")
trustedSequencerURL, err := s.etherMan.GetTrustedSequencerURL()
trustedSequencerURL, err := etherMan.GetTrustedSequencerURL()
if err != nil {
return "", err
}
Expand Down
28 changes: 26 additions & 2 deletions synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package synchronizer

import (
context "context"
"fmt"
"math/big"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
Expand All @@ -28,6 +31,11 @@ type mocks struct {
}

func TestTrustedStateReorg(t *testing.T) {
data := `{"jsonrpc":"2.0","id":1,"result":"zkevm-broadcast:61090"}`
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, data)
}))
defer svr.Close()
type testCase struct {
Name string
getTrustedBatch func(*mocks, context.Context, etherman.SequencedBatch) *state.Batch
Expand All @@ -41,7 +49,13 @@ func TestTrustedStateReorg(t *testing.T) {
SyncChunkSize: 10,
GenBlockNumber: uint64(123456),
}
sync, err := NewSynchronizer(true, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)

m.Etherman.
On("GetTrustedSequencerURL").
Return(svr.URL, nil).
Once()

sync, err := NewSynchronizer(false, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)
require.NoError(t, err)

// state preparation
Expand Down Expand Up @@ -337,6 +351,11 @@ func TestTrustedStateReorg(t *testing.T) {
}

func TestForcedBatch(t *testing.T) {
data := `{"jsonrpc":"2.0","id":1,"result":"zkevm-broadcast:61090"}`
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, data)
}))
defer svr.Close()
genesis := state.Genesis{}
cfg := Config{
SyncInterval: cfgTypes.Duration{Duration: 1 * time.Second},
Expand All @@ -351,7 +370,12 @@ func TestForcedBatch(t *testing.T) {
DbTx: newDbTxMock(t),
}

sync, err := NewSynchronizer(true, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)
m.Etherman.
On("GetTrustedSequencerURL").
Return(svr.URL, nil).
Once()

sync, err := NewSynchronizer(false, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)
require.NoError(t, err)

// state preparation
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestBroadcast(t *testing.T) {

require.NoError(t, populateDB(ctx, st))

client, conn, cancel := broadcast.NewClient(ctx, serverAddress)
client, conn, cancel, err := broadcast.NewClient(ctx, serverAddress)
require.NoError(t, err)
defer func() {
cancel()
require.NoError(t, conn.Close())
Expand Down