Skip to content
Browse files

Add suspend/resume for the torrent table.

  • Loading branch information...
1 parent 6e1640b commit c7ebdf8b262cdf2458c95b85847c08aff6c50330 @arcusfelis committed
Showing with 238 additions and 167 deletions.
  1. +226 −161 src/cascadae_hub.erl
  2. +12 −6 src/cascadae_session.erl
View
387 src/cascadae_hub.erl
@@ -8,6 +8,8 @@
-export([start_link/0]).
-export([all_torrents/0,
add_handler/0,
+ resume_handler/1,
+ suspend_handler/0,
fire_event/1]).
%% ------------------------------------------------------------------
@@ -38,36 +40,36 @@
-type json_pl() :: [{atom(), term()}].
-record(torrent, {
- 'id' :: torrent_id(),
- 'wanted' :: non_neg_integer(),
- 'left' :: non_neg_integer(),
- 'leechers' :: non_neg_integer(),
- 'seeders' :: non_neg_integer(),
- 'all_time_downloaded' :: non_neg_integer(),
- 'all_time_uploaded' :: non_neg_integer(),
- 'downloaded' :: non_neg_integer(),
- 'uploaded' :: non_neg_integer(),
- 'state' :: atom(),
-
- 'speed_in' = 0 :: non_neg_integer(),
- 'speed_out' = 0 :: non_neg_integer()
+ id :: torrent_id(),
+ wanted :: non_neg_integer(),
+ left :: non_neg_integer(),
+ leechers :: non_neg_integer(),
+ seeders :: non_neg_integer(),
+ all_time_downloaded :: non_neg_integer(),
+ all_time_uploaded :: non_neg_integer(),
+ downloaded :: non_neg_integer(),
+ uploaded :: non_neg_integer(),
+ state :: atom(),
+
+ speed_in = 0 :: non_neg_integer(),
+ speed_out = 0 :: non_neg_integer()
}).
-type torrent_diff() :: [
- {'id', torrent_id()}
- | {'left', integer()}
- | {'leechers', integer()}
- | {'seeders', integer()}
+ {id, torrent_id()}
+ | {left, integer()}
+ | {leechers, integer()}
+ | {seeders, integer()}
| {atom(), term()}
].
-record(diff_acc, {
- 'diff'=[] :: [torrent_diff()],
- 'added'=[] :: [torrent_id()],
- 'deleted'=[] :: [torrent_id()]
+ diff=[] :: [torrent_diff()],
+ added=[] :: [torrent_id()],
+ deleted=[] :: [torrent_id()]
}).
@@ -106,12 +108,12 @@ init([Timeout]) ->
% Nobody cares about this event.
-await({'log_event', _Mess}, SD) ->
+await({log_event, _Mess}, SD) ->
{next_state, await, SD}.
% Initialization after "sleeping mode".
-await('add_handler', {Pid, _Tag}, SD=#state{tick=Timeout}) ->
+await(add_handler, {Pid, _Tag}, SD=#state{tick=Timeout}) ->
lager:info("Add the first client on ~w.", [Pid]),
% Registration of the client
@@ -122,13 +124,34 @@ await('add_handler', {Pid, _Tag}, SD=#state{tick=Timeout}) ->
% Collect data
PLs = query_torrent_list(),
- UnsortedNewTorrents = lists:map(
- fun to_record/1,
- PLs),
+ UnsortedNewTorrents = lists:map(fun to_record/1, PLs),
NewTorrents = sort_records(UnsortedNewTorrents),
% Run timer
- TRef = gen_fsm:send_event_after(Timeout, 'update'),
+ TRef = gen_fsm:send_event_after(Timeout, update),
+ SD1 = SD#state{timer=TRef,
+ handlers=[Pid],
+ torrents=NewTorrents},
+
+ {reply, ok, active, SD1};
+await({resume_handler, OldTorrents},
+ {Pid, _Tag}, SD=#state{tick=Timeout}) ->
+ lager:info("Add the first client on ~w.", [Pid]),
+
+ % Registration of the client
+ erlang:monitor(process, Pid),
+
+ % Subscribe on new events.
+ cascadae_event:add(),
+
+ % Collect data
+ PLs = query_torrent_list(),
+ UnsortedNewTorrents = lists:map(fun to_record/1, PLs),
+ NewTorrents = sort_records(UnsortedNewTorrents),
+ send_diff_records([Pid], OldTorrents, NewTorrents, PLs),
+
+ % Run timer
+ TRef = gen_fsm:send_event_after(Timeout, update),
SD1 = SD#state{timer=TRef,
handlers=[Pid],
torrents=NewTorrents},
@@ -136,9 +159,8 @@ await('add_handler', {Pid, _Tag}, SD=#state{tick=Timeout}) ->
{reply, ok, active, SD1}.
-active({'log_event', Mess}, SD=#state{
- torrents=OldTorrents,
- handlers=Handlers}) ->
+%% TODO: move it from here
+active({log_event, Mess}, SD=#state{handlers=Handlers}) ->
PL = event_to_json(Mess),
@@ -149,67 +171,26 @@ active({'log_event', Mess}, SD=#state{
{next_state, active, SD};
% It is called because gen_fsm:send_event_after.
-active('update', SD=#state{
+active(update, SD=#state{
torrents=OldTorrents,
handlers=Handlers,
tick=Timeout}) ->
% proplists from etorrent.
PLs = query_torrent_list(),
- UnsortedNewTorrents = lists:map(
- fun to_record/1,
- PLs),
+ UnsortedNewTorrents = lists:map(fun to_record/1, PLs),
NewTorrents = sort_records(UnsortedNewTorrents),
NewTorrents2 = calc_speed_records(OldTorrents, NewTorrents, Timeout),
-
- #diff_acc{
- diff=Diff,
- added=Added,
- deleted=Deleted} = diff_records(OldTorrents, NewTorrents2),
-
- SendFn = fun(Mess) ->
- lists:map(fun(H) ->
- ?HANDLER_MODULE:send(H, Mess)
- end, Handlers)
- end,
-
- case Diff of
- [] -> 'skip';
- _ ->
- SendFn({torrents, {'diff_list', Diff}})
- end,
-
- case Added of
- [] -> 'skip';
- _ ->
- Fn = form_json_proplist_fn(),
-
- % Convert the list of ids to list of JSON.
- AddedJSON = lists:map(fun(Id) ->
- Fn2 = fun(X) ->
- lists:member({'id', Id}, X)
- end,
- PL = search(PLs, Fn2),
- Fn(PL)
- end, Added),
-
- SendFn({torrents, {'add_list', AddedJSON}})
- end,
-
- case Deleted of
- [] -> 'skip';
- _ ->
- SendFn({torrents, {'delete_list', Deleted}})
- end,
+ send_diff_records(Handlers, OldTorrents, NewTorrents2, PLs),
% Run timer again
- TRef = gen_fsm:send_event_after(Timeout, 'update'),
+ TRef = gen_fsm:send_event_after(Timeout, update),
- {next_state, active, SD#state{torrents=NewTorrents2, timer=TRef}}.
+ {next_state, active, SD#state{torrents=NewTorrents, timer=TRef}}.
% Another client was connected.
-active('add_handler', {Pid, _Tag}, SD=#state{handlers=Hs}) ->
+active(add_handler, {Pid, _Tag}, SD=#state{handlers=Hs}) ->
lager:info("Add the client on ~w.", [Pid]),
case lists:member(Pid, Hs) of
true ->
@@ -220,29 +201,31 @@ active('add_handler', {Pid, _Tag}, SD=#state{handlers=Hs}) ->
erlang:monitor(process, Pid),
SD1 = SD#state{handlers=[Pid|Hs]},
{reply, ok, active, SD1}
+ end;
+active(suspend_handler, {Pid, _Tag}, SD=#state{torrents=Torrents}) ->
+ case remove_handler(Pid, SD) of
+ {next_state, NS, ND} ->
+ {reply, Torrents, NS, ND}
+ end;
+active({resume_handler, OldTorrents}, {Pid, _Tag},
+ SD=#state{torrents=NewTorrents, handlers=Hs}) ->
+ lager:info("Resume the client from ~w.", [Pid]),
+ case lists:member(Pid, Hs) of
+ true ->
+ lager:error("Cannot add a handler twice for the same process ~p.",
+ [Pid]),
+ {reply, ok, active, SD};
+ false ->
+ % Registration of the client
+ erlang:monitor(process, Pid),
+ send_diff_records([Pid], OldTorrents, NewTorrents, undefined),
+ SD1 = SD#state{handlers=[Pid|Hs]},
+ {reply, ok, active, SD1}
end.
-
-handle_info({'DOWN', _Ref, process, Pid, _Reason}=_Info,
- 'active'=_SN,
- SD=#state{handlers=Hs, timer=TRef}) ->
-
- case Hs -- [Pid] of
- [] ->
- cascadae_event:delete(),
-
- % Go to the sleeping mode.
- gen_fsm:cancel_timer(TRef),
- SD1 = SD#state{handlers=[], torrents=undefined},
- lager:info("Delete the last client on ~w.", [Pid]),
- {next_state, 'await', SD1};
-
- NHs ->
- % Just continue.
- lager:info("Delete the client on ~w.", [Pid]),
- {next_state, 'active', SD#state{handlers=NHs}}
- end.
+handle_info({'DOWN', _Ref, process, Pid, _Reason}=_Info, active, SD) ->
+ remove_handler(Pid, SD).
terminate(_Reason, _SN, _SD) ->
@@ -261,14 +244,22 @@ all_torrents() ->
%% @doc Subscribes this process on new messages.
%% It is used in bullet handler.
-%% You can also run this into your console and use `flush()'.
+%% You can also run this into your console and use `flush().
+-spec add_handler() -> SavedState :: term().
add_handler() ->
- gen_fsm:sync_send_event(?SERVER, 'add_handler').
+ gen_fsm:sync_send_event(?SERVER, add_handler).
+
+-spec resume_handler(SavedState :: term()) -> ok.
+resume_handler(SavedState) ->
+ gen_fsm:sync_send_event(?SERVER, {resume_handler, SavedState}).
-%% @doc Sends message from `etorrent_event'.
+suspend_handler() ->
+ gen_fsm:sync_send_event(?SERVER, suspend_handler).
+
+%% @doc Sends message from `etorrent_event.
%% This function is called by cascadae_event.
fire_event(Mess) ->
- gen_fsm:send_event(?SERVER, {'log_event', Mess}).
+ gen_fsm:send_event(?SERVER, {log_event, Mess}).
%% ------------------------------------------------------------------
@@ -293,38 +284,38 @@ query_torrent_list() ->
form_json_proplist_fn() ->
fun(X) ->
%% Data from etorrent_torrent
- Id = proplists:get_value('id', X),
+ Id = proplists:get_value(id, X),
%% Data from tracking_map (etorrent_table)
PL = get_torrent(Id),
- Name = proplists:get_value('filename', PL),
- Hash = case proplists:get_value('info_hash', PL) of
+ Name = proplists:get_value(filename, PL),
+ Hash = case proplists:get_value(info_hash, PL) of
unknown -> <<>>;
<<InfoHashInt:160>> -> integer_hash_to_literal(InfoHashInt)
end,
- IsOnline = case proplists:get_value('state', PL) of
- 'started' -> true;
+ IsOnline = case proplists:get_value(state, PL) of
+ started -> true;
_ -> false
end,
- [{'id', Id}
- ,{'name', list_to_binary(Name)}
- ,{'display_name', proplists:get_value('display_name', X)}
- ,{'total', proplists:get_value('total', X)}
- ,{'wanted', proplists:get_value('wanted', X)}
- ,{'left', proplists:get_value('left', X)}
- ,{'online', IsOnline}
- ,{'leechers', proplists:get_value('leechers', X)}
- ,{'seeders', proplists:get_value('seeders', X)}
- ,{'state', atom_to_binary(proplists:get_value('state', X))}
- ,{'downloaded', proplists:get_value('downloaded', X)}
- ,{'uploaded', proplists:get_value('uploaded', X)}
- ,{'all_time_downloaded',
- proplists:get_value('all_time_downloaded', X)}
- ,{'all_time_uploaded',
- proplists:get_value('all_time_uploaded', X)}
- ,{'pid', to_binary(etorrent_torrent_ctl:lookup_server(Id))}
- ,{'info_hash', Hash}
+ [{id, Id}
+ ,{name, list_to_binary(Name)}
+ ,{display_name, proplists:get_value(display_name, X)}
+ ,{total, proplists:get_value(total, X)}
+ ,{wanted, proplists:get_value(wanted, X)}
+ ,{left, proplists:get_value(left, X)}
+ ,{online, IsOnline}
+ ,{leechers, proplists:get_value(leechers, X)}
+ ,{seeders, proplists:get_value(seeders, X)}
+ ,{state, atom_to_binary(proplists:get_value(state, X))}
+ ,{downloaded, proplists:get_value(downloaded, X)}
+ ,{uploaded, proplists:get_value(uploaded, X)}
+ ,{all_time_downloaded,
+ proplists:get_value(all_time_downloaded, X)}
+ ,{all_time_uploaded,
+ proplists:get_value(all_time_uploaded, X)}
+ ,{pid, to_binary(etorrent_torrent_ctl:lookup_server(Id))}
+ ,{info_hash, Hash}
]
end.
@@ -337,16 +328,16 @@ get_torrent(Id) ->
-spec to_record(etorrent_pl()) -> #torrent{}.
to_record(X) ->
#torrent{
- id = proplists:get_value('id', X),
- left = proplists:get_value('left', X),
- wanted = proplists:get_value('wanted', X),
- leechers = proplists:get_value('leechers', X),
- seeders = proplists:get_value('seeders', X),
- state = proplists:get_value('state', X),
- downloaded = proplists:get_value('downloaded', X),
- uploaded = proplists:get_value('uploaded', X),
- all_time_downloaded = proplists:get_value('all_time_downloaded', X),
- all_time_uploaded = proplists:get_value('all_time_uploaded', X)
+ id = proplists:get_value(id, X),
+ left = proplists:get_value(left, X),
+ wanted = proplists:get_value(wanted, X),
+ leechers = proplists:get_value(leechers, X),
+ seeders = proplists:get_value(seeders, X),
+ state = proplists:get_value(state, X),
+ downloaded = proplists:get_value(downloaded, X),
+ uploaded = proplists:get_value(uploaded, X),
+ all_time_downloaded = proplists:get_value(all_time_downloaded, X),
+ all_time_uploaded = proplists:get_value(all_time_uploaded, X)
}.
@@ -474,55 +465,55 @@ diff_element(Old=#torrent{left=ORem, leechers=OLs, seeders=OSs},
UnfilteredList =
[case NRem of
- ORem -> 'not_modified';
- X -> {'left', X}
+ ORem -> not_modified;
+ X -> {left, X}
end
,case NW of
- OW -> 'not_modified';
- X -> {'wanted', X}
+ OW -> not_modified;
+ X -> {wanted, X}
end
,case NLs of
- OLs -> 'not_modified';
- X -> {'leechers', X}
+ OLs -> not_modified;
+ X -> {leechers, X}
end
,case NSs of
- OSs -> 'not_modified';
- X -> {'seeders', X}
+ OSs -> not_modified;
+ X -> {seeders, X}
end
,case ND of
- OD -> 'not_modified';
- X -> {'downloaded', X}
+ OD -> not_modified;
+ X -> {downloaded, X}
end
,case NU of
- OU -> 'not_modified';
- X -> {'uploaded', X}
+ OU -> not_modified;
+ X -> {uploaded, X}
end
,case NATD of
- OATD -> 'not_modified';
- X -> {'all_time_downloaded', X}
+ OATD -> not_modified;
+ X -> {all_time_downloaded, X}
end
,case NATU of
- OATU -> 'not_modified';
- X -> {'all_time_uploaded', X}
+ OATU -> not_modified;
+ X -> {all_time_uploaded, X}
end
,case NS of
- OS -> 'not_modified';
- X -> {'state', atom_to_binary(X)}
+ OS -> not_modified;
+ X -> {state, atom_to_binary(X)}
end
,case NSO of
- OSO -> 'not_modified';
- X -> {'speed_out', X} % Byte per second
+ OSO -> not_modified;
+ X -> {speed_out, X} % Byte per second
end
,case NSI of
- OSI -> 'not_modified';
- X -> {'speed_in', X}
+ OSI -> not_modified;
+ X -> {speed_in, X}
end
],
% Filtering and adding id
- case [Y || Y <- UnfilteredList, Y =/= 'not_modified'] of
+ case [Y || Y <- UnfilteredList, Y =/= not_modified] of
[] -> false;
- FilteredList -> [{'id', New#torrent.id} | FilteredList]
+ FilteredList -> [{id, New#torrent.id} | FilteredList]
end.
@@ -545,14 +536,14 @@ search([], _F) ->
event_to_json({Name, Id})
- when Name =:= 'checking_torrent';
- Name =:= 'started_torrent';
- Name =:= 'stopped_torrent' ->
+ when Name =:= checking_torrent;
+ Name =:= started_torrent;
+ Name =:= stopped_torrent ->
[{name, atom_to_binary(Name)}
,{torrent_id, Id}];
event_to_json({Name, Id, Message})
- when Name =:= 'tracker_error' ->
+ when Name =:= tracker_error ->
[{name, atom_to_binary(Name)}
,{torrent_id, Id}
,{message, list_to_binary(Message)}];
@@ -581,3 +572,77 @@ sort_records_test_() ->
integer_hash_to_literal(InfoHashInt) when is_integer(InfoHashInt) ->
iolist_to_binary(io_lib:format("~40.16.0B", [InfoHashInt])).
+
+
+remove_handler(Pid, SD=#state{handlers=Hs, timer=TRef}) ->
+ case Hs -- [Pid] of
+ [] ->
+ cascadae_event:delete(),
+
+ % Go to the sleeping mode.
+ gen_fsm:cancel_timer(TRef),
+ SD1 = SD#state{handlers=[], torrents=undefined},
+ lager:info("Delete the last client on ~w.", [Pid]),
+ {next_state, await, SD1};
+
+ NHs ->
+ % Just continue.
+ lager:info("Delete the client on ~w.", [Pid]),
+ {next_state, active, SD#state{handlers=NHs}}
+ end.
+
+
+send_diff_records(Handlers, OldTorrents, NewTorrents, PLs) ->
+
+ #diff_acc{
+ diff=Diff,
+ added=Added,
+ deleted=Deleted} = diff_records(OldTorrents, NewTorrents),
+
+ SendFn = fun(Mess) ->
+ lists:map(fun(H) ->
+ ?HANDLER_MODULE:send(H, Mess)
+ end, Handlers)
+ end,
+
+ case Diff of
+ [] -> skip;
+ _ ->
+ SendFn({torrents, {diff_list, Diff}})
+ end,
+
+ case Added of
+ [] -> skip;
+ _ when is_list(PLs) ->
+ Fn = form_json_proplist_fn(),
+
+ % Convert the list of ids to list of JSON.
+ AddedJSON = lists:map(fun(Id) ->
+ Fn2 = fun(X) -> lists:member({id, Id}, X) end,
+ PL = search(PLs, Fn2),
+ Fn(PL)
+ end, Added),
+
+ SendFn({torrents, {add_list, AddedJSON}});
+ _ when PLs =:= undefined ->
+ PLs1 = query_torrent_list(),
+ Fn = form_json_proplist_fn(),
+
+ % Convert the list of ids to list of JSON.
+ AddedJSON = lists:flatmap(fun(Id) ->
+ Fn2 = fun(X) -> lists:member({id, Id}, X) end,
+ %% If the torrent with ID not found, PL = false,
+ %% and result list is empty.
+ [Fn(PL) || is_list(PL = search(PLs1, Fn2))]
+ end, Added),
+
+ SendFn({torrents, {add_list, AddedJSON}})
+ end,
+
+ case Deleted of
+ [] -> skip;
+ _ ->
+ SendFn({torrents, {delete_list, Deleted}})
+ end,
+
+ ok.
View
18 src/cascadae_session.erl
@@ -18,7 +18,8 @@
is_peers_active = true,
is_peers_visible,
is_page_visible,
- session_timeout_tref
+ session_timeout_tref,
+ saved_hub_state
}).
-define(OBJ_TBL, cascadae_object_register).
%% The socketio_session has same timeout already, but this timeout is
@@ -66,15 +67,17 @@ recv(_, _, {event, <<>>, <<"deactivated">>, Meta}=Mess, State) ->
[<<"cascadae">>,<<"peers">>,<<"Table">>] when is_pid(PeersPid) ->
{ok, check_peer_visibility(State#sess_state{is_peers_visible=false})};
[<<"cascadae">>,<<"Container">>] ->
- {ok, check_peer_visibility(
- check_file_visibility(State#sess_state{is_page_visible=false}))};
+ SavedHubState = cascadae_hub:suspend_handler(),
+ State2 = State#sess_state{is_page_visible=false,
+ saved_hub_state=SavedHubState},
+ {ok, check_peer_visibility(check_file_visibility(State2))};
_ ->
lager:debug("Ignore ~p.", [Mess]),
{ok, State}
end;
recv(SPid, _, {event, <<>>, <<"activated">>, Meta}=Mess, State) ->
- #sess_state{files_pid=FilesPid} = State,
+ #sess_state{files_pid=FilesPid, saved_hub_state=SavedHubState} = State,
Hash = proplists:get_value(<<"hash">>, Meta),
assert_hash(Hash),
#object{path=Path} = extract_object(Hash, State),
@@ -96,8 +99,11 @@ recv(SPid, _, {event, <<>>, <<"activated">>, Meta}=Mess, State) ->
{ok, check_peer_visibility(State#sess_state{is_peers_visible=true,
peers_pid=PeersPid})};
[<<"cascadae">>,<<"Container">>] ->
- {ok, check_peer_visibility(
- check_file_visibility(State#sess_state{is_page_visible=true}))};
+ [cascadae_hub:resume_handler(SavedHubState)
+ || SavedHubState =/= undefined],
+ State2 = State#sess_state{is_page_visible=true,
+ saved_hub_state=undefined},
+ {ok, check_peer_visibility(check_file_visibility(State2))};
_ ->
lager:debug("Ignore ~p.", [Mess]),
{ok, State}

0 comments on commit c7ebdf8

Please sign in to comment.
Something went wrong with that request. Please try again.