Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

CBD-105 More efficient way to find if index needs update

This impacts the latency of queries following a query with
a staleness of update_after. This new method, event-oriented,
of finding if a an index needs update reduces avoids N
synchonous process calls (where N is number of active partitions
plus number of passive partitions).

Change-Id: Ibdec73f0f2db115765c6237fa279ff59500b00ed
Reviewed-on: http://review.couchbase.org/16290
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information...
commit 0be6d2dcc336a43d38bbbfb94e42d441ed7c2c43 1 parent b1cafe1
Filipe Manana fdmanana authored fdmanana committed
68 src/couch_set_view/src/couch_db_set.erl
View
@@ -80,7 +80,7 @@ do_init({SetName, Active0, Passive0, DbOpenOptions}) ->
Passive = lists:usort(Passive0),
OpenFun = fun(P, Acc) ->
Name = ?dbname(SetName, P),
- case couch_db:open(Name, DbOpenOptions) of
+ case couch_db:open_int(Name, DbOpenOptions) of
{ok, Db} ->
dict:store(Name, {Db, P, couch_db:monitor(Db)}, Acc);
Error ->
@@ -97,12 +97,18 @@ do_init({SetName, Active0, Passive0, DbOpenOptions}) ->
end,
_Ref = couch_db:monitor(MasterDb),
Server = self(),
- EventFun = fun({compacted, Name} = Ev) ->
- Sz = byte_size(SetName),
- case Name of
- <<SetName:Sz/binary, $/, _/binary>> ->
+ EventFun = fun({compacted, DbName} = Ev) ->
+ case is_set_db(DbName, SetName) of
+ true ->
ok = gen_server:cast(Server, Ev);
- _ ->
+ false ->
+ ok
+ end;
+ ({updated, {DbName, _NewSeq}} = Ev) ->
+ case is_set_db(DbName, SetName) of
+ true ->
+ ok = gen_server:cast(Server, Ev);
+ false ->
ok
end;
(_) ->
@@ -122,32 +128,12 @@ do_init({SetName, Active0, Passive0, DbOpenOptions}) ->
handle_call(get_seqs, _From, State) ->
FoldFun = fun(_DbName, {Db, P, _Ref}, Acc) ->
- {ok, S} = couch_db:get_current_seq(Db),
- [{P, S} | Acc]
+ [{P, Db#db.update_seq} | Acc]
end,
Seqs0 = dict:fold(FoldFun, [], State#state.dbs_active),
Seqs = dict:fold(FoldFun, Seqs0, State#state.dbs_passive),
{reply, {ok, Seqs}, State};
-handle_call(get_dbs, _From, State) ->
- FoldFun = fun(_, {Db, P, _Ref}, A) -> [{P, Db#db.name} | A] end,
- Active = dict:fold(FoldFun, [], State#state.dbs_active),
- Passive = dict:fold(FoldFun, [], State#state.dbs_passive),
- UserCtx = case Active of
- [{_, DbName} | _] ->
- {Db, _, _} = dict:fetch(DbName, State#state.dbs_active),
- Db#db.user_ctx;
- [] ->
- case Passive of
- [{_, DbName} | _] ->
- {Db, _, _} = dict:fetch(DbName, State#state.dbs_passive),
- Db#db.user_ctx;
- [] ->
- couch_util:get_value(user_ctx, State#state.db_open_options, #user_ctx{})
- end
- end,
- {reply, {ok, Active, Passive, UserCtx}, State};
-
handle_call({set_passive, PartList}, _From, State) ->
#state{
db_open_options = OpenOpts,
@@ -198,6 +184,23 @@ handle_call(close, _From, State) ->
{stop, normal, ok, State}.
+handle_cast({updated, {DbName, NewSeq}}, State) ->
+ case dict:find(DbName, State#state.dbs_active) of
+ {ok, {Db, Id, Ref}} ->
+ NewVal = {Db#db{update_seq = NewSeq}, Id, Ref},
+ Active2 = dict:store(DbName, NewVal, State#state.dbs_active),
+ {noreply, State#state{dbs_active = Active2}};
+ error ->
+ case dict:find(DbName, State#state.dbs_passive) of
+ {ok, {Db, Id, Ref}} ->
+ NewVal = {Db#db{update_seq = NewSeq}, Id, Ref},
+ Passive2 = dict:store(DbName, NewVal, State#state.dbs_passive),
+ {noreply, State#state{dbs_passive = Passive2}};
+ error ->
+ {noreply, State}
+ end
+ end;
+
handle_cast({compacted, DbName}, State) ->
case dict:find(DbName, State#state.dbs_active) of
error ->
@@ -245,7 +248,7 @@ switch_partitions_state(PartList, SetName, OpenOpts, SetA, SetB) ->
error ->
case dict:find(DbName, B) of
error ->
- {ok, Db} = couch_db:open(DbName, OpenOpts),
+ {ok, Db} = couch_db:open_int(DbName, OpenOpts),
Ref = couch_db:monitor(Db),
{A, dict:store(DbName, {Db, Id, Ref}, B)};
{ok, _} ->
@@ -280,3 +283,12 @@ find_db_name(Pid, [{Name, {#db{main_pid = Pid}, _, _}} | _]) ->
find_db_name(Pid, [_ | Rest]) ->
find_db_name(Pid, Rest).
+
+is_set_db(DbName, SetName) ->
+ Sz = byte_size(SetName),
+ case DbName of
+ <<SetName:Sz/binary, $/, _/binary>> ->
+ true;
+ _ ->
+ false
+ end.
2  src/couchdb/couch_changes.erl
View
@@ -54,7 +54,7 @@ handle_changes(Args1, Req, Db) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
Self = self(),
{ok, Notify} = couch_db_update_notifier:start_link(
- fun({_, DbName}) when DbName == Db#db.name ->
+ fun({updated, {DbName, _NewSeq}}) when DbName == Db#db.name ->
Self ! db_updated;
(_) ->
ok
8 src/couchdb/couch_db_updater.erl
View
@@ -113,7 +113,7 @@ handle_call(full_commit, _From, Db) ->
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
ok = notify_db_updated(Db2),
- couch_db_update_notifier:notify({updated, Db#db.name}),
+ couch_db_update_notifier:sync_notify({updated, {Db2#db.name, Db2#db.update_seq}}),
{reply, {ok, Db2#db.update_seq}, Db2};
handle_call({set_security, NewSec}, _From, Db) ->
@@ -172,7 +172,7 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
ok = notify_db_updated(Db2),
- couch_db_update_notifier:notify({updated, Db#db.name}),
+ couch_db_update_notifier:sync_notify({updated, {Db2#db.name, Db2#db.update_seq}}),
{reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2};
handle_call(start_compact, _From, Db) ->
case Db#db.compactor_info of
@@ -257,7 +257,7 @@ handle_call({update_header_pos, FileVersion, NewPos}, _From, Db) ->
true ->
NewDb = populate_db_from_header(Db, NewHeader),
ok = notify_db_updated(NewDb),
- couch_db_update_notifier:notify({updated, Db#db.name}),
+ couch_db_update_notifier:sync_notify({updated, {NewDb#db.name, NewDb#db.update_seq}}),
{reply, ok, NewDb}
end;
Error ->
@@ -280,7 +280,7 @@ handle_info({update_docs, Client, Docs, NonRepDocs, FullCommit}, Db) ->
{ok, Db2} ->
ok = notify_db_updated(Db2),
if Db2#db.update_seq /= Db#db.update_seq ->
- couch_db_update_notifier:notify({updated, Db2#db.name});
+ couch_db_update_notifier:sync_notify({updated, {Db2#db.name, Db2#db.update_seq}});
true -> ok
end,
lists:foreach(
Please sign in to comment.
Something went wrong with that request. Please try again.