
kv: Introduce MuxRangeFeed bi-directional RPC. #75581

Merged · 3 commits · Aug 19, 2022

Conversation

miretskiy
Contributor

@miretskiy miretskiy commented Jan 26, 2022

As demonstrated in #74219, running a rangefeed over a large enough table, or not consuming
rangefeed events quickly enough, may cause undesirable memory blow-up, up to and including
OOMing the node. Since rangefeeds tend to run on (almost) every node, the entire cluster
might get destabilized.

One of the reasons for this is that the memory used by the low(er)-level HTTP/2
RPC transport is not currently tracked -- nor do we have a mechanism
to add such tracking. #74456
provided partial mitigation for this issue by restricting how much memory a single
RangeFeed stream may use. Nonetheless, the possibility of excessive memory
usage remains since the number of rangefeed streams in the system could be very large.

This PR introduces a new bi-directional streaming RPC called MuxRangeFeed.
In the existing uni-directional streaming RPC, the client establishes a connection to the
KV server and requests events for a single span. A separate RPC stream
is created for each span.

In contrast, MuxRangeFeed is a bi-directional RPC: the client connects
to the KV server and requests as many spans as it wishes to receive
from that server. The server multiplexes all events onto a single stream,
and the client de-multiplexes those events appropriately on the receiving
end.
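To make the shape of the new RPC concrete, here is a minimal sketch (hypothetical helper names, not the DistSender code in this PR) of how a caller might drive one such stream, assuming the standard gRPC-generated MuxRangeFeed client for the proto definition discussed in this PR:

package rangefeedsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// runMuxRangeFeed is a hypothetical illustration of driving the bi-directional
// RPC: one MuxRangeFeed stream per KV node, many spans per stream.
func runMuxRangeFeed(
	ctx context.Context,
	client roachpb.InternalClient,
	spans []roachpb.Span,
	onEvent func(*roachpb.RangeFeedEvent),
) error {
	// One bi-directional stream to this node.
	stream, err := client.MuxRangeFeed(ctx)
	if err != nil {
		return err
	}
	// Each Send adds one more span to the same stream.
	for _, sp := range spans {
		if err := stream.Send(&roachpb.RangeFeedRequest{Span: sp}); err != nil {
			return err
		}
	}
	// Events for all requested spans arrive multiplexed on the single stream;
	// the client de-multiplexes them on the receiving end.
	for {
		ev, err := stream.Recv()
		if err != nil {
			return err
		}
		onEvent(ev)
	}
}

The key contrast with the uni-directional RangeFeed RPC is that Send can be called repeatedly on the same stream, one request per span, while a single Recv loop drains events for all of them.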

Note: just like in the uni-directional implementation, each call to the
MuxRangeFeed method (i.e. a logical rangefeed established by the client)
is treated independently. That is, even though it is possible to multiplex
all logical rangefeed streams onto a single bi-directional stream to a
single KV node, this optimization is not currently implemented, as it
raises questions about scheduling and fairness. If we need to further
reduce the number of bi-directional streams in the future, from
number_logical_feeds*number_of_nodes down to number_of_nodes, such
an optimization can be added at a later time.

Multiplexing all of the spans hosted on a node onto a single bi-directional
stream reduces the number of such streams from number_of_ranges down to
number_of_nodes (per logical rangefeed). This, in turn, enables the
client to reserve the worst-case amount of memory before starting the
rangefeed. This amount of memory is bounded by the number of nodes in the
cluster times the per-stream maximum memory -- the default is 2MB, but this can
be changed via the dedicated rangefeed connection class. The server side of the
equation may also benefit from knowing how many rangefeeds are currently
running. Even though the server is somewhat protected from memory blow-up via
HTTP/2 pushback, we may want to manage server-side memory better as well.
Memory accounting for the client (and possibly the server) will be added in
follow-on PRs.
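For illustration, with the 2MB default: a 100-node cluster gives a client-side worst case of roughly 100 × 2MB ≈ 200MB per logical rangefeed, independent of how many ranges the feed covers, whereas previously the worst case scaled with the number of ranges.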

The use of the new RPC is contingent on callers providing the WithMuxRangefeed
option when starting the rangefeed. However, an environment variable "kill
switch", COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED, exists to disable the use
of mux rangefeed in case serious issues are discovered.

Release note (enterprise change): Introduce a new rangefeed RPC called
MuxRangeFeed. Rangefeeds now use a common HTTP/2 stream per client
for all range replicas on a node, instead of one per replica. This
significantly reduces the amount of network buffer memory usage, which could
cause nodes to run out of memory if a client was slow to consume events.
The caller may opt in to the new mechanism by specifying the WithMuxRangefeed
option when starting the rangefeed. However, the cluster-wide
COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED environment variable may be set to
false to inhibit the use of this new RPC.

Release justification: Rangefeed scalability and stability improvement. Safe to merge
since the functionality is disabled by default.

@cockroach-teamcity
Member

This change is Reviewable

@miretskiy
Contributor Author

Note: Posting this now to get an indication from CI on what's broken. Also going to spend some time benchmarking this.
But I would love to get some comments on this.

Collaborator

@rytaft rytaft left a comment


first commit :lgtm: did not look at the second commit

Reviewed 7 of 7 files at r1.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @erikgrinaker, @miretskiy, and @nvanbenschoten)


pkg/sql/stats/stats_cache.go, line 544 at r1 (raw file):

			// even if it wasn't the descriptor that was used to collect the stats.
			// If have types that are not backwards compatible in this way, then we
			// will need to Start writing a timestamp on the stats objects and request

nit: stray capital S


pkg/sql/stats/create_stats_job_test.go, line 181 at r1 (raw file):

	}

	// Attempt to Start an automatic stats run. It should fail.

nit: more stray capitals


pkg/sql/stats/create_stats_job_test.go, line 211 at r1 (raw file):

	autoStatsRunShouldFail()

	// Attempt to Start a regular stats run. It should succeed.

ditto

@miretskiy
Contributor Author

miretskiy commented Jan 26, 2022


All done.

Contributor

@erikgrinaker erikgrinaker left a comment


Just flushing some thoughts after a quick partial look, will continue later.

even though it is possible to multiplex all logical rangefeed streams onto a single bi-directional stream to a single KV node, this optimization is not currently implemented as it raises questions about scheduling and fairness.

This seems like a very reasonable compromise. I'm not convinced multiplexing logical rangefeeds will ever be worth the risk of head-of-line blocking for e.g. system rangefeeds.

This amount of memory is bound by the number of nodes in the cluster times per-stream maximum memory -- default is 2MB, but this can be changed via dedicated rangefeed connection class.

What will be the fate of the rangefeed class with these changes? Will it default to on for 22.1, or are we removing it entirely?

Reviewed 5 of 7 files at r1, 25 of 25 files at r3, 11 of 24 files at r4, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @erikgrinaker, @miretskiy, @nvanbenschoten, and @rytaft)


-- commits, line 63 at r4:
I think this contains too many internal implementation details that are not relevant for end-users. Consider rewriting it along the lines of "Rangefeeds (which are used internally e.g. by changefeeds) now use a common HTTP/2 stream per client for all range replicas on a node, instead of one per replica. This significantly reduces the amount of network buffer memory usage, which could cause nodes to run out of memory if a client was slow to consume events.".


-- commits, line 66 at r4:
nit: couple of typos here


pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 225 at r4 (raw file):

}

func (a *activeRangeFeed) setNodeID(nodeID roachpb.NodeID) {

Can we pass this and the range ID in through the constructor? It seems brittle to set these later.


pkg/roachpb/api.proto, line 2813 at r4 (raw file):

  // To be removed at version 22.2
  rpc RangeFeed          (RangeFeedRequest)          returns (stream RangeFeedEvent)          {}
  rpc RangeFeedStream    (stream RangeFeedRequest)   returns (stream RangeFeedEvent)          {}

This feels like a misnomer: RangeFeed is already streaming. Something like MultiplexedRangeFeed or MultiRangeFeed seems more accurate.


pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go, line 38 at r4 (raw file):

)

func TestingDisableStreamingRangeFeed() func() {

nit: why is this function in this file when the only call is in another file?

@miretskiy
Contributor Author

This amount of memory is bound by the number of nodes in the cluster times per-stream maximum memory -- default is 2MB, but this can be changed via dedicated rangefeed connection class.

What will be the fate of the rangefeed class with these changes? Will it default to on for 22.1, or are we removing it entirely?

Not sure. Hopefully, we'll be able to conduct experiments and perhaps reduce the default memory, or perhaps use a dynamic window, and maybe be able to drop the rangefeed class altogether.

Contributor Author

@miretskiy miretskiy left a comment


@erikgrinaker Thanks for the first pass. Comments addressed.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @erikgrinaker, @nvanbenschoten, and @rytaft)


pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 225 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Can we pass this and the range ID in through the constructor? It seems brittle to set these later.

Yeah; it's a bit brittle. I did introduce setRangeID methods. Ideally, I would have a constructor set those.
But at the current call sites, I simply don't have a convenient way of getting node/descriptor IDs...

// Register partial range feed with registry.
active := newActiveRangeFeed(rr, span, startFrom)
defer active.release()

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
	// ... descriptor available here ...
}

It would be nice not to have to create multiple active structures for each iteration.
The same range ID could move to another node, etc. I just don't see a good mechanism,
unless I construct/destroy the active rangefeed structure in a loop...
Maybe that's not such a bad idea. WDYT?


pkg/roachpb/api.proto, line 2813 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

This feels like a misnomer: RangeFeed is already streaming. Something like MultiplexedRangeFeed or MultiRangeFeed seems more accurate.

I like this. Thanks. I chose a slightly shorter name though: MuxRangeFeed.

@miretskiy miretskiy changed the title kv: Introduce RangeFeedStream bi-directional RPC. kv: Introduce MuxRangeFeed bi-directional RPC. Jan 28, 2022
@miretskiy miretskiy force-pushed the rfstream branch 2 times, most recently from ba8351d to c435159 Compare January 28, 2022 20:48
@miretskiy miretskiy requested a review from a team as a code owner January 28, 2022 20:48
@ajwerner
Contributor

Something feels a little off to me here, namely, the use of RangeID as the mux-demux key alone. Right now you could, by the API contract and the code as I read it, set up multiple RangeFeeds in the stream on the same range. That would have bizarre-o semantics. You could imagine making an invariant that there's at most one such stream for each range, but that's got complexity. I'd worry about bugs/races where the server side doesn't unregister its use of the RangeID before the client sends another request (arbitrarily delayed computation and all that). Anyway, how do you feel about a (rangeID, streamID) tuple for the requests and responses, with the client routing by streamID and incrementing a counter? What's 2-3 bytes? It'll make me feel better.

@miretskiy
Contributor Author

miretskiy commented Jan 28, 2022


I agree with you. I was (and still am) a bit uneasy about this. I was considering sending back the actual span that was sent with the request, and sending those back just when an error happened. But this was causing problems with active range structure management -- I didn't want to lose observability into that (and sending the span on each event is too much)... Anyhow, I appreciate your comments; let me ponder on this while people review the rest.

@miretskiy
Contributor Author

nit: why is this function in this file when the only call is in another file?

@erikgrinaker the "old" mock style test was moved to dist_sender_rangefeed_mock_test.go. This mock test file is in the kvcoord package. The new
dist_sender_rangefeed_test.go uses real test cluster (so, no mocks). Because this test depends on test cluster, and test clusters depend on dist sender to start rangefeeds, I had to move this test to kvcoord_test package. Alas, for this test I want to test disabling mux rangefeed. So, I just added TestingSetEnableStreamingRangefeed in the mock_test.go because it's in the right package. I could move the function directly into dist_sender_mux_rangefeed ; or I can add another file... Or I can keep things where they are. Your preference?

@miretskiy
Contributor Author

Anyway, how do you feel about a (rangeID, streamID) tuple for the requests and responses and then the client routes with the streamID and increments a counter? what's 2-3 bytes? It'll make me feel better.

@ajwerner Done.
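For illustration only, here is a sketch of the streamID-keyed routing settled on above, with hypothetical names rather than the code that actually landed: the client allocates a fresh stream ID per requested span and de-multiplexes server events by that ID.

package rangefeedsketch

import (
	"sync"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// muxDemux is a hypothetical client-side demultiplexer keyed by stream ID.
type muxDemux struct {
	mu     sync.Mutex
	nextID int64
	sinks  map[int64]chan *roachpb.RangeFeedEvent
}

// register allocates a fresh stream ID for one requested span and returns the
// channel on which that span's events will be delivered.
func (m *muxDemux) register() (int64, <-chan *roachpb.RangeFeedEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.sinks == nil {
		m.sinks = make(map[int64]chan *roachpb.RangeFeedEvent)
	}
	m.nextID++
	ch := make(chan *roachpb.RangeFeedEvent, 16) // small illustrative buffer
	m.sinks[m.nextID] = ch
	return m.nextID, ch
}

// route delivers an event from the node-level stream to the per-span feed
// identified by the stream ID echoed back by the server.
func (m *muxDemux) route(streamID int64, ev *roachpb.RangeFeedEvent) {
	m.mu.Lock()
	ch, ok := m.sinks[streamID]
	m.mu.Unlock()
	if ok {
		ch <- ev
	}
}

Keying on the stream ID rather than the range ID alone sidesteps the ambiguity noted above when two feeds over the same range coexist on one stream.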

@miretskiy miretskiy force-pushed the rfstream branch 2 times, most recently from 4906621 to 278b7fe Compare February 1, 2022 19:07
Contributor

@erikgrinaker erikgrinaker left a comment


I just added TestingSetEnableStreamingRangefeed in the mock_test.go because it's in the right package. I could move the function directly into dist_sender_mux_rangefeed ; or I can add another file... Or I can keep things where they are. Your preference?

No strong preference. Maybe move it into dist_sender_mux_rangefeed_test.go or something.

But regarding enableMuxRangeFeed, isn't it going to be brittle to be changing this global while tests are running, e.g. in the case of parallel tests?

Reviewed 6 of 27 files at r13.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @erikgrinaker, @miretskiy, @nvanbenschoten, and @rytaft)


pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 225 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Yeah; it's a bit brittle. I did introduce setRangeID methods. Ideally, I would have a constructor set those.
But at the current call sites, I simply don't have a convenient way of getting node/descriptor IDs...

// Register partial range feed with registry.
active := newActiveRangeFeed(rr, span, startFrom)
defer active.release()

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
	// ... descriptor available here ...
}

It would be nice not to have to create multiple active structures for each iteration.
The same range ID could move to another node, etc. I just don't see a good mechanism,
unless I construct/destroy the active rangefeed structure in a loop...
Maybe that's not such a bad idea. WDYT?

I think we should be careful about uninitialized state (invites race conditions), and differentiate between mutable and immutable state (avoids unnecessary synchronization). How about this: take rangeID and nodeID via the constructor (to avoid uninitialized state), but allow nodeID to be changed later via setNodeID() since it varies over the lifetime of a rangefeed. Then do something like this in that loop:


var active *activeRangeFeed

for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
	if !token.Valid() { /* ... */ }

	if active == nil {
		active = newActiveRangeFeed(...)
		defer active.release()
	} else {
		active.setNodeID(token.Desc().NodeID)
	}
	// ... proceed with the single-range rangefeed attempt ...
}

pkg/roachpb/api.proto, line 2813 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I like this. Thanks. I chose a slightly shorter name though: MuxRangeFeed.

Nice, MuxRangeFeed is great. Consider cleaning up everything else to use "mux" rather than "stream" too, for consistency -- there's still a lot of stuff referencing "streaming rangefeeds".

@miretskiy
Contributor Author

s/rfPipe/requestPipe

done

@miretskiy
Contributor Author

And similarly on the client side, where I think return requestPipe, nil is wrong.

Does this make sense?

@andreimatei thanks so much for taking a careful look. I think keeping local interceptors for bi-directional
RPCs is important -- so I'm keeping them. As suggested, I've tried to fix the mux rangefeed local context implementation.
In addition, I made updates to context_test to make sure that client and server interceptors
execute for mux rangefeeds. Please take another look.

Contributor

@andreimatei andreimatei left a comment


@miretskiy I think you're responding to Reviewable comments in GitHub, which makes the original Reviewable thread dangle.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @erikgrinaker, @miretskiy, @nvanbenschoten, @rytaft, and @tbg)


pkg/rpc/context.go line 1030 at r55 (raw file):

			adapter := muxRangeFeedServerAdapter{
				ServerStream: stream,
				requestPipe:  requestPipe,

I don't think this is quite right. I think that, by using requestPipe here, the server will erroneously bypass both the client- and server-side interceptors when Recv()'ing. I think the muxRangeFeedServerAdapter doesn't need to have requestPipe at all. This adapter needs to end up delegating to this here stream for both sending and receiving.

TestInternalClientAdapterWithServerStreamInterceptors does not verify that the server stream provided by the interceptor is invoked for Recv() - and that's the part that I believe is broken. In fact, the test's internalServer never calls Recv on the server stream.

Similarly, TestInternalClientAdapterWithClientStreamInterceptors currently appears to test a single thing - that the client stream provided by the test is invoked when the client is receiving messages. It does not test that the client stream provided by the test is invoked when sending messages - and I believe that is broken.

I think what is needed is for the serverStreamInterceptors.run() call below to not use eventPipe; eventPipe pairs the send side and receive side of RangeFeedEvent. Instead, I think we need to pass in something that pairs the send side of RangeFeedEvent with the receive side of RangeFeedRequest.

Similarly, the muxRangeFeedClientAdapter should also not have a requestPipe; it should simply delegate to the ClientStream that it has. Which means that the line return eventPipe, nil is not good. You don't want eventPipe there; you want something that pairs the receive side of RangeFeedEvent with the send side of RangeFeedRequest.

Does this make sense?
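To make the send/receive pairing concrete, here is a deliberately simplified, hypothetical sketch (not the rpc/context.go adapters, and with interceptor wrapping omitted) of a local bi-directional "pipe": requests flow client-to-server on one channel and events flow server-to-client on another, so each side pairs the send half of one message type with the receive half of the other.

package rangefeedsketch

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// localMux is a hypothetical in-process stand-in for a bi-directional stream.
type localMux struct {
	requests chan *roachpb.RangeFeedRequest // client Send -> server Recv
	events   chan *roachpb.RangeFeedEvent   // server Send -> client Recv
}

// Client-side view: sends requests, receives events.
func (m *localMux) clientSend(r *roachpb.RangeFeedRequest) { m.requests <- r }
func (m *localMux) clientRecv() *roachpb.RangeFeedEvent    { return <-m.events }

// Server-side view: receives requests, sends events.
func (m *localMux) serverRecv() *roachpb.RangeFeedRequest { return <-m.requests }
func (m *localMux) serverSend(e *roachpb.RangeFeedEvent)  { m.events <- e }

The point of the review comment is that the client- and server-side interceptors should wrap exactly these four operations, rather than being bypassed by a shared requestPipe.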


pkg/rpc/context.go line 1080 at r55 (raw file):

var _ roachpb.Internal_MuxRangeFeedClient = muxRangeFeedClientAdapter{}

func (a muxRangeFeedClientAdapter) SendMsg(m interface{}) error {

you don't need to override the ClientStream methods if all you do is delegate to the embedded member


pkg/rpc/context.go line 1091 at r55 (raw file):

	// Mark this request as originating locally.
	request.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL
	return a.requestPipe.SendMsg(request)

I don't think this is right. You want to delegate to ClientStream here, not requestPipe.


pkg/rpc/context.go line 1123 at r55 (raw file):

func (a muxRangeFeedServerAdapter) Recv() (*roachpb.RangeFeedRequest, error) {
	m := new(roachpb.RangeFeedRequest)
	if err := a.requestPipe.RecvMsg(m); err != nil {

I don't think this is right. You want to delegate to ServerStream here, not requestPipe.


pkg/rpc/context_test.go line 329 at r55 (raw file):

func (s *internalServer) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error {
	s.muxRfServerStream = stream
	for _, ev := range s.rangeFeedEvents {

I think this is too simplistic of an implementation of the server to be useful for the tests. We should be receiving requests from stream.


pkg/server/node.go line 1254 at r28 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

In server code we generally use Stopper.RunAsyncTask(), to hook it up to the stopper, tracing infrastructure, and so on. If we're going to use these other concurrency primitives, I think they should be extended with similar infrastructure.

@andreimatei What's your take?

RPCs already run under the stopper because of this interceptor, so the whole MuxRangeFeed call lives in a task.
I think this code wants to fork new spans for every asyncRangeFeed, but otherwise I think using ctxGroup is fine. Extending ctxgroup with tracing options is possible, but sometimes it's overkill (and it does cost some), so... it hasn't been done yet.


pkg/server/node.go line 1329 at r55 (raw file):

			return errors.CombineErrors(err, rfGrp.Wait())
		}
		rfGrp.GoCtx(n.asyncRangeFeed(*req, muxStream))

let's inline the code for asyncRangeFeed here for readability (i.e. avoid returning a function). Nobody else seems to be calling that function.


pkg/server/testserver.go line 812 at r55 (raw file):

		stopper = stop.NewStopper(stop.WithTracer(tr))
		// The server's stopper stops the tenant, for convenience.
		// Use main server quiesce as a signal to stop tenants stopper. in the perfect world, we

s/in/In


pkg/server/testserver.go line 813 at r55 (raw file):

		// The server's stopper stops the tenant, for convenience.
		// Use main server quiesce as a signal to stop tenants stopper. in the perfect world, we
		// want to have tenant stopped before the main server.  Using ts.Stopper().AddCloser() to

Closers are introduced too abruptly in this paragraph; start by saying something like "Note that doing this in a ts.Stopper().AddCloser() doesn't work because it's too late..."


pkg/server/testserver.go line 819 at r55 (raw file):

		if err := ts.Stopper().RunAsyncTask(ctx, "propagate-cancellation-to-tenant", func(ctx context.Context) {
			<-ts.Stopper().ShouldQuiesce()
			stopper.Stop(context.Background())

can't use ctx here?

@miretskiy
Contributor Author

you don't need to override the ClientStream methods if all you do is delegate to the embedded member

Done

  • Test server comments addressed.

let's inline the code for asyncRangeFeed here for readability (i.e. avoid returning a function). Nobody else seems to be calling that function.

Done.

RPCs already run under the stopper because of this interceptor, so the whole MuxRangeFeed call lives in a task.
I think this code wants to fork new spans for every asyncRangeFeed, but otherwise I think using ctxGroup is fine. Extending ctxgroup with tracing options is possible, but sometimes it's overkill (and it does cost some), so... it hasn't been done yet.

Added forked span.

@miretskiy miretskiy force-pushed the rfstream branch 5 times, most recently from b3664b9 to 0f91949 Compare August 17, 2022 23:52
@miretskiy
Contributor Author

@andreimatei please take a look at updated "pipe" implementations in context.go

The table statistics cache starts a rangefeed very early
during server initialization. This type of work should be started
when the "start" method is called. This PR does just that.

Release Notes: None
When starting the tenant server in tests, the tenant server was stopped
by propagating the stopper signal from the main server to the tenant test server.
Unfortunately, this could result in stuck or slow test shutdowns since
closers in the stopper run as the very last thing. Use the main test server's
`WithCancelOnQuiesce` to propagate the stop signal to the tenant test server.

Release Notes: None
@miretskiy
Contributor Author

Made some minor updates to the context test; CI is green; @andreimatei would appreciate a final 👁️ on the context stuff.

Contributor

@andreimatei andreimatei left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @erikgrinaker, @miretskiy, @nvanbenschoten, @rytaft, and @tbg)


pkg/rpc/context.go line 1172 at r57 (raw file):

}

// eventPipe is a (uni-directional) pipe of *RangeFeedEvent that implements

comment about RangeFeedEvent is stale?


pkg/rpc/context.go line 1021 at r63 (raw file):

	go func() {
		// Handler adapts the ServerStream to the typed interface expected by the
		// RPC handler (Node.RangeFeed). `stream` might be `eventPipe` which we

s/RangeFeed/MuxRangeFeed
s/eventPipe/serverSide


pkg/rpc/context.go line 1107 at r63 (raw file):

}

// channelStream implements grpc.ClientStream, grpc.ServerStream and grpc.Stream

Does it fully implement these interfaces?
Also, is saying that "all other methods panic" fair?
Perhaps say that this is a helper for implementing whatever interfaces.


pkg/rpc/context.go line 1111 at r63 (raw file):

// This type simply provides access to the context (as required by grpc.Stream) interface
// while all other methods panic.  Type expected to be embedded inside other types.
// Channel stream exchanges messages via channel.

s/channel/respC ?


pkg/rpc/context.go line 1146 at r63 (raw file):

	case s.respC <- m:
		return nil
	case <-s.doneC:

why was doneC necessary, compared to <-s.ctx.Done() as we had before? Given that we return s.ctx.Err(), it looks like this code is assuming that doneC is s.ctx.Done()...


pkg/rpc/context.go line 1152 at r63 (raw file):

// recv is the implementation of RecvMsg.
func (s *channelStream) recv() (interface{}, error) {

FWIW, I think we can make channelStream.recv() generic, using reflection, so that it matches grpc's RecvMsg interface:

	RecvMsg(m interface{}) error

Here's how you can implement *dst = *src in a type-generic way: https://go.dev/play/p/onlCwWu6kiD. I'm not sure about the performance, but if it's not much worse than what we have now, I'd probably find it preferable to having to parameterize eventPipe with a callback for the only purpose of assigning a pointer.

But also see below.
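The playground link captures the idea; a self-contained sketch of the reflection-based, type-generic *dst = *src assignment (assuming both arguments are non-nil pointers to the same type) could look like:

package main

import (
	"fmt"
	"reflect"
)

// copyInto performs *dst = *src without knowing the concrete type, assuming
// dst and src are both non-nil pointers to the same underlying type.
func copyInto(dst, src interface{}) {
	reflect.ValueOf(dst).Elem().Set(reflect.ValueOf(src).Elem())
}

func main() {
	type event struct{ ID int }
	src := &event{ID: 7}
	dst := &event{}
	copyInto(dst, src)
	fmt.Println(dst.ID) // prints 7
}

Whether this is acceptable on the hot rangefeed path is exactly the performance question raised above.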


pkg/rpc/context.go line 1184 at r63 (raw file):

var _ grpc.ServerStream = &eventPipe{}

// SendMsg is part of the grpc.ServerStream interface. It is also part of the

nit: what's left of the old comment looks bizarre. Say `SendMsg is part of the grpc.{Server,Client}Stream interfaces.`


pkg/rpc/context.go line 1190 at r63 (raw file):

}

// RecvMsg is part of the grpc.ClientStream interface. It is also technically

comment is stale


pkg/rpc/context.go line 1198 at r63 (raw file):

}

// newRangeFeedPipe creates a eventPipe for receiving RangeFeedEvent.

s/receiving/sending and receiving


pkg/rpc/context.go line 1207 at r63 (raw file):

		channelStream: cs,
		receiver: func(dst interface{}) error {
			msg, err := cs.recv()

I think the way in which eventPipe interacts with its channelStream is unclear: it embeds it, and whoever constructs an eventPipe also needs to hook up receiver to the same channelStream.

But also, I don't think we have the layering right here. I'm bothered by the eventPipe's relationship with the channelStream; see below.


pkg/rpc/context.go line 1211 at r63 (raw file):

				return err
			}
			*dst.(*roachpb.RangeFeedEvent) = *msg.(*roachpb.RangeFeedEvent)

FWIW, I think you can also make channelStream.recv() generic, using reflection, so that you don't need the assign function at all. Here's how you can implement *dst = *src in a type-generic way: https://go.dev/play/p/onlCwWu6kiD. I'm not sure about the performance, but if it's not much worse than what we have now, I'd probably find it preferable to having to parameterize eventPipe with the assignment function.
Is there a benchmark that stresses changefeeds? Because I'd run it to see if I can get away with reflection.


pkg/rpc/context.go line 1226 at r63 (raw file):

	clientStream := makeChannelStream(ctx)
	serverStream := makeChannelStream(ctx)
	clientPipe = &eventPipe{

I think the use of eventPipe here is very confusing. A "pipe" is supposed to have two ends, and what you put into one end comes out at the other. But that's not what this clientPipe does: what you put into one end does not come out at the other end. The clientPipe embeds a channelStream, which strongly suggests that the channelStream.send() method will be used when sending on the pipe (the comment on channelStream.send() even says that this is the implementation of SendMsg), but that's not the case. Realistically, nobody new is going to understand what's going on here. I think the embedding has to go.

I think we need to figure out a different layering that's less confusing.

Please see the last commit here for my take on it. If you like it, take it and squash it in.

@miretskiy
Copy link
Contributor Author

Comments updated.

why was doneC necessary, compared to <-s.ctx.Done() as we had before? Given that we return s.ctx.Err(), it looks like this code is assuming that doneC is s.ctx.Done()...

Reverted; though oftentimes in a tight-loop situation it's better to save a copy of doneC. But no need to do this now.

Please see the last commit here for my take on it. If you like it, take it and squash it in.

Stealing your suggested changes.

Contributor

@andreimatei andreimatei left a comment


:lgtm: for the RPC part

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @erikgrinaker, @miretskiy, @nvanbenschoten, @rytaft, and @tbg)

@miretskiy
Contributor Author

bors r+

@miretskiy
Contributor Author

bors r+

@craig
Contributor

craig bot commented Aug 18, 2022

Already running a review

@craig
Contributor

craig bot commented Aug 19, 2022

Build succeeded:
