0
-define(VIRTUALNODES, 100).
0
--export([start_link/1, join_node/2, nodes_for_
key/1, partitions/0, nodes/0, state/0, state/1, old_partitions/0, partitions_for_node/2, fire_gossip/1, partition_for_key/1, stop/0, range/1]).
0
+-export([start_link/1, join_node/2, nodes_for_
partition/1, nodes_for_key/1, partitions/0, nodes/0, state/0, state/1, old_partitions/0, partitions_for_node/2, fire_gossip/1, partition_for_key/1, stop/0, range/1]).
0
%% gen_server callbacks
0
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
0
@@ -45,6 +45,9 @@ start_link(Config) ->
0
join_node(JoinTo, Me) ->
0
gen_server:call({membership, JoinTo}, {join_node, Me}).
0
+nodes_for_partition(Partition) ->
0
+ gen_server:call(membership, {nodes_for_partition, Partition}).
0
gen_server:call(membership, {nodes_for_key, Key}).
0
@@ -90,6 +93,7 @@ fire_gossip({A1, A2, A3}) ->
0
membership:state(ModState#membership{config=Config}),
0
reload_storage_servers(State, ModState),
0
+ reload_sync_servers(State, ModState),
0
timer:apply_after(random:uniform(5000) + 5000, membership, fire_gossip, [random:seed()]).
0
@@ -122,6 +126,7 @@ init(ConfigIn) ->
0
reload_storage_servers(empty, State),
0
+ reload_sync_servers(empty, State),
0
timer:apply_after(random:uniform(1000) + 1000, membership, fire_gossip, [random:seed()]),
0
{ok, State#membership{config=configuration:get_config()}}.
0
@@ -141,6 +146,7 @@ handle_call({join_node, Node}, {_, _From}, State = #membership{config=Config}) -
0
error_logger:info_msg("~p is joining the cluster.~n", [node(_From)]),
0
NewState = int_join_node(Node, State),
0
reload_storage_servers(State, NewState),
0
+ reload_sync_servers(State, NewState),
0
{reply, {ok, NewState}, NewState#membership{config=Config}};
0
@@ -149,6 +155,7 @@ handle_call({share, NewState}, _From, State = #membership{config=Config}) ->
0
case vector_clock:compare(State#membership.version, NewState#membership.version) of
0
reload_storage_servers(State, NewState),
0
+ reload_sync_servers(State, NewState),
0
{reply, NewState, NewState};
0
greater -> {reply, State, State};
0
@@ -156,6 +163,7 @@ handle_call({share, NewState}, _From, State = #membership{config=Config}) ->
0
Merged = merge_states(NewState, State),
0
reload_storage_servers(State, Merged),
0
+ reload_sync_servers(State, Merged),
0
{reply, Merged, Merged#membership{config=State#membership.config}}
0
@@ -179,6 +187,9 @@ handle_call(partitions, _From, State) -> {reply, State#membership.partitions, St
0
handle_call({range, Partition}, _From, State) ->
0
{reply, int_range(Partition, State#membership.config), State};
0
+handle_call({nodes_for_partition, Partition}, _From, State) ->
0
+ {reply, int_nodes_for_partition(Partition, State), State};
0
handle_call({nodes_for_key, Key}, _From, State) ->
0
{reply, int_nodes_for_key(Key, State), State};
0
@@ -365,7 +376,11 @@ reload_sync_servers(OldParts, NewParts, Config) ->
0
lists:foreach(fun(Part) ->
0
Name = list_to_atom(lists:concat([sync_, Part])),
0
+ Spec = {Name, {sync_server, start_link, [Name, Part]}, permanent, 1000, worker, [sync_server]},
0
+ case supervisor:start_child(sync_server_sup, Spec) of
0
+ already_present -> supervisor:restart_child(sync_server_sup, Name);
0
reload_storage_servers(empty, NewState) ->
Comments
No one has commented yet.