Skip to content
Permalink
Browse files

Separate Log Revision Writes from Map Writes (#1263)

* Separate Log Revision Writes from Map Writes

* Update server.go

* Update main.go

* Update main.go

* Update sequencer.go

* Update main.go

* Update sequencer.go

* Update main.go

* Update main.go

* Update main.go
  • Loading branch information...
therealdrake committed Apr 23, 2019
1 parent 6f59d79 commit ab5619b8a155bd0bd1ce4c8151b763bbc8b9c238
Showing with 32 additions and 11 deletions.
  1. +10 −1 cmd/keytransparency-sequencer/main.go
  2. +21 −0 core/sequencer/sequencer.go
  3. +1 −10 core/sequencer/server.go
@@ -220,12 +220,21 @@ func runSequencer(ctx context.Context, conn *grpc.ClientConn,
}
})

sequencer.PeriodicallyRun(ctx, time.Tick(*refresh), func(ctx context.Context) {
if err := signer.AddAllDirectories(ctx); err != nil {
glog.Errorf("runSequencer(AddAllDirectories): %v", err)
}
go sequencer.PeriodicallyRun(ctx, time.Tick(*refresh), func(ctx context.Context) {
if err := signer.AddAllDirectories(ctx); err != nil {
glog.Errorf("PeriodicallyRun(AddAllDirectories): %v", err)
}
if err := signer.RunBatchForAllMasterships(ctx, int32(*batchSize)); err != nil {
glog.Errorf("PeriodicallyRun(RunBatchForAllMasterships): %v", err)
}
})

sequencer.PeriodicallyRun(ctx, time.Tick(*refresh), func(ctx context.Context) {
if err := signer.PublishLogForAllMasterships(ctx); err != nil {
glog.Errorf("PeriodicallyRun(PublishRevisionsForAllMasterships): %v", err)
}
})
}
@@ -120,3 +120,24 @@ func (s *Sequencer) RunBatchForAllMasterships(ctx context.Context, batchSize int

return lastErr
}

func (s *Sequencer) PublishLogForAllMasterships(ctx context.Context) error {
glog.Infof("PublishLogForAllMasterships")
cctx, cancel := context.WithCancel(ctx)
defer cancel()
masterships, err := s.tracker.Masterships(cctx)
if err != nil {
return err
}

var lastErr error
for dirID, whileMaster := range masterships {
publishReq := &spb.PublishRevisionsRequest{DirectoryId: dirID}
if _, err = s.sequencerClient.PublishRevisions(whileMaster, publishReq); err != nil {
lastErr = err
glog.Errorf("RunBatch for %v failed: %v", dirID, err)
}
}

return lastErr
}
@@ -255,19 +255,10 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty.
revReq := &spb.ApplyRevisionRequest{DirectoryId: in.DirectoryId, Revision: rev}
_, err := s.loopback.ApplyRevision(ctx, revReq)
if err != nil {
// Log the error and continue to publish any revsisions this run may have completed.
// This revision will be retried on the next execution of RunBatch.
glog.Errorf("ApplyRevision(dir: %v, rev: %v): %v", in.DirectoryId, rev, err)
break
return nil, err
}
handledCount++
}

publishReq := &spb.PublishRevisionsRequest{DirectoryId: in.DirectoryId, Block: in.Block}
_, err = s.loopback.PublishRevisions(ctx, publishReq)
if err != nil {
return nil, err
}
return &empty.Empty{}, nil
}

0 comments on commit ab5619b

Please sign in to comment.
You can’t perform that action at this time.