Skip to content
Permalink
Browse files

ForceMaster election library (#1453)

* Add quotes to resource names

* ForceMaster election library

The previous NoopElection broke the API contract of the election
factory because it did not cancel contexts upon resigning.
As a result, the KT sequencer hung attempting to resign mastership.

This PR introduces a new `forcemaster.Election` which respects the
election API, and tests it against the `election.Tracker`.
  • Loading branch information
gdbelvin committed Feb 13, 2020
1 parent 128f290 commit abd696778a6536db59dade0be9c9e45a3bc26c32
@@ -39,6 +39,7 @@ import (
"github.com/google/keytransparency/core/sequencer/election"
"github.com/google/keytransparency/impl/sql/directory"
"github.com/google/keytransparency/impl/sql/mutationstorage"
"github.com/google/keytransparency/internal/forcemaster"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
dir "github.com/google/keytransparency/core/directory"
@@ -78,7 +79,7 @@ var (
func getElectionFactory() (election2.Factory, func()) {
if *forceMaster {
glog.Warning("Acting as master for all directories")
return election2.NoopFactory{}, func() {}
return forcemaster.Factory{}, func() {}
}
if len(*etcdServers) == 0 {
glog.Exit("Either --force_master or --etcd_servers must be supplied")
@@ -121,7 +121,7 @@ func (mt *Tracker) watchOnce(ctx context.Context, e election2.Election, res stri
if err := e.Await(ctx); err != nil {
return err
}
glog.Infof("Obtained mastership for %v", res)
glog.Infof("Obtained mastership for %q", res)

// Obtain mastership ctx *before* Masterships runs to avoid racing.
mastershipCtx, err := e.WithMastership(ctx)
@@ -137,7 +137,7 @@ func (mt *Tracker) watchOnce(ctx context.Context, e election2.Election, res stri
// the parent context was closed. In either case work being done will
// be canceled and we will mark ourselves as not-master until we can
// acquire mastership again.
glog.Warningf("No longer master for %v", res)
glog.Warningf("No longer master for %q", res)
return nil
}

@@ -184,9 +184,9 @@ func (mt *Tracker) Masterships(ctx context.Context) (map[string]context.Context,
// Resign mastership if we've held it for over maxHold.
// Resign before attempting to acquire a mastership lock.
if held := time.Since(m.acquired); held > mt.maxHold {
glog.Infof("Resigning from %v after %v", res, held)
glog.Infof("Resigning from %q after %v", res, held)
if err := m.e.Resign(ctx); err != nil {
glog.Errorf("Resign failed for resource %v: %v", res, err)
glog.Errorf("Resign failed for resource %q: %v", res, err)
}
continue
}
@@ -0,0 +1,71 @@
// Copyright 2020 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package election

import (
"context"
"testing"
"time"

"github.com/google/keytransparency/internal/forcemaster"
"github.com/google/trillian/monitoring/prometheus"
)

// TestForceMaster checks that a Tracker backed by forcemaster.Factory holds
// mastership, resigns it once it has been held for longer than resignTime,
// and then acquires it again.
func TestForceMaster(t *testing.T) {
	ctx, done := context.WithCancel(context.Background())
	defer done()
	resignTime := 1 * time.Hour
	res := "test resource"

	mt := NewTracker(forcemaster.Factory{}, resignTime, prometheus.MetricFactory{})
	go mt.Run(ctx)
	mt.AddResource(res)
	time.Sleep(time.Millisecond) // Wait to acquire mastership.

	// Verify that mastership works as expected, with 1 mastership for res.
	m, err := mt.Masterships(ctx)
	if err != nil {
		t.Error(err)
	}
	if got := len(m); got != 1 {
		t.Errorf("Masterships returned %v, want 1", got)
	}

	// Advance the clock by pretending we acquired mastership a long time ago.
	// NOTE(review): mt.master is read and written here while mt.Run is active
	// in another goroutine; confirm whether the Tracker's lock must be held.
	mastership := mt.master[res]
	mastership.acquired = time.Now().Add(-2 * resignTime)
	mt.master[res] = mastership

	// Verify that we resign the mastership after the clock has advanced.
	m2, err := mt.Masterships(ctx)
	if err != nil {
		t.Error(err)
	}
	if got := len(m2); got != 0 {
		t.Errorf("Masterships returned %v, want 0", got)
	}

	time.Sleep(time.Millisecond) // Wait to reacquire mastership.

	// Verify that we reacquire mastership.
	m3, err := mt.Masterships(ctx)
	if err != nil {
		t.Error(err)
	}
	if got := len(m3); got != 1 {
		// The expectation here is 1: mastership must have been regained.
		t.Errorf("Masterships returned %v, want 1", got)
	}
}
@@ -0,0 +1,74 @@
// Copyright 2020 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package forcemaster

import (
"context"
"sync"

"github.com/google/trillian/util/election2"
)

// Election is a stub Election that always believes itself to be the master.
// Contexts handed out by WithMastership stay live until Resign or Close is
// called, or until their parent context is canceled.
type Election struct {
	// id is the resource ID passed to NewElection. It is stored but not
	// otherwise used by the methods in this file.
	id string

	// mu guards cancels.
	mu sync.Mutex
	// cancels holds the cancel functions of every context issued by
	// WithMastership since the last Resign.
	cancels []context.CancelFunc
}

// NewElection returns an Election for the resource identified by id.
func NewElection(id string) *Election {
	return &Election{
		id:      id,
		cancels: make([]context.CancelFunc, 0, 1),
	}
}

// Await returns nil immediately, as the instance is always the master.
// The passed in context is not consulted.
func (e *Election) Await(ctx context.Context) error {
	return nil
}

// WithMastership returns a cancelable context derived from the passed in
// context. The returned context is canceled when ctx is canceled or when
// Resign or Close is called, whichever happens first. The returned error is
// always nil.
func (e *Election) WithMastership(ctx context.Context) (context.Context, error) {
	cctx, cancel := context.WithCancel(ctx)
	e.mu.Lock()
	defer e.mu.Unlock()
	e.cancels = append(e.cancels, cancel)
	return cctx, nil
}

// Resign cancels the contexts obtained through WithMastership. The passed in
// context is not consulted and the returned error is always nil. The Election
// can be used again afterwards: a later WithMastership call returns a fresh,
// live context.
func (e *Election) Resign(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	for i, cancel := range e.cancels {
		cancel()
		// Drop the reference: truncating with [:0] alone would leave the
		// cancel func (and the context it pins) reachable through the
		// backing array.
		e.cancels[i] = nil
	}
	e.cancels = e.cancels[:0] // Empty the slice but keep the memory.
	return nil
}

// Close resigns mastership, canceling every context obtained through
// WithMastership. There is nothing else to release.
func (e *Election) Close(ctx context.Context) error {
	return e.Resign(ctx)
}

// Factory builds forcemaster Election instances. It carries no state, so the
// zero value is ready to use.
type Factory struct{}

// NewElection returns a new Election for resourceID. The passed in context is
// not consulted and the returned error is always nil.
func (Factory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) {
	e := NewElection(resourceID)
	return e, nil
}

0 comments on commit abd6967

Please sign in to comment.
You can’t perform that action at this time.