Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Merge pull request #1 from cloudant/fix-compilation-warnings
Fix compiler warnings
  • Loading branch information
iilyak committed Feb 25, 2019
2 parents 004881a + 7095e0d commit 3e471304b5aeb898b33f9723269a2ed3e40cd219
Showing 2 changed files with 54 additions and 17 deletions.
@@ -36,7 +36,7 @@
server, % Pid of either view group or search index
worker_pid = nil,
seq = 0,
lru = now()
lru = erlang:monotonic_time()
}).

-record(state, {
@@ -117,7 +117,7 @@ init(_) ->
ets:new(ken_resubmit, [named_table]),
ets:new(ken_workers, [named_table, public, {keypos, #job.name}]),
Limit = list_to_integer(config("limit", "20")),
{ok, #state{pruned_last = now(), limit = Limit}}.
{ok, #state{pruned_last = erlang:monotonic_time(), limit = Limit}}.

terminate(_Reason, _State) ->
ok.
@@ -166,19 +166,22 @@ handle_cast({trigger_update, #job{name={_, _, hastings}, server=GPid, seq=Seq} =
% hastings_index:await will trigger a hastings index update
{Pid, _} = erlang:spawn_monitor(hastings_index, await,
[GPid, Seq]),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
% search index job names have 3 elements. See job record definition.
handle_cast({trigger_update, #job{name={_,_,_}, server=GPid, seq=Seq} = Job}, State) ->
% dreyfus_index:await will trigger a search index update.
{Pid, _} = erlang:spawn_monitor(dreyfus_index, await,
[GPid, Seq]),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
handle_cast({trigger_update, #job{name={_,_}, server=SrvPid, seq=Seq} = Job}, State) ->
% couch_index:get_state/2 will trigger a view group index update.
{Pid, _} = erlang:spawn_monitor(couch_index, get_state, [SrvPid, Seq]),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};

handle_cast(Msg, State) ->
@@ -200,8 +203,11 @@ handle_info(start_event_handler, State) ->
{noreply, State, 0};

handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
case timer:now_diff(now(), Last) of
X when X > (1000 * I) ->
Now = erlang:monotonic_time(),
Interval = erlang:convert_time_unit(
State#state.delay, millisecond, native),
case Now - Last > Interval of
true ->
NewState = prune_worker_table(State);
_ ->
NewState = State
@@ -269,8 +275,7 @@ get_active_count() ->
% If any indexing job fails, resubmit requests for all indexes.
update_db_indexes(Name, State) ->
{ok, DDocs} = design_docs(Name),
random:seed(now()),
RandomSorted = lists:sort([{random:uniform(), D} || D <- DDocs]),
RandomSorted = lists:sort([{rand:uniform(), D} || D <- DDocs]),
Resubmit = lists:foldl(fun({_, DDoc}, Acc) ->
JsonDDoc = couch_doc:from_json_obj(DDoc),
case update_ddoc_indexes(Name, JsonDDoc, State) of
@@ -447,7 +452,8 @@ should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
false
end;
[#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
DeltaT = timer:now_diff(now(), LRU) / 1000,
Now = erlang:monotonic_time(),
DeltaT = erlang:convert_time_unit(Now - LRU, native, millisecond),
if
A < BatchChannels, (Seq - OldSeq) >= BS ->
true;
@@ -509,16 +515,49 @@ resubmit(Delay, DbName) ->
end.

prune_worker_table(State) ->
{A, B, _} = now(),
C = (1000000 * A) + B - 0.001 * State#state.delay,
MatchHead = #job{worker_pid=nil, lru={'$1','$2','_'}, _='_'},
Guard = {'<', {'+', {'*', '$1', 1000000}, '$2'}, C},
% remove all entries older than specified `delay` in milliseconds
Delay = erlang:convert_time_unit(State#state.delay, millisecond, native),
C = erlang:monotonic_time() - Delay,
%% fun(#job{worker_pid=nil, lru=A) when A < C -> true end
MatchHead = #job{worker_pid=nil, lru='$1', _='_'},
Guard = {'<', '$1', C},
ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]),
State#state{pruned_last = now()}.
State#state{pruned_last = erlang:monotonic_time()}.

allowed_languages() ->
Config = config:get("query_servers") ++ config:get("native_query_servers"),
[list_to_binary(Lang) || {Lang, _Cmd} <- Config].

config(Key, Default) ->
config:get("ken", Key, Default).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").



prune_old_entries_test() ->
{
setup,
fun() ->
ets:new(ken_workers, [named_table, public, {keypos, #job.name}])
end,
fun(_) ->
catch ets:delete(ken_workers)
end,
?_test(begin
lists:foreach(fun(Idx) ->
ets:insert(ken_workers, #job{name=Idx}),
timer:sleep(100)
end, lists:seq(1, 3)),
prune_worker_table(#state{delay=250}),
?assertEqual(
[2, 3],
lists:usort(
[N || #job{name = N} <- ets:tab2list(ken_workers)])
),
ok
end)
}.

-endif.
@@ -12,8 +12,6 @@

-module(ken_server_test).

-compile([export_all]).

-include_lib("eunit/include/eunit.hrl").

%% hardcoded defaults: limit: 20; batch: 1; delay: 5000; prune: 60000

0 comments on commit 3e47130

Please sign in to comment.