Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Merge pull request #28 from cloudant/13529-reconfigure-ring-on-nodeup
Reconfigure ring on nodeup events

BugzID: 13529
  • Loading branch information
kocolosk committed May 3, 2012
2 parents 42f6b11 + cab6cb0 commit f25f60e36a7f4aa93616be27ba9400861df01fd5
Showing 1 changed file with 30 additions and 9 deletions.
@@ -136,7 +136,7 @@ handle_info({'EXIT', Active, Reason}, State) ->
twig:log(warn, "~p ~s -> ~p ~p", [?MODULE, OldDbName, OldNode,
Reason]),
case Reason of {pending_changes, Count} ->
add_to_queue(State, Job#job{pid = nil, count = Count});
maybe_resubmit(State, Job#job{pid = nil, count = Count});
_ ->
try mem3:shards(mem3:dbname(Job#job.name)) of _ ->
timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}])
@@ -163,6 +163,19 @@ code_change(_, #state{waiting = WaitingList} = State, _) when is_list(WaitingLis
code_change(_, State, _) ->
{ok, State}.

maybe_resubmit(State, #job{name=DbName, node=Node} = Job) ->
case lists:member(DbName, local_dbs()) of
true ->
case find_next_node() of
Node ->
add_to_queue(State, Job);
_ ->
State % don't resubmit b/c we have a new replication target
end;
false ->
add_to_queue(State, Job)
end.

handle_replication_exit(State, Pid) ->
#state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
Active1 = lists:keydelete(Pid, #job.pid, Active),
@@ -209,11 +222,8 @@ add_to_queue(State, #job{name=DbName, node=Node, pid=From} = Job) ->
end.

sync_nodes_and_dbs() ->
Db1 = couch_config:get("mem3", "node_db", "nodes"),
Db2 = couch_config:get("mem3", "shard_db", "dbs"),
Db3 = couch_config:get("couch_httpd_auth", "authentication_db", "_users"),
Node = find_next_node(),
[push(?l2b(Db), Node) || Db <- [Db1, Db2, Db3]].
[push(Db, Node) || Db <- local_dbs()].

initial_sync() ->
[net_kernel:connect_node(Node) || Node <- mem3:nodes()],
@@ -246,10 +256,9 @@ sync_push(ShardName, N) ->
gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).

start_update_notifier() ->
Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
Db3 = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db",
"_users")),
Db1 = nodes_db(),
Db2 = shards_db(),
Db3 = users_db(),
couch_db_update_notifier:start_link(fun
({updated, Db}) when Db == Db1 ->
Nodes = mem3:nodes(),
@@ -303,3 +312,15 @@ is_running(DbName, Node, ActiveList) ->

remove_entries(Dict, Entries) ->
lists:foldl(fun(Entry, D) -> dict:erase(Entry, D) end, Dict, Entries).

local_dbs() ->
[nodes_db(), shards_db(), users_db()].

nodes_db() ->
?l2b(couch_config:get("mem3", "node_db", "nodes")).

shards_db() ->
?l2b(couch_config:get("mem3", "shard_db", "dbs")).

users_db() ->
?l2b(couch_config:get("couch_httpd_auth", "authentication_db", "_users")).

0 comments on commit f25f60e

Please sign in to comment.