Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 120 additions & 35 deletions src/couch/src/couch_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@
-define(DEFAULT_COMPRESSIBLE_TYPES,
"text/*, application/javascript, application/json, application/xml"
).
% Purge client max lag window in seconds (defaulting to 24 hours)
-define(PURGE_LAG_SEC, 86400).

start_link(Engine, DbName, Filepath, Options) ->
Arg = {Engine, DbName, Filepath, Options},
Expand Down Expand Up @@ -469,7 +471,6 @@ get_minimum_purge_seq(#db{} = Db) ->
DbName = couch_db:name(Db),
% If there's a broken doc we have to keep every
% purge info until the doc is fixed or removed.
Fmt = "Invalid purge doc '~s' on ~p with purge_seq '~w'",
case ClientSeq of
CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
{ok, SeqAcc};
Expand All @@ -478,11 +479,15 @@ get_minimum_purge_seq(#db{} = Db) ->
true ->
{ok, erlang:min(CS, SeqAcc)};
false ->
couch_log:error(Fmt, [DocId, DbName, ClientSeq]),
Fmt1 =
"Missing or stale purge doc '~s' on ~p "
"with purge_seq '~w'",
couch_log:error(Fmt1, [DocId, DbName, ClientSeq]),
{ok, SeqAcc}
end;
_ ->
couch_log:error(Fmt, [DocId, DbName, ClientSeq]),
Fmt2 = "Invalid purge doc '~s' on ~p with purge_seq '~w'",
couch_log:error(Fmt2, [DocId, DbName, ClientSeq]),
{ok, erlang:min(OldestPurgeSeq, SeqAcc)}
end;
_ ->
Expand Down Expand Up @@ -511,46 +516,22 @@ get_minimum_purge_seq(#db{} = Db) ->
FinalSeq.

purge_client_exists(DbName, DocId, Props) ->
% Warn about clients that have not updated their purge
% checkpoints in the last "index_lag_warn_seconds"
LagWindow = config:get_integer(
% Default 24 hours
"purge",
"index_lag_warn_seconds",
86400
),

{Mega, Secs, _} = os:timestamp(),
NowSecs = Mega * 1000000 + Secs,
LagThreshold = NowSecs - LagWindow,

try
Exists = couch_db_plugin:is_valid_purge_client(DbName, Props),
if
not Exists ->
ok;
case Exists of
true ->
Updated = couch_util:get_value(<<"updated_on">>, Props),
if
is_integer(Updated) and Updated > LagThreshold ->
ok;
true ->
Diff = NowSecs - Updated,
Fmt1 =
"Purge checkpoint '~s' not updated in ~p seconds\n"
" in database ~p",
couch_log:error(Fmt1, [DocId, Diff, DbName])
end
purge_client_warn_lag(Updated, DbName, DocId);
_ ->
ok
end,
Exists
catch
_:_ ->
Tag:Error ->
% If we fail to check for a client we have to assume that
% it exists.
Fmt2 =
"Failed to check purge checkpoint using\n"
" document '~p' in database ~p",
couch_log:error(Fmt2, [DocId, DbName]),
Log = "~p : Failed to check purge checkpoint ~p : ~s. Error ~p:~p",
couch_log:error(Log, [?MODULE, DbName, DocId, Tag, Error]),
true
end.

Expand Down Expand Up @@ -2142,8 +2123,27 @@ possible_ancestors(FullInfo, MissingRevs) ->
{MaxMissingPos, _} = lists:max(MissingRevs),
lists:filter(fun({Pos, _}) -> Pos < MaxMissingPos end, LeafRevs).

purge_client_warn_lag(Updated, DbName, DocId) when is_integer(Updated) ->
% Warn about clients that have not updated their purge
% checkpoints in the last "index_lag_warn_seconds".
Lag = config:get_integer("purge", "index_lag_warn_seconds", ?PURGE_LAG_SEC),
Diff = erlang:system_time(second) - Updated,
case Diff =< Lag of
true ->
ok;
false ->
Log =
"~p : Purge checkpoint ~p : ~s not updated in ~p seconds. "
"Expected update interval is at least ~p seconds",
couch_log:error(Log, [?MODULE, DbName, DocId, Diff, Lag])
end;
purge_client_warn_lag(Updated, DbName, DocId) ->
Log = "~p : Purge checkpoint ~p : ~s has an invalid updated value: ~p",
couch_log:error(Log, [?MODULE, DbName, DocId, Updated]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

-include_lib("couch/include/couch_eunit.hrl").

setup_all() ->
ok = meck:new(couch_epi, [passthrough]),
Expand Down Expand Up @@ -2388,4 +2388,89 @@ add_shard(DbName) ->
generate_cases(DbName) ->
[{DbName, DbName}, {DbName, ?l2b(DbName)}].

get_minimum_purge_sequence_test_() ->
{
foreach,
fun setup_purge_seq/0,
fun teardown_purge_seq/1,
[
?TDEF_FE(t_purge_warn_log),
?TDEF_FE(t_purge_client_exists),
?TDEF_FE(t_purge_client_doesnt_exist),
?TDEF_FE(t_purge_client_check_throws_error)
]
}.

setup_purge_seq() ->
meck:new(couch_log, [passthrough]),
meck:new(config, [passthrough]),
meck:new(couch_db_plugin, [passthrough]),
meck:expect(config, get, fun(_, _, Default) -> Default end),
ok.

teardown_purge_seq(_) ->
meck:unload().

t_purge_warn_log(_) ->
Now = erlang:system_time(second),

Updated1 = Now - ?PURGE_LAG_SEC - 100,
purge_client_warn_lag(Updated1, <<"foo">>, <<"bar">>),
?assertEqual(1, meck:num_calls(couch_log, error, 2)),

meck:reset(couch_log),
Updated2 = Now - ?PURGE_LAG_SEC + 100,
purge_client_warn_lag(Updated2, <<"foo">>, <<"bar">>),
?assertEqual(0, meck:num_calls(couch_log, error, 2)),

meck:reset(couch_log),
purge_client_warn_lag(not_a_number, <<"foo">>, <<"bar">>),
?assertEqual(1, meck:num_calls(couch_log, error, 2)).

t_purge_client_exists(_) ->
Now = erlang:system_time(second),
meck:expect(couch_db_plugin, is_valid_purge_client, 2, false),

% If client doesn't exist, don't log
Props = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC + 100}],
?assertNot(purge_client_exists(<<"foo">>, <<"bar">>, Props)),
?assertEqual(0, meck:num_calls(couch_log, error, 2)).

t_purge_client_doesnt_exist(_) ->
Now = erlang:system_time(second),
meck:expect(couch_db_plugin, is_valid_purge_client, 2, true),

% Log if stale
Props1 = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC - 100}],
?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props1)),
?assertEqual(1, meck:num_calls(couch_log, error, 2)),

% Not stale, so don't log
meck:reset(couch_log),
Props2 = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC + 100}],
?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props2)),
?assertEqual(0, meck:num_calls(couch_log, error, 2)),

% Invalid checkpoint doc props, log an error
meck:reset(couch_log),
Props3 = [{<<"invalid_props">>, yes}],
?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props3)),
?assertEqual(1, meck:num_calls(couch_log, error, 2)),

% Log if invalid checkpoint timestamp
meck:reset(couch_log),
Props4 = [{<<"updated_on">>, not_valid}],
?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props4)),
?assertEqual(1, meck:num_calls(couch_log, error, 2)).

t_purge_client_check_throws_error(_) ->
Now = erlang:system_time(second),
meck:expect(couch_db_plugin, is_valid_purge_client, 2, meck:raise(error, bam)),

% If error is throw when checking validy expect a log but return true as if
% the puarge client exists.
Props = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC + 100}],
?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props)),
?assertEqual(1, meck:num_calls(couch_log, error, 2)).

-endif.