
c2c: add new streaming client and value generator for span config replication #108356

Merged (3 commits) on Aug 18, 2023

Conversation

@msbutler (Collaborator) commented Aug 8, 2023:

This patch adds a new implementation of the streamclient.Client interface which
replicates span config updates for the replicating app tenant. With this
change, a call to spanConfigClient.SetupSpanConfigsStream() will return a new
spanConfigSubscription which the client can use to listen for span config
updates.
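
As a rough illustration of that flow, here is a minimal, self-contained Go sketch; every name in it is a placeholder standing in for the described subscribe-then-consume pattern, not the actual streamclient API.

package sketch

import (
	"context"
	"fmt"
)

// SpanConfigEvent and Subscription are placeholders mirroring the flow
// described above: a subscription is driven by Subscribe and surfaces span
// config updates on an Events channel.
type SpanConfigEvent struct{ Target, Config string }

type Subscription interface {
	Subscribe(ctx context.Context) error
	Events() <-chan SpanConfigEvent
}

// consume runs the subscription and handles updates as they arrive; the
// Events channel is assumed to close once the stream ends.
func consume(ctx context.Context, sub Subscription) error {
	errCh := make(chan error, 1)
	go func() { errCh <- sub.Subscribe(ctx) }()
	for ev := range sub.Events() {
		fmt.Println("span config update:", ev.Target, "->", ev.Config)
	}
	return <-errCh
}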

This approach adds a new client implementation, as opposed to adding to the partitionedStreamClient implementation, for a few reasons. First, a span config client doesn't manage multiple partitioned subscriptions; rather, it opens a single subscription over the span configuration table. Second, a client cannot call Create, Plan, Heartbeat, or Complete on a span config stream, which the separate interface implementation makes quite clear. Third, the separate interface aligns with the design goal of separating the logic around streaming app tenant data and app tenant metadata.

Internally, this subscription spins up the new spanConfigEventStream value generator, which listens for updates on the system span_configuration table using a rangefeedcache. Unlike the original eventStream, which manages a rangefeed, the spanConfigEventStream uses a rangefeedcache wrapper because it provides nicer ordering guarantees on when it emits updates. Specifically, the rangefeed cache buffers updates emitted from the underlying rangefeed and only flushes updates less than t1 once the whole span has been checkpointed to t1. In other words, the rangefeed cache guarantees that once it flushes updates less than t1, it will not flush any more new updates less than t1. The rangefeed cache also flushes updates in sorted timestamp order.
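
As a conceptual, self-contained sketch of that buffering guarantee (this models the behavior described above; it is not the rangefeedcache implementation, and all names and values are made up):

package sketch

import "sort"

type update struct {
	key string
	ts  int64 // commit timestamp of the update
}

// orderedBuffer models the guarantee described above: raw rangefeed updates
// are buffered, and once the whole span has checkpointed to some frontier,
// every buffered update below that frontier is flushed in timestamp order
// and will never be re-flushed as "new" later.
type orderedBuffer struct {
	buf []update
}

func (b *orderedBuffer) add(u update) { b.buf = append(b.buf, u) }

// flushBelow returns all buffered updates with ts < frontier, sorted by
// timestamp, and retains the rest for a later checkpoint.
func (b *orderedBuffer) flushBelow(frontier int64) []update {
	sort.Slice(b.buf, func(i, j int) bool { return b.buf[i].ts < b.buf[j].ts })
	var flushed, remaining []update
	for _, u := range b.buf {
		if u.ts < frontier {
			flushed = append(flushed, u)
		} else {
			remaining = append(remaining, u)
		}
	}
	b.buf = remaining
	return flushed
}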

These properties will simplify the ingestion of these span config updates significantly. To understand why, consider that span config updates must be ingested in the same order that they were applied. Since the rangefeed cache will emit all new updates in order, the ingestion side doesn't need to buffer updates or keep track of checkpoints.

The rangefeed cache may still emit duplicates of events at any time, so extra logic was added to the streamEventBatcher to elide these duplicates.
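
A minimal sketch of that elision step, assuming a dedup key built from a record's string form plus its timestamp (as the batcher snippet later in this thread suggests); the type and function names here are illustrative:

package sketch

import "fmt"

// dupTracker drops exact duplicates of (record, timestamp) pairs before they
// are appended to a batch; the key mirrors record.String() + " ts:<ts>".
type dupTracker struct {
	seen map[string]struct{}
}

func newDupTracker() *dupTracker {
	return &dupTracker{seen: make(map[string]struct{})}
}

// addIfNew reports whether the update had not been seen before; callers only
// append the update to the outgoing batch when it returns true.
func (t *dupTracker) addIfNew(recordKey string, ts int64) bool {
	key := fmt.Sprintf("%s ts:%d", recordKey, ts)
	if _, dup := t.seen[key]; dup {
		return false
	}
	t.seen[key] = struct{}{}
	return true
}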

This PR left a smattering of TODOs related to API cleanup and perf improvements
which will get resolved once the whole span config replication stream is hooked
up.

Informs #106823

Release note: None

@msbutler msbutler self-assigned this Aug 8, 2023
@cockroach-teamcity (Member) commented: This change is Reviewable

@msbutler msbutler changed the title from "Butler c2c spcfg decode" to "c2c: add new streaming client and value generator for span config replication" on Aug 8, 2023
@msbutler msbutler marked this pull request as ready for review August 8, 2023 15:41
@msbutler msbutler requested review from a team as code owners August 8, 2023 15:41
@msbutler msbutler requested review from rhu713, yuzefovich and stevendanna and removed request for a team, rhu713 and yuzefovich August 8, 2023 15:41
}, nil
}

func (m *spanConfigStreamClient) Create(
msbutler (Collaborator, Author):

@stevendanna If you like this approach of creating a new client implementation, I'll add some boilerplate testing here.

Collaborator:

Can you write a few words about what you see as the advantage of this? As opposed to allowing the user to call SetupSpanConfigsStream on the partition_stream_client?

msbutler (Collaborator, Author):

just added a paragraph to the commit message

@msbutler (Collaborator, Author) commented Aug 8, 2023:

definitely open to discussing this though! After thinking about this a bit more, I'm on the fence about creating a new client implementation. I'm really creating a new kind of subscription, which could still be a part of the partition_stream_client. It just feels a bit odd that one subscription requires the Create, Plan, Subscribe, Complete flow, and the other does not.

Collaborator:

Thanks for writing that. I think it maybe points to an alternative: rather than making this an implementation of the Client interface, we make a new interface to describe that it is a completely different kind of client.

I won't block this PR on that, but I do think we should work on the shape of this a bit more as we go.

msbutler (Collaborator, Author):

I'll leave a TODO in the code here on that.

@stevendanna (Collaborator) left a comment:

Ran out of time reviewing this at the moment, but I gave it an initial look and left a couple of higher level questions.

pkg/ccl/streamingccl/event.go (review thread outdated; resolved)

batch.SpanConfigs = append(batch.SpanConfigs, streamedSpanCfgEntry)
}
if pacer.shouldCheckpoint(update.Timestamp, true) {
Collaborator:

I mildly wonder if we need this pacing, given that the rangefeedcache is already only pushing us updates when the frontier advances.

msbutler (Collaborator, Author):

Removing the pacer could also help the span config stream stay ahead of the regular stream. I don't really see any risk in removing the pacer--do you? I'll add some logic to avoid flushing when the batch is empty.

Collaborator:

I think the main risk is that, in general, we want to read from our rangefeed quickly, so if we are sending items 1 by 1, we increase our chances of getting blocked. One thing we could do to mitigate this: if you take a look at the PR I have open for async sending, we could pull some of that code into here. I'm OK with changes to this being a follow-up.

msbutler (Collaborator, Author):

Added a todo for follow up.

Comment on lines +42 to +47
rfc *rangefeedcache.Watcher
streamGroup ctxgroup.Group // Context group controlling stream execution.
doneChan chan struct{} // Channel signaled to close the stream loop.
updateCh chan rangefeedcache.Update
errCh chan error // Signaled when error occurs in rangefeed.
streamCh chan tree.Datums // Channel signaled to forward datums to consumer.
sp *tracing.Span // Span representing the lifetime of the eventStream.
Collaborator:

Not going to ask you to do it here necessarily, but now that you've done the "new value generator" implementation, it could be worth spending a few minutes to see if there is a cheap way to unify them.

Namely, looking at the new implementation, I think you could probably have some generic implementation to which you pass something that implements Run(ctx, doneChan, streamCh). Anyway, not essential, but just a thought.
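
For concreteness, one possible shape of that suggestion, sketched with placeholder names; only tree.Datums is taken from the surrounding code:

package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// streamRunner is the piece that would differ between the two value
// generators: something that writes datums to streamCh until doneCh closes
// or the context is canceled.
type streamRunner interface {
	Run(ctx context.Context, doneCh <-chan struct{}, streamCh chan<- tree.Datums) error
}

// genericEventStream would own the shared plumbing (channels, tracing span,
// error handling) and delegate the actual production of rows to its runner.
type genericEventStream struct {
	runner   streamRunner
	doneChan chan struct{}
	streamCh chan tree.Datums
}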

msbutler (Collaborator, Author):

Yeah, I'd like to refactor this once I have everything hooked up, including the ingestion side.

@msbutler msbutler force-pushed the butler-c2c-spcfg-decode branch 2 times, most recently from 8257386 to 9dd9337, on August 8, 2023 20:49
updates: []spanconfig.Record{makeRecord(t2Span, 2), makeRecord(t3Span, 5)},
},
{
// Merge these Records
msbutler (Collaborator, Author):

@arulajmani in this test, I apply a few span config updates for dummy tables via the kvAccessor on a dummy span config table and check that they get picked up by the c2c machinery (which you don't need to worry about).

In this test case, I merge 3 span config records for t1, t2, and t3. One surprising thing I noticed is that a delete of the spanconfig record for t1 no-ops in the kvaccessor, and consequently never replicates. I think this occurs because the update on the t123 span updates that record instead. Is this behavior surprising?

I have a similar question for the Split test case below: when I split the record up, a delete on the merged record no-ops as well. Is this surprising?

Collaborator:

One surprising thing I noticed is that a delete of the spanconfig record for t1 no-ops in the kvaccessor, and consequently never replicates.

I'm probably misunderstanding what you're saying, but where is the delete for t1? Isn't there just a delete for t2 and t3?

deletes: []spanconfig.Target{spanconfig.MakeTargetFromSpan(t2Span), spanconfig.MakeTargetFromSpan(t3Span)},

Are you saying the same happens for something like this as well?:

deletes: []spanconfig.Target{spanconfig.MakeTargetFromSpan(t1Span), spanconfig.MakeTargetFromSpan(t2Span), spanconfig.MakeTargetFromSpan(t3Span)},

If so, I think you're right that it's because t123 span updates that record. Note that the primary key of system.span_configurations is just the start key -- so I think what you're seeing is a deletion followed by a write. Given this is all happening in a single transaction, maybe the rangefeed considers all this to be an update for t1Span.StartKey?

msbutler (Collaborator, Author):

Ack, sorry I didn't explain that clearly. Your guess is correct. These two inputs produce the same output, according to the rangefeed:

// w/o t1
deletes: []spanconfig.Target{spanconfig.MakeTargetFromSpan(t2Span), spanconfig.MakeTargetFromSpan(t3Span)},

// w t1
deletes: []spanconfig.Target{spanconfig.MakeTargetFromSpan(t1Span), spanconfig.MakeTargetFromSpan(t2Span), spanconfig.MakeTargetFromSpan(t3Span)},

So I think what you're seeing is a deletion followed by a write. Given this is all happening in a single transaction, maybe the rangefeed considers all this to be an update for t1Span.StartKey?

Interesting...I really hope the extra delete at t1 does not get persisted to the raft log or pebble. If it did, the delete would have the same MVCC timestamp as the t123Span update, which would be very sad for readers.

I imagine there's some logic above raft which, for a given transaction, no-ops all the intermediate operations for a given key. I.e. if within a txn:

1: PUT 2 @ key=foo
2: PUT 3 @ key=foo

We get rid of that first operation.

Collaborator:

I imagine there's some logic above raft which, for a given transaction, no-ops all the intermediate operations for a given key. I.e. if within a txn:

Intermediate operations are only visible to the transaction itself. They're never visible to any other concurrent actor in the system. Intermediate values are stored inside of the intent, whereas the most recent value is stored in the MVCC keyspace. So when the intent is resolved, any information about intermediate operations leaves with it. I think that's what you're seeing here.

@arulajmani (Collaborator) left a comment:

No reviewable? 😢

@stevendanna (Collaborator) left a comment:

Apologies, for some reason I had a pending review here that hadn't been submitted.

record roachpb.SpanConfigEntry, ts int64,
) bool {
stringedUpdate := record.String() + fmt.Sprintf(" ts:%d", ts)
// TODO (msbutler): given that a batch can have repeat records, understand how to properly write those on the ingestion side.
Collaborator:

Am I right that for the first cut our thought was that we can apply items in the batch one at a time, and in that sense they should be fine provided we do it in order?

msbutler (Collaborator, Author):

In a given batch, we have to apply all items with the same timestamp at the same time. For example, on the ingestion side, all 3 operations here need to be committed at the same time:

deletes: []spanconfig.Target{spanconfig.MakeTargetFromSpan(t2Span), spanconfig.MakeTargetFromSpan(t3Span)},
updates: []spanconfig.Record{makeRecord(t123Span, 10)},

Here's why duplicates pose a (solvable) problem: if the rangefeed cache surfaces two identical updates for a given key-timestamp pair, we have to remove the duplicates before ingesting them, because kvAccessor.Update() will fail on the ingestion side.

To check the kvAccessor behavior, I added the following test case to this unit test:

{
	// Does a duplicate update no-op in the kvAccessor?
	updates: []spanconfig.Record{makeRecord(t1Span, 4), makeRecord(t1Span, 4)},
},

which causes the test to fail because:

	Error:      	Received unexpected error:
        	            	overlapping spans /Tenant/10/Table/100{1-2} and /Tenant/10/Table/100{1-2} in same list
        	            	(1) assertion failure
        	            	Wraps: (2) attached stack trace
        	            	  -- stack trace:
        	            	  | github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor.validateUpdateArgs
        	            	  | 	github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go:587
        	            	  | github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor.(*KVAccessor).updateSpanConfigRecordsWithTxn
        	            	  | 	github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go:302
        	            	  | github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor.(*KVAccessor).UpdateSpanConfigRecords.func1
        	            	  | 	github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go:178

To address this, I've added some additional logic via the streamEventBatcher to elide all duplicate updates. Lemme know what you think!
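
To make the intended ingestion-side handling concrete, a small self-contained sketch under hypothetical types (this is not the kvAccessor or job code): group entries by commit timestamp, drop exact duplicates, and apply each group atomically.

package sketch

// entry is a stand-in for a streamed span config update.
type entry struct {
	target string // span the config applies to
	config string
	ts     int64 // source-side commit timestamp
}

// groupByTimestamp partitions entries into per-timestamp batches (in the
// order encountered) and elides exact duplicates, so a later step can commit
// each batch in one transaction without tripping the "overlapping spans ...
// in same list" assertion shown above.
func groupByTimestamp(entries []entry) [][]entry {
	var groups [][]entry
	seen := make(map[entry]struct{})
	index := make(map[int64]int)
	for _, e := range entries {
		if _, dup := seen[e]; dup {
			continue
		}
		seen[e] = struct{}{}
		i, ok := index[e.ts]
		if !ok {
			i = len(groups)
			index[e.ts] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], e)
	}
	return groups
}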

@msbutler msbutler requested a review from a team as a code owner August 16, 2023 20:57
@msbutler msbutler requested review from msirek and removed request for a team August 16, 2023 20:57
@msbutler (Collaborator, Author) left a comment:

@arulajmani no reviewable in DR land 😈

@msbutler (Collaborator, Author):

just pacifying the linters :)

@stevendanna (Collaborator) left a comment:

Thanks for moving this forward.

I like the tests you added. In terms of integration tests, it would be nice to eventually have a couple of tests that run SQL commands that we know will generate span configurations, and which ensure we then see those span configurations over the stream.

I think we can further clean up many of the interfaces as we discussed in the comments, but let's do that as we iterate on getting this wired up end-to-end.

@@ -67,6 +68,8 @@ message StreamPartitionSpec {

// Controls batch size in bytes.
int64 batch_byte_size = 3;

roachpb.TenantID span_config_for_tenant = 4 [(gogoproto.nullable) = false];
Collaborator:

[followup-consideration] It could make sense to make a new protobuf message for use in the span stream.

msbutler (Collaborator, Author):

Added a todo

}
currentTimestamp = event.Timestamp
}
stringedEvent := event.String()
Collaborator:

[potential-followup] The processing of these events seems expensive. We iterate through all the updates to create StreamedSpanConfigEntries and then iterate through the StreamedSpanConfigEntries in this loop and marshal the protobuf into text. I kinda wonder if we could do some of this as we go in the original loop and whether there is a smaller unique key we can use.

But, I'm OK with it for now as we try to get things wired up.

msbutler (Collaborator, Author):

Added a todo.

Publicizing the SpanConfigDecoder and the BufferEvent will make it easy for the
producer side of C2C to decode span config updates read from its rangefeed.

Informs cockroachdb#106823

Release note: None

publicize buffer
@msbutler (Collaborator, Author) left a comment:

Thanks for the review @stevendanna I'll go ahead and bors this once CI is green. Opening a new issue to track all the code cleanups.

@msbutler msbutler removed the request for review from msirek August 18, 2023 15:12
@msbutler msbutler force-pushed the butler-c2c-spcfg-decode branch 3 times, most recently from 4a319fb to 3bfcffd, on August 18, 2023 19:37
@msbutler (Collaborator, Author):

TFTR!

bors r=stevendanna

@craig (craig bot, Contributor) commented Aug 18, 2023:

Build succeeded:

@craig craig bot merged commit a190dc4 into cockroachdb:master Aug 18, 2023
7 checks passed
@msbutler msbutler deleted the butler-c2c-spcfg-decode branch August 21, 2023 13:50
msbutler added a commit to msbutler/cockroach that referenced this pull request Aug 25, 2023
This patch spawns a new goroutine on the stream ingestion job coordinator which
in turn creates a special span config client and subscription for replicating
span config updates. As updates come in, the coordinator will buffer updates
and deletes with the same source side commit timestamp and write them in a
transaction to the destination side system span configuration table when it
observes a new update with a newer timestamp.

The ingestion side assumes each replicated update is unique and in timestamp
order, which is enforced by the producer side logic built in cockroachdb#108356. This
assumption simplifies the destination side ingestion logic which must write
updates with the same source side transaction commit timestamp at the
same new timestamp on the destination side. This invariant ensures a span
configuration's target (i.e. the span that a configuration applies to) never
overlaps with any other span configuration target. Else, c2c would break the
span config reconciliation system.

Note that cutover will not revert any ingested span config updates, as we wouldn't want to issue revert range requests on an online system table in the system tenant. That being said, a future PR will need to teach the destination side application tenant to conduct a full span reconciliation job immediately after cutover, which will safely revert any span config updates that committed after the cutover timestamp.

Informs cockroachdb#106823

Release note: None
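
As an illustration of that buffer-then-flush behavior, a short self-contained sketch under hypothetical types (not the coordinator's actual code), which assumes entries arrive in timestamp order as enforced by the producer side:

package sketch

// ingestEntry stands in for a replicated span config update or delete along
// with its source-side commit timestamp.
type ingestEntry struct {
	target string
	config string // empty for a delete
	ts     int64
}

// bufferedIngestor accumulates entries that share a commit timestamp and
// flushes them together once an entry with a newer timestamp arrives, so
// each source transaction is applied atomically and in order.
type bufferedIngestor struct {
	curTS   int64
	pending []ingestEntry
	flush   func(batch []ingestEntry) error // e.g. one destination-side txn per batch
}

func (b *bufferedIngestor) ingest(e ingestEntry) error {
	if e.ts > b.curTS && len(b.pending) > 0 {
		if err := b.flush(b.pending); err != nil {
			return err
		}
		b.pending = nil
	}
	b.curTS = e.ts
	b.pending = append(b.pending, e)
	return nil
}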
msbutler added a commit to msbutler/cockroach that referenced this pull request Aug 30, 2023
This patch spawns a new goroutine on the stream ingestion job coordinator which
in turn creates a special span config client and subscription for replicating
span config updates. As updates come in, the coordinator will buffer updates
and deletes with the same source side commit timestamp and write them in a
transaction to the destination side system span configuration table when it
observes a new update with a newer timestamp.

The ingestion side assumes each replicated update is unique and in timestamp
order, which is enforced by the producer side logic built in cockroachdb#108356. This
assumption simplifies the destination side ingestion logic which must write
updates with the same source side transaction commit timestamp at the
same new timestamp on the destination side. This invariant ensures a span
configuration's target (i.e. the span that a configuration applies to) never
overlaps with any other span configuration target. Else, c2c would break the
span config reconciliation system.

Note that cutover will not revert any ingested span config updates, as we wouldn't want to issue revert range requests on an online system table in the system tenant. That being said, a future PR will need to teach the destination side application tenant to conduct a full span reconciliation job immediately after cutover, which will safely revert any span config updates that committed after the cutover timestamp.

A future PR will also improve the ingestion logic during an initial scan on
Resume. Because the job does not maintain a protected timestamp on the span
config table, the job may miss updates if it resumes after a long pause period.
So, the ingestor will need to buffer all initial scan updates and flush them in
one transaction along with a tenant span wide delete, in order to maintain the
span config table invariants.

Informs cockroachdb#106823

Release note: None
craig bot pushed a commit that referenced this pull request Aug 30, 2023
107557: streamingest: replicate span configs relevant to replicating tenant r=stevendanna a=msbutler

This patch spawns a new goroutine on the stream ingestion job coordinator which
in turn creates a special span config client and subscription for replicating
span config updates. As updates come in, the coordinator will buffer updates
and deletes with the same source side commit timestamp and write them in a
transaction to the destination side system span configuration table when it
observes a new update with a newer timestamp.

The ingestion side assumes each replicated update is unique and in timestamp
order, which is enforced by the producer side logic built in #108356. This
assumption simplifies the destination side ingestion logic which must write
updates with the same source side transaction commit timestamp at the
same new timestamp on the destination side. This invariant ensures a span
configuration's target (i.e. the span that a configuration applies to) never
overlaps with any other span configuration target. Else, c2c would break the
span config reconciliation system.

Note that cutover will not revert any ingested span config updates, as we wouldn't want to issue revert range requests on an online system table in the system tenant. That being said, a future PR will need to teach the destination side application tenant to conduct a full span reconciliation job immediately after cutover, which will safely revert any span config updates that committed after the cutover timestamp.

A future PR will also improve the ingestion logic during an initial scan on
Resume. Because the job does not maintain a protected timestamp on the span
config table, the job may miss updates if it resumes after a long pause period.
So, the ingestor will need to buffer all initial scan updates and flush them in
one transaction along with a tenant span wide delete, in order to maintain the
span config table invariants.

Informs #106823

Release note: None

Co-authored-by: Michael Butler <butler@cockroachlabs.com>