Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/httpreq/httpreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const (
// maxRetries is the maximum number of retry attempts
maxRetries = 5

// baseDelay is the initial delay used for retry backoff
baseDelay = 500 * time.Millisecond
// backOff is the initial delay used for retry backoff
backOff = 500 * time.Millisecond

// maxRetryAfter is the maximum delay allowed when honoring a Retry-After header
maxRetryAfter = 5 * time.Minute
Expand Down Expand Up @@ -128,7 +128,7 @@ func DoRequestWithRetries(f RequestFunc, insecureSkipVerify bool) ([]byte, *http
if err == nil || attempt == maxRetries || !ShouldRetry(err.Code()) {
return body, err
}
wait := GetNextBackoff(resp, baseDelay, attempt-1)
wait := GetNextBackoff(resp, backOff, attempt-1)
klog.Infof("Attempt %d failed with error: %v. Retrying in %s", attempt, err, wait.String())
time.Sleep(wait)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/httpreq/httpreq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestGetNextBackoff(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
wait := GetNextBackoff(tc.resp, baseDelay, tc.iter)
wait := GetNextBackoff(tc.resp, backOff, tc.iter)
correct := tc.check(wait)
require.True(t, correct)
})
Expand Down
16 changes: 9 additions & 7 deletions pkg/server/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ import (
)

const (
maxRetries = 5
baseDelay = 2 * time.Second
maxRetries = 5
defaultBackOff = 2 * time.Second
)

type asyncController struct {
queue *TrailingDelayQueue
var backOff time.Duration

func init() {
backOff = defaultBackOff
}

func processRequest(item any) (any, *httperr.Error) {
return processRequestWithRetries(baseDelay, item.(*topology.Request), processTopologyRequest)
return processRequestWithRetries(item.(*topology.Request), processTopologyRequest)
}

func processRequestWithRetries(delay time.Duration, tr *topology.Request, f func(*topology.Request) ([]byte, *httperr.Error)) ([]byte, *httperr.Error) {
func processRequestWithRetries(tr *topology.Request, f func(*topology.Request) ([]byte, *httperr.Error)) ([]byte, *httperr.Error) {
attempt := 0
for {
var code int
Expand All @@ -63,7 +65,7 @@ func processRequestWithRetries(delay time.Duration, tr *topology.Request, f func
return ret, err
}

wait := httpreq.GetNextBackoff(nil, delay, attempt-1)
wait := httpreq.GetNextBackoff(nil, backOff, attempt-1)
klog.Infof("Attempt %d failed with error: %v. Retrying in %s", attempt, err, wait.String())
time.Sleep(wait)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (r *retrier) callback(_ *topology.Request) ([]byte, *httperr.Error) {
}

func TestProcessRequestWithRetries(t *testing.T) {
backOff = time.Millisecond
defer func() { backOff = defaultBackOff }()

tr := &topology.Request{
Provider: topology.Provider{
Name: "test",
Expand Down Expand Up @@ -120,7 +123,7 @@ func TestProcessRequestWithRetries(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ret, err := processRequestWithRetries(time.Millisecond, tr, tc.retrier.callback)
ret, err := processRequestWithRetries(tr, tc.retrier.callback)
if len(tc.err) != 0 {
require.EqualError(t, err, tc.err)
require.Equal(t, tc.code, err.Code())
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type HttpServer struct {
async *asyncController
}

type asyncController struct {
queue *TrailingDelayQueue
}

var srv *HttpServer

func InitHttpServer(ctx context.Context, cfg *config.Config) {
Expand Down
22 changes: 12 additions & 10 deletions pkg/server/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ SwitchName=S3 Nodes=I[34-36]
)

func TestServerIntegration(t *testing.T) {
backOff = 100 * time.Millisecond
defer func() { backOff = defaultBackOff }()

port, err := test.GetAvailablePort()
require.NoError(t, err)
Expand Down Expand Up @@ -58,7 +60,7 @@ func TestServerIntegration(t *testing.T) {
}{
{
filename: "../../tests/integration/payload-error-500-after-retries.json",
timeout: 1 * time.Minute,
timeout: time.Minute,
},
{
filename: "../../tests/integration/payload-invalid-http-method.json",
Expand Down Expand Up @@ -88,7 +90,7 @@ func TestServerIntegration(t *testing.T) {
payload, err := io.ReadAll(payloadFile)
require.NoError(t, err)
if tc.timeout <= 0 {
tc.timeout = 10 * time.Second
tc.timeout = 5 * time.Second
}

testIntegration(t, baseURL, string(payload), tc.expected, tc.generateMethod, tc.timeout)
Expand Down Expand Up @@ -144,7 +146,7 @@ func testIntegration(t *testing.T, baseURL, payload, expected, generateMethod st

func topologyRequestWithRetries(url string, timeout time.Duration) (int, []byte, error) {

start, delay := time.Now(), 2*time.Second
start, delay := time.Now(), time.Second

var resp *http.Response
var code int
Expand All @@ -159,16 +161,16 @@ func topologyRequestWithRetries(url string, timeout time.Duration) (int, []byte,
}

code = resp.StatusCode
if resp.StatusCode == http.StatusAccepted || resp.StatusCode == http.StatusNotFound {
if code == http.StatusAccepted {
resp.Body.Close()
} else if resp.StatusCode == http.StatusOK {
continue
}

if code == http.StatusOK {
body, err = io.ReadAll(resp.Body)
resp.Body.Close()
break
} else {
resp.Body.Close()
break
}
resp.Body.Close()
break
}

return code, body, err
Expand Down
17 changes: 8 additions & 9 deletions pkg/server/trailing_delay_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (q *TrailingDelayQueue) Submit(item any) string {
q.lastTime = time.Now()
if len(q.uid) == 0 {
q.uid = uuid.New().String()
res := &Completion{
Status: http.StatusAccepted,
Message: fmt.Sprintf("request ID %s has not completed yet", q.uid),
}
q.store.Add(q.uid, res)
}

return q.uid
Expand All @@ -126,16 +131,10 @@ func (q *TrailingDelayQueue) Get(uid string) *Completion {
return res.(*Completion)
}

completion := &Completion{}
if uid == q.uid {
completion.Message = fmt.Sprintf("request ID %s has not completed yet", uid)
completion.Status = http.StatusAccepted
} else {
completion.Message = fmt.Sprintf("request ID %s not found", uid)
completion.Status = http.StatusNotFound
return &Completion{
Message: fmt.Sprintf("request ID %s not found", uid),
Status: http.StatusNotFound,
}

return completion
}

func (q *TrailingDelayQueue) Shutdown() {
Expand Down