Permalink
Browse files

changes: eventsource feed

  • Loading branch information...
1 parent a1d46ac commit d1e6f745d28751c8db3405f708d4337b5723fc1b @indutny committed May 2, 2012
Showing with 74 additions and 5 deletions.
  1. +19 −0 share/www/script/test/changes.js
  2. +12 −4 src/couchdb/couch_changes.erl
  3. +16 −0 src/couchdb/couch_httpd.erl
  4. +27 −1 src/couchdb/couch_httpd_db.erl
@@ -22,6 +22,25 @@ couchTests.changes = function(debug) {
db.createDb();
if (debug) debugger;
+ var hasEventSource = !!window.EventSource,
+ eventSourceOk = true;
+
+ if (hasEventSource) {
+ var es = new EventSource("/test_suite_db/_changes?feed=eventsource");
+ eventSourceOk = false;
+
+ es.addEventListener('message', function(e) {
+ var data = JSON.parse(e.data);
+ if (typeof data.seq !== 'number') {
+ throw TypeError('Incorrect EventSource message');
+ }
+
+ if (data.seq > 2) {
+ es.close();
+ }
+ }, false);
+ }
+
var req = CouchDB.request("GET", "/test_suite_db/_changes");
var resp = JSON.parse(req.responseText);
@@ -63,7 +63,8 @@ handle_changes(Args1, Req, Db0) ->
put(last_changes_heartbeat, now())
end,
- if Feed == "continuous" orelse Feed == "longpoll" ->
+ if Feed == "continuous" orelse Feed == "longpoll" orelse
+ Feed == "eventsource" ->
fun(CallbackAcc) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
Self = self(),
@@ -263,6 +264,8 @@ get_changes_timeout(Args, Callback) ->
start_sending_changes(_Callback, UserAcc, "continuous") ->
UserAcc;
+start_sending_changes(_Callback, UserAcc, "eventsource") ->
+ UserAcc;
start_sending_changes(Callback, UserAcc, ResponseType) ->
Callback(start, ResponseType, UserAcc).
@@ -434,7 +437,7 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType, UserAcc).
-changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
+changes_enumerator(DocInfo, Acc, ResponseType) ->
#changes_acc{
filter = FilterFun, callback = Callback,
user_acc = UserAcc, limit = Limit, db = Db,
@@ -456,10 +459,15 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
end;
_ ->
ChangesRow = changes_row(Results, DocInfo, Acc),
- UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
reset_heartbeat(),
{Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
- end;
+ end.
+
+changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
+ changes_enumerator(DocInfo, Acc, "continuous");
+changes_enumerator(DocInfo, #changes_acc{resp_type = "eventsource"} = Acc) ->
+ changes_enumerator(DocInfo, Acc, "eventsource");
changes_enumerator(DocInfo, Acc) ->
#changes_acc{
filter = FilterFun, callback = Callback, prepend = Prepend,
@@ -27,6 +27,7 @@
-export([start_chunked_response/3,send_chunk/2,log_request/2]).
-export([start_response_length/4, start_response/3, send/2]).
-export([start_json_response/2, start_json_response/3, end_json_response/1]).
+-export([start_eventsource_response/2, start_eventsource_response/3, end_eventsource_response/1]).
-export([send_response/4,send_method_not_allowed/2,send_error/4, send_redirect/2,send_chunked_error/2]).
-export([send_json/2,send_json/3,send_json/4,last_chunk/1,parse_multipart_request/3]).
-export([accepted_encodings/1,handle_request_int/5,validate_referer/1,validate_ctype/2]).
@@ -730,6 +731,21 @@ end_json_response(Resp) ->
send_chunk(Resp, end_jsonp() ++ [$\n]),
last_chunk(Resp).
+start_eventsource_response(Req, Code) ->
+ start_eventsource_response(Req, Code, []).
+
+start_eventsource_response(Req, Code, Headers) ->
+ initialize_jsonp(Req),
+ DefaultHeaders = [
+ {"Content-Type", "text/event-stream"},
+ {"Cache-Control", "no-cache"}
+ ],
+ start_chunked_response(Req, Code, DefaultHeaders ++ Headers).
+
+end_eventsource_response(Resp) ->
+ send_chunk(Resp, [$\n]),
+ last_chunk(Resp).
+
initialize_jsonp(Req) ->
case get(jsonp) of
undefined -> put(jsonp, qs_value(Req, "callback", no_jsonp));
@@ -21,6 +21,7 @@
-import(couch_httpd,
[send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
start_json_response/2,send_chunk/2,last_chunk/1,end_json_response/1,
+ start_eventsource_response/2,end_eventsource_response/1,
start_chunked_response/3, absolute_uri/2, send/2,
start_response_length/4, send_error/4]).
@@ -78,10 +79,16 @@ handle_changes_req2(Req, Db) ->
MakeCallback = fun(Resp) ->
fun({change, Change, _}, "continuous") ->
send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]);
+ ({change, {ChangeProp} = Change, _}, "eventsource") ->
+ Seq = proplists:get_value(<<"seq">>, ChangeProp),
+ send_chunk(Resp, ["data:", ?JSON_ENCODE(Change), "\n",
+ "id:", ?JSON_ENCODE(Seq), "\n\n"]);
({change, Change, Prepend}, _) ->
send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]);
(start, "continuous") ->
ok;
+ (start, "eventsource") ->
+ ok;
(start, _) ->
send_chunk(Resp, "{\"results\":[\n");
({stop, EndSeq}, "continuous") ->
@@ -90,6 +97,8 @@ handle_changes_req2(Req, Db) ->
[?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]
),
end_json_response(Resp);
+ ({stop, _}, "eventsource") ->
+ end_eventsource_response(Resp);
({stop, EndSeq}, _) ->
send_chunk(
Resp,
@@ -100,8 +109,20 @@ handle_changes_req2(Req, Db) ->
send_chunk(Resp, "\n")
end
end,
+ % Use `Last-Event-ID` header as `since` argument for eventsource feeds
ChangesArgs = parse_changes_query(Req),
- ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+ ChangesArgs1 = case ChangesArgs#changes_args.feed of
+ "eventsource" ->
+ ChangesArgs#changes_args{
+ since = list_to_integer(
+ couch_httpd:header_value(Req,
+ "Last-Event-ID",
+ integer_to_list(
+ ChangesArgs#changes_args.since)))};
+ _ ->
+ ChangesArgs
+ end,
+ ChangesFun = couch_changes:handle_changes(ChangesArgs1, Req, Db),
WrapperFun = case ChangesArgs#changes_args.feed of
"normal" ->
{ok, Info} = couch_db:get_db_info(Db),
@@ -118,6 +139,11 @@ handle_changes_req2(Req, Db) ->
end
)
end;
+ "eventsource" ->
+ {ok, Resp} = couch_httpd:start_eventsource_response(Req, 200),
+ fun(FeedChangesFun) ->
+ FeedChangesFun(MakeCallback(Resp))
+ end;
_ ->
% "longpoll" or "continuous"
{ok, Resp} = couch_httpd:start_json_response(Req, 200),

0 comments on commit d1e6f74

Please sign in to comment.