Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Reimplement mem3:ushards to honor all 5 properties
  • Loading branch information
Robert Newson committed Apr 19, 2012
1 parent 7286ecd commit babbfcc468cb914fab2a0ebea3ce8c3da375b4ab
Showing 1 changed file with 33 additions and 37 deletions.
@@ -18,7 +18,7 @@
choose_shards/2, n/1, dbname/1, ushards/1]).
-export([sync_security/0, sync_security/1]).
-export([compare_nodelists/0, compare_shards/1]).
-export([quorum/1]).
-export([quorum/1, group_by_proximity/1]).

-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
@@ -122,50 +122,21 @@ shards(DbName, DocId) ->
mem3_util:load_shards_from_disk(DbName, DocId)
end.

%% @doc Returns one live shard per range of DbName.
%% Only shards hosted on this node or on a currently connected node are
%% considered (see live_shards/1). Shards on this node or in this node's
%% zone are preferred over shards in a different zone.
-spec ushards(DbName::iodata()) -> [#shard{}].
ushards(DbName) ->
    {Local, SameZone, DifferentZone} =
        group_by_proximity(live_shards(DbName)),
    % Prefer shards in the local zone over shards in a different zone,
    % but sort each zone separately to ensure a consistent choice between
    % nodes in the same zone.
    Preferred = choose_ushards(Local ++ SameZone),
    Fallback = choose_ushards(DifferentZone),
    % lists:ukeysort/2 keeps the first tuple among those with an equal key,
    % so a different-zone shard is only used for a range that has no
    % candidate in Preferred.
    lists:ukeysort(#shard.range, Preferred ++ Fallback).

%% Delegates to mem3_sync_security:go/0.
%% NOTE(review): presumably synchronises security objects for every
%% database -- confirm against mem3_sync_security.
sync_security() ->
    mem3_sync_security:go().

%% Resolves Db to a database name via dbname/1 and passes that name to
%% mem3_sync_security:go/1.
sync_security(Db) ->
    DbName = dbname(Db),
    mem3_sync_security:go(DbName).

%% Rotates Nodes to the left by an offset derived from the CRC32 of DbName,
%% so the same database name always yields the same starting node.
%% Nodes must be non-empty (the offset is taken modulo its length).
rotate_nodes(DbName, Nodes) ->
    Offset = erlang:crc32(DbName) rem length(Nodes),
    {Front, Back} = lists:split(Offset, Nodes),
    Back ++ Front.

%% Returns this node together with every node reported by erlang:nodes/0,
%% in sorted order.
live_nodes() ->
    Connected = erlang:nodes(),
    lists:sort([node() | Connected]).

%% Groups shards that share the same range. Returns a list of buckets
%% (lists of shards) ordered by range; within a bucket the shards keep the
%% order they had in the input.
bucket_by_range(Shards) ->
    Insert = fun(#shard{range=Range}=Shard, ByRange) ->
        orddict:append(Range, Shard, ByRange)
    end,
    Grouped = lists:foldl(Insert, orddict:new(), Shards),
    [Bucket || {_Range, Bucket} <- orddict:to_list(Grouped)].

%% Picks one shard from each bucket, starting with nothing chosen yet.
%% Nodes is the preference order used for the first bucket.
choose_ushards(Buckets, Nodes) ->
    NoneChosenYet = [],
    choose_ushards(Buckets, Nodes, NoneChosenYet).

%% Picks one shard per bucket, in bucket order. Acc holds shards chosen
%% earlier, most recent first; the result is those shards in chronological
%% order followed by the shards chosen here.
%% After each pick the chosen shard's node is moved to the back of the
%% preference list, so consecutive buckets favour different nodes.
%% Propagates the throw from first_match/3 when no node hosts a bucket.
choose_ushards(Buckets, Nodes, Acc) ->
    PickOne = fun(Bucket, NodeOrder) ->
        #shard{node=Node} = Shard = first_match(Bucket, Bucket, NodeOrder),
        {Shard, lists:delete(Node, NodeOrder) ++ [Node]}
    end,
    {Chosen, _FinalOrder} = lists:mapfoldl(PickOne, Nodes, Buckets),
    lists:reverse(Acc, Chosen).

%% first_match(Remaining, AllShards, Nodes) -> #shard{}
%% Returns the shard from the bucket that lives on the earliest node in
%% Nodes hosting any shard of that bucket.
%%   Remaining - the part of the bucket not yet compared against the node
%%               at the head of Nodes
%%   AllShards - the complete bucket, used to restart the scan for the
%%               next node (choose_ushards/3 passes the bucket for both)
%%   Nodes     - candidate nodes in preference order
%% Throws {range_not_available, Range} when no node in Nodes hosts a shard
%% of a non-empty bucket. The clause order below is significant.
% Scan finished and no nodes are left to try: report the bucket's range.
first_match([], [#shard{range=Range}|_], []) ->
throw({range_not_available, Range});
% The shard at the head lives on the currently preferred node: done.
first_match([#shard{node=Node}=Shard|_], _, [Node|_]) ->
Shard;
% Scan finished without a hit for this node: rescan the whole bucket
% against the next node.
first_match([], Shards, [_|RestNodes]) ->
first_match(Shards, Shards, RestNodes);
% Head shard is on some other node (or Nodes is already empty): keep
% scanning the rest of the bucket.
first_match([_|RestShards], Shards, Nodes) ->
first_match(RestShards, Shards, Nodes).

-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
choose_shards(DbName, Options) when is_list(DbName) ->
choose_shards(list_to_binary(DbName), Options);
@@ -242,6 +213,31 @@ apportion(Shares, Acc, Remaining) ->
[H|T] = lists:nthtail(N, Acc),
apportion(Shares, lists:sublist(Acc, N) ++ [H+1|T], Remaining - 1).

%% Returns the shards of DbName whose node is either this node or one of
%% the nodes reported by erlang:nodes/0. Elements that are not #shard{}
%% records are dropped.
live_shards(DbName) ->
    UpNodes = [node() | erlang:nodes()],
    IsLive = fun
        (#shard{node=Node}) -> lists:member(Node, UpNodes);
        (_NotAShard) -> false
    end,
    lists:filter(IsLive, shards(DbName)).

%% Splits Shards into {Local, SameZone, DifferentZone}:
%%   Local         - shards hosted on this node
%%   SameZone      - shards on other nodes whose <<"zone">> node_info equals
%%                   this node's
%%   DifferentZone - all remaining shards
%% The relative order of shards within each group matches the input.
group_by_proximity(Shards) ->
    ThisNode = node(),
    IsLocal = fun(Shard) -> Shard#shard.node =:= ThisNode end,
    {OnThisNode, Elsewhere} = lists:partition(IsLocal, Shards),
    ThisZone = mem3:node_info(ThisNode, <<"zone">>),
    InThisZone = fun(Shard) ->
        mem3:node_info(Shard#shard.node, <<"zone">>) =:= ThisZone
    end,
    {Nearby, Distant} = lists:partition(InThisZone, Elsewhere),
    {OnThisNode, Nearby, Distant}.

%% Picks one shard per range from Shards. The shards are sorted and grouped
%% by range; the group at zero-based position I contributes its element at
%% position (I rem GroupSize), which spreads the picks over the candidates
%% of successive ranges. The result lists the picks in reverse group order.
choose_ushards(Shards) ->
    Groups = group_by_range(lists:sort(Shards)),
    Indexed = lists:zip(lists:seq(0, length(Groups) - 1), Groups),
    Picks = [lists:nth(1 + Idx rem length(Group), Group)
             || {Idx, Group} <- Indexed],
    lists:reverse(Picks).

%% Collects shards with the same range into one group. Returns the groups
%% ordered by range; each group preserves the input order of its shards.
group_by_range(Shards) ->
    Collect = fun(#shard{range=Range}=Shard, ByRange) ->
        orddict:append(Range, Shard, ByRange)
    end,
    [Group || {_Range, Group} <- lists:foldl(Collect, orddict:new(), Shards)].

% quorum functions

quorum(#db{name=DbName}) ->

0 comments on commit babbfcc

Please sign in to comment.