From 086584e0bebd92e9755c6d4195808c2b35b8babf Mon Sep 17 00:00:00 2001 From: Dustin Sallings Date: Tue, 14 Jun 2011 15:52:59 -0700 Subject: [PATCH] Support all-vbucket dump tap streams. closes #11 Change-Id: Ifcbfdc224cade010c4f0064fa05cc541e1574cfa --- src/mc_daemon.erl | 2 +- src/mc_tap.erl | 33 +++++++++++++++++++++++---------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/mc_daemon.erl b/src/mc_daemon.erl index a0d63f0..d012d9f 100644 --- a/src/mc_daemon.erl +++ b/src/mc_daemon.erl @@ -144,7 +144,7 @@ processing({?STAT, _Extra, <<"vbucket">>, _Body, _CAS, Socket, Opaque}, State) - mc_couch_vbucket:handle_stats(Socket, Opaque, State), {next_state, processing, State}; processing({?TAP_CONNECT, Extra, _Key, Body, _CAS, Socket, Opaque}, State) -> - mc_tap:run(State#state.db, Opaque, Socket, Extra, Body), + mc_tap:run(State, Opaque, Socket, Extra, Body), {next_state, processing, State}; processing({?STAT, _Extra, _Key, _Body, _CAS, Socket, Opaque}, State) -> mc_couch_stats:stats(Socket, Opaque), diff --git a/src/mc_tap.erl b/src/mc_tap.erl index d553f43..693442e 100644 --- a/src/mc_tap.erl +++ b/src/mc_tap.erl @@ -3,18 +3,30 @@ -include("couch_db.hrl"). -include("mc_constants.hrl"). +-define(DUMP_ONLY, 2). -define(DUMP_AND_LIST_VBUCKETS, 6). -export([run/5]). -%% We only do exactly one type of tap request. -run(DbName, Opaque, Socket, - <>, - <<1:16, VBucketId:16>>) -> - spawn_link(fun() -> process_tap_stream(DbName, Opaque, VBucketId, Socket) end); -run(DbName, Opaque, Socket, <>, Extra) -> +%% We're pretty specific about the type of tap connections we can handle. +run(State, Opaque, Socket, <>, <<>>) -> + spawn_link(fun() -> + DbName = mc_daemon:db_prefix(State), + lists:foreach(fun({VB,_VBState}) -> + process_tap_stream(DbName, Opaque, VB, Socket) + end, + mc_couch_vbucket:list_vbuckets(State)), + terminate_tap_stream(Socket, Opaque, 0) + end); +run(State, Opaque, Socket, + <>, <<1:16, VBucketId:16>>) -> + spawn_link(fun() -> process_tap_stream(mc_daemon:db_prefix(State), Opaque, + VBucketId, Socket), + terminate_tap_stream(Socket, Opaque, VBucketId) + end); +run(State, Opaque, Socket, <>, Extra) -> ?LOG_INFO("MC tap: invalid request: ~p/~p/~p/~p/~p", - [DbName, Opaque, Socket, Flags, Extra]), + [State, Opaque, Socket, Flags, Extra]), mc_connection:respond(Socket, ?TAP_CONNECT, Opaque, #mc_response{status=?EINVAL, body="Only dump+1 vbucket is allowed"}). @@ -50,10 +62,11 @@ process_tap_stream(BaseDbName, Opaque, VBucketId, Socket) -> AdapterFun, fold_thing, []), + couch_db:close(Db). + +terminate_tap_stream(Socket, Opaque, Status) -> TerminalExtra = <<8:16, 0:16, %% length, flags 0:8, %% TTL 0:8, 0:8, 0:8>>, %% reserved mc_connection:respond(?REQ_MAGIC, Socket, ?TAP_OPAQUE, Opaque, - #mc_response{extra=TerminalExtra, status=VBucketId}), - - couch_db:close(Db). + #mc_response{extra=TerminalExtra, status=Status}).