Skip to content

Commit

Permalink
fix consul locking (#19)
Browse files Browse the repository at this point in the history
fix consul services

fix network partitioning issues
  • Loading branch information
shaleman authored Sep 22, 2016
1 parent 75ae6c7 commit b9df55d
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 40 deletions.
107 changes: 70 additions & 37 deletions consulLock.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type consulLock struct {
isAcquired bool
isReleased bool
ttl string
timeout uint64
sessionID string
eventChan chan LockEvent
stopChan chan struct{}
Expand All @@ -57,31 +56,15 @@ func (cp *ConsulClient) NewLock(name string, myID string, ttl uint64) (LockInter

// Acquire a lock
func (lk *consulLock) Acquire(timeout uint64) error {
// session configuration
sessCfg := api.SessionEntry{
Name: lk.keyName,
Behavior: "release",
LockDelay: 10 * time.Millisecond,
TTL: lk.ttl,
}

// Create consul session
sessionID, _, err := lk.client.Session().CreateNoChecks(&sessCfg, nil)
err := lk.createSession()
if err != nil {
log.Errorf("Error Creating session for lock %s. Err: %v", lk.keyName, err)
return err
}

log.Infof("Created session: %s for lock %s/%s", sessionID, lk.name, lk.myID)

// save the session ID for later
lk.mutex.Lock()
lk.timeout = timeout
lk.sessionID = sessionID
lk.mutex.Unlock()

// Refresh the session in background
go lk.client.Session().RenewPeriodic(lk.ttl, sessionID, nil, lk.stopChan)
go lk.renewSession()

// Watch for changes on the lock
go lk.acquireLock()
Expand All @@ -105,18 +88,18 @@ func (lk *consulLock) Acquire(timeout uint64) error {

// Release a lock
func (lk *consulLock) Release() error {
lk.mutex.Lock()
defer lk.mutex.Unlock()

// Mark this as released
lk.mutex.Lock()
lk.isReleased = true
lk.mutex.Unlock()

// Send stop signal on stop channel
close(lk.stopChan)

// If the lock was acquired, release it
if lk.isAcquired {
lk.isAcquired = false
if lk.IsAcquired() {
lk.setAcquired(false)

// Release it via consul client
succ, _, err := lk.client.KV().Release(&api.KVPair{Key: lk.keyName, Value: []byte(lk.myID), Session: lk.sessionID}, nil)
Expand Down Expand Up @@ -164,6 +147,13 @@ func (lk *consulLock) IsAcquired() bool {
return lk.isAcquired
}

// IsReleased reports whether the lock has been marked as released.
// The flag is read while holding lk.mutex.
func (lk *consulLock) IsReleased() bool {
	lk.mutex.Lock()
	released := lk.isReleased
	lk.mutex.Unlock()

	return released
}

// GetHolder Gets current lock holder's ID
func (lk *consulLock) GetHolder() string {
lk.mutex.Lock()
Expand Down Expand Up @@ -206,25 +196,18 @@ func (lk *consulLock) acquireLock() {

log.Debugf("Got lock(%s) watch Resp: %+v", lk.myID, resp)

// check if we are holding the lock
lk.mutex.Lock()
// exit the loop if lock is released
if lk.isReleased {
if lk.IsReleased() {
log.Infof("Lock is released. exiting watch")
lk.mutex.Unlock()
return
}

isAcquired := lk.isAcquired
lk.mutex.Unlock()

if isAcquired {
// check if we are holding the lock
if lk.IsAcquired() {
// check if we lost the lock
if resp == nil || resp.Session != lk.sessionID || string(resp.Value) != lk.myID {
// lock is released
lk.mutex.Lock()
lk.isAcquired = false
lk.mutex.Unlock()
lk.setAcquired(false)

log.Infof("Lost lock %s", lk.name, lk.myID)

Expand All @@ -248,9 +231,7 @@ func (lk *consulLock) acquireLock() {
log.Infof("Acquired lock %s/%s", lk.name, lk.myID)

// Mark the lock as acquired
lk.mutex.Lock()
lk.isAcquired = true
lk.mutex.Unlock()
lk.setAcquired(true)

// Send acquired message to event channel
lk.eventChan <- LockEvent{EventType: LockAcquired}
Expand All @@ -263,3 +244,55 @@ func (lk *consulLock) acquireLock() {
waitIdx = meta.LastIndex
}
}

// setAcquired records whether this lock is currently held.
// The flag is written while holding lk.mutex.
func (lk *consulLock) setAcquired(isAcquired bool) {
	lk.mutex.Lock()
	defer lk.mutex.Unlock()

	lk.isAcquired = isAcquired
}

// createSession creates a new consul session for this lock and stores its ID
// in lk.sessionID. The session is named after the lock key, uses the lock's
// TTL and is created without health checks.
//
// It returns the error from the consul client if the session could not be
// created; in that case lk.sessionID is left untouched.
func (lk *consulLock) createSession() error {
	// Describe the session we want consul to create.
	entry := &api.SessionEntry{
		Name:      lk.keyName,
		Behavior:  "delete",
		LockDelay: 10 * time.Millisecond,
		TTL:       lk.ttl,
	}

	// Ask consul for the session.
	newID, _, err := lk.client.Session().CreateNoChecks(entry, nil)
	if err != nil {
		log.Errorf("Error Creating session for lock %s. Err: %v", lk.keyName, err)
		return err
	}

	log.Infof("Created session: %s for lock %s/%s", newID, lk.name, lk.myID)

	// Remember the session ID; other methods read it later.
	lk.mutex.Lock()
	lk.sessionID = newID
	lk.mutex.Unlock()

	return nil
}

// renewSession keeps the lock's consul session alive until the lock is
// released. Whenever periodic renewal stops with an error it tries to create
// a replacement session and then resumes renewing.
//
// The loop exits when RenewPeriodic returns nil or when the lock has been
// marked as released.
func (lk *consulLock) renewSession() {
	for {
		renewErr := lk.client.Session().RenewPeriodic(lk.ttl, lk.sessionID, nil, lk.stopChan)
		if renewErr == nil {
			// Renewal ended without an error; nothing left to do.
			return
		}
		if lk.IsReleased() {
			// If lock was released, exit this go routine
			return
		}

		// Renewal failed while the lock is still wanted: get a fresh session.
		// On failure we only log and go around the loop again.
		if createErr := lk.createSession(); createErr != nil {
			log.Errorf("Error Creating session for lock %s. Err: %v", lk.keyName, createErr)
		}
	}
}
33 changes: 31 additions & 2 deletions consulService.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (cp *ConsulClient) RegisterService(serviceInfo ServiceInfo) error {

// Run refresh in background
stopChan := make(chan struct{})
go cp.renewService(keyName, sessCfg.TTL, sessionID, stopChan)
go cp.renewService(keyName, sessCfg.TTL, sessionID, jsonVal, stopChan)

// Store it in DB
cp.serviceDb[keyName] = &consulServiceState{
Expand Down Expand Up @@ -259,14 +259,43 @@ func (cp *ConsulClient) DeregisterService(serviceInfo ServiceInfo) error {
}

//--------------------- Internal functions -------------------
func (cp *ConsulClient) renewService(keyName, ttl, sessionID string, stopChan chan struct{}) {
func (cp *ConsulClient) renewService(keyName, ttl, sessionID string, jsonVal []byte, stopChan chan struct{}) {
for {
err := cp.client.Session().RenewPeriodic(ttl, sessionID, nil, stopChan)
if err == nil {
log.Infof("Stoping renew on %s", keyName)
return
}
log.Infof("RenewPeriodic for session %s exited with error: %v. Retrying..", keyName, err)

// session configuration
sessCfg := api.SessionEntry{
Name: keyName,
Behavior: "delete",
LockDelay: 10 * time.Millisecond,
TTL: ttl,
}

// Create consul session
sessionID, _, err = cp.client.Session().CreateNoChecks(&sessCfg, nil)
if err != nil {
log.Errorf("Error Creating session for lock %s. Err: %v", keyName, err)
}

// Delete the old key if it exists..
log.Infof("Deleting old service entry for key %s", keyName)
_, err = cp.client.KV().Delete(keyName, nil)
if err != nil {
log.Errorf("Error deleting key %s. Err: %v", keyName, err)
}

// Set it via consul client
succ, _, err := cp.client.KV().Acquire(&api.KVPair{Key: keyName, Value: jsonVal, Session: sessionID}, nil)
if err != nil {
log.Errorf("Error setting key %s, Err: %v", keyName, err)
} else if !succ {
log.Errorf("Failed to acquire key %s. Already acquired", keyName)
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion etcdLock.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,13 @@ func (lk *etcdLock) acquireLock() {
for {
log.Infof("Getting the lock %s to see if its acquired", keyName)
// Get the key and see if we or someone else has already acquired the lock
resp, err := lk.kapi.Get(context.Background(), keyName, nil)
resp, err := lk.kapi.Get(context.Background(), keyName, &client.GetOptions{Quorum: true})
if err != nil {
if !client.IsKeyNotFound(err) {
log.Errorf("Error getting the key %s. Err: %v", keyName, err)
// Retry after a second in case of error
time.Sleep(time.Second)
continue
} else {
log.Infof("Lock %s does not exist. trying to acquire it", keyName)
}
Expand Down

0 comments on commit b9df55d

Please sign in to comment.