Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Log Revision Writes from Map Writes #1263

Merged
merged 11 commits into from Apr 23, 2019
@@ -228,4 +228,13 @@ func runSequencer(ctx context.Context, conn *grpc.ClientConn,
glog.Errorf("PeriodicallyRun(RunBatchForAllMasterships): %v", err)
}
})

This conversation was marked as resolved by therealdrake

This comment has been minimized.

Copy link
@golangcibot

golangcibot Apr 19, 2019

File is not gofmt-ed with -s (from gofmt)

This conversation was marked as resolved by therealdrake

This comment has been minimized.

Copy link
@golangcibot

golangcibot Apr 19, 2019

File is not gofmt-ed with -s (from gofmt)

sequencer.PeriodicallyRun(ctx, time.Tick(*refresh), func(ctx context.Context) {
This conversation was marked as resolved by therealdrake

This comment has been minimized.

Copy link
@jtoohill

jtoohill Apr 22, 2019

Contributor

Need to use a goroutine above, since sequencer.PeriodicallyRun runs in a loop until context cancellation.

if err := signer.AddAllDirectories(ctx); err != nil {
This conversation was marked as resolved by therealdrake

This comment has been minimized.

Copy link
@jtoohill

jtoohill Apr 22, 2019

Contributor

I think you have a data race here, since both periodic jobs are trying to add directories.

glog.Errorf("PeriodicallyRun(AddAllDirectories): %v", err)
}
if err := signer.PublishLogForAllMasterships(ctx); err != nil {
glog.Errorf("PeriodicallyRun(PublishRevisionsForAllMasterships): %v", err)
}
})
}
@@ -120,3 +120,24 @@ func (s *Sequencer) RunBatchForAllMasterships(ctx context.Context, batchSize int

return lastErr
}

func (s *Sequencer) PublishLogForAllMasterships(ctx context.Context) error {
glog.Infof("PublishLogForAllMasterships")
cctx, cancel := context.WithCancel(ctx)
defer cancel()
masterships, err := s.tracker.Masterships(cctx)
if err != nil {
return err
}

var lastErr error
for dirID, whileMaster := range masterships {
This conversation was marked as resolved by therealdrake

This comment has been minimized.

Copy link
@golangcibot

golangcibot Apr 19, 2019

ineffectual assignment to whileMaster (from ineffassign)

publishReq := &spb.PublishRevisionsRequest{DirectoryId: dirID}
if _, err = s.sequencerClient.PublishRevisions(whileMaster, publishReq); err != nil {
lastErr = err
glog.Errorf("RunBatch for %v failed: %v", dirID, err)
}
}

return lastErr
}
@@ -255,19 +255,10 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty.
revReq := &spb.ApplyRevisionRequest{DirectoryId: in.DirectoryId, Revision: rev}
_, err := s.loopback.ApplyRevision(ctx, revReq)
if err != nil {
// Log the error and continue to publish any revsisions this run may have completed.
// This revision will be retried on the next execution of RunBatch.
glog.Errorf("ApplyRevision(dir: %v, rev: %v): %v", in.DirectoryId, rev, err)
break
return nil, err
}
handledCount++
}

publishReq := &spb.PublishRevisionsRequest{DirectoryId: in.DirectoryId, Block: in.Block}
_, err = s.loopback.PublishRevisions(ctx, publishReq)
if err != nil {
return nil, err
}
return &empty.Empty{}, nil
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.