From f2a1a72c21954bc51e93af9e8fc71ae66ea7e580 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Fri, 16 Feb 2024 23:49:40 +0000 Subject: [PATCH] changefeedccl: repopulate request body for webhook sink retries The webhook sink reuses an http request for retries. However, the http library consumes the request body, so retries may result in an error due to a non-zero content length but zero length body. This change re-initializes the http request body on every request so that it will have the right contents on every retry. Epic: none Fixes: #118485 Release note (bug fix): Fixes a bug in the webhook sink where the http request body may not be initialized on retries, resulting in the error "http: ContentLength=... with Body length 0". --- pkg/ccl/changefeedccl/sink_webhook_test.go | 46 ++++++++++++++++++++++ pkg/ccl/changefeedccl/sink_webhook_v2.go | 5 +++ 2 files changed, 51 insertions(+) diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index f0f20ba3a37e..5e8829d58f48 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -719,3 +719,49 @@ func TestWebhookSinkRetryDuration(t *testing.T) { require.NoError(t, err) require.Equal(t, retryCfg.MaxBackoff, 30*time.Second) } + +// Regression test for #118485. +func TestWebhookSinkRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + skip.UnderShort(t) + + ctx := context.Background() + opts := getGenericWebhookSinkOptions(struct { + key string + value string + }{ + key: changefeedbase.OptWebhookSinkConfig, + // Note: The original issue repros most successfully with a 60s backoff, + // which is higher than we support and is higher than desirable for testing. + value: `{"Retry":{"Backoff": "500ms", "Max": "2"}}`}) + cert, certEncoded, err := cdctest.NewCACertBase64Encoded() + require.NoError(t, err) + + sinkDest, err := cdctest.StartMockWebhookSink(cert) + require.NoError(t, err) + // Return an error that retries in the sink client, then ok. + sinkDest.SetStatusCodes([]int{http.StatusTooManyRequests, http.StatusOK}) + + sinkDestHost, err := url.Parse(sinkDest.URL()) + require.NoError(t, err) + + params := sinkDestHost.Query() + params.Set(changefeedbase.SinkParamCACert, certEncoded) + sinkDestHost.RawQuery = params.Encode() + + details := jobspb.ChangefeedDetails{ + SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()), + Opts: opts.AsMap(), + } + + sinkSrc, err := setupWebhookSinkWithDetails(ctx, details, 1 /* parallelism */, timeutil.DefaultTimeSource{}) + require.NoError(t, err) + + require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.Flush(ctx)) + + require.Equal(t, "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":1}", sinkDest.Pop()) + + sinkDest.Close() + require.NoError(t, sinkSrc.Close()) +} diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index b7fff6ddeced..b577ea21a2e9 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -212,6 +212,11 @@ func (sc *webhookSinkClient) FlushResolvedPayload( // Flush implements the SinkClient interface func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error { req := batch.(*http.Request) + b, err := req.GetBody() + if err != nil { + return err + } + req.Body = b res, err := sc.client.Do(req) if err != nil { return err