Skip to content

Commit

Permalink
Merge pull request #119496 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-119326

release-23.2: changefeedccl: repopulate request body for webhook sink retries
  • Loading branch information
rharding6373 committed Feb 23, 2024
2 parents f72831b + f2a1a72 commit c816684
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
46 changes: 46 additions & 0 deletions pkg/ccl/changefeedccl/sink_webhook_test.go
Expand Up @@ -719,3 +719,49 @@ func TestWebhookSinkRetryDuration(t *testing.T) {
require.NoError(t, err)
require.Equal(t, retryCfg.MaxBackoff, 30*time.Second)
}

// Regression test for #118485.
func TestWebhookSinkRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderShort(t)

ctx := context.Background()
opts := getGenericWebhookSinkOptions(struct {
key string
value string
}{
key: changefeedbase.OptWebhookSinkConfig,
// Note: The original issue repros most successfully with a 60s backoff,
// which is higher than we support and is higher than desirable for testing.
value: `{"Retry":{"Backoff": "500ms", "Max": "2"}}`})
cert, certEncoded, err := cdctest.NewCACertBase64Encoded()
require.NoError(t, err)

sinkDest, err := cdctest.StartMockWebhookSink(cert)
require.NoError(t, err)
// Return an error that retries in the sink client, then ok.
sinkDest.SetStatusCodes([]int{http.StatusTooManyRequests, http.StatusOK})

sinkDestHost, err := url.Parse(sinkDest.URL())
require.NoError(t, err)

params := sinkDestHost.Query()
params.Set(changefeedbase.SinkParamCACert, certEncoded)
sinkDestHost.RawQuery = params.Encode()

details := jobspb.ChangefeedDetails{
SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()),
Opts: opts.AsMap(),
}

sinkSrc, err := setupWebhookSinkWithDetails(ctx, details, 1 /* parallelism */, timeutil.DefaultTimeSource{})
require.NoError(t, err)

require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc))
require.NoError(t, sinkSrc.Flush(ctx))

require.Equal(t, "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":1}", sinkDest.Pop())

sinkDest.Close()
require.NoError(t, sinkSrc.Close())
}
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/sink_webhook_v2.go
Expand Up @@ -212,6 +212,11 @@ func (sc *webhookSinkClient) FlushResolvedPayload(
// Flush implements the SinkClient interface
func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error {
req := batch.(*http.Request)
b, err := req.GetBody()
if err != nil {
return err
}
req.Body = b
res, err := sc.client.Do(req)
if err != nil {
return err
Expand Down

0 comments on commit c816684

Please sign in to comment.