Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Use gen_server:cast rather than rexi RPCs
Prior to this commit, the global_changes_listener would send updates to
the global_changes_server via a rexi RPC which only did a
gen_server:call. This is needlessly heavyweight and would cause
gen_server:call timeout errors when the global_changes_server was slow
(usually due to slow disk IO). This commit removes the rexi RPC and
replaces it with a gen_server:cast.

Since this causes all global_changes_server handlers to not reply, this
commit also removes the format_reply function as it's unnecessary.

BugzID: 28242
  • Loading branch information
sagelywizard authored and rnewson committed Aug 7, 2014
1 parent c0f0382 commit db52f10059d46e736898a8de45ebc689b7ea4254
Showing 2 changed files with 25 additions and 33 deletions.
@@ -112,8 +112,7 @@ maybe_send_updates(#state{update_db=true}=State) ->
try group_updates_by_node(State#state.dbname, Updates) of
Grouped ->
dict:map(fun(Node, Docs) ->
MFA = {global_changes_server, update_docs, [Docs]},
rexi:cast(Node, MFA)
global_changes_server:update_docs(Node, Docs)
end, Grouped)
catch error:database_does_not_exist ->
ok
@@ -19,7 +19,7 @@
]).

-export([
update_docs/1
update_docs/2
]).


@@ -67,20 +67,23 @@ terminate(_Reason, _Srv) ->
ok.


handle_call(_Msg, _From, #state{update_db=false}=State) ->
{reply, ok, State};
handle_call({update_docs, DocIds}, _From, State) ->
handle_call(_Msg, _From, State) ->
{reply, ok, State}.


handle_cast(_Msg, #state{update_db=false}=State) ->
{noreply, State};
handle_cast({update_docs, DocIds}, State) ->
Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates),
NewState = State#state{
pending_updates=Pending,
pending_update_count=sets:size(Pending)
},
format_reply(reply, maybe_update_docs(NewState)).

maybe_update_docs(NewState);

handle_cast({set_max_write_delay, MaxWriteDelay}, State) ->
NewState = State#state{max_write_delay=MaxWriteDelay},
format_reply(noreply, maybe_update_docs(NewState));
maybe_update_docs(NewState);
handle_cast({set_update_db, Boolean}, State0) ->
% If turning update_db off, clear out server state
State = case {Boolean, State0#state.update_db} of
@@ -94,37 +97,37 @@ handle_cast({set_update_db, Boolean}, State0) ->
_ ->
State0#state{update_db=Boolean}
end,
format_reply(noreply, maybe_update_docs(State));
maybe_update_docs(State);
handle_cast(_Msg, State) ->
format_reply(noreply, maybe_update_docs(State)).
maybe_update_docs(State).


handle_info(start_listener, State) ->
{ok, Handler} = global_changes_listener:start(),
NewState = State#state{
handler_ref=erlang:monitor(process, Handler)
},
format_reply(noreply, maybe_update_docs(NewState));
maybe_update_docs(NewState);
handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) ->
couch_log:error("global_changes_listener terminated: ~w", [Reason]),
erlang:send_after(5000, self(), start_listener),
format_reply(noreply, maybe_update_docs(State));
maybe_update_docs(State);
handle_info(_, State) ->
format_reply(noreply, maybe_update_docs(State)).
maybe_update_docs(State).


code_change(_OldVsn, State, _Extra) ->
{ok, State}.


maybe_update_docs(#state{pending_update_count=0}=State) ->
State;
{noreply, State};
maybe_update_docs(#state{update_db=true}=State) ->
#state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State,
Now = os:timestamp(),
case LastUpdateTime of
undefined ->
{State#state{last_update_time=Now}, MaxWriteDelay};
{noreply, State#state{last_update_time=Now}, MaxWriteDelay};
_ ->
Delta = round(timer:now_diff(Now, LastUpdateTime)/1000),
if Delta >= MaxWriteDelay ->
@@ -145,23 +148,23 @@ maybe_update_docs(#state{update_db=true}=State) ->
fabric:update_docs(State#state.dbname, Docs, [])
end)
catch error:database_does_not_exist ->
ok
{noreply, State}
end,
State#state{
{noreply, State#state{
pending_updates=sets:new(),
pending_update_count=0,
last_update_time=undefined
};
}};
true ->
{State, MaxWriteDelay-Delta}
{noreply, State, MaxWriteDelay-Delta}
end
end;
maybe_update_docs(State) ->
State.
{noreply, State}.


update_docs(Updates) ->
gen_server:call(?MODULE, {update_docs, Updates}).
update_docs(Node, Updates) ->
gen_server:cast({?MODULE, Node}, {update_docs, Updates}).


group_ids_by_shard(DbName, DocIds) ->
@@ -177,16 +180,6 @@ group_ids_by_shard(DbName, DocIds) ->
end, dict:new(), DocIds).


format_reply(reply, #state{}=State) ->
{reply, ok, State};
format_reply(reply, {State, Timeout}) ->
{reply, ok, State, Timeout};
format_reply(noreply, #state{}=State) ->
{noreply, State};
format_reply(noreply, {State, Timeout}) ->
{noreply, State, Timeout}.


get_docs_locally(Shard, Ids) ->
lists:map(fun(Id) ->
DocInfo = couch_db:get_doc_info(Shard, Id),

0 comments on commit db52f10

Please sign in to comment.