Skip to content

Commit

Permalink
Merge pull request #45 from emqx/0809-fix-pick-and-do
Browse files Browse the repository at this point in the history
fix: allow Fun action for pick_and_do
  • Loading branch information
savonarola committed Aug 14, 2023
2 parents 1fd66ec + 53050da commit bb56fea
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
14 changes: 11 additions & 3 deletions src/ecpool.appup.src
@@ -1,8 +1,12 @@
%% -*-: erlang -*-
{"0.5.5",
{"0.5.6",
[
{"0.5.5", [
{load_module, ecpool, brutal_purge, soft_purge, []}
]},
{"0.5.4", [
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []}
]},
{"0.5.3", [
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
Expand All @@ -23,8 +27,12 @@
]}
],
[
{"0.5.5", [
{load_module, ecpool, brutal_purge, soft_purge, []}
]},
{"0.5.4", [
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []}
]},
{"0.5.3", [
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
Expand Down
22 changes: 12 additions & 10 deletions src/ecpool.erl
Expand Up @@ -65,6 +65,8 @@
| {on_disconnect, conn_callback()}
| tuple().

-define(IS_ACTION(ACTION), ((is_tuple(ACTION) andalso tuple_size(ACTION) == 3) orelse is_function(ACTION, 1))).

pool_spec(ChildId, Pool, Mod, Opts) ->
#{id => ChildId,
start => {?MODULE, start_pool, [Pool, Mod, Opts]},
Expand Down Expand Up @@ -114,38 +116,38 @@ add_reconnect_callback(Pool, Callback) ->
%% @doc Call the fun with client/connection
-spec with_client(pool_name(), action(Result)) ->
Result | {error, disconnected | ecpool_empty}.
with_client(Pool, Fun) ->
with_client(Pool, Fun) when ?IS_ACTION(Fun) ->
with_worker(get_client(Pool), Fun, no_handover).

%% @doc Call the fun with client/connection
-spec with_client(pool_name(), any(), action(Result)) ->
Result | {error, disconnected | ecpool_empty}.
with_client(Pool, Key, Fun) ->
with_client(Pool, Key, Fun) when ?IS_ACTION(Fun) ->
with_worker(get_client(Pool, Key), Fun, no_handover).

-spec pick_and_do({pool_name(), term()} | pool_name(), action(Result), apply_mode()) ->
Result | {error, disconnected | ecpool_empty}.
pick_and_do({Pool, KeyOrNum}, Action = {_,_,_}, ApplyMode) ->
pick_and_do({Pool, KeyOrNum}, Action, ApplyMode) when ?IS_ACTION(Action) ->
with_worker(get_client(Pool, KeyOrNum), Action, ApplyMode);
pick_and_do(Pool, Action = {_,_,_}, ApplyMode) ->
pick_and_do(Pool, Action, ApplyMode) when ?IS_ACTION(Action) ->
with_worker(get_client(Pool), Action, ApplyMode).

-spec with_worker(pid() | false, action(Result), apply_mode()) ->
Result | {error, disconnected | ecpool_empty}.
with_worker(false, _Action, _Mode) ->
with_worker(false, Action, _Mode) when ?IS_ACTION(Action) ->
{error, ecpool_empty};
with_worker(Worker, Action, no_handover) ->
with_worker(Worker, Action, no_handover) when ?IS_ACTION(Action) ->
case ecpool_worker:client(Worker) of
{ok, Client} -> exec(Action, Client);
{error, Reason} -> {error, Reason}
end;
with_worker(Worker, Action, handover) ->
with_worker(Worker, Action, handover) when ?IS_ACTION(Action) ->
ecpool_worker:exec(Worker, Action, infinity);
with_worker(Worker, Action, {handover, Timeout}) when is_integer(Timeout) ->
with_worker(Worker, Action, {handover, Timeout}) when is_integer(Timeout) andalso ?IS_ACTION(Action) ->
ecpool_worker:exec(Worker, Action, Timeout);
with_worker(Worker, Action, handover_async) ->
with_worker(Worker, Action, handover_async) when ?IS_ACTION(Action) ->
ecpool_worker:exec_async(Worker, Action);
with_worker(Worker, Action, {handover_async, CallbackFun = {_,_,_}}) ->
with_worker(Worker, Action, {handover_async, CallbackFun = {_,_,_}}) when ?IS_ACTION(Action) ->
ecpool_worker:exec_async(Worker, Action, CallbackFun).

%% @doc Pool workers
Expand Down
11 changes: 10 additions & 1 deletion test/ecpool_SUITE.erl
Expand Up @@ -56,7 +56,8 @@ groups() ->
t_client_exec_random,
t_client_exec2_random,
t_multiprocess_client,
t_multiprocess_client_not_restart
t_multiprocess_client_not_restart,
t_pick_and_do_fun
]}].

init_per_suite(Config) ->
Expand Down Expand Up @@ -235,3 +236,11 @@ t_client_exec2_random(_Config) ->
{result, 4} -> ok;
R1 -> ct:fail({unexpected_result, R1})
end.

t_pick_and_do_fun(_Config) ->
Pool = ?FUNCTION_NAME,
Action = fun(Client) -> test_client:plus(Client, 1, 3) end,
Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}],
{ok, _} = ecpool:start_pool(Pool, test_client, Opts),
?assertEqual(4, ecpool:pick_and_do({Pool, <<"abc">>}, Action, no_handover)),
ecpool:stop_sup_pool(Pool).

0 comments on commit bb56fea

Please sign in to comment.