From dd906cd20883f32f1ca9c9d9e4ec10a767068bfa Mon Sep 17 00:00:00 2001 From: Robert Dionne Date: Wed, 2 Dec 2009 12:28:36 -0500 Subject: [PATCH] incremental kind of working for inserts --- indexer.erl | 20 +++++++++++- indexer_couchdb_crawler.erl | 65 ++++++++++++++++++++++++++++++++----- indexer_server.erl | 16 +++++++-- 3 files changed, 89 insertions(+), 12 deletions(-) diff --git a/indexer.erl b/indexer.erl index c1c5460..04bd592 100644 --- a/indexer.erl +++ b/indexer.erl @@ -86,10 +86,28 @@ worker(Pid) -> done -> %% what we need to do here is go into polling mode %% and start polling for new updates to the db - true + poll_for_changes(Pid) end end. +poll_for_changes(Pid) -> + case possibly_stop(Pid) of + done -> + ok; + void -> + ?LOG(?DEBUG, "polling for changes again ~n",[]), + {Inserts, _Updates, _Deletes, LastSeq} = indexer_server:get_changes(Pid), + %% first do inserts + index_these_docs(Pid,Inserts), + ?LOG(?INFO, "indexed another ~w ~n", [length(Inserts)]), + %% then deletes + %% then updates + indexer_server:checkpoint(Pid,changes,LastSeq), + sleep(60000), + poll_for_changes(Pid) + end. + + possibly_stop(Pid) -> case indexer_server:should_i_stop(Pid) of true -> diff --git a/indexer_couchdb_crawler.erl b/indexer_couchdb_crawler.erl index aa951fc..2260822 100644 --- a/indexer_couchdb_crawler.erl +++ b/indexer_couchdb_crawler.erl @@ -10,7 +10,16 @@ -module(indexer_couchdb_crawler). %% %% --export([start/2, next/1, get_doc_infos/2, db_exists/1, store_chkp/3, get_changes_since/2, lookup_doc/2, lookup_indices/2, write_indices/3]). +-export([start/2, + next/1, + db_exists/1, + store_chkp/3, + read_last_seq/1, + write_last_seq/2, + get_changes_since/2, + lookup_doc/2, + lookup_indices/2, + write_indices/3]). -include("../couchdb/src/couchdb/couch_db.hrl"). -include("indexer.hrl"). @@ -19,7 +28,9 @@ start(DbName, [{reset, DbIndexName}]) -> hovercraft:delete_db(DbIndexName), - hovercraft:create_db(DbIndexName), + hovercraft:create_db(DbIndexName), + {ok, #db{update_seq=LastSeq}} = hovercraft:open_db(DbName), + write_last_seq(DbIndexName, LastSeq), {DbName, 0}. next({DbName, StartId}) -> @@ -50,17 +61,34 @@ open_by_id_btree(DbName) -> IdBtree. -get_doc_infos(DbName, Ids) -> - IdBtree = open_by_id_btree(DbName), - {ok, Docs, _} = couch_btree:query_modify(IdBtree, Ids, [], []), - Docs. - get_changes_since(DbName, SeqNum) -> {ok, #db{update_seq=LastSeq}=Db} = hovercraft:open_db(DbName), ?LOG(?DEBUG,"last update sequences id is: ~p ~n",[LastSeq]), - couch_db:changes_since(Db, all_docs, SeqNum, fun(DocInfos, Acc) -> + {ok, DocInfos} = couch_db:changes_since(Db, all_docs, SeqNum, fun(DocInfos, Acc) -> {ok, lists:append(Acc, DocInfos)} end, - [],[]). + [],[]), + {InsIds, UpdIds, DelIds} = lists:foldl(fun(DocInfo, {Inserts, Updates, Deletes}) -> + {doc_info, Id, _, [{rev_info,{Rev,_},_,Deleted,_}]}=DocInfo, + case Rev of + 1 -> {[Id | Inserts],Updates, Deletes}; + _ -> case Deleted of + true -> {Inserts, Updates, [Id | Deletes]}; + _ -> {Inserts, [Id | Updates], Deletes} + end + end + end,{[],[],[]},DocInfos), + {get_docs(InsIds, DbName), UpdIds, DelIds, LastSeq}. + +get_docs(DocIdList, DbName) -> + lists:map(fun(Id) -> + {ok, Doc} = lookup_doc(Id, DbName), + Doc + end, + DocIdList). + + + + get_all_docs(DbName, Options) -> @@ -121,6 +149,25 @@ store_chkp(DocId, B, DbName) -> {<<"chkp">>, B}]}, hovercraft:save_doc(DbName, NewDoc) end. + +write_last_seq(DbName, LastSeq) -> + NewDoc = + case lookup_doc(<<"last_seq">>, DbName) of + {ok, Doc} -> + Props = element(1, Doc), + NewProps = proplists:delete(<<"value">>, Props), + {lists:append(NewProps, + [{<<"value">>,LastSeq}] )}; + not_found -> + {[{<<"_id">>, <<"last_seq">>}, + {<<"value">>, LastSeq}]} + end, + hovercraft:save_doc(DbName, NewDoc). + +read_last_seq(DbName) -> + {ok, Doc} = lookup_doc(<<"last_seq">>, DbName), + proplists:get_value(<<"value">>,element(1,Doc)). + lookup_indices(Word, DbName) -> diff --git a/indexer_server.erl b/indexer_server.erl index 9e4cd18..07cb921 100644 --- a/indexer_server.erl +++ b/indexer_server.erl @@ -12,8 +12,10 @@ -module(indexer_server). -export([next_docs/1, + get_changes/1, ets_table/1, checkpoint/1, + checkpoint/3, schedule_stop/1, search/2, write_index/3, @@ -38,10 +40,14 @@ should_i_stop(Pid) -> stop(Pid) -> gen_server:cast(Pid, stop). - - next_docs(Pid) -> gen_server:call(Pid, next_docs, infinity). +get_changes(Pid) -> + gen_server:call(Pid, changes, infinity). + checkpoint(Pid) -> gen_server:call(Pid, checkpoint). +checkpoint(Pid, changes, LastSeq) -> + gen_server:call(Pid, {checkpoint, changes, LastSeq}). + ets_table(Pid) -> gen_server:call(Pid, ets_table). search(Pid, Str) -> gen_server:call(Pid, {search, Str}). @@ -91,6 +97,9 @@ handle_call(next_docs, _From, S) -> done -> {reply, done, S} end; +handle_call(changes, _From, S) -> + LastSeq = indexer_couchdb_crawler:read_last_seq(S#env.idx), + {reply, indexer_couchdb_crawler:get_changes_since(S#env.dbnam, LastSeq), S}; handle_call(checkpoint, _From, S) -> Next = S#env.nextCP, DbIndexName = S#env.idx, @@ -98,6 +107,9 @@ handle_call(checkpoint, _From, S) -> ?LOG(?DEBUG, "the next checkpoint is ~p ~n",[Next1]), S1 = S#env{nextCP = Next1, cont=S#env.chkp}, {reply, ok, S1}; +handle_call({checkpoint, changes, LastSeq}, _From, S) -> + indexer_couchdb_crawler:write_last_seq(S#env.idx, LastSeq), + {reply, ok, S}; handle_call(schedule_stop, _From, S) -> case S#env.chkp of {_, done} -> {reply, norun, S};