From 05bd1a022748b62bfdfe25fa6a8ee52d17919d6e Mon Sep 17 00:00:00 2001 From: Alexander Shorin Date: Wed, 9 Mar 2016 18:50:15 +0300 Subject: [PATCH] Emit heartbeats until feed timeout COUCHDB-2961 --- src/couch_changes.erl | 73 +++++++++++++++++------------------- test/couch_changes_tests.erl | 2 +- 2 files changed, 35 insertions(+), 40 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index 29c22691..3d26ba84 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -396,33 +396,45 @@ filter_revs(Passes, Docs) -> [] end, lists:zip(Passes, Docs)). - get_changes_timeout(Args, Callback) -> #changes_args{ heartbeat = Heartbeat, - timeout = Timeout, + timeout = UserTimeout, feed = ResponseType } = Args, DefaultTimeout = list_to_integer( config:get("httpd", "changes_timeout", "60000") ), - case Heartbeat of - undefined -> - case Timeout of + Timeout = case UserTimeout of + undefined -> DefaultTimeout; + infiity -> infinity; + _ -> lists:min([DefaultTimeout, UserTimeout]) + end, + HeartbeatInterval = case Heartbeat of + undefined -> undefined; + true -> DefaultTimeout; + _ -> lists:min([DefaultTimeout, Heartbeat]) + end, + case HeartbeatInterval of undefined -> - {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end}; - infinity -> - {infinity, fun(UserAcc) -> {stop, UserAcc} end}; + {Timeout, fun(UserAcc) -> {stop, UserAcc} end}; _ -> - {lists:min([DefaultTimeout, Timeout]), - fun(UserAcc) -> {stop, UserAcc} end} - end; - true -> - {DefaultTimeout, - fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}; - _ -> - {lists:min([DefaultTimeout, Heartbeat]), - fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} + {HeartbeatInterval, + fun(UserAcc) -> + Before = get(last_changes_heartbeat), + case Before of + undefined -> + {ok, UserAcc}; + _ -> + Now = now(), + case timer:now_diff(Now, Before) div 1000 >= Timeout of + true -> + {stop, UserAcc}; + false -> + {ok, Callback(timeout, ResponseType, UserAcc)} + end + end + end} end. start_sending_changes(_Callback, UserAcc, ResponseType) @@ -622,7 +634,7 @@ view_changes_enumerator(Value, Acc) -> #changes_acc{ filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq, + timeout_fun = TimeoutFun, seq = CurrentSeq, aggregation_kvs=AggKVs, aggregation_results=AggResults } = Acc, @@ -637,7 +649,7 @@ view_changes_enumerator(Value, Acc) -> [] -> AggKVs; _ -> [Value|AggKVs] end, - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + {Done, UserAcc2} = TimeoutFun(UserAcc), Acc0 = Acc#changes_acc{ seq = Seq, user_acc = UserAcc2, @@ -673,7 +685,7 @@ view_changes_enumerator(Value, Acc) -> [] -> {[], []}; _ -> {[Value], Results} end, - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + {Done, UserAcc2} = TimeoutFun(UserAcc), Acc0 = Acc#changes_acc{ seq = Seq, user_acc = UserAcc2, @@ -690,7 +702,7 @@ changes_enumerator(Value0, Acc) -> #changes_acc{ filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun + timeout_fun = TimeoutFun } = Acc, {Value, Results0} = case Filter of {fast_view, _, _, _} -> @@ -708,7 +720,7 @@ changes_enumerator(Value0, Acc) -> Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of [] -> - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + {Done, UserAcc2} = TimeoutFun(UserAcc), case Done of stop -> {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; @@ -828,20 +840,3 @@ reset_heartbeat() -> _ -> put(last_changes_heartbeat, os:timestamp()) end. - -maybe_heartbeat(Timeout, TimeoutFun, Acc) -> - Before = get(last_changes_heartbeat), - case Before of - undefined -> - {ok, Acc}; - _ -> - Now = os:timestamp(), - case timer:now_diff(Now, Before) div 1000 >= Timeout of - true -> - Acc2 = TimeoutFun(Acc), - put(last_changes_heartbeat, Now), - Acc2; - false -> - {ok, Acc} - end - end. diff --git a/test/couch_changes_tests.erl b/test/couch_changes_tests.erl index f3dcf6e7..ad49d867 100644 --- a/test/couch_changes_tests.erl +++ b/test/couch_changes_tests.erl @@ -549,7 +549,7 @@ spawn_consumer(DbName, ChangesArgs0, Req) -> ChangesArgs = case (ChangesArgs0#changes_args.timeout =:= undefined) andalso (ChangesArgs0#changes_args.heartbeat =:= undefined) of true -> - ChangesArgs0#changes_args{timeout = 10, heartbeat = 10}; + ChangesArgs0#changes_args{heartbeat = 10}; false -> ChangesArgs0 end,