Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Initial commit

  • Loading branch information...
commit ff01dcef7ba84502b9c32087afc8fb9f63dbc6e1 0 parents
Fred Hebert authored
4 .gitignore
... ... @@ -0,0 +1,4 @@
  1 +*.swp
  2 +*.beam
  3 +*.dump
  4 +*.COVER.*
193 README.markdown
Source Rendered
... ... @@ -0,0 +1,193 @@
  1 +# Dispcount #
  2 +
  3 +Dispcount is an attempt at making more efficient resource dispatching than usual Erlang pool approaches based on a single process receiving messages from everyone and possibly getting overloaded when demand is too high, or at least seeing slower and slower response times.
  4 +
  5 +## When should I use dispcount? ##
  6 +
  7 +There have been a few characteristics assumed to be present for the design of dispcount:
  8 +
  9 +- resources are limited, but the demand for them is superior to their availability.
  10 +- requests for resources are *always* incoming
  11 +- because of the previous point, it is possible and prefered to simply not queue requests for busy resources, but instantly return. Newer requests will take their spot
  12 +- low latency to know whether or not a resource is available is more important than being able to get all queries to run.
  13 +
  14 +If you cannot afford to ignore a query and wish to eventually serve every one of them, dispcount might not be for you.
  15 +
  16 +## How to build ##
  17 +
  18 + `$ ./rebar compile`
  19 +
  20 +## Running tests ##
  21 +
  22 +Run the small Common Test suite with:
  23 +
  24 + `$ rebar ct`
  25 +
  26 +## How to use dispcount ##
  27 +
  28 +First start the application:
  29 +
  30 + `application:start(dispcount).`
  31 +
  32 +When resources need to be dispatched, a dispatcher has to be started:
  33 +
  34 + ok = dispcount:start_dispatch(
  35 + ref_dispatcher,
  36 + {ref_dispatch, []},
  37 + [{restart,permanent},{shutdown,4000},
  38 + {maxr,10},{maxt,60},{resources,10}]
  39 + )
  40 +
  41 +The general form is:
  42 +
  43 + ok = dispcount:start_dispatch(
  44 + DispatcherName,
  45 + {CallbackMod, Arg},
  46 + [{restart,Type},{shutdown,Timeout},
  47 + {maxr,X},{maxt,Y},{resources,Num}]
  48 + )
  49 +
  50 +The `restart`, `shutdown`, `maxr`, and `maxt` values allow to configure the supervisor that will take care of that dispatcher. The `resources` value lets you set how many 'things' you want available. If you were handling HTTP sockets, you could use 200 connections by putting `{resources,200}`. Skip to the next section to see how to write your own dispatcher callback module.
  51 +
  52 +The dispatcher is then put under the supervision structure of the dispcount application. To be able to further access resources, you need to fetch information related to the dispatcher:
  53 +
  54 + {ok, Info} = dispcount:dispatcher_info(ref_dispatcher)
  55 +
  56 +This is because we want to reduce the number of calls to configuration spots and centralized points in a node. As such, you should call this function in the supervisor of whatever is going to call the dispatcher, and share the value to all children if possible. That way, a basic blob of content is going to be shared without any cost to all processes.
  57 +
  58 +Using this `Info` value, calls to checkout resources can be made:
  59 +
  60 + case dispcount:checkout(Info) of
  61 + {ok, CheckinReference, Resource} ->
  62 + timer:sleep(10),
  63 + dispcount:checkin(Info, CheckinReference, Resource);
  64 + {error, busy} ->
  65 + give_up
  66 + end
  67 +
  68 +And that's the gist of it.
  69 +
  70 +## Writing a dispatcher callback module ##
  71 +
  72 +Each dispatcher to allow to lend resources is written as a callback for a custom behaviour. Here's an example (tested) callback module that simply returns references:
  73 +
  74 + -module(ref_dispatch).
  75 + -behaviour(dispcount).
  76 + -export([init/1, checkout/2, checkin/2, handle_info/2, dead/1,
  77 + terminate/2, code_change/3]).
  78 +
  79 + init([]) ->
  80 + {ok, make_ref()}.
  81 +
  82 +This one works a bit like a gen\_server. You have arguments passed and return `{ok,State}`. The state will then be carried around for the subsequent calls.
  83 +
  84 +The next function is `checkout`:
  85 +
  86 + checkout(_From, Ref) ->
  87 + {ok, Ref, undefined}.
  88 +
  89 +By default, the behaviour takes care of making sure only valid requests for a checkout (resources aren't busy) are going through. The `_From` variable is the pid of the requester of a resource. This is useful if you need to change things like a socket's controlling process or a port's controller. Then, you only need to return a resource by doing `{ok, Resource, NewState}`, and th ecaller will see `{ok, Reference, Resource}`. The `Reference` is a token added in by dispcount and is needed to chick the resource back in. Other things to return are `{error, Reason, NewState}`, which will return `{error, Reason}` to the caller.
  90 +
  91 +Finally, you can return `{stop, Reason, NewState}` to terminate the resource watcher. Note that this is risky because of how things work (see the relevant section for this later in this README).
  92 +
  93 +To check resources back in, the behaviour needs to implement the following:
  94 +
  95 + checkin(Ref, undefined) ->
  96 + {ok, Ref};
  97 + checkin(SomeRef, Ref) ->
  98 + {ignore, Ref}.
  99 +
  100 +In this case, what happens is that we make sure that the resource that is being sent back to us is the right one. The first function clause makes sure that we only receive a reference after we've distributed one, and we then accept that one. If we receive extraneous references (maybe someone called the `checkin/3` function twice?), we ignore the result.
  101 +
  102 +The second clause here is entirely optional and defensive programming. Note that checking a resource in is a synchronous operation.
  103 +
  104 +The next call is the `dead/1` function:
  105 +
  106 + dead(undefined) ->
  107 + {ok, make_ref()}.
  108 +
  109 +`dead(State)` is called whenever the process that checked out a given resource has died. This is because dispcount automatically monitors them so you don't need to do it yourself. If it sees the resource owner died, it calls thhat function.
  110 +
  111 + This lets you create a new instance of a resource to distribute later on, if required or possible. As an example, if we were to use a permanent connection to a database as a resource, then this is where we'd set a new connection up and then keep going as if nothing went wrong.
  112 +
  113 +You can also receive unexpected messages to your process, if you felt like implementing your own side-protocols or whatever:
  114 +
  115 + handle_info(_Msg, State) ->
  116 + {ok, State}.
  117 +
  118 +And finally, you benefit from a traditional OTP `terminate/2` function, and the related `code_change/3`.
  119 +
  120 + terminate(_Reason, _State) ->
  121 + ok.
  122 +
  123 + code_change(_OldVsn, State, _Extra) ->
  124 + {ok, State}.
  125 +
  126 +Here's a similar callback module to handle HTTP sockets (untested):
  127 +
  128 + -module(http_dispatch).
  129 + -behaviour(dispcount).
  130 + -export([init/1, checkout/2, checkin/2, handle_info/2, dead/1, terminate/2, code_change/3]).
  131 +
  132 + -record(state, {resource, given=false, port}).
  133 +
  134 + init([{port,Num}]) ->
  135 + {ok,Socket} = gen_tcp:connect({127,0,0,1}, Num, [binary]),
  136 + {ok, #state{resource=Socket, port=Num}}.
  137 +
  138 + %% just in case, but that should never happen anyway :V I'm paranoid!
  139 + checkout(_From, State = #state{given=true}) ->
  140 + {error, busy, State};
  141 + checkout(From, State = #state{resource=Socket}) ->
  142 + gen_tcp:controlling_process(Socket, From),
  143 + {ok, Socket, State#state{given=true}}.
  144 +
  145 + checkin(Socket, State = #state{resource=Socket, given=true}) ->
  146 + {ok, State#state{given=false}};
  147 + checkin(_Socket, State) ->
  148 + %% The socket doesn't match the one we had -- an error happened somewhere
  149 + {ignore, State}.
  150 +
  151 + dead(State) ->
  152 + %% aw shoot, someone lost our resource, we gotta create a new one:
  153 + {ok, NewSocket} = gen_tcp:connect({127,0,0,1}, State#state.port, [binary]),
  154 + {ok, State#state{resource=NewSocket,given=false}}.
  155 + %% alternatively:
  156 + %% {stop, Reason, State}
  157 +
  158 + handle_info(_Msg, State) ->
  159 + %% something unexpected with the TCP connection if we set it to active,once???
  160 + {ok, State}.
  161 +
  162 + terminate(_Reason, _State) ->
  163 + %% let the GC clean the socket.
  164 + ok.
  165 +
  166 + code_change(_OldVsn, State, _Extra) ->
  167 + {ok, State}.
  168 +
  169 +## How does it work ##
  170 +
  171 +What killed most of the pool and dispatch systems I used before was the amount of messaging required to make things work. When many thousands of processes require information from a central point at once, performance would quickly degrade as soon as the protocol had some messaging involved at its core.
  172 +
  173 +We'd see mailbox queue build-up, busy schedulers, and booming memory. Dispcount tries to solve the problem by using the ubiquitous Erlang optimization tool: ETS tables.
  174 +
  175 +The core concept of dispcount is based on two ETS tables: a dispatch table (write-only) and a worker matchup table (read-only). Two tables because what costs the most performance with ETS in terms of concurrency is switching between reading and writing.
  176 +
  177 +In each of the table, `N` entries are added: one for each resource available, matching with a process that manages that resource (a *watcher*). Persistent hashing of the resources allows to dispatch queries uniformly to all of these watchers. Once you know which watcher your request is dedicated to, the dispatch table is called into action.
  178 +
  179 +The dispatch table manages to allow both reads and writes while remaining write-only. The trick is to use the `ets:update_counter` functions, which atomically increment a counter and return the value, although the operation is only writing and communicating a minimum of information.
  180 +
  181 +The gist of the idea is that you can only get the permission to message the watcher if you're the first one to increment the counter. Other processes that try to do it just instantly give up. This guarantees that a single caller at a time has the permission to message a given worker, a bit like a mutex, but implemented efficiently (for Erlang, that is).
  182 +
  183 +Then the lookup table comes in action; because we have the permission to message a watcher, we look up its pid, and then send a message.
  184 +
  185 +Whenever we check a resource back in or the process that acquired it dies, the counter is reset to 0 and a new request can come in and take its place.
  186 +
  187 +Generally, this allows us to move the bottleneck of similar applications away from a single process and its mailbox, to an evenly distributed number of workers. Then the next bottleneck will be the ETS tables (both set with read and write concurrency options), which are somewhat less likely to be as much of a hot spot.
  188 +
  189 +## What's left to do? ##
  190 +
  191 +- More complete testing suite.
  192 +- Adding a function call to allow the transfer of ownership from a process to another one to avoid messing with monitoring in the callback module.
  193 +- Testing to make sure the callback modules can be updated with OTP relups and appups. This is so far untested.
5 include/state.hrl
... ... @@ -0,0 +1,5 @@
  1 +-record(config, {dispatch_name :: atom(),
  2 + num_watchers = 25 :: pos_integer(),
  3 + watcher_type = ets :: 'named' | 'ets',
  4 + dispatch_table :: ets:tid() | 'undefined',
  5 + worker_table :: ets:tid() | 'undefined'}).
BIN  rebar
Binary file not shown
31 rebar.config
... ... @@ -0,0 +1,31 @@
  1 +%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
  2 +%% ex: ts=4 sw=4 ft=erlang et
  3 +%% This is a sample rebar.conf file that shows examples of some of rebar's
  4 +%% options.
  5 +
  6 +%% == Core ==
  7 +
  8 +%% Additional library directories to add to the code path
  9 +{lib_dirs, []}.
  10 +
  11 +%% == Erlang Compiler ==
  12 +
  13 +%% Erlang compiler options
  14 +{erl_first_files, ["dispcount"]}.
  15 +{erl_opts, [debug_info, {i, "include"}, {d,'DEBUG'}]}.
  16 +
  17 +%% == Common Test ==
  18 +
  19 +%% Option to pass extra parameters when launching Common Test
  20 +{ct_extra_params, "-boot start_sasl -pa ebin/"}.
  21 +
  22 +%% == Dependencies ==
  23 +
  24 +%% Where to put any downloaded dependencies. Default is "deps"
  25 +{deps_dir, "deps"}.
  26 +
  27 +%% What dependencies we have, dependencies can be of 3 forms, an application
  28 +%% name as an atom, eg. mochiweb, a name and a version (from the .app file), or
  29 +%% an application name, a version and the SCM details on how to fetch it (SCM
  30 +%% type, location and revision). Rebar currently supports git, hg, bzr and svn.
  31 +{deps, []}.
10 src/dispcount.app.src
... ... @@ -0,0 +1,10 @@
  1 +{application, dispcount, [
  2 + {description, "A dispatching library for resources and "
  3 + "task limiting based on shared counters"},
  4 + {vsn, "0.1.0"},
  5 + {applications, [kernel, stdlib]},
  6 + {registered, []},
  7 + {mod, {dispcount,[]}},
  8 + {modules, [dispcount, dispcount_supersup, dispcount_sup, dispcount_util,
  9 + dispcount_watcher, dispcount_serv]}
  10 +]}.
49 src/dispcount.erl
... ... @@ -0,0 +1,49 @@
  1 +-module(dispcount).
  2 +-behaviour(application).
  3 +-export([start/2,stop/1]).
  4 +-export([start_dispatch/3, stop_dispatch/1, dispatcher_info/1, checkout/1, checkin/3]).
  5 +-export([behaviour_info/1]).
  6 +
  7 +%% eventually switch to -callback if it becomes backwards compatible
  8 +behaviour_info(callbacks) ->
  9 + [{init,1},
  10 + {checkout, 2},
  11 + {checkin, 2},
  12 + {handle_info,2},
  13 + {dead,1},
  14 + {terminate,2},
  15 + {code_change,3}];
  16 +behaviour_info(_Other) ->
  17 + undefined.
  18 +
  19 +-spec start(normal, _) -> {ok, pid()}.
  20 +start(normal, _Args) ->
  21 + dispcount_supersup:start_link().
  22 +
  23 +-spec stop(_) -> ok.
  24 +stop(_State) ->
  25 + ok.
  26 +
  27 +-spec stop_dispatch(Name::atom()) -> ok.
  28 +stop_dispatch(Name) ->
  29 + dispcount_supersup:stop_dispatch(Name).
  30 +
  31 +-spec start_dispatch(Name::atom(), {module(), _}, term()) -> ok | already_defined.
  32 +start_dispatch(Name, Mod={M,A}, DispatchOpts) when is_atom(M) ->
  33 + Res = dispcount_supersup:start_dispatch(Name, Mod, DispatchOpts),
  34 + %% wait for all tables to be there. A bit messy, but it can be done:
  35 + dispcount_serv:wait_for_dispatch(Name, infinity),
  36 + Res.
  37 +
  38 +%% Should be called as infrequently as possible
  39 +-spec dispatcher_info(Name::atom()) -> term().
  40 +dispatcher_info(Name) ->
  41 + dispcount_serv:get_info(Name).
  42 +
  43 +-spec checkout(term()) -> {ok, term(), term()} | {error, term()}.
  44 +checkout(Info) ->
  45 + dispcount_watcher:checkout(Info).
  46 +
  47 +-spec checkin(term(), term(), term()) -> ok.
  48 +checkin(Info, CheckinRef, Resource) ->
  49 + dispcount_watcher:checkin(Info, CheckinRef, Resource).
109 src/dispcount_serv.erl
... ... @@ -0,0 +1,109 @@
  1 +%% In charge of relaying info about the supervisor when called.
  2 +-module(dispcount_serv).
  3 +-behaviour(gen_server).
  4 +-include("state.hrl").
  5 +
  6 +-export([start_link/4, wait_for_dispatch/2, get_info/1]).
  7 +-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  8 + terminate/2, code_change/3]).
  9 +
  10 +%%%%%%%%%%%%%%%%%
  11 +%%% INTERFACE %%%
  12 +%%%%%%%%%%%%%%%%%
  13 +-spec start_link(Parent::pid(), Name::atom(), {module(),[term()]}, [term(),...]) -> {ok, pid()}.
  14 +start_link(Parent, Name, {M,A}, Opts) ->
  15 + gen_server:start_link(?MODULE, {Parent, Name, {M,A}, Opts}, []).
  16 +
  17 +-spec wait_for_dispatch(Name::atom(), infinity | pos_integer()) -> ok.
  18 +wait_for_dispatch(Name, Timeout) ->
  19 + gen_server:call(get_name(Name), wait_for_tables, Timeout).
  20 +
  21 +-spec get_info(Name::atom()) -> #config{}.
  22 +get_info(Name) ->
  23 + gen_server:call(get_name(Name), get_info).
  24 +
  25 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  26 +%%% GEN_SERVER CALLBACKS %%%
  27 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  28 +init({Parent, Name, {M,A}, Opts}) ->
  29 + %% This one needs to go fast because we're gonna mess up the synchronous
  30 + %% starts of servers for the sake of the pool. For this reason, we'll
  31 + %% temporarily use this process to receive all requests and just forward
  32 + %% them when the time has come, maybe.
  33 + ConfTmp = init_tables(Opts),
  34 + Conf = ConfTmp#config{dispatch_name=Name, num_watchers=proplists:get_value(resources,Opts,10)},
  35 + SupSpec =
  36 + {{simple_one_for_one, proplists:get_value(maxr, Opts, 1), proplists:get_value(maxt, Opts, 60)},
  37 + [{watchers,
  38 + {dispcount_watcher, start_link, [Conf,{M,A}]},
  39 + proplists:get_value(restart,Opts,permanent),
  40 + proplists:get_value(shutdown,Opts,5000),
  41 + worker,
  42 + [dispcount_watcher,M]}]}, % <- check to make sure this can survive stuff
  43 + ChildSpec = {watchers_sup, {watchers_sup, start_link, [SupSpec]},
  44 + permanent, infinity, supervisor, [watchers_sup]},
  45 + self() ! continue_init,
  46 + register(get_name(Name), self()),
  47 + {ok, {Parent, ChildSpec, Conf}}.
  48 +
  49 +handle_call(get_info, _From, S = #config{}) ->
  50 + {reply, {ok, S}, S};
  51 +handle_call(wait_for_tables, _From, S = #config{num_watchers=N, dispatch_table=Tid}) ->
  52 + %% there should be N + 1 entries in the dispatch table
  53 + case ets:info(Tid, size) of
  54 + X when X =:= N+1 ->
  55 + {reply, ok, S};
  56 + _ ->
  57 + timer:sleep(1),
  58 + handle_call(wait_for_tables, _From, S)
  59 + end;
  60 +handle_call(_Call, _From, State) ->
  61 + {noreply, State}.
  62 +
  63 +handle_cast(_Cast, State) ->
  64 + {noreply, State}.
  65 +
  66 +handle_info(continue_init, {Parent, ChildSpec, Conf}) ->
  67 + {ok, Sup} = supervisor:start_child(Parent, ChildSpec),
  68 + ok = start_watchers(Sup, Conf),
  69 + {noreply, Conf};
  70 +handle_info(_Info, State) ->
  71 + {noreply, State}.
  72 +
  73 +code_change(_OldVsn, State, _Extra) ->
  74 + {ok, State}.
  75 +
  76 +terminate(_Reason, _State) ->
  77 + ok.
  78 +
  79 +%%%%%%%%%%%%%%%%%%%%%%%%%
  80 +%%% PRIVATE & HELPERS %%%
  81 +%%%%%%%%%%%%%%%%%%%%%%%%%
  82 +init_tables(Opts) ->
  83 + case proplists:get_value(watcher_type, Opts, ets) of
  84 + ets -> %% here
  85 + Dispatch = ets:new(dispatch_table, [set, public, {write_concurrency,true}]),
  86 + Worker = ets:new(worker_table, [set, public, {read_concurrency,true}]),
  87 + true = ets:insert(Dispatch, {ct,0}),
  88 + #config{watcher_type = ets,
  89 + dispatch_table = Dispatch,
  90 + worker_table = Worker};
  91 + named -> %% here
  92 + Dispatch = ets:new(dispatch_table, [set, public, {write_concurrency,true}]),
  93 + true = ets:insert(Dispatch, {ct,0}),
  94 + #config{watcher_type = named,
  95 + dispatch_table = Dispatch,
  96 + worker_table = undefined};
  97 + Other ->
  98 + erlang:error({bad_option,{watcher_type,Other}})
  99 + end.
  100 +
  101 +start_watchers(Sup, #config{num_watchers=Num}) ->
  102 + [start_watcher(Sup, Id) || Id <- lists:seq(1,Num)],
  103 + ok.
  104 +
  105 +start_watcher(Sup, Id) ->
  106 + supervisor:start_child(Sup, [Id]).
  107 +
  108 +get_name(Name) ->
  109 + list_to_atom(atom_to_list(Name) ++ "_serv").
17 src/dispcount_sup.erl
... ... @@ -0,0 +1,17 @@
  1 +-module(dispcount_sup).
  2 +-behaviour(supervisor).
  3 +-export([start_link/3, init/1]).
  4 +
  5 +-spec start_link(Name::atom(), module(), [term()]) -> {ok, pid()}.
  6 +start_link(Name, Mod, InitOpts) ->
  7 + supervisor:start_link({local, Name}, ?MODULE, {Name,Mod,InitOpts}).
  8 +
  9 +init({Name,Mod,InitOpts}) ->
  10 + %% dispcount_sup is started by dispcount_serv
  11 + {ok, {{one_for_all, 1, 60}, % once a minute is pretty generous
  12 + [%{watchers_sup, {dispcount_sup, start_link, [Name, Mod, InitOpts]},
  13 + % permanent, infinity, supervisor, [dispcount_sup]},
  14 + {info_server,
  15 + {dispcount_serv, start_link, [self(), Name, Mod, InitOpts]},
  16 + permanent, infinity, worker, [dispcount_serv]}
  17 + ]}}.
32 src/dispcount_supersup.erl
... ... @@ -0,0 +1,32 @@
  1 +-module(dispcount_supersup).
  2 +-behaviour(supervisor).
  3 +-export([start_dispatch/3, stop_dispatch/1, start_link/0, init/1]).
  4 +
  5 +%%%%%%%%%%%%%%%%%
  6 +%%% INTERFACE %%%
  7 +%%%%%%%%%%%%%%%%%
  8 +-spec start_link() -> {ok, pid()}.
  9 +start_link() ->
  10 + supervisor:start_link({local,?MODULE}, ?MODULE, []).
  11 +
  12 +-spec start_dispatch(Name::atom(), {module(),[term()]}, Opts::[term()]) -> ok.
  13 +start_dispatch(Name, Mod, Opts) ->
  14 + case supervisor:start_child(?MODULE, [Name, Mod, Opts]) of
  15 + {ok, _} -> ok;
  16 + {error,{already_started,_}} -> already_started
  17 + end.
  18 +
  19 +-spec stop_dispatch(Name::atom()) -> ok.
  20 +stop_dispatch(Name) ->
  21 + case whereis(Name) of
  22 + Pid when is_pid(Pid) ->
  23 + supervisor:terminate_child(?MODULE, Pid);
  24 + _ ->
  25 + ok
  26 + end.
  27 +
  28 +init([]) ->
  29 + {ok, {{simple_one_for_one, 1, 60},
  30 + [{dispcount_sup,
  31 + {dispcount_sup, start_link, []},
  32 + permanent, infinity, supervisor, [dispcount_sup]}]}}.
155 src/dispcount_watcher.erl
... ... @@ -0,0 +1,155 @@
  1 +-module(dispcount_watcher).
  2 +-behaviour(gen_server).
  3 +-include("state.hrl").
  4 +
  5 +-record(state, {callback :: module(),
  6 + callback_state :: term(),
  7 + config :: #config{},
  8 + id :: pos_integer(),
  9 + ref :: reference() | undefined}).
  10 +
  11 +-export([start_link/3, checkout/1, checkin/3]).
  12 +-export([init/1, handle_call/3, handle_cast/2,
  13 + handle_info/2, code_change/3, terminate/2]).
  14 +
  15 +%%%%%%%%%%%%%%%%%%%%%%%%
  16 +%%% PUBLIC INTERFACE %%%
  17 +%%%%%%%%%%%%%%%%%%%%%%%%
  18 +-spec start_link(#config{}, {module(), term()}, pos_integer()) -> {ok, pid()} | {error, _} | ignore.
  19 +start_link(Conf, Callback={_,_}, Id) ->
  20 + gen_server:start_link(?MODULE, {Id, Conf, Callback}, []).
  21 +
  22 +-spec checkout(#config{}) -> {ok, Ref::term(), Resource::term()} | {error, Reason::term()}.
  23 +checkout(Conf) ->
  24 + checkout(self(), Conf).
  25 +
  26 +-spec checkout(pid(), #config{}) -> {ok, Ref::term(), Resource::term()} | {error, Reason::term()}.
  27 +checkout(ToPid,#config{num_watchers=Num, watcher_type=Type, dispatch_table=DTid, worker_table=WTid}) ->
  28 + case {Type, is_free(DTid, Id = dispatch_id(Num))} of
  29 + {ets, true} ->
  30 + [{_,Pid}] = ets:lookup(WTid, Id),
  31 + gen_server:call(Pid, {get,ToPid});
  32 + {named, true} ->
  33 + gen_server:call(get_name(Id), {get,ToPid});
  34 + {_, false} ->
  35 + {error, busy}
  36 + end.
  37 +
  38 +-spec checkin(#config{}, Ref::term(), Resource::term()) -> ok.
  39 +checkin(#config{}, {Pid,Ref}, Resource) ->
  40 + %% we cheated, using a Pid for the CheckRef. Dirty optimisation!
  41 + gen_server:cast(Pid, {put, Ref, Resource}).
  42 +
  43 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  44 +%%% GEN_SERVER CALLBACKS %%%
  45 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  46 +init({Id,C=#config{watcher_type=ets,dispatch_table=DTid,worker_table=WTid},{M,A}}) ->
  47 + ets:insert(WTid, {Id, self()}),
  48 + ets:insert(DTid, {Id, 0}),
  49 + init(Id,C,M,A);
  50 +init({Id,C=#config{watcher_type=named,dispatch_table=Tid},{M,A}}) ->
  51 + register(get_name(Id), self()),
  52 + ets:insert(Tid, {Id, 0}),
  53 + init(Id,C,M,A).
  54 +
  55 +handle_call({get, Pid}, _From, S=#state{callback=M, callback_state=CS, ref=undefined}) ->
  56 + try M:checkout(Pid, CS) of
  57 + {ok, Res, NewCS} ->
  58 + MonRef = erlang:monitor(process, Pid),
  59 + {reply, {ok, {self(),MonRef}, Res}, S#state{callback_state=NewCS, ref=MonRef}};
  60 + {error, Reason, NewCS} ->
  61 + {reply, {error, Reason}, S#state{callback_state=NewCS}};
  62 + {stop, Reason, NewCS} ->
  63 + M:terminate(Reason, NewCS),
  64 + {stop, Reason, S}
  65 + catch
  66 + Type:Reason ->
  67 + {stop, {Type,Reason}, S}
  68 + end;
  69 +handle_call({get, _Pid}, _From, State) -> % busy
  70 + {reply, {error, busy}, State};
  71 +handle_call(_Call, _From, State) ->
  72 + {noreply, State}.
  73 +
  74 +handle_cast({put, Ref, Res},
  75 + S=#state{callback=M, callback_state=CS, config=Conf, id=Id, ref=Ref}) ->
  76 + try M:checkin(Res, CS) of
  77 + {ok, NewCS} ->
  78 + #config{dispatch_table=DTid} = Conf,
  79 + erlang:demonitor(Ref, [flush]),
  80 + set_free(DTid, Id),
  81 + {noreply, S#state{ref=undefined,callback_state=NewCS}};
  82 + {ignore, NewCS} ->
  83 + {noreply, S#state{callback_state=NewCS}};
  84 + {stop, Reason, NewCS} ->
  85 + M:terminate(Reason, NewCS),
  86 + {stop, Reason, S}
  87 + catch
  88 + Type:Reason ->
  89 + {stop, {Type,Reason}, S}
  90 + end;
  91 +handle_cast({put, _Ref, _Res}, State) -> % nomatch on refs
  92 + {noreply, State};
  93 +handle_cast(_Cast, State) ->
  94 + {noreply, State}.
  95 +
  96 +handle_info({'DOWN', Ref, process, _Pid, _Reason},
  97 + S=#state{ref=Ref, callback=M, callback_state=CS, config=Conf, id=Id}) ->
  98 + try M:dead(CS) of
  99 + {ok, NewCS} ->
  100 + #config{dispatch_table=DTid} = Conf,
  101 + set_free(DTid, Id),
  102 + {noreply, S#state{ref=undefined,callback_state=NewCS}};
  103 + {stop, Reason, NewCS} ->
  104 + M:terminate(Reason, NewCS),
  105 + {stop, Reason, S}
  106 + catch
  107 + Type:Reason ->
  108 + {stop, {Type,Reason}, S}
  109 + end;
  110 +handle_info(Msg, S=#state{callback=M, callback_state=CS}) ->
  111 + try M:handle_info(Msg, CS) of
  112 + {ok, NewCS} ->
  113 + {noreply, S#state{callback_state=NewCS}};
  114 + {stop, Reason, NewCS} ->
  115 + M:terminate(Reason, NewCS),
  116 + {stop, Reason, S}
  117 + catch
  118 + Type:Reason ->
  119 + {stop, {Type,Reason}, S}
  120 + end.
  121 +
  122 +%% How do we handle things for the callback module??
  123 +code_change(_OldVsn, State, _Extra) ->
  124 + {ok, State}.
  125 +
  126 +terminate(_Reason, _State) ->
  127 + ok.
  128 +
  129 +%%%%%%%%%%%%%%%%%%%%%%%
  130 +%%% HELPERS/PRIVATE %%%
  131 +%%%%%%%%%%%%%%%%%%%%%%%
  132 +get_name(Id) ->
  133 + list_to_atom("#"++atom_to_list(?MODULE)++integer_to_list(Id)).
  134 +
  135 +init(Id,Conf,M,A) ->
  136 + case M:init(A) of
  137 + {ok, S} ->
  138 + {ok, #state{callback=M,callback_state=S,config=Conf,id=Id}};
  139 + X -> X
  140 + end.
  141 +
  142 +dispatch_id(Num) ->
  143 + erlang:phash2({now(),self()}, Num) + 1.
  144 +
  145 +is_free(Tid, Id) ->
  146 + %% We optionally keep a tiny message queue in there,
  147 + %% which should cause no overhead but be fine to deal
  148 + %% with short spikes.
  149 + case ets:update_counter(Tid, Id, {2,1}) of
  150 + 1 -> true;
  151 + _ -> false
  152 + end.
  153 +
  154 +set_free(Tid, Id) ->
  155 + ets:insert(Tid, {Id,0}).
18 src/notes.txt
... ... @@ -0,0 +1,18 @@
  1 +-- two options available: registered-based or ets-based dispatching
  2 +
  3 +-- get/put and base init are all generic operations to be split into a behaviour
  4 +
  5 +-- the initial supervision needs to start N of them at once and be able to return the config on demand
  6 +
  7 +- the pool isn't an app, but must be possible to fit
  8 + under a supervision tree. Although it should be possible for it to have its own supervision tree. If you depend on it, you use it.
  9 +
  10 +[dispcount_supersup]
  11 + |
  12 +start({Mod,Args,Opts})
  13 + |
  14 + [dispcount_sup]
  15 + | \
  16 + | [watchers_sup]
  17 + | |
  18 + [serv] [watcher :: custom module]
12 src/watchers_sup.erl
... ... @@ -0,0 +1,12 @@
  1 +-module(watchers_sup).
  2 +-behaviour(supervisor).
  3 +-export([start_link/1, init/1]).
  4 +
  5 +-spec start_link({{supervisor:strategy(), pos_integer(), pos_integer()}, [supervisor:child_spec()]}) -> {ok, pid()}.
  6 +start_link(Spec) ->
  7 + supervisor:start_link(?MODULE, Spec).
  8 +
  9 +%% the spec is coming from dispcount_serv, tunneled through
  10 +%% dispcount_sup.
  11 +init(Spec) ->
  12 + {ok, Spec}.
108 test/dispcount_SUITE.erl
... ... @@ -0,0 +1,108 @@
  1 +-module(dispcount_SUITE).
  2 +-include_lib("common_test/include/ct.hrl").
  3 +-export([all/0, init_per_suite/1, end_per_suite/1,
  4 + init_per_testcase/2, end_per_testcase/2]).
  5 +-export([starting/1, stopping/1, overload/1, dead/1]).
  6 +
  7 +all() -> [starting, stopping, overload, dead].
  8 +
  9 +init_per_suite(Config) ->
  10 + application:start(dispcount),
  11 + Config.
  12 +
  13 +end_per_suite(_Config) ->
  14 + ok.
  15 +
  16 +init_per_testcase(overload, Config) ->
  17 + ok = dispcount:start_dispatch(
  18 + ref_overload_dispatcher,
  19 + {ref_dispatch, []},
  20 + [{restart,permanent},{shutdown,4000},
  21 + {maxr,10},{maxt,60},{resources,2}]
  22 + ),
  23 + {ok, Info} = dispcount:dispatcher_info(ref_overload_dispatcher),
  24 + [{info, Info} | Config];
  25 +init_per_testcase(dead, Config) ->
  26 + ok = dispcount:start_dispatch(
  27 + ref_dead_dispatcher,
  28 + {ref_dispatch, []},
  29 + [{restart,permanent},{shutdown,4000},
  30 + {maxr,10},{maxt,60},{resources,1}]
  31 + ),
  32 + {ok, Info} = dispcount:dispatcher_info(ref_dead_dispatcher),
  33 + [{info, Info} | Config];
  34 +init_per_testcase(_, Config) ->
  35 + Config.
  36 +
  37 +end_per_testcase(overload, Config) ->
  38 + dispcount:stop_dispatch(ref_overload_dispatcher);
  39 +end_per_testcase(dead, Config) ->
  40 + dispcount:stop_dispatch(ref_dead_dispatcher);
  41 +end_per_testcase(_, Config) ->
  42 + ok.
  43 +
  44 +starting(_Config) ->
  45 + ok = dispcount:start_dispatch(
  46 + ref_dispatcher,
  47 + {ref_dispatch, []},
  48 + [{restart,permanent},{shutdown,4000},
  49 + {maxr,10},{maxt,60},{resources,10}]
  50 + ),
  51 + {ok, Info} = dispcount:dispatcher_info(ref_dispatcher),
  52 + case dispcount:checkout(Info) of
  53 + {ok, CheckinReference, Resource} ->
  54 + timer:sleep(10),
  55 + dispcount:checkin(Info, CheckinReference, Resource);
  56 + {error, busy} ->
  57 + give_up
  58 + end.
  59 +
  60 +stopping(_Config) ->
  61 + ok = dispcount:start_dispatch(
  62 + stop_dispatch,
  63 + {ref_dispatch, []},
  64 + [{restart,permanent},{shutdown,4000},
  65 + {maxr,10},{maxt,60},{resources,1}]
  66 + ),
  67 + already_started = dispcount:start_dispatch(
  68 + stop_dispatch,
  69 + {ref_dispatch, []},
  70 + [{restart,permanent},{shutdown,4000},
  71 + {maxr,10},{maxt,60},{resources,1}]
  72 + ),
  73 + dispcount:stop_dispatch(stop_dispatch),
  74 + ok = dispcount:start_dispatch(
  75 + stop_dispatch,
  76 + {ref_dispatch, []},
  77 + [{restart,permanent},{shutdown,4000},
  78 + {maxr,10},{maxt,60},{resources,1}]
  79 + ),
  80 + dispcount:stop_dispatch(stop_dispatch).
  81 +
  82 +overload(Config) ->
  83 + %% should be two workers max. Loop until we reach overload,
  84 + %% then a bit more to make sure nothing is available (damn hashing makes
  85 + %% things non-deterministic), then free resources and check that we
  86 + %% can access more.
  87 + Info = ?config(info, Config),
  88 + %% the list comprehension monad, hell yes! Skip all busy calls and see that
  89 + %% only two resources are acquired
  90 + Resources = [{Ref, Res} || _ <- lists:seq(1,20), {ok, Ref, Res} <- [dispcount:checkout(Info)]],
  91 + 2 = length(Resources),
  92 + [] = [{Ref, Res} || _ <- lists:seq(1,100), {ok, Ref, Res} <- [dispcount:checkout(Info)]],
  93 + %% turning ressources in
  94 + [dispcount:checkin(Info, Ref, Res) || {Ref, Res} <- Resources],
  95 + %% then we're able to get more in.
  96 + timer:sleep(100),
  97 + Resources2 = [{Ref, Res} || _ <- lists:seq(1,20), {ok, Ref, Res} <- [dispcount:checkout(Info)]],
  98 + 2 = length(Resources2).
  99 +
  100 +dead(Config) ->
  101 + %% The dispatcher with this test has 1 ressource available.
  102 + Info = ?config(info, Config),
  103 + %% resource owners should be monitored automatically and handled when stuff dies.
  104 + spawn(fun() -> dispcount:checkout(Info), timer:sleep(500) end),
  105 + timer:sleep(100),
  106 + {error, busy} = dispcount:checkout(Info),
  107 + timer:sleep(500),
  108 + {ok, _Ref, _Res} = dispcount:checkout(Info).
39 test/http_dispatch.erl
... ... @@ -0,0 +1,39 @@
  1 +-module(http_dispatch).
  2 +-behaviour(dispcount).
  3 +-export([init/1, checkout/2, checkin/2, handle_info/2, dead/1, terminate/2, code_change/3]).
  4 +
  5 +-record(state, {resource, given=false, port}).
  6 +
  7 +init([{port,Num}]) ->
  8 + {ok,Socket} = gen_tcp:connect({127,0,0,1}, Num, [binary]),
  9 + {ok, #state{resource=Socket, port=Num}}.
  10 +
  11 +checkout(_From, State = #state{given=true}) ->
  12 + {error, busy, State};
  13 +checkout(From, State = #state{resource=Socket}) ->
  14 + gen_tcp:controlling_process(Socket, From),
  15 + {ok, Socket, State#state{given=true}}.
  16 +
  17 +checkin(Socket, State = #state{resource=Socket, given=true}) ->
  18 + {ok, State#state{given=false}};
  19 +checkin(_Socket, State) ->
  20 + %% The socket doesn't match the one we had -- an error happened somewhere
  21 + {ignore, State}.
  22 +
  23 +dead(State) ->
  24 + %% aw shoot, someone lost our resource, we gotta create a new one:
  25 + {ok, NewSocket} = gen_tcp:connect({127,0,0,1}, State#state.port, [binary]),
  26 + {ok, State#state{resource=NewSocket,given=false}}.
  27 + %% alternatively:
  28 + %% {stop, Reason, State}
  29 +
  30 +handle_info(_Msg, State) ->
  31 + %% something unexpected with the TCP connection if we set it to active,once???
  32 + {ok, State}.
  33 +
  34 +terminate(_Reason, _State) ->
  35 + %% let the GC clean the socket.
  36 + ok.
  37 +
  38 +code_change(_OldVsn, State, _Extra) ->
  39 + {ok, State}.
27 test/ref_dispatch.erl
... ... @@ -0,0 +1,27 @@
  1 +-module(ref_dispatch).
  2 +-behaviour(dispcount).
  3 +-export([init/1, checkout/2, checkin/2, handle_info/2, dead/1,
  4 + terminate/2, code_change/3]).
  5 +
  6 +init([]) ->
  7 + {ok, make_ref()}.
  8 +
  9 +checkout(_From, Ref) ->
  10 + {ok, Ref, undefined}.
  11 +
  12 +checkin(Ref, undefined) ->
  13 + {ok, Ref};
  14 +checkin(SomeRef, Ref) ->
  15 + {ignore, Ref}.
  16 +
  17 +dead(undefined) ->
  18 + {ok, make_ref()}.
  19 +
  20 +handle_info(_Msg, State) ->
  21 + {ok, State}.
  22 +
  23 +terminate(_Reason, _State) ->
  24 + ok.
  25 +
  26 +code_change(_OldVsn, State, _Extra) ->
  27 + {ok, State}.

0 comments on commit ff01dce

Please sign in to comment.
Something went wrong with that request. Please try again.