Multi ClientAsyncWriter::Write() calls will crash. #7659

Closed
jinq0123 opened this issue Aug 7, 2016 · 17 comments

@jinq0123

commented Aug 7, 2016

I changed TEST_P(AsyncEnd2endTest, SimpleClientStreaming) in async_end2end_test.cc to test client async writing, and found that issuing 7 consecutive ClientAsyncWriter::Write() calls crashes.

TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
  ...
  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
      stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
  ...
  cli_stream->Write(send_request, tag(31));
  cli_stream->Write(send_request, tag(32));
  cli_stream->Write(send_request, tag(33));
  cli_stream->Write(send_request, tag(34));
  cli_stream->Write(send_request, tag(35));
  cli_stream->Write(send_request, tag(36));
  cli_stream->Write(send_request, tag(37));
  ...
}

Call stack:

    async_end2end_test.exe!memset(unsigned char * dst, unsigned char value, unsigned long count) line 137    Unknown
    async_end2end_test.exe!call_start_batch(grpc_exec_ctx * exec_ctx, grpc_call * call, const grpc_op * ops, unsigned int nops, void * notify_tag, int is_notify_tag_closure) line 1392  C
    async_end2end_test.exe!grpc_call_start_batch(grpc_call * call, const grpc_op * ops, unsigned int nops, void * tag, void * reserved) line 1715    C
    async_end2end_test.exe!grpc::Channel::PerformOpsOnCall(grpc::CallOpSetInterface * ops, grpc::Call * call) line 95    C++
    async_end2end_test.exe!grpc::Call::PerformOps(grpc::CallOpSetInterface * ops) line 662   C++
    async_end2end_test.exe!grpc::ClientAsyncWriter::Write(const grpc::testing::EchoRequest & msg, void * tag) line 206   C++
>   async_end2end_test.exe!grpc::testing::`anonymous namespace'::AsyncEnd2endTest_SimpleClientStreaming_Test::TestBody() line 453    C++
    async_end2end_test.exe!testing::internal::HandleSehExceptionsInMethodIfSupported(testing::Test * object, void(testing::Test::*)() method, const char * location) line 2063   C++
    async_end2end_test.exe!testing::internal::HandleExceptionsInMethodIfSupported(testing::Test * object, void(testing::Test::*)() method, const char * location) line 2114  C++
    async_end2end_test.exe!testing::Test::Run() line 2150    C++
    async_end2end_test.exe!testing::TestInfo::Run() line 2330    C++
    async_end2end_test.exe!testing::TestCase::Run() line 2445    C++
    async_end2end_test.exe!testing::internal::UnitTestImpl::RunAllTests() line 4316  C++
    async_end2end_test.exe!testing::internal::HandleSehExceptionsInMethodIfSupported(testing::internal::UnitTestImpl * object, bool(testing::internal::UnitTestImpl::*)() method, const char * location) line 2063 C++
    async_end2end_test.exe!testing::internal::HandleExceptionsInMethodIfSupported(testing::internal::UnitTestImpl * object, bool(testing::internal::UnitTestImpl::*)() method, const char * location) line 2114    C++
    async_end2end_test.exe!testing::UnitTest::Run() line 3926    C++
    async_end2end_test.exe!RUN_ALL_TESTS() line 2289 C++
    async_end2end_test.exe!main(int argc, char * * argv) line 1477   C++
@vjpai

Member

commented Aug 7, 2016

This is expected behavior. You can only have one outstanding asynchronous write on the same side of the same stream; you must wait for the completion-queue notification indicating that it is OK to issue another write to that side of the stream. This is a documented part of the async API.
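To illustrate, here is a minimal sketch of what the test above would have to do under this contract (reusing its cq_, cli_stream, send_request, and tag() helpers; the real test uses a Verifier helper instead):

for (int i = 31; i <= 37; ++i) {
  cli_stream->Write(send_request, tag(i));
  void* got_tag;
  bool ok = false;
  // Block until the previous Write's tag comes back; only then is it
  // legal to start another Write on this stream.
  GPR_ASSERT(cq_->Next(&got_tag, &ok));
  GPR_ASSERT(ok && got_tag == tag(i));
}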

@jinq0123

Author

commented Aug 8, 2016

We need a REAL async API.

@vjpai

Member

commented Aug 8, 2016

The justification for allowing only one write before a completion notification is straightforward: async operations are intended to be concurrent, but concurrent operations on the same side of a stream are not allowed, since each side of the stream has operations that must appear to be in order. (Note that I've made a corresponding edit to my previous response.) This is still concurrent with regard to the receiver side of the stream, in the sense that multiple messages can be outstanding before the receiver operates on them, and in the sense that multiple streams (RPCs) can go over the same channel.

That said, we always welcome API feedback. I'd be glad to hear your detailed critique of the C++ async API as well as to understand what you would expect in a "REAL" (sic) API. Note that the C++ async API is already used by several projects, including TensorFlow.

@jinq0123

Author

commented Aug 9, 2016

I will try to change grpc_call::active_batches from an array to a list, to allow multiple async Write() calls without blocking to wait for the completion notification. That would be easier to use.

struct grpc_call {
  batch_control active_batches[MAX_CONCURRENT_BATCHES];
  ...
};
@vjpai

Member

commented Aug 9, 2016

For starters, let me ask you not to start coding this up as a pull request without a strong justification. I will not accept such a change, nor will any other team member. Changing grpc_call's batch storage from an array to a list would add needless allocations (and thus locking) on an extremely critical path, would hurt performance, and would make the whole system prone to memory leaks.

Before you start coding this, please just answer my question: what would you expect to see in a "REAL" API? What do you expect to accomplish by allowing multiple writes on the same side of a stream without a notification in between? How do you intend to redefine the concurrency contract to handle such operations? Let me again emphasize that our reason for this restriction is to make sure that we can provide sane stream semantics in an asynchronous system that is typically used with high concurrency.

We're an open-source project, so you can fork your own version and make whatever changes you want to your version, but I can't envision a situation where such changes would be accepted upstream unless there is a real justification.

@jinq0123

Author

commented Aug 11, 2016

what would you expect to see in a "REAL" API?

No crash in Write(). No wait before the next Write().

What do you expect to accomplish by allowing multiple writes on the same side of a stream without a notification in between?

Why is a notification needed at all? Users want to write messages without waiting for one.

How do you intend to redefine the concurrency contract to handle such operations?

Keep it the old way, but with no crash and no wait.

@nicolasnoble

Contributor

commented Aug 11, 2016

When we designed the C and C++ APIs, we chose to provide something that would allow the highest performance possible. This is that model, with these contracts. That said, you may have a misunderstanding of the notification mechanism: it doesn't mean the call has been fully processed, received, and responded to - otherwise, it wouldn't be a true async API. The notification means that the API is now ready to accept another write from your software. There is no "wait" time in the sense that the API will not block between the Write and the notification; this is merely the API ingesting your call and preparing itself to send it. All outstanding calls will still happen asynchronously and concurrently.

I'll say it again: the API will not block in any way between the Write and the notification. The core is merely trying to process your call in the fastest possible way, and it signals you that it's ready for another Write immediately after it's done evaluating how to do so.

@jinq0123

Author

commented Aug 12, 2016

Can I have a ClientAsyncWriter::WriteAndAsyncNext() that doesn't block?

In async_end2end_test.cc, there is a blocking wait for the completion notification:

        for (;;) {
          auto r = cq->AsyncNext(...);
          ...
        }

Is there an async way to do it? How?

@nicolasnoble

Contributor

commented Aug 12, 2016

There are several AsyncNext calls in that file; some are blocking, some aren't. The only reason any of these calls would block is if a non-zero deadline is set and gRPC is waiting for work to arrive from the network or from other threads.

For example, this call, which has its deadline set to 0, won't block:

      for (;;) {
        auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
        if (r == CompletionQueue::TIMEOUT) continue;
        if (r == CompletionQueue::GOT_EVENT) break;
        // r == CompletionQueue::SHUTDOWN: the queue is draining; stop polling.
      }
In that case, AsyncNext will either have done nothing, because there was no work to do, and return CompletionQueue::TIMEOUT, or it will have processed some work and return CompletionQueue::GOT_EVENT, with tag and ok set for the event that happened.

@jinq0123

Author

commented Aug 12, 2016

AsyncNext(..., gpr_time_0(...)) is non-blocking, but the for loop is busy-waiting. It will spin until some other thread sets the completion event.

@vjpai

Member

commented Aug 12, 2016

I'm going to back up a level and say that we won't add a ClientAsyncWriter::WriteAndAsyncNext() method. Our approach to the API is conservative: we won't add an API that combines two things that can already be done separately (a separate call to Write and Next or AsyncNext, in this case). If someone wants that, they can easily write those two lines of code in their own function. We are willing to extend the API for features that can't be expressed with the existing API, but not otherwise.

A single WriteAndAsyncNext would also limit options, such as issuing Writes on other streams before checking any given one. In practice, code responds to Next or AsyncNext by using the tag to reference some structure that stores the state associated with that stream, which is then allowed to continue to the next stage of its processing (such as doing another Write).
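A minimal sketch of that dispatch pattern (StreamState and its Proceed method are hypothetical application types, not part of the gRPC API):

void* got_tag;
bool ok;
while (cq->Next(&got_tag, &ok)) {
  // The tag is whatever pointer was passed to Write()/Read()/Finish();
  // here we assume it points at per-stream application state.
  auto* state = static_cast<StreamState*>(got_tag);
  state->Proceed(ok);  // e.g. issue the next Write for that stream
}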

@nicolasnoble

Contributor

commented Aug 12, 2016

I'll also add that an API that does exactly what you want would actually do the following under the hood:

  • Create a worker thread.
  • Accumulate all the Writes to do in a worker queue (which could lead to out of memory problems, but that's another topic).
  • Process the worker queue in the worker thread.

The bad part of that model is that it does extra work that some users may not want (a thread-safe list and a worker thread).

The good part is that it's very easy to build on top of the current API today; see the sketch below.
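For instance, a hypothetical single-threaded variant of such a layer, driven from the completion-queue loop instead of a worker thread (QueuedWriter is an invented name; EchoRequest, from the test's proto, stands in for any message type):

#include <deque>
#include <grpc++/grpc++.h>

// Hypothetical buffering layer: keeps exactly one Write outstanding and
// queues the rest. OnWriteDone() must be called when this object's tag
// comes back from the completion queue. Note the queue is unbounded,
// which is exactly the out-of-memory hazard mentioned above.
class QueuedWriter {
 public:
  explicit QueuedWriter(grpc::ClientAsyncWriter<EchoRequest>* w)
      : writer_(w) {}

  void Send(const EchoRequest& msg) {
    queue_.push_back(msg);
    MaybeWrite();
  }

  void OnWriteDone(bool ok) {
    in_flight_ = false;
    if (ok) MaybeWrite();
  }

 private:
  void MaybeWrite() {
    if (in_flight_ || queue_.empty()) return;
    in_flight_ = true;
    writer_->Write(queue_.front(), this);  // this object is the tag
    queue_.pop_front();  // Write() serializes the message before returning
  }

  grpc::ClientAsyncWriter<EchoRequest>* writer_;
  std::deque<EchoRequest> queue_;
  bool in_flight_ = false;
};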

Imagine a networked video game, for instance, running on a low-end device. It may be resource-constrained and unable to afford threads. Its main loop would definitely be a busy loop anyway, and would look something like this:

AbsoluteTime now = 0;
TimeDifference frameDelta = 0;
while (!UserWantsToQuit()) {
  AbsoluteTime currentTime = GetCurrentTime();
  frameDelta = currentTime - now;
  now = currentTime;

  // Prepare our next frame data
  ProcessAI(now, frameDelta);
  ProcessNetwork(now, frameDelta); // calls cq->AsyncNext(... gpr_time_0(GPR_CLOCK_REALTIME));
  ProcessOtherPlayers(now, frameDelta);
  ProcessAnimations(now, frameDelta);
  ProcessUI(now, frameDelta);

  // Draw and play our next frame
  DrawFrame(now, frameDelta);
  OutputSound(now, frameDelta);
}
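A hypothetical ProcessNetwork() for that loop would simply drain whatever events are ready, never blocking the frame (cq and HandleEvent are assumed names, not part of the gRPC API):

void ProcessNetwork(AbsoluteTime now, TimeDifference frameDelta) {
  void* got_tag;
  bool ok;
  // Zero deadline: AsyncNext returns immediately when nothing is ready.
  while (cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)) ==
         CompletionQueue::GOT_EVENT) {
    HandleEvent(got_tag, ok);  // hypothetical per-tag dispatch
  }
}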

We preferred to leave the choice of the exact model to the developer; that was our design decision.

@vjpai

Member

commented Aug 12, 2016

I propose closing this issue as it seems to have reached a meaningful conclusion.

@vjpai vjpai closed this Aug 12, 2016

@pgrosu


commented Aug 12, 2016

Hi Jin (@jinq0123),

I share your passion for self-optimizing all-in-one functions, but one thing to keep in mind is that gRPC can sometimes be just the initialization part of data transmission in the microservices space - still a critical part, and it can also be a large part, depending on the implementation requirements. Sometimes it just initiates the connections and then hands off to another protocol for reading and writing data, such as RDMA over Converged Ethernet, to minimize overhead and CPU usage while maximizing data throughput. On top of that, there are distributed data structures and algorithms one can use to optimize against these latency numbers - from Jeff Dean and Peter Norvig - which I'm sure you are already aware of, and which still apply today :)

Latency Comparison Numbers
--------------------------
L1 cache reference                           0.5 ns
Branch mispredict                            5   ns
L2 cache reference                           7   ns                      14x L1 cache
Mutex lock/unlock                           25   ns
Main memory reference                      100   ns                      20x L2 cache, 200x L1 cache
Compress 1K bytes with Zippy             3,000   ns        3 us
Send 1K bytes over 1 Gbps network       10,000   ns       10 us
Read 4K randomly from SSD*             150,000   ns      150 us          ~1GB/sec SSD
Read 1 MB sequentially from memory     250,000   ns      250 us
Round trip within same datacenter      500,000   ns      500 us
Read 1 MB sequentially from SSD*     1,000,000   ns    1,000 us    1 ms  ~1GB/sec SSD, 4X memory
Disk seek                           10,000,000   ns   10,000 us   10 ms  20x datacenter roundtrip
Read 1 MB sequentially from disk    20,000,000   ns   20,000 us   20 ms  80x memory, 20X SSD
Send packet CA->Netherlands->CA    150,000,000   ns  150,000 us  150 ms

So there are many more protocols one can take advantage of, and there is a reason Google can drive 1.3 Petabits/sec of throughput with their custom-written control stack - if you're interested, here is a link to their nice paper:

Jupiter Rising: A Decade of Clos Topologies and Centralized Control in Google’s Datacenter Network

The Google gRPC Team is doing the community a fantastic service by open-sourcing this, but again, gRPC can sometimes be only the initialization of the data communication in the microservices space, or most of it, depending on the implementation requirements.

Hope it helps,
Paul

@ctiller

Member

commented Aug 15, 2016

Necro, but I've been out for a while.

The other big reason for the 'one outstanding' rule is to allow pushback from the network to the application. Without it, a slow consumer of writes would cause unlimited buffering to build up somewhere in the gRPC stack, and we have strong evidence (in the form of scars from production outages) that this leads to OOM vectors in real applications.

We chose one outstanding message because any larger finite value suffers from the same bugs, only more rarely (which means they're less likely to be caught, and more likely to bite our users in actual production scenarios).


@jinq0123

Author

commented Aug 16, 2016

ZeroMQ has a ZMQ_HWM option.

From: http://api.zeromq.org/2-1:zmq-setsockopt

ZMQ_HWM: Set high water mark

The ZMQ_HWM option shall set the high water mark for the specified socket. The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue in memory for any single peer that the specified socket is communicating with.

The default ZMQ_HWM value of zero means "no limit".
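Such a limit could even be layered onto the hypothetical QueuedWriter sketched earlier in this thread - a bounded variant in the spirit of ZMQ_HWM (TrySend is an invented addition to that sketch):

// Hypothetical bounded send: refuses new messages once the queue reaches
// the high-water mark, surfacing backpressure to the caller instead of
// buffering without limit.
bool QueuedWriter::TrySend(const EchoRequest& msg, size_t high_water_mark) {
  if (queue_.size() >= high_water_mark) return false;  // caller may retry or drop
  queue_.push_back(msg);
  MaybeWrite();
  return true;
}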

@vjpai

Member

commented Aug 16, 2016

That's a completely separate issue. As @nicolasnoble and I both mentioned in our responses, there may very well be more messages queued on a socket to the peer in the gRPC async API. In fact, there actually is no limit from gRPC's side, though the socket parameters of the underlying network would place a limit on it.

jinq0123 added a commit to jinq0123/grpc that referenced this issue Aug 20, 2016

Fix crash in ClientWriter::Write().
See: Multi ClientAsyncWriter::Write() calls will crash. grpc#7659
Allow unlimited batch controls.

@lock lock bot locked as resolved and limited conversation to collaborators Oct 3, 2018
