Skip to content

Commit

Permalink
Exporter: Add retries for Search/ReadContext and Import operations wh…
Browse files Browse the repository at this point in the history
…en importing the resource

Under very high load, the Databricks backend may not answer in time, or may return specific
errors, so it makes sense to retry the operation a few times.

This PR uses a "naive" implementation; I need to play a bit more with the `retries` package
before adopting it.
  • Loading branch information
alexott committed Feb 1, 2024
1 parent d3acc7b commit b68f0d5
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 9 deletions.
44 changes: 44 additions & 0 deletions exporter/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,50 @@ func TestEmitNoSearchNoId(t *testing.T) {
close(ch)
}

// TestEmitNoSearchNoIdWithRetry emits a resource without an ID whose Search
// callback fails exactly once with a "context deadline exceeded" message and
// returns nil on every later call. Queued resources are drained by a goroutine
// that calls ImportResource on each of them.
//
// NOTE(review): the test contains no assertions; it only requires that the
// flow runs to completion. Consider asserting how many times Search was
// invoked once the retry behaviour is confirmed against Emit/ImportResource.
func TestEmitNoSearchNoIdWithRetry(t *testing.T) {
	queue := make(resourceChannel, 10)
	approxState := newStateApproximation([]string{"a"})

	// failures counts how many times the search stub has returned an error.
	failures := 0
	flakySearch := func(ic *importContext, r *resource) error {
		if failures == 0 {
			// First call only: simulate a client-side timeout.
			failures++
			return fmt.Errorf("context deadline exceeded (Client.Timeout exceeded while awaiting headers)")
		}
		return nil
	}

	ic := &importContext{
		importing: map[string]bool{},
		Resources: map[string]*schema.Resource{
			"a": {},
		},
		Importables: map[string]importable{
			"a": {
				Service: "e",
				Search:  flakySearch,
			},
		},
		waitGroup: &sync.WaitGroup{},
		channels: map[string]resourceChannel{
			"a": queue,
		},
		ignoredResources: map[string]struct{}{},
		State:            approxState,
	}
	ic.enableServices("e")

	// Consumer: imports every resource that lands on the channel until the
	// channel is closed at the end of the test.
	go func() {
		for queued := range queue {
			queued.ImportResource(ic)
		}
	}()

	ic.Emit(&resource{
		Resource:  "a",
		Attribute: "b",
		Value:     "d",
		Name:      "c",
	})
	ic.waitGroup.Wait()
	close(queue)
}

func TestEmitNoSearchSucceedsImportFails(t *testing.T) {
ch := make(resourceChannel, 10)
state := newStateApproximation([]string{"a"})
Expand Down
53 changes: 44 additions & 9 deletions exporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/databricks/terraform-provider-databricks/common"

Expand Down Expand Up @@ -243,6 +244,15 @@ func (r *resource) ImportCommand(ic *importContext) string {
return fmt.Sprintf(`terraform import %s%s.%s "%s"`, m, r.Resource, r.Name, r.ID)
}

var (
	// maxRetries is the attempt budget consulted by isRetryableError.
	maxRetries = 5
	// retryDelay is the pause between attempts; callers multiply it by
	// time.Second.
	retryDelay = 2
)

// isRetryableError reports whether an operation that failed with the message
// err on zero-based attempt i should be tried again.
//
// It returns true only when i is below maxRetries-1 (so the final attempt is
// never considered retryable) and err contains one of the known transient
// failure markers.
func isRetryableError(err string, i int) bool {
	// Out of attempts: nothing is retryable any more, whatever the message.
	if i >= maxRetries-1 {
		return false
	}
	for _, marker := range []string{
		"context deadline exceeded",
		"Error handling request",
	} {
		if strings.Contains(err, marker) {
			return true
		}
	}
	return false
}

func (r *resource) ImportResource(ic *importContext) {
defer ic.waitGroup.Done()
pr, ok := ic.Resources[r.Resource]
Expand All @@ -265,9 +275,17 @@ func (r *resource) ImportResource(ic *importContext) {
log.Printf("[ERROR] Searching %s is not available", r)
return
}
if err := ir.Search(ic, r); err != nil {
log.Printf("[ERROR] Cannot search for a resource %s: %v", err, r)
return
for i := 0; i < maxRetries; i++ {
err := ir.Search(ic, r)
if err == nil {
break
}
if !isRetryableError(err.Error(), i) {
log.Printf("[ERROR] Cannot search for a resource of %s: %s", r, err)
return
}
log.Printf("[INFO] next retry (%d) for searching of %v", (i + 1), r)
time.Sleep(time.Duration(retryDelay) * time.Second)
}
if r.ID == "" {
log.Printf("[WARN] Cannot find %s", r)
Expand All @@ -288,19 +306,36 @@ func (r *resource) ImportResource(ic *importContext) {
if apiVersion != "" {
ctx = context.WithValue(ctx, common.Api, apiVersion)
}
if dia := pr.ReadContext(ctx, r.Data, ic.Client); dia != nil {
log.Printf("[ERROR] Error reading %s#%s: %v", r.Resource, r.ID, dia)
return
// TODO: rewrite to retries package...
for i := 0; i < maxRetries; i++ {
dia := pr.ReadContext(ctx, r.Data, ic.Client)
if dia == nil {
break
}
if !isRetryableError(fmt.Sprintf("%v", dia), i) {
log.Printf("[ERROR] Error reading %s#%s after %d retries: %v", r.Resource, r.ID, i, dia)
return
}
log.Printf("[INFO] next retry (%d) for reading of %s#%s", (i + 1), r.Resource, r.ID)
time.Sleep(time.Duration(retryDelay) * time.Second)
}
if r.Data.Id() == "" {
r.Data.SetId(r.ID)
}
}
r.Name = ic.ResourceName(r)
if ir.Import != nil {
if err := ir.Import(ic, r); err != nil {
log.Printf("[ERROR] Failed custom import of %s: %s", r, err)
return
for i := 0; i < maxRetries; i++ {
err := ir.Import(ic, r)
if err == nil {
break
}
if !isRetryableError(err.Error(), i) {
log.Printf("[ERROR] Failed custom import of %s: %s", r, err)
return
}
log.Printf("[INFO] next retry (%d) for importing of %s#%s", (i + 1), r.Resource, r.ID)
time.Sleep(time.Duration(retryDelay) * time.Second)
}
}
ic.Add(r)
Expand Down

0 comments on commit b68f0d5

Please sign in to comment.