diff --git a/src/logplex_channel.erl b/src/logplex_channel.erl index 48191b00..fb11ea46 100644 --- a/src/logplex_channel.erl +++ b/src/logplex_channel.erl @@ -149,11 +149,16 @@ lookup(ChannelId) when is_integer(ChannelId) -> _ -> undefined end. +-spec lookup_flag(id(), F) -> F | 'no_such_flag' | 'not_found' + when is_subtype(F, flag()). lookup_flag(ChannelId, Flag) when Flag =:= no_tail; Flag =:= no_redis -> try Flags =ets:lookup_element(channels, ChannelId, #channel.flags), - lists:member(Flag, Flags) + case lists:member(Flag, Flags) of + true -> Flag; + false -> no_such_flag + end catch error:badarg -> not_found diff --git a/src/logplex_db.erl b/src/logplex_db.erl index e3f24198..f5e84441 100644 --- a/src/logplex_db.erl +++ b/src/logplex_db.erl @@ -32,7 +32,6 @@ start_link() -> create_ets_tables() -> ets:new(channels, [named_table, public, set, {keypos, 2}]), - logplex_redis_quarantine:create_ets_table(), ets:new(tokens, [named_table, public, set, {keypos, 2}]), ets:new(drains, [named_table, public, set, {keypos, 2}]), logplex_session:create_ets_table(), diff --git a/src/logplex_redis_quarantine.erl b/src/logplex_redis_quarantine.erl deleted file mode 100644 index 4febfbc5..00000000 --- a/src/logplex_redis_quarantine.erl +++ /dev/null @@ -1,43 +0,0 @@ -%%%------------------------------------------------------------------- -%% @copyright Geoff Cant -%% @author Geoff Cant -%% @version {@vsn}, {@date} {@time} -%% @doc Table manager for redis channel quarantines. -%% @end -%%%------------------------------------------------------------------- --module(logplex_redis_quarantine). - --include_lib("eunit/include/eunit.hrl"). - -%% API --export([create_ets_table/0 - ,channel/1 - ,quarantine_channel/1 - ,unquarantine_channel/1 - ]). --define(TABLE, ?MODULE). - -%%==================================================================== -%% API -%%==================================================================== - -create_ets_table() -> - ets:new(?MODULE, [named_table, public, set]). - --spec channel(ChannelId::integer()) -> quarantined | not_quarantined. -channel(ChannelId) when is_integer(ChannelId) -> - try ets:lookup(?TABLE, {channel, ChannelId}) of - [] -> - not_quarantined; - [_] -> - quarantined - catch - error:badarg -> - not_quarantined - end. - -quarantine_channel(ChannelId) when is_integer(ChannelId) -> - ets:insert(?TABLE, {{channel, ChannelId}, os:timestamp()}). - -unquarantine_channel(ChannelId) when is_integer(ChannelId) -> - ets:delete(?TABLE, {channel, ChannelId}). diff --git a/src/logplex_worker.erl b/src/logplex_worker.erl index 318a8976..46f1608d 100644 --- a/src/logplex_worker.erl +++ b/src/logplex_worker.erl @@ -107,8 +107,8 @@ process_tails(ChannelId, Msg) -> ok. process_msg(ChannelId, State, Msg) -> - case logplex_redis_quarantine:channel(ChannelId) of - not_quarantined -> + case logplex_channel:has_flag(no_redis, ChannelId) of + no_redis -> {Map, Interval} = map_interval(State), BufferPid = logplex_shard:lookup(integer_to_list(ChannelId), Map, Interval), diff --git a/src/nsync_callback.erl b/src/nsync_callback.erl index 34693174..3e7a3ef1 100644 --- a/src/nsync_callback.erl +++ b/src/nsync_callback.erl @@ -41,9 +41,6 @@ handle({load, <<"drain:", Rest/binary>>, Dict}) when is_tuple(Dict) -> Id = drain_id(parse_id(Rest)), create_drain(Id, Dict); -handle({load, <<"quarantine:channels">>, Channels}) when is_list(Channels) -> - quarantine_channels(Channels); - handle({load, _Key, _Val}) -> ok; @@ -98,12 +95,6 @@ handle({cmd, "del", [<<"session:", UUID/binary>> | _Args]}) catch logplex_session:delete(UUID), ?INFO("at=delete type=session id=~p", [UUID]); -handle({cmd, "sadd", [<<"quarantine:channels">> | Members]}) -> - quarantine_channels(Members); - -handle({cmd, "srem", [<<"quarantine:channels">> | Members]}) -> - unquarantine_channels(Members); - handle({cmd, _Cmd, [<<"redgrid", _/binary>>|_]}) -> ok; @@ -234,27 +225,6 @@ dict_find(Key, Dict) -> _ -> undefined end. -quarantine_channels(Channels) -> - [ try - ID = channel_id(Channel), - logplex_redis_quarantine:quarantine_channel(ID), - ?INFO("at=quarantine_channel channel_id=~p", - [ID]) - catch - _:_ -> ok - end - || Channel <- Channels]. - -unquarantine_channels(Channels) -> - [ try - ID = channel_id(Channel), - logplex_redis_quarantine:unquarantine_channel(ID), - ?INFO("at=unquarantine_channel channel_id=~p", - [ID]) - catch - _:_ -> ok - end - || Channel <- Channels]. channel_id(Bin) when is_binary(Bin) -> list_to_integer(binary_to_list(Bin)).