Skip to content
Browse files

EventSource for _changes

  • Loading branch information...
1 parent 5234a7e commit a152fb950bff92bc279a81570d96cb77cd71e48b @indutny committed Dec 14, 2010
Showing with 64 additions and 4 deletions.
  1. +25 −1 src/couchdb/couch_changes.erl
  2. +16 −0 src/couchdb/couch_httpd.erl
  3. +23 −3 src/couchdb/couch_httpd_db.erl
View
26 src/couchdb/couch_changes.erl
@@ -26,7 +26,9 @@ handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
fwd ->
Args#changes_args.since
end,
- if Feed == "continuous" orelse Feed == "longpoll" ->
+
+ if Feed == "continuous" orelse Feed == "longpoll" orelse
+ Feed == "eventsource" ->
fun(CallbackAcc) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
Self = self(),
@@ -190,6 +192,8 @@ get_changes_timeout(Args, Callback) ->
fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
end.
+start_sending_changes(_Callback, UserAcc, "eventsource") ->
+ UserAcc;
start_sending_changes(_Callback, UserAcc, "continuous") ->
UserAcc;
start_sending_changes(Callback, UserAcc, ResponseType) ->
@@ -259,6 +263,26 @@ end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType, UserAcc).
changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc,
+ "eventsource", Limit, IncludeDocs}) ->
+
+ #doc_info{id=Id, high_seq=Seq,
+ revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo,
+ Results0 = FilterFun(DocInfo),
+ Results = [Result || Result <- Results0, Result /= null],
+ Go = if Limit =< 1 -> stop; true -> ok end,
+ case Results of
+ [] ->
+ {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc, "eventsource", Limit,
+ IncludeDocs}
+ };
+ _ ->
+ ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, "eventsource", UserAcc),
+ {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc2, "eventsource",
+ Limit - 1, IncludeDocs}
+ }
+ end;
+changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc,
"continuous", Limit, IncludeDocs}) ->
#doc_info{id=Id, high_seq=Seq,
View
16 src/couchdb/couch_httpd.erl
@@ -24,6 +24,7 @@
-export([primary_header_value/2,partition/1,serve_file/3,serve_file/4, server_header/0]).
-export([start_chunked_response/3,send_chunk/2,log_request/2]).
-export([start_response_length/4, start_response/3, send/2]).
+-export([start_eventstream_response/2, start_eventstream_response/3, end_eventstream_response/1]).
-export([start_json_response/2, start_json_response/3, end_json_response/1]).
-export([send_response/4,send_method_not_allowed/2,send_error/4, send_redirect/2,send_chunked_error/2]).
-export([send_json/2,send_json/3,send_json/4,last_chunk/1,parse_multipart_request/3]).
@@ -619,6 +620,17 @@ send_json(Req, Code, Headers, Value) ->
),
send_response(Req, Code, DefaultHeaders ++ Headers, Body).
+start_eventstream_response(Req, Code) ->
+ start_eventstream_response(Req, Code, []).
+
+start_eventstream_response(Req, Code, Headers) ->
+ DefaultHeaders = [
+ {"Content-Type", "text/event-stream"},
+ {"Cache-Control", "no-cache"}
+ ],
+ %start_chunked_response(Req, Code, DefaultHeaders ++ Headers).
+ start_chunked_response(Req, Code, DefaultHeaders ++ Headers).
+
start_json_response(Req, Code) ->
start_json_response(Req, Code, []).
@@ -636,6 +648,10 @@ start_json_response(Req, Code, Headers) ->
end,
{ok, Resp}.
+end_eventstream_response(Resp) ->
+ send_chunk(Resp, [$\n]),
+ last_chunk(Resp).
+
end_json_response(Resp) ->
send_chunk(Resp, end_jsonp() ++ [$\n]),
last_chunk(Resp).
View
26 src/couchdb/couch_httpd_db.erl
@@ -21,7 +21,8 @@
-import(couch_httpd,
[send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
send_response/4,start_json_response/2,start_json_response/3,
- send_chunk/2,last_chunk/1,end_json_response/1,
+ start_eventstream_response/2,start_eventstream_response/3,
+ send_chunk/2,last_chunk/1,end_json_response/1,end_eventstream_response/1,
start_chunked_response/3, absolute_uri/2, send/2,
start_response_length/4]).
@@ -67,12 +68,20 @@ handle_changes_req1(Req, Db) ->
MakeCallback = fun(Resp) ->
fun({change, Change, _}, "continuous") ->
send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]);
+ ({change, {ChangeProp}=Change, _}, "eventsource") ->
+ Seq = proplists:get_value(<<"seq">>, ChangeProp),
+ send_chunk(Resp, "data: " ++ ?JSON_ENCODE(Change) ++ "\n" ++
+ "id: " ++ ?JSON_ENCODE(Seq) ++ "\n\n");
({change, Change, Prepend}, _) ->
send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]);
+ (start, "eventsource") ->
+ ok;
(start, "continuous") ->
ok;
(start, _) ->
send_chunk(Resp, "{\"results\":[\n");
+ ({stop, _}, "eventsource") ->
+ end_eventstream_response(Resp);
({stop, EndSeq}, "continuous") ->
send_chunk(
Resp,
@@ -90,11 +99,17 @@ handle_changes_req1(Req, Db) ->
end
end,
ChangesArgs = parse_changes_query(Req),
- ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+ ChangesArgs1 = case ChangesArgs#changes_args.feed of
+ "eventsource" ->
+ ChangesArgs#changes_args{since=list_to_integer(couch_httpd:header_value(Req, "Last-Event-ID", integer_to_list(ChangesArgs#changes_args.since)))};
+ _ ->
+ ChangesArgs
+ end,
+ ChangesFun = couch_changes:handle_changes(ChangesArgs1, Req, Db),
WrapperFun = case ChangesArgs#changes_args.feed of
"normal" ->
{ok, Info} = couch_db:get_db_info(Db),
- CurrentEtag = couch_httpd:make_etag(Info),
+ CurrentEtag = couch_httpd:make_etag(Info),
@janl
janl added a note May 6, 2012

trailing whitespace

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
fun(FeedChangesFun) ->
couch_httpd:etag_respond(
Req,
@@ -107,6 +122,11 @@ handle_changes_req1(Req, Db) ->
end
)
end;
+ "eventsource" ->
+ {ok, Resp} = couch_httpd:start_eventstream_response(Req, 200),
+ fun(FeedChangesFun) ->
+ FeedChangesFun(MakeCallback(Resp))
+ end;
_ ->
% "longpoll" or "continuous"
{ok, Resp} = couch_httpd:start_json_response(Req, 200),

8 comments on commit a152fb9

@tejohnso

What happened with this? Is it abandoned?

@jchris
jchris commented on a152fb9 Apr 16, 2012

I want to see this also!

@indutny
Owner

and me too, should I check if that can be merged to master?

@jchris
jchris commented on a152fb9 Apr 17, 2012

The only other thing missing from this patch is a test. I'd prefer the JavaScript test suite, as that will prove this really works in browsers.

Once it has that, the code itself looks clean, let's get it in for 1.3 :)

@benoitc
benoitc commented on a152fb9 May 2, 2012

continuous & eventsource share a lot of codes. I think it worth to reuse it there (like i said 1 year ago) . Also a test is needed yes :)

@janl
janl commented on a152fb9 May 2, 2012

@jchris, The browser tests are dead :)

@janl
janl commented on a152fb9 May 2, 2012

Which doesn't mean we shouldn't add a test case, it is absolutely needed, but then, this would be great to have in master/1.3

@natevw
natevw commented on a152fb9 May 2, 2012

Woohoo! For reference, this should resolve https://issues.apache.org/jira/browse/COUCHDB-986 when it lands.

Please sign in to comment.
Something went wrong with that request. Please try again.