From d3ce2273c0c1eba5b4107e7bb0a83aaa1736cc6a Mon Sep 17 00:00:00 2001 From: Benjamin Anderson Date: Sat, 9 Apr 2016 20:55:58 -0700 Subject: [PATCH 1/4] Refactor mem3_sync events to dedicated module COUCHDB-2984 --- src/mem3_sup.erl | 3 +- src/mem3_sync.erl | 61 +++-------------------- src/mem3_sync_event_listener.erl | 85 ++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 54 deletions(-) create mode 100644 src/mem3_sync_event_listener.erl diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl index 2662a7c..80b8ca3 100644 --- a/src/mem3_sup.erl +++ b/src/mem3_sup.erl @@ -23,7 +23,8 @@ init(_Args) -> child(mem3_nodes), child(mem3_sync_nodes), % Order important? child(mem3_sync), - child(mem3_shards) + child(mem3_shards), + child(mem3_sync_event_listener) ], {ok, {{one_for_one,10,1}, couch_epi:register_service(mem3_epi, Children)}}. diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl index 28a8261..88f4ad4 100644 --- a/src/mem3_sync.erl +++ b/src/mem3_sync.erl @@ -17,28 +17,20 @@ code_change/3]). -export([start_link/0, get_active/0, get_queue/0, push/1, push/2, - remove_node/1, initial_sync/1, get_backlog/0]). - --export([handle_db_event/3]). + remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0, + shards_db/0, users_db/0, find_next_node/0]). -import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). --record(event_listener, { - nodes, - shards, - users -}). - -record(state, { active = [], count = 0, limit, dict = dict:new(), - waiting = queue:new(), - event_listener + waiting = queue:new() }). -record(job, {name, node, count=nil, pid=nil}). @@ -70,13 +62,15 @@ push(_) -> remove_node(Node) -> gen_server:cast(?MODULE, {remove_node, Node}). +remove_shard(Shard) -> + gen_server:cast(?MODULE, {remove_shard, Shard}). 
+ init([]) -> process_flag(trap_exit, true), Concurrency = config:get("mem3", "sync_concurrency", "10"), gen_event:add_handler(mem3_events, mem3_sync_event, []), - {ok, Pid} = start_event_listener(), initial_sync(), - {ok, #state{limit = list_to_integer(Concurrency), event_listener=Pid}}. + {ok, #state{limit = list_to_integer(Concurrency)}}. handle_call({push, Job}, From, State) -> handle_cast({push, Job#job{pid = From}}, State); @@ -125,10 +119,6 @@ handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) -> S =:= Shard], {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}. -handle_info({'EXIT', Pid, _}, #state{event_listener=Pid} = State) -> - {ok, NewPid} = start_event_listener(), - {noreply, State#state{event_listener=NewPid}}; - handle_info({'EXIT', Active, normal}, State) -> handle_replication_exit(State, Active); @@ -271,42 +261,7 @@ submit_replication_tasks(LocalNode, Live, Shards) -> sync_push(ShardName, N) -> gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity). -start_event_listener() -> - State = #event_listener{ - nodes = nodes_db(), - shards = shards_db(), - users = users_db() - }, - couch_event:link_listener(?MODULE, handle_db_event, State, [all_dbs]). 
- -handle_db_event(NodesDb, updated, #event_listener{nodes = NodesDb} = St) -> - Nodes = mem3:nodes(), - Live = nodes(), - [?MODULE:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)], - {ok, St}; -handle_db_event(ShardsDb, updated, #event_listener{shards = ShardsDb} = St) -> - ?MODULE:push(ShardsDb, find_next_node()), - {ok, St}; -handle_db_event(UsersDb, updated, #event_listener{users = UsersDb} = St) -> - ?MODULE:push(UsersDb, find_next_node()), - {ok, St}; -handle_db_event(<<"shards/", _/binary>> = ShardName, updated, St) -> - try mem3:shards(mem3:dbname(ShardName)) of - Shards -> - Targets = [S || #shard{node=N, name=Name} = S <- Shards, - N =/= node(), Name =:= ShardName], - Live = nodes(), - [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets, - lists:member(N, Live)] - catch error:database_does_not_exist -> - ok - end, - {ok, St}; -handle_db_event(<<"shards/", _:18/binary, _/binary>> =ShardName, deleted, St) -> - gen_server:cast(?MODULE, {remove_shard, ShardName}), - {ok, St}; -handle_db_event(_DbName, _Event, St) -> - {ok, St}. + find_next_node() -> LiveNodes = [node()|nodes()], diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl new file mode 100644 index 0000000..7059347 --- /dev/null +++ b/src/mem3_sync_event_listener.erl @@ -0,0 +1,85 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_sync_event_listener). +-behavior(couch_event_listener). + +-export([ + start_link/0 +]). 
+ +-export([ + init/1, + terminate/2, + handle_event/3, + handle_cast/2, + handle_info/2 +]). + +-include_lib("mem3/include/mem3.hrl"). + +-record(state, { + nodes, + shards, + users +}). + +start_link() -> + couch_event_listener:start_link(?MODULE, [], [all_dbs]). + +init(_) -> + State = #state{ + nodes = mem3_sync:nodes_db(), + shards = mem3_sync:shards_db(), + users = mem3_sync:users_db() + }, + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +handle_event(NodesDb, updated, #state{nodes = NodesDb} = St) -> + Nodes = mem3:nodes(), + Live = nodes(), + [mem3_sync:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)], + {ok, St}; +handle_event(ShardsDb, updated, #state{shards = ShardsDb} = St) -> + mem3_sync:push(ShardsDb, mem3_sync:find_next_node()), + {ok, St}; +handle_event(UsersDb, updated, #state{users = UsersDb} = St) -> + mem3_sync:push(UsersDb, mem3_sync:find_next_node()), + {ok, St}; +handle_event(<<"shards/", _/binary>> = ShardName, updated, St) -> + try mem3:shards(mem3:dbname(ShardName)) of + Shards -> + Targets = [S || #shard{node=N, name=Name} = S <- Shards, + N =/= node(), Name =:= ShardName], + Live = nodes(), + [mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets, + lists:member(N, Live)] + catch error:database_does_not_exist -> + ok + end, + {ok, St}; +handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, St) -> + mem3_sync:remove_shard(ShardName), + {ok, St}; +handle_event(_DbName, _Event, St) -> + {ok, St}. + +handle_cast(Msg, St) -> + couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Msg]), + {ok, St}. + +handle_info(Msg, St) -> + couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]), + {ok, St}. 
From d5e0a4a19de99b2c6a91c9de8a1bc120664e36d5 Mon Sep 17 00:00:00 2001 From: Benjamin Anderson Date: Sat, 9 Apr 2016 22:44:58 -0700 Subject: [PATCH 2/4] Reduce frequency of mem3_sync:push/2 calls In high-throughput scenarios on databases with large q values the mem3_sync event listener becomes overloaded with messages due to the poor performance of the shard selection logic. It's not strictly necessary to sync on every update, but we do need to be careful not to lose updates by keeping history too naively. This patch adds a configurable delay and push frequency to reduce pressure on the mem3_sync event listener. COUCHDB-2984 --- src/mem3_sync_event_listener.erl | 176 +++++++++++++++++++++++++++---- 1 file changed, 155 insertions(+), 21 deletions(-) diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl index 7059347..d74c21f 100644 --- a/src/mem3_sync_event_listener.erl +++ b/src/mem3_sync_event_listener.erl @@ -12,6 +12,7 @@ -module(mem3_sync_event_listener). -behavior(couch_event_listener). +-behavior(config_listener). -export([ start_link/0 @@ -25,24 +26,52 @@ handle_info/2 ]). +-export([ + handle_config_change/5, + handle_config_terminate/3 +]). + -include_lib("mem3/include/mem3.hrl"). -record(state, { nodes, shards, - users + users, + delay, + frequency, + last_push, + buckets }). +%% Calling mem3_sync:push/2 on every update has a measurable performance cost, +%% so we'd like to coalesce multiple update messages from couch_event in to a +%% single push call. Doing this while ensuring both correctness (i.e., no lost +%% updates) and an even load profile is somewhat subtle. This implementation +%% groups updated shards in a list of "buckets" (see bucket_shard/2) and +%% guarantees that each shard is in no more than one bucket at a time - i.e., +%% any update messages received before the shard's current bucket has been +%% pushed will be ignored - thereby reducing the frequency with which a single +%% shard will be pushed. 
mem3_sync:push/2 is called on all shards in the +%% *oldest* bucket roughly every mem3.sync_frequency milliseconds (see +%% maybe_push_shards/1) to even out the load on mem3_sync. + start_link() -> couch_event_listener:start_link(?MODULE, [], [all_dbs]). init(_) -> - State = #state{ + config:listen_for_changes(?MODULE, undefined), + Delay = config:get_integer("mem3", "sync_delay", 5000), + Frequency = config:get_integer("mem3", "sync_frequency", 500), + Buckets = lists:duplicate(Delay div Frequency + 1, sets:new()), + St = #state{ nodes = mem3_sync:nodes_db(), shards = mem3_sync:shards_db(), - users = mem3_sync:users_db() + users = mem3_sync:users_db(), + delay = Delay, + frequency = Frequency, + buckets = Buckets }, - {ok, State}. + {ok, St}. terminate(_Reason, _State) -> ok. @@ -51,35 +80,140 @@ handle_event(NodesDb, updated, #state{nodes = NodesDb} = St) -> Nodes = mem3:nodes(), Live = nodes(), [mem3_sync:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)], - {ok, St}; + maybe_push_shards(St); handle_event(ShardsDb, updated, #state{shards = ShardsDb} = St) -> mem3_sync:push(ShardsDb, mem3_sync:find_next_node()), - {ok, St}; + maybe_push_shards(St); handle_event(UsersDb, updated, #state{users = UsersDb} = St) -> mem3_sync:push(UsersDb, mem3_sync:find_next_node()), - {ok, St}; + maybe_push_shards(St); handle_event(<<"shards/", _/binary>> = ShardName, updated, St) -> - try mem3:shards(mem3:dbname(ShardName)) of - Shards -> - Targets = [S || #shard{node=N, name=Name} = S <- Shards, - N =/= node(), Name =:= ShardName], - Live = nodes(), - [mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets, - lists:member(N, Live)] - catch error:database_does_not_exist -> - ok - end, - {ok, St}; + Buckets = bucket_shard(ShardName, St#state.buckets), + maybe_push_shards(St#state{buckets=Buckets}); handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, St) -> mem3_sync:remove_shard(ShardName), - {ok, St}; + maybe_push_shards(St); handle_event(_DbName, _Event, 
St) -> - {ok, St}. + maybe_push_shards(St). +handle_cast({set_frequency, Frequency}, St) -> + #state{delay = Delay, buckets = Buckets0} = St, + Buckets1 = rebucket_shards(Delay, Frequency, Buckets0), + maybe_push_shards(St#state{frequency=Frequency, buckets=Buckets1}); +handle_cast({set_delay, Delay}, St) -> + #state{frequency = Frequency, buckets = Buckets0} = St, + Buckets1 = rebucket_shards(Delay, Frequency, Buckets0), + maybe_push_shards(St#state{delay=Delay, buckets=Buckets1}); handle_cast(Msg, St) -> couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Msg]), - {ok, St}. + maybe_push_shards(St). +handle_info(timeout, St) -> + maybe_push_shards(St); handle_info(Msg, St) -> couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]), + maybe_push_shards(St). + +handle_config_change("mem3", "sync_delay", Delay0, _, St) -> + try list_to_integer(Delay0) of + Delay1 -> + couch_event_listener:cast( + ?MODULE, + {set_delay, Delay1} + ) + catch error:badarg -> + couch_log:warning( + "ignoring bad value for mem3.sync_delay: ~p", + [Delay0] + ) + end, + {ok, St}; +handle_config_change("mem3", "sync_frequency", Frequency0, _, St) -> + try list_to_integer(Frequency0) of + Frequency1 -> + couch_event_listener:cast( + ?MODULE, + {set_frequency, Frequency1} + ) + catch error:badarg -> + couch_log:warning( + "ignoring bad value for mem3.sync_frequency: ~p", + [Frequency0] + ) + end, + {ok, St}; +handle_config_change(_, _, _, _, St) -> {ok, St}. + +handle_config_terminate(_, stop, _) -> ok; +handle_config_terminate(_Server, _Reason, St) -> + Fun = fun() -> + timer:sleep(5000), + config:listen_for_changes(?MODULE, St) + end, + spawn(Fun). + +bucket_shard(ShardName, [B|Bs]=Buckets0) -> + case waiting(ShardName, Buckets0) of + true -> Buckets0; + false -> [sets:add_element(ShardName, B)|Bs] + end. 
+
+waiting(_, []) ->
+    false;
+waiting(ShardName, [B|Bs]) ->
+    case sets:is_element(ShardName, B) of
+        true -> true;
+        false -> waiting(ShardName, Bs)
+    end.
+%% Resize the bucket list; argument order matches the handle_cast/2 callers.
+rebucket_shards(Delay, Frequency, Buckets0) ->
+    case (Delay div Frequency + 1) - length(Buckets0) of
+        0 ->
+            Buckets0;
+        N when N < 0 ->
+            %% Too many buckets: merge the abs(N) + 1 newest (head) buckets
+            {ToMerge, [B|Buckets1]} = lists:split(abs(N), Buckets0),
+            [sets:union([B|ToMerge])|Buckets1];
+        M ->
+            %% Too few buckets: prepend M empty ones, oldest bucket stays last
+            lists:duplicate(M, sets:new()) ++ Buckets0
+    end.
+
+%% To ensure that mem3_sync:push/2 is indeed called with roughly the frequency
+%% specified by #state.frequency, every message callback must return via a call
+%% to maybe_push_shards/1 rather than directly. All timing coordination - i.e.,
+%% calling mem3_sync:push/2 or setting a proper timeout to ensure that pending
+%% messages aren't dropped in case no further messages arrive - is handled here.
+maybe_push_shards(#state{last_push=undefined} = St) ->
+    {ok, St#state{last_push=os:timestamp()}, St#state.frequency};
+maybe_push_shards(St) ->
+    #state{frequency=Frequency, last_push=LastPush, buckets=Buckets0} = St,
+    Now = os:timestamp(),
+    Delta = timer:now_diff(Now, LastPush) div 1000,
+    case Delta > Frequency of
+        true ->
+            {Buckets1, [ToPush]} = lists:split(length(Buckets0) - 1, Buckets0),
+            Buckets2 = [sets:new()|Buckets1],
+            %% There's no sets:map/2!
+            sets:fold(
+                fun(ShardName, _) -> push_shard(ShardName) end,
+                undefined,
+                ToPush
+            ),
+            {ok, St#state{last_push=Now, buckets=Buckets2}, Frequency};
+        false ->
+            {ok, St, Frequency - Delta}
+    end.
+
+push_shard(ShardName) ->
+    try mem3:shards(mem3:dbname(ShardName)) of
+    Shards ->
+        Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+            N =/= node(), Name =:= ShardName],
+        Live = nodes(),
+        [mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets,
+            lists:member(N, Live)]
+    catch error:database_does_not_exist ->
+        ok
+    end.
From 130efcd6b0b6f9fa6403e131586dfbf003643074 Mon Sep 17 00:00:00 2001 From: Benjamin Anderson Date: Sat, 9 Apr 2016 23:08:39 -0700 Subject: [PATCH 3/4] Use ets:select/2 to retrieve shards by name The result of mem3_shards:for_db/1 on databases with high q values can be very large, resulting in suboptimal performance for high-volume callers. mem3_sync_event_listener is only interested in a small subset of the result of mem3_shards:for_db/1; moving this filter in to an ets:select/2 call improves performance significantly. COUCHDB-2984 --- src/mem3_shards.erl | 49 ++++++++++++++++++++++++++++++++ src/mem3_sync_event_listener.erl | 15 ++++++---- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl index 14aed34..224673f 100644 --- a/src/mem3_shards.erl +++ b/src/mem3_shards.erl @@ -21,6 +21,7 @@ -export([start_link/0]). -export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]). +-export([for_shard_name/1]). -export([set_max_size/1]). -record(st, { @@ -94,6 +95,42 @@ for_docid(DbName, DocId, Options) -> false -> mem3_util:downcast(Shards) end. +for_shard_name(ShardName) -> + for_shard_name(ShardName, []). 
+ +for_shard_name(ShardName, Options) -> + DbName = mem3:dbname(ShardName), + ShardHead = #shard{ + name = ShardName, + node = '_', + dbname = DbName, + range = '_', + ref = '_' + }, + OrderedShardHead = #ordered_shard{ + name = ShardName, + node = '_', + dbname = DbName, + range = '_', + ref = '_', + order = '_' + }, + ShardSpec = {ShardHead, [], ['$_']}, + OrderedShardSpec = {OrderedShardHead, [], ['$_']}, + Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of + [] -> + filter_shards_by_name(ShardName, load_shards_from_disk(DbName)); + Else -> + gen_server:cast(?MODULE, {cache_hit, DbName}), + Else + catch error:badarg -> + filter_shards_by_name(ShardName, load_shards_from_disk(DbName)) + end, + case lists:member(ordered, Options) of + true -> Shards; + false -> mem3_util:downcast(Shards) + end. + get(DbName, Node, Range) -> Res = lists:foldl(fun(#shard{node=N, range=R}=S, Acc) -> case {N, R} of @@ -360,3 +397,15 @@ cache_clear(St) -> true = ets:delete_all_objects(?SHARDS), true = ets:delete_all_objects(?ATIMES), St#st{cur_size=0}. + +filter_shards_by_name(Name, Shards) -> + filter_shards_by_name(Name, [], Shards). + +filter_shards_by_name(_, Matches, []) -> + Matches; +filter_shards_by_name(Name, Matches, [#ordered_shard{name=Name}=S|Ss]) -> + filter_shards_by_name(Name, [S|Matches], Ss); +filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) -> + filter_shards_by_name(Name, [S|Matches], Ss); +filter_shards_by_name(Name, Matches, [_|Ss]) -> + filter_shards_by_name(Name, Matches, Ss). diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl index d74c21f..ca058db 100644 --- a/src/mem3_sync_event_listener.erl +++ b/src/mem3_sync_event_listener.erl @@ -207,13 +207,18 @@ maybe_push_shards(St) -> end. 
push_shard(ShardName) -> - try mem3:shards(mem3:dbname(ShardName)) of + try mem3_shards:for_shard_name(ShardName) of Shards -> - Targets = [S || #shard{node=N, name=Name} = S <- Shards, - N =/= node(), Name =:= ShardName], Live = nodes(), - [mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets, - lists:member(N, Live)] + lists:foreach( + fun(#shard{node=N}) -> + case lists:member(N, Live) of + true -> mem3_sync:push(ShardName, N); + false -> ok + end + end, + Shards + ) catch error:database_does_not_exist -> ok end. From 0b70afb7cc11f39e894a37211349e4444711facb Mon Sep 17 00:00:00 2001 From: Benjamin Anderson Date: Sat, 9 Apr 2016 23:21:58 -0700 Subject: [PATCH 4/4] Add read_concurrency option to mem3_shards table This table sees a great deal of activity from various subsystems - turning on read_concurrency should be a win. COUCHDB-2984 --- src/mem3_shards.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl index 224673f..8a1bb54 100644 --- a/src/mem3_shards.erl +++ b/src/mem3_shards.erl @@ -181,7 +181,13 @@ handle_config_terminate(_, _, _) -> end). init([]) -> - ets:new(?SHARDS, [bag, protected, named_table, {keypos,#shard.dbname}]), + ets:new(?SHARDS, [ + bag, + protected, + named_table, + {keypos,#shard.dbname}, + {read_concurrency, true} + ]), ets:new(?DBS, [set, protected, named_table]), ets:new(?ATIMES, [ordered_set, protected, named_table]), ok = config:listen_for_changes(?MODULE, nil),