From db52f10059d46e736898a8de45ebc689b7ea4254 Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Wed, 19 Feb 2014 13:41:28 -0800 Subject: [PATCH] Use gen_server:cast rather than rexi RPCs Prior to this commit, the global_changes_listener would send updates to the global_changes_server via a rexi RPC which only did a gen_server:call. This is needlessly heavyweight and would cause gen_server:call timeout errors when the global_changes_server was slow (usually due to slow disk IO). This commit removes the rexi RPC and replaces it with a gen_server:cast. Since this causes all global_changes_server handlers to not reply, this commit also removes the format_reply function as it's unnecessary. BugzID: 28242 --- src/global_changes_listener.erl | 3 +- src/global_changes_server.erl | 55 ++++++++++++++------------------- 2 files changed, 25 insertions(+), 33 deletions(-) diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl index ae38245..1aa4ea4 100644 --- a/src/global_changes_listener.erl +++ b/src/global_changes_listener.erl @@ -112,8 +112,7 @@ maybe_send_updates(#state{update_db=true}=State) -> try group_updates_by_node(State#state.dbname, Updates) of Grouped -> dict:map(fun(Node, Docs) -> - MFA = {global_changes_server, update_docs, [Docs]}, - rexi:cast(Node, MFA) + global_changes_server:update_docs(Node, Docs) end, Grouped) catch error:database_does_not_exist -> ok diff --git a/src/global_changes_server.erl b/src/global_changes_server.erl index d4c951e..d30a319 100644 --- a/src/global_changes_server.erl +++ b/src/global_changes_server.erl @@ -19,7 +19,7 @@ ]). -export([ - update_docs/1 + update_docs/2 ]). @@ -67,20 +67,23 @@ terminate(_Reason, _Srv) -> ok. -handle_call(_Msg, _From, #state{update_db=false}=State) -> - {reply, ok, State}; -handle_call({update_docs, DocIds}, _From, State) -> +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + + +handle_cast(_Msg, #state{update_db=false}=State) -> + {noreply, State}; +handle_cast({update_docs, DocIds}, State) -> Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates), NewState = State#state{ pending_updates=Pending, pending_update_count=sets:size(Pending) }, - format_reply(reply, maybe_update_docs(NewState)). - + maybe_update_docs(NewState); handle_cast({set_max_write_delay, MaxWriteDelay}, State) -> NewState = State#state{max_write_delay=MaxWriteDelay}, - format_reply(noreply, maybe_update_docs(NewState)); + maybe_update_docs(NewState); handle_cast({set_update_db, Boolean}, State0) -> % If turning update_db off, clear out server state State = case {Boolean, State0#state.update_db} of @@ -94,9 +97,9 @@ handle_cast({set_update_db, Boolean}, State0) -> _ -> State0#state{update_db=Boolean} end, - format_reply(noreply, maybe_update_docs(State)); + maybe_update_docs(State); handle_cast(_Msg, State) -> - format_reply(noreply, maybe_update_docs(State)). + maybe_update_docs(State). handle_info(start_listener, State) -> @@ -104,13 +107,13 @@ handle_info(start_listener, State) -> NewState = State#state{ handler_ref=erlang:monitor(process, Handler) }, - format_reply(noreply, maybe_update_docs(NewState)); + maybe_update_docs(NewState); handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) -> couch_log:error("global_changes_listener terminated: ~w", [Reason]), erlang:send_after(5000, self(), start_listener), - format_reply(noreply, maybe_update_docs(State)); + maybe_update_docs(State); handle_info(_, State) -> - format_reply(noreply, maybe_update_docs(State)). + maybe_update_docs(State). code_change(_OldVsn, State, _Extra) -> @@ -118,13 +121,13 @@ code_change(_OldVsn, State, _Extra) -> maybe_update_docs(#state{pending_update_count=0}=State) -> - State; + {noreply, State}; maybe_update_docs(#state{update_db=true}=State) -> #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State, Now = os:timestamp(), case LastUpdateTime of undefined -> - {State#state{last_update_time=Now}, MaxWriteDelay}; + {noreply, State#state{last_update_time=Now}, MaxWriteDelay}; _ -> Delta = round(timer:now_diff(Now, LastUpdateTime)/1000), if Delta >= MaxWriteDelay -> @@ -145,23 +148,23 @@ maybe_update_docs(#state{update_db=true}=State) -> fabric:update_docs(State#state.dbname, Docs, []) end) catch error:database_does_not_exist -> - ok + {noreply, State} end, - State#state{ + {noreply, State#state{ pending_updates=sets:new(), pending_update_count=0, last_update_time=undefined - }; + }}; true -> - {State, MaxWriteDelay-Delta} + {noreply, State, MaxWriteDelay-Delta} end end; maybe_update_docs(State) -> - State. + {noreply, State}. -update_docs(Updates) -> - gen_server:call(?MODULE, {update_docs, Updates}). +update_docs(Node, Updates) -> + gen_server:cast({?MODULE, Node}, {update_docs, Updates}). group_ids_by_shard(DbName, DocIds) -> @@ -177,16 +180,6 @@ group_ids_by_shard(DbName, DocIds) -> end, dict:new(), DocIds). -format_reply(reply, #state{}=State) -> - {reply, ok, State}; -format_reply(reply, {State, Timeout}) -> - {reply, ok, State, Timeout}; -format_reply(noreply, #state{}=State) -> - {noreply, State}; -format_reply(noreply, {State, Timeout}) -> - {noreply, State, Timeout}. - - get_docs_locally(Shard, Ids) -> lists:map(fun(Id) -> DocInfo = couch_db:get_doc_info(Shard, Id),