From d1e6f745d28751c8db3405f708d4337b5723fc1b Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Thu, 3 May 2012 02:09:12 +0700 Subject: [PATCH] changes: eventsource feed --- share/www/script/test/changes.js | 19 +++++++++++++++++++ src/couchdb/couch_changes.erl | 16 ++++++++++++---- src/couchdb/couch_httpd.erl | 16 ++++++++++++++++ src/couchdb/couch_httpd_db.erl | 28 +++++++++++++++++++++++++++- 4 files changed, 74 insertions(+), 5 deletions(-) diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js index 19e22fd001e..1e89158e803 100644 --- a/share/www/script/test/changes.js +++ b/share/www/script/test/changes.js @@ -22,6 +22,25 @@ couchTests.changes = function(debug) { db.createDb(); if (debug) debugger; + var hasEventSource = !!window.EventSource, + eventSourceOk = true; + + if (hasEventSource) { + var es = new EventSource("/test_suite_db/_changes?feed=eventsource"); + eventSourceOk = false; + + es.addEventListener('message', function(e) { + var data = JSON.parse(e.data); + if (typeof data.seq !== 'number') { + throw TypeError('Incorrect EventSource message'); + } + + if (data.seq > 2) { + es.close(); + } + }, false); + } + var req = CouchDB.request("GET", "/test_suite_db/_changes"); var resp = JSON.parse(req.responseText); diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index aec7873d862..d278ab61c4c 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -63,7 +63,8 @@ handle_changes(Args1, Req, Db0) -> put(last_changes_heartbeat, now()) end, - if Feed == "continuous" orelse Feed == "longpoll" -> + if Feed == "continuous" orelse Feed == "longpoll" orelse + Feed == "eventsource" -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), Self = self(), @@ -263,6 +264,8 @@ get_changes_timeout(Args, Callback) -> start_sending_changes(_Callback, UserAcc, "continuous") -> UserAcc; +start_sending_changes(_Callback, UserAcc, "eventsource") -> + UserAcc; start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). @@ -434,7 +437,7 @@ keep_sending_changes(Args, Acc0, FirstRound) -> end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> +changes_enumerator(DocInfo, Acc, ResponseType) -> #changes_acc{ filter = FilterFun, callback = Callback, user_acc = UserAcc, limit = Limit, db = Db, @@ -456,10 +459,15 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> end; _ -> ChangesRow = changes_row(Results, DocInfo, Acc), - UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc), + UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), reset_heartbeat(), {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}} - end; + end. + +changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> + changes_enumerator(DocInfo, Acc, "continuous"); +changes_enumerator(DocInfo, #changes_acc{resp_type = "eventsource"} = Acc) -> + changes_enumerator(DocInfo, Acc, "eventsource"); changes_enumerator(DocInfo, Acc) -> #changes_acc{ filter = FilterFun, callback = Callback, prepend = Prepend, diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 8b05076837d..a2e11c28fc7 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -27,6 +27,7 @@ -export([start_chunked_response/3,send_chunk/2,log_request/2]). -export([start_response_length/4, start_response/3, send/2]). -export([start_json_response/2, start_json_response/3, end_json_response/1]). +-export([start_eventsource_response/2, start_eventsource_response/3, end_eventsource_response/1]). -export([send_response/4,send_method_not_allowed/2,send_error/4, send_redirect/2,send_chunked_error/2]). -export([send_json/2,send_json/3,send_json/4,last_chunk/1,parse_multipart_request/3]). -export([accepted_encodings/1,handle_request_int/5,validate_referer/1,validate_ctype/2]). @@ -730,6 +731,21 @@ end_json_response(Resp) -> send_chunk(Resp, end_jsonp() ++ [$\n]), last_chunk(Resp). +start_eventsource_response(Req, Code) -> + start_eventsource_response(Req, Code, []). + +start_eventsource_response(Req, Code, Headers) -> + initialize_jsonp(Req), + DefaultHeaders = [ + {"Content-Type", "text/event-stream"}, + {"Cache-Control", "no-cache"} + ], + start_chunked_response(Req, Code, DefaultHeaders ++ Headers). + +end_eventsource_response(Resp) -> + send_chunk(Resp, [$\n]), + last_chunk(Resp). + initialize_jsonp(Req) -> case get(jsonp) of undefined -> put(jsonp, qs_value(Req, "callback", no_jsonp)); diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index de39b9efb8e..366ded89ba6 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -21,6 +21,7 @@ -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, start_json_response/2,send_chunk/2,last_chunk/1,end_json_response/1, + start_eventsource_response/2,end_eventsource_response/1, start_chunked_response/3, absolute_uri/2, send/2, start_response_length/4, send_error/4]). @@ -78,10 +79,16 @@ handle_changes_req2(Req, Db) -> MakeCallback = fun(Resp) -> fun({change, Change, _}, "continuous") -> send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); + ({change, {ChangeProp} = Change, _}, "eventsource") -> + Seq = proplists:get_value(<<"seq">>, ChangeProp), + send_chunk(Resp, ["data:", ?JSON_ENCODE(Change), "\n", + "id:", ?JSON_ENCODE(Seq), "\n\n"]); ({change, Change, Prepend}, _) -> send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); (start, "continuous") -> ok; + (start, "eventsource") -> + ok; (start, _) -> send_chunk(Resp, "{\"results\":[\n"); ({stop, EndSeq}, "continuous") -> @@ -90,6 +97,8 @@ handle_changes_req2(Req, Db) -> [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"] ), end_json_response(Resp); + ({stop, _}, "eventsource") -> + end_eventsource_response(Resp); ({stop, EndSeq}, _) -> send_chunk( Resp, @@ -100,8 +109,20 @@ handle_changes_req2(Req, Db) -> send_chunk(Resp, "\n") end end, + % Use `Last-Event-ID` header as `since` argument for eventsource feeds ChangesArgs = parse_changes_query(Req), - ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + ChangesArgs1 = case ChangesArgs#changes_args.feed of + "eventsource" -> + ChangesArgs#changes_args{ + since = list_to_integer( + couch_httpd:header_value(Req, + "Last-Event-ID", + integer_to_list( + ChangesArgs#changes_args.since)))}; + _ -> + ChangesArgs + end, + ChangesFun = couch_changes:handle_changes(ChangesArgs1, Req, Db), WrapperFun = case ChangesArgs#changes_args.feed of "normal" -> {ok, Info} = couch_db:get_db_info(Db), @@ -118,6 +139,11 @@ handle_changes_req2(Req, Db) -> end ) end; + "eventsource" -> + {ok, Resp} = couch_httpd:start_eventsource_response(Req, 200), + fun(FeedChangesFun) -> + FeedChangesFun(MakeCallback(Resp)) + end; _ -> % "longpoll" or "continuous" {ok, Resp} = couch_httpd:start_json_response(Req, 200),