Skip to content
This repository has been archived by the owner on Sep 19, 2019. It is now read-only.

Commit

Permalink
Fix stale shards cache
Browse files Browse the repository at this point in the history
There's a race condition in mem3_shards that can result in having shards
in the cache for a database that's been deleted. This results in a
confused cluster that thinks a database exists until you attempt to open
it.

The fix is to ignore any cache insert requests that come from an older
version of the dbs db than mem3_shards cache knows about.

Big thanks to @jdoane for the identification and original patch.

COUCHDB-3376
  • Loading branch information
davisp authored and jaydoane committed Apr 22, 2017
1 parent 0d3a4a2 commit 8c5d08d
Showing 1 changed file with 48 additions and 11 deletions.
59 changes: 48 additions & 11 deletions src/mem3_shards.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
-record(st, {
max_size = 25000,
cur_size = 0,
changes_pid
changes_pid,
update_seq
}).

-include("mem3.hrl").
Expand Down Expand Up @@ -144,11 +145,12 @@ init([]) ->
ets:new(?ATIMES, [ordered_set, protected, named_table]),
ok = config:listen_for_changes(?MODULE, nil),
SizeList = config:get("mem3", "shard_cache_size", "25000"),
{Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end),
UpdateSeq = get_update_seq(),
{ok, #st{
max_size = list_to_integer(SizeList),
cur_size = 0,
changes_pid = Pid
changes_pid = start_changes_listener(UpdateSeq),
update_seq = UpdateSeq
}}.

handle_call({set_max_size, Size}, _From, St) ->
Expand All @@ -163,12 +165,28 @@ handle_cast({cache_hit, DbName}, St) ->
couch_stats:increment_counter([dbcore, mem3, shard_cache, hit]),
cache_hit(DbName),
{noreply, St};
handle_cast({cache_insert, DbName, Shards}, St) ->
handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) ->
couch_stats:increment_counter([dbcore, mem3, shard_cache, miss]),
{noreply, cache_free(cache_insert(St, DbName, Shards))};
% This comparison correctly uses the `<` operator
% and not `=<`. The easiest way to understand why is
% to think of when a _dbs db doesn't change. If it used
% `=<` it would be impossible to insert anything into
% the cache.
NewSt = case UpdateSeq < St#st.update_seq of
true -> St;
false -> cache_free(cache_insert(St, DbName, Shards))
end,
{noreply, NewSt};
handle_cast({cache_remove, DbName}, St) ->
couch_stats:increment_counter([dbcore, mem3, shard_cache, eviction]),
{noreply, cache_remove(St, DbName)};
handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) ->
Msg = {cache_insert, DbName, Shards, UpdateSeq},
{noreply, NewSt} = handle_cast(Msg, St),
{noreply, NewSt#st{update_seq = UpdateSeq}};
handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
{noreply, NewSt} = handle_cast({cache_remove, DbName}, St),
{noreply, NewSt#st{update_seq = UpdateSeq}};
handle_cast(_Msg, St) ->
{noreply, St}.

Expand All @@ -185,8 +203,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
erlang:send_after(5000, self(), {start_listener, Seq}),
{noreply, NewSt#st{changes_pid=undefined}};
handle_info({start_listener, Seq}, St) ->
{NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
{noreply, St#st{changes_pid=NewPid}};
{noreply, St#st{
changes_pid = start_changes_listener(Seq)
}};
handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
erlang:send_after(5000, self(), restart_config_listener),
{noreply, State};
Expand All @@ -205,6 +224,21 @@ code_change(_OldVsn, #st{}=St, _Extra) ->

%% internal functions

start_changes_listener(SinceSeq) ->
Self = self(),
{Pid, _} = erlang:spawn_monitor(fun() ->
erlang:spawn_link(fun() ->
Ref = erlang:monitor(process, Self),
receive
{'DOWN', Ref, _, _, _} ->
ok
end,
exit(shutdown)
end),
listen_for_changes(SinceSeq)
end),
Pid.

fold_fun(#full_doc_info{}=FDI, _, Acc) ->
DI = couch_doc:to_doc_info(FDI),
fold_fun(DI, nil, Acc);
Expand Down Expand Up @@ -243,24 +277,26 @@ changes_callback({stop, EndSeq}, _) ->
exit({seq, EndSeq});
changes_callback({change, {Change}, _}, _) ->
DbName = couch_util:get_value(<<"id">>, Change),
Seq = couch_util:get_value(<<"seq">>, Change),
case DbName of <<"_design/", _/binary>> -> ok; _Else ->
case mem3_util:is_deleted(Change) of
true ->
gen_server:cast(?MODULE, {cache_remove, DbName});
gen_server:cast(?MODULE, {cache_remove_change, DbName, Seq});
false ->
case couch_util:get_value(doc, Change) of
{error, Reason} ->
twig:log(error, "missing partition table for ~s: ~p",
[DbName, Reason]);
{Doc} ->
Shards = mem3_util:build_ordered_shards(DbName, Doc),
gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
Msg = {cache_insert_change, DbName, Shards, Seq},
gen_server:cast(?MODULE, Msg),
[create_if_missing(mem3:name(S)) || S
<- Shards, mem3:node(S) =:= node()]
end
end
end,
{ok, couch_util:get_value(<<"seq">>, Change)};
{ok, Seq};
changes_callback(timeout, _) ->
ok.

Expand All @@ -276,8 +312,9 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
load_shards_from_db(#db{} = ShardDb, DbName) ->
case couch_db:open_doc(ShardDb, DbName, []) of
{ok, #doc{body = {Props}}} ->
Seq = couch_db:get_update_seq(ShardDb),
Shards = mem3_util:build_ordered_shards(DbName, Props),
gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}),
Shards;
{not_found, _} ->
erlang:error(database_does_not_exist, ?b2l(DbName))
Expand Down

0 comments on commit 8c5d08d

Please sign in to comment.