Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
registry: Move lease implementation into separate package
Browse files Browse the repository at this point in the history
Registry no longer uses lease

Engine updated to use lease manager instead of lease registry.
  • Loading branch information
chancez committed Jan 28, 2015
1 parent 420afd8 commit ff36ad7
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 221 deletions.
27 changes: 14 additions & 13 deletions engine/engine.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/coreos/fleet/log"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/pkg"
"github.com/coreos/fleet/pkg/lease"
"github.com/coreos/fleet/registry"
)

Expand All @@ -36,21 +37,21 @@ type Engine struct {
rec *Reconciler
registry registry.Registry
cRegistry registry.ClusterRegistry
lRegistry registry.LeaseRegistry
lManager lease.Manager
rStream pkg.EventStream
machine machine.Machine

lease registry.Lease
lease lease.Lease
trigger chan struct{}
}

func New(reg *registry.EtcdRegistry, rStream pkg.EventStream, mach machine.Machine) *Engine {
func New(reg *registry.EtcdRegistry, lManager lease.Manager, rStream pkg.EventStream, mach machine.Machine) *Engine {
rec := NewReconciler()
return &Engine{
rec: rec,
registry: reg,
cRegistry: reg,
lRegistry: reg,
lManager: lManager,
rStream: rStream,
machine: mach,
trigger: make(chan struct{}),
Expand All @@ -66,11 +67,11 @@ func (e *Engine) Run(ival time.Duration, stop chan bool) {
return
}

var l registry.Lease
var l lease.Lease
if isLeader(e.lease, machID) {
l = renewLeadership(e.lease, leaseTTL)
} else {
l = acquireLeadership(e.lRegistry, machID, engineVersion, leaseTTL)
l = acquireLeadership(e.lManager, machID, engineVersion, leaseTTL)
}

// log all leadership changes
Expand Down Expand Up @@ -132,7 +133,7 @@ func (e *Engine) Purge() {
}
}

func isLeader(l registry.Lease, machID string) bool {
func isLeader(l lease.Lease, machID string) bool {
if l == nil {
return false
}
Expand Down Expand Up @@ -164,16 +165,16 @@ func ensureEngineVersionMatch(cReg registry.ClusterRegistry, expect int) bool {
return true
}

func acquireLeadership(lReg registry.LeaseRegistry, machID string, ver int, ttl time.Duration) registry.Lease {
existing, err := lReg.GetLease(engineLeaseName)
func acquireLeadership(lManager lease.Manager, machID string, ver int, ttl time.Duration) lease.Lease {
existing, err := lManager.GetLease(engineLeaseName)
if err != nil {
log.Errorf("Unable to determine current lessee: %v", err)
return nil
}

var l registry.Lease
var l lease.Lease
if existing == nil {
l, err = lReg.AcquireLease(engineLeaseName, machID, ver, ttl)
l, err = lManager.AcquireLease(engineLeaseName, machID, ver, ttl)
if err != nil {
log.Errorf("Engine leadership acquisition failed: %v", err)
return nil
Expand All @@ -191,7 +192,7 @@ func acquireLeadership(lReg registry.LeaseRegistry, machID string, ver int, ttl
}

rem := existing.TimeRemaining()
l, err = lReg.StealLease(engineLeaseName, machID, ver, ttl+rem, existing.Index())
l, err = lManager.StealLease(engineLeaseName, machID, ver, ttl+rem, existing.Index())
if err != nil {
log.Errorf("Engine leadership steal failed: %v", err)
return nil
Expand All @@ -210,7 +211,7 @@ func acquireLeadership(lReg registry.LeaseRegistry, machID string, ver int, ttl
return l
}

func renewLeadership(l registry.Lease, ttl time.Duration) registry.Lease {
func renewLeadership(l lease.Lease, ttl time.Duration) lease.Lease {
err := l.Renew(ttl)
if err != nil {
log.Errorf("Engine leadership lost, renewal failed: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions etcd/error.go
Expand Up @@ -46,3 +46,13 @@ func unmarshalFailedResponse(resp *http.Response, body []byte) (*Result, error)

return nil, etcdErr
}

func IsKeyNotFound(err error) bool {
e, ok := err.(Error)
return ok && e.ErrorCode == ErrorKeyNotFound
}

func IsNodeExist(err error) bool {
e, ok := err.(Error)
return ok && e.ErrorCode == ErrorNodeExist
}

0 comments on commit ff36ad7

Please sign in to comment.