Skip to content
This repository has been archived by the owner on Sep 19, 2019. It is now read-only.

Commit

Permalink
Implement new database deletion logic
Browse files Browse the repository at this point in the history
This patch makes the response to a DB deletion request depend only on
the content of the shard_db and not on the presence or absence of files
on disk.  We respond with a 'not_found' if the local node has no entry
for the database in its partition table cache, or if every worker fails
to find any live leaf revision (this should be very rare, since the
workers were generated from an in-memory cache of a live leaf revision),
an 'ok' if all cluster nodes confirm that all leaf revisions of the
document are deleted and at least one node actually performed a delete,
an 'accepted' if a majority confirm the same, and an error tuple
otherwise.

The patch also uses rexi_monitor to receive information about down
nodes.  It ensures that database deletions do not block until the
timeout when a node is down.

Thanks Bob Dionne for various implementations and reviews.
  • Loading branch information
kocolosk committed Sep 22, 2011
1 parent 578edd8 commit cbbc6c5
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 29 deletions.
79 changes: 56 additions & 23 deletions src/fabric_db_delete.erl
Expand Up @@ -24,39 +24,72 @@
%%
go(DbName, _Options) ->
Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, delete_db, [DbName]),
Acc0 = fabric_dict:init(Workers, nil),
case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
% delete doc from shard_db
try delete_shard_db_doc(DbName) of
{ok, ok} ->
ok;
{ok, accepted} ->
accepted;
{ok, not_found} ->
erlang:error(database_does_not_exist, DbName);
Error ->
Error
after
% delete the shard files
fabric_util:submit_jobs(Shards, delete_db, [])
end.

handle_message({rexi_EXIT, Reason}, _Worker, _Counters) ->
{error, Reason};

handle_message(Msg, Shard, Counters) ->
C1 = fabric_dict:store(Shard, Msg, Counters),
case fabric_dict:any(nil, C1) of
true ->
{ok, C1};
false ->
final_answer(C1)
delete_shard_db_doc(Doc) ->
Shards = [#shard{node=N} || N <- mem3:nodes()],
RexiMon = fabric_util:create_monitors(Shards),
Workers = fabric_util:submit_jobs(Shards, delete_shard_db_doc, [Doc]),
Acc0 = {length(Shards), fabric_dict:init(Workers, nil)},
try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of
{timeout, _} ->
{error, timeout};
Else ->
Else
after
rexi_monitor:stop(RexiMon)
end.

final_answer(Counters) ->
Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found],
case fabric_view:is_progress_possible(Successes) of
handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) ->
New = fabric_dict:filter(fun(S, _) -> S#shard.node =/= Node end, Counters),
maybe_stop(W, New);

handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) ->
maybe_stop(W, fabric_dict:erase(Worker, Counters));

handle_db_update(conflict, _, _) ->
% just fail when we get any conflicts
{error, conflict};

handle_db_update(Msg, Worker, {W, Counters}) ->
maybe_stop(W, fabric_dict:store(Worker, Msg, Counters)).

maybe_stop(W, Counters) ->
case fabric_dict:any(nil, Counters) of
true ->
case lists:keymember(ok, 2, Successes) of
true ->
{stop, ok};
false ->
{stop, not_found}
end;
{ok, {W, Counters}};
false ->
{error, internal_server_error}
{Ok,NotFound} = fabric_dict:fold(fun count_replies/3, {0,0}, Counters),
case {Ok + NotFound, Ok, NotFound} of
{W, 0, W} ->
{#shard{dbname=Name}, _} = hd(Counters),
twig:log(warn, "~p not_found ~s", [?MODULE, Name]),
{stop, not_found};
{W, _, _} ->
{stop, ok};
{N, M, _} when N >= (W div 2 + 1), M > 0 ->
{stop, accepted};
_ ->
{error, internal_server_error}
end
end.

count_replies(_, ok, {Ok, NotFound}) ->
{Ok+1, NotFound};
count_replies(_, not_found, {Ok, NotFound}) ->
{Ok, NotFound+1};
count_replies(_, _, Acc) ->
Acc.
12 changes: 7 additions & 5 deletions src/fabric_rpc.erl
Expand Up @@ -18,8 +18,8 @@
-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3,
update_docs/3]).
-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
-export([create_db/1, delete_db/2, reset_validation_funs/1, set_security/3,
set_revs_limit/3, create_shard_db_doc/2]).
-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3,
set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).

-include("fabric.hrl").
-include_lib("couch/include/couch_db.hrl").
Expand Down Expand Up @@ -176,9 +176,11 @@ create_db(DbName) ->
create_shard_db_doc(_, Doc) ->
rexi:reply(mem3_util:write_db_doc(Doc)).

delete_db(DbName, DocId) ->
mem3_util:delete_db_doc(DocId),
rexi:reply(couch_server:delete(DbName, [])).
delete_db(DbName) ->
couch_server:delete(DbName, []).

delete_shard_db_doc(_, DocId) ->
rexi:reply(mem3_util:delete_db_doc(DocId)).

get_db_info(DbName) ->
with_db(DbName, [], {couch_db, get_db_info, []}).
Expand Down
6 changes: 5 additions & 1 deletion src/fabric_util.erl
Expand Up @@ -15,7 +15,7 @@
-module(fabric_util).

-export([submit_jobs/3, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
update_counter/3, remove_ancestors/2, kv/2]).
update_counter/3, remove_ancestors/2, kv/2, create_monitors/1]).

-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
Expand Down Expand Up @@ -121,6 +121,10 @@ remove_ancestors([{_,{{ok, #doc{revs = {Pos, Revs}}}, Count}} = Head | Tail], Ac
remove_ancestors(update_counter(Descendant, Count, Tail), Acc)
end.

create_monitors(Shards) ->
MonRefs = lists:usort([{rexi_server, N} || #shard{node=N} <- Shards]),
rexi_monitor:start(MonRefs).

%% verify only id and rev are used in key.
update_counter_test() ->
Reply = {ok, #doc{id = <<"id">>, revs = <<"rev">>,
Expand Down

0 comments on commit cbbc6c5

Please sign in to comment.