Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix vault retry logic on failed calls #1269

Merged
merged 1 commit into from
Aug 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 40 additions & 2 deletions dependency/vault_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,47 @@ type SecretWrapInfo struct {
WrappedAccessor string
}

// vaultRenewDuration accepts a secret and returns the recommended amount of
//
type renewer interface {
Dependency
stopChan() chan struct{}
secrets() (*Secret, *api.Secret)
}

func renewSecret(clients *ClientSet, d renewer) error {
log.Printf("[TRACE] %s: starting renewer", d)

secret, vaultSecret := d.secrets()
renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Secret: vaultSecret,
})
if err != nil {
return err
}
go renewer.Renew()
defer renewer.Stop()

for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer done (maybe the lease expired)", d)
return nil
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(secret, renewal.Secret)
case <-d.stopChan():
return ErrStopped
}
}
}

// leaseCheckWait accepts a secret and returns the recommended amount of
// time to sleep.
func vaultRenewDuration(s *Secret) time.Duration {
func leaseCheckWait(s *Secret) time.Duration {
// Handle whether this is an auth or a regular secret.
base := s.LeaseDuration
if s.Auth != nil && s.Auth.LeaseDuration > 0 {
Expand Down
4 changes: 2 additions & 2 deletions dependency/vault_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ func init() {

func TestVaultRenewDuration(t *testing.T) {
renewable := Secret{LeaseDuration: 100, Renewable: true}
renewableDur := vaultRenewDuration(&renewable).Seconds()
renewableDur := leaseCheckWait(&renewable).Seconds()
if renewableDur < 16 || renewableDur >= 34 {
t.Fatalf("renewable duration is not within 1/6 to 1/3 of lease duration: %f", renewableDur)
}

nonRenewable := Secret{LeaseDuration: 100}
nonRenewableDur := vaultRenewDuration(&nonRenewable).Seconds()
nonRenewableDur := leaseCheckWait(&nonRenewable).Seconds()
if nonRenewableDur < 85 || nonRenewableDur > 95 {
t.Fatalf("renewable duration is not within 85%% to 95%% of lease duration: %f", nonRenewableDur)
}
Expand Down
93 changes: 41 additions & 52 deletions dependency/vault_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ var (

// VaultReadQuery is the dependency to Vault for a secret
type VaultReadQuery struct {
stopCh chan struct{}
stopCh chan struct{}
sleepCh <-chan time.Time

rawPath string
queryValues url.Values
Expand All @@ -45,81 +46,69 @@ func NewVaultReadQuery(s string) (*VaultReadQuery, error) {

return &VaultReadQuery{
stopCh: make(chan struct{}, 1),
sleepCh: make(chan time.Time, 1),
rawPath: secretURL.Path,
queryValues: secretURL.Query(),
}, nil
}

// Fetch queries the Vault API
func (d *VaultReadQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
func (d *VaultReadQuery) Fetch(clients *ClientSet, opts *QueryOptions,
) (interface{}, *ResponseMetadata, error) {
select {
case <-d.stopCh:
return nil, nil, ErrStopped
default:
}
select {
case <-d.sleepCh:
default:
}

opts = opts.Merge(&QueryOptions{})
firstRun := d.secret == nil

if d.secret != nil {
if vaultSecretRenewable(d.secret) {
log.Printf("[TRACE] %s: starting renewer", d)

renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Grace: opts.VaultGrace,
Secret: d.vaultSecret,
})
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
go renewer.Renew()
defer renewer.Stop()

RENEW:
for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer returned (maybe the lease expired)", d)
break RENEW
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(d.secret, renewal.Secret)
case <-d.stopCh:
return nil, nil, ErrStopped
}
}
} else {
// The secret isn't renewable, probably the generic secret backend.
dur := vaultRenewDuration(d.secret)
log.Printf("[TRACE] %s: secret is not renewable, sleeping for %s", d, dur)
select {
case <-time.After(dur):
// The lease is almost expired, it's time to request a new one.
case <-d.stopCh:
return nil, nil, ErrStopped
}
if !firstRun && vaultSecretRenewable(d.secret) {
err := renewSecret(clients, d)
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
}

// We don't have a secret, or the prior renewal failed
vaultSecret, err := d.readSecret(clients, opts)
err := d.fetchSecret(clients, opts)
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}

// Print any warnings
printVaultWarnings(d, vaultSecret.Warnings)

// Create the cloned secret which will be exposed to the template.
d.vaultSecret = vaultSecret
d.secret = transformSecret(vaultSecret)
if !vaultSecretRenewable(d.secret) {
eikenb marked this conversation as resolved.
Show resolved Hide resolved
dur := leaseCheckWait(d.secret)
log.Printf("[TRACE] %s: non-renewable secret, set sleep for %s", d, dur)
d.sleepCh = time.After(dur)
}

return respWithMetadata(d.secret)
}

func (d *VaultReadQuery) fetchSecret(clients *ClientSet, opts *QueryOptions,
) error {
opts = opts.Merge(&QueryOptions{})
vaultSecret, err := d.readSecret(clients, opts)
if err == nil {
printVaultWarnings(d, vaultSecret.Warnings)
d.vaultSecret = vaultSecret
// the cloned secret which will be exposed to the template
d.secret = transformSecret(vaultSecret)
}
return err
}

func (d *VaultReadQuery) stopChan() chan struct{} {
return d.stopCh
}

func (d *VaultReadQuery) secrets() (*Secret, *api.Secret) {
return d.secret, d.vaultSecret
}

// CanShare returns if this dependency is shareable.
func (d *VaultReadQuery) CanShare() bool {
return false
Expand Down
11 changes: 9 additions & 2 deletions dependency/vault_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestNewVaultReadQuery(t *testing.T) {

if act != nil {
act.stopCh = nil
act.sleepCh = nil
}

assert.Equal(t, tc.exp, act)
Expand Down Expand Up @@ -170,7 +171,10 @@ func TestVaultReadQuery_Fetch_KVv1(t *testing.T) {
errCh <- err
return
}
dataCh <- data
select {
case dataCh <- data:
case <-d.stopCh:
}
}
}()

Expand Down Expand Up @@ -372,7 +376,10 @@ func TestVaultReadQuery_Fetch_KVv2(t *testing.T) {
errCh <- err
return
}
dataCh <- data
select {
case dataCh <- data:
case <-d.stopCh:
}
}
}()

Expand Down
45 changes: 11 additions & 34 deletions dependency/vault_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/hashicorp/vault/api"
"github.com/pkg/errors"
)

var (
Expand Down Expand Up @@ -44,59 +43,37 @@ func (d *VaultTokenQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interfa
default:
}

opts = opts.Merge(&QueryOptions{})

if vaultSecretRenewable(d.secret) {
log.Printf("[TRACE] %s: starting renewer", d)

renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Grace: opts.VaultGrace,
Secret: d.vaultSecret,
})
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
go renewer.Renew()
defer renewer.Stop()

RENEW:
for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer returned (maybe the lease expired)", d)
break RENEW
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(d.secret, renewal.Secret)
case <-d.stopCh:
return nil, nil, ErrStopped
}
}
renewSecret(clients, d)
}

// The secret isn't renewable, probably the generic secret backend.
// TODO This is incorrect when given a non-renewable template. We should
// instead to a lookup self to determine the lease duration.
dur := vaultRenewDuration(d.secret)
opts = opts.Merge(&QueryOptions{})
dur := leaseCheckWait(d.secret)
if dur < opts.VaultGrace {
dur = opts.VaultGrace
}

log.Printf("[TRACE] %s: token is not renewable, sleeping for %s", d, dur)
select {
case <-time.After(dur):
// The lease is almost expired, it's time to request a new one.
case <-d.stopCh:
return nil, nil, ErrStopped
}

return nil, nil, ErrLeaseExpired
}

func (d *VaultTokenQuery) stopChan() chan struct{} {
return d.stopCh
}

func (d *VaultTokenQuery) secrets() (*Secret, *api.Secret) {
return d.secret, d.vaultSecret
}

// CanShare returns if this dependency is shareable.
func (d *VaultTokenQuery) CanShare() bool {
return false
Expand Down