Skip to content

SO 5.8 ByExample Redirect And Transform

Yauheni Akhotnikau edited this page Jul 7, 2023 · 1 revision

Note. This example is not simple one. It shows more complex usage of SObjectizer's features. Because of that it is recommended to read SO-5.8 Basics, SO-5.8 InDepth Dispatchers, and SO-5.8 InDepth Message limits before.

Introduction

This example shows a solution for Producer-Consumer problem. The mechanism of message limits is used here.

A collector-perfomer idiom is a good thing for developing application-specific overload control for a particular task. But it requires some thinking and programming. Sometimes it is not needed. The problem could be very simple (an agent can simply ignore extra messages which it can't handle). Or there is no time for serious development (a quick-and-dirty proof-of-concept solution must be created as soon as possible). Message limits can be used in such cases. They allow to defend an agent without writing a lot of domain-specific code.

This sample show some features of SObjectizer's message limits mechanism: redirection of extra messages to different agent, transformation of extra messages to new messages, limitation of message count with application abortion on overload.

Those features could be useful in real-world applications. Lets imagine an application which uses special hardware devices like Hardware Security Modules (HSM). There could be several HSMs attached to the computer. Each HSM will be managed by a separate agent. One HSM can be expensive and very fast. It will be used as main HSM and it will serve almost all requests. Others could be rather old and slow. They will serve as reserve engines and will be used only at peaks of requests.

Message limits allow to specify a limit of request for each HSM-agent. When the limit will be reached an extra message will be redirected to the next HSM-agent in chain. When the limit is reached on the last HSM-agent in the chain then request could be transformed for negative reply immediately.

A working scheme like that is shown in this sample.

What Example Does

There is one cooperation with several agents.

There are two producer agents. They work on turn-by-turn basics. On every turn a producer generates some requests and send them to one performer mbox. Between turns a producer receives and handles replies to previously sent requests.

Producers work on dedicated thread_pool dispatcher with two working threads (it means that producers can work in parallel).

There are three performer agents. They are bound into a chain. The next performer in the chain handles requests slowly than the previous performer. Performers work on dedicated thread_pool dispatcher with three working threads (it means that performers can work in parallel).

There is a logger agent. This agent receives log messages and show them on the standard output. The logger works on dedicated one_thread dispatcher.

All agents have message limits.

A producer agent defines two limits:

  • one for msg_next_turn signal. There is no sense to have more than one such signal in queue and because of that all extra signals are ignored;
  • one for reply message. A producer can't handle more than 10 replies. All extra messages transformed to log_messages and routed to logger agent.

A performer agent defines one limit:

  • for request message. A performer can hold only 3 requests in the queue. All extra requests are redirected to the next performer in the chain. Or, if the performer is the last in the chain, extra requests are transformed to negative replies and sent back to producers.

A logger agent defines one limit:

  • for log_message message. There could be only 100 pending messages in the queue. If queue of that size is not enough then something is very wrong and application will be aborted.

The example works for 5 seconds and then finishes.

The example output could look like:

[+0ms] -- g1: sending request(1)
[+0ms] -- g1: sending request(2)
[+0ms] -- g1: sending request(3)
[+0ms] -- g1: sending request(4)
[+0ms] -- p1: processing request(1) for 87ms
[+0ms] -- g1: sending request(5)
[+0ms] -- g1: sending request(6)
[+0ms] -- g2: sending request(1000001)
[+0ms] -- g1: sending request(7)
[+0ms] -- g2: sending request(1000002)
[+0ms] -- g2: sending request(1000003)
[+0ms] -- g2: sending request(1000004)
[+0ms] -- g2: sending request(1000005)
[+0ms] -- g2: sending request(1000006)
[+0ms] -- g2: sending request(1000007)
[+0ms] -- p2: processing request(5) for 88ms
[+0ms] -- g2: reply received(1000004), processed:0
[+0ms] -- g2: reply received(1000005), processed:0
[+0ms] -- p3: processing request(1000001) for 121ms
[+0ms] -- g2: reply received(1000006), processed:0
[+0.088ms] -- p1: processing request(2) for 64ms
[+0.088ms] -- g1: reply received(1), processed:1
[+0.089ms] -- p2: processing request(6) for 117ms
[+0.089ms] -- g1: reply received(5), processed:1
[+0.122ms] -- p3: processing request(1000002) for 89ms
[+0.122ms] -- g2: reply received(1000001), processed:1
[+0.152ms] -- p1: processing request(3) for 70ms
[+0.152ms] -- g1: reply received(2), processed:1
[+0.206ms] -- p2: processing request(7) for 69ms
[+0.206ms] -- g1: reply received(6), processed:1
[+0.211ms] -- p3: processing request(1000003) for 98ms
[+0.211ms] -- g2: reply received(1000002), processed:1
[+0.222ms] -- p1: processing request(4) for 89ms
[+0.222ms] -- g1: reply received(3), processed:1
[+0.251ms] -- g1: sending request(8)
[+0.251ms] -- g2: sending request(1000008)
[+0.251ms] -- g1: sending request(9)
[+0.251ms] -- g2: sending request(1000009)
[+0.251ms] -- g1: sending request(10)
[+0.251ms] -- g2: sending request(1000010)
[+0.251ms] -- g1: sending request(11)
[+0.251ms] -- g2: sending request(1000011)
[+0.251ms] -- g1: sending request(12)
[+0.251ms] -- g2: sending request(1000012)
[+0.251ms] -- g1: sending request(13)
[+0.251ms] -- g2: sending request(1000013)
[+0.251ms] -- g1: reply received(12), processed:0
[+0.251ms] -- g2: reply received(1000012), processed:0
[+0.251ms] -- g1: reply received(13), processed:0
[+0.251ms] -- g2: reply received(1000013), processed:0
[+0.276ms] -- p2: processing request(1000007) for 69ms

The lines:

[+0ms] -- g2: reply received(1000004), processed:0
[+0ms] -- g2: reply received(1000005), processed:0
...
[+0ms] -- g2: reply received(1000006), processed:0
...
[+0.251ms] -- g1: reply received(12), processed:0
[+0.251ms] -- g2: reply received(1000012), processed:0
[+0.251ms] -- g1: reply received(13), processed:0
[+0.251ms] -- g2: reply received(1000013), processed:0

show us the moments when new requests were rejected because the queues for the producers were full.

Sample Code

// A request to be processed.
struct request : public so_5::message_t
{
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <deque>
#include <iostream>
#include <sstream>
#include <string>
#include <random>

#include <so_5/all.hpp>

// A request to be processed.
struct request
{
	// Return address.
	so_5::mbox_t m_reply_to;
	// Request ID.
	int m_id;
	// Some payload.
	int m_payload;
};

// A reply to processed request.
struct reply
{
	// Request ID.
	int m_id;
	// Was request processed successfully?
	bool m_processed;
};

// Message for logger.
struct log_message
{
	// Text to be logged.
	std::string m_what;
};

// Logger agent.
class a_logger_t final : public so_5::agent_t
{
public :
	a_logger_t( context_t ctx )
		:	so_5::agent_t( ctx
				// Limit the count of messages.
				// Because we can't lost log messages the overlimit
				// must lead to application crash.
				+ limit_then_abort< log_message >( 100 ) )
		,	m_started_at( std::chrono::steady_clock::now() )
	{}

	void so_define_agent() override
	{
		so_default_state().event(
			[this]( const log_message & evt ) {
				std::cout << "[+" << time_delta()
						<< "] -- " << evt.m_what << std::endl;
			} );
	}

private :
	const std::chrono::steady_clock::time_point m_started_at;

	std::string time_delta() const
	{
		auto now = std::chrono::steady_clock::now();

		std::ostringstream ss;
		ss << double(
				std::chrono::duration_cast< std::chrono::milliseconds >(
						now - m_started_at ).count()
				) / 1000.0 << "ms";

		return ss.str();
	}
};

// Load generation agent.
class a_generator_t final : public so_5::agent_t
{
public :
	a_generator_t(
		// Environment to work in.
		context_t ctx,
		// Name of generator.
		std::string name,
		// Starting value for request ID generation.
		int id_starting_point,
		// Address of message processor.
		so_5::mbox_t performer,
		// Address of logger.
		so_5::mbox_t logger )
		:	so_5::agent_t( ctx
				// Expect no more than just one next_turn signal.
				+ limit_then_drop< msg_next_turn >( 1 )
				// Limit the quantity of non-processed replies in the queue.
				+ limit_then_transform( 10,
					// All replies which do not fit to message queue
					// will be transformed to log_messages and sent
					// to logger.
					[this]( const reply & msg ) {
						return make_transformed< log_message >( m_logger,
								m_name + ": unable to process reply(" +
								std::to_string( msg.m_id ) + ")" );
					} ) )
		,	m_name( std::move( name ) )
		,	m_performer( std::move( performer ) )
		,	m_logger( std::move( logger ) )
		,	m_turn_pause( 250 )
		,	m_last_id( id_starting_point )
	{}

	void so_define_agent() override
	{
		so_default_state()
			.event( &a_generator_t::evt_next_turn )
			.event( &a_generator_t::evt_reply );
	}

	void so_evt_start() override
	{
		// Start work cycle.
		so_5::send< msg_next_turn >( *this );
	}

private :
	// Signal about start of the next turn.
	struct msg_next_turn : public so_5::signal_t {};

	// Generator name.
	const std::string m_name;
	// Performer for the requests processing.
	const so_5::mbox_t m_performer;
	// Logger.
	const so_5::mbox_t m_logger;

	// Pause between working turns.
	const std::chrono::milliseconds m_turn_pause;

	// Last generated ID for request.
	int m_last_id;

	void evt_next_turn(mhood_t< msg_next_turn >)
	{
		// Create and send new requests.
		generate_new_requests( random( 5, 8 ) );

		// Wait for next turn and process replies.
		so_5::send_delayed< msg_next_turn >( *this, m_turn_pause );
	}

	void evt_reply( const reply & evt )
	{
		so_5::send< log_message >( m_logger,
				m_name + ": reply received(" + std::to_string( evt.m_id ) +
				"), processed:" + std::to_string( evt.m_processed ) );
	}

	void generate_new_requests( int requests )
	{
		for(; requests > 0; --requests )
		{
			auto id = ++m_last_id;

			so_5::send< log_message >( m_logger,
					m_name + ": sending request(" + std::to_string( id ) + ")" );

			so_5::send< request >( m_performer,
					so_direct_mbox(), id, random( 30, 100 ) );
		}
	}

	static int random( int l, int h )
	{
		std::random_device rd;
		std::mt19937 gen{ rd() };
		return std::uniform_int_distribution< int >{l, h}(gen);
	}
};

// Performer agent.
class a_performer_t final : public so_5::agent_t
{
public :
	// A special indicator that agent must work with anotner performer.
	struct next_performer { so_5::mbox_t m_target; };

	// A special indicator that agent is last in the chain.
	struct last_performer {};

	// Constructor for the case when worker is last in the chain.
	a_performer_t(
		context_t ctx,
		std::string name,
		float slowdown,
		last_performer,
		so_5::mbox_t logger )
		:	so_5::agent_t( ctx
				// Limit count of requests in the queue.
				// If queue is full then request must be transformed
				// to negative reply.
				+ limit_then_transform( 3,
					[]( const request & evt ) {
						return make_transformed< reply >( evt.m_reply_to,
								evt.m_id, false );
					} ) )
		,	m_name( std::move( name ) )
		,	m_slowdown( slowdown )
		,	m_logger( std::move( logger ) )
	{}

	// Constructor for the case when worker has the next performer in chain.
	a_performer_t(
		context_t ctx,
		std::string name,
		float slowdown,
		next_performer next,
		so_5::mbox_t logger )
		:	so_5::agent_t( ctx
				// Limit count of requests in the queue.
				// If queue is full then request must be redirected to the
				// next performer in the chain.
				+ limit_then_redirect< request >( 3,
					[next] { return next.m_target; } ) )
		,	m_name( std::move( name ) )
		,	m_slowdown( slowdown )
		,	m_logger( std::move( logger ) )
	{}

	void so_define_agent() override
	{
		so_default_state().event( &a_performer_t::evt_request );
	}

private :
	const std::string m_name;
	const float m_slowdown;
	const so_5::mbox_t m_logger;

	void evt_request( const request & evt )
	{
		// Processing time is depend on speed of the performer.
		auto processing_time = static_cast< int >(
				m_slowdown * float(evt.m_payload) );

		so_5::send< log_message >( m_logger,
				m_name + ": processing request(" +
				std::to_string( evt.m_id ) + ") for " +
				std::to_string( processing_time ) + "ms" );

		// Imitation of some intensive processing.
		std::this_thread::sleep_for(
				std::chrono::milliseconds( processing_time ) );

		// Generator must receive a reply for the request.
		so_5::send< reply >( evt.m_reply_to, evt.m_id, true );
	}
};

void init( so_5::environment_t & env )
{
	env.introduce_coop( [&env]( so_5::coop_t & coop ) {
		// Logger will work on the default dispatcher.
		auto logger = coop.make_agent< a_logger_t >();

		// Chain of performers.
		// Must work on dedicated thread_pool dispatcher.
		auto performer_disp = so_5::disp::thread_pool::make_dispatcher( env, 3 );
		auto performer_binding_params = so_5::disp::thread_pool::bind_params_t{}
				.fifo( so_5::disp::thread_pool::fifo_t::individual );

		// Start chain from the last agent.
		auto p3 = coop.make_agent_with_binder< a_performer_t >(
				performer_disp.binder( performer_binding_params ),
				"p3",
				1.4f, // Each performer in chain is slower then previous.
				a_performer_t::last_performer{},
				logger->so_direct_mbox() );
		auto p2 = coop.make_agent_with_binder< a_performer_t >(
				performer_disp.binder( performer_binding_params ),
				"p2",
				1.2f, // Each performer in chain is slower then previous.
				a_performer_t::next_performer{ p3->so_direct_mbox() },
				logger->so_direct_mbox() );
		auto p1 = coop.make_agent_with_binder< a_performer_t >(
				performer_disp.binder( performer_binding_params ),
				"p1",
				1.0f, // The first performer is the fastest one.
				a_performer_t::next_performer{ p2->so_direct_mbox() },
				logger->so_direct_mbox() );

		// Generators will work on dedicated thread_pool dispatcher.
		auto generator_disp = so_5::disp::thread_pool::make_dispatcher( env, 2 );
		auto generator_binding_params = so_5::disp::thread_pool::bind_params_t{}
				.fifo( so_5::disp::thread_pool::fifo_t::individual );

		coop.make_agent_with_binder< a_generator_t >(
				generator_disp.binder( generator_binding_params ),
				"g1",
				0,
				p1->so_direct_mbox(),
				logger->so_direct_mbox() );
		coop.make_agent_with_binder< a_generator_t >(
				generator_disp.binder( generator_binding_params ),
				"g2",
				1000000,
				p1->so_direct_mbox(),
				logger->so_direct_mbox() );
	});

	// Take some time to work.
	std::this_thread::sleep_for( std::chrono::seconds(5) );

	env.stop();
}

int main()
{
	try
	{
		so_5::launch( &init );
	}
	catch( const std::exception & ex )
	{
		std::cerr << "Error: " << ex.what() << std::endl;
		return 1;
	}

	return 0;
}
Clone this wiki locally