
SO 5.7 InDepth Custom Worker Threads

Yauheni Akhotnikau edited this page Dec 28, 2021 · 18 revisions

What Is It?

Until v.5.7.3 there was no way to specify a custom thread to be used by SObjectizer dispatchers. But sometimes such a feature is useful. For example, a specific thread stack size has to be set via pthread_attr_setstacksize in POSIX Threads, or some signal handlers have to be set for a new worker thread on Unix. But SObjectizer created worker threads by itself using std::thread and didn't provide a way to tune new worker threads.

Since v.5.7.3 it is possible to instruct SObjectizer to use custom worker threads. This feature is based on two interfaces added in v.5.7.3: so_5::disp::abstract_work_thread_t and so_5::disp::abstract_work_thread_factory_t.

Since v.5.7.3 a user can define his/her own worker thread type by implementing the so_5::disp::abstract_work_thread_t interface. The user also has to define a thread factory by implementing the so_5::disp::abstract_work_thread_factory_t interface. An instance of such a factory has to be created and specified in the params of a SObjectizer dispatcher or of the whole SObjectizer Environment.

abstract_work_thread_t Interface

A user has to define his/her own class that inherits so_5::disp::abstract_work_thread_t type and implements its pure virtual methods.

At the moment so_5::disp::abstract_work_thread_t defines just two virtual methods that have to be implemented in a derived class:

virtual void start( body_func_t thread_body ) = 0;
virtual void join() = 0;

where body_func_t is defined as follows:

using body_func_t = std::function< void() >;

The start method has to start the execution of thread_body in the context of a separate thread. There are no limitations: it can be a newly created thread or a preallocated one. There is no need to guarantee that thread_body has already started its execution before the return from start, but it is required to guarantee that all necessary resources are allocated and that thread_body has been passed to a separate execution context.

It should also be guaranteed that join can safely be called just after the return from start.

The join method is an analog of std::thread::join: once called it should return only when the separate execution context finished the execution of thread_body passed to the previous call to start.

SObjectizer guarantees that join is called for a thread only once after the successful call to start. If start throws then join isn't called.

An Implementation Of abstract_work_thread_t Has To Be Thread Safe

Methods of abstract_work_thread_t can be called from different threads. For example, the start method can be called from one thread, but join will be called from another thread.

SObjectizer guarantees that there won't be parallel calls to start and/or join from different threads. There will be a single call to start from some thread and, if that call succeeds, there will be just a single call to the join method from some (probably another) thread.

Implementations Of abstract_work_thread_t And Exceptions

At the moment SObjectizer can't guarantee that thread_body passed to start won't throw.

There is no requirement that a custom thread should intercept such an exception. If an exception from thread_body is intercepted then the call to join should be completed successfully.

Method start can throw if it can't start execution of thread_body in a separate worker context. That exception has to be inherited from std::exception. That exception will be handled by SObjectizer. The join method won't be called if start throws.

Please note that join isn't marked as noexcept. It's because std::thread::join is also not noexcept. Thus, join can throw. But the current version of SObjectizer can't cope with such an exception in most cases. It means that if join throws then the application will probably be terminated because of an exception from a noexcept method (like destructors of SObjectizer's dispatchers and so on).
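
The exception rules above can be sketched as a thread that intercepts whatever thread_body throws, so that join still completes normally. This is only an illustration under stated assumptions: the `sketch` namespace, the stand-in interface, `guarded_thread`, `body_failed` and the demo function are hypothetical names introduced here so the code compiles without SObjectizer. In real code the class would inherit from so_5::disp::abstract_work_thread_t.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <thread>
#include <utility>

namespace sketch {

// Stand-in for so_5::disp::abstract_work_thread_t (an assumption made to
// keep the sketch self-contained). It mirrors only the two methods above.
class abstract_work_thread_t {
public:
  using body_func_t = std::function<void()>;
  virtual ~abstract_work_thread_t() = default;
  virtual void start(body_func_t thread_body) = 0;
  virtual void join() = 0;
};

// Hypothetical custom thread that intercepts exceptions from thread_body.
class guarded_thread final : public abstract_work_thread_t {
  std::thread thread_;
  // Written by the worker thread, read only after join() has returned.
  std::exception_ptr failure_;
public:
  void start(body_func_t thread_body) override {
    // If the thread can't be created, std::thread throws std::system_error,
    // which is derived from std::exception as required for start.
    thread_ = std::thread{[this, tb = std::move(thread_body)] {
      try {
        tb();
      } catch (...) {
        // Remember the failure instead of letting it terminate the process.
        failure_ = std::current_exception();
      }
    }};
  }

  void join() override {
    // Completes successfully even if thread_body has thrown.
    thread_.join();
  }

  bool body_failed() const noexcept { return static_cast<bool>(failure_); }
};

// Demo: the body throws, yet join() returns normally.
inline bool demo_body_failure_is_contained() {
  guarded_thread t;
  t.start([] { throw std::runtime_error{"boom"}; });
  t.join();
  return t.body_failed();
}

} // namespace sketch
```

Whether swallowing an exception is appropriate depends on the application. The sketch only shows that an intercepted exception doesn't prevent join from completing.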

A Very Simple Example Of Custom Thread

A very simple implementation of a custom thread can look like this:

class simple_custom_thread final : public so_5::disp::abstract_work_thread_t
{
  // Actual thread.
  std::thread thread_;
public:
  simple_custom_thread() = default;

  void start(body_func_t thread_body) override
  {
    // Start a new thread and run thread_body on it.
    thread_ = std::thread{ [tb = std::move(thread_body)] () {
        ... // Do some tuning of the new thread.
        // Run thread_body without intercepting exceptions.
        // It means a crash of the whole application if thread_body throws,
        // but we don't care in such a simple implementation.
        tb();
      }
    };
  }

  void join() override
  {
    thread_.join();
  }
};
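
As a variation on the motivating case of a specific stack size, here is a minimal POSIX-only sketch that creates the worker via pthread_create with pthread_attr_setstacksize. The `sketch` namespace, the stand-in interface, `pthread_work_thread`, `sketch_thread_entry` and the 1 MiB value are assumptions made for illustration. In real code the class would inherit from so_5::disp::abstract_work_thread_t.

```cpp
#include <pthread.h>

#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <utility>

// Entry point with C linkage for pthread_create (hypothetical name).
// It takes ownership of the heap-allocated body and runs it.
// Exceptions from the body are not intercepted here.
extern "C" void* sketch_thread_entry(void* raw) {
  std::unique_ptr<std::function<void()>> body{
      static_cast<std::function<void()>*>(raw)};
  (*body)();
  return nullptr;
}

namespace sketch {

// Stand-in for so_5::disp::abstract_work_thread_t (an assumption made to
// keep the sketch self-contained).
class abstract_work_thread_t {
public:
  using body_func_t = std::function<void()>;
  virtual ~abstract_work_thread_t() = default;
  virtual void start(body_func_t thread_body) = 0;
  virtual void join() = 0;
};

class pthread_work_thread final : public abstract_work_thread_t {
  pthread_t handle_{};
  std::size_t stack_size_;
public:
  explicit pthread_work_thread(std::size_t stack_size)
    : stack_size_{stack_size} {}

  void start(body_func_t thread_body) override {
    pthread_attr_t attr;
    if (pthread_attr_init(&attr) != 0)
      throw std::runtime_error{"pthread_attr_init failed"};
    // Destroy the attribute object on every exit path.
    struct attr_guard {
      pthread_attr_t* attr;
      ~attr_guard() { pthread_attr_destroy(attr); }
    } guard{&attr};

    if (pthread_attr_setstacksize(&attr, stack_size_) != 0)
      throw std::runtime_error{"pthread_attr_setstacksize failed"};

    auto body = std::make_unique<body_func_t>(std::move(thread_body));
    if (pthread_create(&handle_, &attr, &sketch_thread_entry, body.get()) != 0)
      throw std::runtime_error{"pthread_create failed"};
    // The new thread owns the body now.
    body.release();
  }

  void join() override {
    // Error handling is omitted for brevity.
    pthread_join(handle_, nullptr);
  }
};

// Demo: run a trivial body on a thread with a 1 MiB stack.
inline bool demo_runs_body_on_custom_stack() {
  bool ran = false;
  pthread_work_thread t{1024 * 1024};
  t.start([&ran] { ran = true; });
  t.join();  // pthread_join makes the write to `ran` visible here.
  return ran;
}

} // namespace sketch
```

All failures in start are reported with std::runtime_error, which satisfies the requirement that exceptions from start derive from std::exception.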

abstract_work_thread_factory_t Interface

A user has to define his/her own class that inherits so_5::disp::abstract_work_thread_factory_t type and implements its pure virtual methods.

At the moment so_5::disp::abstract_work_thread_factory_t defines just two virtual methods that have to be implemented in a derived class:

virtual so_5::disp::abstract_work_thread_t &
acquire( so_5::environment_t & env ) = 0;

virtual void release( so_5::disp::abstract_work_thread_t & thread ) noexcept = 0;

The acquire method should return a valid reference (not a pointer!) to an instance that implements the abstract_work_thread_t interface. It can be a reference to a dynamically allocated object, or to a member of some aggregate, or something else. SObjectizer doesn't care about the nature of that instance.

SObjectizer will call start method for a returned thread instance.

The acquire method has to throw an exception derived from std::exception if a new thread can't be acquired.

The release method receives a reference acquired by a previous call to the acquire method. The release method has to take all necessary actions to dispose of the returned thread object. For example, if acquire allocates a new thread object on every call then release has to deallocate the object passed to release.

The release method must not throw because SObjectizer has no way to recover if release throws. Because of that release is marked as noexcept.

If acquire succeeds then SObjectizer guarantees that the obtained thread object will be returned to the factory by calling release at the moment when SObjectizer no longer needs that thread. The return occurs even if the thread object throws in its start method.
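
The call sequence implied by these guarantees can be modelled with a small driver, which may be handy for exercising a factory in a unit test. This is not SObjectizer's actual code. The `sketch` namespace, the stand-in interfaces, `drive`, `failing_thread` and `counting_factory` are hypothetical names introduced only for this illustration.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <utility>

namespace sketch {

// Stand-ins for the SObjectizer types (assumptions made to keep the
// sketch self-contained).
struct environment_t {};

class abstract_work_thread_t {
public:
  using body_func_t = std::function<void()>;
  virtual ~abstract_work_thread_t() = default;
  virtual void start(body_func_t thread_body) = 0;
  virtual void join() = 0;
};

class abstract_work_thread_factory_t {
public:
  virtual ~abstract_work_thread_factory_t() = default;
  virtual abstract_work_thread_t& acquire(environment_t& env) = 0;
  virtual void release(abstract_work_thread_t& thread) noexcept = 0;
};

// Hypothetical driver that follows the contract described above:
// acquire, then start, then join, and release in every case.
inline void drive(abstract_work_thread_factory_t& factory,
                  environment_t& env,
                  abstract_work_thread_t::body_func_t body) {
  // If acquire throws, there is nothing to release.
  abstract_work_thread_t& thread = factory.acquire(env);
  struct releaser {
    abstract_work_thread_factory_t& factory;
    abstract_work_thread_t& thread;
    ~releaser() { factory.release(thread); }
  } guard{factory, thread};

  thread.start(std::move(body));  // On a throw join is skipped.
  thread.join();
}

// Test doubles: a thread that can't start and a factory that counts calls.
struct failing_thread final : abstract_work_thread_t {
  int joins = 0;
  void start(body_func_t) override {
    throw std::runtime_error{"cannot start"};
  }
  void join() override { ++joins; }
};

struct counting_factory final : abstract_work_thread_factory_t {
  failing_thread thread;
  int releases = 0;
  abstract_work_thread_t& acquire(environment_t&) override { return thread; }
  void release(abstract_work_thread_t&) noexcept override { ++releases; }
};

// Demo: start throws, join is never called, release is called once.
inline bool demo_release_after_failed_start() {
  environment_t env;
  counting_factory factory;
  try {
    drive(factory, env, [] {});
  } catch (const std::exception&) {
    // The failure of start is expected in this demo.
  }
  return factory.releases == 1 && factory.thread.joins == 0;
}

} // namespace sketch
```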

An Implementation Of abstract_work_thread_factory_t Has To Be Thread Safe

Methods of abstract_work_thread_factory_t can and in most cases will be called from different threads. For example, acquire method can be called from one thread, but release will be called from another thread.

Several calls to acquire and release methods can be performed at the same time from different threads.

A Very Simple Implementation Of Custom Thread Factory

A very simple example of a custom thread factory can look like this:

class simple_custom_factory final : public so_5::disp::abstract_work_thread_factory_t
{
public:
  simple_custom_factory() = default;

  so_5::disp::abstract_work_thread_t & acquire(so_5::environment_t & /*env*/) override
  {
    // Just allocate a new object on every call.
    return *(new simple_custom_thread{});
  }

  void release(so_5::disp::abstract_work_thread_t & thread) noexcept override
  {
    // It's a reference to an object created in a previous call to acquire.
    // So just deallocate it.
    delete &thread;
  }
};
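
The factory above holds no state of its own, so it needs no locking beyond what new and delete already provide. A factory that keeps some bookkeeping has to protect it, because acquire and release may run concurrently. Below is a minimal sketch under stated assumptions: the `sketch` namespace, the stand-in interfaces, `std_thread_wrapper`, `tracking_factory` and `live_count` are hypothetical names, not part of SObjectizer.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

namespace sketch {

// Stand-ins for the SObjectizer types (assumptions made to keep the
// sketch self-contained).
struct environment_t {};

class abstract_work_thread_t {
public:
  using body_func_t = std::function<void()>;
  virtual ~abstract_work_thread_t() = default;
  virtual void start(body_func_t thread_body) = 0;
  virtual void join() = 0;
};

class abstract_work_thread_factory_t {
public:
  virtual ~abstract_work_thread_factory_t() = default;
  virtual abstract_work_thread_t& acquire(environment_t& env) = 0;
  virtual void release(abstract_work_thread_t& thread) noexcept = 0;
};

class std_thread_wrapper final : public abstract_work_thread_t {
  std::thread thread_;
public:
  void start(body_func_t body) override {
    thread_ = std::thread{std::move(body)};
  }
  void join() override { thread_.join(); }
};

// Factory that owns every thread object it has handed out.
class tracking_factory final : public abstract_work_thread_factory_t {
  std::mutex lock_;
  std::vector<std::unique_ptr<abstract_work_thread_t>> live_;
public:
  abstract_work_thread_t& acquire(environment_t& /*env*/) override {
    auto thread = std::make_unique<std_thread_wrapper>();
    std::lock_guard<std::mutex> guard{lock_};
    live_.push_back(std::move(thread));
    return *live_.back();
  }

  void release(abstract_work_thread_t& thread) noexcept override {
    std::unique_ptr<abstract_work_thread_t> victim;
    {
      std::lock_guard<std::mutex> guard{lock_};
      auto it = std::find_if(live_.begin(), live_.end(),
          [&thread](const auto& p) { return p.get() == &thread; });
      if (it != live_.end()) {
        victim = std::move(*it);
        live_.erase(it);
      }
    }
    // The thread object is destroyed here, outside of the lock.
  }

  std::size_t live_count() {
    std::lock_guard<std::mutex> guard{lock_};
    return live_.size();
  }
};

// Demo: two threads are acquired, run and released. Nothing stays behind.
inline std::size_t demo_acquire_release_cycle() {
  environment_t env;
  tracking_factory factory;
  auto& a = factory.acquire(env);
  auto& b = factory.acquire(env);
  a.start([] {});
  b.start([] {});
  a.join();
  b.join();
  factory.release(a);
  factory.release(b);
  return factory.live_count();
}

} // namespace sketch
```

Destroying the released object outside of the lock keeps the critical section short. A pool that reuses threads would need the same kind of protection around its free list.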

How To Specify Own Thread Factory To SObjectizer

To make SObjectizer use a custom thread factory, create an instance of that factory and then pass this instance to one of:

  • the params of a SObjectizer's dispatcher;
  • the params of the whole SObjectizer Environment.

For example:

class my_thread final : public so_5::disp::abstract_work_thread_t {...};
class my_thread_factory final : public so_5::disp::abstract_work_thread_factory_t {...};
...
int main()
{
  // Create an instance of the custom factory.
  auto factory = std::make_shared<my_thread_factory>(...);
  // Launch SObjectizer.
  so_5::launch([factory](so_5::environment_t & env) {
      // Create some agents as one coop.
      env.introduce_coop([factory](so_5::coop_t & coop) {
          // This agent will work on the default dispatcher.
          // The default dispatcher will use a standard SObjectizer's thread.
          coop.make_agent<some_agent>(...);

          // Create an active_obj dispatcher that will use the custom thread factory.
          auto ao_disp = so_5::disp::active_obj::make_dispatcher(
              coop.environment(),
              "disp_with_my_threads",
              so_5::disp::active_obj::disp_params_t{}.work_thread_factory(factory));
          // The following agents will be bound to a dispatcher with custom threads.
          coop.make_agent_with_binder<another_agent>(ao_disp.binder(), ...);
          coop.make_agent_with_binder<yet_another_agent>(ao_disp.binder(), ...);
          ...
        });
    });
}

In the example above only one dispatcher will use a custom thread factory because that factory is specified in the parameters for that dispatcher. All other dispatchers (including the default one) will use the standard factory provided by SObjectizer.

It is possible to set a custom factory for the whole SObjectizer Environment -- via the environment's parameters:

int main()
{
  // Create an instance of the custom factory.
  auto factory = std::make_shared<my_thread_factory>(...);
  // Launch SObjectizer.
  so_5::launch([](so_5::environment_t & env) {
      // Create some agents as one coop.
      env.introduce_coop([](so_5::coop_t & coop) {
          // This agent will work on the default dispatcher.
          // The default dispatcher will use a custom thread.
          coop.make_agent<some_agent>(...);

          // Create an active_obj dispatcher that will use the factory from SObjectizer.
          auto ao_disp = so_5::disp::active_obj::make_dispatcher(
              coop.environment());
          // The following agents will be bound to a dispatcher with custom threads.
          coop.make_agent_with_binder<another_agent>(ao_disp.binder(), ...);
          coop.make_agent_with_binder<yet_another_agent>(ao_disp.binder(), ...);
          ...
        });
    },
    [factory](so_5::environment_params_t & params) {
      // Set a custom thread factory to the whole environment.
      params.work_thread_factory(factory);
    });
}

All standard SObjectizer dispatchers have their own disp_params_t types, and since v.5.7.3 all those types have a work_thread_factory method. This method specifies the thread factory to be used by an instance of the dispatcher. If a user doesn't call it, the dispatcher gets the thread factory from the SObjectizer Environment. It means that a factory specified directly for a dispatcher takes precedence over the default factory from the environment.

class my_thread final : public so_5::disp::abstract_work_thread_t {...};
class my_thread_factory final : public so_5::disp::abstract_work_thread_factory_t {...};
class another_factory final : public so_5::disp::abstract_work_thread_factory_t {...};
...
int main()
{
  // Create an instance of the custom factory.
  auto factory = std::make_shared<my_thread_factory>(...);
  // Launch SObjectizer.
  so_5::launch([](so_5::environment_t & env) {
      // Create some agents as one coop.
      env.introduce_coop([](so_5::coop_t & coop) {
          // This agent will work on the default dispatcher.
          // The default dispatcher will use the default factory from Environment.
          coop.make_agent<some_agent>(...);

          // Create an active_obj dispatcher that will use the custom thread factory.
          auto ao_disp = so_5::disp::active_obj::make_dispatcher(
              coop.environment(),
              "disp_with_my_threads",
              so_5::disp::active_obj::disp_params_t{}
                .work_thread_factory(std::make_shared<another_factory>(...)));
          // The following agents will be bound to a dispatcher with custom threads.
          coop.make_agent_with_binder<another_agent>(ao_disp.binder(), ...);
          coop.make_agent_with_binder<yet_another_agent>(ao_disp.binder(), ...);
          ...
        });
    },
    [factory](so_5::environment_params_t & params) {
      // Set a custom thread factory to the whole environment.
      params.work_thread_factory(factory);
    });
}