Merge pull request #2826 from dani-docker/orca-7577
Address concurrency issue
nishanttotla committed Jan 4, 2018
2 parents 307098c + 85bee20 commit d971ed4
Showing 2 changed files with 67 additions and 47 deletions.
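The hunks below all follow one pattern: engine helpers such as setState, setErrMsg, incFailureCount, resetFailureCount, checkConnectionErr, and updateContainer gain a lockSet parameter, so a caller that already holds the engine mutex (for example RefreshContainers) can invoke them without the helper trying to lock again and deadlocking on Go's non-reentrant sync.Mutex. The following is a minimal, self-contained sketch of that pattern; the engine struct, setState, and refresh here are illustrative stand-ins under assumed simplified types, not the actual cluster.Engine code.

package main

import (
	"fmt"
	"sync"
)

// engine is a simplified stand-in for the real cluster.Engine type.
type engine struct {
	sync.Mutex
	state string
}

// setState only acquires the mutex when the caller does not already hold it.
func (e *engine) setState(state string, lockSet bool) {
	if !lockSet {
		e.Lock()
		defer e.Unlock()
	}
	e.state = state
}

// refresh holds the lock for the whole critical section and tells the helper
// not to re-acquire it; sync.Mutex is not reentrant, so locking again from
// the same goroutine would deadlock.
func (e *engine) refresh() {
	e.Lock()
	defer e.Unlock()
	e.setState("healthy", true) // lock already held by refresh
}

func main() {
	e := &engine{}
	e.setState("unhealthy", false) // external caller: the helper takes the lock itself
	e.refresh()
	fmt.Println(e.state) // prints "healthy"
}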
80 changes: 50 additions & 30 deletions cluster/engine.go
@@ -315,9 +315,11 @@ func (e *Engine) HealthIndicator() int64 {
}

// setState sets engine state
func (e *Engine) setState(state engineState) {
e.Lock()
defer e.Unlock()
func (e *Engine) setState(state engineState, lockSet bool) {
if !lockSet {
e.Lock()
defer e.Unlock()
}
e.state = state
}

@@ -354,9 +356,11 @@ func (e *Engine) ValidationComplete() {
}

// setErrMsg sets error message for the engine
func (e *Engine) setErrMsg(errMsg string) {
e.Lock()
defer e.Unlock()
func (e *Engine) setErrMsg(errMsg string, lockSet bool) {
if !lockSet {
e.Lock()
defer e.Unlock()
}
e.lastError = strings.TrimSpace(errMsg)
e.updatedAt = time.Now()
}
@@ -370,7 +374,7 @@ func (e *Engine) ErrMsg() string {

// HandleIDConflict handles ID duplicate with existing engine
func (e *Engine) HandleIDConflict(otherAddr string) {
e.setErrMsg(fmt.Sprintf("ID duplicated. %s shared by this node %s and another node %s", e.ID, e.Addr, otherAddr))
e.setErrMsg(fmt.Sprintf("ID duplicated. %s shared by this node %s and another node %s", e.ID, e.Addr, otherAddr), false)
}

// Status returns the health status of the Engine: Healthy or Unhealthy
@@ -381,9 +385,11 @@ func (e *Engine) Status() string {
}

// incFailureCount increases engine's failure count, and sets engine as unhealthy if threshold is crossed
func (e *Engine) incFailureCount() {
e.Lock()
defer e.Unlock()
func (e *Engine) incFailureCount(lockSet bool) {
if !lockSet {
e.Lock()
defer e.Unlock()
}
e.failureCount++
if e.state == stateHealthy && e.failureCount >= e.opts.FailureRetry {
e.state = stateUnhealthy
@@ -406,23 +412,29 @@ func (e *Engine) UpdatedAt() time.Time {
return e.updatedAt
}

func (e *Engine) resetFailureCount() {
e.Lock()
defer e.Unlock()
func (e *Engine) resetFailureCount(lockSet bool) {
if !lockSet {
e.Lock()
defer e.Unlock()
}
e.failureCount = 0
}

// CheckConnectionErr checks error from client response and adjusts engine healthy indicators
func (e *Engine) CheckConnectionErr(err error) {
e.checkConnectionErr(err, false)
}

// CheckConnectionErr checks error from client response and adjusts engine healthy indicators
func (e *Engine) checkConnectionErr(err error, lockSet bool) {
if err == nil {
e.setErrMsg("")
e.setErrMsg("", lockSet)
// If current state is unhealthy, change it to healthy
if e.state == stateUnhealthy {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.getFailureCount())
e.emitEvent("engine_reconnect")
e.setState(stateHealthy)
e.setState(stateHealthy, lockSet)
}
e.resetFailureCount()
e.resetFailureCount(lockSet)
return
}

@@ -432,9 +444,9 @@ func (e *Engine) CheckConnectionErr(err error) {
// in engine marked as unhealthy. If this causes unnecessary failure, engine
// can track last error time. Only increase failure count if last error is
// not too recent, e.g., last error is at least 1 seconds ago.
e.incFailureCount()
e.incFailureCount(lockSet)
// update engine error message
e.setErrMsg(err.Error())
e.setErrMsg(err.Error(), lockSet)
return
}
// other errors may be ambiguous.
@@ -724,30 +736,30 @@ func (e *Engine) refreshVolume(IDOrName string) error {
// true, each container will be inspected.
// FIXME: unexport this method after mesos scheduler stops using it directly
func (e *Engine) RefreshContainers(full bool) error {
e.Lock()
defer e.Unlock()
opts := types.ContainerListOptions{
All: true,
Size: false,
}
ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout)
defer cancel()
containers, err := e.apiClient.ContainerList(ctx, opts)
e.CheckConnectionErr(err)
e.checkConnectionErr(err, true)
if err != nil {
return err
}

merged := make(map[string]*Container)
for _, c := range containers {
mergedUpdate, err := e.updateContainer(c, merged, full)
mergedUpdate, err := e.updateContainer(c, merged, full, true)
if err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Unable to update state of container %q: %v", c.ID, err)
} else {
merged = mergedUpdate
}
}

e.Lock()
defer e.Unlock()
e.containers = merged

return nil
@@ -784,17 +796,19 @@ func (e *Engine) refreshContainer(ID string, full bool) (*Container, error) {
return nil, nil
}

_, err = e.updateContainer(containers[0], e.containers, full)
_, err = e.updateContainer(containers[0], e.containers, full, false)
e.RLock()
container := e.containers[containers[0].ID]
e.RUnlock()
return container, err
}

func (e *Engine) updateContainer(c types.Container, containers map[string]*Container, full bool) (map[string]*Container, error) {
func (e *Engine) updateContainer(c types.Container, containers map[string]*Container, full bool, lockSet bool) (map[string]*Container, error) {
var container *Container

e.RLock()
if !lockSet {
e.RLock()
}
if current, exists := e.containers[c.ID]; exists {
// The container is already known.
container = current
@@ -814,14 +828,16 @@ func (e *Engine) updateContainer(c types.Container, containers map[string]*Conta
// Trade-off: If updateContainer() is called concurrently for the same
// container, we will end up doing a full refresh twice and the original
// container (containers[container.Id]) will get replaced.
e.RUnlock()
if !lockSet {
e.RUnlock()
}

c.Created = time.Unix(c.Created, 0).Add(e.DeltaDuration).Unix()

// Update ContainerInfo.
if full {
info, err := e.apiClient.ContainerInspect(context.Background(), c.ID)
e.CheckConnectionErr(err)
e.checkConnectionErr(err, lockSet)
if err != nil {
return nil, err
}
@@ -848,10 +864,14 @@ func (e *Engine) updateContainer(c types.Container, containers map[string]*Conta
}

// Update its internal state.
e.Lock()
if !lockSet {
e.Lock()
}
container.Container = c
containers[container.ID] = container
e.Unlock()
if !lockSet {
e.Unlock()
}

return containers, nil
}
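Judging from the hunks above, the caller-side half of the fix is in RefreshContainers: it appears to take e.Lock() once at the top of the refresh and pass lockSet=true down into checkConnectionErr and updateContainer, rather than locking separately around the final e.containers = merged assignment. External callers, including the tests below, keep passing false so the helpers continue to lock for themselves.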
34 changes: 17 additions & 17 deletions cluster/engine_test.go
@@ -58,23 +58,23 @@ func (nopCloser) Close() error {
func TestSetEngineState(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
assert.True(t, engine.state == statePending)
engine.setState(stateUnhealthy)
engine.setState(stateUnhealthy, false)
assert.True(t, engine.state == stateUnhealthy)
engine.setState(stateHealthy)
engine.setState(stateHealthy, false)
assert.True(t, engine.state == stateHealthy)
}

func TestErrMsg(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
assert.True(t, len(engine.ErrMsg()) == 0)
message := "cannot connect"
engine.setErrMsg(message)
engine.setErrMsg(message, false)
assert.True(t, engine.ErrMsg() == message)
}

func TestCheckConnectionErr(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
engine.setState(stateHealthy, false)
assert.True(t, engine.failureCount == 0)
err := engineapi.ErrorConnectionFailed("")
engine.CheckConnectionErr(err)
@@ -97,26 +97,26 @@ func TestCheckConnectionErr(t *testing.T) {

func TestEngineFailureCount(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
engine.setState(stateHealthy, false)
for i := 0; i < engine.opts.FailureRetry; i++ {
assert.True(t, engine.IsHealthy())
engine.incFailureCount()
engine.incFailureCount(false)
}
assert.False(t, engine.IsHealthy())
assert.True(t, engine.failureCount == engine.opts.FailureRetry)
engine.resetFailureCount()
engine.resetFailureCount(false)
assert.True(t, engine.failureCount == 0)
}

func TestHealthIndicator(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
assert.True(t, engine.state == statePending)
assert.True(t, engine.HealthIndicator() == 0)
engine.setState(stateUnhealthy)
engine.setState(stateUnhealthy, false)
assert.True(t, engine.HealthIndicator() == 0)
engine.setState(stateHealthy)
engine.setState(stateHealthy, false)
assert.True(t, engine.HealthIndicator() == 100)
engine.incFailureCount()
engine.incFailureCount(false)
assert.True(t, engine.HealthIndicator() == (int64)(100-100/engine.opts.FailureRetry))
}

@@ -155,7 +155,7 @@ func TestOutdatedEngine(t *testing.T) {

func TestEngineCpusMemory(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
engine.setState(stateUnhealthy, false)
assert.False(t, engine.isConnected())

apiClient := engineapimock.NewMockClient()
@@ -184,7 +184,7 @@ func TestEngineCpusMemory(t *testing.T) {

func TestEngineSpecs(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
engine.setState(stateUnhealthy, false)
assert.False(t, engine.isConnected())

apiClient := engineapimock.NewMockClient()
@@ -224,7 +224,7 @@ func TestEngineSpecs(t *testing.T) {

func TestEngineState(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
engine.setState(stateUnhealthy, false)
assert.False(t, engine.isConnected())

apiClient := engineapimock.NewMockClient()
@@ -360,7 +360,7 @@ func TestCreateContainer(t *testing.T) {
readCloser = nopCloser{bytes.NewBufferString("")}
)

engine.setState(stateUnhealthy)
engine.setState(stateUnhealthy, false)
apiClient.On("Info", mock.Anything).Return(mockInfo, nil)
apiClient.On("ServerVersion", mock.Anything).Return(mockVersion, nil)
apiClient.On("NetworkList", mock.Anything,
@@ -526,7 +526,7 @@ func TestCreateContainer(t *testing.T) {

func TestImages(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
engine.setState(stateHealthy, false)
engine.images = []*Image{
{types.ImageSummary{ID: "a"}, engine},
{types.ImageSummary{ID: "b"}, engine},
@@ -564,7 +564,7 @@ func TestUsedCpus(t *testing.T) {
)

engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
engine.setState(stateHealthy, false)
apiClient := engineapimock.NewMockClient()

for _, hn := range hostNcpu {
@@ -651,7 +651,7 @@ func TestContainerRemovedDuringRefresh(t *testing.T) {
)

engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
engine.setState(stateUnhealthy, false)
assert.False(t, engine.isConnected())

// A container is removed before it can be inspected.
