Skip to content

Commit

Permalink
Merge pull request #5948 from hashicorp/b-stats-recover-plugin-shutdown
Browse files Browse the repository at this point in the history
Collect driver stats when driver plugins are restarted
  • Loading branch information
Mahmood Ali committed Jul 17, 2019
2 parents 15caf5c + 66bef39 commit b352af9
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 44 deletions.
101 changes: 58 additions & 43 deletions client/allocrunner/taskrunner/stats_hook.go
Expand Up @@ -88,56 +88,18 @@ func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *inte
// Collection ends when the passed channel is closed
func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {

ch, err := handle.Stats(ctx, h.interval)
MAIN:
ch, err := h.callStatsWithRetry(ctx, handle)
if err != nil {
// Check if the driver doesn't implement stats
if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
h.logger.Debug("driver does not support stats")
return
}
h.logger.Error("failed to start stats collection for task", "error", err)
return
}

var backoff time.Duration
var retry int
limit := time.Second * 5
for {
time.Sleep(backoff)
select {
case ru, ok := <-ch:
// Channel is closed
// if channel closes, re-establish a new one
if !ok {
var re *structs.RecoverableError
ch, err = handle.Stats(ctx, h.interval)
if err == nil {
goto RETRY
}

// We do not log when the plugin is shutdown since this is
// likely because the driver plugin has unexpectedly exited,
// in which case sleeping and trying again or returning based
// on the stop channel is the correct behavior
if err != bstructs.ErrPluginShutdown {
h.logger.Debug("error fetching stats of task", "error", err)
goto RETRY
}
// check if the error is terminal otherwise it's likely a
// transport error and we should retry
re, ok = err.(*structs.RecoverableError)
if ok && re.IsUnrecoverable() {
return
}
h.logger.Warn("stats collection for task failed", "error", err)
RETRY:
// Calculate the new backoff
backoff = (1 << (2 * uint64(retry))) * time.Second
if backoff > limit {
backoff = limit
}
// Increment retry counter
retry++

continue
goto MAIN
}

// Update stats on TaskRunner and emit them
Expand All @@ -149,6 +111,59 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf
}
}

// callStatsWithRetry invokes handle driver Stats() functions and retries until channel is established
// successfully. Returns an error if it encounters a permanent error.
//
// It logs the errors with appropriate log levels; don't log returned error
func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
var retry int

MAIN:
if ctx.Err() != nil {
return nil, ctx.Err()
}

ch, err := handle.Stats(ctx, h.interval)
if err == nil {
return ch, nil
}

// Check if the driver doesn't implement stats
if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
h.logger.Debug("driver does not support stats")
return nil, err
}

// check if the error is terminal otherwise it's likely a
// transport error and we should retry
if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() {
h.logger.Error("failed to start stats collection for task with unrecoverable error", "error", err)
return nil, err
}

// We do not warn when the plugin is shutdown since this is
// likely because the driver plugin has unexpectedly exited,
// in which case sleeping and trying again or returning based
// on the stop channel is the correct behavior
if err == bstructs.ErrPluginShutdown {
h.logger.Debug("failed to fetching stats of task", "error", err)
} else {
h.logger.Error("failed to start stats collection for task", "error", err)
}

limit := time.Second * 5
backoff := 1 << (2 * uint64(retry)) * time.Second
if backoff > limit || retry > 5 {
backoff = limit
}

// Increment retry counter
retry++

time.Sleep(backoff)
goto MAIN
}

func (h *statsHook) Shutdown() {
h.mu.Lock()
defer h.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion plugins/drivers/client.go
Expand Up @@ -269,7 +269,7 @@ func (d *driverPluginClient) TaskStats(ctx context.Context, taskID string, inter
return nil, structs.NewRecoverableError(err, rec.Recoverable)
}
}
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}

ch := make(chan *cstructs.TaskResourceUsage, 1)
Expand Down

0 comments on commit b352af9

Please sign in to comment.