Skip to content

Commit

Permalink
incremental kind of working for inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Dionne committed Dec 2, 2009
1 parent 56474e2 commit dd906cd
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 12 deletions.
20 changes: 19 additions & 1 deletion indexer.erl
Expand Up @@ -86,10 +86,28 @@ worker(Pid) ->
done ->
%% what we need to do here is go into polling mode
%% and start polling for new updates to the db
true
poll_for_changes(Pid)
end
end.

poll_for_changes(Pid) ->
case possibly_stop(Pid) of
done ->
ok;
void ->
?LOG(?DEBUG, "polling for changes again ~n",[]),
{Inserts, _Updates, _Deletes, LastSeq} = indexer_server:get_changes(Pid),
%% first do inserts
index_these_docs(Pid,Inserts),
?LOG(?INFO, "indexed another ~w ~n", [length(Inserts)]),
%% then deletes
%% then updates
indexer_server:checkpoint(Pid,changes,LastSeq),
sleep(60000),
poll_for_changes(Pid)
end.


possibly_stop(Pid) ->
case indexer_server:should_i_stop(Pid) of
true ->
Expand Down
65 changes: 56 additions & 9 deletions indexer_couchdb_crawler.erl
Expand Up @@ -10,7 +10,16 @@
-module(indexer_couchdb_crawler).
%%
%%
-export([start/2, next/1, get_doc_infos/2, db_exists/1, store_chkp/3, get_changes_since/2, lookup_doc/2, lookup_indices/2, write_indices/3]).
-export([start/2,
next/1,
db_exists/1,
store_chkp/3,
read_last_seq/1,
write_last_seq/2,
get_changes_since/2,
lookup_doc/2,
lookup_indices/2,
write_indices/3]).

-include("../couchdb/src/couchdb/couch_db.hrl").
-include("indexer.hrl").
Expand All @@ -19,7 +28,9 @@

start(DbName, [{reset, DbIndexName}]) ->
hovercraft:delete_db(DbIndexName),
hovercraft:create_db(DbIndexName),
hovercraft:create_db(DbIndexName),
{ok, #db{update_seq=LastSeq}} = hovercraft:open_db(DbName),
write_last_seq(DbIndexName, LastSeq),
{DbName, 0}.

next({DbName, StartId}) ->
Expand Down Expand Up @@ -50,17 +61,34 @@ open_by_id_btree(DbName) ->
IdBtree.


get_doc_infos(DbName, Ids) ->
IdBtree = open_by_id_btree(DbName),
{ok, Docs, _} = couch_btree:query_modify(IdBtree, Ids, [], []),
Docs.

get_changes_since(DbName, SeqNum) ->
{ok, #db{update_seq=LastSeq}=Db} = hovercraft:open_db(DbName),
?LOG(?DEBUG,"last update sequences id is: ~p ~n",[LastSeq]),
couch_db:changes_since(Db, all_docs, SeqNum, fun(DocInfos, Acc) ->
{ok, DocInfos} = couch_db:changes_since(Db, all_docs, SeqNum, fun(DocInfos, Acc) ->
{ok, lists:append(Acc, DocInfos)} end,
[],[]).
[],[]),
{InsIds, UpdIds, DelIds} = lists:foldl(fun(DocInfo, {Inserts, Updates, Deletes}) ->
{doc_info, Id, _, [{rev_info,{Rev,_},_,Deleted,_}]}=DocInfo,
case Rev of
1 -> {[Id | Inserts],Updates, Deletes};
_ -> case Deleted of
true -> {Inserts, Updates, [Id | Deletes]};
_ -> {Inserts, [Id | Updates], Deletes}
end
end
end,{[],[],[]},DocInfos),
{get_docs(InsIds, DbName), UpdIds, DelIds, LastSeq}.

get_docs(DocIdList, DbName) ->
lists:map(fun(Id) ->
{ok, Doc} = lookup_doc(Id, DbName),
Doc
end,
DocIdList).






get_all_docs(DbName, Options) ->
Expand Down Expand Up @@ -121,6 +149,25 @@ store_chkp(DocId, B, DbName) ->
{<<"chkp">>, B}]},
hovercraft:save_doc(DbName, NewDoc)
end.

write_last_seq(DbName, LastSeq) ->
NewDoc =
case lookup_doc(<<"last_seq">>, DbName) of
{ok, Doc} ->
Props = element(1, Doc),
NewProps = proplists:delete(<<"value">>, Props),
{lists:append(NewProps,
[{<<"value">>,LastSeq}] )};
not_found ->
{[{<<"_id">>, <<"last_seq">>},
{<<"value">>, LastSeq}]}
end,
hovercraft:save_doc(DbName, NewDoc).

read_last_seq(DbName) ->
{ok, Doc} = lookup_doc(<<"last_seq">>, DbName),
proplists:get_value(<<"value">>,element(1,Doc)).



lookup_indices(Word, DbName) ->
Expand Down
16 changes: 14 additions & 2 deletions indexer_server.erl
Expand Up @@ -12,8 +12,10 @@
-module(indexer_server).

-export([next_docs/1,
get_changes/1,
ets_table/1,
checkpoint/1,
checkpoint/3,
schedule_stop/1,
search/2,
write_index/3,
Expand All @@ -38,10 +40,14 @@ should_i_stop(Pid) ->
stop(Pid) ->
gen_server:cast(Pid, stop).



next_docs(Pid) -> gen_server:call(Pid, next_docs, infinity).
get_changes(Pid) ->
gen_server:call(Pid, changes, infinity).

checkpoint(Pid) -> gen_server:call(Pid, checkpoint).
checkpoint(Pid, changes, LastSeq) ->
gen_server:call(Pid, {checkpoint, changes, LastSeq}).

ets_table(Pid) -> gen_server:call(Pid, ets_table).

search(Pid, Str) -> gen_server:call(Pid, {search, Str}).
Expand Down Expand Up @@ -91,13 +97,19 @@ handle_call(next_docs, _From, S) ->
done ->
{reply, done, S}
end;
handle_call(changes, _From, S) ->
LastSeq = indexer_couchdb_crawler:read_last_seq(S#env.idx),
{reply, indexer_couchdb_crawler:get_changes_since(S#env.dbnam, LastSeq), S};
handle_call(checkpoint, _From, S) ->
Next = S#env.nextCP,
DbIndexName = S#env.idx,
Next1 = indexer_checkpoint:checkpoint(Next, {DbIndexName, S#env.chkp}),
?LOG(?DEBUG, "the next checkpoint is ~p ~n",[Next1]),
S1 = S#env{nextCP = Next1, cont=S#env.chkp},
{reply, ok, S1};
handle_call({checkpoint, changes, LastSeq}, _From, S) ->
indexer_couchdb_crawler:write_last_seq(S#env.idx, LastSeq),
{reply, ok, S};
handle_call(schedule_stop, _From, S) ->
case S#env.chkp of
{_, done} -> {reply, norun, S};
Expand Down

0 comments on commit dd906cd

Please sign in to comment.