
streamingest: replicate span configs relevant to replicating tenant #107557

Merged: 2 commits merged into cockroachdb:master from butler-c2c-dest-zone on Aug 30, 2023

Conversation

@msbutler (Collaborator) commented on Jul 25, 2023:

This patch spawns a new goroutine on the stream ingestion job coordinator, which
in turn creates a special span config client and subscription for replicating
span config updates. As updates come in, the coordinator buffers updates and
deletes that share a source-side commit timestamp, and writes them in a single
transaction to the destination-side system span configuration table once it
observes an update with a newer timestamp.
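
A minimal sketch of that buffer-and-flush scheme (the type, field, and flush-hook names here are illustrative stand-ins, not the actual implementation):

package ingestsketch

import "context"

// timestamp stands in for the source-side commit timestamp (hlc.Timestamp in
// the real patch); only ordering matters for this sketch.
type timestamp int64

func (t timestamp) Less(o timestamp) bool { return t < o }

// event is a hypothetical stand-in for a replicated span config update or
// delete received from the subscription.
type event struct {
	ts       timestamp
	isDelete bool
}

// ingestor buffers updates and deletes that share a source-side commit
// timestamp and flushes them in a single destination-side transaction once an
// event with a newer timestamp arrives.
type ingestor struct {
	bufferedUpdates []event
	bufferedDeletes []event
	lastSourceTS    timestamp
	flush           func(ctx context.Context, updates, deletes []event) error
}

func (sc *ingestor) consume(ctx context.Context, ev event) error {
	// A newer source timestamp means the previous source transaction is
	// complete, so everything buffered so far can be written atomically.
	if sc.lastSourceTS.Less(ev.ts) &&
		(len(sc.bufferedUpdates) != 0 || len(sc.bufferedDeletes) != 0) {
		if err := sc.flush(ctx, sc.bufferedUpdates, sc.bufferedDeletes); err != nil {
			return err
		}
		sc.bufferedUpdates, sc.bufferedDeletes = nil, nil
	}
	if ev.isDelete {
		sc.bufferedDeletes = append(sc.bufferedDeletes, ev)
	} else {
		sc.bufferedUpdates = append(sc.bufferedUpdates, ev)
	}
	sc.lastSourceTS = ev.ts
	return nil
}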

The ingestion side assumes each replicated update is unique and arrives in
timestamp order, which is enforced by the producer-side logic built in #108356.
This assumption simplifies the destination-side ingestion logic, which must
write updates that share a source-side transaction commit timestamp at the same
new timestamp on the destination side. This invariant ensures a span
configuration's target (i.e. the span that a configuration applies to) never
overlaps with any other span configuration target; otherwise, c2c would break
the span config reconciliation system.

Note that cutover will not revert any ingested span config updates, as we
would not want to issue revert range requests on an online system table in the
system tenant. That said, a future PR will need to teach the destination-side
application tenant to conduct a full span reconciliation job immediately after
cutover, which will safely revert any span config updates that committed after
the cutover timestamp.

A future PR will also improve the ingestion logic during an initial scan on
Resume. Because the job does not maintain a protected timestamp on the span
config table, it may miss updates if it resumes after a long pause. So the
ingestor will need to buffer all initial scan updates and flush them in one
transaction along with a tenant-span-wide delete, in order to maintain the
span config table invariants.
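
A rough sketch of that planned initial-scan flush, continuing the hypothetical names from the sketch above (this describes the planned follow-up behavior, not code in this PR):

// onInitialScanComplete would flush every buffered initial-scan update in one
// transaction together with a delete over the whole tenant span, clearing any
// stale destination-side records that the scan did not observe.
func (sc *ingestor) onInitialScanComplete(ctx context.Context, tenantWideDelete event) error {
	deletes := append([]event{tenantWideDelete}, sc.bufferedDeletes...)
	if err := sc.flush(ctx, sc.bufferedUpdates, deletes); err != nil {
		return err
	}
	sc.bufferedUpdates, sc.bufferedDeletes = nil, nil
	return nil
}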

Informs #106823

Release note: None

@msbutler self-assigned this on Jul 25, 2023

@cockroach-teamcity (Member): This change is Reviewable

@msbutler force-pushed the butler-c2c-dest-zone branch 28 times, most recently from f4bc91e to f4a5919, on July 25, 2023 21:32
@msbutler requested review from stevendanna and removed the review request for srosenberg, rhu713, and yuzefovich on August 25, 2023 13:06
@msbutler (Collaborator Author):

@stevendanna this is ready for a first pass! You've already reviewed the first commit.

@stevendanna (Collaborator) left a comment:

Thanks! Reviewed just the second commit.

pkg/ccl/streamingccl/streamclient/client.go (resolved comment thread)

g := ctxgroup.WithContext(ctx)
g.GoCtx(sub.Subscribe)
g.GoCtx(sc.checkForCutover)
Collaborator:

Do we need this checkForCutover? Could we instead have the goroutine running the distsql flow shut us down when it exits, in the same way it shuts down the replanner?

Collaborator Author:

Good point. Refactored to remove this.

Comment on lines 203 to 183
if sc.lastBufferedSourceTimestamp.Less(update.Timestamp) && len(sc.bufferedUpdates) != 0 {
	// If this event was originally written at a later timestamp than what's in the buffer, flush the buffer.
Collaborator:

Why do we only check the length of bufferedUpdates here and not also the length of bufferedDeletes?

Collaborator Author:

because it's a bug :D. Fixed. I plan to add more unit tests around this logic.
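
Presumably the fixed condition consults both buffers, along these lines (a sketch of the fix, not the exact committed code):

// Flush when a newer source-side timestamp arrives and either buffer is
// non-empty; checking only bufferedUpdates would skip a flush whose buffer
// holds deletes alone.
if sc.lastBufferedSourceTimestamp.Less(update.Timestamp) &&
	(len(sc.bufferedUpdates) != 0 || len(sc.bufferedDeletes) != 0) {
	if err := sc.flushEvents(ctx); err != nil {
		return err
	}
}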

}

func (sc *spanConfigIngestor) flushEvents(ctx context.Context) error {
	log.VEventf(ctx, 3, "flushing span config %d updates", len(sc.bufferedUpdates))
Collaborator:

Consider moving this to 1 or 2. I think vlog > 2 is usually for the noisiest stuff.

Collaborator Author:

done.
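
Presumably the log line just moved down a level, e.g.:

log.VEventf(ctx, 2, "flushing %d span config updates", len(sc.bufferedUpdates))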

Comment on lines 217 to 244
func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) {
	if p.srcTenantID.InternalValue == 0 {
		return p.srcTenantID, errors.New("make plan must be called before replication flow planner contains srcTenantID")
Collaborator:

Let's make this an AssertionFailedf.

I could see changing the API of this type a little bit so that the usage was more clear. Namely, rather than having the caller create a &replicationFlowPlanner{} and then calling makePlan on it. You could have a constructor that returns a flow planner with the srcTenantID and initial topology set. Then you could have a method on the struct that was like generateCandidatePlan() that would assume it was constructed using the constructor. This function could also assume it was constructed using the constructor.
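
A sketch of the suggested shape, reusing names from this thread (generateCandidatePlan comes from the comment above; bodies are elided and illustrative, not the code that ultimately merged):

// makeReplicationFlowPlanner would build the initial plan up front, so the
// returned planner always carries srcTenantID and the initial topology.
func makeReplicationFlowPlanner(ctx context.Context /* planning inputs elided */) (*replicationFlowPlanner, error) {
	p := &replicationFlowPlanner{}
	// ... generate the initial plan here, recording p.srcTenantID and the
	// initial topology ...
	return p, nil
}

// generateCandidatePlan assumes the planner came from the constructor, so a
// missing srcTenantID is a programming error rather than a user-facing one.
func (p *replicationFlowPlanner) generateCandidatePlan(ctx context.Context) error {
	if p.srcTenantID.InternalValue == 0 {
		return errors.AssertionFailedf(
			"makeReplicationFlowPlanner must be called before generating a candidate plan")
	}
	// ... replan using the stored state ...
	return nil
}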

Collaborator Author:

Done.

14400


# FIGURE OUT HOW TO COMBINE THE SPANCONFIGS, CAUSING A DELETE
Collaborator:

:D Let's make this a TODO.

Can we cause a delete by dropping the table gc.ttl on the table and then dropping the table?

Collaborator Author:

Added a Todo. Chatting with Arul about this. It's harder than I initially thought.

ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '10ms'
----

# TODO(msbutler): trigger a full reconciliation job on the new app tenant after cutover, so that we
Collaborator:

I'm a little surprised a full reconciliation doesn't happen on tenant startup.

Collaborator Author:

Plan to address in a follow up PR

"github.com/cockroachdb/errors"
)

type spanConfigIngestor struct {
Collaborator:

Let's add a documentation comment to this struct describing what the spanConfigIngestor does and why.

Collaborator Author:

Done.
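
The added comment itself isn't shown in this view; a sketch of the kind of documentation the struct presumably gained (field types are guesses based on the diffs in this review):

// spanConfigIngestor runs on the stream ingestion job coordinator and
// replicates span config updates relevant to the replicating tenant. It
// buffers updates and deletes that share a source-side commit timestamp and
// writes them in a single transaction to the destination's system span
// configuration table once it sees an event with a newer timestamp.
type spanConfigIngestor struct {
	bufferedUpdates             []spanconfig.Record
	bufferedDeletes             []spanconfig.Target
	lastBufferedSourceTimestamp hlc.Timestamp
	// ... client, subscription, and flush dependencies elided ...
}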

g.GoCtx(sub.Subscribe)
g.GoCtx(sc.checkForCutover)

return sc.consumeSpanConfigs(ctx, sub)
Collaborator:

I may be missing something, but I would probably put this in the context group and then call g.Wait() here. Otherwise, who bubbles up an error from sub.Subscribe?

If you go that route, you may need to arrange some way to shut down the cutover worker (unless we remove it) in the case of an error.

How we do this in other places is to create a new context, grab a CancelFunc and then call the cancel func in close.
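
A sketch of the pattern described here, using the identifiers from the diff above (whether the final code looks exactly like this is not shown in this view):

// Run the subscription and the consumer under one ctxgroup so an error from
// either tears down the other, and surface the first error via Wait. In the
// real code the cancel func would likely be stashed so close() can stop both
// workers early.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

g := ctxgroup.WithContext(ctx)
g.GoCtx(sub.Subscribe)
g.GoCtx(func(ctx context.Context) error {
	return sc.consumeSpanConfigs(ctx, sub)
})
return g.Wait()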

Collaborator Author:

Good point! Refactored this a bit to make it work.

@msbutler (Collaborator Author) left a comment:

thanks for the quick review! I think I've addressed all your comments. I'm going to add one more unit test for the ingest_span_configs.go function tomorrow morning.


@msbutler force-pushed the butler-c2c-dest-zone branch 9 times, most recently from 36c14ad to b620c64, on August 29, 2023 19:00
@msbutler added the do-not-merge label (bors won't merge a PR with this label) on Aug 29, 2023
@stevendanna (Collaborator) left a comment:

Thanks for addressing my feedback.

Overall, it looks like the next major TODO to handle is probably correctly handling full updates on resumption?

Let me know when this one is ready for final review.

pkg/ccl/streamingccl/streamingest/ingest_span_configs.go (outdated, resolved comment thread)
	if errors.Is(err, sql.ErrPlanChanged) {
		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
	}
	return err
}

// makeReplicationFlowPlanner creates a replicationFlowPlanner and the initial physical plan.
func makeReplicationFlowPlanner(
Collaborator:

Thanks for exploring my feedback here. This ended up with a different shape than I had in mind and I'm not sure we've made it better, sorry about that. But let's not bikeshed on it in this PR.

Said another way: If you like what you had previously better, feel free to put it back. Also feel free to leave it if you don't feel like changing it more.

Collaborator Author:

Heh, I actually like the refactor, so I added it in a second commit. let me know if pulling it into a second commit changes your opinion of it.

Collaborator:

Realized I forgot to respond to this: I don't dislike it particularly more than what we had before, it just wasn't clear to me it was strictly better. The main thing that I didn't love when I looked at it was storing state on the struct just to read it in a single location.

Collaborator Author:

Yeah, I agree that this approach is inherently flawed. At some point we ought to refactor this to make it easier to grab the sourceTenantID and streamAddresses.

Second commit message:

This patch pushes the generation of the initial plan into a new
`replicationFlowPlanner` constructor. The refactor makes the replanning logic
slightly easier to reason about and also ensures that the
`replicationFlowPlanner` gathers all necessary state (e.g. the srcTenantID)
before callers interact with it.

Release note: None
@msbutler removed the do-not-merge label (bors won't merge a PR with this label) on Aug 30, 2023
@msbutler (Collaborator Author) commented on Aug 30, 2023:

@stevendanna RFAL!

> Overall, it looks like the next major TODO to handle is probably correctly handling full updates on resumption?

Correct

@msbutler (Collaborator Author):

TFTR!

bors r=stevendanna

@craig (bot, Contributor) commented on Aug 30, 2023:

Build succeeded.

@craig (bot) merged commit f464c06 into cockroachdb:master on Aug 30, 2023
7 of 8 checks passed
@msbutler deleted the butler-c2c-dest-zone branch on September 6, 2023 16:23