Skip to content

Commit

Permalink
0.4 (#13)
Browse files Browse the repository at this point in the history
* handling flapping of the master on start

* Fixes for not so slight refactor #8

#8
  • Loading branch information
burdandrei authored and vladshub committed Apr 26, 2018
1 parent 870b9a9 commit bb9c7ab
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 24 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
## 0.4.0 (April 26, 2018)

FEATURES:

* Service check now registered as initially passing to avoid service flapping

FIXES:

* Handling slave session invalidation
* Prevent shutdown when the local Consul agent is not healthy

## 0.3.0 (April 23, 2018)

FEATURES:

* Consul lock session name can be adjusted
* Consul lock retry oprion added
* Consul lock retry option added
* Dependencies updated (Consul to 1.0.7, go-redis to 6.10.2)

## 0.2.0 (March 13, 2018)
Expand Down
9 changes: 5 additions & 4 deletions consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func (rc *resec) acquireConsulLeadership() {
return
}

rc.consul.lockIsWaiting = false
rc.consul.lockErrorCh, err = rc.consul.lock.Lock(rc.consul.lockAbortCh)
if err != nil {
rc.consul.lockIsWaiting = false
rc.consul.lockIsHeld = false
rc.consul.lockStatusCh <- &consulLockStatus{
acquired: false,
err: err,
Expand Down Expand Up @@ -90,7 +91,7 @@ func (rc *resec) handleWaitForLockError() {
break
}

log.Printf("[DEBUG] Lock Error chanel is closed")
log.Printf("[DEBUG] Lock Error channel is closed")

err := fmt.Errorf("Consul lock lost or error")
log.Printf("[DEBUG] %s", err)
Expand Down Expand Up @@ -178,7 +179,7 @@ func (rc *resec) registerService() error {
ServiceID: rc.consul.serviceID,
AgentServiceCheck: consulapi.AgentServiceCheck{
TTL: rc.consul.ttl,
Status: "critical",
Status: "passing",
DeregisterCriticalServiceAfter: rc.consul.deregisterServiceAfter.String(),
},
}
Expand Down Expand Up @@ -220,7 +221,7 @@ func (rc *resec) watchConsulMasterService() error {
wp.Handler = func(idx uint64, data interface{}) {
switch masterConsulServiceStatus := data.(type) {
case []*consulapi.ServiceEntry:
log.Printf("[INFO] Received update for master from consul")
log.Printf("[DEBUG] Received update for master from consul")
rc.consulMasterServiceCh <- masterConsulServiceStatus

default:
Expand Down
11 changes: 11 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ func (rc *resec) runAsSlave(masterAddress string, masterPort int) error {
}

log.Printf("[INFO] Enslaved redis %s to be slave of %s:%d", rc.redis.address, masterAddress, masterPort)

// change our internal state to being a slave
rc.redis.replicationStatus = "slave"
if err := rc.registerService(); err != nil {
return fmt.Errorf("[ERROR] Consul Service registration failed - %s", err)
}

// if we are enslaved and our status is published in consul, let's go back to trying
// to acquire leadership / master role as well
go rc.acquireConsulLeadership()

return nil
}

Expand Down
32 changes: 13 additions & 19 deletions resec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ func (rc *resec) start() {
return
}

log.Printf("[DEBUG] Got redis replication status update:\n %s", update.output)
log.Printf("[DEBUG] Got redis replication info update")

if rc.consul.healthy {
// if we don't have any check id, we haven't registered our service yet
// let's do that first
if rc.redis.replicationStatus != "" {
if rc.consul.checkID == "" {
log.Printf("[DEBUG] Consul Check ID is not generated")
rc.registerService()
}

Expand All @@ -51,6 +52,8 @@ func (rc *resec) start() {
rc.handleConsulError(err)
log.Printf("[ERROR] Failed to update consul Check TTL - %s", err)
}
} else {
log.Printf("[DEBUG] Redis replication status is not defined")
}
} else {
log.Printf("[INFO] Consul is not healthy, skipping service check update")
Expand All @@ -66,21 +69,19 @@ func (rc *resec) start() {
// our state is now unhealthy, release the consul lock so someone else can
// acquire the consul leadership and become redis master
if !update.healthy {
log.Printf("[INFO] Redis replication status changed to NOT healthy")
log.Printf("[INFO] Redis status changed to NOT healthy")
rc.releaseConsulLock()
continue
}

log.Printf("[INFO] Redis replication status changed to healthy")
if rc.redis.replicationStatus == "slave" {
log.Printf("[INFO] Redis status changed to healthy")
if rc.redis.replicationStatus != "master" {
if err := rc.runAsSlave(rc.lastKnownMasterInfo.address, rc.lastKnownMasterInfo.port); err != nil {
log.Println(err)
continue
}
}

go rc.acquireConsulLeadership()

case update, ok := <-rc.consulMasterServiceCh:
if !ok {
log.Printf("[ERROR] Consul master service channel was closed, shutting down")
Expand All @@ -99,7 +100,7 @@ func (rc *resec) start() {
go rc.acquireConsulLeadership()
continue
}
log.Printf("[DEBUG] Redis is not healthy, nothing to do here")
log.Printf("[DEBUG] No Master found in consul, but redis is not healthy, nothing to do here")

// multiple masters is not good
case masterCount > 1:
Expand All @@ -126,21 +127,13 @@ func (rc *resec) start() {

// todo(jippi): if we can't enslave our redis, we shouldn't try to do any further work
// especially not updating our consul catalog entry
if err := rc.runAsSlave(rc.lastKnownMasterInfo.address, rc.lastKnownMasterInfo.port); err != nil {
log.Println(err)
if !rc.redis.healthy {
continue
}

// change our internal state to being a slave
rc.redis.replicationStatus = "slave"
if err := rc.registerService(); err != nil {
log.Printf("[ERROR] Consul Service registration failed - %s", err)
if err := rc.runAsSlave(rc.lastKnownMasterInfo.address, rc.lastKnownMasterInfo.port); err != nil {
log.Println(err)
continue
}

// if we are enslaved and our status is published in consul, lets go back to trying
// to acquire leadership / master role as well
go rc.acquireConsulLeadership()
}

// if our consul lock status has changed
Expand Down Expand Up @@ -173,8 +166,9 @@ func (rc *resec) start() {
log.Printf("[ERROR] %s", update.err)
rc.handleConsulError(update.err)


if !rc.consul.healthy {
return
continue
}

if rc.redis.replicationStatus == "master" {
Expand Down

0 comments on commit bb9c7ab

Please sign in to comment.