Skip to content

Commit

Permalink
feat(client): drop retry/backoff functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmalkmus committed Apr 15, 2024
1 parent ea1622d commit 10c49d6
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 222 deletions.
61 changes: 6 additions & 55 deletions axiom/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"strings"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/klauspost/compress/gzhttp"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
Expand Down Expand Up @@ -86,7 +85,6 @@ type Client struct {
httpClient *http.Client
userAgent string
noEnv bool
noRetry bool

strictDecoding bool

Expand Down Expand Up @@ -250,63 +248,16 @@ func (c *Client) NewRequest(ctx context.Context, method, path string, body any)
// JSON decoded or directly written to v, depending on v being an [io.Writer] or
// not.
func (c *Client) Do(req *http.Request, v any) (*Response, error) {
var (
resp *Response
err error
)
if req.GetBody != nil && !c.noRetry {
bck := backoff.NewExponentialBackOff()
bck.InitialInterval = time.Millisecond * 200
bck.MaxElapsedTime = time.Second * 10
bck.Multiplier = 2.0

err = backoff.Retry(func() error {
var httpResp *http.Response
//nolint:bodyclose // The response body is closed later down below.
httpResp, err = c.httpClient.Do(req)
switch {
case errors.Is(err, context.Canceled):
return backoff.Permanent(err)
case err != nil:
return err
}
resp = newResponse(httpResp)

// We should only retry in the case the status code is >= 500,
// anything below isn't worth retrying.
if code := resp.StatusCode; code >= 500 {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()

// Reset the requests body, so it can be re-read.
if req.Body, err = req.GetBody(); err != nil {
return backoff.Permanent(err)
}

return fmt.Errorf("got status code %d", code)
}

return nil
}, bck)
} else {
var httpResp *http.Response
//nolint:bodyclose // The response body is closed later down below.
if httpResp, err = c.httpClient.Do(req); err != nil {
return nil, err
}
resp = newResponse(httpResp)
httpResp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}

defer func() {
if resp != nil {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}
_, _ = io.Copy(io.Discard, httpResp.Body)
_ = httpResp.Body.Close()
}()

if err != nil {
return resp, err
}
resp := newResponse(httpResp)

span := trace.SpanFromContext(req.Context())
if span.IsRecording() {
Expand Down
9 changes: 0 additions & 9 deletions axiom/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,6 @@ func SetNoEnv() Option {
}
}

// SetNoRetry prevents the [Client] from auto-retrying failed HTTP requests
// under certain circumstances.
func SetNoRetry() Option {
return func(c *Client) error {
c.noRetry = true
return nil
}
}

// SetNoTracing prevents the [Client] from acquiring a tracer from the global
// tracer provider, even if one is configured.
func SetNoTracing() Option {
Expand Down
78 changes: 0 additions & 78 deletions axiom/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -233,7 +231,6 @@ func TestNewClient_Valid(t *testing.T) {
assert.NotEmpty(t, client.userAgent)
assert.False(t, client.strictDecoding)
assert.True(t, client.noEnv) // Disabled for testing.
assert.False(t, client.noRetry)
}

func TestClient_Options_SetToken(t *testing.T) {
Expand Down Expand Up @@ -573,81 +570,6 @@ func TestClient_do_ValidOnlyAPITokenPaths(t *testing.T) {
}
}

func TestClient_do_Backoff(t *testing.T) {
payload := `{"foo":"bar"}`

var (
internalServerErrorCalled bool
badGatewayCalled bool
gatewayTimeoutCalled bool
)
hf := func(w http.ResponseWriter, r *http.Request) {
header := http.StatusOK
switch {
case !internalServerErrorCalled:
internalServerErrorCalled = true
header = http.StatusInternalServerError
case !badGatewayCalled:
badGatewayCalled = true
header = http.StatusBadGateway
case !gatewayTimeoutCalled:
gatewayTimeoutCalled = true
header = http.StatusGatewayTimeout
}

b, err := io.ReadAll(r.Body)
require.NoError(t, err)

assert.Equal(t, payload, string(b))

w.WriteHeader(header)
}

client := setup(t, "/", hf)

// Wrap with an io.TeeReader as http.NewRequest checks for some special
// readers it can read in full to optimize the request.
var r io.Reader = strings.NewReader(payload)
r = io.TeeReader(r, io.Discard)
req, err := client.NewRequest(context.Background(), http.MethodPost, "/", r)
require.NoError(t, err)

// Make sure the request body can be re-read.
getBodyCounter := 0
req.GetBody = func() (io.ReadCloser, error) {
getBodyCounter++
return io.NopCloser(strings.NewReader(payload)), nil
}

resp, err := client.Do(req, nil)
require.NoError(t, err)

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.True(t, internalServerErrorCalled)
assert.True(t, badGatewayCalled)
assert.True(t, gatewayTimeoutCalled)
assert.Equal(t, 3, getBodyCounter)
}

func TestClient_do_Backoff_NoRetryOn400(t *testing.T) {
var currentCalls int
hf := func(w http.ResponseWriter, _ *http.Request) {
currentCalls++
w.WriteHeader(http.StatusBadRequest)
}

client := setup(t, "/", hf)

req, err := client.NewRequest(context.Background(), http.MethodGet, "/", nil)
require.NoError(t, err)

resp, err := client.Do(req, nil)
require.Error(t, err, "got status code 400")

assert.Equal(t, 1, currentCalls)
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

func TestAPITokenPathRegex(t *testing.T) {
tests := []struct {
input string
Expand Down
79 changes: 0 additions & 79 deletions axiom/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,85 +564,6 @@ func TestDatasetsService_IngestEvents(t *testing.T) {
assert.Equal(t, exp, res)
}

// TestDatasetsService_IngestEvents_Retry tests the retry ingest functionality
// of the client. It also tests the event labels functionality by setting no
// labels.
func TestDatasetsService_IngestEvents_Retry(t *testing.T) {
exp := &ingest.Status{
Ingested: 2,
Failed: 0,
Failures: []*ingest.Failure{},
ProcessedBytes: 630,
BlocksCreated: 0,
WALLength: 2,
TraceID: "abc",
}

hasErrored := false
hf := func(w http.ResponseWriter, r *http.Request) {
if !hasErrored {
hasErrored = true
w.WriteHeader(http.StatusInternalServerError)
return
}

assert.Equal(t, http.MethodPost, r.Method)
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
assert.Equal(t, "zstd", r.Header.Get("Content-Encoding"))
assert.Empty(t, r.Header.Get("X-Axiom-Event-Labels"))

zsr, err := zstd.NewReader(r.Body)
require.NoError(t, err)

events := assertValidJSON(t, zsr)
assert.Len(t, events, 2)
zsr.Close()

w.Header().Set("Content-Type", mediaTypeJSON)
w.Header().Set("X-Axiom-Trace-Id", "abc")
_, err = fmt.Fprint(w, `{
"ingested": 2,
"failed": 0,
"failures": [],
"processedBytes": 630,
"blocksCreated": 0,
"walLength": 2
}`)
assert.NoError(t, err)
}

client := setup(t, "/v1/datasets/test/ingest", hf)

events := []Event{
{
"time": "17/May/2015:08:05:32 +0000",
"remote_ip": "93.180.71.3",
"remote_user": "-",
"request": "GET /downloads/product_1 HTTP/1.1",
"response": 304,
"bytes": 0,
"referrer": "-",
"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)",
},
{
"time": "17/May/2015:08:05:32 +0000",
"remote_ip": "93.180.71.3",
"remote_user": "-",
"request": "GET /downloads/product_1 HTTP/1.1",
"response": 304,
"bytes": 0,
"referrer": "-",
"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)",
},
}

res, err := client.Datasets.IngestEvents(context.Background(), "test", events)
require.NoError(t, err)

assert.Equal(t, exp, res)
assert.True(t, hasErrored)
}

func TestDatasetsService_IngestChannel_Unbuffered(t *testing.T) {
exp := &ingest.Status{
Ingested: 2,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.21

require (
github.com/apex/log v1.9.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/golangci/golangci-lint v1.56.2
github.com/google/go-querystring v1.1.0
github.com/klauspost/compress v1.17.8
Expand Down Expand Up @@ -56,6 +55,7 @@ require (
github.com/butuzov/mirror v1.1.0 // indirect
github.com/catenacyber/perfsprint v0.6.0 // indirect
github.com/ccojocar/zxcvbn-go v1.0.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/charithe/durationcheck v0.0.10 // indirect
github.com/chavacava/garif v0.1.0 // indirect
Expand Down

0 comments on commit 10c49d6

Please sign in to comment.