Skip to content

Commit

Permalink
Replace now() with os:tomestamp().
Browse files Browse the repository at this point in the history
Fix peer receiving speed rate.
Add etorrent_console:set_enabled/1.
Add a manual garbage collection, hibernation and fullsweep_after for idle processes.
  • Loading branch information
arcusfelis committed Mar 19, 2013
1 parent 99d2b27 commit 74ae4bb
Show file tree
Hide file tree
Showing 17 changed files with 79 additions and 68 deletions.
37 changes: 27 additions & 10 deletions src/etorrent_console.erl
Expand Up @@ -6,7 +6,8 @@
%% API Function Exports
%% ------------------------------------------------------------------

-export([start_link/0]).
-export([start_link/0,
set_enabled/1]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
Expand Down Expand Up @@ -47,9 +48,9 @@
}).

-record(state, {
timer :: reference(),
update_tref :: timer:tref(),
torrents :: [#torrent{}],
tick :: integer()
update_timeout :: integer()
}).


Expand All @@ -58,30 +59,46 @@
%% ------------------------------------------------------------------

start_link() ->
Args = [2000],
gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []).
gen_server:start_link({local, ?SERVER}, ?MODULE, [2000], []).

set_enabled(IsEnabled) when is_boolean(IsEnabled) ->
gen_server:call(?SERVER, {set_enabled, IsEnabled}).


%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init([Timeout]) ->
timer:send_interval(Timeout, update),
{ok, TRef} = timer:send_interval(Timeout, update_timeout),
SD = #state{
tick = Timeout,
torrents=[]
update_timeout = Timeout,
torrents=[],
update_tref=TRef
},
{ok, SD}.

handle_call(_Mess, _From, SD) ->
handle_call({set_enabled, false}, _From, SD=#state{update_tref=undefined}) ->
%% Do nothing, because it is already disabled.
{reply, ok, SD};
handle_call({set_enabled, false}, _From, SD=#state{update_tref=TRef}) ->
{ok, cancel} = timer:cancel(TRef),
%% Disable this server.
{reply, ok, SD#state{update_tref=undefined, torrents=[]}};
handle_call({set_enabled, true}, _From, SD=#state{update_tref=undefined,
update_timeout=Timeout}) ->
{ok, TRef} = timer:send_interval(Timeout, update_timeout),
%% Enable this server.
{reply, ok, SD#state{update_tref=TRef}};
handle_call({set_enabled, true}, _From, SD=#state{}) ->
%% Do nothing, because it is already enabled.
{reply, ok, SD}.

handle_cast(_Mess, SD) ->
{noreply, SD}.


handle_info(update, SD=#state{torrents=OldTorrents, tick=Timeout}) ->
handle_info(update_timeout, SD=#state{torrents=OldTorrents, update_timeout=Timeout}) ->
% proplists from etorrent.
PLs = query_torrent_list(),
UnsortedNewTorrents = lists:map(fun to_record/1, PLs),
Expand Down
4 changes: 3 additions & 1 deletion src/etorrent_ctl.erl
Expand Up @@ -34,7 +34,9 @@
% @end
-spec start_link(binary()) -> {ok, pid()} | ignore | {error, term()}.
start_link(PeerId) when is_binary(PeerId) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [PeerId], []).
SOpts = [{fullsweep_after, 0}],
Opts = [{spawn_opt, SOpts}],
gen_server:start_link({local, ?SERVER}, ?MODULE, [PeerId], Opts).

% @doc Ask the manager process to start a new torrent, given in File.
% @end
Expand Down
18 changes: 9 additions & 9 deletions src/etorrent_dht_state.erl
Expand Up @@ -326,7 +326,7 @@ unreachable_tab() ->
etorrent_dht_unreachable_cache_tab.

random_node_tag() ->
random:seed(now()),
random:seed(os:timestamp()),
random:uniform(max_unreachable()).

%% @private
Expand Down Expand Up @@ -358,7 +358,7 @@ init([StateFile, BootstapNodes]) ->
node_timeout=NTimeout,
buck_timeout=BTimeout} = #state{},

Now = now(),
Now = os:timestamp(),
BTimers = lists:foldl(fun(Range, Acc) ->
BTimer = bucket_timer_from(Now, NTimeout, Now, BTimeout, Range),
add_timer(Range, Now, BTimer, Acc)
Expand Down Expand Up @@ -394,7 +394,7 @@ handle_call({is_interesting, InputID, IP, Port}, _From, State) ->

handle_call({insert_node, InputID, IP, Port}, _From, State) ->
ID = ensure_int_id(InputID),
Now = now(),
Now = os:timestamp(),
Node = {ID, IP, Port},
#state{
node_id=Self,
Expand Down Expand Up @@ -501,7 +501,7 @@ handle_call({closest_to, InputID, NumNodes}, _, State) ->
handle_call({request_timeout, InputID, IP, Port}, _, State) ->
ID = ensure_int_id(InputID),
Node = {ID, IP, Port},
Now = now(),
Now = os:timestamp(),
#state{
buckets=Buckets,
node_timeout=NTimeout,
Expand All @@ -521,7 +521,7 @@ handle_call({request_timeout, InputID, IP, Port}, _, State) ->

handle_call({request_success, InputID, IP, Port}, _, State) ->
ID = ensure_int_id(InputID),
Now = now(),
Now = os:timestamp(),
Node = {ID, IP, Port},
#state{
buckets=Buckets,
Expand Down Expand Up @@ -584,7 +584,7 @@ handle_cast(_, State) ->
%% @private
handle_info({inactive_node, InputID, IP, Port}, State) ->
ID = ensure_int_id(InputID),
Now = now(),
Now = os:timestamp(),
Node = {ID, IP, Port},
#state{
buckets=Buckets,
Expand Down Expand Up @@ -614,7 +614,7 @@ handle_info({inactive_node, InputID, IP, Port}, State) ->
{noreply, NewState};

handle_info({inactive_bucket, Range}, State) ->
Now = now(),
Now = os:timestamp(),
#state{
buckets=Buckets,
node_timers=NTimers,
Expand Down Expand Up @@ -859,7 +859,7 @@ timer_from(Time, Timeout, Msg) ->
erlang:send_after(Interval, self(), Msg).

ms_since(Time) ->
timer:now_diff(Time, now()) div 1000.
timer:now_diff(Time, os:timestamp()) div 1000.

ms_between(Time, Timeout) ->
MS = Timeout - ms_since(Time),
Expand All @@ -872,7 +872,7 @@ has_timed_out(Item, Timeout, Times) ->
ms_since(LastActive) > Timeout.

least_recent([], _) ->
now();
os:timestamp();
least_recent(Items, Times) ->
ATimes = [element(1, get_timer(I, Times)) || I <- Items],
lists:min(ATimes).
Expand Down
2 changes: 1 addition & 1 deletion src/etorrent_dht_tracker.erl
Expand Up @@ -137,7 +137,7 @@ poller_key() ->


random_peer() ->
random:seed(now()),
random:seed(os:timestamp()),
random:uniform(max_per_torrent()).

init(Args) ->
Expand Down
3 changes: 2 additions & 1 deletion src/etorrent_info.erl
Expand Up @@ -385,7 +385,8 @@ init([TorrentID, Torrent]) ->
metadata_pieces = list_to_tuple(metadata_pieces(TorrentBin, 0, MetadataSize)),
is_private=etorrent_metainfo:is_private(Torrent)
},
{ok, InitState}.
%% GC
{ok, InitState, hibernate}.


%% @private
Expand Down
4 changes: 2 additions & 2 deletions src/etorrent_io.erl
Expand Up @@ -485,10 +485,10 @@ handle_cast({schedule_operation, RelPath}, State) ->
rel_path=RelPath,
process=NewPid,
monitor=NewMon,
accessed=now()},
accessed=os:timestamp()},
[NewFile|OpenAfterClose];
_ ->
UpdatedFile = FileInfo#io_file{accessed=now()},
UpdatedFile = FileInfo#io_file{accessed=os:timestamp()},
lists:keyreplace(RelPath, #io_file.rel_path, OpenAfterClose, UpdatedFile)
end,
NewState = State#state{files_open=WithNewFile},
Expand Down
1 change: 1 addition & 0 deletions src/etorrent_io_sup.erl
Expand Up @@ -31,6 +31,7 @@ init([TorrentID, Torrent]) ->
Dldir = etorrent_config:download_dir(),
FileSup = file_server_sup_spec(TorrentID, Dldir, Files),
lager:debug("Completing initialization of IO supervisor for ~p.", [TorrentID]),
erlang:garbage_collect(),
{ok, {{one_for_one, 1, 60}, [FileSup, DirServer]}}.

%% ----------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/etorrent_memory_logger.erl
Expand Up @@ -56,7 +56,7 @@ init(_Args) ->

%% @private
handle_event(Evt, S) ->
Now = now(),
Now = os:timestamp(),
ets:insert_new(?TAB, {Now, erlang:localtime(), Evt}),
prune_old_events(),
{ok, S}.
Expand Down
2 changes: 1 addition & 1 deletion src/etorrent_peer_control.erl
Expand Up @@ -184,7 +184,7 @@ init([TrackerUrl, LocalPeerID, RemotePeerID,
InfoHash, TorrentID, {IP, Port}, Caps, Socket]) ->
lager:info("New peer ~p:~p is known as ~p for #~p.",
[IP, Port, RemotePeerID, TorrentID]),
random:seed(now()),
random:seed(os:timestamp()),
%% Use socket handle as remote peer-id.
register_server(TorrentID, Socket),
Download = etorrent_download:await_servers(TorrentID),
Expand Down
8 changes: 4 additions & 4 deletions src/etorrent_peer_mgr.erl
Expand Up @@ -76,7 +76,7 @@ is_bad_peer(IP, Port) ->
%% ====================================================================

init([LocalPeerId]) ->
random:seed(now()), %% Seed RNG
random:seed(os:timestamp()), %% Seed RNG
erlang:send_after(?CHECK_TIME, self(), cleanup_table),
_Tid = ets:new(etorrent_bad_peer, [protected, named_table,
{keypos, #bad_peer.ipport}]),
Expand All @@ -97,19 +97,19 @@ handle_cast({enter_bad_peer, IP, Port, PeerId}, S) ->
#bad_peer { ipport = {IP, Port},
peerid = PeerId,
offenses = 1,
last_offense = now() });
last_offense = os:timestamp() });
[P] ->
ets:insert(etorrent_bad_peer,
P#bad_peer { offenses = P#bad_peer.offenses + 1,
peerid = PeerId,
last_offense = now() })
last_offense = os:timestamp() })
end,
{noreply, S};
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(cleanup_table, S) ->
Bound = etorrent_utils:now_subtract_seconds(now(), ?GRACE_TIME),
Bound = etorrent_utils:now_subtract_seconds(os:timestamp(), ?GRACE_TIME),
_N = ets:select_delete(etorrent_bad_peer,
[{#bad_peer { last_offense = '$1', _='_'},
[{'<','$1',{Bound}}],
Expand Down
5 changes: 3 additions & 2 deletions src/etorrent_peer_recv.erl
Expand Up @@ -138,12 +138,13 @@ handle_info({rlimit, continue}, State) ->
end;

handle_info(rate_update, State) ->
#state{id=TorrentID, rate=Rate, last_piece_msg_count=PieceCount} = State,
#state{id=TorrentID, rate=Rate, last_piece_msg_count=PieceCount,
controller=CPid} = State,
NewRate = etorrent_rate:update(Rate, 0),
erlang:send_after(?RATE_UPDATE, self(), rate_update),
SnubState = is_snubbing_us(State),
ok = etorrent_peer_states:set_recv_rate(
TorrentID, self(), NewRate#peer_rate.rate, SnubState),
TorrentID, CPid, NewRate#peer_rate.rate, SnubState),
NewState = State#state{rate=NewRate, last_piece_msg_count=PieceCount+1},
{noreply, NewState};

Expand Down
10 changes: 6 additions & 4 deletions src/etorrent_peer_states.erl
Expand Up @@ -188,16 +188,18 @@ get_recv_rate(Id, Pid) -> fetch_rate(etorrent_recv_state, Id, Pid).
none | undefined | float().
get_send_rate(Id, Pid) -> fetch_rate(etorrent_send_state, Id, Pid).

%% @doc Set the receive rate of the peer
%% @doc Set the receive rate of the peer.
%% Pid is a pid of the peer control process.
%% @end
-spec set_recv_rate(integer(), pid(), float(), normal | snubbed) -> ok.
set_recv_rate(Id, Pid, Rate, SnubState) ->
set_recv_rate(Id, Pid, Rate, SnubState) when is_integer(Id), is_float(Rate) ->
alter_state(recv_rate, Id, Pid, Rate, SnubState).

%% @doc Set the send rate of the peer
%% @doc Set the send rate of the peer.
%% Pid is a pid of the peer control process.
%% @end
-spec set_send_rate(integer(), pid(), float()) -> ok.
set_send_rate(Id, Pid, Rate) ->
set_send_rate(Id, Pid, Rate) when is_integer(Id), is_float(Rate) ->
alter_state(send_rate, Id, Pid, Rate, unchanged).

%% @doc Get the rate of a given Torrent
Expand Down
6 changes: 3 additions & 3 deletions src/etorrent_proto_wire.erl
Expand Up @@ -111,12 +111,12 @@ incoming_packet(none, <<0:32/big-integer, Rest/binary>>) ->
incoming_packet(none, <<Left:32/big-integer, Rest/binary>>) ->
incoming_packet({partial, {Left, []}}, Rest);

incoming_packet({partial, Data}, <<More/binary>>) when is_binary(Data) ->
incoming_packet(none, <<Data/binary, More/binary>>);

incoming_packet(none, Packet) when byte_size(Packet) < 4 ->
{partial, Packet};

incoming_packet({partial, Data}, <<More/binary>>) when is_binary(Data) ->
incoming_packet(none, <<Data/binary, More/binary>>);

incoming_packet({partial, {Left, IOL}}, Packet)
when byte_size(Packet) >= Left, is_integer(Left) ->
<<Data:Left/binary, Rest/binary>> = Packet,
Expand Down
24 changes: 8 additions & 16 deletions src/etorrent_query.erl
Expand Up @@ -26,25 +26,18 @@ peer_list() ->
merge_peer_states(AllPeers, PeerState).

merge_peer_states(PeerList, StateList) ->
merge_by(lists:sort(
fun(P1, P2) ->
proplists:get_value(pid, P1) =< proplists:get_value(pid, P2)
end,
PeerList),
lists:sort(
fun(SL1, SL2) ->
proplists:get_value(pid, SL1) =< proplists:get_value(pid, SL2)
end,
StateList),
fun(Item1, Item2) ->
{E1, E2} = {proplists:get_value(pid, Item1), proplists:get_value(pid, Item2)},
Pid2PeerList = pid_pairs(PeerList),
Pid2StateList = pid_pairs(StateList),
merge_by(lists:keysort(1, Pid2PeerList),
lists:keysort(1, Pid2StateList),
fun({E1, _Item1}, {E2, _Item2}) ->
if
E1 == E2 -> equal;
E1 =< E2 -> less;
E1 >= E2 -> greater
end
end,
fun (I1, I2) ->
fun ({_E1, I1}, {_E2, I2}) ->
Merged = lists:umerge(I1, I2),
{B1, B2, B3, B4} = proplists:get_value(ip, Merged),

Expand All @@ -68,6 +61,5 @@ merge_by([I1 | R1], [I2 | R2], CompareFun, MergeFun) ->
[MergeFun(I1, I2) | merge_by(R1, R2, CompareFun, MergeFun)]
end.




pid_pairs(PLs) ->
[{proplists:get_value(pid, PL), PL} || PL <- PLs].
10 changes: 5 additions & 5 deletions src/etorrent_rate.erl
Expand Up @@ -62,12 +62,11 @@ update(#peer_rate {rate = Rate,
total = Total + Amount,
%% We expect the next data-block at the minimum of 5 secs or
%% when Amount bytes has been fetched at the current rate.
next_expected =
T + lists:min([5, Amount / lists:max([R, 0.0001])]),
next_expected = T + min(5, Amount / max(R, 0.0001)),
last = T,
%% RateSince is manipulated so it does not go beyond
%% ?MAX_RATE_PERIOD
rate_since = lists:max([RateSince, T - ?MAX_RATE_PERIOD])}
rate_since = max(RateSince, T - ?MAX_RATE_PERIOD)}
end.

%% @doc Calculate estimated time of arrival.
Expand Down Expand Up @@ -100,5 +99,6 @@ format_eta(Left, DownloadRate) ->
% @end
-spec now_secs() -> integer().
now_secs() ->
calendar:datetime_to_gregorian_seconds(
calendar:local_time()).
{Mega, Sec, _Micro} = os:timestamp(),
Mega * 1000000 + Sec.
% calendar:datetime_to_gregorian_seconds(calendar:local_time()).

0 comments on commit 74ae4bb

Please sign in to comment.