Permalink
Browse files

Adding read-only & disabled mode for Logplex API

The Disabled mode entirely shuts down access to the web API at all
levels, including for healthchecks. Use with extreme care.

The read-only mode forbids all non-GET operations outside of what v2
canaries are allowed to do.
  • Loading branch information...
1 parent e23d653 commit 54a5d9cae12a171ed4e6f3a0c360cf786c53e98e @ferd ferd committed Oct 21, 2013
Showing with 137 additions and 27 deletions.
  1. +61 −27 src/logplex_api.erl
  2. +76 −0 test/logplex_api_SUITE.erl
View
@@ -22,12 +22,15 @@
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_api).
-export([loop/1, start_link/0, child_spec/0, stop/0]).
+-export([status/0, set_status/1]).
-include("logplex.hrl").
-include("logplex_logging.hrl").
-define(HDR, [{"Content-Type", "text/html"}]).
-define(JSON_CONTENT, [{"Content-Type", "application/json"}]).
+-define(API_READONLY, <<"Logplex API temporarily in read-only mode">>).
+-define(API_DISABLED, <<"Service Temporarily Disabled">>).
start_link() ->
Port = logplex_utils:to_int(logplex_app:config(http_port)),
@@ -60,7 +63,7 @@ loop(Req) ->
Path = Req:get(path),
ChannelId = header_value(Req, "Channel", ""),
try
- Served = case serve(handlers(), Method, Path, Req) of
+ Served = case serve(handlers(), Method, Path, Req, status()) of
{done,{C,D}} -> {done,{C,D}};
{C, B} -> {C, ?HDR, B};
{_C, _H, _B} = Resp -> Resp
@@ -92,7 +95,7 @@ loop(Req) ->
end.
handlers() ->
- [{['GET', "/healthcheck"], fun(Req, _Match) ->
+ [{['GET', "/healthcheck"], fun(Req, _Match, _Status) ->
authorize(Req),
RegisteredMods = [logplex_realtime, logplex_stats, logplex_tail, logplex_shard, tcp_acceptor],
@@ -104,7 +107,7 @@ handlers() ->
{200, <<"OK">>}
end},
- {['POST', "/load$"], fun(Req, _Match) ->
+ {['POST', "/load$"], fun(Req, _Match, _Status) ->
authorize(Req),
Body = Req:recv_body(),
Modules = mochijson2:decode(Body),
@@ -122,7 +125,8 @@ handlers() ->
{RespCode, iolist_to_binary(mochijson2:encode(Json))}
end},
- {['POST', "^/channels$"], fun(Req, _Match) ->
+ {['POST', "^/channels$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, _Match, _Status) ->
authorize(Req),
Body = Req:recv_body(),
{struct, Params} = mochijson2:decode(Body),
@@ -143,7 +147,7 @@ handlers() ->
end},
%% V2
- {['GET', "^/v2/channels/(\\d+)$"], fun(Req, [ChannelId]) ->
+ {['GET', "^/v2/channels/(\\d+)$"], fun(Req, [ChannelId], _Status) ->
authorize(Req),
case channel_info(api_v2, ChannelId) of
not_found -> not_found_json();
@@ -152,7 +156,8 @@ handlers() ->
end
end},
- {['DELETE', "^/channels/(\\d+)$"], fun(Req, [ChannelId]) ->
+ {['DELETE', "^/channels/(\\d+)$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelId], _) ->
authorize(Req),
case logplex_channel:delete(list_to_integer(ChannelId)) of
ok -> {200, <<"OK">>};
@@ -161,7 +166,8 @@ handlers() ->
end},
%% V2
- {['DELETE', "^/v2/channels/(\\d+)$"], fun(Req, [ChannelId]) ->
+ {['DELETE', "^/v2/channels/(\\d+)$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelId], _) ->
authorize(Req),
case logplex_channel:delete(list_to_integer(ChannelId)) of
ok ->
@@ -171,7 +177,8 @@ handlers() ->
end
end},
- {['POST', "^/channels/(\\d+)/token$"], fun(Req, [ChannelId]) ->
+ {['POST', "^/channels/(\\d+)/token$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelId], _) ->
authorize(Req),
{struct, Params} = mochijson2:decode(Req:recv_body()),
@@ -189,7 +196,8 @@ handlers() ->
end},
%% V2
- {['POST', "^/v2/channels/(\\d+)/tokens$"], fun(Req, [ChannelId]) ->
+ {['POST', "^/v2/channels/(\\d+)/tokens$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelId], _) ->
authorize(Req),
{struct, Params} = mochijson2:decode(Req:recv_body()),
@@ -210,7 +218,8 @@ handlers() ->
mochijson2:encode({struct, [{name, Name}, {token, Token}]})}
end},
- {['POST', "^/sessions$"], fun(Req, _Match) ->
+ {['POST', "^/sessions$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, _Match, _) ->
authorize(Req),
Body = Req:recv_body(),
UUID = logplex_session:publish(Body),
@@ -219,7 +228,8 @@ handlers() ->
end},
%% V2
- {['POST', "^/v2/sessions$"], fun(Req, _Match) ->
+ {['POST', "^/v2/sessions$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, _Match, _) ->
authorize(Req),
Body = Req:recv_body(),
UUID = logplex_session:publish(Body),
@@ -228,7 +238,7 @@ handlers() ->
mochijson2:encode({struct, [{url, api_relative_url(api_v2, UUID)}]})}
end},
- {['POST', "^/v2/canary-sessions$"], fun(Req, _Match) ->
+ {['POST', "^/v2/canary-sessions$"], fun(Req, _Match, _) -> % canaries remain available
authorize(Req),
Body = Req:recv_body(),
UUID = logplex_session:publish(Body),
@@ -237,7 +247,7 @@ handlers() ->
mochijson2:encode({struct, [{url, api_relative_url(canary, UUID)}]})}
end},
- {['GET', "^/sessions/([\\w-]+)$"], fun(Req, [Session]) ->
+ {['GET', "^/sessions/([\\w-]+)$"], fun(Req, [Session], _) ->
proplists:get_value("srv", Req:parse_qs()) == undefined
andalso error_resp(400, <<"[Error]: Please update your Heroku client to the most recent version. If this error message persists then uninstall the Heroku client gem completely and re-install.\n">>),
Timeout = timer:seconds(logplex_app:config(session_lookup_timeout_s,
@@ -293,7 +303,7 @@ handlers() ->
{200, ""}
end},
- {['GET', "^/v2/canary-fetch/([\\w-]+)$"], fun(Req, [Session]) ->
+ {['GET', "^/v2/canary-fetch/([\\w-]+)$"], fun(Req, [Session], _) ->
proplists:get_value("srv", Req:parse_qs()) == undefined
andalso error_resp(400, <<"[Error]: Please update your Heroku client to the most recent version. If this error message persists then uninstall the Heroku client gem completely and re-install.\n">>),
Timeout = timer:seconds(logplex_app:config(session_lookup_timeout_s,
@@ -329,15 +339,16 @@ handlers() ->
end},
%% V1
- {['GET', "^/channels/(\\d+)/info$"], fun(Req, [ChannelId]) ->
+ {['GET', "^/channels/(\\d+)/info$"], fun(Req, [ChannelId], _) ->
authorize(Req),
Info = channel_info(api_v1, ChannelId),
not is_list(Info) andalso exit({expected_list, Info}),
{200, iolist_to_binary(mochijson2:encode({struct, Info}))}
end},
- {['POST', "^/channels/(\\d+)/drains/tokens$"], fun(Req, [ChannelId]) ->
+ {['POST', "^/channels/(\\d+)/drains/tokens$"], fun(_Req, _, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelId], _) ->
authorize(Req),
{ok, DrainId, Token} = logplex_drain:reserve_token(),
@@ -348,12 +359,13 @@ handlers() ->
{201, iolist_to_binary(mochijson2:encode({struct, Resp}))}
end},
- {['POST', "^/channels/(\\d+)/drains/(\\d+)$"], fun(Req, [_ChannelIdStr, _DrainIdStr]) ->
+ {['POST', "^/channels/(\\d+)/drains/(\\d+)$"], fun(Req, [_ChannelIdStr, _DrainIdStr], _) ->
authorize(Req),
{501, <<"V1 Drain Creation API deprecated">>}
end},
- {['POST', "^/v2/channels/(\\d+)/drains/(\\d+)$"], fun(Req, [ChannelIdStr, DrainIdStr]) ->
+ {['POST', "^/v2/channels/(\\d+)/drains/(\\d+)$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelIdStr, DrainIdStr], _) ->
authorize(Req),
DrainId = list_to_integer(DrainIdStr),
@@ -391,14 +403,15 @@ handlers() ->
end
end},
- {['POST', "^/channels/(\\d+)/drains$"], fun(Req, [_ChannelId]) ->
+ {['POST', "^/channels/(\\d+)/drains$"], fun(Req, [_ChannelId], _) ->
authorize(Req),
{501, <<"V1 Drain Creation API deprecated.">>}
end},
%% V2
- {['POST', "^/v2/channels/(\\d+)/drains$"], fun(Req, [ChannelIdStr]) ->
+ {['POST', "^/v2/channels/(\\d+)/drains$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelIdStr], _) ->
ChannelId = list_to_integer(ChannelIdStr),
case valid_uri(Req) of
{error, What} ->
@@ -427,18 +440,19 @@ handlers() ->
end
end},
- {['GET', "^/channels/(\\d+)/drains$"], fun(Req, [_ChannelId]) ->
+ {['GET', "^/channels/(\\d+)/drains$"], fun(Req, [_ChannelId], _) ->
authorize(Req),
{501, <<"V1 Drain API Deprecated.">>}
end},
- {['DELETE', "^/channels/(\\d+)/drains$"], fun(Req, [_ChannelId]) ->
+ {['DELETE', "^/channels/(\\d+)/drains$"], fun(Req, [_ChannelId], _) ->
authorize(Req),
{501, <<"V1 Drain API Deprecated.">>}
end},
%% V2
- {['DELETE', "^/v2/channels/(\\d+)/drains/(\\d+)$"], fun(Req, [ChannelId, DrainId]) ->
+ {['DELETE', "^/v2/channels/(\\d+)/drains/(\\d+)$"], fun(_Req, _Match, read_only) -> {503, ?API_READONLY};
+ (Req, [ChannelId, DrainId], _) ->
authorize(Req),
Deletable = try
ChannelIdInt = list_to_integer(ChannelId),
@@ -454,13 +468,16 @@ handlers() ->
end
end}].
-serve([], _Method, _Path, _Req) ->
+serve(_Handlers, _Method, _Path, _Req, disabled) ->
+ {503, ?API_DISABLED};
+
+serve([], _Method, _Path, _Req, _Status) ->
{404, <<"Not found">>};
-serve([{[HMethod, Regexp], Fun}|Tail], Method, Path, Req) ->
+serve([{[HMethod, Regexp], Fun}|Tail], Method, Path, Req, Status) ->
case re:run(Path, Regexp, [{capture, all_but_first, list}]) of
{match, Captured} when HMethod == Method ->
- case catch Fun(Req, Captured) of
+ case catch Fun(Req, Captured, Status) of
{'EXIT', {Code, Body}} when is_integer(Code), is_binary(Body) ->
{Code, Body};
{'EXIT', {Code, Body}} when is_integer(Code), is_list(Body) ->
@@ -481,7 +498,7 @@ serve([{[HMethod, Regexp], Fun}|Tail], Method, Path, Req) ->
exit({unexpected, Other})
end;
_ ->
- serve(Tail, Method, Path, Req)
+ serve(Tail, Method, Path, Req, Status)
end.
authorize(Req) ->
@@ -707,3 +724,20 @@ valid_uri(Req) ->
end
end,
logplex_drain:valid_uri(Uri).
+
+%% Checks whether the API state
+-spec status() -> 'normal' | 'read_only' | 'disabled'.
+status() ->
+ logplex_app:config(api_status, normal).
+
+set_status(Term) ->
+ Old = status(),
+ case Term of
+ normal -> io:format("Fully Enabling API~n");
+ read_only -> io:format("API in read-only mode: only GET requests and "
+ "canary operations allowed.~n");
+ disabled -> io:format("API entirely disabled, **INCLUDING HEALTHCHECKS**~n")
+ end,
+ logplex_app:set_config(api_status, Term),
+ Old.
+
View
@@ -7,10 +7,19 @@ groups() ->
[v2_canary_session
,v2_canary_fetch
]}
+ ,{read_only, [],
+ [{group, v2_canary} % canaries keep working when API read-only
+ ]}
+ ,{disabled, [], % nothing works with API disabled
+ [unavailable_v2_canary_session
+ ,unavailable_v2_canary_fetch
+ ]}
].
all() ->
[{group, v2_canary}
+ ,{group, read_only}
+ ,{group, disabled}
].
init_per_suite(Config) ->
@@ -22,6 +31,32 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
application:stop(logplex).
+init_per_group(read_only, Config) ->
+ InitialStatus = application:get_env(logplex, api_status),
+ logplex_api:set_status(read_only),
+ read_only = logplex_api:status(),
+ [{initial_api_status, InitialStatus} | Config];
+init_per_group(disabled, Config) ->
+ InitialStatus = application:get_env(logplex, api_status),
+ logplex_api:set_status(disabled),
+ disabled = logplex_api:status(),
+ [{initial_api_status, InitialStatus} | Config];
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(read_only, Config) ->
+ case ?config(initial_api_status, Config) of
+ undefined -> application:unset_env(logplex, api_status);
+ {ok,Val} -> application:set_env(logplex, api_status, Val)
+ end;
+end_per_group(disabled, Config) ->
+ case ?config(initial_api_status, Config) of
+ undefined -> application:unset_env(logplex, api_status);
+ {ok,Val} -> application:set_env(logplex, api_status, Val)
+ end;
+end_per_group(_, _Config) ->
+ ok.
+
init_per_testcase(v2_canary_session=Case, Config) ->
Channel = logplex_channel:create(atom_to_binary(Case, latin1)),
ChannelId = logplex_channel:id(Channel),
@@ -39,9 +74,35 @@ init_per_testcase(v2_canary_fetch, Config) ->
N <- lists:seq(1, 10)],
timer:sleep(1000),
Config;
+init_per_testcase(unavailable_v2_canary_session=Case, Config) ->
+ Channel = logplex_channel:create(atom_to_binary(Case, latin1)),
+ ChannelId = logplex_channel:id(Channel),
+ logplex_SUITE:wait_for_chan(Channel),
+ logplex_channel:register({channel, ChannelId}),
+ logplex_SUITE:wait_for_registration({channel, ChannelId}),
+ [{channel_id, ChannelId}|Config];
+init_per_testcase(unavailable_v2_canary_fetch=Case, Config) ->
+ % Create a token and hook up to the channel
+ Channel = logplex_channel:create(atom_to_binary(Case, latin1)),
+ ChannelId = logplex_channel:id(Channel),
+ logplex_SUITE:wait_for_chan(Channel),
+ logplex_channel:register({channel, ChannelId}),
+ logplex_SUITE:wait_for_registration({channel, ChannelId}),
+ %% Create a working session to simulate a failure after having gotten auth
+ UUID = logplex_session:publish(<<"{\"channel_id\":\"",
+ (list_to_binary(integer_to_list(ChannelId)))/binary,
+ "\"}">>),
+ TokenId = logplex_token:create(ChannelId, <<"ct">>),
+ Token = logplex_SUITE:get_token(TokenId),
+ [ok = logplex_SUITE:send_msg(make_msg(Token, N)) ||
+ N <- lists:seq(1, 10)],
+ [{canary_session, UUID} | Config];
init_per_testcase(_Case, Config) ->
Config.
+end_per_testcase(_Case, Config) ->
+ Config.
+
v2_canary_session(Config) ->
Api = ?config(api, Config) ++ "/v2/canary-sessions",
BasicAuth = ?config(auth, Config),
@@ -81,6 +142,21 @@ v2_canary_fetch(Config) ->
end || Log <- Logs]),
Config.
+unavailable_v2_canary_session(Config) ->
+ Api = ?config(api, Config) ++ "/v2/canary-sessions",
+ BasicAuth = ?config(auth, Config),
+ ChannelId = ?config(channel_id, Config),
+ Res = post(Api, [{headers, [{"Authorization", BasicAuth}]},
+ {body, "{\"channel_id\":\"" ++ integer_to_list(ChannelId) ++ "\"}"}]),
+ 503 = proplists:get_value(status_code, Res).
+
+unavailable_v2_canary_fetch(Config) ->
+ Session = proplists:get_value(canary_session, Config),
+ Api = ?config(api, Config) ++ "/v2/canary-fetch/"
+ ++ binary_to_list(Session) ++ "?srv=ct",
+ Res = get_(Api, []),
+ 503 = proplists:get_value(status_code, Res).
+
%% Other helpers
get_(Url, Opts) ->
request(get, Url, Opts).

0 comments on commit 54a5d9c

Please sign in to comment.