Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

merging nsync into master

  • Loading branch information...
commit 74dc7e0ff079bc01d5fde04c3428b10bc50771d3 2 parents 4f95cbf + f44f9a9
@jkvor jkvor authored
Showing with 675 additions and 1,253 deletions.
  1. +5 −1 .gitignore
  2. +9 −0 .gitmodules
  3. +6 −0 Makefile
  4. +1 −1  bin/console
  5. +14 −0 bin/recover_db
  6. +0 −3  bin/shard_info
  7. +5 −2 bin/weight
  8. +1 −0  deps/nsync
  9. +1 −0  deps/redgrid
  10. +1 −0  deps/redo
  11. +0 −3  ebin/logplex.app
  12. +6 −9 include/logplex.hrl
  13. +48 −29 src/logplex_api.erl
  14. +24 −20 src/logplex_app.erl
  15. +38 −186 src/logplex_channel.erl
  16. +0 −77 src/logplex_cloudkick.erl
  17. +64 −0 src/logplex_db.erl
  18. +23 −123 src/logplex_drain.erl
  19. +6 −2 src/logplex_drain_writer.erl
  20. +0 −104 src/logplex_grid.erl
  21. +37 −6 src/logplex_queue.erl
  22. +0 −161 src/logplex_rate_limit.erl
  23. +13 −107 src/logplex_session.erl
  24. +2 −7 src/logplex_shard.erl
  25. +1 −3 src/logplex_stats.erl
  26. +12 −3 src/logplex_tail.erl
  27. +31 −119 src/logplex_token.erl
  28. +14 −58 src/logplex_utils.erl
  29. +19 −40 src/logplex_worker.erl
  30. +222 −0 src/nsync_callback.erl
  31. +31 −115 src/redis_helper.erl
  32. +15 −8 src/udp_acceptor.erl
  33. +26 −66 test/logplex_api_tests.erl
View
6 .gitignore
@@ -1,4 +1,8 @@
release/*.boot
ebin/*.beam
erl_crash.dump
-go_src/*.out
+data
+channels
+tokens
+drains
+src/*.swp
View
9 .gitmodules
@@ -7,3 +7,12 @@
[submodule "deps/pagerduty"]
path = deps/pagerduty
url = git@git.herokai.com:erlang_pagerduty.git
+[submodule "deps/redgrid"]
+ path = deps/redgrid
+ url = git@git.herokai.com:redgrid.git
+[submodule "deps/redo"]
+ path = deps/redo
+ url = git@git.herokai.com:redo.git
+[submodule "deps/nsync"]
+ path = deps/nsync
+ url = git@git.herokai.com:nsync.git
View
6 Makefile
@@ -2,6 +2,9 @@ all:
(cd deps/redis_pool;$(MAKE) all)
(cd deps/mochiweb;$(MAKE) all)
(cd deps/pagerduty;$(MAKE) all)
+ (cd deps/redgrid;$(MAKE) all)
+ (cd deps/redo;$(MAKE) all)
+ (cd deps/nsync;$(MAKE) all)
@erl -make
@escript release/build_rel.escript boot logplex `pwd`/ebin
@@ -12,3 +15,6 @@ clean_all: clean
(cd deps/redis_pool;$(MAKE) clean)
(cd deps/mochiweb;$(MAKE) clean)
(cd deps/pagerduty;$(MAKE) clean)
+ (cd deps/redgrid;$(MAKE) clean)
+ (cd deps/redo;$(MAKE) clean)
+ (cd deps/nsync;$(MAKE) clean)
View
2  bin/console
@@ -8,4 +8,4 @@ else
fi
ulimit -n 65535
-erl +K true +A30 +P500000 -env ERL_MAX_PORTS 65535 -env HTTP_PORT 8002 -env INSTANCE_NAME localhost -env LOGPLEX_AUTH_KEY secret -env LOGPLEX_WORKERS 1 -env LOGPLEX_DRAIN_WRITERS 1 -env LOGPLEX_REDIS_WRITERS 1 -env LOGPLEX_READERS 1 -name logplex_console@$hostname -pa ebin -pa deps/*/ebin -boot release/logplex-1.0
+erl +K true +A30 +P500000 -env ERL_MAX_PORTS 65535 -env HTTP_PORT 8002 -env INSTANCE_NAME localhost -env LOGPLEX_AUTH_KEY secret -env LOGPLEX_WORKERS 1 -env LOGPLEX_DRAIN_WRITERS 1 -env LOGPLEX_REDIS_WRITERS 1 -env LOGPLEX_READERS 1 -name logplex_console@$hostname -pa ebin -pa deps/*/ebin -boot release/logplex-1.0
View
14 bin/recover_db
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+if [ $# -ne 1 ]
+then
+ echo "Usage: `basename $0` {node}"
+ exit 1
+fi
+
+HOSTNAME=`hostname --fqdn`
+COOKIE=`awk -F"'" '/LOGPLEX_COOKIE/ { print $2 }' /home/logplex/keys.sh`
+HOME=/home/logplex
+
+erl -name recover@$HOSTNAME -pa ebin -pa deps/*/ebin -setcookie $COOKIE -noinput -eval "logplex_utils:rpc('logplex@$HOSTNAME', 'logplex_db', 'recover_from', ['$1'])" -s init stop
+
View
3  bin/shard_info
@@ -1,3 +0,0 @@
-#!/bin/sh
-
-envdir keys erl -pa ebin -pa deps/*/ebin -noinput -eval "logplex_utils:shard_info()" -s init stop
View
7 bin/weight
@@ -1,4 +1,7 @@
#!/bin/sh
-. /home/logplex/keys.sh
-erl -pa ebin -pa deps/*/ebin -noinput -eval "logplex_utils:set_weight($1)" -s init stop
+HOSTNAME=`hostname --fqdn`
+COOKIE=`awk -F"'" '/LOGPLEX_COOKIE/ { print $2 }' /home/logplex/keys.sh`
+HOME=/home/logplex
+
+erl -name nodes@$HOSTNAME -pa ebin -pa deps/*/ebin -hidden -setcookie $COOKIE -noinput -eval "logplex_utils:rpc('logplex@$HOSTNAME', 'logplex_utils', 'set_weight', [$1])" -s init stop
1  deps/nsync
@@ -0,0 +1 @@
+Subproject commit e4f34b02d4bbff6ca1ee1096905a9ec54e57d1af
1  deps/redgrid
@@ -0,0 +1 @@
+Subproject commit 5ff5f6cf86bcc89d83130f7c60497c8b7db3a075
1  deps/redo
@@ -0,0 +1 @@
+Subproject commit c612f29871d2c84f54fc32356a82aa55e078f263
View
3  ebin/logplex.app
@@ -6,12 +6,9 @@
logplex_api,
logplex_app,
logplex_channel,
- logplex_cloudkick,
logplex_drain,
logplex_drain_writer,
- logplex_grid,
logplex_queue_sup,
- logplex_rate_limit,
logplex_realtime,
logplex_redis_writer,
logplex_session,
View
15 include/logplex.hrl
@@ -1,20 +1,17 @@
-include_lib("eunit/include/eunit.hrl").
-record(msg, {time, source, ps, content}).
--record(channel, {id, name, app_id, addon}).
--record(token, {id, channel_id, name, app_id, addon}).
--record(drain, {id, channel_id, resolved_host, host, port}).
+-record(channel, {id, name, app_id}).
+-record(token, {id, channel_id, name, app_id, drains=[]}).
+-record(drain, {id, channel_id, token, resolved_host, host, port}).
+-record(session, {id, body}).
-define(HTTP_PORT, 8001).
-define(TCP_PORT, 9998).
-define(UDP_PORT, 9999).
--define(DEFAULT_LOG_HISTORY, <<"500">>).
--define(ADVANCED_LOG_HISTORY, <<"1500">>).
-
--define(BASIC_THROUGHPUT, 500).
--define(EXPANDED_THROUGHPUT, 10000).
-
+-define(MAX_DRAINS, 5).
+-define(LOG_HISTORY, <<"1500">>).
-define(MAX_SPOOL_POOL_SIZE, 1000).
-define(DEFAULT_LOGPLEX_QUEUE_LENGTH, 2000).
View
77 src/logplex_api.erl
@@ -59,14 +59,16 @@ loop(Req) ->
{Code, Body} = serve(handlers(), Method, Path, Req),
Time = timer:now_diff(now(), Start) div 1000,
io:format("logplex_api app_id=~s channel_id=~s method=~p path=~s resp_code=~w time=~w body=~s~n", [AppId, ChannelId, Method, Path, Code, Time, Body]),
- Req:respond({Code, ?HDR, Body})
+ Req:respond({Code, ?HDR, Body}),
+ exit(normal)
catch
exit:normal ->
- ok;
+ exit(normal);
Class:Exception ->
Time1 = timer:now_diff(now(), Start) div 1000,
io:format("logplex_api app_id=~s channel_id=~s method=~p path=~s time=~w exception=~1000p:~1000p~n", [AppId, ChannelId, Method, Path, Time1, Class, Exception]),
- Req:respond({500, ?HDR, ""})
+ Req:respond({500, ?HDR, ""}),
+ exit(normal)
end.
handlers() ->
@@ -75,7 +77,7 @@ handlers() ->
[throw({500, io_lib:format("Zero ~p child processes running", [Worker])}) || {Worker, 0} <- logplex_stats:workers()],
- RegisteredMods = [logplex_grid, logplex_rate_limit, logplex_realtime, logplex_stats, logplex_channel, logplex_token, logplex_drain, logplex_session, logplex_tail, logplex_shard, udp_acceptor],
+ RegisteredMods = [logplex_realtime, logplex_stats, logplex_tail, logplex_shard, udp_acceptor],
[(whereis(Name) == undefined orelse not is_process_alive(whereis(Name))) andalso throw({500, io_lib:format("Process dead: ~p", [Name])}) || Name <- RegisteredMods],
Count = logplex_stats:healthcheck(),
@@ -85,8 +87,10 @@ handlers() ->
end},
{['POST', "/channels$"], fun(Req, _Match) ->
+ readonly(Req),
authorize(Req),
- {struct, Params} = mochijson2:decode(Req:recv_body()),
+ Body = Req:recv_body(),
+ {struct, Params} = mochijson2:decode(Body),
ChannelName = proplists:get_value(<<"name">>, Params),
ChannelName == undefined andalso error_resp(400, <<"'name' post param missing">>),
@@ -94,10 +98,7 @@ handlers() ->
AppId = proplists:get_value(<<"app_id">>, Params),
AppId == undefined andalso error_resp(400, <<"'app_id' post param missing">>),
- Addon = proplists:get_value(<<"addon">>, Params),
- Addon == undefined andalso error_resp(400, <<"'addon' post param missing">>),
-
- ChannelId = logplex_channel:create(ChannelName, AppId, Addon),
+ ChannelId = logplex_channel:create(ChannelName, AppId),
not is_integer(ChannelId) andalso exit({expected_integer, ChannelId}),
case proplists:get_value(<<"tokens">>, Params) of
@@ -113,20 +114,14 @@ handlers() ->
end
end},
- {['POST', "/channels/(\\d+)/addon$"], fun(Req, [ChannelId]) ->
+ {['POST', "/channels/(\\d+)/addon$"], fun(Req, [_ChannelId]) ->
+ readonly(Req),
authorize(Req),
- {struct, Params} = mochijson2:decode(Req:recv_body()),
-
- Addon = proplists:get_value(<<"addon">>, Params),
- Addon == undefined andalso error_resp(400, <<"'addon' post param missing">>),
-
- case logplex_channel:update_addon(list_to_integer(ChannelId), Addon) of
- ok -> {200, <<"OK">>};
- {error, not_found} -> {404, <<"Not found">>}
- end
+ {200, <<"OK">>}
end},
{['DELETE', "/channels/(\\d+)$"], fun(Req, [ChannelId]) ->
+ readonly(Req),
authorize(Req),
case logplex_channel:delete(list_to_integer(ChannelId)) of
ok -> {200, <<"OK">>};
@@ -135,15 +130,21 @@ handlers() ->
end},
{['POST', "/channels/(\\d+)/token$"], fun(Req, [ChannelId]) ->
+ readonly(Req),
authorize(Req),
{struct, Params} = mochijson2:decode(Req:recv_body()),
TokenName = proplists:get_value(<<"name">>, Params),
TokenName == undefined andalso error_resp(400, <<"'name' post param missing">>),
+ A = now(),
Token = logplex_token:create(list_to_integer(ChannelId), TokenName),
+ B = now(),
not is_binary(Token) andalso exit({expected_binary, Token}),
+ io:format("create_token name=~s channel_id=~s time=~w~n",
+ [TokenName, ChannelId, timer:now_diff(B,A) div 1000]),
+
{201, Token}
end},
@@ -156,6 +157,8 @@ handlers() ->
end},
{['GET', "/sessions/([\\w-]+)$"], fun(Req, [Session]) ->
+ proplists:get_value("srv", Req:parse_qs()) == undefined
+ andalso error_resp(400, <<"[Error]: Please update your Heroku client to the most recent version\n">>),
Body = logplex_session:lookup(list_to_binary("/sessions/" ++ Session)),
not is_binary(Body) andalso error_resp(404, <<"Not found">>),
@@ -205,6 +208,7 @@ handlers() ->
end},
{['POST', "/channels/(\\d+)/drains$"], fun(Req, [ChannelId]) ->
+ readonly(Req),
authorize(Req),
{struct, Data} = mochijson2:decode(Req:recv_body()),
@@ -214,20 +218,25 @@ handlers() ->
Host == <<"localhost">> andalso error_resp(400, <<"Invalid drain">>),
Host == <<"127.0.0.1">> andalso error_resp(400, <<"Invalid drain">>),
- DrainId = logplex_drain:create(list_to_integer(ChannelId), Host, Port),
- case DrainId of
- Int when is_integer(Int) ->
- {201, io_lib:format("Successfully added drain syslog://~s:~p", [Host, Port])};
- {error, already_exists} ->
- {400, io_lib:format("Drain syslog://~s:~p already exists", [Host, Port])};
- {error, invalid_drain} ->
- {400, io_lib:format("Invalid drain syslog://~s:~p", [Host, Port])}
- end
+ case logplex_channel:lookup_drains(list_to_integer(ChannelId)) of
+ List when length(List) >= ?MAX_DRAINS ->
+ {400, "You have already added the maximum number of drains allowed"};
+ _ ->
+ case logplex_drain:create(list_to_integer(ChannelId), Host, Port) of
+ #drain{id=_Id, token=_Token} ->
+ {201, io_lib:format("Successfully added drain syslog://~s:~p", [Host, Port])};
+ %{201, iolist_to_binary(mochijson2:encode({struct, [{id, Id}, {token, Token}]}))};
+ {error, already_exists} ->
+ {400, io_lib:format("Drain syslog://~s:~p already exists", [Host, Port])};
+ {error, invalid_drain} ->
+ {400, io_lib:format("Invalid drain syslog://~s:~p", [Host, Port])}
+ end
+ end
end},
{['GET', "/channels/(\\d+)/drains$"], fun(Req, [ChannelId]) ->
authorize(Req),
- Drains = logplex_channel:drains(list_to_integer(ChannelId)),
+ Drains = logplex_channel:lookup_drains(list_to_integer(ChannelId)),
not is_list(Drains) andalso exit({expected_list, Drains}),
Drains1 = [{struct, [{host, Host}, {port, Port}]} || #drain{host=Host, port=Port} <- Drains],
@@ -235,6 +244,7 @@ handlers() ->
end},
{['DELETE', "/channels/(\\d+)/drains$"], fun(Req, [ChannelId]) ->
+ readonly(Req),
authorize(Req),
Data = Req:parse_qs(),
@@ -287,6 +297,13 @@ authorize(Req) ->
throw({401, <<"Not Authorized">>})
end.
+readonly(_Req) ->
+ case application:get_env(logplex, read_only) of
+ {ok, true} ->
+ throw({500, <<"Read-only mode">>});
+ _ -> ok
+ end.
+
error_resp(RespCode, Body) ->
error_resp(RespCode, Body, undefined).
@@ -320,6 +337,8 @@ tail_loop(Socket, Filters) ->
logplex_utils:filter(Msg1, Filters) andalso gen_tcp:send(Socket, logplex_utils:format(Msg1)),
tail_loop(Socket, Filters);
{tcp_closed, Socket} ->
+ ok;
+ {tcp_error, Socket, _Reason} ->
ok
end.
View
44 src/logplex_app.erl
@@ -36,23 +36,22 @@ start(_StartType, _StartArgs) ->
set_cookie(),
boot_pagerduty(),
boot_redis(),
+ setup_redgrid_vals(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
stop(_State) ->
+ io:format("stopping...~n"),
+ logplex_db:dump(),
ok.
init([]) ->
{ok, {{one_for_one, 5, 10}, [
- {logplex_rate_limit, {logplex_rate_limit, start_link, []}, permanent, 2000, worker, [logplex_rate_limit]},
+ {logplex_db, {logplex_db, start_link, []}, permanent, 2000, worker, [logplex_db]},
+ {redgrid, {redgrid, start_link, []}, permanent, 2000, worker, [redgrid]},
{logplex_realtime, {logplex_realtime, start_link, [logplex_utils:redis_opts("LOGPLEX_CONFIG_REDIS_URL")]}, permanent, 2000, worker, [logplex_realtime]},
{logplex_stats, {logplex_stats, start_link, []}, permanent, 2000, worker, [logplex_stats]},
- {logplex_channel, {logplex_channel, start_link, []}, permanent, 2000, worker, [logplex_channel]},
- {logplex_token, {logplex_token, start_link, []}, permanent, 2000, worker, [logplex_token]},
- {logplex_drain, {logplex_drain, start_link, []}, permanent, 2000, worker, [logplex_drain]},
- {logplex_session, {logplex_session, start_link, []}, permanent, 2000, worker, [logplex_session]},
-
- {logplex_grid, {logplex_grid, start_link, []}, permanent, 2000, worker, [logplex_grid]},
+ {logplex_token, {logplex_token, refresh_dns, []}, permanent, 2000, worker, [logplex_token]},
{logplex_tail, {logplex_tail, start_link, []}, permanent, 2000, worker, [logplex_tail]},
{logplex_redis_writer_sup, {logplex_worker_sup, start_link, [logplex_redis_writer_sup, logplex_redis_writer]}, permanent, 2000, worker, [logplex_redis_writer_sup]},
@@ -67,7 +66,6 @@ init([]) ->
{logplex_work_queue, {logplex_queue, start_link, [logplex_work_queue, logplex_work_queue_args()]}, permanent, 2000, worker, [logplex_work_queue]},
{logplex_drain_buffer, {logplex_queue, start_link, [logplex_drain_buffer, logplex_drain_buffer_args()]}, permanent, 2000, worker, [logplex_drain_buffer]},
- {logplex_cloudkick, {logplex_cloudkick, start_link, []}, permanent, 2000, worker, [logplex_cloudkick]},
{logplex_api, {logplex_api, start_link, []}, permanent, 2000, worker, [logplex_api]},
{udp_acceptor, {udp_acceptor, start_link, []}, permanent, 2000, worker, [udp_acceptor]}
]}}.
@@ -81,22 +79,27 @@ set_cookie() ->
boot_pagerduty() ->
case os:getenv("HEROKU_DOMAIN") of
"heroku.com" ->
- ok = application:load(pagerduty),
- application:set_env(pagerduty, service_key, os:getenv("ROUTING_PAGERDUTY_SERVICE_KEY")),
- ok = application:start(pagerduty, temporary),
- ok = error_logger:add_report_handler(logplex_report_handler);
+ case os:getenv("PAGERDUTY") of
+ "0" -> ok;
+ _ ->
+ ok = application:load(pagerduty),
+ application:set_env(pagerduty, service_key, os:getenv("ROUTING_PAGERDUTY_SERVICE_KEY")),
+ ok = application:start(pagerduty, temporary),
+ ok = error_logger:add_report_handler(logplex_report_handler)
+ end;
_ ->
ok
end.
boot_redis() ->
- case application:start(redis, temporary) of
- ok ->
- Opts = logplex_utils:redis_opts("LOGPLEX_CONFIG_REDIS_URL"),
- redis_pool:add(config_pool, Opts, 25);
- Err ->
- exit(Err)
- end.
+ application:start(redis, temporary).
+
+setup_redgrid_vals() ->
+ application:load(redgrid),
+ application:set_env(redgrid, local_ip, os:getenv("LOCAL_IP")),
+ application:set_env(redgrid, redis_url, os:getenv("LOGPLEX_CONFIG_REDIS_URL")),
+ application:set_env(redgrid, domain, os:getenv("HEROKU_DOMAIN")),
+ ok.
logplex_work_queue_args() ->
MaxLength =
@@ -113,7 +116,8 @@ logplex_work_queue_args() ->
{max_length, MaxLength},
{num_workers, NumWorkers},
{worker_sup, logplex_worker_sup},
- {worker_args, []}].
+ {worker_args, []},
+ {dict, dict:from_list([{producer_callback, fun(Self, Atom) -> whereis(udp_acceptor) ! {Self, Atom} end}])}].
logplex_drain_buffer_args() ->
MaxLength =
View
224 src/logplex_channel.erl
@@ -21,25 +21,21 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_channel).
--behaviour(gen_server).
-%% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
-
--export([create/3, delete/1, update_addon/2, lookup/1, logs/2, tokens/1, drains/1, info/1, refresh_dns/0]).
+-export([create/2, delete/1, lookup/1,
+ lookup_tokens/1, lookup_drains/1, logs/2, info/1]).
-include_lib("logplex.hrl").
-%% API functions
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-create(ChannelName, AppId, Addon) when is_binary(ChannelName), is_integer(AppId), is_binary(Addon) ->
- case redis_helper:create_channel(ChannelName, AppId, Addon) of
+create(ChannelName, AppId) when is_binary(ChannelName), is_integer(AppId) ->
+ case redis_helper:channel_index() of
ChannelId when is_integer(ChannelId) ->
- logplex_grid:call(?MODULE, {create, ChannelId, ChannelName, AppId, Addon}, 8000),
- ChannelId;
+ case redis_helper:create_channel(ChannelId, ChannelName, AppId) of
+ ok ->
+ ChannelId;
+ Err ->
+ Err
+ end;
Err ->
Err
end.
@@ -49,196 +45,52 @@ delete(ChannelId) when is_integer(ChannelId) ->
undefined ->
{error, not_found};
_ ->
- logplex_grid:cast(?MODULE, {delete_channel, ChannelId}),
- logplex_grid:cast(logplex_token, {delete_channel, ChannelId}),
- logplex_grid:cast(logplex_drain, {delete_channel, ChannelId}),
+ [logplex_token:delete(TokenId) || #token{id=TokenId} <- lookup_tokens(ChannelId)],
+ [logplex_drain:delete(DrainId) || #drain{id=DrainId} <- lookup_drains(ChannelId)],
redis_helper:delete_channel(ChannelId)
end.
-update_addon(ChannelId, Addon) when is_integer(ChannelId), is_binary(Addon) ->
- case lookup(ChannelId) of
- undefined ->
- {error, not_found};
- Channel ->
- logplex_grid:cast(?MODULE, {update_channel, Channel#channel{addon=Addon}}),
- logplex_grid:cast(logplex_token, {update_addon, ChannelId, Addon}),
- redis_helper:update_channel_addon(ChannelId, Addon)
- end.
-
lookup(ChannelId) when is_integer(ChannelId) ->
- case ets:lookup(?MODULE, ChannelId) of
+ case ets:lookup(channels, ChannelId) of
[Channel] -> Channel;
_ -> undefined
end.
+lookup_tokens(ChannelId) when is_integer(ChannelId) ->
+ ets:match_object(tokens, token_match_expr(ChannelId)).
+
+lookup_drains(ChannelId) when is_integer(ChannelId) ->
+ ets:match_object(drains, drain_match_expr(ChannelId)).
+
+token_match_expr(ChannelId) ->
+ #token{id='_', channel_id=ChannelId, name='_', app_id='_', drains='_'}.
+
+drain_match_expr(ChannelId) ->
+ #drain{id='_', channel_id=ChannelId, token='_', resolved_host='_', host='_', port='_'}.
+
logs(ChannelId, Num) when is_integer(ChannelId), is_integer(Num) ->
[{logplex_read_pool_map, {Map, Interval}}] = ets:lookup(logplex_shard_info, logplex_read_pool_map),
Index = redis_shard:key_to_index(integer_to_list(ChannelId)),
{_RedisUrl, Pool} = redis_shard:get_matching_pool(Index, Map, Interval),
- redis_pool:q(Pool, [<<"LRANGE">>, iolist_to_binary(["ch:", integer_to_list(ChannelId), ":spool"]), <<"0">>, list_to_binary(integer_to_list(Num))]).
-
-tokens(ChannelId) when is_integer(ChannelId) ->
- ets:lookup(logplex_channel_tokens, ChannelId).
-
-drains(ChannelId) when is_integer(ChannelId) ->
- ets:lookup(logplex_channel_drains, ChannelId).
+ Cmd = [<<"LRANGE">>, iolist_to_binary(["ch:", integer_to_list(ChannelId), ":spool"]), <<"0">>, list_to_binary(integer_to_list(Num))],
+ case catch redo:cmd(Pool, Cmd) of
+ {'EXIT', Err} ->
+ io:format("Error fetching logs channel_id=~w error=~100p~n", [ChannelId, Err]),
+ [];
+ Logs ->
+ Logs
+ end.
info(ChannelId) when is_integer(ChannelId) ->
case lookup(ChannelId) of
- #channel{name=ChannelName, app_id=AppId, addon=Addon} ->
+ #channel{name=ChannelName, app_id=AppId} ->
+ Tokens = lookup_tokens(ChannelId),
+ Drains = lookup_drains(ChannelId),
[{channel_id, ChannelId},
{channel_name, ChannelName},
{app_id, AppId},
- {addon, Addon},
- {tokens, lists:sort([{Name, Token} || #token{id=Token, name=Name} <- tokens(ChannelId)])},
- {drains, [iolist_to_binary([<<"syslog://">>, Host, ":", integer_to_list(Port)]) || #drain{host=Host, port=Port} <- drains(ChannelId)]}];
+ {tokens, lists:sort([{Name, Token} || #token{id=Token, name=Name} <- Tokens])},
+ {drains, [iolist_to_binary([<<"syslog://">>, Host, ":", integer_to_list(Port)]) || #drain{host=Host, port=Port} <- Drains]}];
_ ->
[]
end.
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%% @hidden
-%%--------------------------------------------------------------------
-init([]) ->
- ets:new(?MODULE, [protected, named_table, set, {keypos, 2}]),
- ets:new(logplex_channel_tokens, [protected, named_table, bag, {keypos, 3}]),
- ets:new(logplex_channel_drains, [protected, named_table, bag, {keypos, 3}]),
- populate_cache(),
- spawn_link(fun refresh_dns/0),
- {ok, []}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_call({create, ChannelId, ChannelName, AppId, Addon}, _From, State) ->
- ets:insert(?MODULE, #channel{id=ChannelId, name=ChannelName, app_id=AppId, addon=Addon}),
- {reply, ok, State};
-
-handle_call({create_drain, DrainId, ChannelId, ResolvedHost, Host, Port}, _From, State) ->
- ets:insert(logplex_channel_drains, #drain{id=DrainId, channel_id=ChannelId, resolved_host=ResolvedHost, host=Host, port=Port}),
- {reply, ok, State};
-
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_cast({update_channel, #channel{id=ChannelId, addon=Addon}=Channel}, State) ->
- ets:insert(?MODULE, Channel),
- [begin
- ets:delete_object(logplex_channel_tokens, Token),
- ets:insert(logplex_channel_tokens, Token#token{addon=Addon})
- end || Token <- ets:lookup(logplex_channel_tokens, ChannelId)],
- {noreply, State};
-
-handle_cast({delete_channel, ChannelId}, State) ->
- ets:delete(?MODULE, ChannelId),
- ets:match_delete(logplex_channel_tokens, #token{id='_', channel_id=ChannelId, name='_', app_id='_', addon='_'}),
- ets:match_delete(logplex_channel_drains, #drain{id='_', channel_id=ChannelId, resolved_host='_', host='_', port='_'}),
- {noreply, State};
-
-handle_cast({create_token, ChannelId, TokenId, TokenName, AppId, Addon}, State) ->
- ets:insert(logplex_channel_tokens, #token{id=TokenId, channel_id=ChannelId, name=TokenName, app_id=AppId, addon=Addon}),
- {noreply, State};
-
-handle_cast({delete_token, ChannelId, TokenId}, State) ->
- ets:match_delete(logplex_channel_tokens, #token{id=TokenId, channel_id=ChannelId, name='_', app_id='_', addon='_'}),
- {noreply, State};
-
-handle_cast({delete_drain, DrainId}, State) ->
- ets:match_delete(logplex_channel_drains, #drain{id=DrainId, channel_id='_', resolved_host='_', host='_', port='_'}),
- {noreply, State};
-
-handle_cast({resolve_host, Ip, Drain}, State) ->
- ets:delete_object(logplex_channel_drains, Drain),
- ets:insert(logplex_channel_drains, Drain#drain{resolved_host=Ip}),
- {noreply, State};
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @hidden
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%% @hidden
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-populate_cache() ->
- Channels = redis_helper:lookup_channels(),
- length(Channels) > 0 andalso ets:insert(?MODULE, Channels),
-
- Tokens = [begin
- case logplex_channel:lookup(Token#token.channel_id) of
- #channel{app_id=AppId, addon=Addon} -> Token#token{app_id=AppId, addon=Addon};
- _ -> Token
- end
- end || Token <- redis_helper:lookup_tokens()],
- length(Tokens) > 0 andalso ets:insert(logplex_channel_tokens, Tokens),
-
- Drains = lists:foldl(
- fun(#drain{host=Host}=Drain, Acc) ->
- case logplex_utils:resolve_host(Host) of
- undefined -> Acc;
- Ip -> [Drain#drain{resolved_host=Ip}|Acc]
- end
- end, [], redis_helper:lookup_drains()),
- length(Drains) > 0 andalso ets:insert(logplex_channel_drains, Drains),
-
- ok.
-
-refresh_dns() ->
- timer:sleep(60 * 1000),
- [begin
- case logplex_utils:resolve_host(Host) of
- undefined -> ok;
- Ip -> gen_server:cast(?MODULE, {resolve_host, Ip, Drain})
- end
- end || #drain{host=Host}=Drain <- ets:tab2list(logplex_channel_drains)],
- ?MODULE:refresh_dns().
View
77 src/logplex_cloudkick.erl
@@ -1,77 +0,0 @@
-%% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
-%%
-%% Permission is hereby granted, free of charge, to any person
-%% obtaining a copy of this software and associated documentation
-%% files (the "Software"), to deal in the Software without
-%% restriction, including without limitation the rights to use,
-%% copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the
-%% Software is furnished to do so, subject to the following
-%% conditions:
-%%
-%% The above copyright notice and this permission notice shall be
-%% included in all copies or substantial portions of the Software.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
-%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
-%% OTHER DEALINGS IN THE SOFTWARE.
--module(logplex_cloudkick).
--export([start_link/0, loop/1]).
-
--include_lib("logplex.hrl").
-
--define(HDR, [{"Content-Type", "text/html"}]).
-
-start_link() ->
- mochiweb_http:start([
- {ip, "127.0.0.1"},
- {port, 8008},
- {backlog, 1024},
- {loop, {?MODULE, loop}},
- {name, ?MODULE}
- ]).
-
-loop(Req) ->
- try
- [begin
- Req:respond({200, ?HDR, iolist_to_binary(mochijson2:encode({struct, [
- {status, iolist_to_binary(io_lib:format("Zero ~p child processes running", [Worker]))},
- {state, <<"err">>}
- ]}))}),
- throw(normal)
- end || {Worker, 0} <- logplex_stats:workers()],
-
- RegisteredMods = [logplex_grid, logplex_rate_limit, logplex_realtime, logplex_stats, logplex_channel, logplex_token, logplex_drain, logplex_session, logplex_tail, logplex_shard, udp_acceptor],
- [begin
- case (whereis(Name) == undefined orelse not is_process_alive(whereis(Name))) of
- true ->
- Req:respond({200, ?HDR, iolist_to_binary(mochijson2:encode({struct, [
- {status, iolist_to_binary(io_lib:format("Process dead: ~p", [Name]))},
- {state, <<"err">>}
- ]}))}),
- throw(normal);
- false ->
- ok
- end
- end || Name <- RegisteredMods],
-
- Cached = logplex_stats:cached(),
- Req:respond({200, ?HDR, iolist_to_binary(mochijson2:encode({struct, [
- {state, <<"ok">>},
- {metrics, [
- [{type, <<"int">>},
- {name, Key},
- {value, Value}]
- || {Key, Value} <- Cached, Value > 0]}
- ]}))})
- catch
- exit:normal ->
- ok;
- _ ->
- Req:respond({500, ?HDR, ""})
- end.
View
64 src/logplex_db.erl
@@ -0,0 +1,64 @@
+%% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
+%%
+%% Permission is hereby granted, free of charge, to any person
+%% obtaining a copy of this software and associated documentation
+%% files (the "Software"), to deal in the Software without
+%% restriction, including without limitation the rights to use,
+%% copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the
+%% Software is furnished to do so, subject to the following
+%% conditions:
+%%
+%% The above copyright notice and this permission notice shall be
+%% included in all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+%% OTHER DEALINGS IN THE SOFTWARE.
+-module(logplex_db).
+-export([start_link/0]).
+
+-include_lib("logplex.hrl").
+
+start_link() ->
+ create_ets_tables(),
+ {ok, Pid} = redo:start_link(config, redo_opts()),
+ boot_nsync(),
+ {ok, Pid}.
+
+create_ets_tables() ->
+ ets:new(channels, [named_table, public, set, {keypos, 2}]),
+ ets:new(tokens, [named_table, public, set, {keypos, 2}]),
+ ets:new(drains, [named_table, public, set, {keypos, 2}]),
+ ets:new(sessions, [named_table, public, set, {keypos, 2}]),
+ ok.
+
+boot_nsync() ->
+ ok = application:start(nsync, temporary),
+ Opts = nsync_opts(),
+ io:format("nsync:start_link(~p)~n", [Opts]),
+ A = now(),
+ {ok, _Pid} = nsync:start_link(Opts),
+ B = now(),
+ io:format("nsync load_time=~w~n", [timer:now_diff(B,A) div 1000000]).
+
+nsync_opts() ->
+ RedisOpts = logplex_utils:redis_opts("LOGPLEX_CONFIG_REDIS_URL"),
+ Ip = case proplists:get_value(ip, RedisOpts) of
+ {_,_,_,_}=L -> string:join([integer_to_list(I) || I <- tuple_to_list(L)], ".");
+ Other -> Other
+ end,
+ RedisOpts1 = proplists:delete(ip, RedisOpts),
+ RedisOpts2 = [{host, Ip} | RedisOpts1],
+ [{callback, {nsync_callback, handle, []}}, {block, true}, {timeout, 20 * 60 * 1000} | RedisOpts2].
+
+redo_opts() ->
+ case os:getenv("LOGPLEX_CONFIG_REDIS_URL") of
+ false -> [];
+ Url -> redo_uri:parse(Url)
+ end.
View
146 src/logplex_drain.erl
@@ -21,22 +21,13 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_drain).
--behaviour(gen_server).
-%% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
-
--export([create/3, delete/3, clear_all/1, lookup/1]).
+-export([create/3, delete/1, delete/3, clear_all/1, lookup/1]).
-include_lib("logplex.hrl").
-%% API functions
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
create(ChannelId, Host, Port) when is_integer(ChannelId), is_binary(Host), (is_integer(Port) orelse Port == undefined) ->
- case ets:match_object(?MODULE, #drain{id='_', channel_id=ChannelId, resolved_host='_', host=Host, port=Port}) of
+ case ets:match_object(drains, #drain{id='_', channel_id=ChannelId, token='_', resolved_host='_', host=Host, port=Port}) of
[_] ->
{error, already_exists};
[] ->
@@ -44,132 +35,41 @@ create(ChannelId, Host, Port) when is_integer(ChannelId), is_binary(Host), (is_i
undefined ->
error_logger:error_msg("invalid drain: ~p:~p~n", [Host, Port]),
{error, invalid_drain};
- Ip ->
+ _Ip ->
case redis_helper:drain_index() of
DrainId when is_integer(DrainId) ->
- logplex_grid:cast(?MODULE, {create_drain, DrainId, ChannelId, Ip, Host, Port}),
- logplex_grid:call(logplex_channel, {create_drain, DrainId, ChannelId, Ip, Host, Port}, 8000),
- redis_helper:create_drain(DrainId, ChannelId, Host, Port),
- DrainId;
- Error ->
- Error
+ Token = list_to_binary("d." ++ string:strip(os:cmd("uuidgen"), right, $\n)),
+ case redis_helper:create_drain(DrainId, ChannelId, Token, Host, Port) of
+ ok ->
+ #drain{id=DrainId, channel_id=ChannelId, token=Token, host=Host, port=Port};
+ Err ->
+ Err
+ end;
+ Err ->
+ Err
end
end
end.
delete(ChannelId, Host, Port) when is_integer(ChannelId), is_binary(Host) ->
Port1 = if Port == "" -> undefined; true -> list_to_integer(Port) end,
- case ets:match_object(?MODULE, #drain{id='_', channel_id=ChannelId, resolved_host='_', host=Host, port=Port1}) of
+ case ets:match_object(drains, #drain{id='_', channel_id=ChannelId, token='_', resolved_host='_', host=Host, port=Port1}) of
[#drain{id=DrainId}|_] ->
- logplex_grid:cast(?MODULE, {delete_drain, DrainId}),
- logplex_grid:cast(logplex_channel, {delete_drain, DrainId}),
- redis_helper:delete_drain(DrainId);
+ delete(DrainId);
_ ->
{error, not_found}
end.
+delete(DrainId) when is_integer(DrainId) ->
+ redis_helper:delete_drain(DrainId).
+
clear_all(ChannelId) when is_integer(ChannelId) ->
- List = ets:match_object(?MODULE, #drain{id='_', channel_id=ChannelId, resolved_host='_', host='_', port='_'}),
- [begin
- logplex_grid:cast(?MODULE, {delete_drain, DrainId}),
- logplex_grid:cast(logplex_channel, {delete_drain, DrainId}),
- redis_helper:delete_drain(DrainId)
- end || #drain{id=DrainId} <- List],
+ List = ets:match_object(drains, #drain{id='_', channel_id=ChannelId, token='_', resolved_host='_', host='_', port='_'}),
+ [delete(DrainId) || #drain{id=DrainId} <- List],
ok.
lookup(DrainId) when is_integer(DrainId) ->
- redis_helper:lookup_drain(DrainId).
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%% @hidden
-%%--------------------------------------------------------------------
-init([]) ->
- ets:new(?MODULE, [protected, named_table, set, {keypos, 2}]),
- populate_cache(),
- {ok, []}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_cast({delete_channel, ChannelId}, State) ->
- ets:match_delete(?MODULE, #drain{id='_', channel_id=ChannelId, resolved_host='_', host='_', port='_'}),
- {noreply, State};
-
-handle_cast({create_drain, DrainId, ChannelId, ResolvedHost, Host, Port}, State) ->
- ets:insert(?MODULE, #drain{id=DrainId, channel_id=ChannelId, resolved_host=ResolvedHost, host=Host, port=Port}),
- {noreply, State};
-
-handle_cast({delete_drain, DrainId}, State) ->
- ets:delete(?MODULE, DrainId),
- {noreply, State};
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @hidden
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%% @hidden
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-populate_cache() ->
- Data = lists:foldl(
- fun(#drain{host=Host}=Drain, Acc) ->
- case logplex_utils:resolve_host(Host) of
- undefined -> Acc;
- Ip -> [Drain#drain{resolved_host=Ip}|Acc]
- end
- end, [], redis_helper:lookup_drains()),
- length(Data) > 0 andalso ets:insert(?MODULE, Data).
+ case ets:lookup(drains, DrainId) of
+ [Drain] -> Drain;
+ _ -> undefined
+ end.
View
8 src/logplex_drain_writer.erl
@@ -40,13 +40,17 @@ loop(Socket) ->
timeout -> ok;
{'EXIT', {noproc, _}} ->
exit(normal);
+ {_, [{undefined, _, _}]} ->
+ ok;
{1, [{Host, Port, Msg}]} ->
case gen_udp:send(Socket, Host, Port, Msg) of
ok ->
logplex_stats:incr(message_routed),
logplex_realtime:incr(message_routed);
- {error, nxdomain} -> error_logger:error_msg("nxdomin ~s:~w~n", [Host, Port]);
- Err -> exit(Err)
+ {error, nxdomain} ->
+ io:format("nxdomin ~s:~w~n", [Host, Port]);
+ Err ->
+ exit(Err)
end
end,
?MODULE:loop(Socket).
View
104 src/logplex_grid.erl
@@ -1,104 +0,0 @@
-%% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
-%%
-%% Permission is hereby granted, free of charge, to any person
-%% obtaining a copy of this software and associated documentation
-%% files (the "Software"), to deal in the Software without
-%% restriction, including without limitation the rights to use,
-%% copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the
-%% Software is furnished to do so, subject to the following
-%% conditions:
-%%
-%% The above copyright notice and this permission notice shall be
-%% included in all copies or substantial portions of the Software.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
-%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
-%% OTHER DEALINGS IN THE SOFTWARE.
--module(logplex_grid).
--export([start_link/0, init/1, call/3, cast/2, loop/3]).
-
--include_lib("logplex.hrl").
-
-start_link() ->
- proc_lib:start_link(?MODULE, init, [self()]).
-
-init(Parent) ->
- BinNode = atom_to_binary(node(), utf8),
- LocalIp =
- case os:getenv("LOCAL_IP") of
- false -> <<"127.0.0.1">>;
- Val1 -> list_to_binary(Val1)
- end,
- Domain = logplex_utils:heorku_domain(),
- register(?MODULE, self()),
- proc_lib:init_ack(Parent, {ok, self()}),
- loop(BinNode, LocalIp, Domain).
-
-call(RegName, Msg, Timeout) when is_atom(RegName), is_integer(Timeout) ->
- gen_server:multi_call([node()|nodes()], RegName, Msg, Timeout).
-
-cast(RegName, Msg) when is_atom(RegName) ->
- gen_server:abcast([node()|nodes()], RegName, Msg).
-
-loop(BinNode, LocalIp, Domain) ->
- redis_helper:set_node_ex(BinNode, LocalIp, Domain),
- redis_helper:register_with_face(Domain, LocalIp),
- case redis_helper:get_nodes(Domain) of
- Keys when is_list(Keys) ->
- [connect(size(Domain), Key) || Key <- Keys];
- _Err ->
- ok
- end,
- receive
- {nodedown, _Node} ->
- ok
- after 0 ->
- ok
- end,
- timer:sleep(5 * 1000),
- ?MODULE:loop(BinNode, LocalIp, Domain).
-
-connect(Size, Key) ->
- case Key of
- <<"node:", _:Size/binary, ":", BinNode/binary>> ->
- StrNode = binary_to_list(BinNode),
- Node = list_to_atom(StrNode),
- case node() == Node of
- true ->
- undefined;
- false ->
- case net_adm:ping(Node) of
- pong -> ok;
- pang ->
- case redis_helper:get_node(Key) of
- {ok, Ip} when is_binary(Ip) ->
- case inet:getaddr(binary_to_list(Ip), inet) of
- {ok, Addr} ->
- case re:run(StrNode, ".*@(.*)$", [{capture, all_but_first, list}]) of
- {match, [Host]} ->
- inet_db:add_host(Addr, [Host]),
- case net_adm:ping(Node) of
- pong ->
- erlang:monitor_node(Node, true);
- pang ->
- {error, {ping_failed, Node}}
- end;
- _ ->
- error_logger:error_msg("failed_to_parse_host: ~p~n", [StrNode])
- end;
- Err ->
- error_logger:error_msg("failed to resolve ~p: ~p~n", [Ip, Err])
- end;
- _ ->
- undefined
- end
- end
- end;
- _ -> ok
- end.
View
43 src/logplex_queue.erl
@@ -32,7 +32,17 @@
-include_lib("logplex.hrl").
--record(state, {dropped_stat_key, length_stat_key, queue, length, max_length, waiting, dict, workers=[]}).
+-record(state, {
+ dropped_stat_key,
+ length_stat_key,
+ queue,
+ length,
+ max_length,
+ waiting,
+ dict,
+ workers=[],
+ accepting=true
+}).
-define(TIMEOUT, 30000).
@@ -104,7 +114,7 @@ init([Props]) ->
length = 0,
max_length = proplists:get_value(max_length, Props),
waiting = queue:new(),
- dict = proplists:get_value(dict, Props)
+ dict = proplists:get_value(dict, Props, dict:new())
},
WorkerSup = proplists:get_value(worker_sup, Props),
NumWorkers = proplists:get_value(num_workers, Props),
@@ -124,12 +134,13 @@ init([Props]) ->
%% @hidden
%%--------------------------------------------------------------------
handle_call({out, Num}, {From, _Mref}, #state{queue=Queue, length=Length, waiting=Waiting}=State) ->
+ State1 = enable_producer(State),
case drain(Queue, Num) of
{Items, Queue1} when is_list(Items), length(Items) > 0 ->
NumItems = length(Items),
- {reply, {NumItems, lists:reverse(Items)}, State#state{queue=Queue1, length=Length-NumItems}};
+ {reply, {NumItems, lists:reverse(Items)}, State1#state{queue=Queue1, length=Length-NumItems}};
_ ->
- {reply, empty, State#state{waiting=queue:in(From, Waiting)}}
+ {reply, empty, State1#state{waiting=queue:in(From, Waiting)}}
end;
handle_call(info, _From, #state{length=Length, max_length=MaxLength}=State) ->
@@ -156,10 +167,14 @@ handle_call(_Msg, _From, State) ->
%% Description: Handling cast messages
%% @hidden
%%--------------------------------------------------------------------
-handle_cast({in, _Packet}, #state{dropped_stat_key=StatKey, length=Length, max_length=MaxLength}=State) when Length >= MaxLength ->
+handle_cast({in, _Packet}, #state{dict=Dict, dropped_stat_key=StatKey, length=Length, max_length=MaxLength}=State) when Length >= MaxLength ->
logplex_stats:incr(StatKey),
logplex_realtime:incr(StatKey),
- {noreply, State};
+ case dict:find(producer_callback, Dict) of
+ {ok, Fun} -> Fun(self(), stop_accepting);
+ error -> ok
+ end,
+ {noreply, State#state{accepting=false}};
handle_cast({in, Packet}, #state{queue=Queue, length=Length, waiting=Waiting}=State) ->
case queue:out(Waiting) of
@@ -260,6 +275,22 @@ drain(Queue, N, Acc) ->
{Acc, Queue}
end.
+enable_producer(#state{dict=Dict, length=Length, max_length=MaxLength, accepting=Accepting}=State) ->
+ case Accepting of
+ false ->
+ case Length < (MaxLength div 2) of
+ true ->
+ case dict:find(producer_callback, Dict) of
+ {ok, Fun} ->
+ Fun(self(), start_accepting),
+ State#state{accepting=true};
+ error -> State
+ end;
+ false -> State
+ end;
+ true -> State
+ end.
+
report_stats(Pid) ->
timer:sleep(60000),
Pid ! report_stats,
View
161 src/logplex_rate_limit.erl
@@ -1,161 +0,0 @@
-%% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
-%%
-%% Permission is hereby granted, free of charge, to any person
-%% obtaining a copy of this software and associated documentation
-%% files (the "Software"), to deal in the Software without
-%% restriction, including without limitation the rights to use,
-%% copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the
-%% Software is furnished to do so, subject to the following
-%% conditions:
-%%
-%% The above copyright notice and this permission notice shall be
-%% included in all copies or substantial portions of the Software.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
-%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
-%% OTHER DEALINGS IN THE SOFTWARE.
--module(logplex_rate_limit).
--behaviour(gen_server).
-
-%% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
-
--export([lock/1, set_lock/1, set_lock/2, clear_all/0, is_locked/1]).
-
--include_lib("logplex.hrl").
-
--define(TIMEOUT, 10000).
-
-%% API functions
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-lock(ChannelId) when is_integer(ChannelId) ->
- Nodes = lists:sort([node()|nodes()]),
- Index = (ChannelId rem length(Nodes)) + 1,
- Node = lists:nth(Index, Nodes),
- case Node =:= node() of
- true ->
- set_lock(ChannelId);
- false ->
- case set_lock(Node, ChannelId) of
- true ->
- true;
- false ->
- false;
- Error ->
- error_logger:error_msg("badrpc (~p): ~p~n", [Node, Error]),
- true
- end
- end.
-
-set_lock(Node, ChannelId) when is_integer(ChannelId) ->
- gen_server:call({?MODULE, Node}, {set_lock, ChannelId}, ?TIMEOUT).
-set_lock(ChannelId) when is_integer(ChannelId) ->
- gen_server:call(?MODULE, {set_lock, ChannelId}, ?TIMEOUT).
-
-clear_all() ->
- gen_server:cast(?MODULE, clear_all).
-
-is_locked(ChannelId) when is_integer(ChannelId) ->
- case ets:lookup(?MODULE, ChannelId) of
- [{ChannelId, true}] -> true;
- [] -> false
- end.
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%% @hidden
-%%--------------------------------------------------------------------
-init([]) ->
- ets:new(?MODULE, [protected, named_table, set]),
- {ok, []}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_call({set_lock, ChannelId}, _From, State) ->
- case is_locked(ChannelId) of
- true ->
- {reply, false, State};
- false ->
- ets:insert(?MODULE, {ChannelId, true}),
- gen_server:abcast(nodes(), ?MODULE, {insert_lock, ChannelId}),
- {reply, true, State}
- end;
-
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_cast({insert_lock, ChannelId}, State) ->
- ets:insert(?MODULE, {ChannelId, true}),
- {noreply, State};
-
-handle_cast(clear_all, State) ->
- ets:delete_all_objects(?MODULE),
- {noreply, State};
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @hidden
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%% @hidden
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
View
120 src/logplex_session.erl
@@ -21,116 +21,22 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_session).
--behaviour(gen_server).
-
-%% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
-
-export([create/1, lookup/1]).
-include_lib("logplex.hrl").
-%% API functions
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
create(Body) when is_binary(Body) ->
- Session = iolist_to_binary(["/sessions/", string:strip(os:cmd("uuidgen"), right, $\n)]),
- redis_helper:create_session(Session, Body),
- logplex_grid:call(?MODULE, {create, Session, Body}, 8000),
- Session.
-
-lookup(Session) when is_binary(Session) ->
- case ets:lookup(?MODULE, Session) of
- [] ->
- redis_helper:lookup_session(Session);
- [{Session, Body}] ->
- Body
+ SessionId = iolist_to_binary(["/sessions/", string:strip(os:cmd("uuidgen"), right, $\n)]),
+ Session = #session{id=SessionId, body=Body},
+ ets:insert(sessions, Session),
+ spawn_link(fun() ->
+ timer:sleep(6 * 60 * 1000), % 6 mins
+ ets:delete(sessions, SessionId)
+ end),
+ SessionId.
+
+lookup(SessionId) when is_binary(SessionId) ->
+ case ets:lookup(sessions, SessionId) of
+ [#session{body=Body}] -> Body;
+ _ -> undefined
end.
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%% @hidden
-%%--------------------------------------------------------------------
-init([]) ->
- Self = self(),
- ets:new(?MODULE, [protected, named_table, set]),
- spawn_link(fun() -> expire_session_cache(Self) end),
- {ok, []}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_call({create, Session, Body}, _From, State) ->
- ets:insert(?MODULE, {Session, Body}),
- {reply, ok, State};
-
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_info(expire_session_cache, State) ->
- ets:delete_all_objects(?MODULE),
- {noreply, State};
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @hidden
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%% @hidden
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-expire_session_cache(Pid) ->
- timer:sleep(20 * 60 * 1000),
- Pid ! expire_session_cache,
- expire_session_cache(Pid).
View
9 src/logplex_shard.erl
@@ -158,13 +158,8 @@ populate_info_table(Urls) ->
add_pools([], Acc) -> Acc;
add_pools([Url|Tail], Acc) ->
- Opts = logplex_utils:parse_redis_url(Url),
- NumWorkers =
- case os:getenv("LOGPLEX_READERS") of
- false -> ?DEFAULT_LOGPLEX_READERS;
- StrNum2 -> list_to_integer(StrNum2)
- end,
- {ok, Pool} = redis_pool:add(Opts, NumWorkers),
+ Opts = redo_uri:parse(Url),
+ {ok, Pool} = redo:start_link(undefined, Opts),
add_pools(Tail, [{Url, Pool}|Acc]).
redis_buffer_args(Url) ->
View
4 src/logplex_stats.erl
@@ -114,8 +114,6 @@ handle_cast(_Msg, State) ->
handle_info({timeout, _TimerRef, flush}, _State) ->
start_timer(),
- logplex_rate_limit:clear_all(),
-
Stats = ets:tab2list(logplex_stats),
ets:delete_all_objects(logplex_stats),
io:format("logplex_stats~s~n", [lists:flatten([[" ", to_list(Key), "=", to_list(Value)] || {Key, Value} <- Stats, Value > 0])]),
@@ -124,7 +122,7 @@ handle_info({timeout, _TimerRef, flush}, _State) ->
ets:delete_all_objects(logplex_stats_channels),
[begin
- io:format("logplex_channel_stats app_id=~w\tchannel_id=~w\tmessage_processed=~w~n", [AppId, ChannelId, Val])
+ io:format("logplex_channel_stats app_id=~w channel_id=~w message_processed=~w~n", [AppId, ChannelId, Val])
end || {{message_processed, AppId, ChannelId}, Val} <- ChannelStats, Val > 0],
{noreply, Stats};
View
15 src/logplex_tail.erl
@@ -37,7 +37,7 @@ start_link() ->
register(ChannelId) when is_integer(ChannelId) ->
Self = self(),
- logplex_grid:cast(?MODULE, {register, ChannelId, Self}),
+ gen_server:abcast([node()|nodes()], ?MODULE, {register, ChannelId, Self}),
ok.
route(ChannelId, Msg) when is_integer(ChannelId), is_binary(Msg) ->
@@ -57,8 +57,9 @@ route(ChannelId, Msg) when is_integer(ChannelId), is_binary(Msg) ->
%% @hidden
%%--------------------------------------------------------------------
init([]) ->
+ process_flag(trap_exit, true),
ets:new(?MODULE, [protected, named_table, bag]),
- {ok, []}.
+ {ok, []}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -80,7 +81,8 @@ handle_call(_Msg, _From, State) ->
%% Description: Handling cast messages
%% @hidden
%%--------------------------------------------------------------------
-handle_cast({register, ChannelId, Pid}, State)->
+handle_cast({register, ChannelId, Pid}, State) ->
+ link(Pid),
ets:insert(?MODULE, {ChannelId, Pid}),
{noreply, State};
@@ -94,6 +96,13 @@ handle_cast(_Msg, State) ->
%% Description: Handling all non call/cast messages
%% @hidden
%%--------------------------------------------------------------------
+handle_info({'EXIT', Pid, _Why}, State) ->
+ case ets:match_object(?MODULE, {'_', Pid}) of
+ [Obj] -> ets:delete_object(?MODULE, Obj);
+ [] -> ok
+ end,
+ {noreply, State};
+
handle_info(_Info, State) ->
{noreply, State}.
View
150 src/logplex_token.erl
@@ -21,145 +21,57 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_token).
--behaviour(gen_server).
-%% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
-
--export([create/2, lookup/1, delete/1]).
+-export([create/2, lookup/1, delete/1, refresh_dns/0, init/1, loop/0]).
-include_lib("logplex.hrl").
-%% API functions
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
create(ChannelId, TokenName) when is_integer(ChannelId), is_binary(TokenName) ->
- case logplex_channel:lookup(ChannelId) of
- #channel{app_id=AppId, addon=Addon} ->
- TokenId = list_to_binary("t." ++ string:strip(os:cmd("uuidgen"), right, $\n)),
- logplex_grid:call(?MODULE, {create_token, ChannelId, TokenId, TokenName, AppId, Addon}, 8000),
- logplex_grid:cast(logplex_channel, {create_token, ChannelId, TokenId, TokenName, AppId, Addon}),
- redis_helper:create_token(ChannelId, TokenId, TokenName),
+ TokenId = list_to_binary("t." ++ string:strip(os:cmd("uuidgen"), right, $\n)),
+ case redis_helper:create_token(ChannelId, TokenId, TokenName) of
+ ok ->
TokenId;
- _ ->
- {error, not_found}
+ Err ->
+ Err
end.
delete(TokenId) when is_binary(TokenId) ->
case lookup(TokenId) of
- #token{channel_id=ChannelId} ->
- logplex_grid:cast(?MODULE, {delete_token, TokenId}),
- logplex_grid:cast(logplex_channel, {delete_token, ChannelId, TokenId}),
+ #token{} ->
redis_helper:delete_token(TokenId);
_ ->
- ok
+ {error, not_found}
end.
-lookup(Token) when is_binary(Token) ->
- case ets:lookup(?MODULE, Token) of
- [Token1] when is_record(Token1, token) ->
- Token1;
+lookup(TokenId) when is_binary(TokenId) ->
+ case ets:lookup(tokens, TokenId) of
+ [Token] when is_record(Token, token) ->
+ Token;
_ ->
undefined
end.
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%% @hidden
-%%--------------------------------------------------------------------
-init([]) ->
- ets:new(?MODULE, [protected, named_table, set, {keypos, 2}]),
- populate_cache(),
- {ok, []}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_call({create_token, ChannelId, TokenId, TokenName, AppId, Addon}, _From, State) ->
- ets:insert(?MODULE, #token{id=TokenId, channel_id=ChannelId, name=TokenName, app_id=AppId, addon=Addon}),
- {reply, ok, State};
-
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_cast({delete_channel, ChannelId}, State) ->
- ets:match_delete(?MODULE, #token{id='_', channel_id=ChannelId, name='_', app_id='_', addon='_'}),
- {noreply, State};
-
-handle_cast({delete_token, TokenId}, State) ->
- ets:delete(?MODULE, TokenId),
- {noreply, State};
-
-handle_cast({update_addon, ChannelId, Addon}, State) ->
- [begin
- ets:insert(?MODULE, Token#token{addon=Addon})
- end || Token <- ets:match_object(?MODULE, #token{id='_', channel_id=ChannelId, name='_', app_id='_', addon='_'})],
- {noreply, State};
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
+refresh_dns() ->
+ proc_lib:start_link(?MODULE, init, [self()]).
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_info(_Info, State) ->
- {noreply, State}.
+init(Parent) ->
+ proc_lib:init_ack(Parent, {ok, self()}),
+ loop().
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @hidden
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
+loop() ->
+ timer:sleep(60 * 1000),
+ refresh_tokens(ets:tab2list(tokens)),
+ loop().
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%% @hidden
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+refresh_tokens([]) ->
+ ok;
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-populate_cache() ->
- Data = [begin
- case logplex_channel:lookup(Token#token.channel_id) of
- #channel{app_id=AppId, addon=Addon} -> Token#token{app_id=AppId, addon=Addon};
- _ -> Token
+refresh_tokens([#token{drains=Drains}=Token|Tail]) ->
+ Drains1 = [begin
+ case logplex_utils:resolve_host(Host) of
+ undefined -> Drain;
+ Ip -> Drain#drain{resolved_host=Ip}
end
- end || Token <- redis_helper:lookup_tokens()],
- length(Data) > 0 andalso ets:insert(?MODULE, Data).
+ end || #drain{host=Host}=Drain <- Drains],
+ ets:insert(tokens, Token#token{drains=Drains1}),
+ refresh_tokens(Tail).
View
72 src/logplex_utils.erl
@@ -21,12 +21,21 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_utils).
--export([set_weight/1, shard_info/0, setup_test_channel/2, resolve_host/1, parse_msg/1, filter/2,
- formatted_utc_date/0, format/1, field_val/2, field_val/3,
- redis_opts/1, parse_redis_url/1, instance_name/0, heorku_domain/0]).
+-export([rpc/4, set_weight/1, setup_test_channel/2, resolve_host/1,
+ parse_msg/1, filter/2, formatted_utc_date/0, format/1, field_val/2, field_val/3,
+ redis_opts/1, parse_redis_url/1, instance_name/0, heroku_domain/0]).
-include_lib("logplex.hrl").
+rpc(Node, M, F, A) when is_atom(Node), is_atom(M), is_atom(F), is_list(A) ->
+ case net_adm:ping(Node) of
+ pong ->
+ Res = rpc:call(Node, M, F, A),
+ io:format("~100p~n", [Res]);
+ pang ->
+ io:format("Failed to connect to ~p~n", [Node])
+ end.
+
set_weight(Weight) when is_integer(Weight), Weight < 0 ->
set_weight(0);
@@ -34,60 +43,7 @@ set_weight(Weight) when is_integer(Weight), Weight > 100 ->
set_weight(100);
set_weight(Weight) when is_integer(Weight) ->
- application:start(sasl),
- application:start(redis),
- Opts = redis_opts("LOGPLEX_CONFIG_REDIS_URL"),
- redis_pool:add(config_pool, Opts, 1),
- LocalIp =
- case os:getenv("LOCAL_IP") of
- false -> <<"127.0.0.1">>;
- Val1 -> list_to_binary(Val1)
- end,
- Domain = heorku_domain(),
- Res = redis_helper:set_weight(Domain, LocalIp, Weight),
- io:format("set_weight ~p => ~w: ~p~n", [LocalIp, Weight, Res]).
-
-shard_info() ->
- application:start(sasl),
- application:start(redis),
- Opts = redis_opts("LOGPLEX_CONFIG_REDIS_URL"),
- redis_pool:add(config_pool, Opts, 1),
- [shard_info(binary_to_list(Url)) || Url <- redis_helper:shard_urls()],
- ok.
-
-shard_info(Url) ->
- Opts = parse_redis_url(Url),
- {ok, Pool} = redis_pool:start_link(Opts),
- redis_pool:expand(Pool, 1),
- Pid = redis_pool:pid(Pool),
- case redis:q(Pid, [<<"KEYS">>, <<"ch:*:spool">>]) of
- Keys when is_list(Keys) ->
- Result = [debug_object(Pid, Key) || Key <- Keys],
- io:format("== ~s~n", [Url]),
- [begin
- io:format("~s bytes ~s~n", [string:right(integer_to_list(N), 10, $ ), Key])
- end || {N, Key} <- lists:sort(Result)];
- W -> io:format("keys? ~p~n", [W]),
- ok
- end,
- redis_pool:remove_pool(shard_pool),
- ok.
-
-debug_object(Pid, Key) ->
- case redis:q(Pid, [<<"DEBUG">>, <<"OBJECT">>, Key]) of
- Output ->
- Tokens = string:tokens(binary_to_list(Output), " "),
- Len1 = lists:foldl(
- fun(Token, Acc) ->
- case string:tokens(Token, ":") of
- ["serializedlength", Len] -> list_to_integer(Len);
- _ -> Acc
- end
- end, 0, Tokens),
- {Len1, Key};
- _ ->
- {0, Key}
- end.
+ redgrid:update_meta([{"weight", integer_to_list(Weight)}]).
setup_test_channel(ChannelName, AppId) when is_binary(ChannelName), is_integer(AppId) ->
ChannelId = logplex_channel:create(ChannelName, AppId, <<"advanced">>),
@@ -171,7 +127,7 @@ instance_name() ->
InstanceName -> InstanceName
end.
-heorku_domain() ->
+heroku_domain() ->
case get(heroku_domain) of
undefined ->
Domain =
View
59 src/logplex_worker.erl
@@ -43,7 +43,7 @@ loop(#state{regexp=RE, map=Map, interval=Interval}=State) ->
case catch logplex_queue:out(logplex_work_queue) of
timeout ->
ok;
- {'EXIT', {noproc, _}} ->
+ {'EXIT', _} ->
exit(normal);
{1, [Msg]} ->
case re:run(Msg, RE, [{capture, all_but_first, binary}]) of
@@ -57,49 +57,28 @@ loop(#state{regexp=RE, map=Map, interval=Interval}=State) ->
route(Token, Map, Interval, Msg) when is_binary(Token), is_binary(Msg) ->
case logplex_token:lookup(Token) of
- #token{channel_id=ChannelId, name=TokenName, app_id=AppId, addon=Addon} ->
- Count = logplex_stats:incr(logplex_stats_channels, {message_received, AppId, ChannelId}),
- case exceeded_threshold(ChannelId, Count, Addon) of
- true ->
- ok;
- notify ->
- case logplex_rate_limit:lock(ChannelId) of
- true ->
- BufferPid = logplex_shard:lookup(integer_to_list(ChannelId), Map, Interval),
- Msg1 = iolist_to_binary(["<40>1 ", logplex_utils:formatted_utc_date(), " - heroku logplex - - You have exceeded ",
- integer_to_list(throughput(Addon)), " logs/min. Please upgrade your logging addon for higher throughput."]),
- process(ChannelId, BufferPid, Addon, Msg1);
- false ->
- ok
- end;
- false ->
- BufferPid = logplex_shard:lookup(integer_to_list(ChannelId), Map, Interval),
- logplex_stats:incr(logplex_stats_channels, {message_processed, AppId, ChannelId}),
- Msg1 = iolist_to_binary(re:replace(Msg, Token, TokenName)),
- process(ChannelId, BufferPid, Addon, Msg1)
- end;
+ #token{channel_id=ChannelId, name=TokenName, app_id=AppId, drains=Drains} ->
+ BufferPid = logplex_shard:lookup(integer_to_list(ChannelId), Map, Interval),
+ logplex_stats:incr(logplex_stats_channels, {message_processed, AppId, ChannelId}),
+ Msg1 = iolist_to_binary(re:replace(Msg, Token, TokenName)),
+ process_drains(Drains, Msg1),
+ process_tails(ChannelId, Msg1),
+ process_msg(ChannelId, BufferPid, Msg1);
_ ->
ok
end.
-process(ChannelId, BufferPid, Addon, Msg) ->
- logplex_tail:route(ChannelId, Msg),
- [logplex_queue:in(logplex_drain_buffer, {Host, Port, Msg}) || #drain{resolved_host=Host, port=Port} <- logplex_channel:drains(ChannelId)],
- logplex_queue:in(BufferPid, redis_helper:build_push_msg(ChannelId, spool_length(Addon), Msg)).
-
-throughput(<<"basic">>) -> ?BASIC_THROUGHPUT;
-throughput(<<"expanded">>) -> ?EXPANDED_THROUGHPUT.
+process_drains([], _Msg) ->
+ ok;
-exceeded_threshold(_ChannelId, _Count, <<"advanced">>) ->
- false;
-exceeded_threshold(ChannelId, Count, Addon) ->
- logplex_rate_limit:is_locked(ChannelId) orelse exceeded_threshold(Count, Addon).
+process_drains([#drain{resolved_host=Host, port=Port}|Tail], Msg) ->
+ logplex_queue:in(logplex_drain_buffer, {Host, Port, Msg}),
+ process_drains(Tail, Msg).
-exceeded_threshold(Count, <<"expanded">>) when Count =< ?EXPANDED_THROUGHPUT -> false;
-exceeded_threshold(Count, <<"expanded">>) when Count == (?EXPANDED_THROUGHPUT + 1) -> notify;
-exceeded_threshold(Count, <<"basic">>) when Count =< ?BASIC_THROUGHPUT -> false;
-exceeded_threshold(Count, <<"basic">>) when Count == (?BASIC_THROUGHPUT + 1) -> notify;
-exceeded_threshold(_, _) -> true.
+process_tails(ChannelId, Msg) ->
+ logplex_tail:route(ChannelId, Msg),
+ ok.
-spool_length(<<"advanced">>) -> ?ADVANCED_LOG_HISTORY;
-spool_length(_) -> ?DEFAULT_LOG_HISTORY.
+process_msg(ChannelId, BufferPid, Msg) ->
+ logplex_queue:in(BufferPid, redis_helper:build_push_msg(ChannelId, ?LOG_HISTORY, Msg)),
+ ok.
View
222 src/nsync_callback.erl
@@ -0,0 +1,222 @@
+%% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
+%%
+%% Permission is hereby granted, free of charge, to any person
+%% obtaining a copy of this software and associated documentation
+%% files (the "Software"), to deal in the Software without
+%% restriction, including without limitation the rights to use,
+%% copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the
+%% Software is furnished to do so, subject to the following
+%% conditions:
+%%
+%% The above copyright notice and this permission notice shall be
+%% included in all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+%% OTHER DEALINGS IN THE SOFTWARE.
+-module(nsync_callback).
+-export([handle/1]).
+
+-include_lib("logplex.hrl").
+
+%% nsync callbacks
+
+%% LOAD
+handle({load, <<"ch:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
+ Id = list_to_integer(parse_id(Rest)),
+ create_channel(Id, Dict);
+
+handle({load, <<"tok:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
+ Id = list_to_binary(parse_id(Rest)),
+ create_token(Id, Dict);
+
+handle({load, <<"drain:", Rest/binary>>, Dict}) when is_tuple(Dict)