Skip to content

Commit

Permalink
Move the search repl code into a module within search
Browse files Browse the repository at this point in the history
  • Loading branch information
Vagabond committed Feb 22, 2012
1 parent c772c32 commit 92ecfe0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/riak_search_app.erl
Expand Up @@ -16,6 +16,11 @@
%% ===================================================================

start(_StartType, _StartArgs) ->
%% ALWAYS register the repl hook, incase we're getting search data repled
%% to us
riak_core:register([
{repl_helper, riak_search_repl_helper}
]),
case app_helper:get_env(riak_search, enabled, false) of
true ->
%% Ensure that the KV service has fully loaded.
Expand Down
76 changes: 76 additions & 0 deletions src/riak_search_repl_helper.erl
@@ -0,0 +1,76 @@
-module(riak_search_repl_helper).

-export([send/1, recv/1]).

%% when sending the object, ensure that both the KV and proxy objexts are sent
send(Obj) ->
B = riak_object:bucket(Obj),
K = riak_object:key(Obj),
PO = is_proxy_object(B),
{ok, C} = riak:local_client(),
SHI = proplists:get_value(search, C:get_bucket(B)),

case SHI of
true ->
send_search(PO, Obj, B, K, C);
false ->
case PO of
true -> lager:debug("Outgoing proxy obj ~p/~p", [B, K]);
false -> lager:debug("Outgoing KV obj ~p/~p", [B, K])
end,
ok
end.

send_search(true, PO, IdxB, K, C) ->
lager:debug("Outgoing indexed KV obj ~p/~p", [IdxB, K]),
<<"_rsid_",B/binary>> = IdxB,
case C:get(B, K) of
{ok, KVO} ->
[KVO];
Other ->
lager:info("Couldn't find expected KV obj ~p/~p ~p", [B, K, Other]),
ok
end;

send_search(false, KVO, B, K, C) ->
lager:debug("Outgoing indexed KV obj ~p/~p", [B, K]),
IdxB = <<"_rsid_",B/binary>>,
case C:get(IdxB, K) of
{ok, PO} ->
[PO];
Other ->
lager:info("Couldn't find expected proxy obj ~p/~p ~p",
[IdxB, K, Other]),
ok
end.

is_proxy_object(B) ->
case binary:matches(B, <<"_rsid_">>) of
[] -> false;
_ -> true
end.

%% check whether to add/delete indexes on repl recv
recv(Object) ->
B = riak_object:bucket(Object),
K = riak_object:key(Object),
{ok, C} = riak:local_client(),
SC = riak_search_client:new(C),
case B of
<<"_rsid_", Idx/binary>> ->
case riak_kv_util:is_x_deleted(Object) of
true ->
lager:debug("Incoming deleted proxy obj ~p/~p", [B, K]),
riak_indexed_doc:remove_entries(C, SC, Idx, K),
ok;
false ->
lager:debug("Incoming proxy obj ~p/~p", [B, K]),
IdxDoc = riak_object:get_value(Object),
riak_indexed_doc:remove_entries(C, SC, Idx, K),
Postings = riak_indexed_doc:postings(IdxDoc),
SC:index_terms(Postings),
ok
end;
_ ->
ok
end.

0 comments on commit 92ecfe0

Please sign in to comment.