Skip to content

Commit

Permalink
Fix test failures when running against Riak 1.2.
Browse files Browse the repository at this point in the history
  • Loading branch information
seancribbs committed Jul 16, 2012
1 parent 3021173 commit ac9af27
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 15 deletions.
1 change: 1 addition & 0 deletions rebar.config
@@ -1,4 +1,5 @@
{cover_enabled, true}. {cover_enabled, true}.
{eunit_opts, [verbose]}.
{erl_opts, [warnings_as_errors]}. {erl_opts, [warnings_as_errors]}.
{deps, [ {deps, [
{riak_pb, ".*", {git, "git://github.com/basho/riak_pb", "master"}} {riak_pb, ".*", {git, "git://github.com/basho/riak_pb", "master"}}
Expand Down
87 changes: 72 additions & 15 deletions src/riakc_pb_socket.erl
Expand Up @@ -1220,7 +1220,7 @@ on_error(_Request, ErrMsg, State) ->
%% Format the PB encoded error message %% Format the PB encoded error message
fmt_err_msg(ErrMsg) -> fmt_err_msg(ErrMsg) ->
case ErrMsg#rpberrorresp.errcode of case ErrMsg#rpberrorresp.errcode of
Code when Code =:= 1; Code =:= undefined -> Code when Code =:= 0; Code =:= 1; Code =:= undefined ->
{error, ErrMsg#rpberrorresp.errmsg}; {error, ErrMsg#rpberrorresp.errmsg};
Code -> Code ->
{error, {Code, ErrMsg#rpberrorresp.errmsg}} {error, {Code, ErrMsg#rpberrorresp.errmsg}}
Expand All @@ -1237,14 +1237,14 @@ send_mapred_req(Pid, MapRed, ClientPid, CallTimeout) ->
ReqId = mk_reqid(), ReqId = mk_reqid(),
Timeout = proplists:get_value(timeout, MapRed, default_timeout(mapred_timeout)), Timeout = proplists:get_value(timeout, MapRed, default_timeout(mapred_timeout)),
Timeout1 = if Timeout1 = if
is_integer(Timeout) -> is_integer(Timeout) ->
%% Add an extra 100ms to the mapred timeout and use that %% Add an extra 100ms to the mapred timeout and use that
%% for the socket timeout. This should give the %% for the socket timeout. This should give the
%% map/reduce a chance to fail and let us know. %% map/reduce a chance to fail and let us know.
Timeout + 100; Timeout + 100;
true -> true ->
Timeout Timeout
end, end,
gen_server:call(Pid, {req, ReqMsg, Timeout1, {ReqId, ClientPid}}, CallTimeout). gen_server:call(Pid, {req, ReqMsg, Timeout1, {ReqId, ClientPid}}, CallTimeout).


%% @private %% @private
Expand Down Expand Up @@ -1510,6 +1510,7 @@ decode_mapred_resp(Data, <<"application/x-erlang-binary">>) ->
%% as a dependency. %% as a dependency.
%% %%
-ifdef(TEST). -ifdef(TEST).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").


%% Get the test host - check env RIAK_TEST_PB_HOST then env 'RIAK_TEST_HOST_1' %% Get the test host - check env RIAK_TEST_PB_HOST then env 'RIAK_TEST_HOST_1'
Expand Down Expand Up @@ -1563,15 +1564,51 @@ test_cookie() ->
list_to_atom(CookieStr) list_to_atom(CookieStr)
end. end.


%% Get the riak version from the init boot script, turn it into a list
%% of integers.
riak_version() ->
StrVersion = element(2, rpc:call(test_riak_node(), init, script_id, [])),
[ list_to_integer(V) || V <- string:tokens(StrVersion, ".") ].


%% Resets the riak node
reset_riak() -> reset_riak() ->
%% sleep because otherwise we're going to kill the vnodes too fast %% sleep because otherwise we're going to kill the vnodes too fast
%% for the supervisor's maximum restart frequency, which will bring %% for the supervisor's maximum restart frequency, which will bring
%% down the entire node %% down the entire node
timer:sleep(500),

?assertEqual(ok, maybe_start_network()), ?assertEqual(ok, maybe_start_network()),
case riak_version() of
[1,Two|_] when Two >= 2->
reset_riak_12();
_ ->
reset_riak_legacy()
end.

%% Resets a Riak 1.2+ node, which can run the memory backend in 'test'
%% mode.
reset_riak_12() ->
set_test_backend(),
ok = rpc:call(test_riak_node(), riak_kv_memory_backend, reset, []),
reset_ring().

%% Sets up the memory/test backend, leaving it alone if already set properly.
set_test_backend() ->
Env = rpc:call(test_riak_node(), application, get_all_env, [riak_kv]),
Backend = proplists:get_value(storage_backend, Env),
Test = proplists:get_value(test, Env),
case {Backend, Test} of
{riak_kv_memory_backend, true} ->
ok;
_ ->
ok = rpc:call(test_riak_node(), application, set_env, [riak_kv, storage_backend, riak_kv_memory_backend]),
ok = rpc:call(test_riak_node(), application, set_env, [riak_kv, test, true]),
Vnodes = rpc:call(test_riak_node(), riak_core_vnode_manager, all_vnodes, [riak_kv_vnode]),
[ ok = rpc:call(test_riak_node(), supervisor, terminate_child, [riak_core_vnode_sup, Pid]) ||
{_, _, Pid} <- Vnodes ]
end.


%% Resets a Riak 1.1 and earlier node.
reset_riak_legacy() ->
timer:sleep(500),
%% Until there is a good way to empty the vnodes, require the %% Until there is a good way to empty the vnodes, require the
%% test to run with ETS and kill the vnode master/sup to empty all the ETS tables %% test to run with ETS and kill the vnode master/sup to empty all the ETS tables
%% and the ring manager to remove any bucket properties %% and the ring manager to remove any bucket properties
Expand All @@ -1587,13 +1624,25 @@ reset_riak() ->
ok = rpc:call(test_riak_node(), riak_kv_mapred_cache, clear, []), ok = rpc:call(test_riak_node(), riak_kv_mapred_cache, clear, []),


%% Now reset the ring so bucket properties are default %% Now reset the ring so bucket properties are default
reset_ring().

%% Resets the ring to a fresh one, effectively deleting any bucket properties.
reset_ring() ->
Ring = rpc:call(test_riak_node(), riak_core_ring, fresh, []), Ring = rpc:call(test_riak_node(), riak_core_ring, fresh, []),
ok = rpc:call(test_riak_node(), riak_core_ring_manager, set_my_ring, [Ring]). ok = rpc:call(test_riak_node(), riak_core_ring_manager, set_my_ring, [Ring]).




%% Finds the pid of the PB listener process
riak_pb_listener_pid() -> riak_pb_listener_pid() ->
Children = supervisor:which_children({riak_kv_sup, test_riak_node()}), {Children, Proc} = case riak_version() of
hd([Pid || {Mod,Pid,_,_} <- Children, Mod == riak_kv_pb_listener]). [1,Two|_] when Two == 2->
{supervisor:which_children({riak_api_sup, test_riak_node()}),
riak_api_pb_listener};
_ ->
{supervisor:which_children({riak_kv_sup, test_riak_node()}),
riak_kv_pb_listener}
end,
hd([Pid || {Mod,Pid,_,_} <- Children, Mod == Proc]).


pause_riak_pb_listener() -> pause_riak_pb_listener() ->
Pid = riak_pb_listener_pid(), Pid = riak_pb_listener_pid(),
Expand All @@ -1604,10 +1653,16 @@ resume_riak_pb_listener() ->
rpc:call(test_riak_node(), sys, resume, [Pid]). rpc:call(test_riak_node(), sys, resume, [Pid]).


kill_riak_pb_sockets() -> kill_riak_pb_sockets() ->
case supervisor:which_children({riak_kv_pb_socket_sup, test_riak_node()}) of Children = case riak_version() of
[1,Two|_] when Two >= 2 ->
supervisor:which_children({riak_api_pb_sup, test_riak_node()});
_ ->
supervisor:which_children({riak_kv_pb_socket_sup, test_riak_node()})
end,
case Children of
[] -> [] ->
ok; ok;
Children -> [_|_] ->
Pids = [Pid || {_,Pid,_,_} <- Children], Pids = [Pid || {_,Pid,_,_} <- Children],
[rpc:call(test_riak_node(), erlang, exit, [Pid, kill]) || Pid <- Pids], [rpc:call(test_riak_node(), erlang, exit, [Pid, kill]) || Pid <- Pids],
erlang:yield(), erlang:yield(),
Expand Down Expand Up @@ -1647,6 +1702,8 @@ auto_reconnect_bad_connect_test() ->
stop(Pid). stop(Pid).


server_closes_socket_test() -> server_closes_socket_test() ->
%% Silence SASL junk when socket closes.
error_logger:tty(false),
%% Set up a dummy socket to send requests on %% Set up a dummy socket to send requests on
{ok, Listen} = gen_tcp:listen(0, [binary, {packet, 4}, {active, false}]), {ok, Listen} = gen_tcp:listen(0, [binary, {packet, 4}, {active, false}]),
{ok, Port} = inet:port(Listen), {ok, Port} = inet:port(Listen),
Expand Down

0 comments on commit ac9af27

Please sign in to comment.