Browse files

Initial commit.

  • Loading branch information...
0 parents commit 561365e9ada8538dbb257fd45f896dfbdea00547 @archaelus archaelus committed Nov 24, 2012
8 .gitignore
@@ -0,0 +1,8 @@
+*~
+*.beam
+.gitignore
+ebin/*.app
+c_src/*.o
+.eunit/*
+.#*
+deps/*
18 include/logsplat_log.hrl
@@ -0,0 +1,18 @@
+%%% Author : Geoff Cant <nem@erlang.geek.nz>
+%%% Description : Logging macros
+%%% Created : 13 Jan 2006 by Geoff Cant <nem@erlang.geek.nz>
+
+-ifndef(logging_macros).
+-define(logging_macros, true).
+
+-define(INFO(Format, Args),
+ error_logger:info_msg("(~p ~p:~p) " ++ Format ++ "~n",
+ [self(), ?MODULE, ?LINE | Args])).
+-define(WARN(Format, Args),
+ error_logger:warning_msg("(~p ~p:~p) " ++ Format ++ "~n",
+ [self(), ?MODULE, ?LINE | Args])).
+-define(ERR(Format, Args),
+ error_logger:error_msg("(~p ~p:~p) " ++ Format ++ "~n",
+ [self(), ?MODULE, ?LINE | Args])).
+
+-endif. %logging
4 rebar.config
@@ -0,0 +1,4 @@
+{erl_opts, [debug_info]}.
+{deps, [{cowboy, "", {git, "git://github.com/heroku/cowboy.git", "heroku_unstable"}}
+ ,{ex_uri, "", {git, "git://github.com/heroku/ex_uri.git", "master"}}
+ ]}.
15 src/logsplat.app.src
@@ -0,0 +1,15 @@
+{application, logsplat,
+ [{description, "Logplex protocol injector."}
+ ,{vsn, "0.1.0"}
+ ,{registered, []}
+ ,{applications,
+ [kernel
+ ,stdlib
+ ,ssl
+ ,cowboy
+ ]}
+ ,{mod, { logsplat_app, []}}
+ ,{env,
+ [
+ ]}
+ ]}.
44 src/logsplat.erl
@@ -0,0 +1,44 @@
+%%%-------------------------------------------------------------------
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc logsplat public API
+%% @end
+%%%-------------------------------------------------------------------
+-module(logsplat).
+
+-include_lib("ex_uri/include/ex_uri.hrl").
+
+%% API
+-export([ heroku_test/3
+ ,heroku_test_req/3
+ ]).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+heroku_test(Count, Token, Url) when is_list(Url),
+ is_integer(Count),
+ Count > 0->
+ RawReq = lsp_request:to_iolist(heroku_test_req(Count, Token, Url)),
+ {ok, Uri = #ex_uri{}, _} = ex_uri:decode(Url),
+ {ok, Pid} = lsp_http_client:start_link(Uri),
+ {Pid,
+ lsp_http_client:raw_request(Pid, RawReq, 5000)}.
+
+heroku_test_req(Count, Token, Url) ->
+ {ok,
+ Uri0 = #ex_uri{authority = Auth},
+ _} = ex_uri:decode(Url),
+ NewAuth = Auth#ex_uri_authority{userinfo = "token:" ++ Token},
+ Uri = Uri0#ex_uri{authority = NewAuth},
+ Msgs = [ lsp_msg:heroku(now, Token, "console.1",
+ io_lib:format("Logsplat test message ~p from ~p.",
+ [N, self()]))
+ || N <- lists:seq(1, Count) ],
+ lsp_request:new(Msgs, Uri).
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
67 src/logsplat_app.erl
@@ -0,0 +1,67 @@
+%%%-------------------------------------------------------------------
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc logsplat OTP App callback module.
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(logsplat_app).
+
+-behaviour(application).
+
+-define(APP, logsplat).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+-export([config/0, config/1, config/2,
+ start/0, a_start/2]).
+
+%%%===================================================================
+%%% Convenience Functions
+%%%===================================================================
+
+start() ->
+ a_start(?APP, permanent).
+
+config(Key, Default) ->
+ case application:get_env(?APP, Key) of
+ undefined -> Default;
+ {ok, Val} -> Val
+ end.
+
+config(Key) ->
+ case application:get_env(?APP, Key) of
+ undefined -> erlang:error({missing_config, Key});
+ {ok, Val} -> Val
+ end.
+
+config() ->
+ application:get_all_env(?APP).
+
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+ logsplat_sup:start_link().
+
+stop(_State) ->
+ ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+a_start(App, Type) ->
+ start_ok(App, Type, application:start(App, Type)).
+
+start_ok(_App, _Type, ok) -> ok;
+start_ok(_App, _Type, {error, {already_started, _App}}) -> ok;
+start_ok(App, Type, {error, {not_started, Dep}}) ->
+ ok = a_start(Dep, Type),
+ a_start(App, Type);
+start_ok(App, _Type, {error, Reason}) ->
+ erlang:error({app_start_failed, App, Reason}).
41 src/logsplat_sup.erl
@@ -0,0 +1,41 @@
+%%%-------------------------------------------------------------------
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc logsplat top level supervisor
+%% @end
+%%%-------------------------------------------------------------------
+-module(logsplat_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start_link() -> {ok,Pid} | ignore | {error,Error}
+%% @doc: Starts the supervisor
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+
+%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
+init([]) ->
+ {ok, { {one_for_all, 0, 1}, []} }.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
174 src/lsp_http_client.erl
@@ -0,0 +1,174 @@
+%%%-------------------------------------------------------------------
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc
+%% @end
+%%%-------------------------------------------------------------------
+-module(lsp_http_client).
+
+-include("logsplat_log.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("ex_uri/include/ex_uri.hrl").
+
+%% API
+-export([start_link/7
+ ,raw_request/3
+ ,close/1
+ ,start_link/1
+ ]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {drain_id, channel_id, dest,
+ client}).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+start_link(Uri = #ex_uri{scheme = Scheme,
+ authority = #ex_uri_authority{host = Host,
+ port = Port}}) ->
+ start_link(1, 1, ex_uri:encode(ex_uri:hide_userinfo(Uri)),
+ Scheme, Host,
+ find_port(Scheme, Port), 5000).
+
+start_link(DrainId, ChannelId, Dest, Scheme, Host, Port, Timeout) ->
+ gen_server:start_link(?MODULE,
+ [#state{drain_id=DrainId,
+ channel_id=ChannelId,
+ dest=Dest},
+ Scheme, Host, Port],
+ [{timeout, Timeout}]).
+
+raw_request(Pid, Req, Timeout) ->
+ gen_server:call(Pid, {raw_request, Req}, Timeout).
+
+close(Pid) ->
+ gen_server:cast(Pid, close),
+ timer:kill_after(timer:seconds(1), Pid), % XXX - necessary?
+ ok.
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%% @private
+init([State = #state{},
+ Scheme, Host, Port]) ->
+ {ok, Client0} = client_init(Scheme),
+ ConnectStart = os:timestamp(),
+ try cowboy_client:connect(scheme_to_transport(Scheme),
+ Host, Port, Client0) of
+ {ok, Client} ->
+ ConnectEnd = os:timestamp(),
+ ?INFO("drain_id=~p channel_id=~p dest=~s at=try_connect "
+ "attempt=success connect_time=~p",
+ log_info(State, [ltcy(ConnectStart, ConnectEnd)])),
+ {ok, State#state{client=Client}};
+ {error, Why} ->
+ ConnectEnd = os:timestamp(),
+ ?WARN("drain_id=~p channel_id=~p dest=~s at=try_connect "
+ "attempt=fail connect_time=~p tcp_err=~1000p",
+ log_info(State, [ltcy(ConnectStart, ConnectEnd), Why])),
+ ignore
+ catch
+ Class:Err ->
+ Report = {Class, Err, erlang:get_stacktrace()},
+ ConnectEnd = os:timestamp(),
+ ?WARN("drain_id=~p channel_id=~p dest=~s at=connect "
+ "attempt=fail err=exception connect_time=~p "
+ "next_state=disconnected "
+ "data=~1000p",
+ log_info(State, [ltcy(ConnectStart, ConnectEnd), Report])),
+ ignore
+ end.
+
+%% @private
+handle_call({raw_request, Req}, _From, State) ->
+ ReqStart = os:timestamp(),
+ case raw_request(Req, State) of
+ {ok, Status, Headers, NewState} ->
+ ReqEnd = os:timestamp(),
+ ?INFO("drain_id=~p channel_id=~p dest=~s at=response "
+ "result=success status=~p req_time=~p",
+ log_info(State, [Status, ltcy(ReqStart, ReqEnd)])),
+ {reply, {ok, Status, Headers}, NewState};
+ {error, Why} = Err ->
+ ReqEnd = os:timestamp(),
+ ?WARN("drain_id=~p channel_id=~p dest=~s at=response"
+ " result=error req_time=~p tcp_err=\"~1000p\"",
+ log_info(State, [ltcy(ReqStart, ReqEnd), Why])),
+ {stop, normal, Err, State}
+ end;
+
+handle_call(Call, _From, State) ->
+ ?WARN("Unexpected call ~p.", [Call]),
+ {noreply, State}.
+
+%% @private
+handle_cast(close, State = #state{client=Client}) ->
+ cowboy_client:close(Client),
+ {stop, normal, State};
+
+handle_cast(Msg, State) ->
+ ?WARN("Unexpected cast ~p", [Msg]),
+ {noreply, State}.
+
+%% @private
+handle_info(Info, State) ->
+ ?WARN("Unexpected info ~p", [Info]),
+ {noreply, State}.
+
+%% @private
+terminate(_Reason, _State) ->
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+scheme_to_transport("https") -> ranch_ssl;
+scheme_to_transport("http") -> ranch_tcp.
+
+client_init("http") ->
+ cowboy_client:init([]);
+client_init("https") ->
+ cowboy_client:init([{reuse_sessions, false}
+ ]).
+
+find_port(_, Port) when is_integer(Port) -> Port;
+find_port("http", _) -> 80;
+find_port("https", _) -> 443.
+
+log_info(#state{drain_id=DrainId, channel_id=ChannelId, dest=Dest}, Rest)
+ when is_list(Rest) ->
+ [DrainId, ChannelId, Dest | Rest].
+
+
+ltcy(Start, End) ->
+ timer:now_diff(End, Start).
+
+raw_request(Request, State = #state{client=Client}) ->
+ try
+ {ok, Client2} =
+ cowboy_client:raw_request(Request, Client),
+ case cowboy_client:response(Client2) of
+ {ok, Status, Headers, NewClient} ->
+ {ok, Status, Headers,
+ State#state{client = NewClient}};
+ {error, _Why} = Err ->
+ Err
+ end
+ catch
+ Class:Ex ->
+ {error, {Class, Ex, erlang:get_stacktrace()}}
+ end.
78 src/lsp_msg.erl
@@ -0,0 +1,78 @@
+%%%-------------------------------------------------------------------
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc Logsplat message internals
+%% @end
+%%%-------------------------------------------------------------------
+-module(lsp_msg).
+
+%% API
+-export([ new/0
+ ,new/1
+ ,new/8
+ ,to_iolist/1
+ ,heroku/4
+ ]).
+
+-record(rfc5424,
+ {facility :: lsp_syslog_utils:facility(),
+ severity :: lsp_syslog_utils:severity(),
+ time :: iolist() | 'now',
+ host :: iolist(),
+ app_name :: iolist(),
+ proc_id :: iolist(),
+ msg_id :: iolist(),
+ msg :: iolist()
+ }).
+
+-type msg() :: #rfc5424{}.
+
+-export_type([ msg/0 ]).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+new() ->
+ new(local0, debug, now, "localhost", "erlang.1", undefined, undefined,
+ "lsp_msg test message.").
+
+new(Facility, Severity, Time, Host, AppName, ProcID, MsgID, Msg) ->
+ #rfc5424{facility = Facility,
+ severity = Severity,
+ time = Time,
+ host = Host,
+ app_name = AppName,
+ proc_id = ProcID,
+ msg_id = MsgID,
+ msg = Msg}.
+
+new(Count) when is_integer(Count), Count > 0 ->
+ [ new(local0, debug, now, "localhost",
+ "erlang.1", undefined, integer_to_list(N),
+ io_lib:format("lsp_msg test message ~p.", [N]))
+ || N <- lists:seq(1, Count) ].
+
+to_iolist(#rfc5424{facility = Facility,
+ severity = Severity,
+ time = Time,
+ host = Host,
+ app_name = AppName,
+ proc_id = ProcID,
+ msg_id = MsgID,
+ msg = RMsg}) ->
+ lsp_syslog_utils:rfc5424(Facility, Severity, expand_time(Time),
+ Host, AppName, ProcID, MsgID, RMsg).
+
+heroku(Time, Token, PS, Msg) ->
+ new(local0, info, Time, "erlang", Token, PS, undefined, Msg).
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+expand_time(Time) when is_atom(Time);
+ is_tuple(Time) ->
+ lsp_syslog_utils:datetime(Time);
+expand_time(Time) -> Time.
104 src/lsp_request.erl
@@ -0,0 +1,104 @@
+%%%-------------------------------------------------------------------
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc
+%% @end
+%%%-------------------------------------------------------------------
+-module(lsp_request).
+
+-include_lib("ex_uri/include/ex_uri.hrl").
+
+%% API
+-export([new/2
+ ,to_iolist/1]).
+
+-record(frame, {body :: iolist(),
+ msg_count :: non_neg_integer(),
+ id :: iolist()}).
+-record(lsp_request, {uri :: #ex_uri{},
+ logs = [] :: list(lsp_msg:msg())}).
+-define(CONTENT_TYPE, <<"application/logplex-1">>).
+-define(HTTP_VERSION, {1,1}).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+new(Logs, Url) when is_list(Url) ->
+ {ok, Uri = #ex_uri{}, _} = ex_uri:decode(Url),
+ new(Logs, Uri);
+new(Logs, Uri = #ex_uri{}) when is_list(Logs) ->
+ #lsp_request{logs=Logs, uri=Uri}.
+
+to_iolist(#lsp_request{uri = Uri} = Req) ->
+ to_iolist(frame(Req), Uri).
+
+frame(#lsp_request{logs=Logs}) ->
+ #frame{ body =
+ [ begin
+ MsgS = lsp_msg:to_iolist(Msg),
+ lsp_syslog_utils:frame(MsgS)
+ end
+ || Msg <- Logs ],
+ msg_count = length(Logs),
+ id = frame_id_iolist({self(), now()}) }.
+
+frame_id_iolist(Term) ->
+ << <<(hd(integer_to_list(I, 16)))>>
+ || <<I:4>> <= crypto:md5(term_to_binary(Term)) >>.
+
+to_iolist(#frame{body = Body,
+ msg_count = Count,
+ id = Id},
+ URI = #ex_uri{}) ->
+ AuthHeader = auth_header(URI),
+ MD5Header = case logsplat_app:config(http_body_checksum, none) of
+ md5 -> [{<<"Content-MD5">>,
+ base64:encode(crypto:md5(Body))}];
+ none -> []
+ end,
+ Headers = MD5Header ++ AuthHeader ++
+ [{<<"Content-Type">>, ?CONTENT_TYPE},
+ {<<"Logplex-Msg-Count">>, integer_to_list(Count)},
+ {<<"Logplex-Frame-Id">>, Id},
+ {<<"User-Agent">>, user_agent()}
+ ],
+ cowboy_client:request_to_iolist(<<"POST">>,
+ Headers,
+ Body,
+ ?HTTP_VERSION,
+ full_host_iolist(URI),
+ uri_ref(URI)).
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+auth_header(#ex_uri{authority=#ex_uri_authority{userinfo=Info}})
+ when Info =/= undefined ->
+ cowboy_client:auth_header(Info);
+auth_header(_) ->
+ [].
+
+full_host_iolist(#ex_uri{authority=#ex_uri_authority{host=Host,
+ port=Port}})
+ when is_integer(Port), Host =/= undefined ->
+ [Host, ":", integer_to_list(Port)];
+full_host_iolist(#ex_uri{authority=#ex_uri_authority{host=Host}})
+ when Host =/= undefined ->
+ Host.
+
+uri_ref(#ex_uri{path=Path, q=Q}) ->
+ Ref = #ex_uri_ref{path = case Path of
+ "" -> "/";
+ _ -> Path
+ end, q=Q},
+ ex_uri:encode(Ref).
+
+user_agent() ->
+ [<<"Logsplat/">>,
+ case application:get_key(logsplat, vsn) of
+ undefined -> <<"unknown">>;
+ {ok, Vsn} -> Vsn
+ end].
142 src/lsp_syslog_parser.erl
@@ -0,0 +1,142 @@
+%% @copyright Heroku 2011
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc Re-entrant syslog tcp stream parser.
+%% @end
+-module(lsp_syslog_parser).
+
+-export([new/0, push/2]).
+
+-export([example1/0, parse/1]).
+
+
+-define(SYSLOG_MAX_SIZE, 10240). % 10Kb max in a single log message.
+
+-record(buf, {bytes = <<>> :: binary(),
+ waiting_for = unknown :: 'unknown' | non_neg_integer()}).
+
+-opaque parse_buffer() :: #buf{}.
+-type syslog_message() :: {msg, binary()} | {malformed, binary()}.
+-type syslog_messages() :: [syslog_message()].
+
+%% Syslog frames
+%% a) <Length as ascii integer><space><msg:Length>"
+%% b) <msg>\n.
+%% ZOMGWTFBBQ? Two framing formats and they get switched between
+%% randomly? #killmaimdestroy
+
+example1() ->
+ <<"1753626010 web.23 - - Started GET \"/default/payload\" for 69.179.15.165 at 2011-12-12 15:20:13 -0800\n218 <13>1 2011-12-12T23:20:13+00:00 runtime.60161@heroku.com t.fa298f04-b533-4a04-8b47-1e1753626010 web.9 - - cache: [GET /?a=gd&v=2&m=263100500876800&f=%2C21474836474ecc3d2c729cd7.34662728%2C1%2C3%2Cgrupo%2Cgrupov1] miss\n200 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.12d76170-aa5c-4f25-8f3a-bf214f93d208 router - - POST p.pingme.net/v1/poll dyno=web.26 queue=0 wait=0ms service=61ms status=200 bytes=2\n202 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.922a3f60-85bc-4d99-9d24-1a4c6caa7fb2 router - - GET d.lqw.me/delivery.js dyno=web.24 queue=0 wait=0ms service=83ms status=200 bytes=3187\n204 <158>1 2011-12-12T23:20:13+00:00 hermes-argon.60125@heroku.com t.1681f42f-75e0-484b-bdcd-4fd3b8487f92 router - - GET bid.tapengage.com/admeld dyno=web.13 queue=0 wait=0ms service=11ms status=200 bytes=53\n288 <13>1 2011-12-12T23:20:13+00:00 runtime.60159@heroku.com t.223ccc64-e9fb-4570-ba">>.
+
+new() ->
+ #buf{}.
+
+parse(Bytes) when is_binary(Bytes) ->
+ push(Bytes, new()).
+
+-spec push(Bytes::binary(), parse_buffer()) ->
+ { ok | {error, term()},
+ syslog_messages(),
+ parse_buffer()}.
+
+push(Bytes, Buf = #buf{bytes=OldBuf, waiting_for=WF})
+ when is_binary(Bytes) ->
+ push(iolist_to_binary([OldBuf, Bytes]), WF, Buf).
+
+push(Bytes, RequiredLength, Buf)
+ when is_integer(RequiredLength),
+ byte_size(Bytes) < RequiredLength ->
+ %% Haven't accumulated enough bytes to complete a parse yet.
+ {ok, [], Buf#buf{bytes=Bytes}};
+push(Bytes, _, _) ->
+ parse_recursive(Bytes, []).
+
+
+-spec parse_recursive(binary(), Acc::syslog_messages()) ->
+ {'ok' | {'error', term()},
+ syslog_messages(), parse_buffer()}.
+parse_recursive(<<>>, Acc) ->
+ {ok, lists:reverse(Acc), #buf{}};
+parse_recursive(Buf, Acc) ->
+ case parse_beginning(Buf) of
+ {incomplete, N, Rest} ->
+ {ok, lists:reverse(Acc), #buf{bytes=Rest, waiting_for=N}};
+ {_Msg, Buf} ->
+ {{error, {parser_made_no_progress, Buf}},
+ lists:reverse(Acc), #buf{}};
+ {Msg, Rest} ->
+ parse_recursive(Rest, [Msg | Acc])
+ end.
+
+%% Can only be used at the beginning of a message. Will give bogus
+%% results otherwise.
+-spec msg_type(Buf::binary()) -> {'length_prefixed',
+ Length::non_neg_integer(),
+ Offset::non_neg_integer()} |
+ 'nl_terminated' |
+ 'incomplete'.
+msg_type(<<FirstChar, _/binary>> = Buf)
+ when $1 =< FirstChar, FirstChar =< $9 ->
+ case find_int(Buf, 0) of
+ {Len, Offset} when Len =< ?SYSLOG_MAX_SIZE ->
+ {length_prefixed, Len, Offset};
+ {Len, _} when Len > ?SYSLOG_MAX_SIZE ->
+ nl_terminated;
+ bad_int_or_missing_space ->
+ nl_terminated;
+ incomplete ->
+ incomplete
+ end;
+msg_type(_) ->
+ nl_terminated.
+
+-spec parse_beginning(binary()) ->
+ {syslog_message(), Remaining::binary()} |
+ {'incomplete', 'unknown' | non_neg_integer(),
+ Remaining::binary()}.
+parse_beginning(Buf) ->
+ case msg_type(Buf) of
+ {length_prefixed, Len, Offset} ->
+ RequiredLength = Offset + Len,
+ if byte_size(Buf) >= RequiredLength ->
+ <<_:Offset/binary,
+ Msg:Len/binary,
+ Rest/binary>> = Buf,
+ {{msg, Msg}, Rest};
+ true ->
+ {incomplete, RequiredLength - byte_size(Buf), Buf}
+ end;
+ incomplete ->
+ {incomplete, unknown, Buf};
+ nl_terminated ->
+ parse_to_nl(Buf)
+ end.
+
+%% Idx 0 must be 1-9.
+%% Finds the ascii encoded integer terminated by a space.
+-spec find_int(binary(), non_neg_integer()) ->
+ 'incomplete' |
+ {Length::non_neg_integer(),
+ MsgOffset::non_neg_integer()} |
+ 'bad_int_or_missing_space'.
+find_int(Buf, Idx) when byte_size(Buf) =< Idx ->
+ incomplete;
+find_int(Buf, Idx) ->
+ case binary:at(Buf, Idx) of
+ C when $0 =< C, C =< $9 ->
+ find_int(Buf, Idx + 1);
+ $\s ->
+ {list_to_integer(binary:bin_to_list(Buf, 0, Idx)), Idx + 1};
+ _ ->
+ bad_int_or_missing_space
+ end.
+
+-spec parse_to_nl(binary()) -> {'incomplete', 'unknown', binary()} |
+ {{'malformed', binary()}, Rest::binary()}.
+parse_to_nl(Buf) ->
+ case binary:split(Buf, [<<"\r\n">>, <<"\n">>]) of
+ [Msg, Rest] ->
+ {{malformed, Msg}, Rest};
+ [Buf] ->
+ {incomplete, unknown, Buf}
+ end.
164 src/lsp_syslog_utils.erl
@@ -0,0 +1,164 @@
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc Syslog message formatting utilities.
+%% @end
+-module(lsp_syslog_utils).
+
+-export([to_msg/2
+ ,from_msg/1
+ ,frame/1
+ ,datetime/1
+ ,facility_to_int/1
+ ,severity_to_int/1
+ ,fmt/7
+ ,rfc5424/1
+ ,rfc5424/8
+ ]).
+
+-type syslog_msg() :: {0..128, 0..7,
+ Time::iolist(), Source::iolist(),
+ Process::iolist(), Msg::iolist()}.
+
+-export_type([ syslog_msg/0
+ ,facility/0
+ ,severity/0
+ ]).
+
+-spec to_msg(syslog_msg(), iolist() | binary()) -> iolist().
+to_msg({Facility, Severity, Time, Source, Process, Msg}, Token) ->
+ [ <<"<">>, pri(Facility, Severity), <<">1 ">>,
+ Time, $\s, Token, $\s, Source, $\s, Process, <<" - - ">>, Msg ].
+
+rfc5424({Facility, Severity, Time, Source, Process, Msg}) ->
+ rfc5424(Facility, Severity, Time, Source,
+ Process, undefined, undefined, Msg).
+
+rfc5424(Facility, Severity, Time, Host, AppName, ProcID, MsgID, Msg) ->
+ [ <<"<">>, pri(Facility, Severity), <<">1">>,
+ [ [$\s, nvl(Item)]
+ || Item <- [Time, Host, AppName, ProcID, MsgID] ],
+ case Msg of
+ undefined -> [];
+ _ -> [$\s, Msg]
+ end
+ ].
+
+nvl(undefined) -> $-;
+nvl(Val) -> Val.
+
+
+from_msg(Msg) when is_binary(Msg) ->
+ %% <40>1 2010-11-10T17:16:33-08:00 domU-12-31-39-13-74-02 t.xxx web.1 - - State changed from created to starting
+ %% <PriFac>1 Time Host Token Process - - Msg
+ case re:run(Msg, "^<(\\d+)>1 (\\S+) \\S+ (\\S+) (\\S+) \\S+ \\S+ (.*)",
+ [{capture, all_but_first, binary}]) of
+ {match, [PriFac, Time, Source, Ps, Content]} ->
+ <<Facility:5, Severity:3>> =
+ << (list_to_integer(binary_to_list(PriFac))):8 >>,
+ {Facility, Severity, Time, Source, Ps, Content};
+ _ ->
+ {error, bad_syslog_msg}
+ end.
+
+-spec pri(0..128 | atom(), 0..7 | atom()) -> iolist().
+pri(Facility, Severity)
+ when is_atom(Facility) ->
+ pri(facility_to_int(Facility), Severity);
+pri(Facility, Severity)
+ when is_integer(Facility), is_atom(Severity) ->
+ pri(Facility, severity_to_int(Severity));
+pri(Facility, Severity)
+ when is_integer(Facility),
+ is_integer(Severity),
+ 0 =< Severity, Severity =< 7 ->
+ integer_to_list(Facility * 8 + Severity).
+
+-spec frame(Msg::iolist()) -> iolist().
+frame(Msg) when is_binary(Msg); is_list(Msg) ->
+ Length = iolist_size(Msg),
+ [ integer_to_list(Length),
+ " ",
+ Msg ].
+
+datetime(now) ->
+ datetime(os:timestamp());
+datetime({_,_,_} = Now) ->
+ DT = calendar:now_to_universal_time(Now),
+ datetime(DT);
+datetime({{Y,M,D},{H,MM,S}}) ->
+ io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B ~2.10.0B:~2.10.0B:~2.10.0B"
+ "Z+00:00",
+ [Y,M,D, H,MM,S]).
+
+fmt(Facility, Severity, Time, Source, Process, Fmt, Args) ->
+ {facility_to_int(Facility),
+ severity_to_int(Severity),
+ datetime(Time),
+ Source,
+ Process,
+ io_lib:format(Fmt, Args)}.
+
+-type facility() :: 0..127 |
+ 'kernel' | 'user' | 'mail' | 'system' | 'internal' | 'lp' |
+ 'news' | 'uucp' | 'clock' | 'security2' | 'ftp' | 'ntp' |
+ 'audit' | 'alert' | 'clock2' | 'local0' | 'local1' |
+ 'local2' | 'local3' | 'local4' | 'local5' | 'local6' |
+ 'local7'.
+facilities() ->
+ [ { 0, kernel, "kernel messages"}
+ ,{ 1, user, "user-level messages"}
+ ,{ 2, mail, "mail system"}
+ ,{ 3, system, "system daemons"}
+ ,{ 4, security, "security/authorization messages"}
+ ,{ 5, internal, "messages generated internally by syslogd"}
+ ,{ 6, lp, "line printer subsystem"}
+ ,{ 7, news, "network news subsystem"}
+ ,{ 8, uucp, "UUCP subsystem"}
+ ,{ 9, clock, "clock daemon"}
+ ,{10, security2, "security/authorization messages"}
+ ,{11, ftp, "FTP daemon"}
+ ,{12, ntp, "NTP subsystem"}
+ ,{13, audit, "log audit"}
+ ,{14, alert, "log alert"}
+ ,{15, clock2, "clock daemon (note 2)"}
+ ,{16, local0, "local use 0 (local0)"}
+ ,{17, local1, "local use 1 (local1)"}
+ ,{18, local2, "local use 2 (local2)"}
+ ,{19, local3, "local use 3 (local3)"}
+ ,{20, local4, "local use 4 (local4)"}
+ ,{21, local5, "local use 5 (local5)"}
+ ,{22, local6, "local use 6 (local6)"}
+ ,{23, local7, "local use 7 (local7)"}].
+
+-spec facility_to_int(facility()) -> 0..127.
+facility_to_int(I)
+ when is_integer(I), 0 =< I, I =< 127 ->
+ I;
+facility_to_int(A) when is_atom(A) ->
+ element(1, lists:keyfind(A, 2, facilities())).
+
+-type severity() :: 0..7 |
+ 'emergency' |
+ 'alert' |
+ 'critical' |
+ 'error' |
+ 'warning' |
+ 'notice' |
+ 'info' |
+ 'debug'.
+severities() ->
+ [ {0, emergency, "Emergency: system is unusable"}
+ ,{1, alert, "Alert: action must be taken immediately"}
+ ,{2, critical, "Critical: critical conditions"}
+ ,{3, error, "Error: error conditions"}
+ ,{4, warning, "Warning: warning conditions"}
+ ,{5, notice, "Notice: normal but significant condition"}
+ ,{6, info, "Informational: informational messages"}
+ ,{7, debug, "Debug: debug-level messages"}].
+
+-spec severity_to_int(severity()) -> 0..7.
+severity_to_int(I) when is_integer(I) ->
+ I;
+severity_to_int(A) when is_atom(A) ->
+ element(1, lists:keyfind(A, 2, severities())).

0 comments on commit 561365e

Please sign in to comment.