Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
fixing sync_transactions and tweaking timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
jkvor committed May 26, 2011
1 parent f448176 commit 2acc100
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
30 changes: 23 additions & 7 deletions src/logplex_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,29 @@
-include_lib("logplex.hrl").

create(ChannelName, AppId, Addon) when is_binary(ChannelName), is_integer(AppId), is_binary(Addon) ->
ChannelId = mnesia:dirty_update_counter(counters, channel, 1),
{atomic, _} = mnesia:transaction(
fun() ->
Channel = #channel{id=ChannelId, name=ChannelName, app_id=AppId, addon=Addon},
mnesia:write(channel, Channel, write)
end),
ChannelId.
case sync_incr_channel_id() of
{atomic, ChannelId} when is_integer(ChannelId) ->
{atomic, _} = mnesia:sync_transaction(
fun() ->
Channel = #channel{id=ChannelId, name=ChannelName, app_id=AppId, addon=Addon},
mnesia:write(channel, Channel, write)
end),
ChannelId;
{aborted, Reason} ->
{error, Reason}
end.

sync_incr_channel_id() ->
mnesia:sync_transaction(fun() ->
case mnesia:wread({counters, channel}) of
[{counters, channel, ChannelId}] ->
mnesia:write(counters, {counters, channel, ChannelId+1}, write),
ChannelId+1;
[] ->
mnesia:write(counters, {counters, channel, 1}, write),
1
end
end).

delete(ChannelId) when is_integer(ChannelId) ->
case lookup(ChannelId) of
Expand Down
30 changes: 23 additions & 7 deletions src/logplex_drain.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,32 @@ create(ChannelId, Host, Port) when is_integer(ChannelId), is_binary(Host), (is_i
error_logger:error_msg("invalid drain: ~p:~p~n", [Host, Port]),
{error, invalid_drain};
Ip ->
DrainId = mnesia:dirty_update_counter(counters, drain, 1),
{atomic, _} = mnesia:transaction(
fun() ->
Drain = #drain{id=DrainId, channel_id=ChannelId, resolved_host=Ip, host=Host, port=Port},
mnesia:write(drain, Drain, write)
end),
DrainId
case sync_incr_drain_id() of
{atomic, DrainId} when is_integer(DrainId) ->
{atomic, _} = mnesia:transaction(
fun() ->
Drain = #drain{id=DrainId, channel_id=ChannelId, resolved_host=Ip, host=Host, port=Port},
mnesia:write(drain, Drain, write)
end),
DrainId;
{aborted, Reason} ->
{error, Reason}
end
end
end.

sync_incr_drain_id() ->
mnesia:sync_transaction(fun() ->
case mnesia:wread({counters, drain}) of
[{counters, drain, DrainId}] ->
mnesia:write(counters, {counters, drain, DrainId+1}, write),
DrainId+1;
[] ->
mnesia:write(counters, {counters, drain, 1}, write),
1
end
end).

delete(ChannelId, Host, Port) when is_integer(ChannelId), is_binary(Host) ->
Port1 = if Port == "" -> undefined; true -> list_to_integer(Port) end,
case ets:match_object(drain, #drain{id='_', channel_id=ChannelId, resolved_host='_', host=Host, port=Port1}) of
Expand Down
6 changes: 3 additions & 3 deletions src/logplex_rate_limit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

-include_lib("logplex.hrl").

-define(TIMEOUT, 10000).
-define(LOCK_TIMEOUT, 1000).

%% API functions
start_link() ->
Expand All @@ -57,9 +57,9 @@ lock(ChannelId) when is_integer(ChannelId) ->
end.

set_lock(Node, ChannelId) when is_integer(ChannelId) ->
gen_server:call({?MODULE, Node}, {set_lock, ChannelId}, ?TIMEOUT).
gen_server:call({?MODULE, Node}, {set_lock, ChannelId}, ?LOCK_TIMEOUT).
set_lock(ChannelId) when is_integer(ChannelId) ->
gen_server:call(?MODULE, {set_lock, ChannelId}, ?TIMEOUT).
gen_server:call(?MODULE, {set_lock, ChannelId}, ?LOCK_TIMEOUT).

clear_all() ->
gen_server:cast(?MODULE, clear_all).
Expand Down
2 changes: 1 addition & 1 deletion src/logplex_token.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ create(ChannelId, TokenName) when is_integer(ChannelId), is_binary(TokenName) ->
case logplex_channel:lookup(ChannelId) of
#channel{app_id=AppId, addon=Addon} ->
TokenId = list_to_binary("t." ++ string:strip(os:cmd("uuidgen"), right, $\n)),
{atomic, _} = mnesia:transaction(
{atomic, _} = mnesia:sync_transaction(
fun() ->
Token = #token{id=TokenId, channel_id=ChannelId, name=TokenName, app_id=AppId, addon=Addon},
mnesia:write(token, Token, write)
Expand Down

0 comments on commit 2acc100

Please sign in to comment.