Skip to content

Commit

Permalink
Broadcast URI
Browse files Browse the repository at this point in the history
  • Loading branch information
ARR552 committed Mar 7, 2023
1 parent 420f1d0 commit 444a92a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 14 deletions.
11 changes: 6 additions & 5 deletions sequencer/broadcast/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
)

// NewClient creates a grpc client to communicates with the Broadcast server
func NewClient(ctx context.Context, serverAddress string) (pb.BroadcastServiceClient, *grpc.ClientConn, context.CancelFunc) {
func NewClient(ctx context.Context, serverAddress string) (pb.BroadcastServiceClient, *grpc.ClientConn, context.CancelFunc, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
const maxWaitSeconds = 120
ctx, cancel := context.WithTimeout(ctx, maxWaitSeconds*time.Second)
ctx2, cancel := context.WithTimeout(ctx, maxWaitSeconds*time.Second)
log.Infof("connecting to broadcast service: %v", serverAddress)
conn, err := grpc.DialContext(ctx, serverAddress, opts...)
conn, err := grpc.DialContext(ctx2, serverAddress, opts...)
if err != nil {
log.Fatalf("failed to connect to broadcast service: %v", err)
log.Errorf("failed to connect to broadcast service: %v", err)
return nil, nil, cancel, err
}
client := pb.NewBroadcastServiceClient(conn)
log.Info("connected to broadcast service")

return client, conn, cancel
return client, conn, cancel, nil
}
2 changes: 1 addition & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (s *State) sendBatchRequestToExecutor(ctx context.Context, processBatchRequ
if caller != DiscardCallerLabel {
metrics.ExecutorProcessingTime(string(caller), elapsed)
}
log.Infof("It took %v for the executor to process the request", elapsed)
log.Infof("Batch: %d took %v to be processed by the executor ", processBatchRequest.OldBatchNum+1, elapsed)

return res, err
}
Expand Down
30 changes: 23 additions & 7 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ClientSynchronizer struct {
state stateInterface
pool poolInterface
ethTxManager ethTxManager
broadcastURI string
ctx context.Context
cancelCtx context.CancelFunc
genesis state.Genesis
Expand All @@ -52,6 +53,15 @@ func NewSynchronizer(
cfg Config) (Synchronizer, error) {
ctx, cancel := context.WithCancel(context.Background())

log.Debug("Getting broadcast URI")
broadcastURI, err := getBroadcastURI(ethMan)
if err != nil {
log.Errorf("error getting broadcast URI. Error: %v", err)
cancel()
return nil, err
}
log.Debug("broadcastURI ", broadcastURI)

return &ClientSynchronizer{
isTrustedSequencer: isTrustedSequencer,
state: st,
Expand All @@ -60,6 +70,7 @@ func NewSynchronizer(
ctx: ctx,
cancelCtx: cancel,
ethTxManager: ethTxManager,
broadcastURI: broadcastURI,
genesis: genesis,
cfg: cfg,
}, nil
Expand Down Expand Up @@ -265,25 +276,25 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {
return nil
}

log.Debug("Getting broadcast URI")
broadcastURI, err := s.getBroadcastURI()
broadcastClient, _, cancel, err := broadcast.NewClient(s.ctx, s.broadcastURI)
if err != nil {
log.Errorf("error getting broadcast URI. Error: %v", err)
log.Warn("error connecting to the broadcast. Error: ", err)
cancel()
return err
}
log.Debug("broadcastURI ", broadcastURI)
broadcastClient, _, _ := broadcast.NewClient(s.ctx, broadcastURI)

log.Info("Getting trusted state info")
lastTrustedStateBatch, err := broadcastClient.GetLastBatch(s.ctx, &emptypb.Empty{})
if err != nil {
log.Warn("error syncing trusted state. Error: ", err)
cancel()
return err
}

log.Debug("lastTrustedStateBatch.BatchNumber ", lastTrustedStateBatch.BatchNumber)
log.Debug("latestSyncedBatch ", latestSyncedBatch)
if lastTrustedStateBatch.BatchNumber < latestSyncedBatch {
cancel()
return nil
}

Expand All @@ -292,17 +303,20 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {
batchToSync, err := broadcastClient.GetBatch(s.ctx, &pb.GetBatchRequest{BatchNumber: batchNumberToSync})
if err != nil {
log.Warnf("failed to get batch %v from trusted state via broadcast. Error: %v", batchNumberToSync, err)
cancel()
return err
}

dbTx, err := s.state.BeginStateTransaction(s.ctx)
if err != nil {
log.Errorf("error creating db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
cancel()
return err
}

if err := s.processTrustedBatch(batchToSync, dbTx); err != nil {
log.Errorf("error processing trusted batch %v: %v", batchNumberToSync, err)
cancel()
err := dbTx.Rollback(s.ctx)
if err != nil {
log.Errorf("error rolling back db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
Expand All @@ -313,19 +327,21 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {

if err := dbTx.Commit(s.ctx); err != nil {
log.Errorf("error committing db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
cancel()
return err
}

batchNumberToSync++
}

cancel()
return nil
}

// gets the broadcast URI from trusted sequencer JSON RPC server
func (s *ClientSynchronizer) getBroadcastURI() (string, error) {
func getBroadcastURI(etherMan ethermanInterface) (string, error) {
log.Debug("getting trusted sequencer URL from smc")
trustedSequencerURL, err := s.etherMan.GetTrustedSequencerURL()
trustedSequencerURL, err := etherMan.GetTrustedSequencerURL()
if err != nil {
return "", err
}
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestBroadcast(t *testing.T) {

require.NoError(t, populateDB(ctx, st))

client, conn, cancel := broadcast.NewClient(ctx, serverAddress)
client, conn, cancel, err := broadcast.NewClient(ctx, serverAddress)
require.NoError(t, err)
defer func() {
cancel()
require.NoError(t, conn.Close())
Expand Down

0 comments on commit 444a92a

Please sign in to comment.