Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Replaces logplex_redis_quarantine with chan flags.
Browse files Browse the repository at this point in the history
  • Loading branch information
archaelus committed Jul 11, 2012
1 parent 015404f commit 751b843
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 77 deletions.
7 changes: 6 additions & 1 deletion src/logplex_channel.erl
Expand Up @@ -149,11 +149,16 @@ lookup(ChannelId) when is_integer(ChannelId) ->
_ -> undefined
end.

-spec lookup_flag(id(), F) -> F | 'no_such_flag' | 'not_found'
when is_subtype(F, flag()).
lookup_flag(ChannelId, Flag) when Flag =:= no_tail;
Flag =:= no_redis ->
try
Flags =ets:lookup_element(channels, ChannelId, #channel.flags),
lists:member(Flag, Flags)
case lists:member(Flag, Flags) of
true -> Flag;
false -> no_such_flag
end
catch
error:badarg ->
not_found
Expand Down
1 change: 0 additions & 1 deletion src/logplex_db.erl
Expand Up @@ -32,7 +32,6 @@ start_link() ->

create_ets_tables() ->
ets:new(channels, [named_table, public, set, {keypos, 2}]),
logplex_redis_quarantine:create_ets_table(),
ets:new(tokens, [named_table, public, set, {keypos, 2}]),
ets:new(drains, [named_table, public, set, {keypos, 2}]),
logplex_session:create_ets_table(),
Expand Down
43 changes: 0 additions & 43 deletions src/logplex_redis_quarantine.erl

This file was deleted.

4 changes: 2 additions & 2 deletions src/logplex_worker.erl
Expand Up @@ -107,8 +107,8 @@ process_tails(ChannelId, Msg) ->
ok.

process_msg(ChannelId, State, Msg) ->
case logplex_redis_quarantine:channel(ChannelId) of
not_quarantined ->
case logplex_channel:has_flag(no_redis, ChannelId) of
no_redis ->
{Map, Interval} = map_interval(State),
BufferPid = logplex_shard:lookup(integer_to_list(ChannelId),
Map, Interval),
Expand Down
30 changes: 0 additions & 30 deletions src/nsync_callback.erl
Expand Up @@ -41,9 +41,6 @@ handle({load, <<"drain:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
Id = drain_id(parse_id(Rest)),
create_drain(Id, Dict);

handle({load, <<"quarantine:channels">>, Channels}) when is_list(Channels) ->
quarantine_channels(Channels);

handle({load, _Key, _Val}) ->
ok;

Expand Down Expand Up @@ -98,12 +95,6 @@ handle({cmd, "del", [<<"session:", UUID/binary>> | _Args]})
catch logplex_session:delete(UUID),
?INFO("at=delete type=session id=~p", [UUID]);

handle({cmd, "sadd", [<<"quarantine:channels">> | Members]}) ->
quarantine_channels(Members);

handle({cmd, "srem", [<<"quarantine:channels">> | Members]}) ->
unquarantine_channels(Members);

handle({cmd, _Cmd, [<<"redgrid", _/binary>>|_]}) ->
ok;

Expand Down Expand Up @@ -234,27 +225,6 @@ dict_find(Key, Dict) ->
_ -> undefined
end.

quarantine_channels(Channels) ->
[ try
ID = channel_id(Channel),
logplex_redis_quarantine:quarantine_channel(ID),
?INFO("at=quarantine_channel channel_id=~p",
[ID])
catch
_:_ -> ok
end
|| Channel <- Channels].

unquarantine_channels(Channels) ->
[ try
ID = channel_id(Channel),
logplex_redis_quarantine:unquarantine_channel(ID),
?INFO("at=unquarantine_channel channel_id=~p",
[ID])
catch
_:_ -> ok
end
|| Channel <- Channels].

channel_id(Bin) when is_binary(Bin) ->
list_to_integer(binary_to_list(Bin)).
Expand Down

0 comments on commit 751b843

Please sign in to comment.