bajankristof/kyu

kyu - Simplified Erlang AMQP client

kyu is an AMQP client application that provides a simplified abstraction layer on top of the official amqp_client. Because of this simplification, it isn't suited to every possible use case (check out the Features section below).

kyu is heavily inspired by the great turtle application. In some areas it provides more functionality than turtle, in others less. The one feature it definitely lacks is built-in RPC support. If you don't want to implement that yourself, I suggest taking a look at turtle.

Features

kyu...

  • Can set up multiple AMQP connections and maintains them (if a connection is lost, it will try to reconnect based on the connection's configuration). While a connection is down, AMQP-dependent calls return errors.
  • Uses names for connections, publishers, and consumers, so you can access them from anywhere in your application. A name can be any Erlang term (except pid()s), but has to be unique per handler type (a connection can have the same name as a publisher, but two connections with the same name will conflict).
  • Provides an easy way to set up publishers or publisher pools that can be used to send messages from anywhere in your application.
  • Supports a special publish execution mode that handles messages returned by the AMQP channel and replies with an error to the caller (can be used to make sure the published message was successfully routed to at least one queue).
  • Provides an easy way to set up consumers. Messages are handled by a stateful callback module defined in the consumer's configuration.
  • Provides an interface to communicate with the RabbitMQ management HTTP API.
  • Adds special declaration commands, built on the RabbitMQ management HTTP API, that can be used to set up publishers and consumers.

Build

kyu can be built using rebar3:

rebar3 compile

Installation

Simply add kyu to your rebar3 dependencies and to the applications list (for example in <yourapp>.app.src).

%% rebar.config
{deps, [
    kyu,
    %% or
    {kyu, "~> 2.0"},
    %% or
    {kyu, {git, "https://github.com/bajankristof/kyu.git"}}
]}.
%% <yourapp>.app.src
{application, yourapp, [
    {mod, {yourapp_app, []}},
    {applications, [
        kernel,
        stdlib,
        kyu
    ]}
]}.

Usage

One way to use the application is by providing connection configurations in kyu's application environment (for example in your sys.config or sys.config.src file). This will use kyu's supervisor to start the connections.

Another way is to start your application and then attach connections to your supervision tree using kyu_connection:child_spec/1 (check out the docs).

%% example sys.config
[
    {kyu, [
        {connections, [#{
            name => <<"rabbitmq_cluster">>, %% required
            url => "amqp://user:password@rabbitmq.cluster", %% optional
            host => "rabbitmq.cluster", %% optional - default: "localhost"
            port => 5672, %% optional - default: 5672
            username => <<"user">>, %% optional - default: <<"guest">>
            password => <<"password">>, %% optional - default: <<"guest">>
            retry_delay => 5000, %% (ms) optional - default: 1000
            retry_attempts => 99, %% optional - default: infinity
            management_host => "rabbitmq.management", %% optional - default: connection host
            management_port => 443 %% optional - default: 15672
        }]}
    ]}
].

If the retry_attempts option is 0 or less, kyu will keep trying to connect indefinitely.

For the full set of configuration options check out the docs.
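
For the second approach (attaching a connection to your own supervision tree), a minimal sketch could look like the following. The module name yourapp_sup is hypothetical, and the sketch assumes that kyu_connection:child_spec/1 accepts the same map as the sys.config example above and returns an ordinary supervisor child spec; check the docs before relying on either.

```erlang
-module(yourapp_sup).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% Assumption: the option map mirrors the sys.config example;
    %% only `name` is marked as required there.
    Connection = kyu_connection:child_spec(#{
        name => <<"rabbitmq_cluster">>,
        host => "rabbitmq.cluster",
        port => 5672,
        retry_delay => 5000
    }),
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 10},
    {ok, {SupFlags, [Connection]}}.
```

With this approach the connection does not need to appear in the connections list of the kyu application environment.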

Publishers

-include_lib("kyu/include/amqp.hrl"). %% amqp commands

kyu_publisher:child_spec(#{
    connection => <<"rabbitmq_cluster">>, %% required
    name => <<"my_publisher">>, %% required
    confirms => true, %% optional - default: true
    commands => [ %% optional
        #'exchange.declare'{
            exchange = <<"my_exchange">>,
            type = <<"topic">>,
            durable = false
        }
    ]
}).

After introducing the returned child spec to your supervision tree, you can start publishing messages.

Publisher = <<"my_publisher">>,
Message = #{
    routing_key => <<"my.routing.key">>,
    exchange => <<"my_exchange">>,
    payload => <<"hello world">>,
    execution => async %% check out the explanation below
},
kyu:publish(Publisher, Message).

This will try to publish the provided message.

For a full set of possible message properties, check out the docs.

Execution

This setting will tell the publisher how to act on messages:

  • sync (default): If the publisher is in confirm mode, it will make the caller wait for the AMQP server to confirm the publication. This is only synchronous from the perspective of the caller! Other messages can still be published at the same time.
  • async: The publisher will ignore confirmation or returned messages.
  • supervised: The publisher will make the caller wait until the AMQP server either confirms or returns the message, and replies with a value based on what the server sent back (for example {error, <<"NO_ROUTE">>}). For this to work, the publisher must be in confirm mode and the mandatory flag has to be set to true on the message. To be able to identify a returned message, the publisher overrides the message_id property with a custom value! This is only synchronous from the perspective of the caller! Other messages can still be published at the same time.
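
As a rough illustration of the supervised mode, the sketch below publishes one message and branches on the reply. The module and function names are hypothetical, the `mandatory` map key is an assumed spelling of the mandatory flag, and the success value of kyu:publish/2 is not specified in this document, so it is passed through untouched.

```erlang
-module(publish_example).

-export([publish_supervised/1]).

%% Publishes Payload in supervised mode through the publisher
%% configured earlier as <<"my_publisher">>.
publish_supervised(Payload) when is_binary(Payload) ->
    Message = #{
        routing_key => <<"my.routing.key">>,
        exchange => <<"my_exchange">>,
        payload => Payload,
        mandatory => true, %% assumption: key name of the mandatory flag
        execution => supervised
    },
    case kyu:publish(<<"my_publisher">>, Message) of
        {error, Reason} ->
            %% for example {error, <<"NO_ROUTE">>} when no queue matched
            {error, Reason};
        Other ->
            %% success value is not documented here; hand it back as-is
            Other
    end.
```

Because supervised mode needs confirm mode, the publisher in this sketch must have been started with confirms => true (the default shown above).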

Consumers

-include_lib("kyu/include/amqp.hrl"). %% amqp commands
-include_lib("kyu/include/kyu.hrl"). %% kyu commands

kyu_consumer:child_spec(#{
    connection => <<"rabbitmq_cluster">>, %% required
    name => <<"my_consumer">>, %% required
    queue => <<"queue_to_consume">>, %% required
    module => my_worker, %% required
    args => #{}, %% optional - default: undefined
    prefetch_count => 2, %% required - will start a channel with global prefetch count of 2, and 2 workers with local prefetch count of 1
    commands => [ %% optional
        #'queue.declare'{queue = <<"queue_to_consume">>},
        #'kyu.queue.bind'{ %% this is one of the special commands introduced by kyu
            routing_key = <<"my.routing.key">>,
            exchange = <<"my_exchange">>,
            queue = <<"queue_to_consume">>,
            exclusive = true
        }
    ]
}).

After introducing the consumer to your supervision tree, it will start to consume messages from the provided queue and make calls to the specified worker module.

-module(my_worker).

-behaviour(kyu_worker).

-export([
    init/1,
    handle_message/2,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2
]).

%% this callback is optional and works much like gen_server:init/1
-spec init(Args :: term()) -> {ok, State :: term()} | {stop, Reason :: term()}.
init(Args) ->
    %% initialization
    {ok, Args}.

-spec handle_message(Message :: kyu:message(), State :: term()) ->
    {ack, NewState :: term()}
    | {reject, NewState :: term()}
    | {remove, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_message(_Message, State) ->
    %% consumption
    {ack, State}.

%% this callback is optional and works like gen_server:handle_call/3
-spec handle_call(Request :: term(), From :: gen_server:from(), State :: term()) ->
    {reply, Reply :: term(), NewState :: term()}
    | {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_call(_Request, _From, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:handle_cast/2
-spec handle_cast(Request :: term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_cast(_Request, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:handle_info/2
-spec handle_info(Info :: term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_info(_Info, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:terminate/2
-spec terminate(Reason :: term(), State :: term()) -> term().
terminate(_Reason, _State) -> ok.

The return values of handle_message/2 have the following effects:

  • ack - acknowledges the message
  • reject - rejects and requeues the message
  • remove - rejects the message and removes it from the queue
  • stop - crashes the worker process
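
To make the return values above more concrete, here is a small sketch of a worker that picks between ack, reject and remove. The module name and the process/1 helper are hypothetical, and the sketch assumes that a consumed message is a map with a payload key (mirroring the map used when publishing); the actual shape of kyu:message() is described in the docs.

```erlang
-module(my_payload_worker).
-behaviour(kyu_worker).

-export([init/1, handle_message/2]).

%% Keep a counter of successfully handled messages in the state.
init(_Args) ->
    {ok, #{handled => 0}}.

%% Assumption: the message is a map that carries a `payload` key.
handle_message(#{payload := Payload}, State = #{handled := N}) ->
    case process(Payload) of
        ok ->
            {ack, State#{handled := N + 1}};
        {error, temporary} ->
            %% requeue so that the message is delivered again later
            {reject, State};
        {error, invalid} ->
            %% retrying cannot help, so drop the message
            {remove, State}
    end.

%% Hypothetical processing step, here only to drive the three outcomes.
process(<<>>) -> {error, invalid};
process(<<"retry">>) -> {error, temporary};
process(Payload) when is_binary(Payload) -> ok.
```

Reserving reject for failures that may succeed on a later attempt avoids redelivering a message that can never be processed.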

Commands

kyu supports commands in publisher and consumer configurations. This allows you to declare exchanges, queues and bindings before publishing or consuming.

AMQP commands

The AMQP commands supported out of the box are:

  • exchange.declare
  • exchange.delete
  • exchange.bind
  • exchange.unbind
  • queue.declare
  • queue.bind
  • queue.purge
  • queue.delete
  • queue.unbind

Use these in a module:

-include_lib("kyu/include/amqp.hrl").

Kyu commands

There are two custom commands at the moment:

%% extends the standard 'queue.bind' command
#'kyu.queue.bind'{
    routing_key :: binary(),
    exchange :: binary(),
    queue :: binary(),
    arguments :: list(),

    exclusive :: boolean()
    %% when set to false the command is equivalent to 
    %% a standard 'queue.bind' command

    %% when set to true it will unbind any other routing key
    %% with the same arguments (above)
}
%% provides an alternative to the standard 'queue.unbind' command
#'kyu.queue.unbind'{
    except = <<>> :: binary(), %% an exception routing key
    pattern = <<>> :: binary(), %% a regex pattern
    %% if a routing key bound to the queue
    %% matches the pattern and the arguments (below)
    %% it will be unbound
    %% (except and pattern may not be used together)

    exchange :: binary(),
    queue :: binary(),
    arguments :: list()
}

Use these in a module:

-include_lib("kyu/include/kyu.hrl").
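
As an illustration, the standard and custom commands can be combined in one command list. The sketch below is only an assumption-laden example: the module and function names are hypothetical, it assumes the commands are executed in list order, and it assumes the pattern is interpreted like a regular expression from Erlang's re module.

```erlang
-module(my_commands).

-include_lib("kyu/include/amqp.hrl").
-include_lib("kyu/include/kyu.hrl").

-export([consumer_commands/0]).

%% Returns a list suitable for the `commands` option of a consumer.
consumer_commands() ->
    Queue = <<"queue_to_consume">>,
    Exchange = <<"my_exchange">>,
    [
        #'queue.declare'{queue = Queue},
        %% unbind every routing key on this queue that starts with "my."
        %% (assumption: `pattern` uses re-style syntax)
        #'kyu.queue.unbind'{
            pattern = <<"^my\\.">>,
            exchange = Exchange,
            queue = Queue,
            arguments = []
        },
        %% then add the single binding that should remain
        #'queue.bind'{
            routing_key = <<"my.routing.key">>,
            exchange = Exchange,
            queue = Queue
        }
    ].
```

The result can be passed as commands => my_commands:consumer_commands() in the map given to kyu_consumer:child_spec/1.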

Documentation

Read the full documentation here.