Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using asynchronous gRPC server and client on same context #81

Closed
otherjason opened this issue Aug 13, 2023 · 9 comments
Closed

Using asynchronous gRPC server and client on same context #81

otherjason opened this issue Aug 13, 2023 · 9 comments

Comments

@otherjason
Copy link

I am developing a gRPC service interface for the first time. I am reasonably familiar with asio, but definitely not expert. Getting a basic server framework going from the examples is straightforward. What I'm trying to do now is put together a simple testing framework that will allow me to write unit tests for my service. It would integrate most easily into my existing tests if I could make it single-process instead of requiring a server and client to be launched separately, so I was trying to write a simple test wrapper that would instantiate the server, then perform asynchronous client requests to test the server's behavior.

With that said, I've edited one of the example programs to try to demonstrate what I want to do:

#include "example/v1/example.grpc.pb.h"
#include "client_rpc.hpp"
#include "helper.hpp"
#include "server_shutdown_asio.hpp"

#include <agrpc/asio_grpc.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>

#include <iostream>
#include <thread>

namespace asio = boost::asio;

int main(int argc, const char** argv)
{
    const auto port = argc >= 2 ? argv[1] : "50051";
    const auto host = std::string("0.0.0.0:") + port;

    // start up Example server
    std::unique_ptr<grpc::Server> server;

    grpc::ServerBuilder builder;
    agrpc::GrpcContext grpc_context{builder.AddCompletionQueue()};
    builder.AddListeningPort(host, grpc::InsecureServerCredentials());
    example::v1::Example::AsyncService service;
    builder.RegisterService(&service);
    server = builder.BuildAndStart();
    abort_if_not(bool{server});

    // install a request handler for the bidirectional streaming endpoint
    agrpc::repeatedly_request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
        asio::bind_executor(grpc_context, 
        [&](::grpc::ServerContext &server, ::grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> &reader_writer) -> asio::awaitable<void>
        {
            std::cout << "server: started request\n";

            // set up an alarm that is used to pace our responses to the client
            agrpc::Alarm alarm(grpc_context);
            int64_t duration_msec = 1000;

            while (true)
            {
                using namespace asio::experimental::awaitable_operators;

                // wait for the first of two events to happen:
                // 
                // 1. we receive a new streaming message from the client
                // 2. the timer expires
                //
                // when the timer expires, we send a message to the client.
                std::cout << "server: reading/waiting\n";
                example::v1::Request cmd;
                auto result = co_await(agrpc::read(reader_writer, cmd) ||
                    alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));
                if (result.index() == 0)
                {
                    if (std::get<0>(result))
                    {
                        std::cout << "server: got streaming message\n";
                    }
                    else
                    {
                        std::cout << "server: read failed\n";
                        break;
                    }
                }
                else
                {
                    std::cout << "server: alarm expired\n";
                    example::v1::Response resp;
                    co_await agrpc::write(reader_writer, resp);
                }
            }

            co_await agrpc::finish(reader_writer, ::grpc::Status::OK);
        }
    ));
    
    // bring in the server shutdown functionality
    example::ServerShutdown server_shutdown{*server, grpc_context};

    // set up a client stub for this service
    example::v1::Example::Stub stub(::grpc::CreateChannel(host, ::grpc::InsecureChannelCredentials()));
    // spawn a coroutine that will talk to the bidirectional streaming endpoint
    asio::co_spawn(grpc_context,
        [&]() -> asio::awaitable<void>
        {
            // create an RPC to the BidirectionalStreaming interface
            using RPC = example::AwaitableClientRPC<&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming>;
            RPC rpc(grpc_context);

            // start the RPC
            std::cout << "client: starting RPC\n";
            if (!co_await rpc.start(stub)) co_return;

            // send the first streaming message to the server
            std::cout << "client: writing streaming message\n";
            example::v1::Request cmd;
            auto write_ok = co_await rpc.write(cmd);

            // now, just wait forever for streaming responses back from the server until a read fails
            bool read_ok = true;
            while (write_ok && read_ok)
            {
                std::cout << "client: waiting for streaming message\n";
                example::v1::Response resp;
                read_ok = co_await rpc.read(resp);
            }

            std::cout << "client: done\n";
        },
        asio::detached);

    // run the gRPC context thread
    grpc_context.run();

    std::cout << "Shutdown completed\n";
}

My intended behavior when running the program would be:

  • The server starts.
  • The client coroutine initiates a BidirectionalStreaming RPC.
  • The client coroutine sends a Request streaming message to the server.
  • The server receives the Request.
  • The server sends Response streaming messages periodically to the client (indefinitely in this simple example)
  • At some point later, I can send SIGINT to the process and the ServerShutdown will shut down the server, the next read/write on the RPC fail for the server/client, and the coroutines exit cleanly.

I'm not seeing what I expected, likely because I'm missing something fundamental about how this should work. When I run the above example program, I get:

[user@host:~/git/asio-grpc/build]$ example/asio-grpc-example-streaming-server
client: starting RPC
client: writing streaming message
client: waiting for streaming message
server: started request
server: reading/waiting
server: got streaming message
server: reading/waiting

And nothing else. Specifically, the coroutine handling the BidirectionalStreaming RPC on the server side seems to get stuck in the co_await that should complete when either a new message is received or the alarm expires. The alarm expiration doesn't ever seem to fire.

Furthermore, if I try to send SIGINT or SIGTERM to the process, it does not respond. If I take the co_spawn for the client coroutine out, then I can at least use the signals to shut down the server cleanly.

Should this type of strategy with with asio-grpc?

@Tradias
Copy link
Owner

Tradias commented Aug 14, 2023

Hi, thanks for the detailed issue description.

You can run client and server on the same GrpcContext like you are doing. In fact, most asio-grpc tests do the same.

Note that grpc::Server::Shutdown() is graceful, it won't cancel active requests. You will have to check for it yourself.

But the actual problem lies in this line:

                auto result = co_await(agrpc::read(reader_writer, cmd) ||
                    alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));

GRPC does not support cancellation of individual reads/writes and therefore neither does asio-grpc. I am currently working on a new API for clients and server (agrpc::ClientRPC and agrpc::ServerRPC) that could support the kind of "cancellation-safety" that you need for that line to work. I can imagine that each call to agrpc::read should either initiate a new read or wait for the previous one to complete. There are some difficulties here though: What if the request parameter refers to a different object between those calls and should this allow multiple outstanding waits? Additionally, such a functionality is unrelated to gRPC and I would wish to point users of asio-grpc at an asio-based library that provides such functionality instead of having to maintain it within asio-grpc.
With all that being said though, I think your issue is the third one that stumbled across this problem of cancellation and it would seem that it is best to add this functionality to ClientRPC and ServerRPC, probably configurable through a traits type.

In the meantime you can use this experimental and not-quite-bug-free alternative based on agrpc::CancelSafe/agrpc::BasicStream:

    // bring in the server shutdown functionality
    example::ServerShutdown server_shutdown{*server, grpc_context};

    // install a request handler for the bidirectional streaming endpoint
    agrpc::repeatedly_request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
        asio::bind_executor(grpc_context, 
        [&](::grpc::ServerContext &server, ::grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> &reader_writer) -> asio::awaitable<void>
        {
            std::cout << "server: started request\n";

            // set up an alarm that is used to pace our responses to the client
            agrpc::Alarm alarm(grpc_context);
            int64_t duration_msec = 1000;

            agrpc::GrpcStream stream{grpc_context};
            example::v1::Request cmd;
            stream.initiate(agrpc::read, reader_writer, cmd);
                
            while (true)
            {
                using namespace asio::experimental::awaitable_operators;

                // wait for the first of two events to happen:
                // 
                // 1. we receive a new streaming message from the client
                // 2. the timer expires
                //
                // when the timer expires, we send a message to the client.
                std::cout << "server: reading/waiting\n";
                auto result = co_await(stream.next() ||
                    alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));
                if (result.index() == 0)
                {
                    if (std::get<0>(result))
                    {
                        std::cout << "server: got streaming message\n";
                        stream.initiate(agrpc::read, reader_writer, cmd);
                    }
                    else
                    {
                        std::cout << "server: read failed\n";
                        break;
                    }
                }
                else if (server_shutdown.is_shutdown)
                {
                    break;
                }
                else
                {
                    std::cout << "server: alarm expired\n";
                    example::v1::Response resp;
                    if (!co_await agrpc::write(reader_writer, resp))
                    {
                        // client disconnected
                        co_return;
                    }
                }
            }

            co_await agrpc::finish(reader_writer, ::grpc::Status::OK);
        }
    ));

@otherjason
Copy link
Author

Thanks for the detailed answer! I will give that a try. A followup question: can you elaborate on how the approach is "not-quite-bug-free"? Just trying to determine whether there are any showstopping reasons why I wouldn't want to do this.

@Tradias
Copy link
Owner

Tradias commented Aug 14, 2023

I believe the main issue is that the completion of a read is not sticky within GrpcStream. If cancellation and successful completion of the read occur at the same time than cancellation takes precedence. The next call to stream.next() will then never complete because no read has been initiated (code transitioned into the alarm expiration branch and did not call stream.initiate()). But I think GrpcStream can be modified without breaking existing user and I will consider a hotfix release (2.6.1) if that helps your cause.

@otherjason
Copy link
Author

That makes sense. If you know a reasonable fix for this, then I would welcome it; I think the GrpcStream-based model as presented accomplishes what I am looking for.

@Tradias
Copy link
Owner

Tradias commented Aug 21, 2023

I re-added a previously flaky example as a unit test but I couldn't find any issues with it. Maybe it is just something specifically wrong with that example and not with GrpcStream/CancelSafe.

One thing that is missing from my code snippet above is the call to cleanup:

co_await stream.cleanup();
co_await agrpc::finish(reader_writer, ::grpc::Status::OK);

which either waits for a previously initiated read to complete and completes immediately if none is pending. This is helpful because you must await all initiated reads before GrpcStream is being destructed.

@otherjason
Copy link
Author

otherjason commented Aug 22, 2023

Thanks for the info. I have been working on what the process looks like for shutting down the server when a bidirectional streaming connection is open. I extended the example above to implement the shutdown logic using a saf::shared_future. So my server loop now awaits three things: a read on the GrpcStream, the Alarm to indicate that it should send an update to the client, and the shutdown future becoming ready. This works nicely with ASIO's || operator, and my code can act accordingly to whichever of the three triggered the coroutine to resume.

I tried adding the call to stream.cleanup() to my example and now my test hangs on the co_await stream.cleanup(); After thinking about it, this makes sense. The sequence of events is:

  • Client initiates the RPC
  • Server begins the wait loop above, periodically sending streaming messages. The key, though, is that there is always a GrpcStream read in flight on the server side.
  • At some point, I tell the server to shut down. This fires the shutdown shared_future, causing the server to break out of its loop.
  • It then calls stream.cleanup(). But, if I'm understanding your description above properly, this will cause the server to wait indefinitely for a new message from the client, thus thwarting my attempt to shut the server down.

I think this is what you were getting at when you said before that the read cannot be cancelled? Is there a clean way around this, so that the server can shut down without potentially being held hostage by clients that aren't aware of the shutdown? I would be fine with the client seeing an error on the RPC in this case, but I'm not sure what the best practice is for this sort of thing.

Alternatively, is it permissible to move the stream.cleanup() after the agrpc::finish()? I suppose that I could finish the RPC on the server side so the client will know that there are no further streaming messages coming (basically the server-side analog to writes_done). I assume that would cause the read on the client side to complete with an error, which would allow the client to close its end of the connection, which would then cause the stream.cleanup() on the server side to complete and thus finish the RPC. This all seems doable, as long as it would be OK to finish the RPC from the server while there is still a pending read on its side of the stream.

@Tradias
Copy link
Owner

Tradias commented Aug 23, 2023

I have tested it and on Windows it seems to be fine to have an uncompleted read while calling finish. Even though the official documentations might indicate otherwise:

Finish:

Request notification for when the server has sent the appropriate signals to the client to end the call. Should not be used concurrently with other operations.

https://grpc.github.io/grpc/cpp/classgrpc_1_1_server_async_reader_writer_interface.html#ae63d347d413c401b1e976f165efde50b

Read:

It should not be called concurrently with other streaming APIs on the same stream.

https://grpc.github.io/grpc/cpp/classgrpc_1_1internal_1_1_async_reader_interface.html#adf87c602036d69158950b7299d0aae70

It is true that the client-side read will complete with false when the server has called finished, like you said, the equivalent of writes_done.

Alternatively you can also call ServerContext::TryCancel before cleanup, this will cause read to complete immediately with false. The client will see StatusCode::CANCELLED even if you call finish (although this technically depends on what happens first).

I have created a complete example of the code here: https://github.com/Tradias/example-vcpkg-grpc/tree/asio-grpc-81. Might make it easier to refer to.

Did you know that asio also has a promise implementation https://www.boost.org/doc/libs/1_83_0/doc/html/boost_asio/reference/experimental__promise.html ? I don't think it can be shared however.

@otherjason
Copy link
Author

Using ServerContext::TryCancel() seems to be the way to go for me. In the event that the server shuts down, having the client see StatusCode::CANCELLED makes semantic sense to me. I tested that change to my application and everything behaves as I would expect. I think this is a good solution to what I needed!

@Tradias
Copy link
Owner

Tradias commented Sep 3, 2023

Great, don't hesitate to open another issue if you have more questions!

@Tradias Tradias closed this as completed Sep 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants