Skip to content

Commit

Permalink
Merge pull request #23 from basho/sdc-multiple-listeners
Browse files Browse the repository at this point in the history
Enable multiple PB listeners.
  • Loading branch information
seancribbs committed Mar 16, 2013
2 parents ec2d353 + 9ba8a1a commit f506a12
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 46 deletions.
100 changes: 71 additions & 29 deletions src/riak_api_pb_listener.erl
Expand Up @@ -28,7 +28,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([sock_opts/0, new_connection/2]).
-export([get_port/0, get_ip/0]).
-export([get_listeners/0]).
-record(state, {portnum}).

%% @doc Starts the PB listener
Expand Down Expand Up @@ -88,38 +88,80 @@ new_connection(Socket, State) ->
ok = riak_api_pb_server:set_socket(Pid, Socket),
{ok, State}.

get_listeners() ->
DefaultListener = case {get_ip(), get_port()} of
{undefined, _} -> [];
{_, undefined} -> [];
{IP, Port} -> [{IP, Port}]
end,
Listeners = app_helper:get_env(riak_api, pb, []) ++ DefaultListener,
[ {I, P} || {I, P} <- Listeners ].

%% @private
get_port() ->
Envs = [{riak_api, pb_port},
{riak_kv, pb_port}],
case app_helper:try_envs(Envs) of
{riak_api, pb_port, Port} ->
Port;
{riak_kv, pb_port, Port} ->
lager:warning("The config riak_kv/pb_port has been"
" deprecated and will be removed. Use"
" riak_api/pb_port in the future."),
Port;
_ ->
lager:warning("The config riak_api/pb_port is missing,"
" PB connections will be disabled."),
undefined
case app_helper:get_env(riak_api, pb_port) of
undefined ->
undefined;
Port ->
lager:warning("The config riak_api/pb_port has been"
" deprecated and will be removed. Use"
" riak_api/pb (IP/Port pairs) in the future."),
Port
end.

%% @private
get_ip() ->
Envs = [{riak_api, pb_ip},
{riak_kv, pb_ip}],
case app_helper:try_envs(Envs) of
{riak_api, pb_ip, IP} ->
IP;
{riak_kv, pb_ip, IP} ->
lager:warning("The config riak_kv/pb_ip has been"
" deprecated and will be removed. Use"
" riak_api/pb_ip in the future."),
IP;
_ ->
lager:warning("The config riak_api/pb_ip is missing,"
" PB connections will be disabled."),
undefined
case app_helper:get_env(riak_api, pb_ip) of
undefined ->
undefined;
IP ->
lager:warning("The config riak_api/pb_ip has been"
" deprecated and will be removed. Use"
" riak_api/pb (IP/Port pairs) in the future."),
IP
end.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).

listeners_test_() ->
{foreach,
fun() ->
application:load(riak_api),
app_helper:get_env(riak_api, pb, [{"127.0.0.1", 8087}])
end,
fun(OldListeners) ->
application:set_env(riak_api, pb, OldListeners),
application:unset_env(riak_api, pb_ip),
application:unset_env(riak_api, pb_port)
end,
[
{"old config keys get upgraded",
fun() ->
application:unset_env(riak_api, pb),
application:set_env(riak_api, pb_ip, "127.0.0.1"),
application:set_env(riak_api, pb_port, 10887),
?assertEqual([{"127.0.0.1", 10887}], get_listeners())
end},
{"missing old IP config key disables listener",
fun() ->
application:unset_env(riak_api, pb),
%% application:set_env(riak_api, pb_ip, "127.0.0.1"),
application:set_env(riak_api, pb_port, 10887),
?assertEqual([], get_listeners())
end},
{"missing old Port config key disables listener",
fun() ->
application:unset_env(riak_api, pb),
application:set_env(riak_api, pb_ip, "127.0.0.1"),
%% application:set_env(riak_api, pb_port, 10887),
?assertEqual([], get_listeners())
end},
{"bad configs are ignored",
fun() ->
application:set_env(riak_api, pb, [{"0.0.0.0", 8087}, badjuju]),
?assertEqual([{"0.0.0.0", 8087}], get_listeners())
end}]}.

-endif.
20 changes: 13 additions & 7 deletions src/riak_api_sup.erl
Expand Up @@ -31,7 +31,10 @@

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).

-define(LNAME(IP, Port), lists:flatten(io_lib:format("~p:~p", [IP, Port]))).
-define(LISTENER(IP, Port), {?LNAME(IP, Port),
{riak_api_pb_listener, start_link, [IP, Port]},
permanent, 5000, worker, [riak_api_pb_listener]}).
%% @doc Starts the supervisor.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
Expand All @@ -44,15 +47,18 @@ start_link() ->
MaxT :: pos_integer(),
ChildSpec :: supervisor:child_spec().
init([]) ->
Port = riak_api_pb_listener:get_port(),
IP = riak_api_pb_listener:get_ip(),
IsPbConfigured = (Port /= undefined) andalso (IP /= undefined),
Listeners = riak_api_pb_listener:get_listeners(),
Helper = ?CHILD(riak_api_pb_registration_helper, worker),
Registrar = ?CHILD(riak_api_pb_registrar, worker),
NetworkProcesses = if IsPbConfigured ->
[?CHILD(riak_api_pb_sup, supervisor),
?CHILD(riak_api_pb_listener, worker, [IP, Port])];
NetworkProcesses = if Listeners /= [] ->
[?CHILD(riak_api_pb_sup, supervisor)] ++
listener_specs(Listeners);
true ->
lager:info("No PB listeners were configured,"
" PB connections will be disabled."),
[]
end,
{ok, {{one_for_one, 10, 10}, [Helper, Registrar|NetworkProcesses]}}.

listener_specs(Pairs) ->
[ ?LISTENER(IP, Port) || {IP, Port} <- Pairs ].
16 changes: 6 additions & 10 deletions test/pb_service_test.erl
Expand Up @@ -97,23 +97,20 @@ setup() ->

application:set_env(riak_core, handoff_port, 0),

OldHost = app_helper:get_env(riak_api, pb_ip, "127.0.0.1"),
OldPort = app_helper:get_env(riak_api, pb_port, 8087),
application:set_env(riak_api, pb_ip, "127.0.0.1"),
application:set_env(riak_api, pb_port, 32767),
OldListeners = app_helper:get_env(riak_api, pb, [{"127.0.0.1", 8087}]),
application:set_env(riak_api, pb, [{"127.0.0.1", 32767}]),

[ application:start(A) || A <- Deps ],
riak_core:wait_for_application(riak_api),
wait_for_port(),
riak_api_pb_service:register(?MODULE, ?MSGMIN, ?MSGMAX),
riak_api_pb_service:register(?MODULE, 111),
{OldHost, OldPort, Deps}.
{OldListeners, Deps}.

cleanup({H, P, Deps}) ->
cleanup({L, Deps}) ->
[ application:stop(A) || A <- lists:reverse(Deps), not is_otp_base_app(A) ],
wait_for_application_shutdown(riak_api),
application:set_env(riak_api, pb_ip, H),
application:set_env(riak_api, pb_port, P),
application:set_env(riak_api, pb, L),
ok.

request_multi(Payloads) when is_list(Payloads) ->
Expand Down Expand Up @@ -169,8 +166,7 @@ new_connection() ->
new_connection([{packet,4}, {header, 1}]).

new_connection(Options) ->
Host = app_helper:get_env(riak_api, pb_ip),
Port = app_helper:get_env(riak_api, pb_port),
{Host, Port} = hd(app_helper:get_env(riak_api, pb)),
gen_tcp:connect(Host, Port, [binary, {active, false},{nodelay, true}|Options]).

simple_test_() ->
Expand Down

0 comments on commit f506a12

Please sign in to comment.