Skip to content

Commit

Permalink
refactored playdartcp_stream into gen_server, proper tracking of acti…
Browse files Browse the repository at this point in the history
…ve streams
  • Loading branch information
RJ committed Jan 8, 2010
1 parent 94a41d4 commit 24e28f3
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 55 deletions.
7 changes: 3 additions & 4 deletions playdar_modules/playdar-tcp/README.txt
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ playdar (via ssh) to your work machine, for example.
It is also hoped that people will host playdar nodes with large free repos, It is also hoped that people will host playdar nodes with large free repos,
such as magnatune, archive.org etc. This resolver would be useful for that. such as magnatune, archive.org etc. This resolver would be useful for that.


Currently streams are multiplexed (with no flow control) down the same pipe The first connection established is used for search queries and results,
as queries, so you will notice whilst streaming over a low bandwidth link and to initiate streams. Each stream is sent using a new tcp connection.
that queries can't be recieved at the same time. (will be fixed "soon").


***ATTENTION*** ***ATTENTION***


Expand All @@ -32,5 +31,5 @@ with the node you're connecting to.


To see who you're connected to, check the playdartcp page on: To see who you're connected to, check the playdartcp page on:


http://localhost:60210/ http://localhost:60210/playdartcp


2 changes: 1 addition & 1 deletion playdar_modules/playdar-tcp/src/playdartcp_conn.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ handle_stream_request(Direction, Ref, Sid, State = #state{inout=out}) ->
{ok, Pid} = playdartcp_stream:start(Sock, recv), {ok, Pid} = playdartcp_stream:start(Sock, recv),
gen_tcp:controlling_process(Sock, Pid) gen_tcp:controlling_process(Sock, Pid)
end, end,
?LOG(info, "Created stream process for ~s to ~p:~p", [Sid, Address, Port]), ?LOG(info, "Created stream process(~p) for ~s to ~p:~p", [Pid, Sid, Address, Port]),
ok; ok;
{error, timeout} -> {error, timeout} ->
?LOG(warn, "Failing to connect to ~p:~p for streaming", [Address,Port]), ?LOG(warn, "Failing to connect to ~p:~p for streaming", [Address,Port]),
Expand Down
8 changes: 6 additions & 2 deletions playdar_modules/playdar-tcp/src/playdartcp_router.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -199,13 +199,17 @@ handle_info(calculate_bandwidth_secs, State) ->
{noreply, State}; {noreply, State};




handle_info({'EXIT', Pid, _Reason}, State) -> handle_info({'EXIT', Pid, Reason}, State) ->
erlang:erase({stream, Pid}), % might exist ?LOG(info, "Caught exit of ~p because ~p", [Pid, Reason]),
_Streamrm = erlang:erase({stream, Pid}),
case ets:lookup(State#state.piddb, Pid) of case ets:lookup(State#state.piddb, Pid) of
[{_, Name, _Bw, _Sharing}] -> [{_, Name, _Bw, _Sharing}] ->
?LOG(info, "Removing user from registered cons: ~p", [Name]), ?LOG(info, "Removing user from registered cons: ~p", [Name]),
ets:delete(State#state.namedb, Name), ets:delete(State#state.namedb, Name),
ets:delete(State#state.piddb, Pid), ets:delete(State#state.piddb, Pid),
{noreply, State};
_X ->
nevermind,
{noreply, State} {noreply, State}
end. end.


Expand Down
150 changes: 107 additions & 43 deletions playdar_modules/playdar-tcp/src/playdartcp_stream.erl
Original file line number Original file line Diff line number Diff line change
@@ -1,35 +1,49 @@
% manages tcp connection used for transfering files % manages tcp connection used for transfering files
% needs DRYing out a bit and tidying up once things settle down
-module(playdartcp_stream). -module(playdartcp_stream).
-behaviour(gen_server). -behaviour(gen_server).
-include("playdar.hrl"). -include("playdar.hrl").
-include("playdartcp.hrl"). -include("playdartcp.hrl").


%% API %% API
-export([start/1, start/2]). -export([start/1, start/2, stat/1]).


%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).


-record(state, {sock, mode, ip, current, ref, sid, pid}). -record(state, {sock, mode, ip, current, ref, sid, pid, track, start_now, start_localtime}).


start(Sock) -> start(Sock, unknown). start(Sock) -> start(Sock, unknown).


start(Sock, Mode) -> start(Sock, Mode) ->
gen_server:start_link(?MODULE, [Sock ,Mode], []). gen_server:start(?MODULE, [Sock ,Mode], []).

stat(Pid) ->
gen_server:call(Pid, {stat}).


%% %%


init([Sock, Mode]) -> init([Sock, Mode]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, {RemoteIp, _SockPort}} = inet:peername(Sock), {ok, {RemoteIp, _SockPort}} = inet:peername(Sock),
ok = inet:setopts(Sock, [{active, once}]), ok = inet:setopts(Sock, [{active, once}]),
{ok, #state{sock=Sock, mode=Mode, ip=RemoteIp, current=setup}}. {ok, #state{sock=Sock,

mode=Mode,

ip=RemoteIp,
handle_call(_Request, _From, State) -> current=setup,
Reply = ok, track={struct,[]},
start_now=erlang:now(),
start_localtime=erlang:localtime()
}}.

handle_call({stat}, _From, State) ->
{ok, Stats} = inet:getstat(State#state.sock),
Reply = [{track, State#state.track},
{current,State#state.current},
{stats, Stats},
{start_now, State#state.start_now},
{start_localtime, State#state.start_localtime}
],
{reply, Reply, State}. {reply, Reply, State}.


handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
Expand All @@ -38,19 +52,87 @@ handle_cast(_Msg, State) ->
% Incoming packet: % Incoming packet:
handle_info({tcp, Sock, Packet}, State = #state{sock=Sock}) -> handle_info({tcp, Sock, Packet}, State = #state{sock=Sock}) ->
Term = ?B2T(Packet), Term = ?B2T(Packet),
case Term of
{sid_response,_,_,{data,_}} -> spam;
_ -> ?LOG(info, "RCVD: ~p", [Term])
end,
Ret = handle_packet(Term, State), Ret = handle_packet(Term, State),
inet:setopts(Sock, [{active, once}]), inet:setopts(Sock, [{active, once}]),
Ret; Ret;


handle_info({tcp_error, Sock, Reason}, State = #state{current=receive_stream,sock=Sock}) -> % Sending transfer:
State#state.pid ! {State#state.ref, error, Reason}, handle_info({Ref, headers, Headers},
State=#state{current=send_stream, sock=Sock, ref=Ref, sid=Sid}) ->
M = ?T2B({sid_response, Ref, Sid, {headers, Headers}}),
ok = gen_tcp:send(Sock, M),
{noreply, State#state{current=send_stream_body}};

handle_info({Ref, data, Data}, State=#state{current=send_stream_body,
sock=Sock, ref=Ref, sid=Sid}) ->
Msg = ?T2B({sid_response, Ref, Sid, {data, Data}}),
ok = gen_tcp:send(Sock, Msg),
%timer:sleep(60), % artifically slow the stream speed
{noreply, State};

handle_info({Ref, error, Reason}, State=#state{current=send_stream_body,
sock=Sock, ref=Ref, sid=Sid}) ->
Msg = ?T2B({sid_response, Ref, Sid, {error, Reason}}),
ok = gen_tcp:send(Sock, Msg),
gen_tcp:close(Sock), gen_tcp:close(Sock),
{stop, normal, State}; {stop, normal, State};


handle_info({tcp_closed, Sock}, State = #state{current=receive_stream,sock=Sock}) -> handle_info({Ref, eof}, State=#state{current=send_stream_body,
State#state.pid ! {State#state.ref, eof}, sock=Sock, ref=Ref, sid=Sid}) ->
Msg = ?T2B({sid_response, Ref, Sid, eof}),
ok = gen_tcp:send(Sock, Msg),
gen_tcp:close(Sock),
{stop, normal, State};

% socket errors:
handle_info({tcp_error, Sock, Reason}, State = #state{sock=Sock}) ->
?LOG(info, "Socket errored during: ~p", [State#state.current]),
case State#state.current of
setup ->
gen_tcp:close(Sock);
receive_stream ->
State#state.pid ! {State#state.ref, error, Reason},
gen_tcp:close(Sock);
_ ->
ok
end,
{stop, normal, State};

handle_info({tcp_closed, Sock}, State = #state{sock=Sock}) ->
?LOG(info, "Socket closed during: ~p", [State#state.current]),
case State#state.current of
receive_stream ->
State#state.pid ! {State#state.ref, eof};
_ ->
ok
end,
{stop, normal, State}; {stop, normal, State};


handle_info({'EXIT', Pid, Reason}, State=#state{sock=Sock,ref=Ref,sid=Sid}) when Reason /= normal ->
?LOG(warn, "Pid ~p exited: ~p", [Pid, Reason]),
case State#state.current of
send_stream ->
?LOG(info, "Aborting sending!", []),
Msg = ?T2B({sid_response, Ref, Sid, {error, 999}}),
gen_tcp:send(Sock, Msg),
gen_tcp:close(Sock),
{stop, normal};

receive_stream ->
?LOG(info, "Aborting receiving, cancelling transfer!", []),
gen_tcp:close(Sock),
{stop, normal};

setup ->
?LOG(info, "Aborting transfer during setup phase", []),
gen_tcp:close(Sock),
{stop, normal}
end;

handle_info({'EXIT', Pid, Reason}, State) -> handle_info({'EXIT', Pid, Reason}, State) ->
?LOG(warn, "Pid ~p exited: ~p", [Pid, Reason]), ?LOG(warn, "Pid ~p exited: ~p", [Pid, Reason]),
{noreply, State}; {noreply, State};
Expand All @@ -70,6 +152,13 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------


lookup_track(Sid, State) ->
case playdar_resolver:result(Sid) of
undefined -> State;
T ->
State#state{track=T}
end.

%% setup: %% setup:


handle_packet({sending, Ref, Sid}, State = #state{current=setup, mode=Mode, sock=Sock}) -> handle_packet({sending, Ref, Sid}, State = #state{current=setup, mode=Mode, sock=Sock}) ->
Expand All @@ -82,7 +171,8 @@ handle_packet({sending, Ref, Sid}, State = #state{current=setup, mode=Mode, sock
end, end,
?LOG(info, "current -> receive_stream", []), ?LOG(info, "current -> receive_stream", []),
playdartcp_router:stream_started(Pid, Sid), playdartcp_router:stream_started(Pid, Sid),
{noreply, State#state{current=receive_stream,ref=Ref, sid=Sid, pid=Pid}}; State1 = lookup_track(Sid, State),
{noreply, State1#state{current=receive_stream,ref=Ref, sid=Sid, pid=Pid}};


unknown -> unknown ->
?LOG(warn, "Invalid transfer key", []), ?LOG(warn, "Invalid transfer key", []),
Expand All @@ -109,7 +199,8 @@ handle_packet({requesting, Ref, Sid}, State = #state{current=setup, mode=Mode, s
{stop, normal, State}; {stop, normal, State};
Sfun -> Sfun ->
Sfun(), Sfun(),
{noreply, State#state{current=send_stream, ref=Ref, sid=Sid}} State1 = lookup_track(Sid, State),
{noreply, State1#state{current=send_stream, ref=Ref, sid=Sid}}
end; end;


Else -> Else ->
Expand Down Expand Up @@ -150,33 +241,6 @@ handle_packet({sid_response, Ref, Sid, M},
handle_packet(Wtf, State = #state{current=receive_stream,sock=Sock}) -> handle_packet(Wtf, State = #state{current=receive_stream,sock=Sock}) ->
?LOG(info, "Unhandled packet in receive_stream: ~p", [Wtf]), ?LOG(info, "Unhandled packet in receive_stream: ~p", [Wtf]),
gen_tcp:close(Sock), gen_tcp:close(Sock),
{stop, normal, State}; {stop, normal, State}.


% send_stream:


handle_packet({Ref, headers, Headers},
State=#state{current=send_stream, sock=Sock, ref=Ref, sid=Sid}) ->
M = ?T2B({sid_response, Ref, Sid, {headers, Headers}}),
ok = gen_tcp:send(Sock, M),
{noreply, State#state{current=send_stream_body}};

handle_packet({Ref, _, _} = Packet,
State=#state{current=send_stream_body, sock=Sock, ref=Ref, sid=Sid}) ->
case Packet of
{Ref, data, Data} ->
Msg = ?T2B({sid_response, Ref, Sid, {data, Data}}),
ok = gen_tcp:send(Sock, Msg),
{noreply, State};

{Ref, error, Reason} ->
Msg = ?T2B({sid_response, Ref, Sid, {error, Reason}}),
ok = gen_tcp:send(Sock, Msg),
gen_tcp:close(Sock),
{stop, normal, State};

{Ref, eof} ->
Msg = ?T2B({sid_response, Ref, Sid, eof}),
ok = gen_tcp:send(Sock, Msg),
gen_tcp:close(Sock),
{stop, normal, State}
end.
10 changes: 5 additions & 5 deletions src/playdar_web.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ loop1(Req, DocRoot) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
Spid = Sfun(), Spid = Sfun(),
?LOG(info, "Stream fun pid: ~p", [Spid]), ?LOG(info, "Stream fun pid: ~p", [Spid]),
link(Spid), %link(Spid),
stream_result(Req, Ref) stream_result(Req, Ref)
end end
end; end;
Expand Down Expand Up @@ -261,11 +261,11 @@ stream_result_body(Req, Resp, Ref) ->
err; err;


{Ref, eof} -> {Ref, eof} ->
ok; ok


{'EXIT', Pid, Reason} -> %{'EXIT', Pid, Reason} ->
?LOG(warn, "Streamer process ~p exited (body): ~p", [Pid, Reason]), % ?LOG(warn, "Streamer process ~p exited (body): ~p", [Pid, Reason]),
stream_result_body(Req, Resp, Ref) % stream_result_body(Req, Resp, Ref)


after 10000 -> after 10000 ->
?LOG(info, "10secs timeout on streaming~n",[]), ?LOG(info, "10secs timeout on streaming~n",[]),
Expand Down

0 comments on commit 24e28f3

Please sign in to comment.