Skip to content

so5extra 1.6 Synchronous Interaction

Yauheni Akhotnikau edited this page Apr 21, 2023 · 1 revision

The Problem

Versions 5.3-5.5 of SObjectizer supported synchronous interaction between agents. This stuff has been removed from SObjectizer-5.6. Now SObjectizer Core doesn't support synchronous interaction at all.

The Solution

The so_5::extra::sync submodule provides support for synchronous interaction between agents or worker threads (if only mchains are used) in the form of request_reply_t message type and request_reply, request_opt_reply free functions.

Header File

All stuff related to synchronous interaction are defined in so_5_extra/sync/pub.hpp header file. So to use this functionality it is necessary to include that file and so_5/all.hpp file:

#include <so_5_extra/sync/pub.hpp>

#include <so_5/all.hpp>

How To Issue And Handle A Synchronous Request?

The recommended way of issuing a synchronous request is the following:

// Define a short name for request-reply pair.
using my_request_reply = so_5::extra::sync::request_reply_t<my_request, my_reply>;
...
// Define a subscription for synchronous request in some agent.
class request_handler final : public so_5::agent_t {
   ...
   void on_request(typename my_request_reply::request_mhood_t cmd) {
      ... // Handling of the request.
          // The content of the request is available via cmd->request() method.
      // Sending the reply for the request.
      cmd->make_reply(...); // All arguments are going to my_reply's constructor.
   }
   ...
   void so_define_agent() override {
      so_subscribe_self().event(&request_handler::on_request);
   }
};
...
// Issuing a request to the direct mbox of some request_handling.
so_5::mbox_t handler_mbox = ...;
// Sending a request and waiting the reply for no more than 15s.
// An exception will be sent if the reply is not received.
my_reply reply = my_request_reply::ask_value(handler_mbox, 15s,
   ...); // All other arguments are goint to my_request's constructor.

There is also ask_opt_value method that doesn't throw but returns an std::optional<Reply> instance:

// Sending a request and waiting the reply for no more than 15s.
// An empty optional will be returned if the reply is not received.
std::optional<my_reply> reply = my_request_reply::ask_opt_value(handler_mbox, 15s,
   ...); // All other arguments are goint to my_request's constructor.
if(reply) {
   ... // Usage of *reply value.
}

Note. Method make_reply can be called only once. An attempt to call make_reply several times will lead to an exception.

There is no need to use an alias like my_requet_reply as shown above. A request can be issued by request_reply() or request_opt_reply() free functions:

// Sending a request and waiting the reply for no more than 15s.
// An exception will be sent if the reply is not received.
my_reply reply = so_5::extra::sync::request_value<my_request, my_reply>(handler_mbox, 15s,
   ...); // All other arguments are goint to my_request's constructor.

// Sending a request and waiting the reply for no more than 15s.
// An empty optional will be returned if the reply is not received.
std::optional<my_reply> opt_reply =
   so_5::extra::sync::request_opt_value<my_request, my_reply>(handler_mbox, 15s,
      ...); // All other arguments are goint to my_request's constructor.
if(opt_reply) {
   ... // Usage of *opt_reply value.
}

An event handler for a request can be defined that way:

void request_handler::on_request(
   mutable_mhood_t<so_5::extra::sync::request_reply_t<my_request, my_reply>> cmd) {
   ...
}

Or, in more simple form:

void request_handler::on_request(
   typename so_5::extra::sync::request_reply_t<my_request, my_reply>::request_mhood_t cmd) {
   ...
}

Or, in yet more simple form:

void request_handler::on_request(
   so_5::extra::sync::request_mhood_t<my_request, my_reply> cmd) {
   ...
}

Synchronous interaction can also be used with mchains:

// Define a short name for request-reply pair.
using my_request_reply = so_5::extra::sync::request_reply_t<my_request, my_reply>;
...
// A worker thread that will handle incoming requests.
void request_handler(so_5::mchain_t in_chain) {
   so_5::receive(from(in_chain).handle_all(),
      [](typename my_request_reply::request_mhood_t cmd) {
         ... // Handling of the request.
             // cmd->request() returns a reference to my_request object.
         // Make the reply.
         cmd->make_reply(...); // All arguments are going to my_reply's constructor.
      });
}

// A worker thread that will issue requests.
void request_producer(so_5::mchain_t out_chain) {
   // Issue a request and wait no more than 15s for the reply.
   auto reply = my_request_reply::ask_value(out_chain, 15s,
      ...); // All other arguments are going to my_request's constructor.
}

Some Technical Details

When ask_value() or ask_op_value() method of request_reply_t<Q,A> is called then:

  • a new mchain is created. This mchain will be used for sending/receiving the reply;
  • a new instance of request_reply_t<Q,A> is created. That instance holds the reply mchain and an instance of Q type. Note that additional arguments passed to ask_value() or ask_opt_value() are forwared to the constructor or that Q instance;
  • this new message is sent as a mutable message to the specified destination;
  • ask_value()/ask_opt_value() sleeps on the reply mchain waiting for the reply. If reply is received it will be returned. If not then ask_value() throws an exception and ask_opt_value() returns an empty std::optional;
  • when make_reply() is called on request_reply_t<Q,A> object then a new instance of A type is created. Then this instance is sent to the reply mchain.

Note that the destructor of request_reply_t<Q,A> closes the reply mchain. It means that if the request_reply_t message won't be handled the reply mchain will be automatically closed and this will awaken ask_value()/ask_opt_value().

Note that request_reply() and request_opt_value() are just shotcuts for ask_value() and ask_opt_value() methods. Because of that request_reply() and request_opt_value() work as described above.

Advanced Topics

Forwarding Or Storing request_reply_t Message

Because request_reply_t<Q,A> instance are sent as a usual message then it can be easily forwarded to another destination:

class load_balancer final : public so_5::agent_t {
   void on_request(typename some_request::request_mhood_t cmd) {
      const auto new_dest = select_appropriate_destination();
      // Forward the request to a new destination.
      so_5::send(new_dest, std::move(cmd));
   }
   ...
};

It's also possible to store and process request_reply_t<Q, A> instance later:

class bunch_processor final : public so_5::agent_t {
   // Container for holding pending requests.
   std::vector<typename some_request::holder_t> pending_requests_;

   // Reaction to the new incoming request.
   void on_request(typename some_request::request_mhood_t cmd) {
      // Just store request to process it later.
      pending_requests_.push_back(cmd.make_holder());
   }

   // Time to process all pending requests.
   void on_processing_time(mhood_t<some_timer_msg>) {
      for(auto & r : pending_requests_)
         r->make_reply(...);
   }
};

Replying To A Custom Destination

Methods ask_value() and ask_opt_value() (as well as free functions request_reply()/request_opt_value()) create a new mchain and place this mchain into request_reply_t to be used in make_reply() method. Sometimes it could be inappropriate: you may want to use an already existing destination for request's reply.

There are initiate_with_custom_reply_to methods in request_reply_t which allows to issue a request with a specific destination for the reply.

For example, let's see how to issue a request with replying to the direct mbox of agent-issuer:

class request_issuer final : public so_5::agent_t {
   // The mbox of request handler.
   const so_5::mbox_t service_;
   ...
   void on_some_event(mhood_t<some_event> cmd) {
      ...
      // We want to issue a request here.
      // Create a subscription of the reply.
      so_subscribe_self().event(&request_issuer::on_reply);
      // Issuing a new request.
      some_request::initiate_with_custom_reply_to(
         // The destination for the request.
         service_,
         // The destination for the reply.
         *this,
         ...); 
   }

   void on_reply(typename some_request::reply_mhood_t cmd) {
      ... // Handling of the request.
   }
};

There is another example: redirection of replies from two separate requests to one mchain and handling of them in one receive call:

using first_dialog = so_5::extra::sync::request_reply_t<first_request, first_reply>;
using second_dialog = so_5::extra::sync::request_reply_t<second_request, second_reply>;

// The single mchain for both replies.
auto reply_ch = create_mchain(env);

// Issuing requests. Please note the usage of do_not_close_reply_chain.
first_dialog::initiate_with_custom_reply_to(
   service, reply_ch, so_5::extra::sync::do_not_close_reply_chain,
   ...);
second_dialog::initiate_with_custom_reply_to(
   service, reply_ch, so_5::extra::sync::do_not_close_reply_chain,
   ...);

// Waiting and handling of replies.
receive(from(reply_ch).handle_n(2).empty_timeout(15s),
   [](typename first_dialog::reply_mhood_t cmd) {...},
   [](typename second_dialog::reply_mhood_t cmd) {...});

It is important to use do_not_close_reply_chain here becase the reply chain will be closed after processing of the first request.

Clone this wiki locally