Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Merge branch 'tcp_v2'
Browse files Browse the repository at this point in the history
  • Loading branch information
archaelus committed Dec 16, 2011
2 parents 4d4c213 + f3ce584 commit 22aeba0
Show file tree
Hide file tree
Showing 8 changed files with 437 additions and 78 deletions.
47 changes: 29 additions & 18 deletions ebin/logplex.app
Expand Up @@ -3,24 +3,35 @@
{description, "Log multiplexer"},
{vsn, "1.0"},
{modules, [
logplex_api,
logplex_app,
logplex_channel,
logplex_drain,
logplex_drain_writer,
logplex_queue_sup,
logplex_realtime,
logplex_redis_writer,
logplex_session,
logplex_shard,
logplex_stats,
logplex_tail,
logplex_token,
logplex_utils,
logplex_worker,
logplex_worker_sup,
redis_helper,
udp_acceptor
http_handler
,logplex_api
,logplex_api_tests
,logplex_app
,logplex_channel
,logplex_db
,logplex_drain
,logplex_drain_writer
,logplex_drain_writer_mon
,logplex_queue
,logplex_queue_sup
,logplex_realtime
,logplex_redis_writer
,logplex_report_handler
,logplex_session
,logplex_shard
,logplex_stats
,logplex_tail
,logplex_token
,logplex_utils
,logplex_worker
,logplex_worker_sup
,nsync_callback
,redis_helper
,syslog_parser
,tcp_acceptor
,tcp_proxy
,tcp_proxy_sup
,uuid
]},
{registered, []},
{applications, [kernel, stdlib, sasl, inets, crypto, public_key, ssl]},
Expand Down
2 changes: 1 addition & 1 deletion src/logplex_api.erl
Expand Up @@ -78,7 +78,7 @@ handlers() ->

[throw({500, io_lib:format("Zero ~p child processes running", [Worker])}) || {Worker, 0} <- logplex_stats:workers()],

RegisteredMods = [logplex_realtime, logplex_stats, logplex_tail, logplex_shard, udp_acceptor],
RegisteredMods = [logplex_realtime, logplex_stats, logplex_tail, logplex_shard, tcp_acceptor],
[(whereis(Name) == undefined orelse not is_process_alive(whereis(Name))) andalso throw({500, io_lib:format("Process dead: ~p", [Name])}) || Name <- RegisteredMods],

Count = logplex_stats:healthcheck(),
Expand Down
6 changes: 4 additions & 2 deletions src/logplex_app.erl
Expand Up @@ -70,9 +70,11 @@ init([]) ->
{logplex_work_queue, {logplex_queue, start_link, [logplex_work_queue, logplex_work_queue_args()]}, permanent, 2000, worker, [logplex_work_queue]},
{logplex_drain_buffer, {logplex_queue, start_link, [logplex_drain_buffer, logplex_drain_buffer_args()]}, permanent, 2000, worker, [logplex_drain_buffer]},

{tcp_proxy_sup, {tcp_proxy_sup, start_link, []}, permanent, 2000, worker, [tcp_proxy_sup]},

{logplex_api, {logplex_api, start_link, []}, permanent, 2000, worker, [logplex_api]},
{udp_acceptor, {udp_acceptor, start_link, []}, permanent, 2000, worker, [udp_acceptor]},
{cowboy_listener_sup, {cowboy_listener_sup, start_link, http_handler:opts()}, permanent, 2000, supervisor, [cowboy_listener_sup]}]
{cowboy_listener_sup, {cowboy_listener_sup, start_link, http_handler:opts()}, permanent, 2000, supervisor, [cowboy_listener_sup]},
{tcp_acceptor, {tcp_acceptor, start_link, [?TCP_PORT]}, permanent, 2000, worker, [tcp_acceptor]}]
}}.

set_cookie() ->
Expand Down
142 changes: 142 additions & 0 deletions src/syslog_parser.erl
@@ -0,0 +1,142 @@
%% @copyright Heroku 2011
%% @author Geoff Cant <nem@erlang.geek.nz>
%% @version {@vsn}, {@date} {@time}
%% @doc Re-entrant syslog tcp stream parser.
%% @end
-module(syslog_parser).

-export([new/0, push/2]).

-export([example1/0, parse/1]).


-define(SYSLOG_MAX_SIZE, 10240). % 10Kb max in a single log message.

-record(buf, {bytes = <<>> :: binary(),
waiting_for = unknown :: 'unknown' | non_neg_integer()}).

-opaque parse_buffer() :: #buf{}.
-type syslog_message() :: {msg, binary()} | {malformed, binary()}.
-type syslog_messages() :: [syslog_message()].

%% Syslog frames
%% a) <Length as ascii integer><space><msg:Length>"
%% b) <msg>\n.
%% ZOMGWTFBBQ? Two framing formats and they get switched between
%% randomly? #killmaimdestroy

example1() ->
<<"1753626010 web.23 - - Started GET \"/default/payload\" for 69.179.15.165 at 2011-12-12 15:20:13 -0800\n218 <13>1 2011-12-12T23:20:13+00:00 runtime.60161@heroku.com t.fa298f04-b533-4a04-8b47-1e1753626010 web.9 - - cache: [GET /?a=gd&v=2&m=263100500876800&f=%2C21474836474ecc3d2c729cd7.34662728%2C1%2C3%2Cgrupo%2Cgrupov1] miss\n200 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.12d76170-aa5c-4f25-8f3a-bf214f93d208 router - - POST p.pingme.net/v1/poll dyno=web.26 queue=0 wait=0ms service=61ms status=200 bytes=2\n202 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.922a3f60-85bc-4d99-9d24-1a4c6caa7fb2 router - - GET d.lqw.me/delivery.js dyno=web.24 queue=0 wait=0ms service=83ms status=200 bytes=3187\n204 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.1681f42f-75e0-484b-bdcd-4fd3b8487f92 router - - GET bid.tapengage.com/admeld dyno=web.13 queue=0 wait=0ms service=11ms status=200 bytes=53\n288 <13>1 2011-12-12T23:20:13+00:00 runtime.60159@heroku.com t.223ccc64-e9fb-4570-ba">>.

new() ->
#buf{}.

parse(Bytes) when is_binary(Bytes) ->
push(Bytes, new()).

-spec push(Bytes::binary(), parse_buffer()) ->
{ ok | {error, term()},
syslog_messages(),
parse_buffer()}.

push(Bytes, Buf = #buf{bytes=OldBuf, waiting_for=WF})
when is_binary(Bytes) ->
push(iolist_to_binary([OldBuf, Bytes]), WF, Buf).

push(Bytes, RequiredLength, Buf)
when is_integer(RequiredLength),
byte_size(Bytes) < RequiredLength ->
%% Haven't accumulated enough bytes to complete a parse yet.
{ok, [], Buf#buf{bytes=Bytes}};
push(Bytes, _, _) ->
parse_recursive(Bytes, []).


-spec parse_recursive(binary(), Acc::syslog_messages()) ->
{'ok' | {'error', term()},
syslog_messages(), parse_buffer()}.
parse_recursive(<<>>, Acc) ->
{ok, lists:reverse(Acc), #buf{}};
parse_recursive(Buf, Acc) ->
case parse_beginning(Buf) of
{incomplete, N, Rest} ->
{ok, lists:reverse(Acc), #buf{bytes=Rest, waiting_for=N}};
{_Msg, Buf} ->
{{error, {parser_made_no_progress, Buf}},
lists:reverse(Acc), #buf{}};
{Msg, Rest} ->
parse_recursive(Rest, [Msg | Acc])
end.

%% Can only be used at the beginning of a message. Will give bogus
%% results otherwise.
-spec msg_type(Buf::binary()) -> {'length_prefixed',
Length::non_neg_integer(),
Offset::non_neg_integer()} |
'nl_terminated' |
'incomplete'.
msg_type(<<FirstChar, _/binary>> = Buf)
when $1 =< FirstChar, FirstChar =< $9 ->
case find_int(Buf, 0) of
{Len, Offset} when Len =< ?SYSLOG_MAX_SIZE ->
{length_prefixed, Len, Offset};
{Len, _} when Len > ?SYSLOG_MAX_SIZE ->
nl_terminated;
bad_int_or_missing_space ->
nl_terminated;
incomplete ->
incomplete
end;
msg_type(_) ->
nl_terminated.

-spec parse_beginning(binary()) ->
{syslog_message(), Remaining::binary()} |
{'incomplete', 'unknown' | non_neg_integer(),
Remaining::binary()}.
parse_beginning(Buf) ->
case msg_type(Buf) of
{length_prefixed, Len, Offset} ->
RequiredLength = Offset + Len,
if byte_size(Buf) >= RequiredLength ->
<<_:Offset/binary,
Msg:Len/binary,
Rest/binary>> = Buf,
{{msg, Msg}, Rest};
true ->
{incomplete, RequiredLength - byte_size(Buf), Buf}
end;
incomplete ->
{incomplete, unknown, Buf};
nl_terminated ->
parse_to_nl(Buf)
end.

%% Idx 0 must be 1-9.
%% Finds the ascii encoded integer terminated by a space.
-spec find_int(binary(), non_neg_integer()) ->
'incomplete' |
{Length::non_neg_integer(),
MsgOffset::non_neg_integer()} |
'bad_int_or_missing_space'.
find_int(Buf, Idx) when byte_size(Buf) =< Idx ->
incomplete;
find_int(Buf, Idx) ->
case binary:at(Buf, Idx) of
C when $0 =< C, C =< $9 ->
find_int(Buf, Idx + 1);
$\s ->
{list_to_integer(binary:bin_to_list(Buf, 0, Idx)), Idx + 1};
_ ->
bad_int_or_missing_space
end.

-spec parse_to_nl(binary()) -> {'incomplete', 'unknown', binary()} |
{{'malformed', binary()}, Rest::binary()}.
parse_to_nl(Buf) ->
case binary:split(Buf, [<<"\r\n">>, <<"\n">>]) of
[Msg, Rest] ->
{{malformed, Msg}, Rest};
[Buf] ->
{incomplete, unknown, Buf}
end.
115 changes: 115 additions & 0 deletions src/tcp_acceptor.erl
@@ -0,0 +1,115 @@
%% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without
%% restriction, including without limitation the rights to use,
%% copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(tcp_acceptor).
-behaviour(gen_server).

%% API
-export([start_link/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-define(SOCK_OPTS, [binary, {reuseaddr, true}, {packet, raw},
{keepalive, true}, {nodelay, true},
{backlog, 1000}, {active, false}]).

-include_lib("logplex.hrl").

-record(state, {listener, acceptor, accept = true}).

%%====================================================================
%% API functions
%%====================================================================
start_link(Port) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Port], []).

%%====================================================================
%% gen_server callbacks
%%====================================================================
init([Port]) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, ?SOCK_OPTS) of
{ok, LSock} ->
{ok, Ref} = prim_inet:async_accept(LSock, -1),
{ok, #state{listener=LSock, acceptor=Ref}};
Error ->
{stop, Error}
end.

handle_call(_Request, _From, State) ->
{reply, ignore, State}.

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({inet_async, LSock, Ref, {ok, CSock}}, #state{listener=LSock, acceptor=Ref}=State) ->
try
case set_sockopt(LSock, CSock) of
ok -> ok;
{error, Reason} -> exit({set_sockopt, Reason})
end,

{ok, Pid} = tcp_proxy_sup:start_child(),
gen_tcp:controlling_process(CSock, Pid),
tcp_proxy:set_socket(Pid, CSock),

%% Signal the network driver that we are ready to accept another connection
case prim_inet:async_accept(LSock, -1) of
{ok, NewRef} -> ok;
{error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
end,

{noreply, State#state{acceptor=NewRef}}
catch exit:Why ->
error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
{stop, Why, State}
end;

handle_info({inet_async, LSock, Ref, Error}, #state{listener=LSock, acceptor=Ref}=State) ->
error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
{stop, Error, State};

handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, State) ->
gen_tcp:close(State#state.listener),
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%% Taken from prim_inet. We are merely copying some socket options from the
%% listening socket to the new client socket.
set_sockopt(ListSock, CliSocket) ->
true = inet_db:register_socket(CliSocket, inet_tcp),
case prim_inet:getopts(ListSock, [nodelay, keepalive, delay_send, priority, tos]) of
{ok, Opts} ->
case prim_inet:setopts(CliSocket, [{active, once}|Opts]) of
ok -> ok;
Error -> gen_tcp:close(CliSocket), Error
end;
Error ->
gen_tcp:close(CliSocket), Error
end.

0 comments on commit 22aeba0

Please sign in to comment.