Permalink
Browse files

Add dynamic updates for peer table.

  • Loading branch information...
1 parent 2776a96 commit 4d7275d086a38a0c3bd8a6463911332da8a85922 @arcusfelis committed Mar 15, 2013
Showing with 125 additions and 136 deletions.
  1. +125 −136 src/cascadae_peers.erl
View
@@ -3,6 +3,7 @@
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-record(peers_state, {
+ peers = dict:new(), %% dict(Pid => #peer{})
session_pid,
session_tag,
torrent_ids,
@@ -17,10 +18,6 @@
}).
-record(peer, {
- %% static fields
- pid,
- torrent_id,
-
%% Dynamic
%% From table
state,
@@ -114,114 +111,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Function Definitions
%% ------------------------------------------------------------------
-
-
-
-
-
-
-
-
-%% etorrent_peer_states:get_send_rate(Id, Pid)
-%% etorrent_peer_states:get_recv_rate(Id, Pid)
-
-
-all_peers() ->
-%{pid, Pid}, {ip, IP}, {port, Port},
-%{torrent_id, TorrentId},
-%{state, State}
- UnsortedPLs = etorrent_table:all_peers(),
- SortedPLs = sort_proplists(UnsortedPLs),
-
-%{pid, Pid},
-%{torrent_id, TorrentId},
-%{choke_state, Chokestate},
-%{interest_state, Intereststate},
-%{local_choke, Localchoke}
- UnsortedPLSs = etorrent_peer_states:all_peers(),
- SortedPLSs = sort_proplists(UnsortedPLSs),
-
- zip_list(SortedPLs, SortedPLSs, []).
-
-
-sort_proplists(PL) ->
- lists:sort(fun(X, Y) ->
- VX = proplists:get_value(pid, X),
- VY = proplists:get_value(pid, Y),
- VX < VY
- end, PL).
-
-
-%% @doc This is the realization of lists:zipwith/3 for incomplete lists.
-zip_list([PL|PLT], [PLS|PLST], Acc) ->
- Pid1 = proplists:get_value(pid, PL),
- Pid2 = proplists:get_value(pid, PLS),
-
- if
- Pid1 =:= Pid2 ->
- El = zip(PL, PLS),
- zip_list(PLT, PLST, [El|Acc]);
-
- Pid1 < Pid2 ->
- El = zip(PL, undefined),
- zip_list(PLT, [PLS|PLST], [El|Acc]);
-
- Pid1 > Pid2 ->
- El = zip(undefined, PLS),
- zip_list([PL|PLT], PLST, [El|Acc])
- end;
-
-zip_list([], [PLS|PLST], Acc) ->
- El = zip(undefined, PLS),
- zip_list([], PLST, [El|Acc]);
-
-zip_list([PL|PLT], [], Acc) ->
- El = zip(PL, undefined),
- zip_list(PLT, [], [El|Acc]);
-
-zip_list([], [], Acc) ->
- lists:reverse(Acc).
-
-
-
-zip(PL, undefined) ->
- Pid = proplists:get_value(pid, PL),
-
- [{id, to_binary(Pid)}
-
- ,{ip, render_ip(proplists:get_value(ip, PL))}
- ,{port, proplists:get_value(port, PL)}
- ,{torrent_id, proplists:get_value(torrent_id, PL)}
- ,{state, atom_to_binary(proplists:get_value(state, PL))}
- ];
-
-zip(undefined, PLS) ->
- Pid = proplists:get_value(pid, PLS),
-
- [{id, to_binary(Pid)}
-
- ,{choke_state, atom_to_binary(proplists:get_value(choke_state, PLS))}
- ,{interest_state, atom_to_binary(proplists:get_value(interest_state, PLS))}
- ,{local_choke, proplists:get_value(local_choke, PLS)} % bool
- ];
-
-zip(PL, PLS) ->
- Pid = proplists:get_value(pid, PL),
- Pid = proplists:get_value(pid, PLS),
-
- [{id, to_binary(Pid)}
-
- ,{ip, render_ip(proplists:get_value(ip, PL))}
- ,{port, proplists:get_value(port, PL)}
- ,{torrent_id, proplists:get_value(torrent_id, PL)}
- ,{state, atom_to_binary(proplists:get_value(state, PL))}
-
- ,{choke_state, atom_to_binary(proplists:get_value(choke_state, PLS))}
- ,{interest_state, atom_to_binary(proplists:get_value(interest_state, PLS))}
- ,{local_choke, proplists:get_value(local_choke, PLS)} % bool
- ].
-
-
render_ip({A,B,C,D}) ->
iolist_to_binary(io_lib:format("~w.~w.~w.~w", [A,B,C,D])).
@@ -234,16 +123,13 @@ atom_to_binary(X) -> atom_to_binary(X, utf8).
-
-
%% not active
set_torrent_list_int(NewTorrentIds, S=#peers_state{update_table_tref=undefined}) ->
lager:info("Update torrent list ~p => ~p while updates are inactive.",
[S#peers_state.viz_torrent_ids, NewTorrentIds]),
S#peers_state{viz_torrent_ids=NewTorrentIds};
set_torrent_list_int(NewTorrentIds, S=#peers_state{tid_pids=OldSTP,
- cl_tid_pids=OldClSTP,
- viz_tid_pids=OldVizSTP}) ->
+ cl_tid_pids=OldClSTP}) ->
self() ! update_table, %% TODO: can be optimized
lager:info("Update torrent list ~p => ~p.",
[S#peers_state.viz_torrent_ids, NewTorrentIds]),
@@ -259,13 +145,16 @@ cron_find_new(S=#peers_state{tid_pids=OldSTP}) ->
S1 = S#peers_state{tid_pids=STP},
S2 = viz_delete_peers(DeletedTP, S1),
S3 = viz_add_peers(AddedTP, S2),
- S3.
+ S4 = viz_diff_peers(S3),
+ S4.
%% Delete rows on the client side.
viz_delete_peers([], S=#peers_state{}) ->
S;
-viz_delete_peers(DeletedTP, S=#peers_state{cl_tid_pids=OldClSTP, viz_tid_pids=OldVizSTP}) ->
+viz_delete_peers(DeletedTP,
+ S=#peers_state{cl_tid_pids=OldClSTP, viz_tid_pids=OldVizSTP,
+ peers=Peers}) ->
lager:info("Deleted peers ~p.", [DeletedTP]),
NewClSTP = ordsets:subtract(OldClSTP, DeletedTP),
NewVizSTP = ordsets:subtract(OldVizSTP, DeletedTP),
@@ -275,9 +164,11 @@ viz_delete_peers(DeletedTP, S=#peers_state{cl_tid_pids=OldClSTP, viz_tid_pids=Ol
[_|_] ->
%% Push ClDeletedTP to the client.
lager:info("Delete ~p from the client peer table.", [ClDeletedTP]),
- PeerPids = [Pid || Pid <- ClDeletedTP],
+ PeerPids = [to_binary(Pid) || Pid <- ClDeletedTP],
push_to_client({delete_list, PeerPids}, S),
- S#peers_state{cl_tid_pids=NewClSTP, viz_tid_pids=NewVizSTP}
+ NewPeers = clean_peers(ClDeletedTP, Peers),
+ S#peers_state{cl_tid_pids=NewClSTP, viz_tid_pids=NewVizSTP,
+ peers=NewPeers}
end.
%% Add requested rows to the client
@@ -314,10 +205,12 @@ filter_visible_peers(AddedTP, VizTorrents) ->
orddict_with_set_intersection([{K,V}|Dict], [K|Set]) ->
[{K,V}|orddict_with_set_intersection(Dict, Set)];
-orddict_with_set_intersection([{DK,V}|Dict], [SK|Set]) when DK < SK ->
- orddict_with_set_intersection(Dict, [SK|Set]);
-orddict_with_set_intersection([{DK,V}|Dict], [_|Set]) ->
- orddict_with_set_intersection({DK,V}, Set);
+orddict_with_set_intersection([{DK,_V}|Dict], [SK|_]=Set) when DK < SK ->
+ %% Skip {DK,V}
+ orddict_with_set_intersection(Dict, Set);
+orddict_with_set_intersection([_|_]=Dict, [_|Set]) ->
+ %% Skip SK
+ orddict_with_set_intersection(Dict, Set);
orddict_with_set_intersection(_, _) ->
[].
@@ -372,7 +265,7 @@ peer_list([], S, Acc) ->
{lists:reverse(Acc), S}.
-get_peer(Tid, Pid, S) ->
+get_peer(Tid, Pid, S=#peers_state{peers=Peers}) ->
case etorrent_table:get_peer({pid, Pid}) of
{value, PL} ->
PL2 = case etorrent_peer_states:get_peer(Tid, Pid) of
@@ -383,27 +276,22 @@ get_peer(Tid, Pid, S) ->
PL2ChokeS = proplists:get_value(choke_state, PL2),
PL2InterS = proplists:get_value(interest_state, PL2),
PL2LocalC = proplists:get_value(local_choke, PL2),
- RecvRate = etorrent_peer_states:get_recv_rate(Tid, Pid),
- SendRate = etorrent_peer_states:get_send_rate(Tid, Pid),
+ RecvRate = as_int(etorrent_peer_states:get_recv_rate(Tid, Pid)),
+ SendRate = as_int(etorrent_peer_states:get_send_rate(Tid, Pid)),
ObjJSON =
[{torrent_id, Tid}
,{id, to_binary(Pid)}
,{ip, render_ip(proplists:get_value(ip, PL))}
,{port, proplists:get_value(port, PL)}
- ,{torrent_id, proplists:get_value(torrent_id, PL)}
,{state, atom_to_binary(PLState)}
,{choke_state, atom_to_binary(PL2ChokeS)}
,{interest_state, atom_to_binary(PL2InterS)}
,{local_choke, PL2LocalC} % bool
- ,{recv_rate, as_int(RecvRate)}
- ,{send_rate, as_int(SendRate)}
+ ,{recv_rate, RecvRate}
+ ,{send_rate, SendRate}
],
- #peer{
- %% static fields
- pid=Pid,
- torrent_id=Tid,
-
+ Peer = #peer{
%% Dynamic
%% From table
state=PLState,
@@ -414,7 +302,8 @@ get_peer(Tid, Pid, S) ->
recv_rate=RecvRate,
send_rate=SendRate
},
- {filter_undefined(ObjJSON), S};
+ NewPeers = dict:store(Pid, Peer, Peers),
+ {filter_undefined(ObjJSON), S#peers_state{peers=NewPeers}};
not_found -> {undefined, S}
end.
@@ -424,3 +313,103 @@ filter_undefined(ObjJSON) ->
as_int(X) when is_float(X) -> round(X);
as_int(_) -> undefined.
+
+
+clean_peers([TP|TPs], Peers) ->
+ clean_peers(TPs, dict:erase(TP, Peers));
+clean_peers([], Peers) ->
+ Peers.
+
+
+viz_diff_peers(S=#peers_state{viz_tid_pids=[]}) -> S;
+viz_diff_peers(S=#peers_state{viz_tid_pids=VizSTP, peers=Peers}) ->
+ {Diff, NewPeers} = diff_peers(VizSTP, Peers, []),
+ case Diff of
+ [] -> S;
+ [_|_] ->
+ lager:info("Diff ~p", [Diff]),
+ push_to_client({diff_list, Diff}, S),
+ S#peers_state{peers=NewPeers}
+ end.
+
+diff_peers([{Tid,Pid}|TPs], Peers, Diffs) ->
+ case dict:find(Pid, Peers) of
+ {ok, OldPeer} ->
+ case diff_peer(Tid, Pid, OldPeer) of
+ {changed, Diff, NewPeer} ->
+ diff_peers(TPs, dict:store(Pid, NewPeer, Peers),
+ [Diff|Diffs]);
+ same ->
+ diff_peers(TPs, Peers, Diffs)
+ end;
+
+ error ->
+ diff_peers(TPs, Peers, Diffs)
+ end;
+diff_peers([], Peers, Diffs) ->
+ {Diffs, Peers}.
+
+
+
+diff_peer(Tid, Pid, OldPeer) ->
+ case etorrent_table:get_peer({pid, Pid}) of
+ {value, PL} ->
+ PL2 = case etorrent_peer_states:get_peer(Tid, Pid) of
+ {value, PL2_} -> PL2_;
+ not_found -> []
+ end,
+ PLState = proplists:get_value(state, PL),
+ PL2ChokeS = proplists:get_value(choke_state, PL2),
+ PL2InterS = proplists:get_value(interest_state, PL2),
+ PL2LocalC = proplists:get_value(local_choke, PL2),
+ RecvRate = as_int(etorrent_peer_states:get_recv_rate(Tid, Pid)),
+ SendRate = as_int(etorrent_peer_states:get_send_rate(Tid, Pid)),
+ NewPeer = #peer{
+ %% Dynamic
+ %% From table
+ state=PLState,
+ %% From peer state
+ choke_state=PL2ChokeS,
+ interest_state=PL2InterS,
+ local_choke=PL2LocalC,
+ recv_rate=RecvRate,
+ send_rate=SendRate
+ },
+ case NewPeer of
+ OldPeer -> same;
+ _ ->
+ #peer{
+ %% Dynamic
+ %% From table
+ state=OldPLState,
+ %% From peer state
+ choke_state=OldPL2ChokeS,
+ interest_state=OldPL2InterS,
+ local_choke=OldPL2LocalC,
+ recv_rate=OldRecvRate,
+ send_rate=OldSendRate
+ } = OldPeer,
+ Diff = [{id, to_binary(Pid)}]
+ ++ if PLState =:= OldPLState -> [];
+ true -> [{state, atom_to_binary(PLState)}]
+ end
+ ++ if PL2ChokeS =:= OldPL2ChokeS -> [];
+ true -> [{choke_state, atom_to_binary(PL2ChokeS)}]
+ end
+ ++ if PL2InterS =:= OldPL2InterS -> [];
+ true -> [{interest_state, atom_to_binary(PL2InterS)}]
+ end
+ ++ if PL2LocalC =:= OldPL2LocalC -> [];
+ true -> [{local_choke, PL2LocalC}]
+ end
+ ++ if RecvRate =:= OldRecvRate -> [];
+ true -> [{recv_rate, RecvRate}]
+ end
+ ++ if SendRate =:= OldSendRate -> [];
+ true -> [{send_rate, SendRate}]
+ end,
+ {changed, Diff, NewPeer}
+ end;
+ not_found -> same
+ end.
+

0 comments on commit 4d7275d

Please sign in to comment.