Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Adds (hopefully) more robust syslog message parser.
Browse files Browse the repository at this point in the history
Parsing split out of tcp_proxy into syslog_parser.
  • Loading branch information
archaelus committed Dec 14, 2011
1 parent de35020 commit a919492
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 41 deletions.
120 changes: 120 additions & 0 deletions src/syslog_parser.erl
@@ -0,0 +1,120 @@
%% @copyright Heroku 2011
%% @author Geoff Cant <nem@erlang.geek.nz>
%% @version {@vsn}, {@date} {@time}
%% @doc Re-entrant syslog tcp stream parser.
%% @end
-module(syslog_parser).

-export([new/0, push/2]).

-export([example1/0, parse/1]).


-define(SYSLOG_MAX_SIZE, 2048).

-record(buf, {bytes = <<>> :: binary(),
waiting_for = unknown :: 'unknown' | non_neg_integer()}).

%% Syslog frames
%% a) <Length as ascii integer><space><msg:Length>"
%% b) <msg>\n.
%% ZOMGWTFBBQ? Two framing formats and they get switched between
%% randomly? #killmaimdestroy

example1() ->
<<"1753626010 web.23 - - Started GET \"/default/payload\" for 69.179.15.165 at 2011-12-12 15:20:13 -0800\n218 <13>1 2011-12-12T23:20:13+00:00 runtime.60161@heroku.com t.fa298f04-b533-4a04-8b47-1e1753626010 web.9 - - cache: [GET /?a=gd&v=2&m=263100500876800&f=%2C21474836474ecc3d2c729cd7.34662728%2C1%2C3%2Cgrupo%2Cgrupov1] miss\n200 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.12d76170-aa5c-4f25-8f3a-bf214f93d208 router - - POST p.pingme.net/v1/poll dyno=web.26 queue=0 wait=0ms service=61ms status=200 bytes=2\n202 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.922a3f60-85bc-4d99-9d24-1a4c6caa7fb2 router - - GET d.lqw.me/delivery.js dyno=web.24 queue=0 wait=0ms service=83ms status=200 bytes=3187\n204 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.1681f42f-75e0-484b-bdcd-4fd3b8487f92 router - - GET bid.tapengage.com/admeld dyno=web.13 queue=0 wait=0ms service=11ms status=200 bytes=53\n288 <13>1 2011-12-12T23:20:13+00:00 runtime.60159@heroku.com t.223ccc64-e9fb-4570-ba">>.

new() ->
#buf{}.

parse(Bytes) when is_binary(Bytes) ->
push(Bytes, new()).

push(Bytes, #buf{bytes=OldBuf, waiting_for=unknown})
when is_binary(Bytes), is_binary(OldBuf) ->
NewBuf = iolist_to_binary([OldBuf, Bytes]),
parse_recursive(NewBuf, []);
push(Bytes, Buf = #buf{bytes=OldBuf, waiting_for=RequiredLength}) ->
NewBuf = iolist_to_binary([OldBuf, Bytes]),
case byte_size(NewBuf) >= RequiredLength of
true ->
parse_recursive(NewBuf, []);
false ->
Buf#buf{bytes=NewBuf}
end.


parse_recursive(<<>>, Acc) ->
{ok, lists:reverse(Acc), #buf{}};
parse_recursive(Buf, Acc) ->
case parse_beginning(Buf) of
{incomplete, N, Rest} ->
{ok, lists:reverse(Acc), #buf{bytes=Rest, waiting_for=N}};
{_Msg, Buf} ->
{{error, {parser_made_no_progress, Buf}},
lists:reverse(Acc), #buf{}};
{Msg, Rest} ->
parse_recursive(Rest, [Msg | Acc])
end.

%% Can only be used at the beginning of a message. Will give bogus
%% results otherwise.
-spec msg_type(Buf::binary()) -> {'length_prefixed',
Length::non_neg_integer(),
Offset::non_neg_integer()} |
'nl_terminated' |
'incomplete'.
msg_type(<<FirstChar, _/binary>> = Buf)
when $1 =< FirstChar, FirstChar =< $9 ->
case find_int(Buf, 0) of
{Len, Offset} when Len =< ?SYSLOG_MAX_SIZE ->
{length_prefixed, Len, Offset};
{Len, _} when Len > ?SYSLOG_MAX_SIZE ->
nl_terminated;
bad_int_or_missing_space ->
nl_terminated;
incomplete ->
incomplete
end;
msg_type(_) ->
nl_terminated.

parse_beginning(Buf) ->
case msg_type(Buf) of
{length_prefixed, Len, Offset} ->
RequiredLength = Offset + Len,
if byte_size(Buf) >= RequiredLength ->
<<_:Offset/binary,
Msg:Len/binary,
Rest/binary>> = Buf,
{{msg, Msg}, Rest};
true ->
{incomplete, RequiredLength - byte_size(Buf), Buf}
end;
incomplete ->
{incomplete, unknown, Buf};
nl_terminated ->
parse_to_nl(Buf)
end.

%% Idx 0 must be 1-9.
%% Finds the ascii encoded integer terminated by a space.
find_int(Buf, Idx) when byte_size(Buf) =< Idx ->
incomplete;
find_int(Buf, Idx) ->
case binary:at(Buf, Idx) of
C when $0 =< C, C =< $9 ->
find_int(Buf, Idx + 1);
$\s ->
{list_to_integer(binary:bin_to_list(Buf, 0, Idx)), Idx + 1};
_ ->
bad_int_or_missing_space
end.

parse_to_nl(Buf) ->
case binary:split(Buf, [<<"\r\n">>, <<"\n">>]) of
[Msg, Rest] ->
{{malformed, Msg}, Rest};
[Buf] ->
{incomplete, Buf}
end.
63 changes: 22 additions & 41 deletions src/tcp_proxy.erl
Expand Up @@ -34,7 +34,7 @@
terminate/2,
code_change/3]).

-record(state, {sock, buffer = <<>>, regex}).
-record(state, {sock, buffer = <<>>}).

%%====================================================================
%% API functions
Expand All @@ -49,9 +49,7 @@ set_socket(Pid, CSock) ->
%% gen_server callbacks
%%====================================================================
init([]) ->
process_flag(trap_exit, true),
{ok, RE} = re:compile("^(\\d+) "),
{ok, #state{regex=RE}}.
{ok, #state{buffer=syslog_parser:new()}}.

handle_call(_Request, _From, State) ->
{reply, ignore, State}.
Expand All @@ -62,10 +60,18 @@ handle_cast({set_socket, CSock}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({tcp, Sock, Packet}, #state{sock=Sock, buffer=Buffer, regex=RE}=State) ->
{ok, Rest} = parse(true, RE, <<Buffer/binary, Packet/binary>>),
handle_info({tcp, Sock, Packet}, #state{sock=Sock, buffer=Buffer}=State) ->
{Result, Msgs, NewBuf} = syslog_parser:push(Packet, Buffer),
case Result of
ok -> ok;
{error, Err} ->
io:format("[~p] event=parse_error, txt=\"~p\"",
[?MODULE, Err]),
ok
end,
process_msgs(Msgs),
inet:setopts(Sock, [{active, once}]),
{noreply, State#state{buffer=Rest}};
{noreply, State#state{buffer=NewBuf}};

handle_info({tcp_closed, _}, State) ->
{stop, normal, State};
Expand All @@ -85,38 +91,13 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%% internal
parse(_Accept, _RE, <<>>) ->
{ok, <<>>};

parse(Accept, RE, Packet) ->
case re:run(Packet, RE, [{capture, all_but_first, binary}]) of
nomatch ->
{ok, _Line, Rest} = read_line(Packet, []),
parse(Accept, RE, Rest);
{match, [Len]} ->
LSize = size(Len),
Size = list_to_integer(binary_to_list(Len)),
case Packet of
<<Len:LSize/binary, 32/integer, Msg:Size/binary, Rest1/binary>> ->
process_msg(Msg),
parse(Accept, RE, Rest1);
_ ->
{ok, Packet}
end
end.

read_line(<<$\n, Rest/binary>>, Acc) ->
{ok, iolist_to_binary(lists:reverse([$\n|Acc])), Rest};

read_line(<<>>, Acc) ->
{ok, iolist_to_binary(lists:reverse(Acc)), <<>>};

read_line(<<C, Rest/binary>>, Acc) ->
read_line(Rest, [C|Acc]).

process_msg(Msg) ->
logplex_stats:incr(message_received_tcp),
logplex_realtime:incr(message_received_tcp),
logplex_queue:in(logplex_work_queue, Msg).
process_msgs(Msgs) when is_list(Msgs) ->
lists:foreach(fun process_msg/1, Msgs).

process_msg({msg, _Msg}) ->
logplex_stats:incr(message_received_tcp),
logplex_realtime:incr(message_received_tcp);
process_msg({malformed, _Msg}) ->
%% Log malformed msg
logplex_stats:incr(message_received_tcp_malformed).
%%logplex_queue:in(logplex_work_queue, Msg).

0 comments on commit a919492

Please sign in to comment.