Skip to content

Commit

Permalink
Merge #110085
Browse files Browse the repository at this point in the history
110085: streamingccl: correctly resume stream from partial initial scan r=msbutler a=stevendanna

When starting a subscription, we pass an initial scan timestamp and a previous replicated timestamp. If the previous replicated timestamp is non-zero, the producer assumes we've completed the initial scan and streams only incremental changes from the given replicated timestamp.

When resuming a restarted job, we use a persisted frontier checkpoint to find the minimum replicated timestamp for a set of spans and use that, rather than the overall replicated time, to reduce the amount of duplicated work.

Since data is split across many ranges, some portions of our keyspace may complete their initial scan before others. As a result, the persisted frontier checkpoint during the initial scan may have some spans at zero-valued timestamps and others at non-zero timestamps.

Unfortunately, the code to find the minimum timestamp incorrectly handled portions of the frontier that had zero-valued timestamps, returning the lowest non-zero value rather than zero.

As a result, if we resumed during an initial scan but after some portions of the key space have recorded progress, we would use a non-zero previous replicated timestamp to resume a stream even if some of the spans still required their initial scans to complete.

Here we fix the bug so that the minimum timestamp for a set of spans is zero if any of the given spans is currently at zero.

Fixes #109957

Release note: None

Epic: none

Co-authored-by: Steven Danna <danna@cockroachlabs.com>
  • Loading branch information
craig[bot] and stevendanna committed Sep 6, 2023
2 parents ad4e53f + 0983c34 commit ec68c25
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
Expand Down
24 changes: 17 additions & 7 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,16 +422,16 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
sip.streamPartitionClients = append(sip.streamPartitionClients, streamClient)
}

previousReplicatedTimetamp := frontierForSpans(sip.frontier, partitionSpec.Spans...)
previousReplicatedTimestamp := frontierForSpans(sip.frontier, partitionSpec.Spans...)

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
streamingKnobs.BeforeClientSubscribe(addr, string(token), previousReplicatedTimetamp)
streamingKnobs.BeforeClientSubscribe(addr, string(token), previousReplicatedTimestamp)
}
}

sub, err := streamClient.Subscribe(ctx, streampb.StreamID(sip.spec.StreamID), token,
sip.spec.InitialScanTimestamp, previousReplicatedTimetamp)
sip.spec.InitialScanTimestamp, previousReplicatedTimestamp)

if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr))
Expand Down Expand Up @@ -1229,19 +1229,29 @@ func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, erro
return false, nil
}

// frontierForSpans returns the lowest timestamp in the frontier within
// the given subspans.
//
// An empty timestamp is returned if the subspans are entirely outside
// the Frontier's tracked span, or if any frontier entry overlapping the
// given subspans is still at the empty timestamp. The latter case
// matters when resuming during an initial scan: a span that has not
// recorded any progress must not be masked by the non-zero timestamps
// of its neighbours.
func frontierForSpans(f *span.Frontier, spans ...roachpb.Span) hlc.Timestamp {
	var (
		minTimestamp hlc.Timestamp
		sawEmptyTS   bool
	)

	for _, spanToCheck := range spans {
		f.SpanEntries(spanToCheck, func(_ roachpb.Span, ts hlc.Timestamp) span.OpResult {
			if ts.IsEmpty() {
				// Remember that at least one overlapping entry has made no
				// progress; the overall result must then be empty.
				sawEmptyTS = true
			}
			// minTimestamp starts out empty, so the first entry visited
			// always seeds it; afterwards only strictly lower values win.
			if minTimestamp.IsEmpty() || ts.Less(minTimestamp) {
				minTimestamp = ts
			}
			return span.ContinueMatch
		})
	}
	if sawEmptyTS {
		return hlc.Timestamp{}
	}
	return minTimestamp
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -502,6 +503,47 @@ func TestStreamIngestionProcessor(t *testing.T) {
})
}

// TestFrontierForSpans checks that frontierForSpans reports the lowest
// frontier timestamp across the requested spans, and that it reports the
// empty timestamp both when no requested span overlaps the frontier and
// when one of the requested spans is still at zero.
func TestFrontierForSpans(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Small constructors to keep the cases below readable.
	mkSpan := func(start, end string) roachpb.Span {
		return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
	}
	wall := func(w int64) hlc.Timestamp {
		return hlc.Timestamp{WallTime: w}
	}
	// advance forwards sp to ts in f and fails the test on error.
	advance := func(t *testing.T, f *span.Frontier, sp roachpb.Span, ts hlc.Timestamp) {
		_, err := f.Forward(sp, ts)
		require.NoError(t, err)
	}

	spanAB := mkSpan("a", "b")
	spanCD := mkSpan("c", "d")
	spanEF := mkSpan("e", "f")
	spanXZ := mkSpan("x", "z")

	t.Run("returns the lowest timestamp for the matched spans", func(t *testing.T) {
		f, err := span.MakeFrontier(spanAB, spanCD, spanEF)
		require.NoError(t, err)
		advance(t, f, spanAB, wall(1))
		advance(t, f, spanCD, wall(2))
		advance(t, f, spanEF, wall(3))

		for _, tc := range []struct {
			expected hlc.Timestamp
			spans    []roachpb.Span
		}{
			{expected: wall(1), spans: []roachpb.Span{spanAB, spanCD, spanEF}},
			{expected: wall(2), spans: []roachpb.Span{spanCD, spanEF}},
			{expected: wall(3), spans: []roachpb.Span{spanEF}},
			{expected: wall(1), spans: []roachpb.Span{spanAB, spanEF}},
		} {
			require.Equal(t, tc.expected, frontierForSpans(f, tc.spans...))
		}
	})

	t.Run("returns zero if none of the spans overlap", func(t *testing.T) {
		f, err := span.MakeFrontierAt(wall(1), spanAB, spanCD, spanEF)
		require.NoError(t, err)
		require.Equal(t, hlc.Timestamp{}, frontierForSpans(f, spanXZ))
	})

	t.Run("returns zero if one of the spans is zero", func(t *testing.T) {
		f, err := span.MakeFrontier(spanAB, spanCD, spanEF)
		require.NoError(t, err)
		advance(t, f, spanAB, wall(1))
		// spanCD is deliberately never forwarded, so it stays at zero.
		advance(t, f, spanEF, wall(3))
		require.Equal(t, hlc.Timestamp{}, frontierForSpans(f, spanAB, spanCD, spanEF))
	})
}

// getPartitionSpanToTableID maps a partition's span to the tableID it covers in
// the source keyspace. It assumes the source used a random_stream_client, which generates keys for
// a single table span per partition.
Expand Down

0 comments on commit ec68c25

Please sign in to comment.