Skip to content

Commit

Permalink
Merge pull request #11 from cabol/cabol.version_0.1
Browse files Browse the repository at this point in the history
Code Dialyzed. Added pool_hanler to pub_sub example.
  • Loading branch information
cabol committed Jul 17, 2015
2 parents 7a79bd1 + 9fe4c62 commit 8fb6b0d
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 11 deletions.
14 changes: 14 additions & 0 deletions examples/pub_sub/src/pool_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
%% Feel free to use, reuse and abuse the code in this file.

%% @doc handler.
-module(pool_handler).

-behaviour(ebus_handler).

%% API
-export([handle_msg/2]).

handle_msg({Channel, Msg}, Context) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Context]),
timer:sleep(1000).
6 changes: 6 additions & 0 deletions examples/pub_sub/src/pub_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-export([stop/0, stop/1]).

-define(HANDLER, pub_sub_handler).
-define(POOL_HANDLER, pool_handler).
-define(CHANNEL, pub_sub_channel).

%% API.
Expand All @@ -19,6 +20,7 @@ start() ->
start(_Type, _Args) ->
P = spawn_link(fun() -> publisher(?CHANNEL) end),
lists:foreach(fun(N) -> subscriber(?CHANNEL, N) end, lists:seq(1, 3)),
pool(?CHANNEL),
timer:sleep(1 * 60 * 1000),
exit(P, kill),
teardown_ebus().
Expand All @@ -42,5 +44,9 @@ subscriber(Channel, N) ->
Handler = ebus_handler:new(?HANDLER, N),
ebus:sub(Channel, Handler).

pool(Channel) ->
Pool = ebus_handler:new_pool(my_pool, 3, ?POOL_HANDLER, my_pool),
ebus:sub(Channel, Pool).

teardown_ebus() ->
application:stop(ebus).
4 changes: 2 additions & 2 deletions src/ebus.erl
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ code_change(_OldVsn, State, _Extra) ->
%% @private
parse_options([], State) ->
State;
parse_options([{call_timeout, Ch} | Opts], State) when is_integer(Ch) ->
parse_options(Opts, State#state{call_timeout = Ch});
parse_options([{call_timeout, T} | Opts], State) when is_integer(T) ->
parse_options(Opts, State#state{call_timeout = T});
parse_options([{n, N} | Opts], State) when is_integer(N) ->
parse_options(Opts, State#state{n = N});
parse_options([{sub, Q} | Opts], State) when is_integer(Q) ->
Expand Down
6 changes: 3 additions & 3 deletions src/ebus_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ sub(Channel, Handler, Opts) ->
Callback = {ebus_gproc, sub, [Channel, Handler]},
do_write(Channel, Channel, sub, Callback, Opts).

-spec unsub(ebus:channel(), ebus:handler()) -> any().
-spec unsub(ebus:channel(), ebus:handler()) -> ebus:ebus_ret().
unsub(Channel, Handler) ->
unsub(Channel, Handler, []).

Expand All @@ -97,11 +97,11 @@ pub(Channel, Message, Opts) ->
Callback = {ebus_gproc, pub, [Channel, Message]},
do_write(Channel, Channel, pub, Callback, Opts).

-spec get_subscribers(ebus:channel()) -> ebus:ebus_ret().
-spec get_subscribers(ebus:channel()) -> [ebus:handler()].
get_subscribers(Channel) ->
get_subscribers(Channel, []).

-spec get_subscribers(ebus:channel(), options()) -> ebus:ebus_ret().
-spec get_subscribers(ebus:channel(), options()) -> [ebus:handler()].
get_subscribers(Channel, Opts) ->
Callback = {ebus_gproc, get_subscribers, [Channel]},
do_write(Channel, Channel, get_subscribers, Callback, Opts).
Expand Down
2 changes: 1 addition & 1 deletion src/ebus_dist_cmd_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
-record(state, {req_id :: pos_integer(),
coordinator :: node(),
from :: pid(),
bkey :: binary(),
bkey :: {binary(), binary()},
op :: atom(),
val = undefined :: term() | undefined,
preflist :: riak_core_apl:preflist2(),
Expand Down
7 changes: 4 additions & 3 deletions src/ebus_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@
%%%===================================================================

%% State
-record(state, {module :: atom(), context :: any(), pool :: atom()}).
-record(state, {module :: atom(),
context :: any(),
pool :: pid()}).

%% Types
-type context() :: any().
-type pool_opt() :: {name, atom()} | {size, integer()}.
-type option() :: {monitors, [pid()]} | {pool, [pool_opt()]}.
-type option() :: {monitors, [pid()]} | {pool, pid()}.
-type options() :: [option()].
-type handle_fun() :: fun((ebus:channel(), ebus:payload()) -> ok).
-type status() :: exiting | garbage_collecting | waiting | running |
Expand Down
19 changes: 17 additions & 2 deletions src/ebus_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

%% API
-export([keyfind/2, keyfind/3, rem_dups_from_list/1, count_val_in_list/2,
to_bin/1, to_atom/1, to_integer/1, to_float/1,
to_bin/1, to_atom/1, to_integer/1, to_float/1, to_list/1,
build_name/1, get_best_pid/1]).

%%%===================================================================
Expand Down Expand Up @@ -72,7 +72,7 @@ count_val_in_list(Val, L) ->
lists:foldl(F, 0, L).

%% @doc Converts any type to binary.
-spec to_bin(any()) -> atom().
-spec to_bin(any()) -> binary().
to_bin(Data) when is_integer(Data) ->
integer_to_binary(Data);
to_bin(Data) when is_float(Data) ->
Expand Down Expand Up @@ -119,6 +119,21 @@ to_float(Data) when is_pid(Data); is_reference(Data); is_tuple(Data) ->
to_float(Data) ->
Data.

%% @doc Converts any type to list.
-spec to_list(any()) -> list().
to_list(Data) when is_binary(Data) ->
binary_to_list(Data);
to_list(Data) when is_integer(Data) ->
integer_to_list(Data);
to_list(Data) when is_float(Data) ->
float_to_list(Data);
to_list(Data) when is_atom(Data) ->
atom_to_list(Data);
to_list(Data) when is_pid(Data); is_reference(Data); is_tuple(Data) ->
integer_to_list(erlang:phash2(Data));
to_list(Data) ->
Data.

%% @doc Build a name given the list of terms, then they are transformed
%% to binary and concatenated by '_'.
-spec build_name([any()]) -> atom().
Expand Down

0 comments on commit 8fb6b0d

Please sign in to comment.