From ff36ad75813113975cbcc25f3b939b1489a48b74 Mon Sep 17 00:00:00 2001 From: Chance Zibolski Date: Tue, 27 Jan 2015 20:09:02 -0800 Subject: [PATCH] registry: Move lease implementation into separate package Registry no longer uses lease Engine updated to use lease manager instead of lease registry. --- engine/engine.go | 27 ++-- etcd/error.go | 10 ++ {registry => etcd}/lease.go | 209 ++++++++++++++++--------------- {registry => etcd}/lease_test.go | 18 ++- pkg/lease/interface.go | 72 +++++++++++ registry/fake.go | 12 +- registry/interface.go | 55 -------- registry/job.go | 14 +-- registry/machine.go | 4 +- registry/registry.go | 15 +-- registry/unit.go | 4 +- registry/unit_state.go | 8 +- registry/unit_state_test.go | 14 +-- registry/version.go | 6 +- server/server.go | 3 +- 15 files changed, 250 insertions(+), 221 deletions(-) rename {registry => etcd}/lease.go (64%) rename {registry => etcd}/lease_test.go (92%) create mode 100644 pkg/lease/interface.go diff --git a/engine/engine.go b/engine/engine.go index c4ee747e8..bce3ebe31 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/fleet/log" "github.com/coreos/fleet/machine" "github.com/coreos/fleet/pkg" + "github.com/coreos/fleet/pkg/lease" "github.com/coreos/fleet/registry" ) @@ -36,21 +37,21 @@ type Engine struct { rec *Reconciler registry registry.Registry cRegistry registry.ClusterRegistry - lRegistry registry.LeaseRegistry + lManager lease.Manager rStream pkg.EventStream machine machine.Machine - lease registry.Lease + lease lease.Lease trigger chan struct{} } -func New(reg *registry.EtcdRegistry, rStream pkg.EventStream, mach machine.Machine) *Engine { +func New(reg *registry.EtcdRegistry, lManager lease.Manager, rStream pkg.EventStream, mach machine.Machine) *Engine { rec := NewReconciler() return &Engine{ rec: rec, registry: reg, cRegistry: reg, - lRegistry: reg, + lManager: lManager, rStream: rStream, machine: mach, trigger: make(chan struct{}), @@ -66,11 +67,11 @@ func (e *Engine) Run(ival time.Duration, stop chan bool) { return } - var l registry.Lease + var l lease.Lease if isLeader(e.lease, machID) { l = renewLeadership(e.lease, leaseTTL) } else { - l = acquireLeadership(e.lRegistry, machID, engineVersion, leaseTTL) + l = acquireLeadership(e.lManager, machID, engineVersion, leaseTTL) } // log all leadership changes @@ -132,7 +133,7 @@ func (e *Engine) Purge() { } } -func isLeader(l registry.Lease, machID string) bool { +func isLeader(l lease.Lease, machID string) bool { if l == nil { return false } @@ -164,16 +165,16 @@ func ensureEngineVersionMatch(cReg registry.ClusterRegistry, expect int) bool { return true } -func acquireLeadership(lReg registry.LeaseRegistry, machID string, ver int, ttl time.Duration) registry.Lease { - existing, err := lReg.GetLease(engineLeaseName) +func acquireLeadership(lManager lease.Manager, machID string, ver int, ttl time.Duration) lease.Lease { + existing, err := lManager.GetLease(engineLeaseName) if err != nil { log.Errorf("Unable to determine current lessee: %v", err) return nil } - var l registry.Lease + var l lease.Lease if existing == nil { - l, err = lReg.AcquireLease(engineLeaseName, machID, ver, ttl) + l, err = lManager.AcquireLease(engineLeaseName, machID, ver, ttl) if err != nil { log.Errorf("Engine leadership acquisition failed: %v", err) return nil @@ -191,7 +192,7 @@ func acquireLeadership(lReg registry.LeaseRegistry, machID string, ver int, ttl } rem := existing.TimeRemaining() - l, err = lReg.StealLease(engineLeaseName, machID, ver, ttl+rem, existing.Index()) + l, err = lManager.StealLease(engineLeaseName, machID, ver, ttl+rem, existing.Index()) if err != nil { log.Errorf("Engine leadership steal failed: %v", err) return nil @@ -210,7 +211,7 @@ func acquireLeadership(lReg registry.LeaseRegistry, machID string, ver int, ttl return l } -func renewLeadership(l registry.Lease, ttl time.Duration) registry.Lease { +func renewLeadership(l lease.Lease, ttl time.Duration) lease.Lease { err := l.Renew(ttl) if err != nil { log.Errorf("Engine leadership lost, renewal failed: %v", err) diff --git a/etcd/error.go b/etcd/error.go index 48706b858..c119407fb 100644 --- a/etcd/error.go +++ b/etcd/error.go @@ -46,3 +46,13 @@ func unmarshalFailedResponse(resp *http.Response, body []byte) (*Result, error) return nil, etcdErr } + +func IsKeyNotFound(err error) bool { + e, ok := err.(Error) + return ok && e.ErrorCode == ErrorKeyNotFound +} + +func IsNodeExist(err error) bool { + e, ok := err.(Error) + return ok && e.ErrorCode == ErrorNodeExist +} diff --git a/registry/lease.go b/etcd/lease.go similarity index 64% rename from registry/lease.go rename to etcd/lease.go index 1b5b6a0e7..fc076b1cc 100644 --- a/registry/lease.go +++ b/etcd/lease.go @@ -12,183 +12,192 @@ // See the License for the specific language governing permissions and // limitations under the License. -package registry +package etcd import ( "encoding/json" "path" "time" - "github.com/coreos/fleet/etcd" + "github.com/coreos/fleet/pkg/lease" ) const ( leasePrefix = "lease" ) -func (r *EtcdRegistry) leasePath(name string) string { +type etcdLeaseMetadata struct { + MachineID string + Version int +} + +// etcdLease implements the Lease interface +type etcdLease struct { + key string + meta etcdLeaseMetadata + idx uint64 + ttl time.Duration + client Client +} + +func (l *etcdLease) Release() error { + req := Delete{ + Key: l.key, + PreviousIndex: l.idx, + } + _, err := l.client.Do(&req) + return err +} + +func (l *etcdLease) Renew(period time.Duration) error { + val, err := serializeLeaseMetadata(l.meta.MachineID, l.meta.Version) + req := Set{ + Key: l.key, + Value: val, + PreviousIndex: l.idx, + TTL: period, + } + + resp, err := l.client.Do(&req) + if err != nil { + return err + } + + renewed := leaseFromResult(resp, l.client) + *l = *renewed + + return nil +} + +func (l *etcdLease) MachineID() string { + return l.meta.MachineID +} + +func (l *etcdLease) Version() int { + return l.meta.Version +} + +func (l *etcdLease) Index() uint64 { + return l.idx +} + +func (l *etcdLease) TimeRemaining() time.Duration { + return l.ttl +} + +func serializeLeaseMetadata(machID string, ver int) (string, error) { + meta := etcdLeaseMetadata{ + MachineID: machID, + Version: ver, + } + + b, err := json.Marshal(meta) + if err != nil { + return "", err + } + + return string(b), nil +} + +type LeaseManager struct { + client Client + keyPrefix string +} + +func NewLeaseManager(client Client, keyPrefix string) *LeaseManager { + return &LeaseManager{client: client, keyPrefix: keyPrefix} +} + +func (r *LeaseManager) leasePath(name string) string { return path.Join(r.keyPrefix, leasePrefix, name) } -func (r *EtcdRegistry) GetLease(name string) (Lease, error) { +func (r *LeaseManager) GetLease(name string) (lease.Lease, error) { key := r.leasePath(name) - req := etcd.Get{ + req := Get{ Key: key, } - resp, err := r.etcd.Do(&req) + resp, err := r.client.Do(&req) if err != nil { - if isKeyNotFound(err) { + if IsKeyNotFound(err) { err = nil } return nil, err } - l := leaseFromResult(resp, r.etcd) + l := leaseFromResult(resp, r.client) return l, nil } -func (r *EtcdRegistry) StealLease(name, machID string, ver int, period time.Duration, idx uint64) (Lease, error) { +func (r *LeaseManager) StealLease(name, machID string, ver int, period time.Duration, idx uint64) (lease.Lease, error) { val, err := serializeLeaseMetadata(machID, ver) if err != nil { return nil, err } - req := etcd.Set{ + req := Set{ Key: r.leasePath(name), Value: val, PreviousIndex: idx, TTL: period, } - resp, err := r.etcd.Do(&req) + resp, err := r.client.Do(&req) if err != nil { - if isNodeExist(err) { + if IsNodeExist(err) { err = nil } return nil, err } - l := leaseFromResult(resp, r.etcd) + l := leaseFromResult(resp, r.client) return l, nil } -func (r *EtcdRegistry) AcquireLease(name string, machID string, ver int, period time.Duration) (Lease, error) { +func (r *LeaseManager) AcquireLease(name string, machID string, ver int, period time.Duration) (lease.Lease, error) { val, err := serializeLeaseMetadata(machID, ver) if err != nil { return nil, err } - req := etcd.Create{ + req := Create{ Key: r.leasePath(name), Value: val, TTL: period, } - resp, err := r.etcd.Do(&req) + resp, err := r.client.Do(&req) if err != nil { - if isNodeExist(err) { + if IsNodeExist(err) { err = nil } return nil, err } - l := leaseFromResult(resp, r.etcd) + l := leaseFromResult(resp, r.client) return l, nil } -type etcdLeaseMetadata struct { - MachineID string - Version int -} - -// etcdLease implements the Lease interface -type etcdLease struct { - key string - meta etcdLeaseMetadata - idx uint64 - ttl time.Duration - etcd etcd.Client -} - -func (l *etcdLease) Release() error { - req := etcd.Delete{ - Key: l.key, - PreviousIndex: l.idx, +func leaseFromResult(res *Result, client Client) *etcdLease { + l := &etcdLease{ + key: res.Node.Key, + idx: res.Node.ModifiedIndex, + ttl: res.Node.TTLDuration(), + client: client, } - _, err := l.etcd.Do(&req) - return err -} -func (l *etcdLease) Renew(period time.Duration) error { - val, err := serializeLeaseMetadata(l.meta.MachineID, l.meta.Version) - req := etcd.Set{ - Key: l.key, - Value: val, - PreviousIndex: l.idx, - TTL: period, - } - - resp, err := l.etcd.Do(&req) - if err != nil { - return err - } - - renewed := leaseFromResult(resp, l.etcd) - *l = *renewed - - return nil -} - -func (l *etcdLease) MachineID() string { - return l.meta.MachineID -} - -func (l *etcdLease) Version() int { - return l.meta.Version -} - -func (l *etcdLease) Index() uint64 { - return l.idx -} - -func (l *etcdLease) TimeRemaining() time.Duration { - return l.ttl -} - -func leaseFromResult(res *etcd.Result, ec etcd.Client) *etcdLease { - lease := &etcdLease{ - key: res.Node.Key, - idx: res.Node.ModifiedIndex, - ttl: res.Node.TTLDuration(), - etcd: ec, - } - - err := json.Unmarshal([]byte(res.Node.Value), &lease.meta) + err := json.Unmarshal([]byte(res.Node.Value), &l.meta) // fall back to using the entire value as the MachineID for // backwards-compatibility with engines that are not aware // of this versioning mechanism if err != nil { - lease.meta = etcdLeaseMetadata{ + l.meta = etcdLeaseMetadata{ MachineID: res.Node.Value, Version: 0, } } - return lease -} - -func serializeLeaseMetadata(machID string, ver int) (string, error) { - meta := etcdLeaseMetadata{ - MachineID: machID, - Version: ver, - } - - b, err := json.Marshal(meta) - if err != nil { - return "", err - } - - return string(b), nil + return l } diff --git a/registry/lease_test.go b/etcd/lease_test.go similarity index 92% rename from registry/lease_test.go rename to etcd/lease_test.go index 09bde1d22..76c5899a9 100644 --- a/registry/lease_test.go +++ b/etcd/lease_test.go @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package registry +package etcd import ( "reflect" "testing" "time" - - "github.com/coreos/fleet/etcd" ) func TestSerializeLeaseMetadata(t *testing.T) { @@ -54,13 +52,13 @@ func TestSerializeLeaseMetadata(t *testing.T) { func TestLeaseFromResult(t *testing.T) { tests := []struct { - res etcd.Result + res Result want etcdLease }{ // typical case { - res: etcd.Result{ - Node: &etcd.Node{ + res: Result{ + Node: &Node{ Key: "/foo/bar", ModifiedIndex: 12, TTL: 9, @@ -80,8 +78,8 @@ func TestLeaseFromResult(t *testing.T) { // backwards-compatibility with unversioned engines { - res: etcd.Result{ - Node: &etcd.Node{ + res: Result{ + Node: &Node{ Key: "/foo/bar", ModifiedIndex: 12, TTL: 9, @@ -101,8 +99,8 @@ func TestLeaseFromResult(t *testing.T) { // json decode failures are treated like a nonversioned lease { - res: etcd.Result{ - Node: &etcd.Node{ + res: Result{ + Node: &Node{ Key: "/foo/bar", ModifiedIndex: 12, TTL: 9, diff --git a/pkg/lease/interface.go b/pkg/lease/interface.go new file mode 100644 index 000000000..142187c22 --- /dev/null +++ b/pkg/lease/interface.go @@ -0,0 +1,72 @@ +// Copyright 2014 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lease + +import "time" + +// Lease proxies to an auto-expiring lease stored in a LeaseRegistry. +// The creator of a Lease must repeatedly call Renew to keep their lease +// from expiring. +type Lease interface { + // Renew attempts to extend the Lease TTL to the provided duration. + // The operation will succeed only if the Lease has not changed in + // the LeaseRegistry since it was last renewed or first acquired. + // An error is returned if the Lease has already expired, or if the + // operation fails for any other reason. + Renew(time.Duration) error + + // Release relinquishes the ownership of a Lease back to the Registry. + // After calling Release, the Lease object should be discarded. An + // error is returned if the Lease has already expired, or if the + // operation fails for any other reason. + Release() error + + // MachineID returns the ID of the Machine that holds this Lease. This + // value must be considered a cached value as it is not guaranteed to + // be correct. + MachineID() string + + // Version returns the current version at which the lesse is operating. + // This value has the same correctness guarantees as MachineID. + // It is up to the caller to determine what this Version means. + Version() int + + // Index exposes the relative time at which the Lease was created or + // renewed. For example, this could be implemented as the ModifiedIndex + // field of a node in etcd. + Index() uint64 + + // TimeRemaining represents the amount of time left on the Lease when + // it was fetched from the LeaseRegistry. + TimeRemaining() time.Duration +} + +type Manager interface { + // GetLease fetches a Lease only if it exists. If it does not + // exist, a nil Lease will be returned. Any other failures + // result in non-nil error and nil Lease objects. + GetLease(name string) (Lease, error) + + // AcquireLease acquires a named lease only if the lease is not + // currently held. If a Lease cannot be acquired, a nil Lease + // object is returned. An error is returned only if there is a + // failure communicating with the Registry. + AcquireLease(name, machID string, ver int, period time.Duration) (Lease, error) + + // StealLease attempts to replace the lessee of the Lease identified + // by the provided name and index with a new lessee. This function + // will fail if the named Lease has progressed past the given index. + StealLease(name, machID string, ver int, period time.Duration, idx uint64) (Lease, error) +} diff --git a/registry/fake.go b/registry/fake.go index 266ad5408..2a6c8bc5a 100644 --- a/registry/fake.go +++ b/registry/fake.go @@ -21,9 +21,9 @@ import ( "time" "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/go-semver/semver" - "github.com/coreos/fleet/job" "github.com/coreos/fleet/machine" + "github.com/coreos/fleet/pkg/lease" "github.com/coreos/fleet/unit" ) @@ -366,19 +366,19 @@ func (l *fakeLease) Release() error { func NewFakeLeaseRegistry() *FakeLeaseRegistry { return &FakeLeaseRegistry{ - leaseMap: make(map[string]Lease), + leaseMap: make(map[string]lease.Lease), } } type FakeLeaseRegistry struct { - leaseMap map[string]Lease + leaseMap map[string]lease.Lease } -func (fl *FakeLeaseRegistry) GetLease(name string) (Lease, error) { +func (fl *FakeLeaseRegistry) GetLease(name string) (lease.Lease, error) { return fl.leaseMap[name], nil } -func (fl *FakeLeaseRegistry) AcquireLease(name, machID string, ver int, ttl time.Duration) (Lease, error) { +func (fl *FakeLeaseRegistry) AcquireLease(name, machID string, ver int, ttl time.Duration) (lease.Lease, error) { if _, ok := fl.leaseMap[name]; ok { return nil, errors.New("already exists") } @@ -395,7 +395,7 @@ func (fl *FakeLeaseRegistry) AcquireLease(name, machID string, ver int, ttl time return l, nil } -func (fl *FakeLeaseRegistry) StealLease(name, machID string, ver int, ttl time.Duration, idx uint64) (Lease, error) { +func (fl *FakeLeaseRegistry) StealLease(name, machID string, ver int, ttl time.Duration, idx uint64) (lease.Lease, error) { if idx != 0 { panic("unable to test StealLease with index other than zero") } diff --git a/registry/interface.go b/registry/interface.go index 1dbd8fd4c..270a2ef9d 100644 --- a/registry/interface.go +++ b/registry/interface.go @@ -64,58 +64,3 @@ type ClusterRegistry interface { // on success. UpdateEngineVersion(from, to int) error } - -type LeaseRegistry interface { - // GetLease fetches a Lease only if it exists. If it does not - // exist, a nil Lease will be returned. Any other failures - // result in non-nil error and nil Lease objects. - GetLease(name string) (Lease, error) - - // AcquireLease acquires a named lease only if the lease is not - // currently held. If a Lease cannot be acquired, a nil Lease - // object is returned. An error is returned only if there is a - // failure communicating with the Registry. - AcquireLease(name, machID string, ver int, period time.Duration) (Lease, error) - - // StealLease attempts to replace the lessee of the Lease identified - // by the provided name and index with a new lessee. This function - // will fail if the named Lease has progressed past the given index. - StealLease(name, machID string, ver int, period time.Duration, idx uint64) (Lease, error) -} - -// Lease proxies to an auto-expiring lease stored in a LeaseRegistry. -// The creator of a Lease must repeatedly call Renew to keep their lease -// from expiring. -type Lease interface { - // Renew attempts to extend the Lease TTL to the provided duration. - // The operation will succeed only if the Lease has not changed in - // the LeaseRegistry since it was last renewed or first acquired. - // An error is returned if the Lease has already expired, or if the - // operation fails for any other reason. - Renew(time.Duration) error - - // Release relinquishes the ownership of a Lease back to the Registry. - // After calling Release, the Lease object should be discarded. An - // error is returned if the Lease has already expired, or if the - // operation fails for any other reason. - Release() error - - // MachineID returns the ID of the Machine that holds this Lease. This - // value must be considered a cached value as it is not guaranteed to - // be correct. - MachineID() string - - // Version returns the current version at which the lesse is operating. - // This value has the same correctness guarantees as MachineID. - // It is up to the caller to determine what this Version means. - Version() int - - // Index exposes the relative time at which the Lease was created or - // renewed. For example, this could be implemented as the ModifiedIndex - // field of a node in etcd. - Index() uint64 - - // TimeRemaining represents the amount of time left on the Lease when - // it was fetched from the LeaseRegistry. - TimeRemaining() time.Duration -} diff --git a/registry/job.go b/registry/job.go index 14c7767dd..679bd864a 100644 --- a/registry/job.go +++ b/registry/job.go @@ -40,7 +40,7 @@ func (r *EtcdRegistry) Schedule() ([]job.ScheduledUnit, error) { res, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return nil, err @@ -96,7 +96,7 @@ func (r *EtcdRegistry) Units() ([]job.Unit, error) { res, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return nil, err @@ -139,7 +139,7 @@ func (r *EtcdRegistry) Unit(name string) (*job.Unit, error) { res, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return nil, err @@ -190,7 +190,7 @@ func (r *EtcdRegistry) ScheduledUnit(name string) (*job.ScheduledUnit, error) { res, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return nil, err @@ -222,7 +222,7 @@ func (r *EtcdRegistry) UnscheduleUnit(name, machID string) error { } _, err := r.etcd.Do(&req) - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } @@ -297,7 +297,7 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { _, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = errors.New("job does not exist") } @@ -330,7 +330,7 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { _, err = r.etcd.Do(&req) if err != nil { - if isNodeExist(err) { + if etcd.IsNodeExist(err) { err = errors.New("job already exists") } return diff --git a/registry/machine.go b/registry/machine.go index 8a9834a5e..3ed8a1812 100644 --- a/registry/machine.go +++ b/registry/machine.go @@ -36,7 +36,7 @@ func (r *EtcdRegistry) Machines() (machines []machine.MachineState, err error) { resp, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return @@ -99,7 +99,7 @@ func (r *EtcdRegistry) RemoveMachineState(machID string) error { Key: path.Join(r.keyPrefix, machinePrefix, machID, "object"), } _, err := r.etcd.Do(&req) - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return err diff --git a/registry/registry.go b/registry/registry.go index 75c98173f..9149dcdf5 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -30,7 +30,10 @@ type EtcdRegistry struct { } func NewEtcdRegistry(client etcd.Client, keyPrefix string) *EtcdRegistry { - return &EtcdRegistry{client, keyPrefix} + return &EtcdRegistry{ + etcd: client, + keyPrefix: keyPrefix, + } } func marshal(obj interface{}) (string, error) { @@ -48,13 +51,3 @@ func unmarshal(val string, obj interface{}) error { } return fmt.Errorf("unable to JSON-deserialize object: %s", err) } - -func isKeyNotFound(err error) bool { - e, ok := err.(etcd.Error) - return ok && e.ErrorCode == etcd.ErrorKeyNotFound -} - -func isNodeExist(err error) bool { - e, ok := err.(etcd.Error) - return ok && e.ErrorCode == etcd.ErrorNodeExist -} diff --git a/registry/unit.go b/registry/unit.go index 84c844d9b..5173c2842 100644 --- a/registry/unit.go +++ b/registry/unit.go @@ -42,7 +42,7 @@ func (r *EtcdRegistry) storeOrGetUnitFile(u unit.UnitFile) (err error) { } _, err = r.etcd.Do(&req) // unit is already stored - if err != nil && isNodeExist(err) { + if err != nil && etcd.IsNodeExist(err) { // TODO(jonboulle): verify more here? err = nil } @@ -57,7 +57,7 @@ func (r *EtcdRegistry) getUnitByHash(hash unit.Hash) *unit.UnitFile { } resp, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return nil diff --git a/registry/unit_state.go b/registry/unit_state.go index 353e35ba3..95a476aee 100644 --- a/registry/unit_state.go +++ b/registry/unit_state.go @@ -98,7 +98,7 @@ func (r *EtcdRegistry) statesByMUSKey() (map[MUSKey]*unit.UnitState, error) { Recursive: true, } res, err := r.etcd.Do(&req) - if err != nil && !isKeyNotFound(err) { + if err != nil && !etcd.IsKeyNotFound(err) { return nil, err } if res != nil { @@ -131,7 +131,7 @@ func (r *EtcdRegistry) getUnitState(uName, machID string) (*unit.UnitState, erro res, err := r.etcd.Do(&req) if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return nil, err @@ -184,7 +184,7 @@ func (r *EtcdRegistry) RemoveUnitState(jobName string) error { Key: legacyKey, } _, err := r.etcd.Do(&req) - if err != nil && !isKeyNotFound(err) { + if err != nil && !etcd.IsKeyNotFound(err) { return err } @@ -195,7 +195,7 @@ func (r *EtcdRegistry) RemoveUnitState(jobName string) error { Recursive: true, } _, err = r.etcd.Do(&req) - if err != nil && !isKeyNotFound(err) { + if err != nil && !etcd.IsKeyNotFound(err) { return err } return nil diff --git a/registry/unit_state_test.go b/registry/unit_state_test.go index 18f9dfad5..72a2a4b11 100644 --- a/registry/unit_state_test.go +++ b/registry/unit_state_test.go @@ -66,7 +66,7 @@ func (t *testEtcdClient) Wait(req etcd.Action, ch <-chan struct{}) (*etcd.Result } func TestUnitStatePaths(t *testing.T) { - r := &EtcdRegistry{nil, "/fleet/"} + r := &EtcdRegistry{etcd: nil, keyPrefix: "/fleet/"} j := "foo.service" want := "/fleet/state/foo.service" got := r.legacyUnitStatePath(j) @@ -83,7 +83,7 @@ func TestUnitStatePaths(t *testing.T) { func TestSaveUnitState(t *testing.T) { e := &testEtcdClient{} - r := &EtcdRegistry{e, "/fleet/"} + r := &EtcdRegistry{etcd: e, keyPrefix: "/fleet/"} j := "foo.service" mID := "mymachine" us := unit.NewUnitState("abc", "def", "ghi", mID) @@ -129,7 +129,7 @@ func TestSaveUnitState(t *testing.T) { func TestRemoveUnitState(t *testing.T) { e := &testEtcdClient{} - r := &EtcdRegistry{e, "/fleet/"} + r := &EtcdRegistry{etcd: e, keyPrefix: "/fleet/"} j := "foo.service" err := r.RemoveUnitState(j) if err != nil { @@ -162,7 +162,7 @@ func TestRemoveUnitState(t *testing.T) { {[]error{nil, errors.New("ur registry don't work")}, true}, } { e = &testEtcdClient{err: tt.errs} - r = &EtcdRegistry{e, "/fleet"} + r = &EtcdRegistry{etcd: e, keyPrefix: "/fleet"} err = r.RemoveUnitState("foo.service") if (err != nil) != tt.fail { t.Errorf("case %d: unexpected error state calling UnitStates(): got %v, want %v", i, err, tt.fail) @@ -362,7 +362,7 @@ func TestGetUnitState(t *testing.T) { res: []*etcd.Result{tt.res}, err: []error{tt.err}, } - r := &EtcdRegistry{e, "/fleet/"} + r := &EtcdRegistry{etcd: e, keyPrefix: "/fleet/"} j := "foo.service" us, err := r.getUnitState(j, "XXX") if tt.wantErr != (err != nil) { @@ -442,7 +442,7 @@ func TestUnitStates(t *testing.T) { e := &testEtcdClient{ res: []*etcd.Result{res2}, } - r := &EtcdRegistry{e, "/fleet/"} + r := &EtcdRegistry{etcd: e, keyPrefix: "/fleet/"} got, err := r.UnitStates() if err != nil { @@ -477,7 +477,7 @@ func TestUnitStates(t *testing.T) { {[]error{errors.New("ur registry don't work")}, true}, } { e = &testEtcdClient{err: tt.errs} - r = &EtcdRegistry{e, "/fleet"} + r = &EtcdRegistry{etcd: e, keyPrefix: "/fleet"} got, err = r.UnitStates() if (err != nil) != tt.fail { t.Errorf("case %d: unexpected error state calling UnitStates(): got %v, want %v", i, err, tt.fail) diff --git a/registry/version.go b/registry/version.go index 824ec0ddc..e21661386 100644 --- a/registry/version.go +++ b/registry/version.go @@ -29,7 +29,7 @@ import ( func (r *EtcdRegistry) LatestDaemonVersion() (*semver.Version, error) { machs, err := r.Machines() if err != nil { - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return nil, err @@ -56,7 +56,7 @@ func (r *EtcdRegistry) EngineVersion() (int, error) { if err != nil { // no big deal, either the cluster is new or is just // upgrading from old unversioned code - if isKeyNotFound(err) { + if etcd.IsKeyNotFound(err) { err = nil } return 0, err @@ -82,7 +82,7 @@ func (r *EtcdRegistry) UpdateEngineVersion(from, to int) error { _, err := r.etcd.Do(req) if err == nil { return nil - } else if !isKeyNotFound(err) { + } else if !etcd.IsKeyNotFound(err) { return err } diff --git a/server/server.go b/server/server.go index 08ac61682..b47c27c4a 100644 --- a/server/server.go +++ b/server/server.go @@ -95,10 +95,11 @@ func New(cfg config.Config) (*Server, error) { a := agent.New(mgr, gen, reg, mach, agentTTL) rStream := registry.NewEtcdEventStream(eClient, cfg.EtcdKeyPrefix) + lManager := etcd.NewLeaseManager(eClient, cfg.EtcdKeyPrefix) ar := agent.NewReconciler(reg, rStream) - e := engine.New(reg, rStream, mach) + e := engine.New(reg, lManager, rStream, mach) listeners, err := activation.Listeners(false) if err != nil {