Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeed: add scheduler based rangefeed processor #107553

Merged
merged 3 commits into from Sep 8, 2023

Conversation

aliher1911
Copy link
Contributor

@aliher1911 aliher1911 commented Jul 25, 2023

Previously each rangefeed processor run a dedicated goroutine to process range events. With high replica density that produced 10K's of goroutines waking up periodically and overloading go scheduler.
This commit adds a version of processor that uses a scheduler with a fixed pool of workers to process all ranges on the node.

This commit adds a version of processor that uses a scheduler with a fixed pool of workers to process all ranges on the node. Scheduler based processor is off by default and can be enabled by using kv.rangefeed.use_scheduler_processor system cluster setting.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-26372

Release note: None

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@aliher1911 aliher1911 force-pushed the better_processor_scheduler_20 branch 4 times, most recently from 23b778e to 9898075 Compare July 27, 2023 12:05
@aliher1911 aliher1911 changed the title rangefeed: add scheduler based rangefeed rangefeed: add scheduler based rangefeed processor Jul 27, 2023
@aliher1911 aliher1911 force-pushed the better_processor_scheduler_20 branch 6 times, most recently from f208c21 to a1778f1 Compare August 2, 2023 09:43
@aliher1911 aliher1911 self-assigned this Aug 2, 2023
@aliher1911 aliher1911 force-pushed the better_processor_scheduler_20 branch 6 times, most recently from 3829e8e to 25b8c33 Compare August 4, 2023 14:37
@aliher1911 aliher1911 marked this pull request as ready for review August 4, 2023 14:38
@aliher1911 aliher1911 requested review from a team as code owners August 4, 2023 14:38
@aliher1911 aliher1911 requested review from sumeerbhola and removed request for a team August 4, 2023 14:38
@aliher1911 aliher1911 force-pushed the better_processor_scheduler_20 branch 2 times, most recently from c119606 to ef9a442 Compare August 4, 2023 20:41
@erikgrinaker erikgrinaker self-requested a review August 8, 2023 13:34
Copy link
Contributor

@erikgrinaker erikgrinaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing some preliminary responses and comments. Giving the processor a closer look.

Reviewed 25 of 25 files at r11, 7 of 17 files at r15, 15 of 15 files at r18, 11 of 13 files at r19.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @aliher1911, @miretskiy, and @sumeerbhola)


-- commits line 30 at r19:
s/preformance/performance/

"This processor is disabled by default, but can be enabled via the kv.rangefeed.scheduler.enabled cluster setting."


pkg/kv/kvclient/rangefeed/config.go line 26 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I don't think we need this. We can set config.useMuxRangefeed = useMuxRangefeedDefault in initConfig() and then have WithMuxRangefeed(bool) override it if the caller specifies it -- otherwise, it uses the default.

This can just be a bool, i.e. WithMuxRangefeed(enabled bool).


pkg/kv/kvserver/replica_rangefeed.go line 342 at r3 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Let's not remove this, people get confused by this all the time.

Still removed.


pkg/kv/kvserver/replica_rangefeed.go line 80 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Consider dropping _processor.

The description here can also be improved -- for example:

"if true, rangefeeds are processed by a fixed number of workers instead of one worker per range, which can reduce Go runtime overhead"

I still think the description here needs work, it's way too verbose. Try to cut the length by at least half.


pkg/kv/kvserver/replica_rangefeed.go line 369 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

This will only restart the processor when registering a new rangefeed. I think we should consider some other options:

  1. Ideally, use the new settings for any new rangefeeds, and allow old rangefeeds to keep running on the old processor until it's empty. This would be best for operators, since it allows them to do rolling restarts of changefeeds at their own leisure. However, it's a bit tricky to implement -- it could maybe be done by adding another fanout processor implementation that emits events to multiple processors. I'd be inclined to do something simpler if possible though.

  2. Restart all rangefeeds when the setting changes, triggered by SetOnChange(). This is simple, but very disruptive, since it forces a full restart of all changefeeds across all nodes at the same time.

  3. Do a staggered restart, e.g. by spawning a goroutine that restarts rangefeeds one at a time. We'd ideally want to wait until the client has completed a catchup scan on the range again, but it will be hard to coordinate this. And if we just do a naïve sleep(10*time.Second) it will take a week to restart 50k replicas.

I see we ended up only using the scheduler when starting a new rangefeed without an existing legacy processor. It's certainly very simple, which is great, but the UX can be a bit iffy. It would require shutting down and then restarting all rangefeeds/changefeeds. Maybe that's fine, but it would be nice if there was some way to trigger a staggered rolling refresh of all rangefeeds. I think we can address this later, but we should give it some thought.

@miretskiy Do you have any thoughts on how we can do a rolling restart of all rangefeeds on each range without too much disruption from the changefeed side?


pkg/kv/kvserver/replica_rangefeed.go line 389 at r19 (raw file):

				"requested scheduler based processor, but currently running processor is of wrong type")
		}
		useScheduledProcessor = usingScheduler

This logic should be moved further down, because the existing processor can get torn down by the code below. Something like:

var sched *rangefeed.Scheduler
if RangeFeedUseScheduler.Get(&r.ClusterSettings().SV) {
	// Only use scheduler if we're already using it or we're setting up a new processor.
	if _, ok := p.(*rangefeed.ScheduledProcessor); ok || p == nil {
		sched = r.store.getRangefeedScheduler()
	}
}

pkg/kv/kvserver/store.go line 1965 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

We'll want an environment variable for the worker count. We may also want to scale this with number of CPUs and number of stores too, but we can revisit that later (let's add a TODO or issue).

A max of 16 seems low, especially considering we're scaling it with 8*CPUs and we generally never run nodes with less than 2 CPUs. I would think a max of 64 or 128 would be more appropriate, but we should run some benchmarks to tune this as well. However, we need to distribute this across stores -- consider customers running with 12 stores where this would otherwise spawn a ton of goroutines. See what we do for the Raft scheduler here:

if sc.RaftSchedulerConcurrency == 0 {
sc.RaftSchedulerConcurrency = defaultRaftSchedulerConcurrency
// If we have more than one store, evenly divide the default workers across
// stores, since the default value is a function of CPU count and should not
// scale with the number of stores.
if numStores > 1 && sc.RaftSchedulerConcurrency > 1 {
sc.RaftSchedulerConcurrency = (sc.RaftSchedulerConcurrency-1)/numStores + 1 // ceil division
}
}


pkg/kv/kvserver/store.go line 3722 at r4 (raw file):

shutdown is implemented as event

I suspect this is where a lot of this complexity is coming from. If we did the shutdown synchronously then we wouldn't need to rely on the scheduler after the processor was unlinked from the replica, and could simply use the range ID as identifier. I have to read up on some of this stuff, will revisit.


pkg/kv/kvserver/store.go line 2411 at r5 (raw file):

Previously, aliher1911 (Oleg) wrote…

That's a good idea, we can just ask to schedule something to all. The only downside that I see is that we need to scan map within scheduler under lock which would be blocking writes. I did naive benchmark of enqueue of 10k ranges and it gives me 0.12ms delay on random hardware. If we provide external list, we only have 4mks interruptions if notifications are chunked.

Ok, we have to maintain the external map for the old processor anyway, so this is fine for now.


pkg/kv/kvserver/rangefeed/processor.go line 205 at r19 (raw file):

func NewProcessor(cfg Config) Processor {
	cfg.SetDefaults()
	cfg.AmbientContext.AddLogTag("rangefeed", nil)

Should we set the range ID here or something, or is that already included via the "r" tag passed in from the caller?


pkg/kv/kvserver/rangefeed/processor.go line 731 at r19 (raw file):

// syncSendAndWait allows sync event to be sent and waited on its channel.
// Exposed to allow special test syncEvents that contain span to be sent.
func (p *LegacyProcessor) syncSendAndWait(se *syncEvent) {

nit: syncEventCWith(*syncEvent) or something, to clarify that it's basically syncEventC().


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 62 at r4 (raw file):
Ok, I guess that's fine. A closure has a much larger complexity footprint than simple message passing, but I guess it's ok as long as it's easy enough to reason about concurrency concerns and such. This places much more responsibility on the caller to make sure it isn't vulnerable to race conditions or problematic interleavings, since the state we close over when the request is submitted is different from when the request is evaluated.

To that end, I'd appreciate some attention towards this comment (also wrt. the registration that's closed over in Register()):

Also, this should be more explicit about its inputs -- we currently always close over the inputs, and it isn't clear to me which synchronization guarantees we have. At the very least, it should pass the processor as an input, since I believe that's what most callers operate on, and guarantee that the request has exclusive access to it (which I'm assuming is currently true).


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 205 at r19 (raw file):

		// processor should stop.
		p.stopping = true
		p.scheduler.Stop()

This method should be called StopProcessor(), which is what ClientProcessor calls on Scheduler. This reads like we're stopping the entire scheduler.


pkg/kv/kvserver/rangefeed/scheduler.go line 56 at r19 (raw file):

	// QueueData is scheduled when event is put into rangefeed queue for
	// processing.
	QueueData processorEventType = 1 << 2

nit: processorEventType = 1 << 2 is unnecessary here, the iota will do it automatically. Same goes for ReqEvent below.


pkg/kv/kvserver/rangefeed/bench_test.go line 52 at r19 (raw file):

		for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
			for _, numRegistrations := range []int{1, 10, 100} {
				name := fmt.Sprintf("opType=%s/numRegs=%d/procType=%s", opType, numRegistrations, procType)

nit: procType should be first, the order mirrors the nesting order.


pkg/kv/kvserver/rangefeed/processor_test.go line 257 at r19 (raw file):

// tHelper is a wrapper interface to have a common processor constructor for
// tests and benchmarks.
type tHelper interface {

nit: testing.TB already does this.

Copy link
Contributor

@erikgrinaker erikgrinaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly nits, logic seems fine. I want to do a final pass over the shutdown logic, and see if we can simplify it further.

Reviewed 5 of 15 files at r10, 2 of 11 files at r17, 2 of 13 files at r19, 9 of 9 files at r20, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @aliher1911, @miretskiy, and @sumeerbhola)


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 1 at r19 (raw file):

// Copyright 2018 The Cockroach Authors.

nit: 2023


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 30 at r19 (raw file):

// request is any action on processor which is not a data path. e.g. request
// active filter, length, add registration etc.
type request func(context.Context)

nit: clarify the description here, e.g. "Request is a function that can be scheduled to run for a processor on its scheduler worker, serialized with other scheduler events, having exclusive access to the processor.".

As discussed elsewhere, it should also be explicit about it's inputs (the processor) and output, rather than closing over them. And given that runRequest() is generic over the return type, shouldn't this also be generic? Something like:

type request[T any] func(context.Context, p *ScheduledProcessor) T

In fact, there's actually two types here: there's the type callers primarily interact with (the above), and then there's the type which is propagated via the channel, which doesn't have a return type and instead closes over a synchronization channel. We should probably make both of these explicit here, and explain their relationship.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 113 at r19 (raw file):

// process is a scheduler callback that is processing scheduled events and
// requests.
func (p *ScheduledProcessor) process(e processorEventType) processorEventType {

nit: consider splitting out separate process*() methods for each type of event, which process() dispatches to.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 119 at r19 (raw file):

			for {
				select {
				case e := <-p.requestQueue:

Do we need to check for stopping in this loop, and/or bound the amount of work we pull off the channel?


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 151 at r19 (raw file):

	}
	if e&Stopped != 0 {
		p.cleanup()

cleanup() will panic if called twice (e.g. due to the double-closed channel). Are we guaranteed to only see Stopped once, and never if we failed to start?


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 247 at r19 (raw file):

	)

	filter := runRequest(p, func(ctx context.Context) *Filter {

It seems a bit iffy that we're both syncing the event channel above and blocking until this request is scheduled and processed, because the caller is holding raftMu. The old processor does this too though, so I guess this is no worse in principle.

Does this have any additional hazards over the legacy processor in doing so? I can't immediately think of any. Since we never block on consumers, I don't think there should be any deadlock scenarios where clients are blocked waiting for a registration preventing them from consuming events, or anything like that.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 523 at r19 (raw file):

	await := future.MakeAwaitableFuture(future.Make[T]())
	p.enqueueRequest(func(ctx context.Context) {
		await.Set(f(ctx))

nit: why not just use a simple 1-sized channel here? I don't think we gain anything at all from the future?


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 721 at r19 (raw file):

func (p *ScheduledProcessor) scheduleEvent(e processorEventType) {
	p.scheduler.Schedule(e)

nit: ClientScheduler.Schedule seems like a misnomer, Enqueue would be better -- it calls through to Scheduler.Enqueue(), and we don't actually schedule the event here (that happens when we pick it off the queue).

Also, I have a preference for removing the scheduleEvent() method, and just call p.scheduler.Enqueue() directly -- the terminoloy here is pretty confusing in that we have both scheduler events and processor events, so being explicit about enqueueing in the scheduler might be clearer.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 180 at r20 (raw file):

				// TODO(oleg): we need to cap number of tasks that we can fire up across
				// all feeds as they could potentially generate O(n) tasks for push.
				err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)

This task will be shut down when the stopper is quiescing, but I don't think we'll shut it down when the processor stops without tripping the stopper?

We should probably have a unified way to shut down both this and the rts scan when the processor stops.


pkg/kv/kvserver/rangefeed/scheduler.go line 56 at r19 (raw file):

	// QueueData is scheduled when event is put into rangefeed queue for
	// processing.
	QueueData processorEventType = 1 << 2

nit: consider clearer naming here, e.g. EventQueued and RequestQueued.


pkg/kv/kvserver/rangefeed/scheduler.go line 57 at r19 (raw file):

	// processing.
	QueueData processorEventType = 1 << 2
	// ReqEvent is scheduled when request function id put into rangefeed request

nit: the closure itself is put into the queue.

Copy link
Contributor Author

@aliher1911 aliher1911 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, and @sumeerbhola)


-- commits line 47 at r4:

Previously, erikgrinaker (Erik Grinaker) wrote…

This deserves a release note.

This particular commit is a change in testutil only. I don't think it is worth mentioning. Release note is part of the next commit where actual scheduler processor is added.


pkg/kv/kvserver/replica_rangefeed.go line 342 at r3 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Still removed.

Added in few other places but missed this of all the things. This function is only called internally in a single place and not even in tests. Much more useful on the request itself.


pkg/kv/kvserver/replica_rangefeed.go line 389 at r19 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

This logic should be moved further down, because the existing processor can get torn down by the code below. Something like:

var sched *rangefeed.Scheduler
if RangeFeedUseScheduler.Get(&r.ClusterSettings().SV) {
	// Only use scheduler if we're already using it or we're setting up a new processor.
	if _, ok := p.(*rangefeed.ScheduledProcessor); ok || p == nil {
		sched = r.store.getRangefeedScheduler()
	}
}

That check was only meaningful with eager processor restart. But since it could be very disruptive we need to check if prev processor is nil or defunct.


pkg/kv/kvserver/rangefeed/processor.go line 205 at r19 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Should we set the range ID here or something, or is that already included via the "r" tag passed in from the caller?

That should come from the parent request I believe with node and store tags.


pkg/kv/kvserver/rangefeed/processor.go line 731 at r19 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

nit: syncEventCWith(*syncEvent) or something, to clarify that it's basically syncEventC().

It only exists because tests need to post special events. Hard to give consistent name especially that scheduler doesn't support this.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 30 at r19 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

nit: clarify the description here, e.g. "Request is a function that can be scheduled to run for a processor on its scheduler worker, serialized with other scheduler events, having exclusive access to the processor.".

As discussed elsewhere, it should also be explicit about it's inputs (the processor) and output, rather than closing over them. And given that runRequest() is generic over the return type, shouldn't this also be generic? Something like:

type request[T any] func(context.Context, p *ScheduledProcessor) T

In fact, there's actually two types here: there's the type callers primarily interact with (the above), and then there's the type which is propagated via the channel, which doesn't have a return type and instead closes over a synchronization channel. We should probably make both of these explicit here, and explain their relationship.

If request itself is generic then request queue should be generic too, but it is not possible I think. There would also be a problem with process callback because it can't be generic itself to be able to handle different types of requests and return results.
That's why it is implemented as closure for each individual request type by the caller and handled by generic sendRequest method.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 119 at r19 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Do we need to check for stopping in this loop, and/or bound the amount of work we pull off the channel?

We only have requests for filter/len/reg/unreg here so we would only have maybe 10 requests over the whole lifecycle of processor. I don't think it makes much sense to check that. If we get caught in shutdown here, caller will either see it or doesn't care so we should be ok.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 151 at r19 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

cleanup() will panic if called twice (e.g. due to the double-closed channel). Are we guaranteed to only see Stopped once, and never if we failed to start?

Scheduler guarantees that. It will only send stopped once and will never reuse the ID. Calling it if init failed after we've registered with scheduler is a bug. I changed that to stop via scheduler instead. That would guarantee that we call in once.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 247 at r19 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

It seems a bit iffy that we're both syncing the event channel above and blocking until this request is scheduled and processed, because the caller is holding raftMu. The old processor does this too though, so I guess this is no worse in principle.

Does this have any additional hazards over the legacy processor in doing so? I can't immediately think of any. Since we never block on consumers, I don't think there should be any deadlock scenarios where clients are blocked waiting for a registration preventing them from consuming events, or anything like that.

I think original processor is subtly broken and synchronizes more than it needs and also takes iterators at wrong points. We shouldn't have deadlocks though as register is serving external request and processor is not expected to block on anything so we must succeed. The only failure is request being dropped, but runRequest takes care of that.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 180 at r20 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

This task will be shut down when the stopper is quiescing, but I don't think we'll shut it down when the processor stops without tripping the stopper?

We should probably have a unified way to shut down both this and the rts scan when the processor stops.

Looks like a bug inherited from old processor. It can post events after termination even if we cancel it. We can add cancellation to the context to reduce the chance of it. It won't break anything, but can trip budget use after close in test builds.

Copy link
Contributor Author

@aliher1911 aliher1911 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, and @sumeerbhola)


pkg/kv/kvserver/store.go line 1965 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

A max of 16 seems low, especially considering we're scaling it with 8*CPUs and we generally never run nodes with less than 2 CPUs. I would think a max of 64 or 128 would be more appropriate, but we should run some benchmarks to tune this as well. However, we need to distribute this across stores -- consider customers running with 12 stores where this would otherwise spawn a ton of goroutines. See what we do for the Raft scheduler here:

if sc.RaftSchedulerConcurrency == 0 {
sc.RaftSchedulerConcurrency = defaultRaftSchedulerConcurrency
// If we have more than one store, evenly divide the default workers across
// stores, since the default value is a function of CPU count and should not
// scale with the number of stores.
if numStores > 1 && sc.RaftSchedulerConcurrency > 1 {
sc.RaftSchedulerConcurrency = (sc.RaftSchedulerConcurrency-1)/numStores + 1 // ceil division
}
}

Bumped that to 64 for now and added store count handling. We should check if increasing this number could cause goroutines per cpu jump if set too high.

@aliher1911 aliher1911 force-pushed the better_processor_scheduler_20 branch 2 times, most recently from 84de721 to 481a1d1 Compare September 7, 2023 11:20
Copy link
Contributor Author

@aliher1911 aliher1911 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, and @sumeerbhola)


pkg/kv/kvserver/replica_rangefeed.go line 369 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I see we ended up only using the scheduler when starting a new rangefeed without an existing legacy processor. It's certainly very simple, which is great, but the UX can be a bit iffy. It would require shutting down and then restarting all rangefeeds/changefeeds. Maybe that's fine, but it would be nice if there was some way to trigger a staggered rolling refresh of all rangefeeds. I think we can address this later, but we should give it some thought.

@miretskiy Do you have any thoughts on how we can do a rolling restart of all rangefeeds on each range without too much disruption from the changefeed side?

Wouldn't we throttle that on "initial scan"? So that if we only trigger one at a time reducing blast radius? I think we should test that to see the effects. Adding this logic shoudn't be hard as it was there in initial PR but was later removed following initial feedback.


pkg/kv/kvserver/store.go line 3722 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

shutdown is implemented as event

I suspect this is where a lot of this complexity is coming from. If we did the shutdown synchronously then we wouldn't need to rely on the scheduler after the processor was unlinked from the replica, and could simply use the range ID as identifier. I have to read up on some of this stuff, will revisit.

We can fire up async task to do that, but be throttle # of concurrent workers. Doing is synchronously with callers is bad because it can be called under raft lock and would make range operations slower.

Copy link
Contributor

@erikgrinaker erikgrinaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 4 of 13 files at r21, 9 of 9 files at r25, 9 of 9 files at r26, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @aliher1911, @miretskiy, and @sumeerbhola)


-- commits line 47 at r4:

Previously, aliher1911 (Oleg) wrote…

This particular commit is a change in testutil only. I don't think it is worth mentioning. Release note is part of the next commit where actual scheduler processor is added.

I think Reviewable messed up the positioning here. Anyway, release note LGTM, thanks!


pkg/kv/kvserver/replica_rangefeed.go line 369 at r4 (raw file):

Previously, aliher1911 (Oleg) wrote…

Wouldn't we throttle that on "initial scan"? So that if we only trigger one at a time reducing blast radius? I think we should test that to see the effects. Adding this logic shoudn't be hard as it was there in initial PR but was later removed following initial feedback.

I mean, if we could keep the changefeeds running but only restart individual ranges a few at a time (which the DistSender and changefeeds can recover from just fine), then changefeeds would mostly keep running during the rolling restart instead of having to do a full stop and restart of the changefeed which can take hours and temporarily stop emitting events to applications.


pkg/kv/kvserver/store.go line 1965 at r4 (raw file):

Previously, aliher1911 (Oleg) wrote…

Bumped that to 64 for now and added store count handling. We should check if increasing this number could cause goroutines per cpu jump if set too high.

Should we keep the CPU scaling?


pkg/kv/kvserver/store.go line 3722 at r4 (raw file):

Previously, aliher1911 (Oleg) wrote…

We can fire up async task to do that, but be throttle # of concurrent workers. Doing is synchronously with callers is bad because it can be called under raft lock and would make range operations slower.

We can keep the shutdown logic here as-is for now -- it isn't terribly involved, and gets the job done.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 30 at r19 (raw file):

Previously, aliher1911 (Oleg) wrote…

If request itself is generic then request queue should be generic too, but it is not possible I think. There would also be a problem with process callback because it can't be generic itself to be able to handle different types of requests and return results.
That's why it is implemented as closure for each individual request type by the caller and handled by generic sendRequest method.

Yes, that's what I meant by there being two request types -- the one in the channel that closes over the return channel (which doesn't need to be generic because it can't return anything), and the one callers interact with. Anyway, doesn't particularly matter.


pkg/kv/kvserver/rangefeed/scheduled_processor.go line 194 at r20 (raw file):

	// permissible when shutting down because we don't wait for registrations to
	// free up all the resources.
	pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})

Can you clarify this logic a bit? In the legacy processor, this is only called in the case where the stopper quiesces (in which case we know the node is shutting down). Here, it isn't clear that we're only called in the context of node shutdown -- this is called whenever we stop the processor, so why should we use NodeUnavailableError for other cases?

Copy link
Contributor Author

@aliher1911 aliher1911 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, and @sumeerbhola)


pkg/kv/kvserver/replica_rangefeed.go line 369 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I mean, if we could keep the changefeeds running but only restart individual ranges a few at a time (which the DistSender and changefeeds can recover from just fine), then changefeeds would mostly keep running during the rolling restart instead of having to do a full stop and restart of the changefeed which can take hours and temporarily stop emitting events to applications.

Something like that #110243 ? It looks pretty isolated from the rest of it so we can land that separately. We can technically expand that for any options on processors beside its type.


pkg/kv/kvserver/store.go line 1965 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Should we keep the CPU scaling?

I'm questioning why do we need more workers than CPU's here. I think we are mostly CPU bound in those workers. There's some contention with downstream registrations and with pulling work of the busy queue, but we shouldn't spend 50% on those waits. Having 2xCPU would seem sufficient here. We can have lower bound so that we don't starve ourselves if node is small, but I don't think this is as heavy as raft overall.
I'll run it through pref test to see if setting it high will increase goroutines per cpu dangerously or not.

Copy link
Contributor

@erikgrinaker erikgrinaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Giving this pre-emptive approval, merge when you're comfortable with it.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @aliher1911, @miretskiy, and @sumeerbhola)


pkg/kv/kvserver/replica_rangefeed.go line 369 at r4 (raw file):

Previously, aliher1911 (Oleg) wrote…

Something like that #110243 ? It looks pretty isolated from the rest of it so we can land that separately. We can technically expand that for any options on processors beside its type.

Yeah, something along those lines. @miretskiy may also have some thoughts here. Let's discuss briefly on Monday.


pkg/kv/kvserver/store.go line 1965 at r4 (raw file):

Previously, aliher1911 (Oleg) wrote…

I'm questioning why do we need more workers than CPU's here. I think we are mostly CPU bound in those workers. There's some contention with downstream registrations and with pulling work of the busy queue, but we shouldn't spend 50% on those waits. Having 2xCPU would seem sufficient here. We can have lower bound so that we don't starve ourselves if node is small, but I don't think this is as heavy as raft overall.
I'll run it through pref test to see if setting it high will increase goroutines per cpu dangerously or not.

That's probably true, I don't think we'll block on IO here. 2x seems like a good starting point (possibly 4x to be safe), we can benchmark and tweak this later.

Previously each rangefeed processor run a dedicated goroutine to
process range events. With high replica density that produced
10K's of goroutines waking up periodically and overloading go
scheduler.
This commit adds a version of processor that uses a scheduler
with a fixed pool of workers to process all ranges on the node.
Scheduler based processor is off by default and can be enabled
by using `kv.rangefeed.scheduler.enabled` cluster setting.
Number of workers in scheduler pool is determined by environment
variable `COCKROACH_RANGEFEED_SCHEDULER_WORKERS`.

Epic: none

Release note (performance improvement): Added scheduler based
rangefeed processor which improves rangefeed and changefeed
performance for very large tables. New processor is disabled by
default, but can be enabled via `kv.rangefeed.scheduler.enabled`
cluster setting.
This commit adds slow transaction pusher to scheduler based rangefeed.
Store will trigger bulk enqueueing of relevant push events to
eligible processors.

Epic: none

Release note: None
@aliher1911 aliher1911 requested a review from a team as a code owner September 8, 2023 16:19
@aliher1911 aliher1911 requested review from rachitgsrivastava and DarrylWong and removed request for a team September 8, 2023 16:19
@aliher1911
Copy link
Contributor Author

bors r=erikgrinaker

@craig
Copy link
Contributor

craig bot commented Sep 8, 2023

Build succeeded:

@craig craig bot merged commit 397dc0b into cockroachdb:master Sep 8, 2023
6 of 8 checks passed
@aliher1911 aliher1911 added the O-premortem Issues identified during premortem exercise. label Sep 12, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
O-premortem Issues identified during premortem exercise.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants