Skip to content

Commit

Permalink
Merge pull request basho#68 from branch 'bwf-vnode-down'
Browse files Browse the repository at this point in the history
  • Loading branch information
beerriot committed Mar 1, 2013
2 parents 6cf9a07 + a87f40a commit 9752bb1
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions src/riak_pipe_vnode.erl
Expand Up @@ -335,24 +335,48 @@ queue_work_send(#fitting{ref=Ref}=Fitting,
{raw, Ref, self()},
riak_pipe_vnode_master) of
{ok, VnodePid} ->
%% monitor in case the vnode is gone before it
%% responds to this request
MonRef = erlang:monitor(process, VnodePid),
%% block until input confirmed queued, for backpressure
receive
{Ref, Reply} ->
erlang:demonitor(MonRef),
Reply;
{'DOWN',MonRef,process,VnodePid,Reason} ->
{error, {vnode_down, Reason}}
end;
queue_work_wait(Ref, Index, VnodePid);
{error, timeout} ->
{error, {vnode_proxy_timeout, {Index, Node}}}
catch exit:{{nodedown, Node}, _GenServerCall} ->
%% node died between services check and gen_server:call
{error, {nodedown, Node}}
end.

queue_work_wait(Ref, Index, VnodePid) ->
%% monitor in case the vnode is gone before it
%% responds to this request
MonRef = erlang:monitor(process, VnodePid),
%% block until input confirmed queued, for backpressure
receive
{Ref, Reply} ->
erlang:demonitor(MonRef),
Reply;
{'DOWN',MonRef,process,VnodePid,normal} ->
%% the vnode likely just shut down after completing handoff
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Next = case riak_core_ring:next_owner(Ring, Index) of
{undefined, undefined, undefined} ->
%% ownership finished changing before we asked
%% ... check if Next==Node?
riak_core_ring:index_owner(Ring);
{_From, To, _Status} ->
%% ownership is still changing ... wait for
%% the future owner
To
end,
%% monitor new vnode, since the input will be handled
%% there, instead of at the vnode originally contacted
{ok, NextPid} = rpc:call(Next,
riak_core_vnode_master,
get_vnode_pid,
[Index, riak_pipe_vnode]),
queue_work_wait(Ref, Index, NextPid);
{'DOWN',MonRef,process,VnodePid,Reason} ->
%% the vnode died unexpectedly
{error, {vnode_down, Reason}}
end.

%% @doc Send end-of-inputs for a fitting to a vnode. Note: this
%% should only be called by `riak_pipe_fitting' processes. This
%% will cause the vnode to shutdown the worker, dispose of the
Expand Down

0 comments on commit 9752bb1

Please sign in to comment.