Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor waitAction #229

Merged
merged 1 commit into from Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 24 additions & 22 deletions driver/controller.go
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/status"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -950,34 +951,35 @@ func (d *Driver) waitAction(ctx context.Context, volumeId string, actionId int)
"action_id": actionId,
})

ctx, cancel := context.WithTimeout(ctx, time.Minute)
// This timeout should not strike given all sidecars use a timeout that is
// lower (which should always be the case). Using this as a fail-safe
// mechanism in case of a bug or misconfiguration.
ctx, cancel := context.WithTimeout(ctx, d.waitActionTimeout)
defer cancel()

// TODO(arslan): use backoff in the future
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: I did not carry this TODO intentionally since backoffs for timeouts are implemented by the sidecars, and other kinds of errors we occasionally run into do not seem to warrant an exponential backoff as of today (e.g., temporary 4xx responses returned from the API that disappear on the next API call).

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
action, _, err := d.storageActions.Get(ctx, volumeId, actionId)
if err != nil {
ll.WithError(err).Info("waiting for volume errored")
continue
err := wait.PollUntil(1*time.Second, wait.ConditionFunc(func() (done bool, err error) {
action, _, err := d.storageActions.Get(ctx, volumeId, actionId)
if err != nil {
ctxCanceled := ctx.Err() != nil
if !ctxCanceled {
ll.WithError(err).Info("getting action for volume")
return false, nil
}
ll.WithField("action_status", action.Status).Info("action received")

if action.Status == godo.ActionCompleted {
ll.Info("action completed")
return nil
}
return false, fmt.Errorf("failed to get action %d for volume %s: %s", actionId, volumeId, err)
}

if action.Status == godo.ActionInProgress {
continue
}
case <-ctx.Done():
return fmt.Errorf("timeout occurred waiting for storage action of volume: %q", volumeId)
ll.WithField("action_status", action.Status).Info("action received")

if action.Status == godo.ActionCompleted {
ll.Info("action completed")
return true, nil
}
}

return false, nil
}), ctx.Done())

return err
}

// checkLimit checks whether the user hit their volume limit to ensure.
Expand Down
75 changes: 75 additions & 0 deletions driver/controller_test.go
Expand Up @@ -5,13 +5,16 @@ import (
"errors"
"net/http"
"testing"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/digitalocean/godo"
"github.com/magiconair/properties/assert"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"k8s.io/apimachinery/pkg/util/wait"
)

func TestTagger(t *testing.T) {
Expand Down Expand Up @@ -251,3 +254,75 @@ func TestControllerExpandVolume(t *testing.T) {
})
}
}

type fakeStorageAction struct {
*fakeStorageActionsDriver
storageGetValsFunc func(invocation int) (*godo.Action, *godo.Response, error)
invocation int
}

func (f *fakeStorageAction) Get(ctx context.Context, volumeID string, actionID int) (*godo.Action, *godo.Response, error) {
defer func() {
f.invocation++
}()
return f.storageGetValsFunc(f.invocation)
}

func TestWaitAction(t *testing.T) {
tests := []struct {
name string
storageGetValsFunc func(invocation int) (*godo.Action, *godo.Response, error)
timeout time.Duration
wantErr error
}{
{
name: "timeout",
storageGetValsFunc: func(int) (*godo.Action, *godo.Response, error) {
return &godo.Action{
Status: godo.ActionInProgress,
}, nil, nil
},
timeout: 2 * time.Second,
wantErr: wait.ErrWaitTimeout,
},
{
name: "progressing to completion",
storageGetValsFunc: func(invocation int) (*godo.Action, *godo.Response, error) {
switch invocation {
case 0:
return nil, nil, errors.New("network disruption")
case 1:
return &godo.Action{
Status: godo.ActionInProgress,
}, &godo.Response{}, nil
default:
return &godo.Action{
Status: godo.ActionCompleted,
}, &godo.Response{}, nil
}
},
timeout: 5 * time.Second, // We need three 1-second ticks for the fake storage action to complete.
wantErr: nil,
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
d := Driver{
waitActionTimeout: test.timeout,
storageActions: &fakeStorageAction{
fakeStorageActionsDriver: &fakeStorageActionsDriver{},
storageGetValsFunc: test.storageGetValsFunc,
},
log: logrus.New().WithField("test_enabed", true),
}

err := d.waitAction(context.Background(), "volumeID", 42)
if err != test.wantErr {
t.Errorf("got error %q, want %q", err, test.wantErr)
}
})
}
}
20 changes: 12 additions & 8 deletions driver/driver.go
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"strconv"
"sync"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
metadata "github.com/digitalocean/go-metadata"
Expand All @@ -43,7 +44,8 @@ const (
DefaultDriverName = "dobs.csi.digitalocean.com"
// DefaultAddress is the default address that the csi plugin will serve its
// http handler on.
DefaultAddress = "127.0.0.1:12302"
DefaultAddress = "127.0.0.1:12302"
defaultWaitActionTimeout = 1 * time.Minute
)

var (
Expand All @@ -64,12 +66,13 @@ type Driver struct {
// `ControllerPublishVolume` to `NodeStageVolume or `NodePublishVolume`
publishInfoVolumeName string

endpoint string
address string
nodeId string
region string
doTag string
isController bool
endpoint string
address string
nodeId string
region string
doTag string
isController bool
waitActionTimeout time.Duration

srv *grpc.Server
httpSrv http.Server
Expand Down Expand Up @@ -146,7 +149,8 @@ func NewDriver(ep, token, url, doTag, driverName, address string) (*Driver, erro
log: log,
// for now we're assuming only the controller has a non-empty token. In
// the future we should pass an explicit flag to the driver.
isController: token != "",
isController: token != "",
waitActionTimeout: defaultWaitActionTimeout,

storage: doClient.Storage,
storageActions: doClient.StorageActions,
Expand Down
11 changes: 6 additions & 5 deletions driver/driver_test.go
Expand Up @@ -53,11 +53,12 @@ func TestDriverSuite(t *testing.T) {
}

driver := &Driver{
name: DefaultDriverName,
endpoint: endpoint,
nodeId: strconv.Itoa(nodeID),
doTag: doTag,
region: "nyc3",
name: DefaultDriverName,
endpoint: endpoint,
nodeId: strconv.Itoa(nodeID),
doTag: doTag,
region: "nyc3",
waitActionTimeout: defaultWaitActionTimeout,
mounter: &fakeMounter{
mounted: map[string]string{},
},
Expand Down