Streaming API buffers messages leaving potential for data loss #2159

Closed
cstockton opened this issue Jun 19, 2018 · 14 comments
Labels
P2 Type: Feature New features or improvements in behavior

Comments

@cstockton

Please answer these questions before submitting your issue.

What version of gRPC are you using?

1.12.2

What version of Go are you using (go version)?

go10.0.3

What operating system (Linux, Windows, …) and version?

Linux

What did you do?

Given the following grpc.ClientStream code (with GetStream in place of the boilerplate for GobgpApi_InjectMrtClient), where reqs is a slice of 255 valid messages:

ctx, cancel = context.WithCancel(ctx)
defer cancel()

stream, err := client.GetStream(ctx)
if err != nil {
	t.Fatalf("exp nil err; got %v", err)
}
for _, req := range reqs {
	if err = stream.Send(req); err != nil {
		t.Fatalf("exp nil err from Send; got %v", err)
	}
}

// One of the two code paths below is executed here, after every Send() call has returned a nil error.
...

The code below will lose messages:

// Cancel first results in around 4-32 of 255 messages being sent to remote
// endpoint and the below error returned by CloseAndRecv():
//
//   rpc error: code = Canceled desc = context canceled
//
{
	cancel()
	fmt.Println(stream.CloseAndRecv())
}

While the code below will ensure all 255 messages are delivered.

// Cancel after CloseAndRecv results in all messages being sent and no error.
{
	fmt.Println(stream.CloseAndRecv())
	cancel()
}

The documentation for SendMsg led me to believe that once Send returns my message has been sent. I accept that "sends m" may mean it sends to some kind of internal buffer but I believe this is important and should be documented with emphasis.

// SendMsg blocks until it sends m, the stream is done or the stream
// breaks.

// On error, it aborts the stream and returns an RPC status on client
// side. On server side, it simply returns the error to the caller.
// SendMsg is called by generated code. Also Users can call SendMsg
// directly when it is really needed in their use cases.
// It's safe to have a goroutine calling SendMsg and another goroutine calling
// recvMsg on the same stream at the same time.
// But it is not safe to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error

So I am a bit confused again by the semantics around GRPC streaming API even after clarification of #2071. I checked grpc options and other config knobs and couldn't find anything to provide guarantees for Send().

The obvious fix is don't do the first thing! But unfortunately Send() happens in a worker Goroutine that selects on a job channel which results in a call to Send, or exits when the context is done. Since Send may have buffered messages but gives me no way to flush them (that I can find) without closing the stream I can't guarantee a job.Reply() actually resulted in a Send() since the work context could end at any moment resulting in all pending messages being discarded.

The only option I can think of is splitting the context. Meaning I'll have to give the grpc stream a separate context.Background() wrapped with context.WithCancel and start another goroutine which will wait for the real request context to be done, at which point I call sr.CloseAndRecv and finally the wrapped grpc stream ctx cancel() which ends the 3rd cancellation goroutine that was started upon stream creation by grpc.
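A minimal sketch of this "split context" idea, using only the standard library. The `sendStream` interface, `fakeStream`, and `sendDetached` are hypothetical stand-ins invented for illustration; they are not gRPC types, and a real generated client would take their place. The request context only decides when to stop producing messages, while the stream runs on its own context that is canceled after CloseAndRecv has returned.

```go
package main

import (
	"context"
	"fmt"
)

// sendStream is a hypothetical stand-in for a generated client-streaming
// client. It is not a gRPC type.
type sendStream interface {
	Send(msg string) error
	CloseAndRecv() (int, error)
}

// fakeStream records messages and fails once its own context is canceled.
type fakeStream struct {
	ctx    context.Context
	recv   []string
	onSend func(count int) // test hook, called after each accepted message
}

func (s *fakeStream) Send(msg string) error {
	if err := s.ctx.Err(); err != nil {
		return err
	}
	s.recv = append(s.recv, msg)
	if s.onSend != nil {
		s.onSend(len(s.recv))
	}
	return nil
}

func (s *fakeStream) CloseAndRecv() (int, error) {
	if err := s.ctx.Err(); err != nil {
		return 0, err
	}
	return len(s.recv), nil
}

// sendDetached opens the stream on its own context, so canceling reqCtx stops
// new sends but cannot abort the stream before CloseAndRecv has returned.
func sendDetached(reqCtx context.Context, open func(context.Context) sendStream, msgs []string) (int, error) {
	streamCtx, cancel := context.WithCancel(context.Background())
	defer cancel() // runs only after CloseAndRecv has returned

	stream := open(streamCtx)
	var sendErr error
	for _, m := range msgs {
		if sendErr = reqCtx.Err(); sendErr != nil {
			break // stop producing, but still close the stream cleanly
		}
		if sendErr = stream.Send(m); sendErr != nil {
			break
		}
	}
	n, closeErr := stream.CloseAndRecv()
	if closeErr != nil {
		return n, closeErr
	}
	return n, sendErr
}

// demo cancels the request context after the second message was accepted.
func demo() (int, error) {
	reqCtx, cancelReq := context.WithCancel(context.Background())
	defer cancelReq()
	open := func(ctx context.Context) sendStream {
		return &fakeStream{ctx: ctx, onSend: func(count int) {
			if count == 2 {
				cancelReq()
			}
		}}
	}
	return sendDetached(reqCtx, open, []string{"a", "b", "c"})
}

func main() {
	fmt.Println(demo())
}
```

In this sketch the caller learns both how many messages the stream confirmed at close and why sending stopped early. It does not need the extra watcher goroutine described above, at the cost of only noticing cancellation between sends.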

I think the most reasonable fix is for Send to block until a message is actually sent, or to provide a mechanism to flush, even if it is an optional interface I have to assert on, i.e. (sr.(grpc.Flusher).Flush()). Of course, if I am missing something I would be open to other options that do not involve more goroutines / moving parts. Thanks for the help.

@MakMukhi
Contributor

Hey @cstockton ,
Thanks for opening the issue. I apologize that the documentation is not clear enough to convey that a send doesn't lead to the message being written on the wire. We'll update that.
As for blocking until a message is written on wire, that would require augmenting the API surface. We'll have a discussion over that and get back to you.
Note we can't make this our default behavior since it severely hampers performance.

@MakMukhi MakMukhi added P2 Type: Feature New features or improvements in behavior labels Jun 19, 2018
@cstockton
Author

@MakMukhi Thanks, though I am concerned that there is invisible buffering on something that has a primitive IO interface, i.e. zero controls or documentation around how it buffers messages internally. How and when does it create back pressure? Since buffering indefinitely must not happen here, as it would be a security issue (an immediate denial-of-service vector), it has to block at some point. Performance is not being increased by the current API; the cost is merely being hidden, at the expense of safety and reliability. That is a trade that I believe goes against a core tenet of high quality library design in Go: block callers by default.

I understand that not everyone may share this opinion, but I believe there is only one path forward: the API must be blocking. Maybe an example underneath, with a few lines of code to get the identical (user-facing) behavior that GRPC has today, would be a better alternative:

ch := make(chan Thing, HowBigIsThis_NobodyKnows) 
go func() {
    stream, err := grpc.New...
    for {
         select { case <-ctx.Done(): ...; case thing := <-ch: stream.Send(thing); }
    }
}()

@MakMukhi
Contributor

The buffering is not indefinite, every time a message is sent it takes away from a local soft limit of 64KB which is then replenished as bytes are written on the wire. This also provides user back pressure and prevents them from accidentally OOMing their local machine.
Safety on the send side is about preventing users from accidentally OOMing themselves, not about protecting them against a malicious DoS.
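As a rough illustration of how a soft byte limit can produce back pressure, here is a minimal standard-library sketch. The `softQuota` type and its method names are hypothetical and are not gRPC's internal implementation; only the 64KB figure comes from the comment above. A sender is admitted whenever the budget is still positive, so a single message may overshoot the limit, and later senders block until written bytes are returned.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// softQuota is a hypothetical byte budget used only for illustration.
type softQuota struct {
	mu    sync.Mutex
	cond  *sync.Cond
	avail int
}

func newSoftQuota(limit int) *softQuota {
	q := &softQuota{avail: limit}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// acquire blocks while the budget is exhausted, then charges n bytes.
// The limit is "soft": one charge may push the budget below zero.
func (q *softQuota) acquire(n int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.avail <= 0 {
		q.cond.Wait()
	}
	q.avail -= n
}

// release returns n bytes to the budget once they have been written out.
func (q *softQuota) release(n int) {
	q.mu.Lock()
	q.avail += n
	q.mu.Unlock()
	q.cond.Broadcast()
}

// demo reports whether a third send was blocked before any bytes were
// released, and whether it completed after the release.
func demo() (blockedBefore, doneAfter bool) {
	q := newSoftQuota(64 * 1024)
	q.acquire(60 * 1024) // admitted: 4 KB of budget remain
	q.acquire(10 * 1024) // admitted: budget was positive, now 6 KB overdrawn

	done := make(chan struct{})
	go func() {
		q.acquire(1024) // has to wait: the budget is exhausted
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(50 * time.Millisecond):
		blockedBefore = true
	}

	q.release(60 * 1024) // the writer reports 60 KB flushed
	select {
	case <-done:
		doneAfter = true
	case <-time.After(time.Second):
	}
	return blockedBefore, doneAfter
}

func main() {
	fmt.Println(demo())
}
```

Everything charged against the budget but not yet released is exactly the data that would be discarded if the stream were aborted at that moment, which is the loss discussed in this issue.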

All of this is internal implementation and therefore not documented in the API. Also note that send and recv are not primitive calls they do not act on the underlying connection.

Blocking on a write call until it has been written on the wire has problems like the following:

  1. User initiates a write and blocks.
  2. The underlying connection is lost.
  3. Write takes say 20-30 mins to realize that and blocks until then.
  4. Note that net.Conn Write doesn't take a context that could timeout.
  5. Even if the stream's deadline expired, the write operation will be blocked for that long.

@cstockton
Author

The buffering is not indefinite, every time a message is sent it takes away from a local soft limit of 64KB which is then replenished as bytes are written on the wire.

So depending on the size of the message thousands of messages may be lost in a short burst? I changed my request counts to find the maximum message loss for my use case is 2800, so I can infer around 2800 messages fit in this 64k buffer.

  • 2048 delivered 35-43 consistently for a loss of 2000 messages.
  • 4096 delivered 1350-1390 consistently for a loss of 2700 messages.
  • 8192 delivered 5480-5490 consistently for a loss of 2800 messages.

This also provides user back pressure and prevents them from accidentally OOMing their local machine. Safety on the send side is about preventing users from accidentally OOMing themselves, not about protecting them against a malicious DoS.

I am glad that there are defined bounds- but to be clear you are not naming this as a feature of the current design right? Because the same would be true if there was no buffering at all.

All of this is internal implementation and therefore not documented in the API. Also note that send and recv are not primitive calls they do not act on the underlying connection.

Thanks for helping me understand the original rationale for not documenting the fact the library could drop 64kb of user messages. I think at this point we can agree this is not an internal implementation detail- but user facing since it has been noticed by one.

Blocking on a write call until it has been written on the wire has problems like the following:

Blocking on a write does not mean blocking until success, it means blocking until the most immediate definite outcome. In your example as soon as the connection is lost you have an outcome (broken pipe, io.EOF, whatever)- so return it. Leave the decision to retry 20-30 minutes in a tight loop to the very capable programmer utilizing the library.

Note that net.Conn Write doesn't take a context that could timeout.

It has deadlines, which are set by context.WithTimeout and context.WithDeadline. If the current API faces challenges to crossing context and net boundaries then I would be happy to help solve those. But I think Go developers are very accustomed to setting context deadlines as well as having some separate (often constant) values for maximum time to wait for network deadlines for background work.

I don't feel sufficient technical merit has emerged here to justify the current design, nor will it given how well traveled this area of design is. I believe it should be the number one priority of the rpc to provide strong guarantees around message delivery, but I understand if the grpc authors will not concede here. I'll keep an eye out on this issue in the future, for now I'm switching to the equivalent non-streaming API. Thanks!

@MakMukhi
Contributor

Blocking on a write does not mean blocking until success, it means blocking until the most immediate definite outcome. In your example as soon as the connection is lost you have an outcome (broken pipe, io.EOF, whatever)- so return it.

That's the problem; the underlying net.Conn write call won't return for 20-30 mins until it realizes the connection is not reachable any more.

It has deadlines, which are set by context.WithTimeout and context.WithDeadline. If the current API faces challenges to crossing context and net boundaries then I would be happy to help solve those. But I think Go developers are very accustomed to setting context deadlines as well as having some separate (often constant) values for maximum time to wait for network deadlines for background work.

The only way to set deadlines on net.Conn is through an API call which would mean that we set and reset the deadline for every single write call we make.
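For readers following the deadline argument, a minimal sketch of what "set the deadline through an API call" looks like with the standard library. `writeWithContext` is a hypothetical helper, not part of gRPC; `net.Conn.SetWriteDeadline`, `net.Pipe`, and `os.ErrDeadlineExceeded` are standard library. The in-memory pipe has no reader, so the write can only return once the deadline fires.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// writeWithContext is a hypothetical helper: it copies ctx's deadline (if any)
// onto the connection before performing a single Write.
func writeWithContext(ctx context.Context, conn net.Conn, p []byte) (int, error) {
	deadline, ok := ctx.Deadline()
	if !ok {
		deadline = time.Time{} // the zero value clears any earlier deadline
	}
	if err := conn.SetWriteDeadline(deadline); err != nil {
		return 0, err
	}
	return conn.Write(p)
}

// demo writes into a pipe that nobody reads from and reports whether the
// write was released by the deadline.
func demo() bool {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	_, err := writeWithContext(ctx, client, []byte("hello"))
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println("write timed out:", demo())
}
```

Note the limits of this sketch: the deadline applies to the whole connection rather than to one stream, and plain cancellation (as opposed to a deadline) is not observed by Write. Both points bear on the disagreement in this thread.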

Also, the whole premise of this issue is about client canceling the context without reading status back from the server. Note this is not an expected usage of gRPC.
So if one were to follow the correct usage pattern, the safety check to prevent OOMing remains opaque to the user.

I agree that the documentation needs to be updated but disagree with updating the design due to incorrect usage resulting from poor documentation.

@cstockton
Author

Blocking on a write does not mean blocking until success, it means blocking until the most immediate definite outcome. In your example as soon as the connection is lost you have an outcome (broken pipe, io.EOF, whatever)- so return it.

That's the problem; the underlying net.Conn write call won't return for 20-30 mins until it realizes the connection is not reachable any more.

It will only block for 20-30 minutes because there are no deadlines; the entire point of my post was to add them.

The only way to set deadlines on net.Conn is through an API call which would mean that we set and reset the deadline for every single write call we make.

The streaming API made the unconventional design decision to bind ctx to the Stream object and not the calls that actually block, like Send. Which means you don't need to set a deadline on every single write call, because the context is only received once. If you multiplex onto a pool of streams you would just have a baseline timeout and retry when deadline was exceeded but ctx was not done.

Also, the whole premise of this issue is about client canceling the context without reading status back from the server. Note this is not an expected usage of gRPC.
So if one were to follow the correct usage pattern, the safety check to prevent OOMing remains opaque to the user.

What you are telling me is that this is unexpected usage:

	http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
		stream, err := client.GetStream(r.Context())
		if err != nil {
			http.Error(w, ...)
			return
		}
		defer stream.CloseAndRecv()

		for _, req := range reqs {
			if err = stream.Send(req); err != nil {
				http.Error(w, ...)
				return
			}
		}
	})

I disagree, I believe your API has an unexpected design that is a risk to users data. It introduces a type of subtlety that is extremely difficult to track down. Send doesn't send anything. It queues an object to maybe be sent, maybe not. The SendMsg func obtains some buffer allowance in Write(concerning, please see [1]) before calling controlBuf.put which does little more than wrap a mutex around an unbound enqueue to a linked list and potentially wake a waiting receiver, essentially reproducing channel semantics. I can't think of a reason why you would do this other than specifically wanting an unbound queue.

Essentially it is UNSAFE (if you care about messages getting delivered) to use the streaming API with a context that can be canceled, which.. is most any context in use by any go program I've ever seen. That is why context is used. But if you feel it is more reasonable for me to give the GRPC stream a background Context() and monitor my request context until it's done, at which point I stop sends and call "CloseAndRecv()", then I have no option but to do so. I've never had to solve this unusual problem before, but I guess you expect users to write something like below, or do you have another suggestion?

	http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		stream, err := client.GetStream(ctx)
		if err != nil {
			http.Error(w, ...)
			return
		}
		defer stream.CloseAndRecv()

		for _, req := range reqs {
			if err = r.Context().Err(); err != nil {
				http.Error(w, ...)
				return
			}
			if err = stream.Send(req); err != nil {
				http.Error(w, ...)
				return
			}
		}
	})

Though that does no good for any other type of failure: panics, hardware failure, and so on. The bottom line here is that the API provides no way to ensure a prior Send has completed without closing the stream altogether. This contradicts the very reason why I want to stream items: to make incremental progress towards my goal. I don't think it's unreasonable to want to be able to measure or create checkpoints at any interval of my choice, as if I was writing to a file. It's like an os.File with Sync removed and now takes a context.Context and starts a goroutine running a f.loopy(ctx) writer! To ensure your data is written you have to call f.Close() or write "enough" data for loopy to decide to Sync for you, for performance reasons. Not Unix-like, Go-like, or like anything else.

Anyways, if you are concerned about backwards compatibility I would be happy to brainstorm ways to fix this API while keeping the performance gains you claim would be lost. But if you really believe this design is sound, feel free to close the issue. I've made as much of a case as I can; the rest is up to the GRPC authors.

@MakMukhi
Contributor

The streaming API made the unconventional design decision to bind ctx to the Stream object and not the calls that actually block, like Send. Which means you don't need to set a deadline on every single write call, because the context is only received once. If you multiplex onto a pool of streams you would just have a baseline timeout and retry when deadline was exceeded but ctx was not done.

So are you suggesting that the transport keeps a heap of deadlines, and every time a new send message call comes in we add it to the heap and update the underlying net.Conn's deadline? I'd really appreciate it if you think over design ideas thoroughly before standing by them vehemently. By the way, not buffering anything means a syscall for every frame that we write!

What you are telling me is that this is unexpected usage:

No, I don't see you canceling any contexts on gRPC stream before CloseAndRecv finishes.

I can't think of a reason why you would do this other than specifically wanting an unbound queue.

If you're curious, feel free to look up the PRs that made these changes and the associated benchmark numbers. A channel and several other data structures were tried and the current implementation proved to be the most performant.
Now I'm not saying this is the most optimal implementation that there can ever be. This is an active effort, after all.

Essentially it is UNSAFE (if you care about messages getting delivered) to use the streaming API with a context that can be canceled, which.. is most any context in use by any go program I've ever seen.

Wow, that's definitely an exaggeration! Do you expect a "sent" message to be received by the other side instantly? If gRPC doesn't buffer, then the kernel will, and so will all the network buffers along the way. Just don't cancel the context until you have got a status from the server!

It's like an os.File with Sync removed and now takes a context.Context and starts a goroutine running a f.loopy(ctx) writer!

Funny you should mention that because the underlying net.Conn is very similar to that minus the context of course. You can read about it more here. The point here is that a network is not just an os.File, there are other considerations and performance is a big one. We can't just make syscalls for every message that goes out.

On top of this, we haven't even discussed flow control. Each data frame that goes out needs to be checked against stream-level and connection-level flow control. The latter requires every stream to acquire a global lock for every data frame it sends. That's a very high contention lock!

Moreover, if you want to wait for every send to actually go on the wire, you have a bunch of stream goroutines that are waiting for each other to write (connection-level flow control requires cooperation). Also, don't forget the round trip times for window acks that a send might have to wait for. These user goroutines could instead be doing meaningful work!

I really don't want to justify our design choices further. If you're so inclined, I encourage you to perhaps come up with a prototype and run it against our benchmarks. If it makes things better there's no reason gRPC won't accept it.

@cstockton
Author

This was a very difficult issue to track down because there are so many moving parts when the system is shutting down, which is the only time this issue surfaced. I think some of my frustration began to leak into the issue once I saw it being dismissed; I'm sorry for that. It's also disappointing that I've somehow failed to show you the difficult ergonomics users face with the current API, despite multiple snippets of code that I felt were clear examples. The latest of which you replied to with:

No, I don't see you canceling any contexts on gRPC stream before CloseAndRecv finishes.

You have demonstrated the subtlety of the API first hand by giving me a thumbs up on this. Maybe you can come back to it when you have a clear head and look at the below snippet again. Once you notice it, I challenge you to face the difficult ergonomics of the library first hand by writing a version that ensures all messages are sent before the function exits:

What you are telling me is that this is unexpected usage:

	http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
		stream, err := client.GetStream(r.Context())
		if err != nil {
			http.Error(w, ...)
			return
		}
		defer stream.CloseAndRecv()

		for _, req := range reqs {
			if err = stream.Send(req); err != nil {
				http.Error(w, ...)
				return
			}
		}
	})

Finally I forgot to include the footnote for my [1] on Write. I wanted to make sure that there is no way in gRPC to send a data frame with no data or header? It appears that in such a scenario the get() that acts as the write barrier to create back pressure when writes are slow could be skipped, causing unbound writes to the linked list.

@MakMukhi
Contributor

I can relate to the frustration resulting from complex, sometimes unfortunate, engineering problems. I've been there several times myself.

I do think that blocking a Send until the message is written on the wire is a reasonable feature to expect. Moreover, this can be accomplished fairly easily in the current implementation. I have strong apprehensions about making it the default behavior however.

Following is how I envision this behavior can be added:
Each dataFrame can be augmented with another closure, onWrite, that signals the sending goroutine when the complete message has been written on the wire by loopy. In fact, this was done in one of the iterations but was decided against because of the performance impact it had on streams that just want to send a lot of small messages. This is the reason we don't want it to be our default behavior.
To signal the transport we can add a field to Options that tells it if the Write call needs to be blocking.
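A minimal standard-library sketch of the onWrite idea described above. The `frame` type, `writerLoop`, `send`, and `sendBlocking` are hypothetical names chosen for illustration and are not gRPC's transport code: a single background writer drains a queue, and a frame may carry a closure that wakes the sender once that frame has been written.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// frame is a hypothetical queued item. onWrite, if set, runs after the write.
type frame struct {
	data    []byte
	onWrite func()
}

// writerLoop drains the queue into w, mimicking a single background writer.
func writerLoop(w io.Writer, queue <-chan frame, stopped chan<- struct{}) {
	defer close(stopped)
	for f := range queue {
		w.Write(f.data) // error handling is omitted in this sketch
		if f.onWrite != nil {
			f.onWrite()
		}
	}
}

// send enqueues and returns immediately (the buffered behavior).
func send(queue chan<- frame, data []byte) {
	queue <- frame{data: data}
}

// sendBlocking enqueues and waits until the writer has written the frame.
func sendBlocking(queue chan<- frame, data []byte) {
	written := make(chan struct{})
	queue <- frame{data: data, onWrite: func() { close(written) }}
	<-written
}

// demo returns what had reached the "wire" when sendBlocking returned.
func demo() string {
	var wire bytes.Buffer
	queue := make(chan frame, 16)
	stopped := make(chan struct{})
	go writerLoop(&wire, queue, stopped)

	send(queue, []byte("a"))
	send(queue, []byte("b"))
	sendBlocking(queue, []byte("c")) // FIFO queue: "a" and "b" are written first

	// Safe to read: the queue is empty, so the writer is idle.
	snapshot := wire.String()
	close(queue)
	<-stopped
	return snapshot
}

func main() {
	fmt.Println(demo())
}
```

Because the queue is first-in, first-out, one blocking send also acts as a flush for every buffered send before it. That may be enough for callers who only need occasional checkpoints rather than a blocking call per message.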

The API surface can also be augmented to support blocking with something like:

type Blocker interface {
	SendMsgBlocking()
}

The clientStream struct will implement that method.

The generated code can then have a method like:

func (x *routeGuideRecordRouteClient) SendBlocking(m *Point) error {
	x.ClientStream.(Blocker).SendMsgBlocking()
}

Here's a running example of such a pattern.

If you think gRPC should have this API, I encourage you to open another issue specifically for that.

I wanted to make sure that there is no way in gRPC to send a data frame with no data or header?

The only way to send an empty data frame is by closeSend which can happen only once for a stream.

jeanbza added a commit to jeanbza/grpc-go that referenced this issue Jun 22, 2018
jeanbza added a commit that referenced this issue Jun 27, 2018
documentation: clarify SendMsg documentation

Relevant issue: #2159
jeanbza added a commit to jeanbza/grpc-go that referenced this issue Jul 3, 2018
Deprecate Stream, and move the methods and documentation to ServerStream
and ClientStream. This is due to the fact that there are different
semantics for SendMsg, and it's quite confusing to document one method
for two things. Furthermore, Stream is not actually used in any way
other than to be inherited by ClientStream and ServerStream.

Relevant issue: grpc#2159
jeanbza added a commit that referenced this issue Jul 9, 2018
docs: deprecate stream, move documentation to client|server stream

Deprecate Stream, and move the methods and documentation to ServerStream
and ClientStream. This is due to the fact that there are different
semantics for SendMsg, and it's quite confusing to document one method
for two things. Furthermore, Stream is not actually used in any way
other than to be inherited by ClientStream and ServerStream.

Relevant issue: #2159
@brettbuddin

@cstockton Have you managed to open another issue for this newer API specifically? We here at @codeship are in a similar predicament. I'd be willing to help draft a proposal and work on an implementation for it.

@cstockton
Author

@brettbuddin I came to the conclusion I was failing to convey the difficulties of using this API and took no further action. I would be more than happy to give feedback on any API proposals, but don't have any desire to drive the effort. Thanks for reaching out to me; I hope you find a workaround.

@dfawley
Member

dfawley commented Nov 14, 2018

Even if you are notified as a message is written to the wire, as suggested in @MakMukhi's comment above, it is still not an indication that the server will ever receive it. It could be lost if a network error occurs after transmission. It could be lost if the server crashes. Or, it could sit in a buffer on the server, waiting to be Recv()d when stream cancellation happens. Cancellation is ~immediate, meaning it will abort the whole RPC ASAP and result in lost messages from the server's internal buffers.

The only way to ensure the server received your messages is to either build it into your protocol (via streaming ACK responses) or wait for successful RPC completion. This is a fundamental part of the way gRPC works, and all gRPC implementations have this behavior.
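A minimal sketch of the bookkeeping that the "streaming ACK responses" option implies on the client side. The `ackTracker` type and its methods are hypothetical and not part of gRPC. It assumes the application protocol attaches a sequence number to each message and that the server replies with cumulative acknowledgements. The tracker is not safe for concurrent use, so a real client would need a mutex if sends and receives run on different goroutines.

```go
package main

import "fmt"

// ackTracker is a hypothetical helper that remembers which sequence numbers
// were handed to Send and which of them the server has acknowledged.
type ackTracker struct {
	lastSent  uint64 // highest sequence number handed to Send
	lastAcked uint64 // every sequence number <= lastAcked is confirmed
}

// nextSend returns the sequence number to attach to the next message.
func (t *ackTracker) nextSend() uint64 {
	t.lastSent++
	return t.lastSent
}

// onAck records a cumulative acknowledgement received from the server.
// Stale acks and acks for numbers that were never sent are ignored.
func (t *ackTracker) onAck(seq uint64) {
	if seq > t.lastAcked && seq <= t.lastSent {
		t.lastAcked = seq
	}
}

// unconfirmed lists the messages that must be treated as possibly lost if
// the stream ends now.
func (t *ackTracker) unconfirmed() []uint64 {
	var out []uint64
	for seq := t.lastAcked + 1; seq <= t.lastSent; seq++ {
		out = append(out, seq)
	}
	return out
}

// demo sends five messages, receives an ack covering the first three, and
// returns what is still unconfirmed.
func demo() []uint64 {
	var t ackTracker
	for i := 0; i < 5; i++ {
		t.nextSend()
	}
	t.onAck(3)
	return t.unconfirmed()
}

func main() {
	fmt.Println(demo())
}
```

With this kind of tracking, a client that gives up early can report or retry exactly the unconfirmed messages, regardless of how much was buffered locally or in the network when the stream was canceled.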

If you want your stream to outlive another context, don't derive your stream's context from the other context. It is admittedly a bit unusual in Go for a context to outlive a function call, but cancellation propagation is a core gRPC feature, this was the most natural way to express it, and this mechanism works similarly in the other languages as well.

@cstockton
Author

@dfawley Developers are looking for the same reasonable guarantees they find in most any language or library they use, including the Go standard library: A call to Send*|Write* blocks until a corresponding system call has completed. Under these circumstances they may use their understanding of the source and destination systems network configuration details to create delivery assumptions to determine if further protocol support would be needed to meet their SLA.

For GRPC the guarantee is that your message will be placed into a buffer in memory. So to understand delivery guarantees one must dig through thousands of lines of code as I did, only to come to the conclusion under normal usage there are none. Since as you stated above the conditions which delivery can have reasonable guarantees require writing unusual Go code- that is you must be aware that messages may not be delivered to write such code. Though I disagree that you can even write software to work around this, since at some point you need to have a mechanism for cancellation. Even with a separate background context- how do you eventually give up and measure the progress you have made thus far, given you have no way to check how many of the existing buffered messages resulted in a system call?

To summarize, I disagree with the counter argument that delivery guarantees at the media or destination host layer are impossible without additional protocol support. While you are not wrong, I don't see how it's related to GRPC lacking the same reasonable guarantees (1 send -> 1 host system call) provided by most any other networking library developers have ever used. I also disagree with the performance benefits, given that I consider performance gains that sacrifice correctness or introduce bullet-item lists of nuance (i.e. grpc stream docs) to have no merit. Though stream send requests could carry synchronization back to the blocked callers once a corresponding system call has been made, with little effect on the throughput capability of concurrent senders, affecting only the synthetic benchmarks of single senders.

@dfawley
Member

dfawley commented Nov 15, 2018

A call to Send*|Write* blocks until a corresponding system call has completed.

The gRPC-Go streaming API is not intended to be conceptually at the same level as file descriptor operations. Applications that write to FDs need to know when syscalls are completed for various reasons. gRPC streams are a much higher level API. Applications that write to grpc streams have no reason to worry about syscalls; they should only be concerned with message delivery guarantees.

to understand delivery guarantees one must dig through thousands of lines of code

The comments on SendMsg explain this (possibly changed after this issue was filed):

    // SendMsg does not wait until the message is received by the server. An
    // untimely stream closure may result in lost messages. To ensure delivery,
    // users should ensure the RPC completed successfully using RecvMsg.

Even with a separate background context- how do you eventually give up and measure the progress you have made thus far, given you have no way to check how many of the existing buffered messages resulted in a system call?

You would need server-streaming ACKs if you are concerned with partial RPC progress (regardless of our API).

performance gains that sacrifice correctness

My point is that it doesn't make any difference to the application whether the syscall happens before or after SendMsg returns, in terms of knowing whether the server received -- or will ever receive -- the message. Given that, there is no reason to block. It does not impact correctness.

@lock lock bot locked as resolved and limited conversation to collaborators May 14, 2019