A small collection of utilities that haven't yet justified their own repositories.
A simple load balancer / circuit breaker
Used between services in a SOA, it load-balances requests over multiple nodes, automatically removes failing nodes, and stops cascading failures.
{ok, _} = gen_lb:start_link(service_x,
            [ %% nodes in this cluster
              {cluster, ["node1", "node2", "node3"]}
              %% module implementing the actual code
              %% communicating with the cluster
            , {cb_mod, testmod}
              %% interval over which failures are
              %% calculated (ms)
            , {interval, 600000} %% 10 min
              %% max errors during the interval that a node can
              %% experience before being taken out of rotation
            , {max_errors, 2}
              %% max crashes during the interval that a node can
              %% experience before being taken out of rotation
            , {max_crasches, 1}
              %% time a blocked node will be kept out of the cluster
              %% before being automatically inserted again
            , {block_time, 300000} %% 5 min
            ]),
%% call service_x; the request can be handled by node[1-3]. gen_lb keeps
%% track of crashes, failures, load balancing etc.
case gen_lb:call(service_x, do_something,
       [ %% do up to two attempts (i.e. if the first fails, do another).
         %% multiple attempts will always hit at least two nodes
         {attempts, 2}
       ]) of
  {ok, Res} ->
    %% everything went well
    {ok, Res};
  {error, cluster_down} ->
    %% all nodes in the cluster are down (previous crashes/errors);
    %% nothing we can do except wait until they are automatically reinserted
    {error, cluster_down};
  {error, Rsn} ->
    %% the call failed for whatever reason.
    %% the failure is stored in gen_lb and might result in the node being
    %% taken out of rotation
    {error, Rsn}
end.
%% manually block a node (indefinitely)
gen_lb:block(service_x, "node1").
%% manually unblock a node
gen_lb:unblock(service_x, "node1").
Callback module testmod.erl:
-module(testmod).
-behaviour(gen_lb).
-export([exec/2]).

exec(Node, Args) ->
    %% do the actual call to the node selected by gen_lb
    case do_something(Node, Args) of
        {ok, Res} ->
            {ok, Res};
        {error, Rsn} ->
            %% we failed; this increments the number of errors on this node
            %% and the request will possibly be retried on another node
            {error, Rsn}
    end.
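For intuition, here is a minimal sketch of the failure-window logic implied by the interval / max_errors / block_time options above. The module and function names are hypothetical; this is not gen_lb's actual implementation:

-module(gen_lb_window_sketch).
-export([should_block/3]).

%% Failures is a list of failure timestamps in milliseconds.
%% A node is taken out of rotation when it has seen more than MaxErrors
%% failures within the last Interval milliseconds; gen_lb would then keep
%% it blocked for block_time ms before reinserting it.
should_block(Failures, Interval, MaxErrors) ->
    Now    = erlang:monotonic_time(millisecond),
    Recent = [T || T <- Failures, Now - T =< Interval],
    length(Recent) > MaxErrors.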
TODO:
- All requests go through a single gen_server; the node state could be kept
  in an ETS table to avoid this bottleneck (see the sketch after this list)
- Better support for timeouts
- More metrics
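A rough sketch of that ETS idea, with purely hypothetical names (lb_ets_sketch, lb_nodes and pick_node/0 are not part of gen_lb): the owning process maintains a public named table, and callers select a node without a gen_server round-trip.

-module(lb_ets_sketch).
-export([init/1, pick_node/0]).

%% the owner process creates a public named table holding the available nodes
init(Nodes) ->
    ets:new(lb_nodes, [named_table, public, set]),
    true = ets:insert(lb_nodes, {available, Nodes}),
    ok.

%% any caller can pick a node directly from ETS, no gen_server call needed
pick_node() ->
    case ets:lookup(lb_nodes, available) of
        [{available, []}]    -> {error, cluster_down};
        [{available, Nodes}] ->
            %% naive random pick; the real selection strategy may differ
            {ok, lists:nth(rand:uniform(length(Nodes)), Nodes)}
    end.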
A persistent priority queue with the ability to set a ‘due’ time on enqueued items.
%% start your storage backend (postgres/riak/ETS/etc).
%% for this example we use a gen_server keeping track of a dets table;
%% the source is found in test/
{ok, _} = yamq_dets_store:start_link("foo.dets"),
%% define a function that does something with enqueued items
F = fun(Task) ->
        io:format("got a task ~p~n", [Task]),
        do_something(Task)
    end,
{ok, _} = yamq:start_link([ {store, yamq_dets_store}
                            %% function that will be executed for
                            %% enqueued tasks
                          , {func, F}
                            %% number of concurrent workers
                          , {workers, 4}
                          ]),
%% enqueue a task with priority 1 that will be handled as soon as
%% a worker is ready
ok = yamq:enqueue(foo, [{priority, 1}]),
%% enqueue a task with priority 8 that will be handled in 10 minutes or later
ok = yamq:enqueue(bar, [{priority, 8}, {due, 600000}]),
%% enqueue three tasks and make sure they are executed serially
SerializeID = 123456789,
ok = yamq:enqueue(baz, [{serialize, SerializeID}]),
ok = yamq:enqueue(buz, [{serialize, SerializeID}]),
ok = yamq:enqueue(bar, [{serialize, SerializeID}]),
%% stop the queue
ok = yamq:stop().
Very brief:
- Upon startup the store is consulted in order to rebuild the index
- New items are written to the store before they are executed
- 8 priority levels, one ETS table (ordered_set) per level (see the sketch below)
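A sketch of how such a per-level index could work; module and function names are hypothetical and this is not yamq's verbatim internals. Keying the ordered_set on {DueTime, Key} makes "what is due next" a cheap ets:first/1:

-module(yamq_index_sketch).
-export([new/1, insert/3, next/1]).

%% one ordered_set per priority level, keyed on {DueMs, Key} so that
%% ets:first/1 always returns the entry with the earliest due time
new(Level) ->
    ets:new(Level, [named_table, ordered_set]).

insert(Level, Key, DueMs) ->
    true = ets:insert(Level, {{DueMs, Key}}),
    ok.

%% {ready, Key} if the head of the level is due, {wait_until, Ms} otherwise
next(Level) ->
    case ets:first(Level) of
        '$end_of_table' -> empty;
        {DueMs, Key} ->
            case erlang:system_time(millisecond) >= DueMs of
                true  -> {ready, Key};
                false -> {wait_until, DueMs}
            end
    end.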
A store implementation needs to look somewhat like this:
-module(yamq_store_impl).
-behaviour(yamq_store).
-export([ get/1
        , put/2
        , delete/1
        , list/0
        ]).

%% skeleton; each callback should talk to the actual backend
get(_K)     -> {ok, value}. %% return {ok, Value} for the given key
put(_K, _V) -> ok.
delete(_K)  -> ok.
list()      -> {ok, []}.    %% return {ok, Keys} for all stored items
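As a concrete (hypothetical) example, the behaviour could be implemented with a public ETS table; note that ETS is in-memory, so a real backend would persist to dets/postgres/riak as above:

-module(yamq_ets_store).
-behaviour(yamq_store).
-export([init/0, get/1, put/2, delete/1, list/0]).

init() ->
    ets:new(?MODULE, [named_table, public, set]),
    ok.

%% crashes on a missing key, which is fine for a sketch
get(K) ->
    [{K, V}] = ets:lookup(?MODULE, K),
    {ok, V}.

put(K, V) ->
    true = ets:insert(?MODULE, {K, V}),
    ok.

delete(K) ->
    true = ets:delete(?MODULE, K),
    ok.

list() ->
    {ok, [K || {K, _V} <- ets:tab2list(?MODULE)]}.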
Some rough performance estimates can be found in test/perf1.erl.
My ThinkPad X230 handled several million entries, with due times ranging from now to +6 months, without any problems, which is more than good enough for my use case.