Contour

About

Contour is a message bus implementation that provides communication between .NET services over different transport protocols. At the moment it supports only AMQP/RabbitMQ.

Configuring via XML file

Connection

Configuration can be placed in the .config file by declaring the 'endpoints' section inside the 'serviceBus' section group:

<sectionGroup name="serviceBus">
    <section name="endpoints" type="Contour.Configurator.EndpointsSection, Contour" />
</sectionGroup>

Endpoints declaration

The service bus works with endpoints, through which messages are sent. Each endpoint is a separate participant in the interaction: if one application has two endpoints, the service bus treats them as two different participants. Conversely, if two different applications use the same endpoint, the service bus treats them as a single participant (when default routing is used).

Endpoint configuration is defined in the 'serviceBus' section:

<serviceBus>
    <endpoints>
        <endpoint name="point1" connectionString="amqp://localhost:5672/">
            <!-- ... -->
        </endpoint>
        <endpoint name="point2" connectionString="amqp://localhost:5672/">
            <!-- ... -->
        </endpoint>
    </endpoints>
</serviceBus>
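
As a rough sketch of how the two fragments above fit together in one file: in a standard .NET configuration file the section group declaration sits inside 'configSections', which has to be the first child of 'configuration', and the 'serviceBus' element follows it. This layout is general .NET configuration behaviour rather than something stated here, so treat it as an assumption to check against your own project.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <!-- Assumption: standard .NET layout, configSections must come first -->
    <configSections>
        <sectionGroup name="serviceBus">
            <section name="endpoints" type="Contour.Configurator.EndpointsSection, Contour" />
        </sectionGroup>
    </configSections>

    <!-- The section declared above -->
    <serviceBus>
        <endpoints>
            <endpoint name="point1" connectionString="amqp://localhost:5672/">
                <!-- ... -->
            </endpoint>
        </endpoints>
    </serviceBus>
</configuration>
```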

You can define configuration for multiple endpoints. Each endpoint has two mandatory parameters: a unique name and a connection string to the broker.

Additionally, the 'lifecycleHandler' attribute can name a component of type IBusLifecycleHandler, which is invoked when the state of the service bus client changes, for example when it starts or stops.

<serviceBus>
    <endpoints>
        <endpoint name="point1" connectionString="amqp://localhost:5672/" lifecycleHandler="SpecialLifecycleHandler">
            <!-- ... -->
        </endpoint>
    </endpoints>
</serviceBus>

This is helpful if your component needs to know when the service bus has started or stopped.

Every message that fails processing is placed in a fault queue. The fault queue has the same name as the endpoint with the '.Fault' suffix added. To control the TTL of messages in this queue, set the 'faultQueueTtl' attribute. To limit the maximum number of messages, use the 'faultQueueLimit' attribute. When the number of fault messages reaches this limit, old messages are replaced by new ones.

<serviceBus>
    <endpoints>
        <endpoint name="point1" connectionString="amqp://localhost:5672/" faultQueueTtl="3.00:00:00" faultQueueLimit="500">
            <!-- ... -->
        </endpoint>
    </endpoints>
</serviceBus>

The default TTL is 21 days. By default the fault queue has no size limit.

Declaration of global message validators

Each incoming message passes through the global validators, which can optionally be defined in the 'validators' section. Each validator class must implement the IMessageValidator interface (for convenience, you can inherit from the FluentPayloadValidatorOf class, which already implements this interface).

You can also set the 'group' parameter to 'true'. In that case you do not need to enumerate all validators in the component: just add the name of a validator group, which includes all classes implementing the IMessageValidator interface.

<endpoint name="point1" connectionString="amqp://localhost:5672/">
    <validators>
        <add name="NameValidator" group="true"/>
    </validators>
    <!-- ... -->
</endpoint>
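
For contrast, here is a hedged sketch of the non-group form, where each validator is listed by its own component name. It assumes that omitting 'group' means the name refers to a single validator; 'OrderValidator' is a hypothetical name used only for illustration.

```xml
<endpoint name="point1" connectionString="amqp://localhost:5672/">
    <validators>
        <!-- Assumption: without group="true" each entry names one validator component -->
        <add name="NameValidator" />
        <!-- Hypothetical second validator -->
        <add name="OrderValidator" />
    </validators>
    <!-- ... -->
</endpoint>
```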

Caching

The 'caching' tag controls the caching of Request/Reply queries.

You can enable caching by setting the 'enabled' attribute to 'true'. On the requesting side, the response is cached under a key generated from the request body. The caching time is defined by the responder in the reply.

Publish/Subscribe messages are not cached.

<endpoint name="point1" connectionString="amqp://localhost:5672/">
    <caching enabled="true"/>
    <!-- ... -->
</endpoint>

Configuring ParallelismLevel

An endpoint can be configured with the required number of incoming message handlers, each of which runs in its own thread. Messages are distributed among them in round-robin fashion.

<endpoints>
    <endpoint name="Emitter" connectionString="amqp://localhost:5672/" parallelismLevel="4">
    </endpoint>
</endpoints>

By default, one handler for incoming messages is used.

Configuring QoS

An endpoint can also set how many incoming messages are fetched from the broker in one access (the prefetch count).

<endpoints>
    <endpoint name="Consumer" connectionString="amqp://localhost:5672/">
        <qos prefetchCount="8" />
        <incoming />
    </endpoint>
</endpoints>

By default, 50 messages are read in one access.
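
The two settings can presumably be combined on a single endpoint, much as the code-based configuration later in this document calls both UseParallelismLevel(4) and SetDefaultQoS(8). A minimal sketch, reusing only the attributes shown above; the endpoint name is arbitrary:

```xml
<endpoints>
    <!-- Four handlers in separate threads, eight messages fetched per access -->
    <endpoint name="Consumer" connectionString="amqp://localhost:5672/" parallelismLevel="4">
        <qos prefetchCount="8" />
        <incoming />
    </endpoint>
</endpoints>
```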

Sender’s dynamic routing

On the sender’s side you can turn on dynamic routing. It allows you to send messages with a label that was not configured when the endpoint was created.

Dynamic routing is useful when not all labels are known at the moment the endpoint is created.

<endpoints>
    <endpoint name="point1" connectionString="amqp://localhost:5672/">
        <dynamic outgoing="true" />
    </endpoint>
</endpoints>

Declaration of outgoing messages

All outgoing messages are declared in the 'outgoing' collection. Each message label is declared with its own 'route' tag. The 'key' and 'label' attributes are mandatory.

The key is an alias for the label, so the application does not have to mention the concrete message label. To refer to the label from the application, specify the alias with a colon (:) prefix.

Additionally, you can set the parameters (as attributes of the 'route' element):

  • persist – broker saves the message for reliable delivery (false by default);
  • confirm – broker must confirm message receipt for the processing (message will be stored if persist="true", false by default);
  • ttl – message lifetime (unlimited, if the value is not specified);
  • timeout – response time for requests (default value – 30 seconds).

To support requests, you must declare a subscription point on which to wait for response messages. At the moment, only the default subscription point can be declared.

<outgoing>
    <route key="message" label="message.label" persist="true" ttl="00:01:00" />
    <route key="request" label="request.label" confirm="true" timeout="00:00:50">
        <callbackEndpoint default="true" />
    </route>
</outgoing>

Declaration of incoming messages

All incoming messages are declared in the 'incoming' collection. Each message label is declared with its own 'on' tag. The 'key' and 'label' attributes are mandatory.

Additionally, you can set the parameters (as attributes of the element 'on'):

  • requiresAccept – event processing must be explicitly confirmed in the handler. Otherwise message will be returned to the queue (false by default);
  • react – message handler name (for example, the name under which the handler is registered in the IoC container);
  • type – CLR type of the event body. It can be given as a fully qualified name, in which case the default type lookup mechanism is used, or as a short type name, in which case the type is searched for in all assemblies loaded in the application domain. If the type is not specified, ExpandoObject is used by default;
  • validate – message validator name (class that implements IMessageValidator), which must operate only within a declared subscription;
  • lifestyle – allows you to specify a handler lifestyle.

Lifestyle possible values:

  • Normal – the handler is requested when the bus client is created (default value);
  • Lazy – the handler is requested once, when the first message arrives;
  • Delegated – the handler is requested every time a message arrives, which lets you manage the handler lifetime more flexibly.

<incoming>
    <on key="message" label="message.label" react="MessageHandler" />
    <on key="request" label="request.label" react="RequestHandler" type="RequestPayload" validate="InputValidator" requiresAccept="true" />
    <on key="lazy" label="lazy.label" react="LazyHandler" lifestyle="Lazy" />
</incoming>
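
The fragments from the sections above can be pulled together into one endpoint declaration. The sketch below uses only attributes and elements described earlier. It assumes that they may all be combined on one endpoint and that the order of child elements does not matter; the 'out.' and 'in.' labels and the 'outMessage'/'outRequest' keys are illustrative names chosen so that outgoing and incoming routes do not clash.

```xml
<serviceBus>
    <endpoints>
        <endpoint name="point1"
                  connectionString="amqp://localhost:5672/"
                  lifecycleHandler="SpecialLifecycleHandler"
                  faultQueueTtl="3.00:00:00"
                  faultQueueLimit="500"
                  parallelismLevel="4">
            <validators>
                <add name="NameValidator" group="true" />
            </validators>
            <caching enabled="true" />
            <qos prefetchCount="8" />
            <outgoing>
                <!-- Illustrative keys and labels -->
                <route key="outMessage" label="out.message.label" persist="true" ttl="00:01:00" />
                <route key="outRequest" label="out.request.label" confirm="true" timeout="00:00:50">
                    <callbackEndpoint default="true" />
                </route>
            </outgoing>
            <incoming>
                <on key="message" label="in.message.label" react="MessageHandler" />
                <on key="request" label="in.request.label" react="RequestHandler" type="RequestPayload" validate="InputValidator" requiresAccept="true" />
                <on key="lazy" label="in.lazy.label" react="LazyHandler" lifestyle="Lazy" />
            </incoming>
        </endpoint>
    </endpoints>
</serviceBus>
```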

Applying configuration

To use the configurator, create an instance of the AppConfigConfigurator class.

The class constructor can take an object implementing the IDependencyResolver interface, which is used to obtain the bus client's dependencies, for example to look up message handlers or the lifecycle handler. If a particular client configuration does not need any external dependencies, this parameter can be omitted. For simplicity, instead of implementing a separate class, you can pass a delegate of type DependencyResolverFunc.

For example, creating an AppConfigConfigurator that uses Ninject to obtain handlers typically looks as follows:

var configurator = new AppConfigConfigurator((name, type) => kernel.Get(type, name));

To apply the configuration, call the Configure method while the bus instance is being created.

var bus = new BusFactory().Create(cfg =>
{
    configurator.Configure("point1", cfg);
});

The advantage of this approach is that you can combine configuration via code with configuration via file.

If endpoint names are unknown (or should not be known) at compile time, they can be accessed through the Endpoints property. An example of creating and configuring all bus clients:

var busFactory = new BusFactory();
var busInstances = configurator
                    .Endpoints
                    .Select(e => busFactory.Create(cfg => configurator.Configure(e, cfg)))
                    .ToList();

Configuring via C# code

An endpoint with all its parameters (except lifestyle) can be configured via code as well.

To use dynamic routing, set the message label to MessageLabel.Any.

IBus bus1 = new BusFactory().Create(
    cfg =>
        {
            cfg.UseRabbitMq();
            cfg.SetEndpoint("point1");
            cfg.SetConnectionString("amqp://localhost:5672/");
            cfg.HandleLifecycleWith(new SpecialLifecycleHandler());
            cfg.RegisterValidator(new NameValidator());
            cfg.EnableCaching();
            cfg.UseParallelismLevel(4);
            cfg.SetDefaultQoS(8);
            cfg.Route("out.message.label")
                .Persistently()
                .WithTtl(new TimeSpan(0, 1, 0));
            cfg.Route("out.request.label")
                .WithRequestTimeout(new TimeSpan(0, 0, 50))
                .WithConfirmation();
            cfg.On("in.message.label")
                .ReactWith(MessageHandler());
            cfg.On<RequestPayload>("in.request.label")
                .ReactWith(RequestHandler())
                .WhenVerifiedBy(new InputValidator())
                .RequiresAccept();
            cfg.On<RequestPayload>("in.lazy.label")
                .ReactWith(LazyHandler());
        });

IBus bus2 = new BusFactory().Create(
    cfg =>
        {
            cfg.UseRabbitMq();
            cfg.SetEndpoint("point2");
            cfg.SetConnectionString("amqp://localhost:5672/");
        });

Contour headers

An interaction often consists of several components arranged one after another in a chain, and it becomes necessary to track a message as it passes through this chain. For this reason, some headers of incoming messages are copied to the outgoing messages.

| Header field name | Description | Copying |
|---|---|---|
| x-correlation-id | Correlation identifier used to combine a set of messages into one group. For example, it allows a reply message to be matched with its request | Yes |
| x-expires | Rules of data expiration. For example: x-expires: at 2016-04-01T22:00:33Z or x-expires: in 100 | No |
| x-message-type | Message label with which the message was sent. Using this header is not recommended | No |
| x-persist | Whether the message must be persistent (saved on disk) or not | No |
| x-reply-route | Address for the reply to a request. For example, x-reply-route: direct:///amq.gen-n9DsUj1qm4vgCq0MHHPoBQ | No |
| x-timeout | Response timeout for the request | No |
| x-ttl | Message TTL | No |
| x-breadcrumbs | List of all endpoints through which the message has passed, separated by semicolons (;) | Yes (adding new value) |
| x-original-message-id | Identifier of the message that started the message exchange | Yes |

Channels and pipes in Contour

In Contour you can organize message processing as a set of sequential atomic message transformations based on the Pipes and Filters pattern.

For example, suppose you need to forward only those messages that have a 'Tick' field whose value is odd, and filter out all others. This can be done with the following set of filters:

var configurator = new AppConfigConfigurator((name, type) => kernel.Get(type, name));
// 'configuration' is the bus configurator received in the BusFactory.Create callback
configuration.On("document.systemtick")
    .ReactWith(new PipelineConsumerOf<ExpandoObject>(
        new IMessageOperator[]
            {
                new JsonPathFilter("Tick"),
                new Filter(request => ((dynamic)request.Payload).Tick % 2 == 1),
                new StaticRouter("document.oddsystemtick")
            }));

Most of the filters (message transformers) are universal and can be used for processing different types of messages.

Contour filters

Contour includes the following universal message filters:

  • Acceptor,
  • ContentBasedRouter,
  • Filter,
  • JsonPathFilter,
  • RecipientList,
  • Reply,
  • Splitter,
  • StaticRouter,
  • Translator,
  • TransparentReply,
  • WireTap.

Acceptor

Confirms the message processing and forwards it to the next filter.

ContentBasedRouter

Routes each message to the correct recipient based on the message content (a pattern from Enterprise Integration Patterns, EIP).

Behavior is defined by the signature:

public ContentBasedRouter(Func<IMessage, MessageLabel> routeResolverFunc)

Filter

Filters messages by a predicate (an EIP pattern).

Behavior is defined by the signature:

public Filter(Predicate<IMessage> predicateFunc)

JsonPathFilter

Checks whether a node exists at the specified JSONPath.

Behavior is defined by the signature:

public JsonPathFilter(string jsonPath)

RecipientList

Inspects an incoming message, determines the list of desired recipients, and forwards the message to all channels associated with the recipients in the list (an EIP pattern).

Behavior is defined by the signature:

public RecipientList(Func<IMessage, MessageLabel[]> determineRecipientList)

Reply

Sends an incoming message as a reply and breaks the message flow. It is a terminal filter in the chain.

Splitter

Breaks a composite message into a series of individual messages, each containing data related to one item (an EIP pattern).

Behavior is defined by the signature:

public Splitter(Func<IMessage, IEnumerable<object>> splitterFunc)

StaticRouter

Forwards the incoming messages to the specified recipient.

Behavior is defined by the signature:

public StaticRouter(string label)

Translator

Translates one data format into another (an EIP pattern).

Behavior is defined by the signature:

public Translator(Func<IMessage, object> translationFunc)

TransparentReply

Sends incoming message as a reply and forwards it to the incoming flow.

WireTap

A special case of RecipientList, which allows you to listen to the channel (an EIP pattern).

Behavior is defined by the signature:

public WireTap(MessageLabel messageLabel)

Features

  1. All outgoing messages of the last operation are published to the bus.
  2. Messages are processed synchronously.
  3. The broker is not used to transmit messages between operators.
  4. Operations that produce multiple messages per incoming message publish them to the same outgoing channel.
  5. Operations are executed in the same address space and can affect each other. Using Reflection and mutable objects is not recommended.

Build the project

  • clone the repository
  • run "build.cmd" to make sure all unit tests are still passing.
  • run "build.cmd RunAllTests" to make sure all integration and unit tests are still passing. In this case you have to configure access to the RabbitMQ broker.

Library license

The library is available under MIT. For more information see the License file in the GitHub repository.
