Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Refactor mem3_sync events to dedicated module
COUCHDB-2984
  • Loading branch information
b20n committed Apr 10, 2016
1 parent 699308f commit d3ce2273c0c1eba5b4107e7bb0a83aaa1736cc6a
Showing 3 changed files with 95 additions and 54 deletions.
@@ -23,7 +23,8 @@ init(_Args) ->
child(mem3_nodes),
child(mem3_sync_nodes), % Order important?
child(mem3_sync),
child(mem3_shards)
child(mem3_shards),
child(mem3_sync_event_listener)
],
{ok, {{one_for_one,10,1}, couch_epi:register_service(mem3_epi, Children)}}.

@@ -17,28 +17,20 @@
code_change/3]).

-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
remove_node/1, initial_sync/1, get_backlog/0]).

-export([handle_db_event/3]).
remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0,
shards_db/0, users_db/0, find_next_node/0]).

-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).

-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-record(event_listener, {
nodes,
shards,
users
}).

-record(state, {
active = [],
count = 0,
limit,
dict = dict:new(),
waiting = queue:new(),
event_listener
waiting = queue:new()
}).

-record(job, {name, node, count=nil, pid=nil}).
@@ -70,13 +62,15 @@ push(_) ->
remove_node(Node) ->
gen_server:cast(?MODULE, {remove_node, Node}).

remove_shard(Shard) ->
gen_server:cast(?MODULE, {remove_shard, Shard}).

init([]) ->
process_flag(trap_exit, true),
Concurrency = config:get("mem3", "sync_concurrency", "10"),
gen_event:add_handler(mem3_events, mem3_sync_event, []),
{ok, Pid} = start_event_listener(),
initial_sync(),
{ok, #state{limit = list_to_integer(Concurrency), event_listener=Pid}}.
{ok, #state{limit = list_to_integer(Concurrency)}}.

handle_call({push, Job}, From, State) ->
handle_cast({push, Job#job{pid = From}}, State);
@@ -125,10 +119,6 @@ handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
S =:= Shard],
{noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.

handle_info({'EXIT', Pid, _}, #state{event_listener=Pid} = State) ->
{ok, NewPid} = start_event_listener(),
{noreply, State#state{event_listener=NewPid}};

handle_info({'EXIT', Active, normal}, State) ->
handle_replication_exit(State, Active);

@@ -271,42 +261,7 @@ submit_replication_tasks(LocalNode, Live, Shards) ->
sync_push(ShardName, N) ->
gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).

start_event_listener() ->
State = #event_listener{
nodes = nodes_db(),
shards = shards_db(),
users = users_db()
},
couch_event:link_listener(?MODULE, handle_db_event, State, [all_dbs]).

handle_db_event(NodesDb, updated, #event_listener{nodes = NodesDb} = St) ->
Nodes = mem3:nodes(),
Live = nodes(),
[?MODULE:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
{ok, St};
handle_db_event(ShardsDb, updated, #event_listener{shards = ShardsDb} = St) ->
?MODULE:push(ShardsDb, find_next_node()),
{ok, St};
handle_db_event(UsersDb, updated, #event_listener{users = UsersDb} = St) ->
?MODULE:push(UsersDb, find_next_node()),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
try mem3:shards(mem3:dbname(ShardName)) of
Shards ->
Targets = [S || #shard{node=N, name=Name} = S <- Shards,
N =/= node(), Name =:= ShardName],
Live = nodes(),
[?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
lists:member(N, Live)]
catch error:database_does_not_exist ->
ok
end,
{ok, St};
handle_db_event(<<"shards/", _:18/binary, _/binary>> =ShardName, deleted, St) ->
gen_server:cast(?MODULE, {remove_shard, ShardName}),
{ok, St};
handle_db_event(_DbName, _Event, St) ->
{ok, St}.


find_next_node() ->
LiveNodes = [node()|nodes()],
@@ -0,0 +1,85 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mem3_sync_event_listener).
-behavior(couch_event_listener).

-export([
start_link/0
]).

-export([
init/1,
terminate/2,
handle_event/3,
handle_cast/2,
handle_info/2
]).

-include_lib("mem3/include/mem3.hrl").

-record(state, {
nodes,
shards,
users
}).

start_link() ->
couch_event_listener:start_link(?MODULE, [], [all_dbs]).

init(_) ->
State = #state{
nodes = mem3_sync:nodes_db(),
shards = mem3_sync:shards_db(),
users = mem3_sync:users_db()
},
{ok, State}.

terminate(_Reason, _State) ->
ok.

handle_event(NodesDb, updated, #state{nodes = NodesDb} = St) ->
Nodes = mem3:nodes(),
Live = nodes(),
[mem3_sync:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
{ok, St};
handle_event(ShardsDb, updated, #state{shards = ShardsDb} = St) ->
mem3_sync:push(ShardsDb, mem3_sync:find_next_node()),
{ok, St};
handle_event(UsersDb, updated, #state{users = UsersDb} = St) ->
mem3_sync:push(UsersDb, mem3_sync:find_next_node()),
{ok, St};
handle_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
try mem3:shards(mem3:dbname(ShardName)) of
Shards ->
Targets = [S || #shard{node=N, name=Name} = S <- Shards,
N =/= node(), Name =:= ShardName],
Live = nodes(),
[mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets,
lists:member(N, Live)]
catch error:database_does_not_exist ->
ok
end,
{ok, St};
handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, St) ->
mem3_sync:remove_shard(ShardName),
{ok, St};
handle_event(_DbName, _Event, St) ->
{ok, St}.

handle_cast(Msg, St) ->
couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Msg]),
{ok, St}.

handle_info(Msg, St) ->
couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
{ok, St}.

0 comments on commit d3ce227

Please sign in to comment.