From a919492b4b71ceb01ba774ad9d53b8367d589d34 Mon Sep 17 00:00:00 2001 From: Geoff Cant Date: Tue, 13 Dec 2011 19:28:27 -0800 Subject: [PATCH] Adds (hopefully) more robust syslog message parser. Parsing split out of tcp_proxy into syslog_parser. --- src/syslog_parser.erl | 120 ++++++++++++++++++++++++++++++++++++++++++ src/tcp_proxy.erl | 63 ++++++++-------------- 2 files changed, 142 insertions(+), 41 deletions(-) create mode 100644 src/syslog_parser.erl diff --git a/src/syslog_parser.erl b/src/syslog_parser.erl new file mode 100644 index 00000000..147acb6d --- /dev/null +++ b/src/syslog_parser.erl @@ -0,0 +1,120 @@ +%% @copyright Heroku 2011 +%% @author Geoff Cant +%% @version {@vsn}, {@date} {@time} +%% @doc Re-entrant syslog tcp stream parser. +%% @end +-module(syslog_parser). + +-export([new/0, push/2]). + +-export([example1/0, parse/1]). + + +-define(SYSLOG_MAX_SIZE, 2048). + +-record(buf, {bytes = <<>> :: binary(), + waiting_for = unknown :: 'unknown' | non_neg_integer()}). + +%% Syslog frames +%% a) " +%% b) \n. +%% ZOMGWTFBBQ? Two framing formats and they get switched between +%% randomly? #killmaimdestroy + +example1() -> + <<"1753626010 web.23 - - Started GET \"/default/payload\" for 69.179.15.165 at 2011-12-12 15:20:13 -0800\n218 <13>1 2011-12-12T23:20:13+00:00 runtime.60161@heroku.com t.fa298f04-b533-4a04-8b47-1e1753626010 web.9 - - cache: [GET /?a=gd&v=2&m=263100500876800&f=%2C21474836474ecc3d2c729cd7.34662728%2C1%2C3%2Cgrupo%2Cgrupov1] miss\n200 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.12d76170-aa5c-4f25-8f3a-bf214f93d208 router - - POST p.pingme.net/v1/poll dyno=web.26 queue=0 wait=0ms service=61ms status=200 bytes=2\n202 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.922a3f60-85bc-4d99-9d24-1a4c6caa7fb2 router - - GET d.lqw.me/delivery.js dyno=web.24 queue=0 wait=0ms service=83ms status=200 bytes=3187\n204 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.1681f42f-75e0-484b-bdcd-4fd3b8487f92 router - - GET bid.tapengage.com/admeld dyno=web.13 queue=0 wait=0ms service=11ms status=200 bytes=53\n288 <13>1 2011-12-12T23:20:13+00:00 runtime.60159@heroku.com t.223ccc64-e9fb-4570-ba">>. + +new() -> + #buf{}. + +parse(Bytes) when is_binary(Bytes) -> + push(Bytes, new()). + +push(Bytes, #buf{bytes=OldBuf, waiting_for=unknown}) + when is_binary(Bytes), is_binary(OldBuf) -> + NewBuf = iolist_to_binary([OldBuf, Bytes]), + parse_recursive(NewBuf, []); +push(Bytes, Buf = #buf{bytes=OldBuf, waiting_for=RequiredLength}) -> + NewBuf = iolist_to_binary([OldBuf, Bytes]), + case byte_size(NewBuf) >= RequiredLength of + true -> + parse_recursive(NewBuf, []); + false -> + Buf#buf{bytes=NewBuf} + end. + + +parse_recursive(<<>>, Acc) -> + {ok, lists:reverse(Acc), #buf{}}; +parse_recursive(Buf, Acc) -> + case parse_beginning(Buf) of + {incomplete, N, Rest} -> + {ok, lists:reverse(Acc), #buf{bytes=Rest, waiting_for=N}}; + {_Msg, Buf} -> + {{error, {parser_made_no_progress, Buf}}, + lists:reverse(Acc), #buf{}}; + {Msg, Rest} -> + parse_recursive(Rest, [Msg | Acc]) + end. + +%% Can only be used at the beginning of a message. Will give bogus +%% results otherwise. +-spec msg_type(Buf::binary()) -> {'length_prefixed', + Length::non_neg_integer(), + Offset::non_neg_integer()} | + 'nl_terminated' | + 'incomplete'. +msg_type(<> = Buf) + when $1 =< FirstChar, FirstChar =< $9 -> + case find_int(Buf, 0) of + {Len, Offset} when Len =< ?SYSLOG_MAX_SIZE -> + {length_prefixed, Len, Offset}; + {Len, _} when Len > ?SYSLOG_MAX_SIZE -> + nl_terminated; + bad_int_or_missing_space -> + nl_terminated; + incomplete -> + incomplete + end; +msg_type(_) -> + nl_terminated. + +parse_beginning(Buf) -> + case msg_type(Buf) of + {length_prefixed, Len, Offset} -> + RequiredLength = Offset + Len, + if byte_size(Buf) >= RequiredLength -> + <<_:Offset/binary, + Msg:Len/binary, + Rest/binary>> = Buf, + {{msg, Msg}, Rest}; + true -> + {incomplete, RequiredLength - byte_size(Buf), Buf} + end; + incomplete -> + {incomplete, unknown, Buf}; + nl_terminated -> + parse_to_nl(Buf) + end. + +%% Idx 0 must be 1-9. +%% Finds the ascii encoded integer terminated by a space. +find_int(Buf, Idx) when byte_size(Buf) =< Idx -> + incomplete; +find_int(Buf, Idx) -> + case binary:at(Buf, Idx) of + C when $0 =< C, C =< $9 -> + find_int(Buf, Idx + 1); + $\s -> + {list_to_integer(binary:bin_to_list(Buf, 0, Idx)), Idx + 1}; + _ -> + bad_int_or_missing_space + end. + +parse_to_nl(Buf) -> + case binary:split(Buf, [<<"\r\n">>, <<"\n">>]) of + [Msg, Rest] -> + {{malformed, Msg}, Rest}; + [Buf] -> + {incomplete, Buf} + end. diff --git a/src/tcp_proxy.erl b/src/tcp_proxy.erl index 2e425d6b..7cb82407 100644 --- a/src/tcp_proxy.erl +++ b/src/tcp_proxy.erl @@ -34,7 +34,7 @@ terminate/2, code_change/3]). --record(state, {sock, buffer = <<>>, regex}). +-record(state, {sock, buffer = <<>>}). %%==================================================================== %% API functions @@ -49,9 +49,7 @@ set_socket(Pid, CSock) -> %% gen_server callbacks %%==================================================================== init([]) -> - process_flag(trap_exit, true), - {ok, RE} = re:compile("^(\\d+) "), - {ok, #state{regex=RE}}. + {ok, #state{buffer=syslog_parser:new()}}. handle_call(_Request, _From, State) -> {reply, ignore, State}. @@ -62,10 +60,18 @@ handle_cast({set_socket, CSock}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({tcp, Sock, Packet}, #state{sock=Sock, buffer=Buffer, regex=RE}=State) -> - {ok, Rest} = parse(true, RE, <>), +handle_info({tcp, Sock, Packet}, #state{sock=Sock, buffer=Buffer}=State) -> + {Result, Msgs, NewBuf} = syslog_parser:push(Packet, Buffer), + case Result of + ok -> ok; + {error, Err} -> + io:format("[~p] event=parse_error, txt=\"~p\"", + [?MODULE, Err]), + ok + end, + process_msgs(Msgs), inet:setopts(Sock, [{active, once}]), - {noreply, State#state{buffer=Rest}}; + {noreply, State#state{buffer=NewBuf}}; handle_info({tcp_closed, _}, State) -> {stop, normal, State}; @@ -85,38 +91,13 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% internal -parse(_Accept, _RE, <<>>) -> - {ok, <<>>}; - -parse(Accept, RE, Packet) -> - case re:run(Packet, RE, [{capture, all_but_first, binary}]) of - nomatch -> - {ok, _Line, Rest} = read_line(Packet, []), - parse(Accept, RE, Rest); - {match, [Len]} -> - LSize = size(Len), - Size = list_to_integer(binary_to_list(Len)), - case Packet of - <> -> - process_msg(Msg), - parse(Accept, RE, Rest1); - _ -> - {ok, Packet} - end - end. - -read_line(<<$\n, Rest/binary>>, Acc) -> - {ok, iolist_to_binary(lists:reverse([$\n|Acc])), Rest}; - -read_line(<<>>, Acc) -> - {ok, iolist_to_binary(lists:reverse(Acc)), <<>>}; - -read_line(<>, Acc) -> - read_line(Rest, [C|Acc]). - -process_msg(Msg) -> - logplex_stats:incr(message_received_tcp), - logplex_realtime:incr(message_received_tcp), - logplex_queue:in(logplex_work_queue, Msg). +process_msgs(Msgs) when is_list(Msgs) -> + lists:foreach(fun process_msg/1, Msgs). +process_msg({msg, _Msg}) -> + logplex_stats:incr(message_received_tcp), + logplex_realtime:incr(message_received_tcp); +process_msg({malformed, _Msg}) -> + %% Log malformed msg + logplex_stats:incr(message_received_tcp_malformed). + %%logplex_queue:in(logplex_work_queue, Msg).