Permalink
Browse files

We now parse the json by chunks instead of using our try catch over e…

…json:decode_json . More efficient. While I'm here add support for changes loop monitoring in gen_changes and restart connection if it's stopped.

New api is now in couchbeam_changes module:

- fetch/1, fetch/2
- stream/2, stream/3

Old functions in couchbeam module has been deprecated (changes, changes_wait, changes_wait_once).
  • Loading branch information...
1 parent b73646b commit 5eecf57cec6be2e293b864a38b20b283262514c3 @benoitc committed Jul 4, 2011
View
@@ -4,6 +4,9 @@ ebin
edoc-info
.DS_Store
deps/
-doc/api
+doc/*.html
+doc/*.css
+doc/erlang.png
+doc/edoc-info
t/*.beam
examples/*.beam
View
@@ -13,8 +13,7 @@ deps:
@./rebar get-deps
doc:
- @mkdir -p doc/api
- @$(ERL) -noshell -run edoc_run application '$(APP)' '"."' '[{preprocess, true},{includes, ["."]}, {dir, "./doc/api"}]'
+ @rebar doc
test: all
@@ -32,7 +31,7 @@ cover: all
clean:
@./rebar clean
@rm -f t/*.beam
- @rm -rf doc/api
+ @rm -f doc/*.html doc/*.css doc/edoc-info doc/*.png
distclean: clean
@./rebar delete-deps
View
7 NOTICE
@@ -37,3 +37,10 @@ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+couchbeam_json_stream
+---------------------
+
+Copyright 2011 The Apache Fundation
+From couchdb/json_stream_parse.erl
+
View
@@ -265,13 +265,13 @@ ok = couchbeam:delete_attachment(Db, Doc4, AttName).
<h3>9. Changes</h3>
CouchDB provides a means to get a list of changes made to documents in
-the database. With couchbeam you can get changes using {@link couchbeam:changes/2}.
+the database. With couchbeam you can get changes using {@link couchbeam_changes:fetch/2}.
This function returns all changes immediately. But you can also retrieve
-all changes using longpolling :
+all changes rows using longpolling :
```
Options = [],
-{ok, Changes} = couchbeam:changes_wait_once(Db, Options).
+{ok, LastSeq, Rows} = couchbeam_changes:fetch(Db, Options).
'''
Options can be any Changes query parameters. See
@@ -283,19 +283,20 @@ You can also get <a href="http://wiki.apache.org/couchdb/HTTP_database_API#Conti
```
ChangesFun = fun(ReqId, F) ->
receive
- {ReqId, done} ->
+ {change, StartRef, {done, LastSeq}} ->
+ io:format("stopped, last seq is ~p~n", [LastSeq]),
ok;
- {ReqId, {change, Change}} ->
- io:format("change ~p ~n", [Change]),
- F(ReqId, F);
- {ReqId, {error, E}}->
- io:format("error ? ~p ~n", [E])
- end
+ {change, StartRef, Row} ->
+ io:format("change row ~p ~n", [Row]),
+ F(StartRef, F);
+ {error, StartRef, LastSeq, Error}->
+ io:format("error ? ~p ~n, last seq: ~p~n", [Error, LastSeq])
+ end
end,
Pid = self(),
-Options = [{heartbeat, "true"}],
-{ok, ReqId} = couchbeam:changes_wait(Db, Pid, Options),
-ChangesFun(ReqId, ChangesFun).
+Options = [continuous, heartbeat],
+{ok, StartRef, _ChangePid} = couchbeam_changes:stream(Db, Pid, Options),
+ChangesFun(StartRef, ChangesFun).
'''
<h3>Authentication/ Connections options</h3>
View
@@ -8,20 +8,21 @@ main(_) ->
couchbeam:start(),
Server = couchbeam:server_connection(),
{ok, Db} = couchbeam:open_or_create_db(Server, "testdb"),
- {ok, ReqId} = couchbeam:changes_wait(Db, self(), [{heartbeat, "true"}]),
- io:format("StartRef ~p~n", [ReqId]),
- get_changes(ReqId).
+ {ok, StartRef, ChangesPid} = couchbeam_changes:stream(Db, self(), []),
+ io:format("StartRef ~p~n", [ChangesPid]),
+ get_changes(StartRef).
-get_changes(ReqId) ->
+get_changes(StartRef) ->
receive
- {ReqId, done} ->
+ {change, StartRef, {done, LastSeq}} ->
+ io:format("stopped, last seq is ~p~n", [LastSeq]),
ok;
- {ReqId, {change, Change}} ->
- io:format("change ~p ~n", [Change]),
- get_changes(ReqId);
- {ReqId, {error, E}}->
- io:format("error ? ~p ~n", [E])
+ {change, StartRef, Row} ->
+ io:format("change row ~p ~n", [Row]),
+ get_changes(StartRef);
+ {error, StartRef, LastSeq, Error}->
+ io:format("error ? ~p ~n, last seq: ~p~n", [Error, LastSeq])
end.
@@ -29,12 +29,16 @@ init([]) ->
get_changes(Pid) ->
gen_changes:call(Pid, get_changes).
+handle_change({done, _LastSeq}, State) ->
+ {noreply, State};
+
+
handle_change(Change, State=#state{changes=Changes}) ->
NewChanges = [Change|Changes],
{noreply, State#state{changes=NewChanges}}.
handle_call(get_changes, _From, State=#state{changes=Changes}) ->
- {reply, Changes, State}.
+ {reply, lists:reverse(Changes), State}.
handle_cast(_Msg, State) -> {noreply, State}.
View
@@ -24,6 +24,45 @@
%% -type boolean() :: bool()
+
+-type db_name() :: binary() | string().
+-type docid() :: binary() | string().
+
+-type ejson() :: ejson_object() | ejson_array().
+
+-type ejson_array() :: [ejson_term()].
+-type ejson_object() :: {[{ejson_key(), ejson_term()}]}.
+
+-type ejson_key() :: binary() | atom().
+
+-type ejson_term() :: ejson_array()
+ | ejson_object()
+ | ejson_string()
+ | ejson_number()
+ | true | false | null.
+
+-type ejson_string() :: binary().
+
+-type ejson_number() :: float() | integer().
+
+-type changes_option() :: continuous | longpoll | normal
+ | include_docs | {since, integer()}
+ | {timeout, integer()}
+ | heartbeat | {heartbeat, integer()}
+ | {filter, string()} | {filter, string(), list({string(), string() | integer()}
+)}
+ | conflicts | {style, string()} | descending.
+-type changes_options() :: list(changes_option()).
+
+-type changes_option1() :: longpoll | normal
+ | include_docs | {since, integer()}
+ | {timeout, integer()}
+ | heartbeat | {heartbeat, integer()}
+ | {filter, string()} | {filter, string(), list({string(), string() | integer()}
+)}
+ | conflicts | {style, string()} | descending.
+-type changes_options1() :: list(changes_option1()).
+
-record(server, {
host :: string(),
port :: integer(),
@@ -60,17 +99,21 @@
-type view() :: #view{}.
+-record(changes_args, {
+ type = normal,
+ http_options = []}).
+-type changes_args() :: #changes_args{}.
+
-record(gen_changes_state, {
- req_id,
+ start_ref,
+ changes_pid,
+ last_seq=0,
mod,
modstate,
- seq,
db,
- options,
- partial_chunk = <<"">>,
- row,
- complete=false
-}).
+ options}).
-define(USER_AGENT, "couchbeam/0.5.0").
+-define(DEPRECATED(Old, New, When),
+ couchbeam_util:deprecated(Old, New, When)).
View
@@ -15,5 +15,5 @@
%% ibrowse for doing HTTP requests
{ibrowse, ".*", {git, "git://github.com/cmullaparthi/ibrowse.git",
- "master"}}
+ {branch, "master"}}}
]}.
View
@@ -806,6 +806,7 @@ compact(#db{server=Server, options=IbrowseOpts}=Db, DesignName) ->
%% instead use ```changes_wait_once''' or continuous, use
%% ```wait_once'''
%% @equiv changes(Db, [])
+%% @deprecated Use {@link couchbeam_changes:fetch/1} instead.
changes(Db) ->
changes(Db, []).
@@ -820,7 +821,11 @@ changes(Db) ->
%% {filter, string()} |
%% {since, integer()|string()} |
%% {heartbeat, string()|boolean()}
+%% @deprecated Use {@link couchbeam_changes:fetch/2} instead.
changes(#db{server=Server, options=IbrowseOpts}=Db, Options) ->
+ ?DEPRECATED(<<"couchbeam:changes and couchbeam:changes_wait_once">>,
+ <<"couchbeam_changes:fetch">>,
+ <<"in version 0.8">>),
Url = make_url(Server, [db_url(Db), "/_changes"], Options),
case couchbeam_httpc:request_stream({self(), once}, get, Url, IbrowseOpts) of
{ok, ReqId} ->
@@ -830,16 +835,19 @@ changes(#db{server=Server, options=IbrowseOpts}=Db, Options) ->
%% @doc wait for longpoll changes
%% @equiv changes_wait_once(Db, [])
+%% @deprecated Use {@link couchbeam_changes:fetch/1} instead.
changes_wait_once(Db) ->
changes_wait_once(Db, []).
%% @doc wait for longpoll changes
+%% @deprecated Use {@link couchbeam_changes:fetch/2} instead.
changes_wait_once(Db, Options) ->
Options1 = [{"feed", "longpoll"}|Options],
changes(Db, Options1).
%% @doc wait for continuous changes
%% @equiv changes_wait(Db, ClientPid, [])
+%% @deprecated Use {@link couchbeam_changes:stream/2} instead.
changes_wait(Db, ClientPid) ->
changes_wait(Db, ClientPid, []).
@@ -856,8 +864,12 @@ changes_wait(Db, ClientPid) ->
%% <dt>{error, term()}</dt>
%% <dd>n error occurred</dd>
%% </dl>
-%% @spec changes_wait(Db::db(), Pid::pid(), Options::changeoptions()) -> term()
+%% @spec changes_wait(Db::db(), Pid::pid(), Options::changeoptions()) -> term()
+%% @deprecated Use {@link couchbeam_changes:stream/3} instead.
changes_wait(#db{server=Server, options=IbrowseOpts}=Db, ClientPid, Options) ->
+ ?DEPRECATED(<<"couchbeam:changes_wait">>,
+ <<"couchbeam_changes:stream">>,
+ <<"in version 0.8">>),
Options1 = [{"feed", "continuous"}|Options],
Url = make_url(Server, [db_url(Db), "/_changes"], Options1),
StartRef = make_ref(),
Oops, something went wrong.

0 comments on commit 5eecf57

Please sign in to comment.