Permalink
Browse files

add tracking of active transfers to web interface, with stats

  • Loading branch information...
1 parent 24e28f3 commit 2ce2f04e3c24339350ac0b3497390d69cde2ccb1 @RJ committed Jan 8, 2010
View
32 playdar_modules/playdar-tcp/src/playdartcp_router.erl
@@ -7,13 +7,13 @@
connect/2, connect/3, peers/0, bytes/0, broadcast/1, broadcast/2, broadcast/3,
seen_qid/1, disconnect/1, sanitize_msg/1,
register_transfer/2, consume_transfer/1,
- stream_started/2, stream_ended/1, streams/0
+ stream_started/3, stream_ended/1, streams/0
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
--record(state, {listener, conns, seenqids, piddb, namedb}).
+-record(state, {listener, conns, seenqids, piddb, namedb, streamsdb}).
start_link(Port) -> gen_server:start({local, ?MODULE}, ?MODULE, [Port], []).
@@ -39,9 +39,12 @@ seen_qid(Qid) -> gen_server:cast(?MODULE, {seen_qid, Qid}).
register_transfer(Key, Pid) -> gen_server:cast(?MODULE, {register_transfer, Key, Pid}).
consume_transfer(Key) -> gen_server:call(?MODULE, {consume_transfer, Key}).
-stream_started(Pid, Sid) -> gen_server:cast(?MODULE, {stream_started, Pid, Sid}).
-stream_ended(Pid) -> gen_server:cast(?MODULE, {stream_ended, Pid}).
-streams() -> gen_server:call(?MODULE, {streams}).
+stream_started(Pid, Sid, Props) ->
+ gen_server:cast(?MODULE, {stream_started, Pid, Sid, Props}).
+stream_ended(Pid) ->
+ gen_server:cast(?MODULE, {stream_ended, Pid}).
+streams() ->
+ gen_server:call(?MODULE, {streams}).
%% ====================================================================
%% Server functions
@@ -61,7 +64,8 @@ init([Port]) ->
seenqids=ets:new(seenqids,[]),
conns=[],
piddb=ets:new(piddb,[]),
- namedb=ets:new(namedb,[])
+ namedb=ets:new(namedb,[]),
+ streamsdb=ets:new(streamsdb,[])
}}.
handle_call({disconnect, Name}, _From, State) ->
@@ -96,9 +100,9 @@ handle_call(bytes, _From, State) ->
handle_call({consume_transfer, Key}, _From, State) -> {reply, erlang:erase({transfer, Key}), State};
handle_call({streams}, _From, State) ->
- Streams = [ {Pid, Sid, Localtime, Nowtime}
- || {{stream, Pid}, {Sid, Localtime, Nowtime}} <- erlang:get() ],
- {reply, Streams, State};
+ R = [ {Pid, Sid, Props} ||
+ {Pid, {Sid, Props}} <- ets:tab2list(State#state.streamsdb) ],
+ {reply, R, State};
handle_call({register_connection, Pid, Name, Sharing = {WeShare, TheyShare}}, _From, State) ->
% TODO we should probably kick the old conn with this name
@@ -120,13 +124,13 @@ handle_call({register_connection, Pid, Name, Sharing = {WeShare, TheyShare}}, _F
%%
-handle_cast({stream_started, Pid, Sid}, State) ->
+handle_cast({stream_started, Pid, Sid, Props}, State) ->
link(Pid),
- erlang:put({stream, Pid}, {Sid, erlang:localtime(), erlang:now()}),
+ ets:insert(State#state.streamsdb, {Pid, {Sid, Props}}),
{noreply, State};
-handle_cast({stream_ended, Pid}, State) ->
- erlang:erase({stream, Pid}),
+handle_cast({stream_ended, Pid}, State) ->
+ ets:delete(State#state.streamsdb, Pid),
{noreply, State};
handle_cast({register_transfer, Key, Pid}, State) ->
@@ -201,7 +205,7 @@ handle_info(calculate_bandwidth_secs, State) ->
handle_info({'EXIT', Pid, Reason}, State) ->
?LOG(info, "Caught exit of ~p because ~p", [Pid, Reason]),
- _Streamrm = erlang:erase({stream, Pid}),
+ ets:delete(State#state.streamsdb, Pid), % might be a stream process
case ets:lookup(State#state.piddb, Pid) of
[{_, Name, _Bw, _Sharing}] ->
?LOG(info, "Removing user from registered cons: ~p", [Name]),
View
15 playdar_modules/playdar-tcp/src/playdartcp_stream.erl
@@ -1,4 +1,6 @@
% manages tcp connection used for transfering files
+% TODO push all state into the registry held by the router, inc the Sock
+% so stats can be extracted outside this process (it's spammed by data)
-module(playdartcp_stream).
-behaviour(gen_server).
-include("playdar.hrl").
@@ -170,8 +172,13 @@ handle_packet({sending, Ref, Sid}, State = #state{current=setup, mode=Mode, sock
_ -> nothing
end,
?LOG(info, "current -> receive_stream", []),
- playdartcp_router:stream_started(Pid, Sid),
State1 = lookup_track(Sid, State),
+ playdartcp_router:stream_started(self(), Sid,
+ [{now, State#state.start_now},
+ {localtime, State#state.start_localtime},
+ {track, State1#state.track},
+ {mode, receive_stream},
+ {sock, Sock}]),
{noreply, State1#state{current=receive_stream,ref=Ref, sid=Sid, pid=Pid}};
unknown ->
@@ -200,6 +207,12 @@ handle_packet({requesting, Ref, Sid}, State = #state{current=setup, mode=Mode, s
Sfun ->
Sfun(),
State1 = lookup_track(Sid, State),
+ playdartcp_router:stream_started(self(), Sid,
+ [{now, State#state.start_now},
+ {localtime, State#state.start_localtime},
+ {track, State1#state.track},
+ {mode, send_stream},
+ {sock, Sock}]),
{noreply, State1#state{current=send_stream, ref=Ref, sid=Sid}}
end;
View
49 playdar_modules/playdar-tcp/src/playdartcp_web.erl
@@ -23,7 +23,10 @@ http_req(Req, DocRoot) ->
index(Path, Req, DocRoot) ->
case Path of
"" ->
+ Streams = get_streams(),
+ ?LOG(info, "~p", [Streams]),
Vars = [{ftok, playdar_auth:gen_formtoken()},
+ {streams, Streams},
{peers,
[ begin
ShareWe = case WeShare of true -> "yes"; _ -> "no" end,
@@ -44,4 +47,48 @@ index(Path, Req, DocRoot) ->
_ ->
Req:not_found()
- end.
+ end.
+
+get_streams() ->
+ S = playdartcp_router:streams(),
+% ?LOG(info, "Streams: ~p", [S]),
+ S2 = [ begin
+ Sock = proplists:get_value(sock, L),
+ Current = proplists:get_value(mode, L),
+ case inet:getstat(Sock) of
+ {ok,BW} when is_list(BW) ->
+ % elapsed transfer time in secs:
+ Tdiff = timer:now_diff(erlang:now(), proplists:get_value(now, L))/1000000,
+
+ case Current of
+ receive_stream ->
+ B = proplists:get_value(recv_oct, BW),
+ A = ((B*8)/Tdiff)/1024, % kbps
+ [{transferred, B},
+ {kbps, erlang:round(A)}];
+ send_stream ->
+ B = proplists:get_value(send_oct, BW),
+ A = ((B*8)/Tdiff)/1024, % kbps
+ [{transferred, B},
+ {kbps, erlang:round(A)}]
+ end;
+ {error,_} ->
+ [{transferred, -1},{kbps,-1}]
+ end
+ ++
+ [ {mode, atom_to_list(Current)},
+ {sid, Sid} ]
+ ++
+ case proplists:get_value(track, L) of
+ undefined -> [{artist,""},{album,""},{track,""}];
+ {struct, Trk} ->
+ [ {artist,proplists:get_value(<<"artist">>,Trk,"")},
+ {track, proplists:get_value(<<"track">>,Trk,"")},
+ {album, proplists:get_value(<<"album">>,Trk,"")}
+ ]
+ end
+ end
+ || {_Pid, Sid, L} <- S ],
+ S2.
+
+
View
30 priv/www/playdartcp/index.html
@@ -1,6 +1,9 @@
{% extends "../base.tpl" %}
{% block content %}
-<h3>Peers</h3>
+<h3>Peers ({{peers|length}})</h3>
+{% if not peers %}
+You can connect to peers using: <pre>playdarctl connect &lt;host&gt; &lt;port&gt; &lt;true|false&gt;</pre>
+{% else %}
<table style="width:100%">
<tr style="font-weight:bold">
<td>Name</td>
@@ -24,4 +27,29 @@
</tr>
{% endfor %}
</table>
+{% endif %}
+
+<h3>Streams ({{streams|length}})</h3>
+{% if not streams %}
+<i>No active p2p transfers</i>
+{% else %}
+<table style="width:100%">
+<tr style="font-weight:bold">
+<td>Track</td>
+<td>Mode</td>
+<td>Bytes Transferred</td>
+<td>Avg Speed (Kbps)</td>
+</tr>
+{% for s in streams %}
+<tr style="background-color: {% cycle white,lightyellow %}">
+<td>{{s.artist|force_escape}} - {{s.track|force_escape}} &nbsp; <i>{{s.album|force_escape}}</i></td>
+<td>{{s.mode}}</td>
+<td>{{s.transferred}}</td>
+<td>{{s.kbps}}</td>
+</tr>
+{% endfor %}
+</table>
+{% endif %}
+
+
{% endblock %}

0 comments on commit 2ce2f04

Please sign in to comment.