Skip to content

Commit

Permalink
Merge branch 'bwf-error-msgs'
Browse files Browse the repository at this point in the history
  • Loading branch information
beerriot committed Dec 12, 2011
2 parents 99f2783 + 4a5ffba commit 01f2098
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 16 deletions.
7 changes: 7 additions & 0 deletions README.org
Expand Up @@ -720,3 +720,10 @@ the same input as the =riak_client:mapred/2= function. Support is
currently provided for =map= and =reduce= phases implemented in
Erlang, specified using the ={qfun, function()}= or ={modfun, Module,
Function}= syntax.

* Additional Documentation

A diagram recording the supervisor/link/monitor structure of the
Erlang processes involved in Riak Pipe is included in the file
=riak_pipe_monitors.dot=. The comments at the top of that file
describe how to render it to an image using Graphviz.
98 changes: 98 additions & 0 deletions riak_pipe_monitors.dot
@@ -0,0 +1,98 @@
// A map of the links and monitors among Riak Pipe processes

// This map shows the basic web of monitors and links present in the
// system when:
// - a pipeline "A" is set up with two fittings, "1" and "2"
// - a vnode is handling inputs for both of those fittings
// (only one vnode is shown, for simplicity in the diagram)

// Edges are colored according to the type of link/monitor (or the
// special-case link of supervision):
// supervises = [color="#009900"];
// links = [color="#990000"];
// monitors = [color="#000099"];

// Render using Graphviz:
// dot -Tpng -oriak_pipe_monitors.png riak_pipe_monitors.dot

// Notes:
// - the links from riak_pipe_vnode to riak_pipe_vnode_worker_A* were
// put in place before riak_core_vnode supported handle_info; they
// should be changed to monitors now
// Directed graph of the Riak Pipe process structure. Each node is a
// process; each edge's color encodes whether the source supervises,
// links to, or monitors the target (see the color legend in the
// comments at the top of this file).
digraph {
// application-immortal processes
// (rank=min asks Graphviz to place these two top-level supervisors
// on the minimum rank of the layout)
subgraph apps {
rank=min;
riak_pipe_sup
riak_core_sup
}

subgraph immortal {
// keep these four processes on the same rank as each other
{rank=same;
riak_pipe_vnode_master
riak_pipe_builder_sup
riak_pipe_fitting_sup
riak_core_vnode_sup}

// started at application start time
// (supervision edges from riak_pipe_sup to its three children)
riak_pipe_sup -> riak_pipe_vnode_master [color="#009900"];
riak_pipe_sup -> riak_pipe_builder_sup [color="#009900"];
riak_pipe_sup -> riak_pipe_fitting_sup [color="#009900"];

// external to pipe
// (supervision edge owned by riak_core rather than riak_pipe)
riak_core_sup -> riak_core_vnode_sup [color="#009900"];
}

// application-mortal processes
// started as vnode requests arrive
subgraph mortal {
riak_pipe_vnode
riak_pipe_vnode_worker_sup

// the vnode is supervised by riak_core_vnode_sup and monitored by
// riak_pipe_vnode_master
riak_core_vnode_sup -> riak_pipe_vnode [color="#009900"];
riak_pipe_vnode_master -> riak_pipe_vnode [color="#000099"];

// the vnode is linked to its worker supervisor
riak_pipe_vnode -> riak_pipe_vnode_worker_sup [color="#990000"];
}

// the builder/fitting "meta" processes
subgraph pipeline_meta_A {
riak_pipe_builder_A

riak_pipe_fitting_A1
riak_pipe_fitting_A2

// started as pipelines are set up
riak_pipe_builder_sup -> riak_pipe_builder_A [color="#009900"];

// each fitting is supervised by riak_pipe_fitting_sup
riak_pipe_fitting_sup -> riak_pipe_fitting_A1 [color="#009900"];
riak_pipe_fitting_sup -> riak_pipe_fitting_A2 [color="#009900"];

// the builder monitors each of its fittings ...
riak_pipe_builder_A -> riak_pipe_fitting_A1 [color="#000099"];
riak_pipe_builder_A -> riak_pipe_fitting_A2 [color="#000099"];

// ... and each fitting monitors the builder in return
riak_pipe_fitting_A1 -> riak_pipe_builder_A [color="#000099"];
riak_pipe_fitting_A2 -> riak_pipe_builder_A [color="#000099"];

// builder also monitors the process receiving outputs
riak_pipe_builder_A -> sink_process_A [color="#000099"];
}

// the processes actually processing inputs
subgraph pipeline_A {
riak_pipe_vnode_worker_A1
riak_pipe_vnode_worker_A2

// workers are supervised by the vnode's worker supervisor
riak_pipe_vnode_worker_sup -> riak_pipe_vnode_worker_A1 [color="#009900"];
riak_pipe_vnode_worker_sup -> riak_pipe_vnode_worker_A2 [color="#009900"];

// the vnode is linked to each worker (these are the links that the
// notes at the top of this file suggest converting to monitors)
riak_pipe_vnode -> riak_pipe_vnode_worker_A1 [color="#990000"];
riak_pipe_vnode -> riak_pipe_vnode_worker_A2 [color="#990000"];

// the vnode monitors each fitting it is handling inputs for ...
riak_pipe_vnode -> riak_pipe_fitting_A1 [color="#000099"];
riak_pipe_vnode -> riak_pipe_fitting_A2 [color="#000099"];

// ... and each fitting monitors the vnode in return
riak_pipe_fitting_A1 -> riak_pipe_vnode [color="#000099"];
riak_pipe_fitting_A2 -> riak_pipe_vnode [color="#000099"];
}
}
3 changes: 1 addition & 2 deletions src/riak_pipe.erl
Expand Up @@ -369,8 +369,7 @@ collect_results(Pipe, ResultAcc, LogAcc, Timeout) ->
%% of waiting for an `eoi' to propagate through.
-spec destroy(pipe()) -> ok.
destroy(#pipe{builder=Builder}) ->
erlang:exit(Builder, kill),
ok.
riak_pipe_builder:destroy(Builder).

%% @doc Get all active pipelines hosted on `Node'. Pass the atom
%% `global' instead of a node name to get all pipelines hosted on
Expand Down
29 changes: 24 additions & 5 deletions src/riak_pipe_builder.erl
Expand Up @@ -30,7 +30,8 @@
%% API
-export([start_link/2]).
-export([fitting_pids/1,
pipeline/1]).
pipeline/1,
destroy/1]).

%% gen_fsm callbacks
-export([init/1,
Expand Down Expand Up @@ -86,6 +87,17 @@ fitting_pids(Builder) ->
pipeline(BuilderPid) ->
gen_fsm:sync_send_event(BuilderPid, pipeline).

%% @doc Shut down the pipeline that this builder constructed.
%%      Sends a synchronous `destroy' event to the builder and waits
%%      for the reply with no timeout. An exit raised by that call is
%%      reported as `ok'.
-spec destroy(pid()) -> ok.
destroy(BuilderPid) ->
    try gen_fsm:sync_send_event(BuilderPid, destroy, infinity) of
        Reply ->
            Reply
    catch
        exit:_ ->
            %% The builder went away before it could answer. Its
            %% disappearance is exactly what was requested, so
            %% treat this as success.
            ok
    end.

%%%===================================================================
%%% gen_fsm callbacks
%%%===================================================================
Expand Down Expand Up @@ -123,14 +135,18 @@ wait_pipeline_shutdown(_Event, State) ->
{next_state, wait_pipeline_shutdown, State}.

%% @doc A client is asking for the fittings. Respond.
-spec wait_pipeline_shutdown(pipeline, term(), state()) ->
-spec wait_pipeline_shutdown(pipeline | destroy, term(), state()) ->
{reply,
{ok, #pipe{}},
wait_pipeline_shutdown,
state()}.
state()}
|{stop, normal, ok, state()}.
wait_pipeline_shutdown(pipeline, _From, #state{pipe=Pipe}=State) ->
%% everything is started - reply now
{reply, {ok, Pipe}, wait_pipeline_shutdown, State};
wait_pipeline_shutdown(destroy, _From, State) ->
%% client asked to shut down this pipe immediately
{stop, normal, ok, State};
wait_pipeline_shutdown(_, _, State) ->
%% unknown message - reply {error, unknown} to get rid of it
{reply, {error, unknown}, wait_pipeline_shutdown, State}.
Expand Down Expand Up @@ -217,9 +233,12 @@ maybe_shutdown(Reason, _StateName, State) ->
%% explode!
{stop, {fitting_exited_abnormally, Reason}, State}.

%% @doc Unused.
%% @doc Terminate any fittings that are still alive.
-spec terminate(term(), atom(), state()) -> ok.
terminate(_Reason, _StateName, _State) ->
terminate(_Reason, _StateName, #state{alive=Alive}) ->
%% this is a brutal kill of each fitting, just in case that fitting
%% is otherwise swamped with stop/restart messages from its workers
[ riak_pipe_fitting_sup:terminate_fitting(F) || {F,_R} <- Alive ],
ok.

%% @doc Unused.
Expand Down
13 changes: 8 additions & 5 deletions src/riak_pipe_fitting.erl
Expand Up @@ -137,10 +137,8 @@ crash(Pid, Fun) ->
%%% gen_fsm callbacks
%%%===================================================================

%% @doc Initialize the fitting process. This function links to the
%% builder process, so it will tear down if the builder exits
%% abnormally (which happens if another fitting exits
%% abnormally).
%% @doc Initialize the fitting process. This function monitors the
%% builder process, so it will tear down if the builder exits.
-spec init([pid() | riak_pipe:fitting_spec() | riak_pipe:fitting()
| riak_pipe:exec_opts()]) ->
{ok, wait_upstream_eoi, state()}.
Expand All @@ -159,7 +157,7 @@ init([Builder,

?T(Details, [], {fitting, init_started}),

erlang:link(Builder),
erlang:monitor(process, Builder),

?T(Details, [], {fitting, init_finished}),

Expand Down Expand Up @@ -333,6 +331,11 @@ handle_sync_event(_Event, _From, StateName, State) ->
atom(), state()) ->
{next_state, atom(), state()}
|{stop, normal, state()}.
handle_info({'DOWN', _Ref, process, Builder, _Reason},
_StateName,
#state{builder=Builder}=State) ->
%% if the builder exits, stop immediately
{stop, normal, State};
handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
case lists:keytake(Ref, #worker.monitor, State#state.workers) of
{value, Worker, Rest} ->
Expand Down
10 changes: 9 additions & 1 deletion src/riak_pipe_fitting_sup.erl
Expand Up @@ -25,7 +25,8 @@

%% API
-export([start_link/0]).
-export([add_fitting/4]).
-export([add_fitting/4,
terminate_fitting/1]).

%% Supervisor callbacks
-export([init/1]).
Expand Down Expand Up @@ -55,6 +56,13 @@ add_fitting(Builder, Spec, Output, Options) ->
?DPF("Adding fitting for ~p", [Spec]),
supervisor:start_child(?SERVER, [Builder, Spec, Output, Options]).

%% @doc Stop a fitting process right away by asking the fitting
%%      supervisor to terminate the child whose pid is stored in the
%%      given `#fitting{}' record. Intended for tearing down a
%%      pipeline whose fittings might otherwise be swamped with
%%      messages from restarting workers.
-spec terminate_fitting(riak_pipe:fitting()) -> ok | {error, term()}.
terminate_fitting(#fitting{}=Fitting) ->
    supervisor:terminate_child(?SERVER, Fitting#fitting.pid).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
Expand Down
6 changes: 4 additions & 2 deletions src/riak_pipe_vnode.erl
Expand Up @@ -622,14 +622,16 @@ handle_exit(Pid, Reason, #state{partition=Partition}=State) ->
%% @doc Handle a 'DOWN' message from a fitting process. Kill the
%% worker associated with that fitting and dispose of its queue.
-spec handle_info(term(), state()) -> {ok, state()}.
handle_info({'DOWN',_,process,Pid,_}, #state{partition=Partition}=State) ->
handle_info({'DOWN',_,process,Pid,_},
#state{partition=Partition, worker_sup=WorkerSup}=State) ->
NewState = case worker_by_fitting_pid(Pid, State) of
{ok, Worker} ->
?T(Worker#worker.details, [error],
{vnode, {fitting_died, Partition}}),
%% if the fitting died, tear down its worker
erlang:unlink(Worker#worker.pid),
erlang:exit(Worker#worker.pid, fitting_died),
riak_pipe_vnode_worker_sup:terminate_worker(
WorkerSup, Worker#worker.pid),
remove_worker(Worker, State);
none ->
%% TODO: log this somewhere?
Expand Down
8 changes: 7 additions & 1 deletion src/riak_pipe_vnode_worker_sup.erl
Expand Up @@ -26,7 +26,8 @@

%% API
-export([start_link/2]).
-export([start_worker/2]).
-export([start_worker/2,
terminate_worker/2]).

%% Supervisor callbacks
-export([init/1]).
Expand All @@ -48,6 +49,11 @@ start_link(Partition, VnodePid) ->
start_worker(Supervisor, Details) ->
supervisor:start_child(Supervisor, [Details]).

%% @doc Immediately stop a worker by asking the worker supervisor
%%      (first argument) to terminate the child with the given pid
%%      (second argument). The result of
%%      `supervisor:terminate_child/2' is returned unchanged.
-spec terminate_worker(pid(), pid()) -> ok | {error, term()}.
terminate_worker(SupPid, ChildPid) ->
    supervisor:terminate_child(SupPid, ChildPid).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
Expand Down

0 comments on commit 01f2098

Please sign in to comment.