Permalink
Browse files

Implement new database creation logic

The new logic decouples the creation of the shard files on disk from
the creation of the mapping document in the shard database.  Most
errors encountered during the creation of files on disk are ignored,
though we do bubble up 'file_exists' errors to the client.  Conflicts
encountered in the shard mapping database result in an immediate
failure.  A matching mem3 topic branch reports those conflicts when the
body of the revision on disk is different than the one we're trying to
save.  We report an 'ok' when every node reports that the desired
document body is saved to disk, an 'accepted' when a majority do so, and
an {'error', 'internal_server_error'} otherwise.  If any file already
exists on disk we'll send {'error', 'file_exists'} instead of 'ok' or
'accepted'.

This patch also incorporates the use of rexi_monitor to prevent database
creations from hanging when a node is down.

Thanks Bob Dionne for help with the implementation, testing, and review.
  • Loading branch information...
kocolosk committed Sep 22, 2011
1 parent cbbc6c5 commit 4ae36d2b6d5925a62765ed8e07b7c488d93984af
Showing with 87 additions and 82 deletions.
  1. +87 −82 src/fabric_db_create.erl
View
@@ -26,24 +26,14 @@
go(DbName, Options) ->
case validate_dbname(DbName, Options) of
ok ->
{MegaSecs, Secs, _} = now(),
Suffix = "." ++ integer_to_list(MegaSecs*1000000 + Secs),
Shards = mem3:choose_shards(DbName, [{shard_suffix,Suffix} | Options]),
case mem3_util:open_db_doc(DbName) of
{ok, Doc} ->
% the DB already exists, and may have a different Suffix
ok;
{not_found, _} ->
Doc = make_document(Shards, Suffix)
end,
Workers = fabric_util:submit_jobs(Shards, create_db, []),
Acc0 = fabric_dict:init(Workers, nil),
X = fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0),
case update_shard_db(Doc) of
{ok, true} ->
case X of {ok, _} -> ok; Else -> Else end;
{ok, false} ->
{error, internal_server_error}
{Shards, Doc} = generate_shard_map(DbName, Options),
case {create_shard_files(Shards), create_shard_db_doc(Doc)} of
{ok, {ok, Status}} ->
Status;
{file_exists, {ok, _}} ->
{error, file_exists};
{_, Error} ->
Error
end;
Error ->
Error
@@ -66,30 +56,91 @@ validate_dbname(DbName, Options) ->
end
end.
handle_message(Msg, Shard, Counters) ->
C1 = fabric_dict:store(Shard, Msg, Counters),
case fabric_dict:any(nil, C1) of
true ->
{ok, C1};
false ->
final_answer(C1)
%% @doc Chooses the shards for a database and finds the shard map document
%% that goes with it.
%%
%% A shard suffix is derived from the current time in whole seconds and is
%% passed to mem3:choose_shards/2 in front of the caller's Options. If a
%% document for DbName can already be opened it is returned unchanged;
%% otherwise a new one is built from the chosen shards and the suffix.
%%
%% Returns {Shards, Doc}.
generate_shard_map(DbName, Options) ->
    % os:timestamp/0 replaces the deprecated erlang:now/0. Only whole seconds
    % are used here, so the uniqueness guarantee of now/0 is not needed.
    {MegaSecs, Secs, _} = os:timestamp(),
    Suffix = "." ++ integer_to_list(MegaSecs * 1000000 + Secs),
    Shards = mem3:choose_shards(DbName, [{shard_suffix, Suffix} | Options]),
    % Bind Doc from the value of the case expression rather than exporting it
    % from each branch.
    Doc = case mem3_util:open_db_doc(DbName) of
        {ok, Existing} ->
            % the DB already exists, and may have a different Suffix
            Existing;
        {not_found, _} ->
            make_document(Shards, Suffix)
    end,
    {Shards, Doc}.
%% @doc Submits a create_db job for every shard and collects the replies with
%% handle_message/3, using the list of workers as the initial accumulator.
%%
%% Returns file_exists when the receive loop yields {error, file_exists} and
%% ok for every other outcome. The rexi monitor is stopped on every exit path.
create_shard_files(Shards) ->
    Workers = fabric_util:submit_jobs(Shards, create_db, []),
    RexiMon = fabric_util:create_monitors(Shards),
    try
        Reply = fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Workers),
        case Reply of
            {error, file_exists} -> file_exists;
            _ -> ok
        end
    after
        rexi_monitor:stop(RexiMon)
    end.
update_shard_db(Doc) ->
%% @doc Receive callback used while the shard files are being created. The
%% accumulator is the list of workers that have not been accounted for yet.
%%
%%   file_exists       - abort with {error, file_exists}
%%   rexi_DOWN (Node)  - drop every worker whose #shard.node is Node
%%   any other message - drop the worker the message is attributed to
%%
%% Returns {stop, ok} once no workers remain, {ok, RemainingWorkers} otherwise.
handle_message(file_exists, _, _) ->
    {error, file_exists};
handle_message({rexi_DOWN, _, {_, Node}, _}, _, Workers) ->
    % lists:filter/2 takes a unary predicate; the previous two-argument fun
    % made every rexi_DOWN message crash instead of pruning the dead node.
    case lists:filter(fun(S) -> S#shard.node =/= Node end, Workers) of
        [] ->
            {stop, ok};
        RemainingWorkers ->
            {ok, RemainingWorkers}
    end;
handle_message(_, Worker, Workers) ->
    case lists:delete(Worker, Workers) of
        [] ->
            {stop, ok};
        RemainingWorkers ->
            {ok, RemainingWorkers}
    end.
%% @doc Asks every node returned by mem3:nodes/0 to run create_shard_db_doc
%% with Doc and collects the replies with handle_db_update/3.
%%
%% The accumulator handed to the receive loop is {W, Counters}: W is the
%% number of nodes contacted and Counters starts with nil for every worker.
%%
%% A {timeout, _} result is reported as {error, timeout}; any other result of
%% the receive loop is returned unchanged.
create_shard_db_doc(Doc) ->
    Shards = [#shard{node=N} || N <- mem3:nodes()],
    RexiMon = fabric_util:create_monitors(Shards),
    Workers = fabric_util:submit_jobs(Shards, create_shard_db_doc, [Doc]),
    Acc0 = {length(Shards), fabric_dict:init(Workers, nil)},
    % The only receive runs inside try ... after so that the monitor is
    % stopped on every exit path.
    try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of
        {timeout, _} ->
            {error, timeout};
        Else ->
            Else
    after
        rexi_monitor:stop(RexiMon)
    end.
%% @doc Receive callback used while the shard map document is being saved.
%% The accumulator is {W, Counters}.
%%
%%   conflict          - fail immediately with {error, conflict}
%%   rexi_EXIT         - forget the worker the message is attributed to
%%   rexi_DOWN (Node)  - forget every worker whose #shard.node is Node
%%   any other message - record the message against the worker
%%
%% Every clause except conflict hands the updated counters to maybe_stop/2.
handle_db_update(conflict, _, _) ->
    % just fail when we get any conflicts
    {error, conflict};
handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) ->
    Remaining = fabric_dict:erase(Worker, Counters),
    maybe_stop(W, Remaining);
handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) ->
    NotOnDownNode = fun(S, _) -> S#shard.node =/= Node end,
    Remaining = fabric_dict:filter(NotOnDownNode, Counters),
    maybe_stop(W, Remaining);
handle_db_update(Msg, Worker, {W, Counters}) ->
    Updated = fabric_dict:store(Worker, Msg, Counters),
    maybe_stop(W, Updated).
%% @doc Decides whether the shard map update can finish.
%%
%% While any entry in Counters is still nil the receive loop continues with
%% {ok, {W, Counters}}. Once no nil entries remain, the number of entries
%% whose value is ok is compared with W:
%%
%%   exactly W                 -> {stop, ok}
%%   at least (W div 2) + 1    -> {stop, accepted}
%%   anything less             -> {error, internal_server_error}
maybe_stop(W, Counters) ->
    case fabric_dict:any(nil, Counters) of
        true ->
            {ok, {W, Counters}};
        false ->
            % W is already bound, so the first clause matches only when the
            % ok count equals it.
            case length([ok || {_, ok} <- Counters]) of
                W ->
                    {stop, ok};
                NumOk when NumOk >= (W div 2 + 1) ->
                    {stop, accepted};
                _ ->
                    {error, internal_server_error}
            end
    end.
make_document([#shard{dbname=DbName}|_] = Shards, Suffix) ->
@@ -108,49 +159,3 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix) ->
{<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
]}}.
%% @doc Turns the collected per-shard replies into the final result.
%%
%% Entries whose reply is ok or file_exists are kept as successes. If
%% fabric_view:is_progress_possible/1 rejects that set the result is
%% {error, internal_server_error}. Otherwise the result is
%% {error, file_exists} when any kept reply is file_exists, and {stop, ok}
%% when none is.
final_answer(Counters) ->
    IsSuccess = fun
        ({_, Reply}) -> Reply == ok orelse Reply == file_exists;
        (_) -> false
    end,
    Successes = lists:filter(IsSuccess, Counters),
    case fabric_view:is_progress_possible(Successes) of
        false ->
            {error, internal_server_error};
        true ->
            case lists:keymember(file_exists, 2, Successes) of
                false -> {stop, ok};
                true -> {error, file_exists}
            end
    end.
%% Feeds an ok reply for every shard of a partition map through
%% handle_message/3. The flag in the fold accumulator is overwritten on each
%% step, so the assertion checks that the last reply processed did not
%% produce an error.
db_create_ok_test() ->
    Shards = mem3_util:create_partition_map("foo",3,12,["node1","node2","node3"]),
    Acc0 = fabric_dict:init(Shards, nil),
    Step = fun(Shard, {Acc, _}) ->
        case handle_message(ok, Shard, Acc) of
            {error, _} -> {Acc, false};
            {stop, ok} -> {Acc, true};
            {ok, NewAcc} -> {NewAcc, true}
        end
    end,
    Result = lists:foldl(Step, {Acc0, true}, Shards),
    ?assertEqual(element(2,Result), true).
%% Feeds one reply per shard through handle_message/3. The reply at a randomly
%% chosen position is file_exists and all others are ok. The flag in the fold
%% accumulator turns false on any {error, _} result and is never reset, so the
%% assertion checks that an error was reported at some point.
db_create_file_exists_test() ->
    Nodes = ["node1","node2","node3","node4","node5"],
    Shards = mem3_util:create_partition_map("foo",3,12,Nodes),
    BadNo = random:uniform(length(Shards)),
    Acc0 = fabric_dict:init(Shards, nil),
    Step = fun(Shard, {Acc, Iter, Bool}) ->
        % BadNo is already bound, so this matches only the chosen position.
        Msg = case Iter of
            BadNo -> file_exists;
            _ -> ok
        end,
        case handle_message(Msg, Shard, Acc) of
            {error, _} -> {Acc, Iter+1, false};
            {stop, ok} -> {Acc, Iter+1, Bool};
            {ok, NewAcc} -> {NewAcc, Iter+1, Bool}
        end
    end,
    Result = lists:foldl(Step, {Acc0, 1, true}, Shards),
    ?assertEqual(element(3,Result),false).

0 comments on commit 4ae36d2

Please sign in to comment.