
Enforce one trillian Map caller at a time with sequencer election (#1009)

* Mastership tracker

* Resign mastership in client thread

* RunBatch even if first domain fails

* Callers should cancel ctx

* Add flags to deploy
gdbelvin committed Dec 5, 2018
1 parent 3fa5d84 commit dc66c58fc65f052149a82b533b2b8c72659db27d
@@ -18,7 +18,10 @@ import (
"context"
"database/sql"
"flag"
"fmt"
"net"
"os"
"strings"
"time"
"github.com/golang/glog"
@@ -27,12 +30,15 @@ import (
"github.com/google/trillian/crypto/keys/der"
"github.com/google/trillian/crypto/keyspb"
"github.com/google/trillian/monitoring/prometheus"
"github.com/google/trillian/util/election2"
"github.com/google/trillian/util/election2/etcd"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
"github.com/google/keytransparency/core/adminserver"
"github.com/google/keytransparency/core/sequencer"
"github.com/google/keytransparency/core/sequencer/election"
"github.com/google/keytransparency/impl/sql/directory"
"github.com/google/keytransparency/impl/sql/engine"
"github.com/google/keytransparency/impl/sql/mutationstorage"
@@ -52,6 +58,10 @@ var (
listenAddr = flag.String("addr", ":8080", "The ip:port to serve on")
metricsAddr = flag.String("metrics-addr", ":8081", "The ip:port to publish metrics on")
forceMaster = flag.Bool("force_master", false, "If true, assume master for all directories")
etcdServers = flag.String("etcd_servers", "", "A comma-separated list of etcd servers; no etcd registration if empty")
lockDir = flag.String("lock_file_path", "/keytransparency/master", "etcd lock file directory path")
serverDBPath = flag.String("db", "db", "Database connection string")
// Info to connect to the trillian map and log.
@@ -73,6 +83,34 @@ func openDB() *sql.DB {
return db
}
// getElectionFactory returns an election factory based on flags, and a
// function which releases the resources associated with the factory.
func getElectionFactory() (election2.Factory, func()) {
if *forceMaster {
glog.Warning("Acting as master for all directories")
return election2.NoopFactory{}, func() {}
}
if len(*etcdServers) == 0 {
glog.Exit("Either --force_master or --etcd_servers must be supplied")
}
cli, err := etcd.NewClient(strings.Split(*etcdServers, ","), 5*time.Second)
if err != nil || cli == nil {
glog.Exitf("Failed to create etcd client: %v", err)
}
closeFn := func() {
if err := cli.Close(); err != nil {
glog.Warningf("etcd client Close(): %v", err)
}
}
hostname, _ := os.Hostname()
instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid())
factory := etcd.NewFactory(instanceID, cli, *lockDir)
return factory, closeFn
}
func main() {
flag.Parse()
ctx := context.Background()
@@ -155,12 +193,21 @@ func main() {
pb.RegisterKeyTransparencyAdminHandlerFromEndpoint,
)
cli, err := etcd.NewClient(strings.Split(*etcdServers, ","), 5*time.Second)
if err != nil || cli == nil {
glog.Exitf("Failed to create etcd client: %v", err)
}
// Periodically run batch.
electionFactory, closeFactory := getElectionFactory()
defer closeFactory()
signer := sequencer.New(
spb.NewKeyTransparencySequencerClient(conn),
trillian.NewTrillianAdminClient(mconn),
directoryStorage,
- int32(*batchSize))
+ int32(*batchSize),
+ election.NewTracker(electionFactory, 1*time.Hour, prometheus.MetricFactory{}),
+ )
cctx, cancel := context.WithCancel(context.Background())
defer cancel()
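
The rest of main() is not shown in this hunk. As a rough, hypothetical sketch of how the remaining wiring might look (the 30-second tick interval is an assumption, not a flag from this diff), the tracker runs in the background while batches are triggered periodically under the mastership contexts it hands out:

	// Hypothetical continuation of main(), using the cctx created above.
	go signer.TrackMasterships(cctx) // run per-directory elections until cctx is canceled

	go sequencer.PeriodicallyRun(cctx, time.Tick(30*time.Second), func(ctx context.Context) {
		if err := signer.RunBatchForAllDirectories(ctx); err != nil {
			glog.Errorf("RunBatchForAllDirectories(): %v", err)
		}
	})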
@@ -0,0 +1,201 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package election
import (
"context"
"sync"
"time"
"github.com/golang/glog"
"github.com/google/trillian/monitoring"
"github.com/google/trillian/util/election2"
)
const resourceLabel = "resource"
var (
once sync.Once
isMaster monitoring.Gauge
)
func createMetrics(mf monitoring.MetricFactory) {
isMaster = mf.NewGauge(
"is_master",
"Set to 1 for resources for which this instance is currently master",
resourceLabel)
}
type mastership struct {
e election2.Election
acquired time.Time
}
// Tracker tracks mastership of a collection of resources.
type Tracker struct {
factory election2.Factory
maxHold time.Duration
master map[string]mastership
masterMu sync.RWMutex
watching map[string]bool
watchingMu sync.RWMutex
newResource chan string
}
// NewTracker returns a new mastership tracker.
func NewTracker(factory election2.Factory, maxHold time.Duration, metricFactory monitoring.MetricFactory) *Tracker {
once.Do(func() { createMetrics(metricFactory) })
return &Tracker{
factory: factory,
maxHold: maxHold,
master: make(map[string]mastership),
watching: make(map[string]bool),
newResource: make(chan string),
}
}
// AddResource makes the mastership tracker aware of new resources.
// The same resource may be added an unlimited number of times.
func (mt *Tracker) AddResource(res string) {
mt.newResource <- res
}
// Run starts new watchers for new resources.
func (mt *Tracker) Run(ctx context.Context) {
for {
select {
case res := <-mt.newResource:
if mt.setWatching(res) {
go func() {
defer mt.setNotWatching(res)
if err := mt.watchResource(ctx, res); err != nil {
glog.Errorf("watchResource(%v): %v", res, err)
}
}()
}
case <-ctx.Done():
glog.Infof("election: Run() exiting due to expired context: %v", ctx.Err())
return
}
}
}
// watchResource is a blocking method that runs elections for res and updates mt.master.
func (mt *Tracker) watchResource(ctx context.Context, res string) error {
e, err := mt.factory.NewElection(ctx, res)
if err != nil {
return err
}
defer func() {
if err := e.Close(ctx); err != nil {
glog.Warningf("election.Close(%v): %v", res, err)
}
}()
for err := error(nil); err == nil; err = ctx.Err() {
if err := mt.watchOnce(ctx, e, res); err != nil {
return err
}
}
return nil
}
// watchOnce waits until it acquires mastership, marks itself as master for res,
// and then waits until either resign duration has passed or it loses
// mastership, at which point it marks itself as not master for res. Returns
// an error if there were problems with acquiring mastership or resigning.
func (mt *Tracker) watchOnce(ctx context.Context, e election2.Election, res string) error {
mt.setNotMaster(res)
if err := e.Await(ctx); err != nil {
return err
}
glog.Infof("Obtained mastership for %v", res)
// Obtain mastership ctx *before* Masterships runs to avoid racing.
mastershipCtx, err := e.WithMastership(ctx)
if err != nil {
return err
}
mt.setMaster(res, mastership{e: e, acquired: time.Now()})
defer mt.setNotMaster(res)
<-mastershipCtx.Done()
// We don't know if we got here because we are no longer master or if
// the parent context was closed. In either case work being done will
// be canceled and we will mark ourselves as not-master until we can
// acquire mastership again.
glog.Warningf("No longer master for %v", res)
return nil
}
// setWatching sets mt.watching[res] to true.
// Returns true if it set watching to true.
func (mt *Tracker) setWatching(res string) bool {
mt.watchingMu.Lock()
defer mt.watchingMu.Unlock()
if !mt.watching[res] {
mt.watching[res] = true
return true
}
return false
}
func (mt *Tracker) setNotWatching(res string) {
mt.watchingMu.Lock()
defer mt.watchingMu.Unlock()
delete(mt.watching, res)
}
func (mt *Tracker) setMaster(res string, m mastership) {
isMaster.Set(1, res)
mt.masterMu.Lock()
defer mt.masterMu.Unlock()
mt.master[res] = m
}
func (mt *Tracker) setNotMaster(res string) {
isMaster.Set(0, res)
mt.masterMu.Lock()
defer mt.masterMu.Unlock()
delete(mt.master, res)
}
// Masterships returns a map of resources to mastership contexts.
// Callers should cancel ctx when they no longer are actively using mastership.
// If Masterships is not called periodically, we may retain masterships for longer than maxHold.
func (mt *Tracker) Masterships(ctx context.Context) (map[string]context.Context, error) {
mt.masterMu.RLock()
defer mt.masterMu.RUnlock()
mastershipCtx := make(map[string]context.Context)
for res, m := range mt.master {
// Resign mastership if we've held it for over maxHold.
// Resign before attempting to acquire a mastership lock.
if held := time.Since(m.acquired); held > mt.maxHold {
glog.Infof("Resigning from %v after %v", res, held)
if err := m.e.Resign(ctx); err != nil {
glog.Errorf("Resign failed for resource %v: %v", res, err)
}
continue
}
cctx, err := m.e.WithMastership(ctx)
if err != nil {
return nil, err
}
mastershipCtx[res] = cctx
}
return mastershipCtx, nil
}
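
Taken on its own, the Tracker API follows an add/run/query pattern: resources are registered with AddResource, Run drives the elections, and Masterships hands back per-resource contexts that stay live only while mastership is held. A minimal, self-contained caller might look like the sketch below; the resource name and the use of NoopFactory are illustrative assumptions (a real deployment would use the etcd-backed factory), and this file is not part of this commit.

	// election_tracker_example.go: illustrative caller of the Tracker API.
	package main

	import (
		"context"
		"time"

		"github.com/golang/glog"
		"github.com/google/keytransparency/core/sequencer/election"
		"github.com/google/trillian/monitoring/prometheus"
		"github.com/google/trillian/util/election2"
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// NoopFactory grants mastership unconditionally, mirroring --force_master.
		tracker := election.NewTracker(election2.NoopFactory{}, time.Hour, prometheus.MetricFactory{})
		go tracker.Run(ctx) // start watchers for every resource added below

		// The same resource may be added any number of times.
		tracker.AddResource("example-directory")

		// Ask for mastership contexts; cancel cctx once the work under them is done.
		cctx, done := context.WithCancel(ctx)
		defer done()
		masterships, err := tracker.Masterships(cctx)
		if err != nil {
			glog.Exitf("Masterships(): %v", err)
		}
		for res, whileMaster := range masterships {
			// Do work for res only while whileMaster is live.
			_ = whileMaster
			glog.Infof("currently master for %v", res)
		}
	}

In practice callers invoke Masterships on every tick (as RunBatchForAllDirectories does below), since mastership is usually not yet held on the first call after a resource is added.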
@@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"github.com/google/keytransparency/core/directory"
"github.com/google/keytransparency/core/sequencer/election"
spb "github.com/google/keytransparency/core/sequencer/sequencer_go_proto"
tpb "github.com/google/trillian"
@@ -34,6 +35,7 @@ type Sequencer struct {
mapAdmin tpb.TrillianAdminClient
batchSize int32
sequencerClient spb.KeyTransparencySequencerClient
tracker *election.Tracker
}
// New creates a new instance of the signer.
@@ -42,12 +44,14 @@ func New(
mapAdmin tpb.TrillianAdminClient,
directories directory.Storage,
batchSize int32,
tracker *election.Tracker,
) *Sequencer {
return &Sequencer{
sequencerClient: sequencerClient,
directories: directories,
mapAdmin: mapAdmin,
batchSize: batchSize,
tracker: tracker,
}
}
@@ -72,24 +76,42 @@ func PeriodicallyRun(ctx context.Context, tickch <-chan time.Time, f func(ctx co
}
}
// TrackMasterships monitors resources for mastership.
func (s *Sequencer) TrackMasterships(ctx context.Context) {
s.tracker.Run(ctx)
}
// RunBatchForAllDirectories scans the directories table for new directories and creates new receivers for
// directories that the sequencer is not currently receiving for.
func (s *Sequencer) RunBatchForAllDirectories(ctx context.Context) error {
directories, err := s.directories.List(ctx, false)
if err != nil {
return fmt.Errorf("admin.List(): %v", err)
}
// TODO(#565): Implement per-directory leader election here.
for _, d := range directories {
knownDirectories.Set(1, d.DirectoryID)
s.tracker.AddResource(d.DirectoryID)
}
cctx, cancel := context.WithCancel(ctx)
defer cancel()
masterships, err := s.tracker.Masterships(cctx)
if err != nil {
return err
}
var lastErr error
for dirID, whileMaster := range masterships {
req := &spb.RunBatchRequest{
- DirectoryId: d.DirectoryID,
+ DirectoryId: dirID,
MinBatch: 1,
MaxBatch: s.batchSize,
}
- if _, err := s.sequencerClient.RunBatch(ctx, req); err != nil {
- return err
+ if _, err := s.sequencerClient.RunBatch(whileMaster, req); err != nil {
+ lastErr = err
+ glog.Errorf("RunBatch for %v failed: %v", dirID, err)
}
}
- return nil
+ return lastErr
}
@@ -20,6 +20,7 @@ spec:
containers:
- command:
- /go/bin/keytransparency-sequencer
- --force_master
- --db=test:zaphod@tcp(db:3306)/test
- --addr=0.0.0.0:8080
- --log-url=log-server:8090
@@ -159,6 +159,7 @@ services:
restart: always
entrypoint:
- /go/bin/keytransparency-sequencer
- --force_master
- --db=test:zaphod@tcp(db:3306)/test
- --addr=0.0.0.0:8080
- --log-url=log-server:8090
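
With the new flags in place, --force_master is only one deployment option. A compose entrypoint relying on etcd-based election instead might look like the following sketch; the etcd:2379 endpoint is an assumption about the service name in this compose file, and --lock_file_path is shown with its default value from the flag definition above:

entrypoint:
- /go/bin/keytransparency-sequencer
- --etcd_servers=etcd:2379
- --lock_file_path=/keytransparency/master
- --db=test:zaphod@tcp(db:3306)/test
- --addr=0.0.0.0:8080
- --log-url=log-server:8090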
