Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
backport zone allocation feature to dbcore stable.
  • Loading branch information
Robert Newson committed Jul 28, 2011
1 parent 7a2b66f commit e45b8381549a9cf22dd602398e22cb832d472f75
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 25 deletions.
@@ -14,7 +14,7 @@

-module(mem3).

-export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2,
-export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
choose_shards/2, n/1, dbname/1, ushards/1]).
-export([compare_nodelists/0, compare_shards/1]).

@@ -66,6 +66,9 @@ n(DbName) ->
nodes() ->
mem3_nodes:get_nodelist().

node_info(Node, Key) ->
mem3_nodes:get_node_info(Node, Key).

-spec shards(DbName::iodata()) -> [#shard{}].
shards(DbName) when is_list(DbName) ->
shards(list_to_binary(DbName));
@@ -133,14 +136,23 @@ choose_shards(DbName, Options) ->
catch error:E when E==database_does_not_exist; E==badarg ->
Nodes = mem3:nodes(),
NodeCount = length(Nodes),
Suffix = couch_util:get_value(shard_suffix, Options, ""),
Zones = zones(Nodes),
ZoneCount = length(Zones),
N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
Q = mem3_util:to_integer(couch_util:get_value(q, Options,
couch_config:get("cluster", "q", "8"))),
% rotate to a random entry in the nodelist for even distribution
{A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
RotatedNodes = B ++ A,
mem3_util:create_partition_map(DbName, N, Q, RotatedNodes, Suffix)
Z = mem3_util:z_val(couch_util:get_value(z, Options), NodeCount, ZoneCount),
Suffix = couch_util:get_value(shard_suffix, Options, ""),
ChosenZones = lists:sublist(shuffle(Zones), Z),
lists:flatmap(
fun({Zone, N1}) ->
Nodes1 = nodes_in_zone(Nodes, Zone),
{A, B} = lists:split(crypto:rand_uniform(1,length(Nodes1)+1), Nodes1),
RotatedNodes = B ++ A,
mem3_util:create_partition_map(DbName, erlang:min(N1,length(Nodes1)),
Q, RotatedNodes, Suffix)
end,
lists:zip(ChosenZones, apportion(N, Z)))
end.

-spec dbname(#shard{} | iodata()) -> binary().
@@ -154,3 +166,35 @@ dbname(DbName) when is_binary(DbName) ->
DbName;
dbname(_) ->
erlang:error(badarg).


zones(Nodes) ->
lists:usort([mem3:node_info(Node, <<"zone">>) || Node <- Nodes]).

nodes_in_zone(Nodes, Zone) ->
[Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].

shuffle(List) ->
%% Determine the log n portion then randomize the list.
randomize(round(math:log(length(List)) + 0.5), List).

randomize(1, List) ->
randomize(List);
randomize(T, List) ->
lists:foldl(fun(_E, Acc) -> randomize(Acc) end,
randomize(List), lists:seq(1, (T - 1))).

randomize(List) ->
D = lists:map(fun(A) -> {random:uniform(), A} end, List),
{_, D1} = lists:unzip(lists:keysort(1, D)),
D1.

apportion(Shares, Ways) ->
apportion(Shares, lists:duplicate(Ways, 0), Shares).

apportion(_Shares, Acc, 0) ->
Acc;
apportion(Shares, Acc, Remaining) ->
N = Remaining rem length(Acc),
[H|T] = lists:nthtail(N, Acc),
apportion(Shares, lists:sublist(Acc, N) ++ [H+1|T], Remaining - 1).
@@ -17,7 +17,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

-export([start_link/0, get_nodelist/0]).
-export([start_link/0, get_nodelist/0, get_node_info/2]).

-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
@@ -30,19 +30,29 @@ start_link() ->
get_nodelist() ->
gen_server:call(?MODULE, get_nodelist).

get_node_info(Node, Key) ->
gen_server:call(?MODULE, {get_node_info, Node, Key}).

init([]) ->
{Nodes, UpdateSeq} = initialize_nodelist(),
{Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
{ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.

handle_call(get_nodelist, _From, State) ->
{reply, State#state.nodes, State};
handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) ->
{reply, lists:sort(dict:fetch_keys(State#state.nodes)), State};
handle_call({get_node_info, Node, Key}, _From, State) ->
case dict:find(Node, State#state.nodes) of
{ok, NodeInfo} ->
{reply, couch_util:get_value(Key, NodeInfo), State};
error ->
{reply, error, State}
end;
handle_call({add_node, Node, NodeInfo}, _From, #state{nodes=Nodes} = State) ->
gen_event:notify(mem3_events, {add_node, Node}),
{reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}};
{reply, ok, State#state{nodes = dict:store(Node, NodeInfo, Nodes)}};
handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
gen_event:notify(mem3_events, {remove_node, Node}),
{reply, ok, State#state{nodes = lists:delete(Node, Nodes)}};
{reply, ok, State#state{nodes = dict:erase(Node, Nodes)}};
handle_call(_Call, _From, State) ->
{noreply, State}.

@@ -72,25 +82,27 @@ code_change(_OldVsn, State, _Extra) ->
initialize_nodelist() ->
DbName = couch_config:get("mem3", "node_db", "nodes"),
{ok, Db} = mem3_util:ensure_exists(DbName),
{ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []),
{ok, _, {_, Nodes0}} = couch_btree:fold(Db#db.id_tree, fun first_fold/3,
{Db, dict:new()}, []),
% add self if not already present
case lists:member(node(), Nodes0) of
true ->
case dict:find(node(), Nodes0) of
{ok, _} ->
Nodes = Nodes0;
false ->
error ->
Doc = #doc{id = couch_util:to_binary(node())},
{ok, _} = couch_db:update_doc(Db, Doc, []),
Nodes = [node() | Nodes0]
Nodes = dict:store(node(), [], Nodes0)
end,
couch_db:close(Db),
{lists:sort(Nodes), Db#db.update_seq}.
{Nodes, Db#db.update_seq}.

first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
{ok, Acc};
first_fold(#full_doc_info{deleted=true}, _, Acc) ->
{ok, Acc};
first_fold(#full_doc_info{id=Id}, _, Acc) ->
{ok, [mem3_util:to_atom(Id) | Acc]}.
first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, {_Db, Dict}) ->
{ok, Dict};
first_fold(#full_doc_info{deleted=true}, _, {_Db, Dict}) ->
{ok, Dict};
first_fold(#full_doc_info{id=Id}=DocInfo, _, {Db, Dict}) ->
{ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo),
{ok, {Db, dict:store(mem3_util:to_atom(Id), Props, Dict)}}.

listen_for_changes(Since) ->
DbName = couch_config:get("mem3", "node_db", "nodes"),
@@ -113,7 +125,8 @@ changes_callback({change, {Change}, _}, _) ->
case Node of <<"_design/", _/binary>> -> ok; _ ->
case couch_util:get_value(deleted, Change, false) of
false ->
gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)});
{Props} = couch_util:get_value(doc, Change),
gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props});
true ->
gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
end
@@ -15,7 +15,7 @@
-module(mem3_util).

-export([hash/1, name_shard/2, create_partition_map/5, build_shards/2,
n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
n_val/2, z_val/3, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
load_shards_from_disk/1, load_shards_from_disk/2, shard_info/1,
ensure_exists/1, open_db_doc/1]).

@@ -153,6 +153,19 @@ n_val(N, _) when N < 1 ->
n_val(N, _) ->
N.

z_val(undefined, NodeCount, ZoneCount) ->
z_val(couch_config:get("cluster", "z", "3"), NodeCount, ZoneCount);
z_val(N, NodeCount, ZoneCount) when is_list(N) ->
z_val(list_to_integer(N), NodeCount, ZoneCount);
z_val(N, NodeCount, ZoneCount) when N > NodeCount orelse N > ZoneCount ->
twig:log(error, "Request to create Z=~p DB but only ~p nodes(s) and ~p zone(s)",
[N, NodeCount, ZoneCount]),
erlang:min(NodeCount, ZoneCount);
z_val(N, _, _) when N < 1 ->
1;
z_val(N, _, _) ->
N.

load_shards_from_disk(DbName) when is_binary(DbName) ->
X = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
{ok, Db} = couch_db:open(X, []),

0 comments on commit e45b838

Please sign in to comment.