Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Add the initial version of the global_changes app.
BugzID: 17681
  • Loading branch information
sagelywizard authored and rnewson committed Aug 7, 2014
1 parent fed1844 commit f429cd0e0d10939778c56ca4acd4652cdb5990d2
Showing 10 changed files with 769 additions and 0 deletions.
@@ -0,0 +1,2 @@
.eunit/
ebin/
@@ -0,0 +1,27 @@
### global\_changes

This app supplies the functionality for the `/_db_updates` endpoint.

When a database is created, deleted, or updated, a corresponding event will be persisted to disk (Note: This was designed without the guarantee that a DB event will be persisted or ever occur in the `_db_updates` feed. It probably will, but it isn't guaranteed). Users can subscribe to a `_changes`-like feed of these database events by querying the `_db_updates` endpoint.

When an admin user queries the `/_db_updates` endpoint, they will see the account name associated with the DB update as well as the type of the update.

### Captured Metrics

1: `global_changes`, `db_writes`: The number of doc updates caused by global\_changes.

2: `global_changes`, `server_pending_updates`: The number of documents aggregated into the pending write batch.

3: `global_changes`, `listener_pending_updates`: The number of documents aggregated into the pending event batch.

4: `global_changes`, `event_doc_conflict`: The number of rev tree branches in event docs encountered by global\_changes. Should never happen.

5: `global_changes`, `rpcs`: The number of non-fabric RPCs caused by global\_changes.

### Important Configs

1: `global_changes`, `max_event_delay`: (integer, milliseconds) The total time added before an event is forwarded to the writer.

2: `global_changes`, `max_write_delay`: (integer, milliseconds) The time added before an event is sent to disk.

3: `global_changes`, `update_db`: (true/false) A flag setting whether to update the global\_changes database. If false, changes will be lost and there will be no performance impact of global\_changes on the cluster.
@@ -0,0 +1,18 @@
%% Application resource file for the global_changes OTP application,
%% which backs the /_db_updates endpoint.
{application, global_changes, [
{description, "_changes-like feeds for multiple DBs"},
%% `git` is substituted with the real version by the build tooling.
{vsn, git},
%% Singleton processes registered by this app's supervision tree.
{registered, [global_changes_config_listener, global_changes_server]},
{applications, [
kernel,
stdlib,
config,
couch_log,
couch,
mem3,
fabric
]},
{mod, {global_changes_app, []}},
{env, [
%% Name of the database that event documents are written to.
{dbname, <<"global_changes">>}
]}
]}.
@@ -0,0 +1,18 @@
%% Copyright 2013 Cloudant

-module(global_changes_app).
-behavior(application).


-export([
start/2,
stop/1
]).


%% application behaviour callback: boot the root supervisor of the
%% global_changes application.
start(_Type, _Args) ->
    global_changes_sup:start_link().


%% application behaviour callback: nothing to clean up on shutdown.
stop(_AppState) ->
    ok.
@@ -0,0 +1,94 @@
% Copyright 2013 Cloudant. All rights reserved.

-module(global_changes_config_listener).
-behavior(gen_server).
-behavior(config_listener).


-export([
start_link/0
]).

-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).

-export([
handle_config_change/5
]).


-define(LISTENER, global_changes_listener).
-define(SERVER, global_changes_server).


%% Start the config listener gen_server, registered locally under this
%% module's name.
start_link() ->
    InitArgs = [],
    gen_server:start_link({local, ?MODULE}, ?MODULE, InitArgs, []).


%% Subscribe to config change events; a failed subscription crashes init
%% so the supervisor can retry. The server keeps no meaningful state.
init([]) ->
    State = nil,
    ok = config:listen_for_changes(?MODULE, State),
    {ok, State}.


%% Nothing to release on shutdown.
terminate(_Reason, _State) ->
    ok.


%% No synchronous API is supported; treat any call as a programmer error
%% and stop, echoing the offending message back to the caller.
handle_call(Msg, _From, State) ->
    Error = {invalid_call, Msg},
    {stop, Error, Error, State}.


%% No casts are supported either; stop on any cast message.
handle_cast(Msg, State) ->
    {stop, {invalid_cast, Msg}, State}.


%% Our handler was removed from the config event manager (e.g. the
%% manager crashed); schedule a re-subscription attempt in 5 seconds.
handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
    _Timer = erlang:send_after(5000, self(), restart_config_listener),
    {noreply, State};
%% Retry the subscription; crash (and get restarted) if it still fails.
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, State),
    {noreply, State};
%% Drain any other stray message so the mailbox cannot grow unbounded.
handle_info(_Other, State) ->
    {noreply, State}.


%% Hot-upgrade hook; the state needs no conversion between versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%% config_listener callback: react to runtime edits of the
%% "global_changes" config section by notifying the listener and/or
%% server processes. The listener state is unused, so every clause
%% returns {ok, nil}.
handle_config_change("global_changes", "max_event_delay", MaxDelayStr, _, _) ->
    %% New event-batching delay for the listener process.
    cast_integer_setting(?LISTENER, set_max_event_delay, MaxDelayStr),
    {ok, nil};

handle_config_change("global_changes", "max_write_delay", MaxDelayStr, _, _) ->
    %% New write-batching delay for the server process.
    cast_integer_setting(?SERVER, set_max_write_delay, MaxDelayStr),
    {ok, nil};

handle_config_change("global_changes", "update_db", "false", _, _) ->
    gen_server:cast(?LISTENER, {set_update_db, false}),
    gen_server:cast(?SERVER, {set_update_db, false}),
    {ok, nil};

%% Any value other than the literal "false" (re-)enables db updates.
handle_config_change("global_changes", "update_db", _, _, _) ->
    gen_server:cast(?LISTENER, {set_update_db, true}),
    gen_server:cast(?SERVER, {set_update_db, true}),
    {ok, nil};

handle_config_change(_, _, _, _, _) ->
    {ok, nil}.


%% Cast {Tag, Int} to the named process when Str parses as an integer.
%% Unparsable values are ignored silently so a bad config edit cannot
%% crash the config listener.
cast_integer_setting(Name, Tag, Str) ->
    try list_to_integer(Str) of
        Int ->
            gen_server:cast(Name, {Tag, Int})
    catch error:badarg ->
        ok
    end.
@@ -0,0 +1,203 @@
%% Copyright 2013 Cloudant

-module(global_changes_httpd).

-export([handle_global_changes_req/1]).

-include_lib("couch/include/couch_db.hrl").

-record(acc, {
heartbeat_interval,
last_data_sent_time,
feed,
prepend,
resp,
etag,
username
}).

%% HTTP handler for GET /_db_updates: stream database events from the
%% shared global_changes database as a normal, longpoll, or continuous
%% changes-style feed. Only server admins may call this endpoint.
handle_global_changes_req(#httpd{method='GET'}=Req) ->
    Db = global_changes_util:get_dbname(),
    Feed = couch_httpd:qs_value(Req, "feed", "normal"),
    Options = parse_global_changes_query(Req),
    %% BUGFIX: the `{heartbeat, true}` clause must come before the
    %% generic `{heartbeat, Other}` clause. Previously the generic
    %% clause matched first, so `heartbeat=true` left the atom `true`
    %% as the interval; because any number compares less than an atom
    %% in Erlang term order, maybe_send_heartbeat/1 could then never
    %% fire. `true` now selects the 60000 ms default.
    Heartbeat = case lists:keyfind(heartbeat, 1, Options) of
        {heartbeat, true} -> 60000;
        {heartbeat, Other} -> Other;
        false -> false
    end,
    chttpd:verify_is_server_admin(Req),
    Acc = #acc{
        username=admin,
        feed=Feed,
        resp=Req,
        heartbeat_interval=Heartbeat
    },
    case Feed of
        "normal" ->
            %% One-shot feed: derive an ETag from db info so unchanged
            %% feeds can be answered with 304 Not Modified.
            {ok, Info} = fabric:get_db_info(Db),
            Etag = chttpd:make_etag(Info),
            chttpd:etag_respond(Req, Etag, fun() ->
                fabric:changes(Db, fun changes_callback/2, Acc#acc{etag=Etag}, Options)
            end);
        Feed when Feed =:= "continuous"; Feed =:= "longpoll" ->
            fabric:changes(Db, fun changes_callback/2, Acc, Options);
        _ ->
            Msg = <<"Supported `feed` types: normal, continuous, longpoll">>,
            throw({bad_request, Msg})
    end;
%% Every method other than GET is rejected.
handle_global_changes_req(Req) ->
    chttpd:send_method_not_allowed(Req, "GET").


%% Turn one raw change row from the global_changes db into the JSON
%% object reported to the client, or `skip` when the doc id does not
%% look like an event id or the event belongs to a different account.
%% Admin callers see every event plus the owning account name.
transform_change(Username, _Resp, {Props}) ->
    {id, Id} = lists:keyfind(id, 1, Props),
    {seq, Seq} = lists:keyfind(seq, 1, Props),
    case split_event_id(Id) of
        {Event, AccountName, DbName} when Username == admin ->
            %% Client is an admin, show them everything.
            {[
                {dbname, DbName},
                {type, Event},
                {account, AccountName},
                {seq, Seq}
            ]};
        {Event, Username, DbName} ->
            %% Account name matches the (non-admin) client's username.
            {[
                {dbname, DbName},
                {type, Event},
                {seq, Seq}
            ]};
        _ ->
            skip
    end.


%% Split an event doc id of the form <<"event:account/dbname">> (or
%% <<"event:dbname">> for dbs without an account prefix, attributed to
%% '_admin') into {Event, Account, DbName}; `skip` for anything else.
split_event_id(Id) ->
    case binary:split(Id, <<":">>) of
        [Event, DbNameAndUsername] ->
            case binary:split(DbNameAndUsername, <<"/">>) of
                [Account, DbName] ->
                    {Event, Account, DbName};
                [DbName] ->
                    {Event, '_admin', DbName}
            end;
        _ ->
            skip
    end.


%% fabric:changes/4 callback. The first argument is the feed event
%% (start | {change, Change} | {stop, EndSeq} | timeout | {error, R});
%% the second is the #acc{} accumulator. Clauses dispatch on the event
%% and on #acc.feed; the delayed response handle is threaded through
%% #acc.resp (it starts out as the #httpd{} request until `start`).

% callbacks for continuous feed (newline-delimited JSON Objects)
%% Open the chunked response as soon as the feed starts.
changes_callback(start, #acc{feed="continuous"}=Acc) ->
#acc{resp=Req} = Acc,
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200),
{ok, Acc#acc{resp=Resp, last_data_sent_time=os:timestamp()}};
%% Emit one JSON object per line; skipped changes may still trigger a
%% heartbeat newline so idle connections stay alive.
changes_callback({change, Change0}, #acc{feed="continuous"}=Acc) ->
#acc{resp=Resp, username=Username} = Acc,
case transform_change(Username, Resp, Change0) of
skip ->
{ok, maybe_send_heartbeat(Acc)};
Change ->
Line = [?JSON_ENCODE(Change) | "\n"],
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
{ok, Acc#acc{resp=Resp1, last_data_sent_time=os:timestamp()}}
end;
%% Terminate the continuous feed with a final last_seq object.
changes_callback({stop, EndSeq}, #acc{feed="continuous"}=Acc) ->
#acc{resp=Resp} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp,
[?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
chttpd:end_delayed_json_response(Resp1);

% callbacks for longpoll and normal (single JSON Object)
%% "normal" feed with an ETag (set by handle_global_changes_req): the
%% ETag goes out in the response headers.
changes_callback(start, #acc{feed="normal", etag=Etag}=Acc)
when Etag =/= undefined ->
#acc{resp=Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
[{"Etag",Etag}], FirstChunk),
{ok, Acc#acc{resp=Resp, prepend="", last_data_sent_time=os:timestamp()}};
%% longpoll, or "normal" without an ETag: same body, no ETag header.
changes_callback(start, Acc) ->
#acc{resp=Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
{ok, Acc#acc{resp=Resp, prepend="", last_data_sent_time=os:timestamp()}};
%% Append one array element; `prepend` is "" for the first element and
%% ",\r\n" afterwards, producing valid JSON array separators.
changes_callback({change, Change0}, Acc) ->
#acc{resp=Resp, prepend=Prepend, username=Username} = Acc,
case transform_change(Username, Resp, Change0) of
skip ->
{ok, maybe_send_heartbeat(Acc)};
Change ->
%% NOTE(review): Resp and Prepend are already bound above, so this
%% re-match is redundant (it just re-asserts the same values).
#acc{resp=Resp, prepend=Prepend} = Acc,
Line = [Prepend, ?JSON_ENCODE(Change)],
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
Acc1 = Acc#acc{
prepend=",\r\n",
resp=Resp1,
last_data_sent_time=os:timestamp()
},
{ok, Acc1}
end;
%% Close the results array and attach last_seq.
changes_callback({stop, EndSeq}, Acc) ->
#acc{resp=Resp} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp,
["\n],\n\"last_seq\":", ?JSON_ENCODE(EndSeq), "}\n"]),
chttpd:end_delayed_json_response(Resp1);

%% Feed idle tick: possibly emit a heartbeat newline.
changes_callback(timeout, Acc) ->
{ok, maybe_send_heartbeat(Acc)};

%% Error before the delayed response was started: resp is still the raw
%% #httpd{} request, so send a regular error response.
changes_callback({error, Reason}, #acc{resp=Req=#httpd{}}) ->
chttpd:send_error(Req, Reason);
%% Error after streaming started (or before etag_respond ran for the
%% "normal" feed, where resp may still be the request).
changes_callback({error, Reason}, Acc) ->
#acc{etag=Etag, feed=Feed, resp=Resp} = Acc,
case {Feed, Etag} of
{"normal", Etag} when Etag =/= undefined ->
chttpd:send_error(Resp, Reason);
_ ->
chttpd:send_delayed_error(Resp, Reason)
end.


%% Send a bare newline keep-alive when the configured heartbeat interval
%% (milliseconds) has elapsed since the last data was written; no-op when
%% heartbeats are disabled (interval =:= false).
maybe_send_heartbeat(#acc{heartbeat_interval=false}=Acc) ->
    Acc;
maybe_send_heartbeat(Acc) ->
    #acc{
        last_data_sent_time=LastSent,
        heartbeat_interval=Interval,
        resp=Resp
    } = Acc,
    Now = os:timestamp(),
    %% now_diff/2 yields microseconds; convert to ms before comparing.
    ElapsedMs = timer:now_diff(Now, LastSent) div 1000,
    if
        ElapsedMs > Interval ->
            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
            Acc#acc{last_data_sent_time=Now, resp=Resp1};
        true ->
            Acc
    end.


%% Translate the request's query-string pairs into the option proplist
%% consumed by fabric:changes/4. Unknown parameters are ignored.
parse_global_changes_query(Req) ->
    lists:foldl(fun parse_query_param/2, [], couch_httpd:qs(Req)).


%% Fold one {Key, Value} query pair into the accumulated options.
parse_query_param({"feed", Feed}, Acc) ->
    [{feed, Feed} | Acc];
parse_query_param({"descending", "true"}, Acc) ->
    [{dir, rev} | Acc];
parse_query_param({"since", Since}, Acc) ->
    [{since, Since} | Acc];
parse_query_param({"limit", Limit}, Acc) ->
    [{limit, to_non_neg_int(Limit)} | Acc];
parse_query_param({"heartbeat", "true"}, Acc) ->
    [{heartbeat, true} | Acc];
parse_query_param({"heartbeat", Millis}, Acc) ->
    [{heartbeat, to_non_neg_int(Millis)} | Acc];
parse_query_param({"timeout", Timeout}, Acc) ->
    [{timeout, to_non_neg_int(Timeout)} | Acc];
parse_query_param(_Unknown, Acc) ->
    % unknown key value pair, ignore.
    Acc.


%% Convert a query-string value (a latin-1 string) to a non-negative
%% integer; throw {bad_request, invalid_integer} for anything else so
%% the HTTP layer answers 400.
to_non_neg_int(Str) ->
    Int = try
        list_to_integer(Str)
    catch error:badarg ->
        throw({bad_request, invalid_integer})
    end,
    if
        Int >= 0 -> Int;
        true -> throw({bad_request, invalid_integer})
    end.

0 comments on commit f429cd0

Please sign in to comment.