Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
add flags table
Browse files Browse the repository at this point in the history
  • Loading branch information
jkvor committed Jun 29, 2011
1 parent ab7ca72 commit dab01ac
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 6 deletions.
2 changes: 1 addition & 1 deletion include/logplex.hrl
Expand Up @@ -2,7 +2,7 @@

-record(msg, {time, source, ps, content}).
-record(channel, {id, name, app_id}).
-record(token, {id, channel_id, name, app_id, drains=[]}).
-record(token, {id, channel_id, name, app_id, drains=[], flags=[]}).
-record(drain, {id, channel_id, token, resolved_host, host, port}).
-record(session, {id, body}).

Expand Down
3 changes: 2 additions & 1 deletion src/logplex_channel.erl
Expand Up @@ -63,7 +63,8 @@ lookup_drains(ChannelId) when is_integer(ChannelId) ->
ets:match_object(drains, drain_match_expr(ChannelId)).

token_match_expr(ChannelId) ->
#token{id='_', channel_id=ChannelId, name='_', app_id='_', drains='_'}.
T = logplex_utils:empty_token(),
T#token{channel_id=ChannelId}.

drain_match_expr(ChannelId) ->
#drain{id='_', channel_id=ChannelId, token='_', resolved_host='_', host='_', port='_'}.
Expand Down
1 change: 1 addition & 0 deletions src/logplex_db.erl
Expand Up @@ -36,6 +36,7 @@ create_ets_tables() ->
ets:new(tokens, [named_table, public, set, {keypos, 2}]),
ets:new(drains, [named_table, public, set, {keypos, 2}]),
ets:new(sessions, [named_table, public, set, {keypos, 2}]),
ets:new(flags, [named_table, public, set]),
ok.

boot_nsync() ->
Expand Down
3 changes: 2 additions & 1 deletion src/logplex_token.erl
Expand Up @@ -84,7 +84,8 @@ new_token(0) ->

new_token(Retries) ->
Token = list_to_binary("t." ++ string:strip(os:cmd("uuidgen"), right, $\n)),
case ets:match_object(tokens, #token{id=Token, channel_id='_', name='_', app_id='_', drains='_'}) of
T = logplex_utils:empty_token(),
case ets:match_object(tokens, T#token{id=Token}) of
[#token{}] -> new_token(Retries-1);
[] -> Token
end.
5 changes: 4 additions & 1 deletion src/logplex_utils.erl
Expand Up @@ -23,7 +23,7 @@
-module(logplex_utils).
-export([rpc/4, set_weight/1, setup_test_channel/2, resolve_host/1,
parse_msg/1, filter/2, formatted_utc_date/0, format/1, field_val/2, field_val/3,
redis_opts/1, parse_redis_url/1, instance_name/0, heroku_domain/0]).
empty_token/0, redis_opts/1, parse_redis_url/1, instance_name/0, heroku_domain/0]).

-include_lib("logplex.hrl").

Expand Down Expand Up @@ -101,6 +101,9 @@ field_val(Key, [_, _ | Tail], Default) ->
field_val(_Key, _, Default) ->
Default.

%% @doc Build a #token{} match pattern with every field wildcarded ('_'),
%% suitable as an ets:match_object/2 pattern. The record-creation wildcard
%% #token{_ = '_'} expands at compile time to the same tuple the previous
%% setelement/1 + erlang:make_tuple/2 construction produced — {token, '_',
%% '_', ...} — but tracks record changes automatically and reads as intent.
empty_token() ->
    #token{_ = '_'}.

redis_opts(ConfigVar) when is_list(ConfigVar) ->
case os:getenv(ConfigVar) of
false ->
Expand Down
53 changes: 51 additions & 2 deletions src/nsync_callback.erl
Expand Up @@ -40,12 +40,17 @@ handle({load, <<"drain:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
Id = list_to_integer(parse_id(Rest)),
create_drain(Id, Dict);

%% Initial redis sync: a full "flags:<ChannelId>" set arrived; merge it
%% into the local flags ETS table.
handle({load, <<"flags:", Rest/binary>>, List}) when is_list(List) ->
    ChannelId = list_to_integer(parse_id(Rest)),
    create_flags(ChannelId, List);

%% Ignore any other keys seen during the initial sync.
handle({load, _Key, _Val}) ->
ok;

%% End of the initial redis dump: denormalize the synced data onto the
%% token records, then lift the read-only gate.
%% NOTE(review): the order matters — tokens must be populated with
%% channel, drain and flag data BEFORE read_only is flipped to false.
handle({load, eof}) ->
populate_token_channel_data(ets:tab2list(tokens)),
populate_token_drain_data(ets:tab2list(drains)),
populate_token_flags(ets:tab2list(flags)),
error_logger:info_msg("NSYNC sync complete"),
application:set_env(logplex, read_only, false),
ok;
Expand Down Expand Up @@ -81,6 +86,22 @@ handle({cmd, "del", [<<"drain:", Rest/binary>> | _Args]}) ->
remove_token_drain_data(Id),
ets:delete(drains, Id);

%% Live update: a flag was added to a channel's set. Merge it into the
%% flags table and push the new set onto that channel's tokens.
handle({cmd, "sadd", [<<"flags:", Rest/binary>>, Item]}) ->
    ChannelId = list_to_integer(parse_id(Rest)),
    Entry = create_flags(ChannelId, [Item]),
    populate_token_flags(Entry);

%% Live update: a flag was removed from a channel's set. Drop it from the
%% stored set, persist the result and push it onto that channel's tokens.
handle({cmd, "srem", [<<"flags:", Rest/binary>>, Item]}) ->
    ChannelId = list_to_integer(parse_id(Rest)),
    Remaining =
        case ets:lookup(flags, ChannelId) of
            [{ChannelId, Existing}] -> Existing -- [Item];
            [] -> []
        end,
    Entry = {ChannelId, lists:usort(Remaining)},
    ets:insert(flags, Entry),
    populate_token_flags(Entry);

handle({cmd, _Cmd, _Args}) ->
ok;

Expand Down Expand Up @@ -150,6 +171,16 @@ create_drain(Id, Dict) ->
end
end.

%% Merge List into the flag set already stored for channel Id, dedupe and
%% sort it, persist the {Id, Flags} entry in the `flags' ETS table, and
%% return the stored entry.
create_flags(Id, List) ->
    Existing =
        case ets:lookup(flags, Id) of
            [{Id, Stored}] -> Stored;
            [] -> []
        end,
    Entry = {Id, lists:usort(Existing ++ List)},
    true = ets:insert(flags, Entry),
    Entry.

populate_token_channel_data([]) ->
ok;

Expand All @@ -169,7 +200,8 @@ populate_token_drain_data([]) ->
ok;

populate_token_drain_data([Drain|Tail]) when is_record(Drain, drain) ->
case ets:match_object(tokens, #token{id='_', channel_id=Drain#drain.channel_id, name='_', app_id='_', drains='_'}) of
T = logplex_utils:empty_token(),
case ets:match_object(tokens, T#token{channel_id=Drain#drain.channel_id}) of
[] ->
io:format("Error ~p ~p ~p ~p~n", [?MODULE, populate_token_drain_data, undefined_tokens, Drain]);
Tokens ->
Expand All @@ -181,12 +213,29 @@ populate_token_drain_data([Drain|Tail]) when is_record(Drain, drain) ->
populate_token_drain_data([_|Tail]) ->
populate_token_drain_data(Tail).

%% Copy a channel's flag set onto every #token{} row for that channel in
%% the `tokens' ETS table. Accepts either a list of {ChannelId, Flags}
%% pairs (an ets:tab2list/1 dump) or a single pair.
populate_token_flags([]) ->
    ok;

populate_token_flags([{ChannelId, Flags} | Rest]) ->
    populate_token_flags({ChannelId, Flags}),
    populate_token_flags(Rest);

populate_token_flags({ChannelId, Flags}) ->
    Pattern = (logplex_utils:empty_token())#token{channel_id = ChannelId},
    case ets:match_object(tokens, Pattern) of
        [] ->
            %% No tokens for this channel yet; nothing to denormalize.
            ok;
        Tokens ->
            lists:foreach(
                fun(Token) ->
                    ets:insert(tokens, Token#token{flags = Flags})
                end,
                Tokens)
    end.

remove_token_drain_data(DrainId) ->
case logplex_drain:lookup(DrainId) of
undefined ->
io:format("Error ~p ~p ~p ~p~n", [?MODULE, remove_token_drain_data, undefined_drain, DrainId]);
Drain ->
case ets:match_object(tokens, #token{id='_', channel_id=Drain#drain.channel_id, name='_', app_id='_', drains='_'}) of
T = logplex_utils:empty_token(),
case ets:match_object(tokens, T#token{channel_id=Drain#drain.channel_id}) of
[] ->
io:format("Error ~p ~p ~p ~p~n", [?MODULE, remove_token_drain_data, undefined_tokens, Drain]);
Tokens ->
Expand Down

0 comments on commit dab01ac

Please sign in to comment.