Permalink
Browse files

Add a 5x exponential backoff on 429s & 5xxs to the webhook Authentica…

…tor/Authorizer.
  • Loading branch information...
cjcullen committed Jun 24, 2016
1 parent e294b23 commit 38a104219922fb5ba6fe447546ac50f641adb8c8
@@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/apis/authentication.k8s.io/v1beta1"
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/user"
+ "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/cache"
"k8s.io/kubernetes/plugin/pkg/webhook"
@@ -35,6 +36,8 @@ var (
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
)
+const retryBackoff = 500 * time.Millisecond
+
// Ensure WebhookTokenAuthenticator implements the authenticator.Token interface.
var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil)
@@ -46,7 +49,12 @@ type WebhookTokenAuthenticator struct {
// New creates a new WebhookTokenAuthenticator from the provided kubeconfig file.
func New(kubeConfigFile string, ttl time.Duration) (*WebhookTokenAuthenticator, error) {
- gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions)
+ return newWithBackoff(kubeConfigFile, ttl, retryBackoff)
+}
+
+// newWithBackoff allows tests to skip the sleep.
+func newWithBackoff(kubeConfigFile string, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) {
+ gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff)
if err != nil {
return nil, err
}
@@ -61,7 +69,9 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(token string) (user.Info,
if entry, ok := w.responseCache.Get(r.Spec); ok {
r.Status = entry.(v1beta1.TokenReviewStatus)
} else {
- result := w.RestClient.Post().Body(r).Do()
+ result := w.WithExponentialBackoff(func() restclient.Result {
+ return w.RestClient.Post().Body(r).Do()
+ })
if err := result.Error(); err != nil {
return nil, false, err
}
@@ -148,7 +148,7 @@ func newTokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, c
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
- return New(p, cacheTime)
+ return newWithBackoff(p, cacheTime, 0)
}
func TestTLSConfig(t *testing.T) {
@@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
"k8s.io/kubernetes/pkg/auth/authorizer"
+ "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/cache"
"k8s.io/kubernetes/plugin/pkg/webhook"
@@ -36,6 +37,8 @@ var (
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
)
+const retryBackoff = 500 * time.Millisecond
+
// Ensure Webhook implements the authorizer.Authorizer interface.
var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil)
@@ -67,7 +70,12 @@ type WebhookAuthorizer struct {
// For additional HTTP configuration, refer to the kubeconfig documentation
// http://kubernetes.io/v1.1/docs/user-guide/kubeconfig-file.html.
func New(kubeConfigFile string, authorizedTTL, unauthorizedTTL time.Duration) (*WebhookAuthorizer, error) {
- gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions)
+ return newWithBackoff(kubeConfigFile, authorizedTTL, unauthorizedTTL, retryBackoff)
+}
+
+// newWithBackoff allows tests to skip the sleep.
+func newWithBackoff(kubeConfigFile string, authorizedTTL, unauthorizedTTL, initialBackoff time.Duration) (*WebhookAuthorizer, error) {
+ gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff)
if err != nil {
return nil, err
}
@@ -148,7 +156,9 @@ func (w *WebhookAuthorizer) Authorize(attr authorizer.Attributes) (err error) {
if entry, ok := w.responseCache.Get(string(key)); ok {
r.Status = entry.(v1beta1.SubjectAccessReviewStatus)
} else {
- result := w.RestClient.Post().Body(r).Do()
+ result := w.WithExponentialBackoff(func() restclient.Result {
+ return w.RestClient.Post().Body(r).Do()
+ })
if err := result.Error(); err != nil {
return err
}
@@ -183,7 +183,7 @@ current-context: default
return fmt.Errorf("failed to execute test template: %v", err)
}
// Create a new authorizer
- _, err = New(p, 0, 0)
+ _, err = newWithBackoff(p, 0, 0, 0)
return err
}()
if err != nil && !tt.wantErr {
@@ -291,7 +291,7 @@ func newAuthorizer(callbackURL string, clientCert, clientKey, ca []byte, cacheTi
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
- return New(p, cacheTime, cacheTime)
+ return newWithBackoff(p, cacheTime, cacheTime, 0)
}
func TestTLSConfig(t *testing.T) {
@@ -19,6 +19,7 @@ package webhook
import (
"fmt"
+ "time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
@@ -27,16 +28,18 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/runtime"
runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer"
+ "k8s.io/kubernetes/pkg/util/wait"
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
)
type GenericWebhook struct {
- RestClient *restclient.RESTClient
+ RestClient *restclient.RESTClient
+ initialBackoff time.Duration
}
// New creates a new GenericWebhook from the provided kubeconfig file.
-func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion) (*GenericWebhook, error) {
+func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion, initialBackoff time.Duration) (*GenericWebhook, error) {
for _, groupVersion := range groupVersions {
if !registered.IsEnabledVersion(groupVersion) {
return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion)
@@ -64,5 +67,31 @@ func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupV
// TODO(ericchiang): Can we ensure remote service is reachable?
- return &GenericWebhook{restClient}, nil
+ return &GenericWebhook{restClient, initialBackoff}, nil
+}
+
+// WithExponentialBackoff will retry webhookFn 5 times w/ exponentially
+// increasing backoff when a 429 or a 5xx response code is returned.
+func (g *GenericWebhook) WithExponentialBackoff(webhookFn func() restclient.Result) restclient.Result {
+ backoff := wait.Backoff{
+ Duration: g.initialBackoff,
+ Factor: 1.5,
+ Jitter: 0.2,
+ Steps: 5,
+ }
+ var result restclient.Result
+ wait.ExponentialBackoff(backoff, func() (bool, error) {
+ result = webhookFn()
+ // Return from Request.Do() errors immediately.
+ if err := result.Error(); err != nil {
+ return false, err
+ }
+ // Retry 429s, and 5xxs.
+ var statusCode int
+ if result.StatusCode(&statusCode); statusCode == 429 || statusCode >= 500 {
+ return false, nil
+ }
+ return true, nil
+ })
+ return result
}

0 comments on commit 38a1042

Please sign in to comment.