Skip to content
Browse files

add the possibility to pass a callback instead of a pid to couch_chan…

…ges:stream
  • Loading branch information...
1 parent 9e5f3eb commit 06b001c345f4c015bcd0c897207832afa1e3a19b @benoitc committed
Showing with 90 additions and 22 deletions.
  1. +1 −1 examples/changes.ebin
  2. +28 −0 examples/changes_cb.ebin
  3. +61 −21 src/couchbeam_changes.erl
View
2 examples/changes.ebin
@@ -8,7 +8,7 @@ main(_) ->
couchbeam:start(),
Server = couchbeam:server_connection(),
{ok, Db} = couchbeam:open_or_create_db(Server, "testdb"),
- {ok, StartRef, ChangesPid} = couchbeam_changes:stream(Db, self(), []),
+ {ok, StartRef, ChangesPid} = couchbeam_changes:stream(Db, self(), [continuous, heartbeat]),
io:format("StartRef ~p~n", [ChangesPid]),
get_changes(StartRef).
View
28 examples/changes_cb.ebin
@@ -0,0 +1,28 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -pa ./ebin
+
+-module(changes_cb).
+
+-export([changes_cb/1]).
+
+changes_cb({done, LastSeq}) ->
+ io:format("stopped, last seq is ~p~n", [LastSeq]);
+changes_cb({change, Row}) ->
+ io:format("change row ~p ~n", [Row]);
+changes_cb({error, LastSeq, Error}) ->
+ io:format("error ? ~p ~n, last seq: ~p~n", [Error, LastSeq]),
+ self() ! shoot.
+
+main(_) ->
+ couchbeam:start(),
+ Server = couchbeam:server_connection(),
+ {ok, Db} = couchbeam:open_or_create_db(Server, "testdb"),
+
+ Cb = fun(V) -> changes_cb(V) end,
+ {ok, ChangesPid} = couchbeam_changes:stream(Db, Cb, [continuous, heartbeat]),
+ io:format("ChangesPid ~p~n", [ChangesPid]),
+ receive shoot -> ok end.
+
+
+
View
82 src/couchbeam_changes.erl
@@ -26,12 +26,15 @@
stream(Db, Client) ->
stream(Db, Client, []).
--spec stream(Db::db(), ClientPid::pid(),
- Options::changes_options()) -> {ok, StartRef::term(),
- ChangesPid::pid()} | {error, term()}.
+
+
+-spec stream(Db::db() | function(), Client::pid() | function(),
+ Options::changes_options()) -> {ok, StartRef::term(), ChangesPid::pid()} |
+ {ok, ChangesPid::pid()} | {error, term()}.
%% @doc Stream changes to a pid
%% <p>Db : a db record</p>
-%% <p>ClientPid : pid where to send changes events where events are
+%% <p>Client : pid or callback where to send changes events where events are
+%% The pid receive these events:
%% <dl>
%% <dt>{change, StartRef, {done, Lastseq::integer()}</dt>
%% <dd>Connection terminated or you got all changes</dd>
@@ -42,6 +45,14 @@ stream(Db, Client) ->
%% happend.</dd>
%% </dl>
%% LastSeq is the last sequence of changes.</p>
+%% While the callbac could be like:
+%%
+%% fun({done, LastSeq}) ->
+%% ok;
+%% fun({done, LastSeq}) ->
+%% ok;
+%% fun({done, LastSeq}) ->
+%% ok.
%% <p>ChangesOptions :: changes_options() [continuous | longpoll | normal
%% | include_docs | {since, integer()}
%% | {timeout, integer()}
@@ -64,13 +75,10 @@ stream(Db, Client) ->
%% used to disctint all changes from this pid. ChangesPid is the pid of
%% the changes loop process. Can be used to monitor it or kill it
%% when needed.</p>
-stream(#db{server=Server, options=IbrowseOpts}=Db,
- ClientPid, Options) ->
- Args = parse_changes_options(Options),
- Url = couchbeam:make_url(Server, [couchbeam:db_url(Db), "/_changes"],
- Args#changes_args.http_options),
- StartRef = make_ref(),
+
+stream(Db, ClientPid, Options) when is_pid(ClientPid) ->
+ StartRef = make_ref(),
UserFun = fun
(done) ->
LastSeq = get(last_seq),
@@ -85,19 +93,25 @@ stream(#db{server=Server, options=IbrowseOpts}=Db,
Seq = couchbeam_doc:get_value(<<"seq">>, Row),
put(last_seq, Seq)
end,
-
- Params = {Url, IbrowseOpts},
+ do_stream(Db, UserFun, Options, StartRef);
- ChangesPid = spawn_link(couchbeam_changes, changes_loop,
- [Args, UserFun, Params]),
+stream(Db, Fun, Options) ->
+ UserFun = fun
+ (done) ->
+ LastSeq = get(last_seq),
+ Fun({done, LastSeq});
+ ({error, Error}) ->
+ LastSeq = get(last_seq),
+ Fun({error, LastSeq, Error});
+ ({[{<<"last_seq">>, _}]}) ->
+ ok;
+ (Row) ->
+ Fun({change, Row}),
+ Seq = couchbeam_doc:get_value(<<"seq">>, Row),
+ put(last_seq, Seq)
+ end,
+ do_stream(Db, UserFun, Options).
- case couchbeam_httpc:request_stream({ChangesPid, once}, get, Url, IbrowseOpts) of
- {ok, ReqId} ->
- ChangesPid ! {ibrowse_req_id, ReqId},
- {ok, StartRef, ChangesPid};
- Error ->
- Error
- end.
-spec fetch(Db::db()) -> {ok, LastSeq::integer(), Rows::list()} | {error,
LastSeq::integer(), Error::term()}.
@@ -218,6 +232,32 @@ changes_loop(Args, UserFun, Params) ->
end.
%% @private
+do_stream(Db, UserFun, Options) ->
+ do_stream(Db, UserFun, Options, nil).
+
+do_stream(#db{server=Server, options=IbrowseOpts}=Db, UserFun, Options,
+ StartRef) ->
+ Args = parse_changes_options(Options),
+ Url = couchbeam:make_url(Server, [couchbeam:db_url(Db), "/_changes"],
+ Args#changes_args.http_options),
+
+ Params = {Url, IbrowseOpts},
+
+ ChangesPid = spawn_link(couchbeam_changes, changes_loop,
+ [Args, UserFun, Params]),
+
+ case couchbeam_httpc:request_stream({ChangesPid, once}, get, Url, IbrowseOpts) of
+ {ok, ReqId} ->
+ ChangesPid ! {ibrowse_req_id, ReqId},
+ case StartRef of
+ nil ->
+ {ok, ChangesPid};
+ _ ->
+ {ok, StartRef, ChangesPid}
+ end;
+ Error ->
+ Error
+ end.
collect_changes(Ref, Acc) ->
receive

0 comments on commit 06b001c

Please sign in to comment.
Something went wrong with that request. Please try again.