changefeedccl: run CHANGEFEEDs via distsql #28555
Conversation
Reviewing commits one by one, please bear with the series of updates.
Reviewed 1 of 2 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained
pkg/ccl/changefeedccl/span_frontier.go, line 52 at r1 (raw file):
```go
// Entries are sorted based on their timestamp such that the oldest will rise to
// the top of the heap.
type spanFrontierHeap []*spanFrontierEntry
```

I'm confused by your heap implementation. Don't `Push` and `Pop` need to do additional work to bubble up the next highest element into the top position? Also from my reading you need a `Fix()` implementation, but you don't have one. If you don't actually need a `heap.Interface`, then you could just avoid the `interface{}` conversions entirely and specify it to only take `spanFrontier`s.
pkg/ccl/changefeedccl/span_frontier.go, line 117 at r1 (raw file):
```go
s.idAlloc++
if err := s.tree.Insert(e, true /* fast */); err != nil {
	panic(err)
```

A `TODO` to refactor this panic away would be nice.
pkg/ccl/changefeedccl/span_frontier_test.go, line 32 at r1 (raw file):
```go
	return buf.String()
}
```
You might want to test the heap separately as well. I don't think your heap maintains itself correctly.
Reviewed r3. A healthy amount of 🐶 👨🔬 in my reviews, but all the distsql stuff looks correct.
Reviewed 3 of 6 files at r2, 22 of 22 files at r3.
Reviewable status: complete! 0 of 0 LGTMs obtained
pkg/ccl/changefeedccl/changefeed_dist.go, line 107 at r3 (raw file):
```go
gatewayNodeID := execCfg.NodeID.Get()
changeAggregatorProcs := []distsqlplan.Processor{{
```

Why is there only one `changeAggregatorProc`? I would have thought you'd want one per span (and perhaps split the spans if they cross range boundaries, akin to what `createTableReaders` does).
pkg/ccl/changefeedccl/changefeed_dist.go, line 180 at r3 (raw file):
```go
}

type changeAggregator struct {
```

I'd split the processors into their own file, if only because planning code and processor implementations are just so different.
pkg/ccl/changefeedccl/changefeed_dist.go, line 206 at r3 (raw file):
```go
	resolvedSpanBuf encDatumRowBuffer
}
```

It would be great if you add all the interface assertions that this struct obeys, since it's outside the package structure that makes it clear, i.e.

```go
var _ Processor = &changeAggregator{}
var _ RowSource = &changeAggregator{}
```
pkg/ccl/changefeedccl/changefeed_dist.go, line 288 at r3 (raw file):
```go
func (ca *changeAggregator) close() {
	// Wait for the poller to finish shutting down.
	<-ca.pollerErrCh
```

Check the error channel for `nil` to indicate graceful shutdown?
pkg/ccl/changefeedccl/changefeed_dist.go, line 337 at r3 (raw file):
```go
		return err
	}
	ca.resolvedSpanBuf.Push(sqlbase.EncDatumRow{
```
I don't know what contract this set of datums is satisfying, so a comment would be very helpful here.
pkg/ccl/changefeedccl/changefeed_dist.go, line 482 at r3 (raw file):
```go
// core changefeed, which returns changed rows directly via pgwire,
// a row with a null resolved_span field is a changed row that needs
// to be forwared to the gateway.
```

Typo: 'forwared'.
pkg/sql/distsqlrun/processors.proto, line 952 at r3 (raw file):
```proto
// ChangeFrontierSpec is the specification for a processor that receives
// span-level
```

Incomplete comment.
I've been thinking since I sent this that things would be cleaner if ChangeAggregator had two outputs, one for rows and one for resolved spans. We could hook the first directly to the gateway and the other to the ChangeFrontier (eliminating the awkward forwarding it has). Thoughts? Anything I should know about the two output stuff?
Scratch that. I was misunderstanding some distsql things. I'm told that I could do something similar by routing the stream to two different places with a custom hash router, but I think I'll leave that for a followup.
Force-pushed from `b55bd69` to `85249c0`.
Thanks for the review! RFAL
Reviewable status: complete! 0 of 0 LGTMs obtained
pkg/ccl/changefeedccl/changefeed_dist.go, line 107 at r3 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
Why is there only one `changeAggregatorProc`? I would have thought you'd want one per span (and perhaps split the spans if they cross range boundaries, akin to what `createTableReaders` does).
Yeah, that's exactly what we'll do for enterprise changefeeds. Leaving it for a followup. This is the logic we'll use for the non-enterprise ones. See the TODO in the godoc for `distChangefeedFlow`.
pkg/ccl/changefeedccl/changefeed_dist.go, line 180 at r3 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
I'd split the processors into their own file, if only because planning code and processor implementations are just so different
Done.
pkg/ccl/changefeedccl/changefeed_dist.go, line 206 at r3 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
It would be great if you add all the interface assertions that this struct obeys, since it's outside the package structure that makes it clear. i.e.
var _ Processor = &changeAggregator{}
var _ RowSource = &changeAggregator{}
I did, actually :-). Moved them up here since that's clearly where you're expecting them.
pkg/ccl/changefeedccl/changefeed_dist.go, line 288 at r3 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
Check the error channel for `nil` to indicate graceful shutdown?
It'll be context cancelled. Added a comment.
pkg/ccl/changefeedccl/changefeed_dist.go, line 337 at r3 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
I don't know what contract this set of datums is satisfying, so a comment would be very helpful here.
Done.
pkg/ccl/changefeedccl/changefeed_dist.go, line 482 at r3 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
typo 'forwared'
Done.
pkg/sql/distsqlrun/processors.proto, line 952 at r3 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
incomplete comment.
Done.
pkg/ccl/changefeedccl/span_frontier.go, line 52 at r1 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
I'm confused by your heap implementation. Don't `Push` and `Pop` need to do additional work to bubble up the next highest element into the top position? Also from my reading you need a `Fix()` implementation, but you don't have one. If you don't actually need a `heap.Interface`, then you could just avoid the `interface{}` conversions entirely and specify it to only take `spanFrontier`s.
Addressed in #28319
pkg/ccl/changefeedccl/span_frontier.go, line 117 at r1 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
A `TODO` to refactor this panic away would be nice.
Addressed in #28319
pkg/ccl/changefeedccl/span_frontier_test.go, line 32 at r1 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
You might want to test the heap separately as well. I don't think your heap maintains itself correctly.
Addressed in #28319
One or more ChangeAggregator processors watch table data for changes. These transform the changed kvs into changed rows and either emit them to a sink (such as kafka) or, if there is no sink, forward them in columns 1,2,3 (where they will be eventually returned directly via pgwire). In either case, periodically a span will become resolved as of some timestamp, meaning that no new rows will ever be emitted at or below that timestamp. These span-level resolved timestamps are emitted as a marshalled `jobspb.ResolvedSpan` proto in column 0.

The flow will always have exactly one ChangeFrontier processor which all the ChangeAggregators feed into. It collects all span-level resolved timestamps and aggregates them into a changefeed-level resolved timestamp, which is the minimum of the span-level resolved timestamps. This changefeed-level resolved timestamp is emitted into the changefeed sink (or returned to the gateway if there is no sink) whenever it advances. ChangeFrontier also updates the progress of the changefeed's corresponding system job.

Release note (enterprise change): CHANGEFEEDs are now executed using our distributed SQL framework.
Hey @arjunravinarayan @jordanlewis I hate to ask, but any chance y'all could take another pass at this today? I was hoping to get it merged before the release branch is cut.
👍
Okay, I took a look, but this is a pretty big change that I have almost no context on. I'd say it looks reasonable based on the fact that you seem to have followed the DistSQL contract properly, but honestly I didn't have the time today to really give this a thorough review. Perhaps somebody more familiar with changefeeds should review this?
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale)
Looks like it's ready for merging! @jordanlewis r1 and r2 are from another PR, it's just r3 here.
Reviewed 17 of 17 files at r4.
Reviewable status: complete! 2 of 0 LGTMs obtained
pkg/ccl/changefeedccl/changefeed_dist.go, line 107 at r3 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
Yeah, that's exactly what we'll do for enterprise changefeeds. Leaving it for a followup. This is the logic we'll use for the non-enterprise ones. See the TODO in the godoc for distChangefeedFlow
👍 Sounds good to me.
pkg/ccl/changefeedccl/changefeed_processors.go, line 1 at r4 (raw file):
// Copyright 2018 The Cockroach Authors.
I didn't take a close look at the changes between r3 and r4 since the code movement makes it difficult to look at diffs, but I'm trusting you that there isn't much difference between the revisions besides what we explicitly talked about. Otherwise holler with comments and I'll take closer looks there.
Reviewable status: complete! 2 of 0 LGTMs obtained
pkg/ccl/changefeedccl/changefeed_processors.go, line 1 at r4 (raw file):
Previously, arjunravinarayan (Arjun Narayan) wrote…
I didn't take a close look at the changes between r3 and r4 since the code movement makes it difficult to look at diffs, but I'm trusting you that there isn't much difference between the revisions besides what we explicitly talked about. Otherwise holler with comments and I'll take closer looks there.
I moved the bufferSink special case to the constructor, which cleaned it up a bit.
I also had to deal with a merge conflict with the monitoring stuff. Since TrailingMetaCallback doesn't seem to always be called, I had to do some gross stuff with a goroutine to guarantee that the processor always deregisters itself from `cf.metrics.mu.resolved`. There's almost certainly something better to be done here than what I did, but it's not coming to mind (and this is probably good enough to merge and deal with later).
I'm going to go ahead and merge this so it definitely makes the branch tomorrow. @jordanlewis Unfortunately, I'm probably the only person in the company right now that has enough knowledge of both changefeeds and distsql to fully grok this. Nathan has already looked at the changefeed parts closely, but I'd love to get your feedback on the distsql part before backward compatibility ties our hands even more (so, in the next week or two). Happy to schedule a meeting and give you all the background if that will help (or answer questions over slack or whatever your preference).

bors r=arjunravinarayan
28555: changefeedccl: run CHANGEFEEDs via distsql r=arjunravinarayan a=danhhz

@arjunravinarayan and/or @jordanlewis I tried to keep all the changes that weren't strictly distsql-related to a minimum in this PR. There is a bit of churn in sink.go that was hard to avoid, but I'll have @nvanbenschoten take a general pass so y'all can stay focused on the distributed flow. The first two commits are #28319 and you can ignore them entirely.

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
Build succeeded