Skip to content

Commit

Permalink
Merge pull request #1269 from hashicorp/issue-1224-fix-vault-retry
Browse files Browse the repository at this point in the history
fix vault retry logic on failed calls
  • Loading branch information
eikenb committed Aug 30, 2019
2 parents 7e760ef + 14f8395 commit a96de95
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 144 deletions.
42 changes: 40 additions & 2 deletions dependency/vault_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,47 @@ type SecretWrapInfo struct {
WrappedAccessor string
}

// vaultRenewDuration accepts a secret and returns the recommended amount of
//
type renewer interface {
Dependency
stopChan() chan struct{}
secrets() (*Secret, *api.Secret)
}

func renewSecret(clients *ClientSet, d renewer) error {
log.Printf("[TRACE] %s: starting renewer", d)

secret, vaultSecret := d.secrets()
renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Secret: vaultSecret,
})
if err != nil {
return err
}
go renewer.Renew()
defer renewer.Stop()

for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer done (maybe the lease expired)", d)
return nil
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(secret, renewal.Secret)
case <-d.stopChan():
return ErrStopped
}
}
}

// leaseCheckWait accepts a secret and returns the recommended amount of
// time to sleep.
func vaultRenewDuration(s *Secret) time.Duration {
func leaseCheckWait(s *Secret) time.Duration {
// Handle whether this is an auth or a regular secret.
base := s.LeaseDuration
if s.Auth != nil && s.Auth.LeaseDuration > 0 {
Expand Down
4 changes: 2 additions & 2 deletions dependency/vault_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ func init() {

func TestVaultRenewDuration(t *testing.T) {
renewable := Secret{LeaseDuration: 100, Renewable: true}
renewableDur := vaultRenewDuration(&renewable).Seconds()
renewableDur := leaseCheckWait(&renewable).Seconds()
if renewableDur < 16 || renewableDur >= 34 {
t.Fatalf("renewable duration is not within 1/6 to 1/3 of lease duration: %f", renewableDur)
}

nonRenewable := Secret{LeaseDuration: 100}
nonRenewableDur := vaultRenewDuration(&nonRenewable).Seconds()
nonRenewableDur := leaseCheckWait(&nonRenewable).Seconds()
if nonRenewableDur < 85 || nonRenewableDur > 95 {
t.Fatalf("renewable duration is not within 85%% to 95%% of lease duration: %f", nonRenewableDur)
}
Expand Down
93 changes: 41 additions & 52 deletions dependency/vault_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ var (

// VaultReadQuery is the dependency to Vault for a secret
type VaultReadQuery struct {
stopCh chan struct{}
stopCh chan struct{}
sleepCh <-chan time.Time

rawPath string
queryValues url.Values
Expand All @@ -45,81 +46,69 @@ func NewVaultReadQuery(s string) (*VaultReadQuery, error) {

return &VaultReadQuery{
stopCh: make(chan struct{}, 1),
sleepCh: make(chan time.Time, 1),
rawPath: secretURL.Path,
queryValues: secretURL.Query(),
}, nil
}

// Fetch queries the Vault API
func (d *VaultReadQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
func (d *VaultReadQuery) Fetch(clients *ClientSet, opts *QueryOptions,
) (interface{}, *ResponseMetadata, error) {
select {
case <-d.stopCh:
return nil, nil, ErrStopped
default:
}
select {
case <-d.sleepCh:
default:
}

opts = opts.Merge(&QueryOptions{})
firstRun := d.secret == nil

if d.secret != nil {
if vaultSecretRenewable(d.secret) {
log.Printf("[TRACE] %s: starting renewer", d)

renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Grace: opts.VaultGrace,
Secret: d.vaultSecret,
})
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
go renewer.Renew()
defer renewer.Stop()

RENEW:
for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer returned (maybe the lease expired)", d)
break RENEW
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(d.secret, renewal.Secret)
case <-d.stopCh:
return nil, nil, ErrStopped
}
}
} else {
// The secret isn't renewable, probably the generic secret backend.
dur := vaultRenewDuration(d.secret)
log.Printf("[TRACE] %s: secret is not renewable, sleeping for %s", d, dur)
select {
case <-time.After(dur):
// The lease is almost expired, it's time to request a new one.
case <-d.stopCh:
return nil, nil, ErrStopped
}
if !firstRun && vaultSecretRenewable(d.secret) {
err := renewSecret(clients, d)
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
}

// We don't have a secret, or the prior renewal failed
vaultSecret, err := d.readSecret(clients, opts)
err := d.fetchSecret(clients, opts)
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}

// Print any warnings
printVaultWarnings(d, vaultSecret.Warnings)

// Create the cloned secret which will be exposed to the template.
d.vaultSecret = vaultSecret
d.secret = transformSecret(vaultSecret)
if !vaultSecretRenewable(d.secret) {
dur := leaseCheckWait(d.secret)
log.Printf("[TRACE] %s: non-renewable secret, set sleep for %s", d, dur)
d.sleepCh = time.After(dur)
}

return respWithMetadata(d.secret)
}

func (d *VaultReadQuery) fetchSecret(clients *ClientSet, opts *QueryOptions,
) error {
opts = opts.Merge(&QueryOptions{})
vaultSecret, err := d.readSecret(clients, opts)
if err == nil {
printVaultWarnings(d, vaultSecret.Warnings)
d.vaultSecret = vaultSecret
// the cloned secret which will be exposed to the template
d.secret = transformSecret(vaultSecret)
}
return err
}

func (d *VaultReadQuery) stopChan() chan struct{} {
return d.stopCh
}

func (d *VaultReadQuery) secrets() (*Secret, *api.Secret) {
return d.secret, d.vaultSecret
}

// CanShare returns if this dependency is shareable.
func (d *VaultReadQuery) CanShare() bool {
return false
Expand Down
11 changes: 9 additions & 2 deletions dependency/vault_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestNewVaultReadQuery(t *testing.T) {

if act != nil {
act.stopCh = nil
act.sleepCh = nil
}

assert.Equal(t, tc.exp, act)
Expand Down Expand Up @@ -170,7 +171,10 @@ func TestVaultReadQuery_Fetch_KVv1(t *testing.T) {
errCh <- err
return
}
dataCh <- data
select {
case dataCh <- data:
case <-d.stopCh:
}
}
}()

Expand Down Expand Up @@ -372,7 +376,10 @@ func TestVaultReadQuery_Fetch_KVv2(t *testing.T) {
errCh <- err
return
}
dataCh <- data
select {
case dataCh <- data:
case <-d.stopCh:
}
}
}()

Expand Down
45 changes: 11 additions & 34 deletions dependency/vault_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/hashicorp/vault/api"
"github.com/pkg/errors"
)

var (
Expand Down Expand Up @@ -44,59 +43,37 @@ func (d *VaultTokenQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interfa
default:
}

opts = opts.Merge(&QueryOptions{})

if vaultSecretRenewable(d.secret) {
log.Printf("[TRACE] %s: starting renewer", d)

renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Grace: opts.VaultGrace,
Secret: d.vaultSecret,
})
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
go renewer.Renew()
defer renewer.Stop()

RENEW:
for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer returned (maybe the lease expired)", d)
break RENEW
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(d.secret, renewal.Secret)
case <-d.stopCh:
return nil, nil, ErrStopped
}
}
renewSecret(clients, d)
}

// The secret isn't renewable, probably the generic secret backend.
// TODO This is incorrect when given a non-renewable template. We should
// instead to a lookup self to determine the lease duration.
dur := vaultRenewDuration(d.secret)
opts = opts.Merge(&QueryOptions{})
dur := leaseCheckWait(d.secret)
if dur < opts.VaultGrace {
dur = opts.VaultGrace
}

log.Printf("[TRACE] %s: token is not renewable, sleeping for %s", d, dur)
select {
case <-time.After(dur):
// The lease is almost expired, it's time to request a new one.
case <-d.stopCh:
return nil, nil, ErrStopped
}

return nil, nil, ErrLeaseExpired
}

func (d *VaultTokenQuery) stopChan() chan struct{} {
return d.stopCh
}

func (d *VaultTokenQuery) secrets() (*Secret, *api.Secret) {
return d.secret, d.vaultSecret
}

// CanShare returns if this dependency is shareable.
func (d *VaultTokenQuery) CanShare() bool {
return false
Expand Down

0 comments on commit a96de95

Please sign in to comment.