Skip to content

Commit

Permalink
refactor(with_client): change with_client/2,3 to pick_and_do/3 (#24)
Browse files Browse the repository at this point in the history
* refactor(with_client): change with_client/2,3 to pick_and_do/3

Change the actions and callbacks to type {M, F, A} to avoid badfun
when hot upgrading modules.
  • Loading branch information
terry-xiaoyu committed Nov 24, 2020
1 parent aa5b97e commit 47a223f
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 106 deletions.
19 changes: 16 additions & 3 deletions include/ecpool.hrl
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
-type callback() :: fun((any()) -> any()).
-type action() :: fun((pid()) -> any()).
-type exec_mode() :: relay | relay_async | {relay_async, callback()} | direct.
-type callback() :: mfa() | fun((any()) -> any()).
-type action() :: mfa() | fun((pid()) -> any()).
-type apply_mode() :: handover
| handover_async
| {handover, timeout()}
| {handover_async, callback()}
| no_handover.
-type pool_type() :: random | hash | direct | round_robin.
-type pool_name() :: term().
-type conn_callback() :: mfa().
-type option() :: {pool_size, pos_integer()}
| {pool_type, pool_type()}
| {auto_reconnect, false | pos_integer()}
| {on_reconnect, conn_callback()}
| {on_disconnect, conn_callback()}
| tuple().
75 changes: 36 additions & 39 deletions src/ecpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,18 @@
, stop_sup_pool/1
, get_client/1
, get_client/2
, with_client/2
, with_client/3
, with_client/4
, pick_and_do/3
, name/1
, workers/1
]).

-export([set_reconnect_callback/2, add_reconnect_callback/2]).

-export_type([ pool_name/0
, pool_type/0
, option/0
]).

-type(pool_name() :: term()).

-type(pool_type() :: random | hash | round_robin).

-type(reconn_callback() :: {fun((pid()) -> term())}).

-type(option() :: {pool_size, pos_integer()}
| {pool_type, pool_type()}
| {auto_reconnect, false | pos_integer()}
| {on_reconnect, reconn_callback()}
| tuple()).
%% NOTE: Obsolete APIs.
%% Use pick_and_do/3 APIs instead
-export([ with_client/2
, with_client/3
]).

pool_spec(ChildId, Pool, Mod, Opts) ->
#{id => ChildId,
Expand Down Expand Up @@ -81,45 +68,51 @@ get_client(Pool) ->
get_client(Pool, Key) ->
gproc_pool:pick_worker(name(Pool), Key).

-spec(set_reconnect_callback(atom(), reconn_callback()) -> ok).
-spec(set_reconnect_callback(atom(), conn_callback()) -> ok).
set_reconnect_callback(Pool, Callback) ->
[ecpool_worker:set_reconnect_callback(Worker, Callback)
|| {_WorkerName, Worker} <- ecpool:workers(Pool)],
ok.

-spec(add_reconnect_callback(atom(), reconn_callback()) -> ok).
-spec(add_reconnect_callback(atom(), conn_callback()) -> ok).
add_reconnect_callback(Pool, Callback) ->
[ecpool_worker:add_reconnect_callback(Worker, Callback)
|| {_WorkerName, Worker} <- ecpool:workers(Pool)],
ok.

%% NOTE: Use pick_and_do/3 instead of with_client/2,3
%% to avoid applying action failure with 'badfun'.
%%
%% @doc Call the fun with client/connection
-spec(with_client(atom(), action()) -> any()).
with_client(Pool, Action) when is_atom(Pool) ->
with_client(Pool, Action, direct).
-spec(with_client(atom(), fun((Client :: pid()) -> any())) -> no_return()).
with_client(Pool, Fun) when is_atom(Pool) ->
with_worker(gproc_pool:pick_worker(name(Pool)), Fun, no_handover).

-spec(with_client(atom(), action() | term(), action() | exec_mode()) -> any()).
with_client(Pool, Key, Action) when is_atom(Pool), is_function(Action) ->
with_client(Pool, Key, Action, direct);
%% @doc Call the fun with client/connection
-spec(with_client(atom(), any(), fun((Client :: pid()) -> term())) -> no_return()).
with_client(Pool, Key, Fun) when is_atom(Pool) ->
with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun, no_handover).

with_client(Pool, Action, Mode) when is_atom(Pool), is_function(Action) ->
with_worker(gproc_pool:pick_worker(name(Pool)), Action, Mode).
-spec pick_and_do({atom(), term()}, mfa(), apply_mode()) -> any().
pick_and_do({Pool, KeyOrNum}, Action = {_,_,_}, ApplyMode) ->
with_worker(gproc_pool:pick_worker(name(Pool), KeyOrNum), Action, ApplyMode);

-spec(with_client(atom(), any(), fun((Client :: pid()) -> term()), exec_mode()) -> any()).
with_client(Pool, Key, Action, Mode) when is_atom(Pool) ->
with_worker(gproc_pool:pick_worker(name(Pool), Key), Action, Mode).
pick_and_do(Pool, Action = {_,_,_}, ApplyMode) ->
with_worker(gproc_pool:pick_worker(name(Pool)), Action, ApplyMode).

-spec with_worker(pid(), action(), exec_mode()) -> any().
with_worker(Worker, Action, direct) ->
-spec with_worker(pid(), action(), apply_mode()) -> any().
with_worker(Worker, Action, no_handover) ->
case ecpool_worker:client(Worker) of
{ok, Client} -> Action(Client);
{ok, Client} -> exec(Action, Client);
{error, Reason} -> {error, Reason}
end;
with_worker(Worker, Action, relay) ->
ecpool_worker:exec(Worker, Action);
with_worker(Worker, Action, relay_async) ->
with_worker(Worker, Action, handover) ->
ecpool_worker:exec(Worker, Action, infinity);
with_worker(Worker, Action, {handover, Timeout}) when is_integer(Timeout) ->
ecpool_worker:exec(Worker, Action, Timeout);
with_worker(Worker, Action, handover_async) ->
ecpool_worker:exec_async(Worker, Action);
with_worker(Worker, Action, {relay_async, CallbackFun}) ->
with_worker(Worker, Action, {handover_async, CallbackFun = {_,_,_}}) ->
ecpool_worker:exec_async(Worker, Action, CallbackFun).

%% @doc Pool workers
Expand All @@ -129,3 +122,7 @@ workers(Pool) ->
%% @doc ecpool name
name(Pool) -> {?MODULE, Pool}.

exec({M, F, A}, Client) ->
erlang:apply(M, F, [Client]++A);
exec(Action, Client) when is_function(Action) ->
Action(Client).
101 changes: 61 additions & 40 deletions src/ecpool_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

%% API Function Exports
-export([ client/1
, exec/2
, exec/3
, exec_async/2
, exec_async/3
, is_connected/1
, set_reconnect_callback/2
, set_disconnect_callback/2
, add_reconnect_callback/2
, add_disconnect_callback/2
]).

%% gen_server Function Exports
Expand All @@ -46,8 +48,8 @@
id :: pos_integer(),
client :: pid() | undefined,
mod :: module(),
on_reconnect :: ecpool:reconn_callback(),
on_disconnect :: ecpool:reconn_callback(),
on_reconnect :: ecpool:conn_callback(),
on_disconnect :: ecpool:conn_callback(),
supervisees = [],
opts :: proplists:proplist()
}).
Expand All @@ -74,31 +76,39 @@ start_link(Pool, Id, Mod, Opts) ->
client(Pid) ->
gen_server:call(Pid, client, infinity).

-spec(exec(pid(), action()) -> Result :: any() | {error, Reason :: term()}).
exec(Pid, Action) ->
gen_server:call(Pid, {exec, Action}, infinity).
-spec(exec(pid(), action(), timeout()) -> Result :: any() | {error, Reason :: term()}).
exec(Pid, Action, Timeout) ->
gen_server:call(Pid, {exec, Action}, Timeout).

-spec(exec_async(pid(), action()) -> Result :: any() | {error, Reason :: term()}).
-spec exec_async(pid(), action()) -> ok.
exec_async(Pid, Action) ->
gen_server:call(Pid, {exec_async, Action}).
gen_server:cast(Pid, {exec_async, Action}).

-spec(exec_async(pid(), action(), callback()) -> Result :: any() | {error, Reason :: term()}).
-spec exec_async(pid(), action(), callback()) -> ok.
exec_async(Pid, Action, Callback) ->
gen_server:call(Pid, {exec_async, Action, Callback}).
gen_server:cast(Pid, {exec_async, Action, Callback}).

%% @doc Is client connected?
-spec(is_connected(pid()) -> boolean()).
is_connected(Pid) ->
gen_server:call(Pid, is_connected, infinity).

-spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok).
set_reconnect_callback(Pid, OnReconnect) ->
-spec(set_reconnect_callback(pid(), ecpool:conn_callback()) -> ok).
set_reconnect_callback(Pid, OnReconnect = {_, _, _}) ->
gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}).

-spec(add_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok).
add_reconnect_callback(Pid, OnReconnect) ->
-spec(set_disconnect_callback(pid(), ecpool:conn_callback()) -> ok).
set_disconnect_callback(Pid, OnDisconnect = {_, _, _}) ->
gen_server:cast(Pid, {set_disconn_callbk, OnDisconnect}).

-spec(add_reconnect_callback(pid(), ecpool:conn_callback()) -> ok).
add_reconnect_callback(Pid, OnReconnect = {_, _, _}) ->
gen_server:cast(Pid, {add_reconn_callbk, OnReconnect}).

-spec(add_disconnect_callback(pid(), ecpool:conn_callback()) -> ok).
add_disconnect_callback(Pid, OnDisconnect = {_, _, _}) ->
gen_server:cast(Pid, {add_disconn_callbk, OnDisconnect}).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
Expand All @@ -109,8 +119,8 @@ init([Pool, Id, Mod, Opts]) ->
id = Id,
mod = Mod,
opts = Opts,
on_reconnect = proplists:get_value(on_reconnect, Opts),
on_disconnect = proplists:get_value(on_disconnect, Opts)
on_reconnect = ensure_callback(proplists:get_value(on_reconnect, Opts)),
on_disconnect = ensure_callback(proplists:get_value(on_disconnect, Opts))
},
case connect_internal(State) of
{ok, NewState} ->
Expand All @@ -135,31 +145,29 @@ handle_call(client, _From, State = #state{client = Client}) ->
handle_call({exec, Action}, _From, State = #state{client = Client}) ->
{reply, safe_exec(Action, Client), State};

handle_call({exec_async, Action}, From, State = #state{client = Client}) ->
gen_server:reply(From, ok),
handle_call(Req, _From, State) ->
logger:error("[PoolWorker] unexpected call: ~p", [Req]),
{reply, ignored, State}.

handle_cast({exec_async, Action}, State = #state{client = Client}) ->
_ = safe_exec(Action, Client),
{noreply, State};

handle_call({exec_async, Action, Callback}, From, State = #state{client = Client}) ->
gen_server:reply(From, ok),
_ = Callback(safe_exec(Action, Client)),
handle_cast({exec_async, Action, Callback}, State = #state{client = Client}) ->
_ = safe_exec(Callback, safe_exec(Action, Client)),
{noreply, State};

handle_call(Req, _From, State) ->
logger:error("[PoolWorker] unexpected call: ~p", [Req]),
{reply, ignored, State}.

handle_cast({set_reconn_callbk, OnReconnect}, State) ->
{noreply, State#state{on_reconnect = OnReconnect}};

handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OnReconnectList}) when is_list(OnReconnectList) ->
{noreply, State#state{on_reconnect = [OnReconnect | OnReconnectList]}};
handle_cast({set_disconn_callbk, OnDisconnect}, State) ->
{noreply, State#state{on_disconnect = OnDisconnect}};

handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = undefined}) ->
{noreply, State#state{on_reconnect = [OnReconnect]}};
handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect}) ->
{noreply, State#state{on_reconnect = add_conn_callback(OnReconnect, OldOnReconnect)}};

handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OnReconnect0}) ->
{noreply, State#state{on_reconnect = [OnReconnect, OnReconnect0]}};
handle_cast({add_disconn_callbk, OnDisconnect}, State = #state{on_disconnect = OldOnDisconnect}) ->
{noreply, State#state{on_disconnect = add_conn_callback(OnDisconnect, OldOnDisconnect)}};

handle_cast(_Msg, State) ->
{noreply, State}.
Expand Down Expand Up @@ -229,16 +237,16 @@ handle_reconnect(_, undefined) ->
handle_reconnect(undefined, _) ->
ok;
handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) ->
[OnReconnect(Client) || OnReconnect <- OnReconnectList];
[safe_exec(OnReconnect, Client) || OnReconnect <- OnReconnectList];
handle_reconnect(Client, OnReconnect) ->
OnReconnect(Client).
safe_exec(OnReconnect, Client).

handle_disconnect(undefined, _) ->
ok;
handle_disconnect(_, undefined) ->
ok;
handle_disconnect(Client, Disconnect) ->
Disconnect(Client).
safe_exec(Disconnect, Client).

connect_internal(State) ->
try connect(State) of
Expand All @@ -254,11 +262,24 @@ connect_internal(State) ->
_C:Reason:ST -> {error, {Reason, ST}}
end.

safe_exec(Action, Client) when is_pid(Client) ->
try Action(Client)
safe_exec(Action, MainArg) ->
try exec(Action, MainArg)
catch E:R:ST ->
logger:error("[PoolWorker] safe_exec failed: ~p", [{E,R,ST}]),
logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]),
{error, {exec_failed, E, R}}
end;
safe_exec(_Action, undefined) ->
{error, worker_disconnected}.
end.

exec({M, F, A}, MainArg) ->
erlang:apply(M, F, [MainArg]++A);
exec(Action, MainArg) when is_function(Action) ->
Action(MainArg).

ensure_callback(undefined) -> undefined;
ensure_callback({_,_,_} = Callback) -> Callback.

add_conn_callback(OnReconnect, OldOnReconnects) when is_list(OldOnReconnects) ->
[OnReconnect | OldOnReconnects];
add_conn_callback(OnReconnect, undefined) ->
[OnReconnect];
add_conn_callback(OnReconnect, OldOnReconnect) ->
[OnReconnect, OldOnReconnect].
Loading

0 comments on commit 47a223f

Please sign in to comment.