Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 19 commits
  • 9 files changed
  • 0 commit comments
  • 3 contributors
Commits on Jul 09, 2012
@tel tel * src/zeta.app.src: we don't actually need lager to be started 4026558
@tel tel * Makefile: Added a nice trick for quickly running an erlang vm 21fbd89
@tel tel * README.md: Updated the config in the readme slightly a1356da
Commits on Jul 16, 2012
@tel tel More detailed error messages and sublime-text integration 844ca93
@tel tel Licensed under 2-clause BSD fe69880
@tel tel Makefile defaults to 'ft' instead of 'full' a505289
Commits on Jul 25, 2012
@lefant lefant use gen_tcp:close/1 to close tcp socket 053d001
@lefant lefant store host and port in zeta_client server state, use them to send udp 3216994
Commits on Jul 27, 2012
@lefant lefant implement and export cv_batch/1 and sv_batch/1 for sending of several…
… events at once
c021e0d
@lefant lefant Merge branch 'fix-udp' into campanja 7c1213f
Commits on Aug 02, 2012
@lefant lefant fix erlando url in rebar.config d6c5687
Commits on Aug 28, 2012
@lefant lefant just log and shutdown cleanly on disconnect instead of crashing 8d21311
@lefant lefant just log and shutdown cleanly in case of initial connection refused c893fa0
Commits on Sep 03, 2012
@lefant lefant Merge branch 'disconnect_no_crash' into campanja 36cce03
Commits on Sep 11, 2012
Torbjörn Norinder handle all errors during gen_tcp:recv 4d06aae
Torbjörn Norinder Set nodelay to true a864005
@lefant lefant reply ok when shutting down to avoid crash for our best effort tcp se…
…nding
c6d54ac
Commits on Nov 20, 2012
@lefant lefant remove covertool in rebar.config, we don't use it and its dep on reba…
…r breaks our build
e572e53
@lefant lefant Merge branch 'remove-covertool-dep' into campanja d556bca
View
10 LICENSE
@@ -0,0 +1,10 @@
+Copyright (c) 2012, Joseph Abrahamson
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY Joseph Abrahamson "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Joseph Abrahamson BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
View
9 Makefile
@@ -4,6 +4,9 @@ APP := zeta
.PHONY: deps
+ft:
+ @$(REBAR) skip_deps=true compile eunit
+
all: deps
@$(REBAR) compile
@@ -22,12 +25,12 @@ docs:
test: all
@$(REBAR) skip_deps=true eunit
-ft:
- @$(REBAR) skip_deps=true compile eunit
-
fast:
@$(REBAR) skip_deps=true compile
+run:
+ @$(ERL) -pa $(PWD)/../*/ebin
+
dialyzer: all
dialyzer \
ebin --no_check_plt \
View
7 README.md
@@ -8,7 +8,7 @@ By default Zeta is configured to sent to a Riemann server at
`127.0.0.1:5555`, so fix that in your global configuration
```erlang
-{zeta, {client, {Address, Port}}}
+{zeta, {clients, [{default, {{127,0,0,1}, 5555, undefined}}]}}
```
then include and start the Zeta application
@@ -29,10 +29,11 @@ ok
ok
```
+Feel free to instrument your code even if the app isn't running. The
+messages will just be ignored.
+
# To do #
-- *Fix lifecycles*. Right now, a failing Riemann server will quickly kill the
- zeta app and any release running it. This is dumb.
- Decide how to handle TCP/UDP methods.
- Make UDP automatically upgrade to TCP if the packet is too large
- Allow for multiple client endpoints
View
14 rebar.config
@@ -1,6 +1,6 @@
%%-*- mode: erlang -*-
-{plugins, [rebar_covertool]}.
+%{plugins, [rebar_covertool]}.
{sub_dirs, ["rel"]}.
{erl_opts, [fail_on_warning, debug_info,
@@ -11,18 +11,18 @@
{cover_print_enable, true}.
{clean_files, ["*.eunit", "ebin/*.beam"]}.
-{covertool_eunit, ".eunit/eunit.coverage.xml"}.
-{covertool_prefix_len, 2}.
+%%{covertool_eunit, ".eunit/eunit.coverage.xml"}.
+%%{covertool_prefix_len, 2}.
{lib_dirs, [".."]}.
{deps_dir, ".."}.
{deps, [{lager, ".*",
{git, "git://github.com/basho/lager.git", "HEAD"}},
{erlando, ".*",
- {git, "git@github.com:reifyhealth/erlando.git", "HEAD"}},
+ {git, "git://github.com/reifyhealth/erlando.git", "HEAD"}},
{proper, ".*",
{git, "https://github.com/manopapad/proper.git", "HEAD"}},
{protobuffs, ".*",
- {git, "https://github.com/dizzyd/protobuffs.git", "HEAD"}},
- {covertool, ".*",
- {git, "https://github.com/idubrov/covertool.git", "HEAD"}}]}.
+ {git, "https://github.com/dizzyd/protobuffs.git", "HEAD"}}]}.
+%% {covertool, ".*",
+%% {git, "https://github.com/idubrov/covertool.git", "HEAD"}}]}.
View
3  src/zeta.app.src
@@ -8,8 +8,7 @@
{registered, [zeta]},
{applications, [
kernel,
- stdlib,
- lager
+ stdlib
]},
{mod, {zeta, []}},
View
24 src/zeta.erl
@@ -7,6 +7,7 @@
-export([ev/2, ev/3, ev/4, evh/2, evh/3, evh/4]).
-export([sv/2, sv/3, sv/4, svh/2, svh/3, svh/4]).
-export([cv/2, cv/3, cv/4, cvh/2, cvh/3, cvh/4]).
+-export([cv_batch/1, sv_batch/1]).
-export([all_client_configs/0, client_config/1]).
@@ -88,12 +89,16 @@ sv(Loc, Metric) -> sv(Loc, Metric, undefined).
sv(Loc, Metric, State) -> sv(Loc, Metric, State, []).
sv(Loc, Metric, State, Opts) ->
E = ev(Loc, Metric, State, Opts),
- M = #zeta_msg{zevents = [E]},
+ sv_batch([E]).
+
+sv_batch(Es) ->
+ M = #zeta_msg{zevents = Es},
Data = zeta_pb:encode(M),
Length = byte_size(Data),
- do([error_m ||
- Client <- zeta_corral:client(),
- gen_server:call(Client, {events, <<Length:32/integer-big, Data/binary>>})]).
+ do([error_m ||
+ Client <- zeta_corral:client(),
+ gen_server:call(
+ Client, {events, <<Length:32/integer-big, Data/binary>>})]).
svh(Service, Metric) -> svh(Service, Metric, undefined).
svh(Service, Metric, State) -> svh(Service, Metric, State, []).
@@ -103,17 +108,20 @@ cv(Loc, Metric) -> cv(Loc, Metric, undefined).
cv(Loc, Metric, State) -> cv(Loc, Metric, State, []).
cv(Loc, Metric, State, Opts) ->
E = ev(Loc, Metric, State, Opts),
- M = #zeta_msg{zevents = [E]},
+ cv_batch([E]).
+
+cv_batch(Es) ->
+ M = #zeta_msg{zevents = Es},
Data = zeta_pb:encode(M),
do([error_m ||
- Client <- zeta_corral:client(),
- gen_server:cast(Client, {events, Data})]).
+ Client <- zeta_corral:client(),
+ gen_server:cast(Client, {events, Data})]).
+
cvh(Service, Metric) -> cvh(Service, Metric, undefined).
cvh(Service, Metric, State) -> cvh(Service, Metric, State, []).
cvh(Service, Metric, State, Opts) -> cv({node(), Service}, Metric, State, Opts).
-
lookup(K, List) when is_atom(K) ->
case lists:keyfind(K, 1, List) of
{K, V} -> V;
View
50 src/zeta_client.erl
@@ -12,7 +12,9 @@
handle_info/2, code_change/3]).
-record(st, {tcp :: inet:socket(),
- udp :: inet:socket()}).
+ udp :: inet:socket(),
+ host :: inet:ip_address() | inet:hostname(),
+ port :: integer()}).
%% -------------
%% Lifecycle API
@@ -31,10 +33,15 @@ init([Host, Port]) ->
%% Open UDP on a random port
{ok, UDPSock} = gen_udp:open(0, [binary, {active,false}]),
%% Try to make a TCP connection
- {ok, TCPSock} = gen_tcp:connect(Host, Port,
- [binary, {active, false}],
- 5000),
- {ok, #st{udp = UDPSock, tcp = TCPSock}}.
+ case gen_tcp:connect(Host, Port,
+ [binary, {active, false}, {nodelay, true}],
+ 5000) of
+ {ok, TCPSock} ->
+ {ok, #st{udp = UDPSock, tcp = TCPSock, host = Host, port = Port}};
+ {error, econnrefused} ->
+ error_logger:info_msg("zeta_client connection refused"),
+ {stop, {shutdown, econnrefused}}
+ end.
terminate(_Reason, #st{udp = UDPSock, tcp = TCPSock}) ->
case UDPSock of
@@ -43,23 +50,34 @@ terminate(_Reason, #st{udp = UDPSock, tcp = TCPSock}) ->
end,
case TCPSock of
undefined -> ok;
- TSock -> gen_udp:close(TSock)
+ TSock -> gen_tcp:close(TSock)
end.
handle_call({events, Msg}, _From, St = #st{tcp = TCP}) ->
- ok = gen_tcp:send(TCP, Msg),
- case gen_tcp:recv(TCP, 0, 2000) of
- {ok, Resp} ->
- case zeta_pb:pop(Resp) of
- {#zeta_msg{ok = true}, _} -> {reply, ok, St};
- {#zeta_msg{error = Error}, _} -> {error, {riemann, Error}};
- {none, _} -> {error, noparse}
- end
+ case gen_tcp:send(TCP, Msg) of
+ ok ->
+ case gen_tcp:recv(TCP, 0, 2000) of
+ {ok, Resp} ->
+ case zeta_pb:pop(Resp) of
+ {#zeta_msg{ok = true}, _} ->
+ {reply, ok, St};
+ {#zeta_msg{error = Error}, _} ->
+ {error, {riemann, Error}};
+ {none, _} -> {error, noparse}
+ end;
+ {error, _} = Error ->
+ error_logger:info_msg(
+ "ignoring zeta_client receive error: ~p~n", [Error]),
+ {stop, {shutdown, tcp_recv_error}, ok, St}
+ end;
+ {error, closed} ->
+ error_logger:info_msg("zeta_client disconnected"),
+ {stop, {shutdown, connection_closed}, ok, St}
end;
handle_call(_Message, _From, State) -> {reply, ignored, State}.
-handle_cast({events, Msg}, St = #st{udp = UDP}) ->
- gen_udp:send(UDP, Msg),
+handle_cast({events, Msg}, St = #st{udp = UDP, host = Host, port = Port}) ->
+ gen_udp:send(UDP, Host, Port, Msg),
{noreply, St}.
handle_info(_Message, State) -> {ok, State}.
View
22 src/zeta_pb.erl
@@ -119,28 +119,28 @@ decode(Bin, Msg = #zeta_msg{zstates = States, zevents = Events}) ->
decode(Rest, Msg#zeta_msg{error = Value});
{{?MSG_ZSTATE, Value}, Rest} ->
case decode(Value, #zeta_state{}) of
- {error, R, _} -> {error, R, state_failed};
+ {error, R, _} -> {error, {state_failed, R}};
State ->
decode(Rest, Msg#zeta_msg{zstates = [State | States]})
end;
{{?MSG_ZEVENT, Value}, Rest} ->
case decode(Value, #zeta_event{}) of
- {error, R, _} -> {error, R, event_failed};
+ {error, R, _} -> {error, {event_failed, R}};
Event ->
decode(Rest, Msg#zeta_msg{zevents = [Event | Events]})
end;
{{?MSG_ZQUERY, Value}, Rest} ->
case decode(Value, #zeta_query{}) of
- {error, R, _} -> {error, R, query_failed};
+ {error, R, _} -> {error, {query_failed, R}};
Query ->
decode(Rest, Msg#zeta_msg{zquery = Query})
end
catch
- error:function_clause -> {error, noparse, zmsg};
+ error:function_clause -> {error, {noparse, zmsg}};
E:V -> {E, V}
end
catch
- error:function_clause -> {error, noparse, field}
+ error:function_clause -> {error, {noparse, field, zmsg}}
end;
decode(Bin, ZState = #zeta_state{tags = Tags}) ->
try protobuffs:read_field_num_and_wire_type(Bin) of
@@ -168,10 +168,10 @@ decode(Bin, ZState = #zeta_state{tags = Tags}) ->
{{?STATE_METRICF, MetricF}, Rest} ->
decode(Rest, ZState#zeta_state{metric_f = MetricF})
catch
- error:function_clause -> {error, noparse, zstate}
+ error:function_clause -> {error, {noparse, zstate}}
end
catch
- error:function_clause -> {error, noparse, field}
+ error:function_clause -> {error, {noparse, field, zstate}}
end;
decode(Bin, ZEvent = #zeta_event{tags = Tags}) ->
try protobuffs:read_field_num_and_wire_type(Bin) of
@@ -194,10 +194,10 @@ decode(Bin, ZEvent = #zeta_event{tags = Tags}) ->
{{?EVENT_METRICF, MetricF}, Rest} ->
decode(Rest, ZEvent#zeta_event{metric_f = MetricF})
catch
- error:function_clause -> {error, noparse, zevent}
+ error:function_clause -> {error, {noparse, zevent}}
end
catch
- error:function_clause -> {error, noparse, field}
+ error:function_clause -> {error, {noparse, field, zevent}}
end;
decode(Bin, ZQuery = #zeta_query{}) ->
try protobuffs:read_field_num_and_wire_type(Bin) of
@@ -206,10 +206,10 @@ decode(Bin, ZQuery = #zeta_query{}) ->
{{?QUERY_STRING, String}, Rest} ->
decode(Rest, ZQuery#zeta_query{string = String})
catch
- error:function_clause -> {error, noparse, zquery}
+ error:function_clause -> {error, {noparse, zquery}}
end
catch
- error:function_clause -> {error, noparse, field}
+ error:function_clause -> {error, {noparse, field, zquery}}
end.
%% Utilities
View
10 zeta.sublime-project
@@ -0,0 +1,10 @@
+{
+ "folders":
+ [
+ {
+ "path": ".",
+ "file_exclude_patterns": ["rebar"],
+ "folder_exclude_patterns": ["ebin", ".eunit", "log", "doc"]
+ }
+ ]
+}

No commit comments for this range

Something went wrong with that request. Please try again.