Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Adds the new http drain implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
archaelus committed Jun 13, 2012
1 parent 9621fed commit 2efec33
Show file tree
Hide file tree
Showing 2 changed files with 366 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/logplex.app.src
Expand Up @@ -25,6 +25,11 @@
,{syslog_port, 6001} % tcp port
,{max_drains_per_channel, 5} % #channels
,{drain_buffer_size, 1024} % #messages
,{http_send_loss_msg, send} % send | dont_send
,{http_drain_target_bytes, 4096} % bytes
,{http_drain_buffer_size, 1024} % messages
,{http_reconnect_time_s, 1} % seconds
,{http_frame_retries, 1} % #extra attempts after first
]}
]}.

361 changes: 361 additions & 0 deletions src/logplex_http_drain.erl
@@ -0,0 +1,361 @@
%% Copyright (c) 2012 Heroku <nem@erlang.geek.nz>
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without
%% restriction, including without limitation the rights to use,
%% copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_http_drain).

-include("logplex.hrl").
-include("logplex_logging.hrl").
-include_lib("eunit/include/eunit.hrl").

%% State data carried by one HTTP drain FSM process.
-record(state, {drain_id :: logplex_drain:id(),
drain_tok :: logplex_drain:token(),
channel_id :: logplex_channel:id(),
%% Destination as a parsed 6-tuple; the scheme selects the transport.
uri :: uri:parsed_uri(),
%% Pid returned by logplex_drain_buffer:start_link/4 in init/1.
buf :: pid(),
%% cowboy_client handle; reset to undefined by http_fail/1.
client :: cowboy_client:client(),
%% #frame{} records awaiting delivery. New frames are added at the
%% rear, retried frames are put back at the front.
out_q = queue:new() :: queue()
}).

%% One batch of messages that is delivered as a single HTTP request.
%% `frame' is used as the request body.
-record(frame, {frame :: iolist(),
%% Number of messages in the body; sent as the x-logplex-msg-count
%% header and used when recording delivered/dropped stats.
msg_count :: non_neg_integer(),
%% Send attempts remaining; push_frame/3 sets it, retry_frame/2
%% decrements it.
tries = 0 :: non_neg_integer(),
%% Value produced by frame_id/0; sent hex-encoded as the
%% x-logplex-frame-id header.
id :: binary()
}).

-define(CONTENT_TYPE, <<"application/x-logplex-1">>).
-define(HTTP_VERSION, {1,1}).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------

-export([valid_uri/1
,start_link/4
]).

%% ------------------------------------------------------------------
%% gen_fsm Function Exports
%% ------------------------------------------------------------------

-export([disconnected/2,
connected/2
]).

-export([init/1, handle_event/3, handle_sync_event/4,
handle_info/3, terminate/3, code_change/4]).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%% @doc Starts a linked gen_fsm process for one HTTP drain. The initial
%% `#state{}' carries the channel id, drain id, drain token and the
%% destination URI; the remaining fields keep their record defaults.
%% Returns whatever `gen_fsm:start_link/3' returns.
start_link(ChannelID, DrainID, DrainTok, Uri) ->
    InitialState = #state{channel_id = ChannelID,
                          drain_id = DrainID,
                          drain_tok = DrainTok,
                          uri = Uri},
    gen_fsm:start_link(?MODULE, InitialState, []).

%% @doc Accepts a 6-tuple URI whose first element is `http' or `https'
%% and returns `{valid, http, Uri}'. Any other term yields
%% `{error, invalid_http_uri}'.
valid_uri({http, _, _, _, _, _} = Uri) ->
    {valid, http, Uri};
valid_uri({https, _, _, _, _, _} = Uri) ->
    {valid, http, Uri};
valid_uri(_Other) ->
    {error, invalid_http_uri}.

%% ------------------------------------------------------------------
%% gen_fsm Function Definitions
%% ------------------------------------------------------------------

%% @private
%% @doc gen_fsm init callback. Registers the drain under its id with the
%% user-info-stripped destination, starts the drain buffer in `notify'
%% mode, and enters the `disconnected' state hibernated.
%%
%% A `badarg' error raised anywhere in the setup makes the process
%% return `ignore' instead of crashing.
%%
%% Registration is done BEFORE the buffer is started. The buffer is
%% linked to this process, and returning `ignore' makes this process
%% exit with reason `normal', which does not terminate linked processes
%% that are not trapping exits. Starting the buffer first therefore risks
%% leaving an orphaned buffer behind when registration fails with
%% `badarg'.
%% NOTE(review): assumes logplex_drain:register/3 is the realistic source
%% of `badarg' here (e.g. a duplicate registration) - confirm.
init(State0 = #state{uri=URI,
                     drain_id=DrainId,
                     channel_id=ChannelId}) ->
    try
        Dest = uri:to_binary(URI, [{hide_user_info, true}]),
        logplex_drain:register(DrainId, http, Dest),
        Size = logplex_app:config(http_drain_buffer_size, 1024),
        {ok, Buf} = logplex_drain_buffer:start_link(ChannelId, self(),
                                                    notify, Size),
        State = State0#state{buf = Buf},
        ?INFO("drain_id=~p channel_id=~p dest=~s at=spawn",
              log_info(State, [])),
        {ok, disconnected,
         State, hibernate}
    catch
        error:badarg -> ignore
    end.

%% @private
%% @doc State callback for `disconnected'.
%% <ul>
%% <li>`timeout': asks the drain buffer for a notification and stays
%%     disconnected.</li>
%% <li>`new_data' from this drain's own buffer: attempts to connect.</li>
%% <li>anything else: logged as unexpected; the state is unchanged.</li>
%% </ul>
disconnected(timeout, State = #state{buf = BufPid}) ->
    logplex_drain_buffer:notify(BufPid),
    {next_state, disconnected, State};
disconnected({logplex_drain_buffer, BufPid, new_data},
             State = #state{buf = BufPid}) ->
    try_connect(State);
disconnected(Unexpected, State) ->
    ?WARN("drain_id=~p channel_id=~p dest=~s err=unexpected_info "
          "data=~p state=disconnected",
          log_info(State, [Unexpected])),
    {next_state, disconnected, State}.

%% @private
%% @doc Opens a connection to the drain destination. Only defined while
%% no client is held (`client = undefined'). On success the new client is
%% stored and sending starts via ready_to_send/1; an `{error, _}' result
%% or an exception raised by the connect call (including the transport
%% lookup) is logged and handed to http_fail/1.
try_connect(State = #state{uri = {Scheme, _Auth, Host, Port, _, _},
                           client = undefined}) ->
    {ok, FreshClient} = cowboy_client:init([]),
    %% Only the connect expression is protected; the result handling in
    %% the `of' section runs outside the try.
    try cowboy_client:connect(scheme_to_transport(Scheme),
                              Host, Port, FreshClient) of
        {error, Reason} ->
            ?WARN("drain_id=~p channel_id=~p dest=~s at=try_connect "
                  "attempt=fail tcp_err=~p",
                  log_info(State, [Reason])),
            http_fail(State);
        {ok, Connected} ->
            ?INFO("drain_id=~p channel_id=~p dest=~s at=try_connect "
                  "attempt=success",
                  log_info(State, [])),
            ready_to_send(State#state{client = Connected})
    catch
        Kind:Exception ->
            Details = {Kind, Exception, erlang:get_stacktrace()},
            ?WARN("drain_id=~p channel_id=~p dest=~s at=connect "
                  "attempt=fail err=exception data=~p next_state=disconnected",
                  log_info(State, [Details])),
            http_fail(State)
    end.

%% @private
%% @doc Common failure path: forgets the current client and returns to
%% the `disconnected' state, scheduling a `timeout' event after
%% `http_reconnect_time_s' seconds (default 1).
%%
%% The delay is armed with gen_fsm:send_event_after/2 rather than the
%% gen_fsm state timeout. A state timeout is cancelled by ANY event or
%% message that arrives before it expires, and the handlers for such
%% input return without re-arming it; `disconnected(timeout, _)' would
%% then never run. The delayed event is always delivered.
%% NOTE(review): if the FSM has reached `connected' by the time the event
%% fires, connected/2 just logs it as unexpected - confirm acceptable.
http_fail(State = #state{}) ->
    %% XXX - forcibly shutdown client if necessary.
    ReconnectMs = timer:seconds(logplex_app:config(http_reconnect_time_s, 1)),
    _TimerRef = gen_fsm:send_event_after(ReconnectMs, timeout),
    {next_state, disconnected,
     State#state{client = undefined}}.

%% @private
%% @doc Picks the next piece of work while connected. If a frame is
%% queued it is removed from the front of the queue and sent; otherwise
%% the drain buffer is switched to active mode (with the target byte size
%% and this drain's framing fun) and the FSM stays in `connected'.
ready_to_send(State = #state{buf = BufPid, drain_tok = Token,
                             out_q = Pending}) ->
    case queue:out(Pending) of
        {{value, NextFrame}, Remaining} ->
            try_send(NextFrame, State#state{out_q = Remaining});
        {empty, _} ->
            logplex_drain_buffer:set_active(BufPid, target_bytes(),
                                            framing_fun(Token)),
            {next_state, connected, State}
    end.

%% @doc Attempts to send one frame. A frame with no tries left is logged
%% and dropped, then the next queued frame is considered. Otherwise the
%% request is written to the client; on success the response is awaited,
%% on a send error the frame goes through retry_frame/2 and the
%% connection is failed.
try_send(Frame = #frame{tries = 0, msg_count = Dropped}, State = #state{}) ->
    ?INFO("drain_id=~p channel_id=~p dest=~s at=try_send result=tries_exceeded "
          "frame_tries=0 dropped_msgs=~p",
          log_info(State, [Dropped])),
    ready_to_send(drop_frame(Frame, State));
try_send(Frame = #frame{tries = Remaining},
         State = #state{client = Client})
  when Remaining > 0 ->
    Request = request_to_iolist(Frame, State),
    case cowboy_client:raw_request(Request, Client) of
        {error, Reason} ->
            ?WARN("drain_id=~p channel_id=~p dest=~s at=send_request"
                  " tcp_err=~1000p",
                  log_info(State, [Reason])),
            http_fail(retry_frame(Frame, State));
        {ok, NextClient} ->
            wait_response(Frame, State#state{client = NextClient})
    end.

%% @private
%% @doc State callback for `connected'. A frame delivered by this drain's
%% own buffer is queued and sending resumes; any other input is logged as
%% unexpected and the state is left unchanged.
connected({logplex_drain_buffer, BufPid, {frame, Data, Count}},
          State = #state{buf = BufPid}) ->
    Queued = push_frame(Data, Count, State),
    ready_to_send(Queued);
connected(Unexpected, State) ->
    ?WARN("drain_id=~p channel_id=~p dest=~s err=unexpected_info "
          "data=~p state=connected",
          log_info(State, [Unexpected])),
    {next_state, connected, State}.

%% @private
%% @doc Maps an HTTP status code to what should happen to the frame that
%% was just sent: any 2xx is `success', any 4xx (client error) is a
%% permanent failure (`perm_fail', the frame is dropped), and every other
%% value is a temporary failure (`temp_fail', the frame is retried).
status_action(Status) when Status >= 200, Status < 300 -> success;
status_action(Status) when Status >= 400, Status < 500 -> perm_fail;
status_action(_Status) -> temp_fail.

%% @doc Reads the response to the request just sent for `Frame'. The
%% status is classified with status_action/1, logged, and acted on by
%% after_response/3. A read error is logged, the frame goes through
%% retry_frame/2 and the connection is failed.
wait_response(Frame = #frame{msg_count = MsgCount},
              State = #state{client = Client}) ->
    case cowboy_client:response(Client) of
        {error, Reason} ->
            ?WARN("drain_id=~p channel_id=~p dest=~s at=wait_response"
                  " result=error tcp_err=~10000p",
                  log_info(State, [Reason])),
            http_fail(retry_frame(Frame, State));
        {ok, Status, _Headers, NextClient} ->
            Outcome = status_action(Status),
            ?INFO("drain_id=~p channel_id=~p dest=~s at=response "
                  "result=~p status=~p msg_count=~p",
                  log_info(State, [Outcome, Status, MsgCount])),
            after_response(Outcome, Frame, State#state{client = NextClient})
    end.

%% @private
%% @doc Applies the classified response outcome to the frame:
%% `success' records delivery, `perm_fail' drops the frame (both then
%% continue sending), `temp_fail' closes the client, retries the frame
%% and fails the connection.
after_response(success, Frame, State) ->
    ready_to_send(sent_frame(Frame, State));
after_response(perm_fail, Frame, State) ->
    ready_to_send(drop_frame(Frame, State));
after_response(temp_fail, Frame, State = #state{client = Client}) ->
    cowboy_client:close(Client),
    http_fail(retry_frame(Frame, State)).

%% @private
%% @doc All-state event callback. Every event is ignored; the FSM stays
%% in its current state with unchanged data.
handle_event(_Ignored, CurrentState, Data) ->
    {next_state, CurrentState, Data}.

%% @private
%% @doc All-state synchronous event callback. This module defines no
%% synchronous events, so every one is logged as unexpected.
%%
%% The caller is answered with `{error, unexpected_event}'. Returning
%% `next_state' without ever replying would leave the caller blocked in
%% its synchronous call until that call's own timeout expires. The FSM
%% state and data are unchanged.
handle_sync_event(Event, _From, StateName, State) ->
    ?WARN("[state ~p] Unexpected event ~p",
          [StateName, Event]),
    {reply, {error, unexpected_event}, StateName, State}.

%% @private
%% @doc Raw message callback. A `shutdown' message stops the FSM with
%% reason `shutdown'; every other message is passed to the callback of
%% the current state as if it were an event.
handle_info(shutdown, _CurrentState, Data) ->
    {stop, shutdown, Data};
handle_info(Message, CurrentState, Data) ->
    ?MODULE:CurrentState(Message, Data).

%% @private
%% @doc Termination callback. Performs no cleanup and returns `ok'.
terminate(_Why, _CurrentState, _Data) ->
    ok.

%% @private
%% @doc Code upgrade callback. State name and data are carried over
%% unchanged.
code_change(_FromVsn, CurrentState, Data, _Extra) ->
    {ok, CurrentState, Data}.

%% @private
%% @doc Builds the argument list for this module's log format strings:
%% drain id, channel id and the destination URI rendered with user info
%% hidden, followed by the caller-supplied `Rest' arguments.
log_info(#state{drain_id = DrainId, channel_id = ChannelId, uri = URI}, Rest)
  when is_list(Rest) ->
    Dest = uri:to_binary(URI, [{hide_user_info, true}]),
    [DrainId, ChannelId, Dest | Rest].

%% @private
%% @doc Maps a URI scheme atom to the cowboy transport module passed to
%% the connect call. Only `http' and `https' are defined.
scheme_to_transport(http) ->
    cowboy_tcp_transport;
scheme_to_transport(https) ->
    cowboy_ssl_transport.

%% @private
%% @doc Renders the HTTP PUT request that delivers one frame. The frame
%% data is the body; the headers carry the logplex content type, the
%% message count, the hex-encoded frame id and whatever
%% cowboy_client:auth_header/1 yields for the URI's auth info. The
%% request target is the URI path (or "/" when the path is empty)
%% followed by the query part.
request_to_iolist(#frame{frame = Body, msg_count = Count, id = Id},
                  #state{uri = URI = {_Scheme, AuthInfo,
                                      _Host, _Port, Path, QueryInfo}}) ->
    AuthHeaders = cowboy_client:auth_header(AuthInfo),
    Headers = [{<<"Content-type">>, ?CONTENT_TYPE},
               {<<"x-logplex-msg-count">>, integer_to_list(Count)},
               {<<"x-logplex-frame-id">>, frame_id_to_iolist(Id)}
               | AuthHeaders],
    HostPart = uri:full_host_iolist(URI, []),
    Target = request_path(Path) ++ QueryInfo,
    cowboy_client:request_to_iolist(<<"PUT">>,
                                    Headers,
                                    Body,
                                    ?HTTP_VERSION,
                                    HostPart,
                                    Target).

%% @private
%% @doc An empty path is requested as "/"; any other path is used as is.
request_path("") -> "/";
request_path(Path) -> Path.

%% @doc Returns the fun handed to the drain buffer for turning buffer
%% items into frame data. A `{msg, Data}' item becomes `{frame, Framed}'.
%% A `{loss_indication, N, When}' item becomes a framed overflow message,
%% unless the `http_send_loss_msg' setting is `dont_send', in which case
%% the fun returns `skip'. The setting is read each time the fun runs.
framing_fun(Token) ->
    fun ({msg, MData}) ->
            {frame, syslog_frame(Token, MData)};
        ({loss_indication, N, When}) ->
            case logplex_app:config(http_send_loss_msg, send) of
                dont_send ->
                    skip;
                _ ->
                    Overflow = logplex_syslog_utils:overflow_msg(N, When),
                    {frame, syslog_frame(Token, Overflow)}
            end
    end.

%% @private
%% @doc Formats one message tuple as an RFC 5424 syslog message carrying
%% the drain token, then applies logplex_syslog_utils:frame/1 to it.
syslog_frame(Token, {Facility, Severity, Time, Source, Ps, Content}) ->
    SyslogMsg = logplex_syslog_utils:rfc5424(Facility, Severity,
                                             Time, Source, Ps,
                                             Token, undefined,
                                             Content),
    logplex_syslog_utils:frame(SyslogMsg).

-spec target_bytes() -> pos_integer().
%% @private
%% @doc Reads the `http_drain_target_bytes' application setting, using
%% 4096 when it is not configured. The value is passed to the drain
%% buffer when it is switched to active mode.
target_bytes() ->
    logplex_app:config(http_drain_target_bytes, 4096).

%% @private
%% @doc Bookkeeping for a frame that will not be delivered: adds the
%% frame's message count (not the number of frames) to the
%% `drain_dropped' stat and returns the state unchanged.
drop_frame(#frame{msg_count = Dropped}, State) ->
    _ = msg_stat(drain_dropped, Dropped, State),
    State.

%% @private
%% @doc Bookkeeping for a successfully delivered frame: adds the frame's
%% message count to the `drain_delivered' stat and returns the state
%% unchanged.
sent_frame(#frame{msg_count = Delivered}, State) ->
    _ = msg_stat(drain_delivered, Delivered, State),
    State.

-spec msg_stat('drain_dropped' | 'drain_buffered' | 'drain_delivered',
               non_neg_integer(), #state{}) -> any().
%% @private
%% @doc Increments by `N' the stat identified by this drain's id, its
%% channel id and `Key'.
msg_stat(Key, N, #state{drain_id = DrainId, channel_id = ChannelId}) ->
    Stat = #drain_stat{drain_id = DrainId,
                       channel_id = ChannelId,
                       key = Key},
    logplex_stats:incr(Stat, N).

%% @private
%% @doc Wraps raw frame data (an iolist) and its message count in a
%% `#frame{}' and appends it to the rear of the outbound queue. The frame
%% gets a fresh id and `http_frame_retries + 1' tries (the first attempt
%% plus the configured number of retries, default 1).
push_frame(FrameData, MsgCount, State = #state{out_q = Pending})
  when not is_record(FrameData, frame) ->
    Tries = logplex_app:config(http_frame_retries, 1) + 1,
    NewFrame = #frame{frame = FrameData,
                      msg_count = MsgCount,
                      tries = Tries,
                      id = frame_id()},
    State#state{out_q = queue:in(NewFrame, Pending)}.

%% @doc Generates a frame id: the MD5 digest of the external term format
%% of `{self(), now()}'.
frame_id() ->
    Seed = {self(), now()},
    crypto:md5(term_to_binary(Seed)).

%% @doc Hex-encodes a binary id: one upper-case hexadecimal character per
%% 4-bit nibble, most significant nibble first.
frame_id_to_iolist(ID) when is_binary(ID) ->
    %% integer_to_list/2 on a 4-bit value always yields exactly one
    %% character, so flattening it in the comprehension is equivalent to
    %% taking its head.
    [HexChar || <<Nibble:4>> <= ID,
                HexChar <- integer_to_list(Nibble, 16)].

%% @private
%% @doc Accounts for a failed send attempt of `Frame'. With fewer than
%% two tries recorded, the attempt just made was the last one and the
%% frame is dropped. Otherwise the try count is decremented and the frame
%% is put back at the FRONT of the outbound queue so it is the next one
%% sent.
retry_frame(Frame = #frame{tries = Left}, State) when Left < 2 ->
    drop_frame(Frame, State);
retry_frame(Frame = #frame{tries = Left},
            State = #state{out_q = Pending}) when Left >= 2 ->
    Requeued = queue:in_r(Frame#frame{tries = Left - 1}, Pending),
    State#state{out_q = Requeued}.

0 comments on commit 2efec33

Please sign in to comment.