Eas fix wm 2i timeout #644

Merged
merged 2 commits into 1.4 from eas-fix-wm-2i-timeout

3 participants

@engelsanchez
Collaborator

When the webmachine process handling a 2i request timed out, it returned
an error message, but messages from the index FSM could still arrive at
that process later, in a different context (inside mochiweb), causing it
to send error messages to the client, which by then could already be
sending another request down the same connection. That unrelated request
would then fail with a 400 "unexpected data" message. This fix uses a big
hammer: on a timeout in the wm process, it tries to kill the index FSM and
consumes any messages already sent from it before returning a timeout
error message to the client.
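
Roughly, that gives the streaming helper a receive loop shaped like the sketch below (illustrative only, with made-up names; the real change is the riak_kv_wm_index.erl diff at the bottom of the page):

%% Illustrative sketch: a stripped-down receive loop for streamed 2i results.
%% On timeout, kill the index FSM and flush its messages so they cannot leak
%% into the next request on the same connection.
stream_results(ReqID, FSMPid, Timeout, Acc) ->
    receive
        {ReqID, done} ->
            {ok, lists:reverse(Acc)};
        {ReqID, {results, Results}} ->
            stream_results(ReqID, FSMPid, Timeout, [Results | Acc])
    after Timeout ->
        exit(FSMPid, kill),
        %% The real patch also waits for the FSM to actually be gone before
        %% flushing (first with a busy wait, later with a monitor; see below).
        flush_fsm_msgs(ReqID),
        {error, timeout}
    end.

%% Throw away any {ReqID, _} messages already sitting in our mailbox.
flush_fsm_msgs(ReqID) ->
    receive
        {ReqID, _} -> flush_fsm_msgs(ReqID)
    after 0 -> ok
    end.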

@engelsanchez engelsanchez Fix 2i timeout messing with next request
84a6b38
@engelsanchez engelsanchez referenced this pull request in basho/riak_test
Merged

Eas 1.4.2 2i timeout fix #369

@engelsanchez
Collaborator

Testing notes: it would be good to make sure all the 2i tests run without problems (verify_2i_timeout, secondary_index_tests, verify_2i_returnterms, verify_2i_limit... I think that's it). Be warned: the problem only showed up once every several tries.

Possible things to watch out for:

  • killing the FSM should be OK as long as every process that talks to it, or depends on it exiting, is monitoring it, which as far as I know they all are.
  • killing a process will probably generate crash messages in the log. At this point I think that's an acceptable level of noise.
  • I am not sure whether killing a process, checking that it's no longer alive, and then consuming all of its messages is free of races: could we still get a message in the queue after the point where erlang:is_process_alive returns false? I'm not 100% sure, but this should be better than no protection at all (see the sketch just below).
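
For reference, the busy wait in question is presumably shaped roughly like this (a reconstruction from the discussion, not the committed code; the final diff below replaces it with a monitor):

%% Presumed shape of the first commit's wait: after exit(FSMPid, kill),
%% poll until the VM no longer reports the process as alive, then flush
%% any {ReqID, _} messages left in the mailbox.
wait_until_dead(Pid) ->
    case erlang:is_process_alive(Pid) of
        true  -> timer:sleep(1), wait_until_dead(Pid);
        false -> ok
    end.
%% The open question above: can a message from Pid still be delivered to
%% our mailbox after is_process_alive/1 has returned false?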
@engelsanchez
Collaborator

Also, you'll need riak_test#369 to test. Without that PR you may see different error messages: the request sometimes returns a 500 and sometimes a 503 depending on the type of timeout, and the 2i returnterms test could fail while generating an index request URL.

@seancribbs

The timeout-related 400 can also be reproduced reliably with basho/riak-python-client#272. To run just that test, use python -m unittest riak.tests.test_all.RiakHttpTransportTestCase.test_index_timeout.

@engelsanchez
Collaborator

Thanks @seancribbs, that should be useful. I'll try it.

@seancribbs

@engelsanchez Let me know if you need help setting up the tests.

@engelsanchez
Collaborator

While running the test_index_timeout Python test on the sdc-2i-timeout branch, I'm getting something unexpected. Is this what you were seeing? The client is not raising an exception with timeout=1.

FAIL: test_index_timeout (riak.tests.test_all.RiakHttpTransportTestCase)

Traceback (most recent call last):
File "riak/tests/test_2i.py", line 422, in test_index_timeout
bucket.get_index('field1_bin', 'val1', timeout=1)
AssertionError: RiakError not raised

@seancribbs

@engelsanchez Yes, and when running Wireshark you can see the 400 go onto the wire between the original response and the client's next request.

@engelsanchez
Collaborator

After debugging the test with my branch, my conclusion is that the test is not working: with a 1 ms timeout, results can still be returned quite easily (so no timeout error is triggered). And for the record, the Python test issues a non-streaming request, which is a different code path from the one touched in this PR.

@beerriot

The method here looks sound to me. I spent a while verifying that it's a good idea by creating a minimal PULSE test:

https://gist.github.com/beerriot/02d6ddf725a9cc7db720

One thing I wanted to check specifically was whether the wait for is_process_alive/1 to return false was necessary. I believe the test shows that it is. As written in the gist, the test without the wait fails, but the test with the wait passes. Uncommenting the extra documented receive line causes the non-waiting test to pass. I'm not sure why that's the case, but it seems safest to leave the wait in for this "hammer".

The safe alternative to this busy-wait loop would be to attach a monitor to FSMPid and wait for the 'DOWN' message.
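
A minimal sketch of that alternative (the same shape as the wait_for_death/1 added in the final diff below):

%% Monitor-based wait: no polling. Monitoring an already-dead process still
%% delivers a 'DOWN' message, and messages the FSM sent before dying arrive
%% before its 'DOWN' (per-sender signal ordering), so flushing afterwards
%% is safe.
kill_and_wait(Pid) ->
    Ref = erlang:monitor(process, Pid),
    exit(Pid, kill),
    receive
        {'DOWN', Ref, process, Pid, _Info} -> ok
    end.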

@engelsanchez
Collaborator

Oh FFS! Why didn't I think of that! Of course that's better :(. Shame...

@engelsanchez
Collaborator

I'm tempted to retry with the monitor (duh!), but given that it's late in the day I wanted to check with @jaredmorrow and @jonmeredith to see if anybody is around to take it as is and do another run. If not, I'd like to fix it the elegant way, but not if that holds back the release.

@engelsanchez engelsanchez Replace busy wait with monitor check
My dumbass coded the kill + busy wait loop for no good reason. Setting
up a monitor, killing the process and waiting for the 'DOWN' message is
the obvious elegant Erlangy way to kill and wait for a process to die.
6e8db5e
@engelsanchez
Collaborator

Well, I've replaced my busy wait with the proper monitor + 'DOWN' message detection technique, as Bryan pointed out. Maybe tomorrow, @beerriot, you can run a proper PULSE test on it. I've run the riak test 10 times to verify. I need a bit more time to set up PULSE again.

@beerriot

I've updated my PULSE gist with a version that uses a monitor, like your latest commit. It passes perfectly.

So, +1 to this PR, then.

@engelsanchez engelsanchez merged commit 61ac9d8 into 1.4
@engelsanchez engelsanchez deleted the eas-fix-wm-2i-timeout branch
Commits on Aug 28, 2013
  1. @engelsanchez: Fix 2i timeout messing with next request

Commits on Aug 29, 2013
  1. @engelsanchez: Replace busy wait with monitor check
30 src/riak_client.erl
@@ -527,31 +527,31 @@ get_index(Bucket, Query, Opts) ->
riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout, MaxResults]]),
wait_for_query_results(ReqId, Timeout).
-%% @spec stream_get_index(Bucket :: binary(),
-%% Query :: riak_index:query_def()) ->
-%% {ok, pid()} |
-%% {error, timeout} |
-%% {error, Err :: term()}.
-%%
%% @doc Run the provided index query, return a stream handle.
+-spec stream_get_index(Bucket :: binary(), Query :: riak_index:query_def()) ->
+ {ok, ReqId :: term(), FSMPid :: pid()} | {error, Reason :: term()}.
stream_get_index(Bucket, Query) ->
stream_get_index(Bucket, Query, [{timeout, ?DEFAULT_TIMEOUT}]).
-%% @spec stream_get_index(Bucket :: binary(),
-%% Query :: riak_index:query_def(),
-%% TimeoutMillisecs :: integer()) ->
-%% {ok, pid()} |
-%% {error, timeout} |
-%% {error, Err :: term()}.
-%%
%% @doc Run the provided index query, return a stream handle.
+-spec stream_get_index(Bucket :: binary(), Query :: riak_index:query_def(),
+ Opts :: proplists:proplist()) ->
+ {ok, ReqId :: term(), FSMPid :: pid()} | {error, Reason :: term()}.
stream_get_index(Bucket, Query, Opts) ->
Timeout = proplists:get_value(timeout, Opts, ?DEFAULT_TIMEOUT),
MaxResults = proplists:get_value(max_results, Opts, all),
Me = self(),
ReqId = mk_reqid(),
- riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout, MaxResults]]),
- {ok, ReqId}.
+ case riak_kv_index_fsm_sup:start_index_fsm(Node,
+ [{raw, ReqId, Me},
+ [Bucket, none,
+ Query, Timeout,
+ MaxResults]]) of
+ {ok, Pid} ->
+ {ok, ReqId, Pid};
+ {error, Reason} ->
+ {error, Reason}
+ end.
%% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
%% @doc Set the given properties for Bucket.
2  src/riak_index.erl
@@ -77,7 +77,7 @@ mapred_index(Dest, Args) ->
mapred_index(Dest, Args, ?TIMEOUT).
mapred_index(_Pipe, [Bucket, Query], Timeout) ->
{ok, C} = riak:local_client(),
- {ok, ReqId} = C:stream_get_index(Bucket, Query, [{timeout, Timeout}]),
+ {ok, ReqId, _} = C:stream_get_index(Bucket, Query, [{timeout, Timeout}]),
{ok, Bucket, ReqId}.
%% @spec parse_object_hook(riak_object:riak_object()) ->
2  src/riak_kv_pb_csbucket.erl
@@ -76,7 +76,7 @@ maybe_perform_query({ok, Query}, Req, State) ->
#rpbcsbucketreq{bucket=Bucket, max_results=MaxResults, timeout=Timeout} = Req,
#state{client=Client} = State,
Opts = riak_index:add_timeout_opt(Timeout, [{max_results, MaxResults}]),
- {ok, ReqId} = Client:stream_get_index(Bucket, Query, Opts),
+ {ok, ReqId, _FSMPid} = Client:stream_get_index(Bucket, Query, Opts),
{reply, {stream, ReqId}, State#state{req_id=ReqId, req=Req}}.
%% @doc process_stream/3 callback. Handle streamed responses
2  src/riak_kv_pb_index.erl
@@ -81,7 +81,7 @@ maybe_perform_query({ok, Query}, Req=#rpbindexreq{stream=true}, State) ->
#rpbindexreq{bucket=Bucket, max_results=MaxResults, timeout=Timeout} = Req,
#state{client=Client} = State,
Opts = riak_index:add_timeout_opt(Timeout, [{max_results, MaxResults}]),
- {ok, ReqId} = Client:stream_get_index(Bucket, Query, Opts),
+ {ok, ReqId, _FSMPid} = Client:stream_get_index(Bucket, Query, Opts),
ReturnTerms = riak_index:return_terms(Req#rpbindexreq.return_terms, Query),
{reply, {stream, ReqId}, State#state{req_id=ReqId, req=Req#rpbindexreq{return_terms=ReturnTerms}}};
maybe_perform_query({ok, Query}, Req, State) ->
32 src/riak_kv_wm_index.erl
@@ -215,11 +215,11 @@ handle_streaming_index_query(RD, Ctx) ->
Opts = riak_index:add_timeout_opt(Timeout, [{max_results, MaxResults}]),
- {ok, ReqID} = Client:stream_get_index(Bucket, Query, Opts),
- StreamFun = index_stream_helper(ReqID, Boundary, ReturnTerms, MaxResults, proplists:get_value(timeout, Opts), undefined, 0),
+ {ok, ReqID, FSMPid} = Client:stream_get_index(Bucket, Query, Opts),
+ StreamFun = index_stream_helper(ReqID, FSMPid, Boundary, ReturnTerms, MaxResults, proplists:get_value(timeout, Opts), undefined, 0),
{{stream, {<<>>, StreamFun}}, CTypeRD, Ctx}.
-index_stream_helper(ReqID, Boundary, ReturnTerms, MaxResults, Timeout, LastResult, Count) ->
+index_stream_helper(ReqID, FSMPid, Boundary, ReturnTerms, MaxResults, Timeout, LastResult, Count) ->
fun() ->
receive
{ReqID, done} ->
@@ -234,7 +234,7 @@ index_stream_helper(ReqID, Boundary, ReturnTerms, MaxResults, Timeout, LastResul
end,
{iolist_to_binary(Final), done};
{ReqID, {results, []}} ->
- {<<>>, index_stream_helper(ReqID, Boundary, ReturnTerms, MaxResults, Timeout, LastResult, Count)};
+ {<<>>, index_stream_helper(ReqID, FSMPid, Boundary, ReturnTerms, MaxResults, Timeout, LastResult, Count)};
{ReqID, {results, Results}} ->
%% JSONify the results
JsonResults = encode_results(ReturnTerms, Results),
@@ -244,14 +244,36 @@ index_stream_helper(ReqID, Boundary, ReturnTerms, MaxResults, Timeout, LastResul
LastResult1 = last_result(Results),
Count1 = Count + length(Results),
{iolist_to_binary(Body),
- index_stream_helper(ReqID, Boundary, ReturnTerms, MaxResults, Timeout, LastResult1, Count1)};
+ index_stream_helper(ReqID, FSMPid, Boundary, ReturnTerms, MaxResults, Timeout, LastResult1, Count1)};
{ReqID, Error} ->
stream_error(Error, Boundary)
after Timeout ->
+ whack_index_fsm(ReqID, FSMPid),
stream_error({error, timeout}, Boundary)
end
end.
+whack_index_fsm(ReqID, Pid) ->
+ wait_for_death(Pid),
+ clear_index_fsm_msgs(ReqID).
+
+wait_for_death(Pid) ->
+ Ref = erlang:monitor(process, Pid),
+ exit(Pid, kill),
+ receive
+ {'DOWN', Ref, process, Pid, _Info} ->
+ ok
+ end.
+
+clear_index_fsm_msgs(ReqID) ->
+ receive
+ {ReqID, _} ->
+ clear_index_fsm_msgs(ReqID)
+ after
+ 0 ->
+ ok
+ end.
+
stream_error(Error, Boundary) ->
lager:error("Error in index wm: ~p", [Error]),
ErrorJson = encode_error(Error),