Skip to content

Commit

Permalink
Support all-vbucket dump tap streams.
Browse files Browse the repository at this point in the history
closes #11

Change-Id: Ifcbfdc224cade010c4f0064fa05cc541e1574cfa
  • Loading branch information
dustin committed Jun 14, 2011
1 parent 8920140 commit 086584e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/mc_daemon.erl
Expand Up @@ -144,7 +144,7 @@ processing({?STAT, _Extra, <<"vbucket">>, _Body, _CAS, Socket, Opaque}, State) -
mc_couch_vbucket:handle_stats(Socket, Opaque, State),
{next_state, processing, State};
processing({?TAP_CONNECT, Extra, _Key, Body, _CAS, Socket, Opaque}, State) ->
mc_tap:run(State#state.db, Opaque, Socket, Extra, Body),
mc_tap:run(State, Opaque, Socket, Extra, Body),
{next_state, processing, State};
processing({?STAT, _Extra, _Key, _Body, _CAS, Socket, Opaque}, State) ->
mc_couch_stats:stats(Socket, Opaque),
Expand Down
33 changes: 23 additions & 10 deletions src/mc_tap.erl
Expand Up @@ -3,18 +3,30 @@
-include("couch_db.hrl").
-include("mc_constants.hrl").

-define(DUMP_ONLY, 2).
-define(DUMP_AND_LIST_VBUCKETS, 6).

-export([run/5]).

%% We only do exactly one type of tap request.
run(DbName, Opaque, Socket,
<<?DUMP_AND_LIST_VBUCKETS:32>>,
<<1:16, VBucketId:16>>) ->
spawn_link(fun() -> process_tap_stream(DbName, Opaque, VBucketId, Socket) end);
run(DbName, Opaque, Socket, <<Flags:32>>, Extra) ->
%% We're pretty specific about the type of tap connections we can handle.
run(State, Opaque, Socket, <<?DUMP_ONLY:32>>, <<>>) ->
spawn_link(fun() ->
DbName = mc_daemon:db_prefix(State),
lists:foreach(fun({VB,_VBState}) ->
process_tap_stream(DbName, Opaque, VB, Socket)
end,
mc_couch_vbucket:list_vbuckets(State)),
terminate_tap_stream(Socket, Opaque, 0)
end);
run(State, Opaque, Socket,
<<?DUMP_AND_LIST_VBUCKETS:32>>, <<1:16, VBucketId:16>>) ->
spawn_link(fun() -> process_tap_stream(mc_daemon:db_prefix(State), Opaque,
VBucketId, Socket),
terminate_tap_stream(Socket, Opaque, VBucketId)
end);
run(State, Opaque, Socket, <<Flags:32>>, Extra) ->
?LOG_INFO("MC tap: invalid request: ~p/~p/~p/~p/~p",
[DbName, Opaque, Socket, Flags, Extra]),
[State, Opaque, Socket, Flags, Extra]),
mc_connection:respond(Socket, ?TAP_CONNECT, Opaque,
#mc_response{status=?EINVAL,
body="Only dump+1 vbucket is allowed"}).
Expand Down Expand Up @@ -50,10 +62,11 @@ process_tap_stream(BaseDbName, Opaque, VBucketId, Socket) ->
AdapterFun, fold_thing,
[]),

couch_db:close(Db).

terminate_tap_stream(Socket, Opaque, Status) ->
TerminalExtra = <<8:16, 0:16, %% length, flags
0:8, %% TTL
0:8, 0:8, 0:8>>, %% reserved
mc_connection:respond(?REQ_MAGIC, Socket, ?TAP_OPAQUE, Opaque,
#mc_response{extra=TerminalExtra, status=VBucketId}),

couch_db:close(Db).
#mc_response{extra=TerminalExtra, status=Status}).

0 comments on commit 086584e

Please sign in to comment.