Reimplement global_change rate limiting
Rather than checking whether to update each time a change occurs, we now
flush pending updates every max_write_delay milliseconds.
davisp committed Aug 14, 2014
1 parent 0324bf4 commit 8995b706209896bbffe417395a359ae39510b675
Showing 2 changed files with 47 additions and 53 deletions.
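
The heart of the change is a timer-driven flush loop: init/1 arms erlang:send_after(MaxWriteDelay, self(), flush_updates), and every flush_updates message re-arms the timer and writes out whatever document ids have accumulated since the last tick, instead of doing time bookkeeping on every incoming change. The standalone gen_server below is a minimal sketch of that pattern only; it is not the real global_changes_server, and the module name, note_update/1 API, and the io:format stand-in for the actual shard write are illustrative assumptions.

%% Minimal sketch of the send_after-driven flush loop this commit adopts.
%% Not the real global_changes_server: the module name, note_update/1, and
%% the io:format stand-in for the actual shard write are illustrative only.
-module(flush_loop_sketch).
-behaviour(gen_server).

-export([start_link/0, note_update/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-define(MAX_WRITE_DELAY, 25).  % milliseconds, mirroring the new default

-record(state, {pending, max_write_delay}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

note_update(DocId) ->
    gen_server:cast(?MODULE, {update_doc, DocId}).

init([]) ->
    % Arm the first flush; each flush_updates message re-arms the timer.
    erlang:send_after(?MAX_WRITE_DELAY, self(), flush_updates),
    {ok, #state{pending=sets:new(), max_write_delay=?MAX_WRITE_DELAY}}.

handle_call(_Msg, _From, State) ->
    {reply, ok, State}.

handle_cast({update_doc, DocId}, #state{pending=Pending}=State) ->
    % On the write path we only accumulate; no time bookkeeping here.
    {noreply, State#state{pending=sets:add_element(DocId, Pending)}};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(flush_updates, #state{pending=Pending}=State) ->
    % Re-arm first so the cycle keeps running, then drain the pending set.
    erlang:send_after(State#state.max_write_delay, self(), flush_updates),
    case sets:size(Pending) of
        0 -> ok;
        _ -> io:format("would write ~p~n", [sets:to_list(Pending)])
    end,
    {noreply, State#state{pending=sets:new()}};
handle_info(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
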
@@ -46,7 +46,7 @@ start() ->
 init(_) ->
     % get configs as strings
     UpdateDb0 = config:get("global_changes", "update_db", "true"),
-    MaxEventDelay0 = config:get("global_changes", "max_event_delay", "500"),
+    MaxEventDelay0 = config:get("global_changes", "max_event_delay", "25"),
 
     % make config strings into other data types
     UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
@@ -40,7 +40,6 @@
 -record(state, {
     update_db,
     pending_update_count,
-    last_update_time,
     pending_updates,
     max_write_delay,
     dbname,
@@ -56,12 +55,15 @@ init([]) ->
     {ok, Handler} = global_changes_listener:start(),
     % get configs as strings
     UpdateDb0 = config:get("global_changes", "update_db", "true"),
-    MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "500"),
+    MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "25"),
 
     % make config strings into other data types
     UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
     MaxWriteDelay = list_to_integer(MaxWriteDelay0),
 
+    % Start our write triggers
+    erlang:send_after(MaxWriteDelay, self(), flush_updates),
+
     State = #state{
         update_db=UpdateDb,
         pending_update_count=0,
@@ -89,88 +91,80 @@ handle_cast({update_docs, DocIds}, State) ->
         pending_updates=Pending,
         pending_update_count=sets:size(Pending)
     },
-    maybe_update_docs(NewState);
+    {noreply, NewState};
 
 handle_cast({set_max_write_delay, MaxWriteDelay}, State) ->
     NewState = State#state{max_write_delay=MaxWriteDelay},
-    maybe_update_docs(NewState);
+    {noreply, NewState};
 handle_cast({set_update_db, Boolean}, State0) ->
     % If turning update_db off, clear out server state
     State = case {Boolean, State0#state.update_db} of
         {false, true} ->
            State0#state{
                 update_db=Boolean,
                 pending_updates=sets:new(),
-                pending_update_count=0,
-                last_update_time=undefined
+                pending_update_count=0
             };
         _ ->
             State0#state{update_db=Boolean}
     end,
-    maybe_update_docs(State);
+    {noreply, State};
 handle_cast(_Msg, State) ->
-    maybe_update_docs(State).
+    {noreply, State}.
 
 
+handle_info(flush_updates, #state{pending_update_count=0}=State) ->
+    erlang:send_after(State#state.max_write_delay, self(), flush_updates),
+    {noreply, State};
+handle_info(flush_updates, #state{update_db=false}=State) ->
+    erlang:send_after(State#state.max_write_delay, self(), flush_updates),
+    {noreply, State};
+handle_info(flush_updates, State) ->
+    erlang:send_after(State#state.max_write_delay, self(), flush_updates),
+    flush_updates(State);
 handle_info(start_listener, State) ->
     {ok, Handler} = global_changes_listener:start(),
     NewState = State#state{
         handler_ref=erlang:monitor(process, Handler)
     },
-    maybe_update_docs(NewState);
+    {noreply, NewState};
 handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) ->
     couch_log:error("global_changes_listener terminated: ~w", [Reason]),
     erlang:send_after(5000, self(), start_listener),
-    maybe_update_docs(State);
+    {noreply, State};
 handle_info(_, State) ->
-    maybe_update_docs(State).
+    {noreply, State}.
 
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-maybe_update_docs(#state{pending_update_count=0}=State) ->
-    {noreply, State};
-maybe_update_docs(#state{update_db=true}=State) ->
-    #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State,
-    Now = os:timestamp(),
-    case LastUpdateTime of
-        undefined ->
-            {noreply, State#state{last_update_time=Now}, MaxWriteDelay};
-        _ ->
-            Delta = round(timer:now_diff(Now, LastUpdateTime)/1000),
-            if Delta >= MaxWriteDelay ->
-                DocIds = sets:to_list(State#state.pending_updates),
-                try group_ids_by_shard(State#state.dbname, DocIds) of
-                    GroupedIds ->
-                        Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) ->
-                            {ok, Shard} = couch_db:open(ShardName, []),
-                            try
-                                GroupedDocs = get_docs_locally(Shard, Ids),
-                                GroupedDocs ++ DocInfoAcc
-                            after
-                                couch_db:close(Shard)
-                            end
-                        end, [], GroupedIds),
-
-                        spawn(fun() ->
-                            fabric:update_docs(State#state.dbname, Docs, [])
-                        end)
-                catch error:database_does_not_exist ->
-                    {noreply, State}
-                end,
-                {noreply, State#state{
-                    pending_updates=sets:new(),
-                    pending_update_count=0,
-                    last_update_time=undefined
-                }};
-            true ->
-                {noreply, State, MaxWriteDelay-Delta}
-            end
-    end;
-maybe_update_docs(State) ->
-    {noreply, State}.

+flush_updates(State) ->
+    DocIds = sets:to_list(State#state.pending_updates),
+    try group_ids_by_shard(State#state.dbname, DocIds) of
+        GroupedIds ->
+            Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) ->
+                {ok, Shard} = couch_db:open(ShardName, []),
+                try
+                    GroupedDocs = get_docs_locally(Shard, Ids),
+                    GroupedDocs ++ DocInfoAcc
+                after
+                    couch_db:close(Shard)
+                end
+            end, [], GroupedIds),
+
+            spawn(fun() ->
+                fabric:update_docs(State#state.dbname, Docs, [])
+            end)
+    catch error:database_does_not_exist ->
+        {noreply, State}
+    end,
+    {noreply, State#state{
+        pending_updates=sets:new(),
+        pending_update_count=0
+    }}.
 
 
 update_docs(Node, Updates) ->
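
For operators, the knobs this commit touches are existing ini settings in the global_changes section, read as strings by config:get/3 above; the shipped defaults for both delays drop from 500 to 25 milliseconds. A hypothetical override in a CouchDB local.ini (assuming the usual ini layering; the values shown simply restate the new defaults) would look like:

[global_changes]
update_db = true
max_event_delay = 25
max_write_delay = 25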
