Skip to content

Commit

Permalink
Merge branch 'flowcontrol' of git@github.com:RJ/playdar-core into web…
Browse files Browse the repository at this point in the history
…sockets
  • Loading branch information
RJ committed Jan 7, 2010
2 parents 6228815 + 7953f0f commit 7bdd482
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 108 deletions.
44 changes: 38 additions & 6 deletions playdar_modules/playdar-tcp/src/listener_impl.erl
Expand Up @@ -4,20 +4,52 @@
%% API
-export([start_link/1]).

start_link(Port) -> spawn_link(fun()->
{ok, Sock} = gen_tcp:listen(Port, ?TCP_OPTS_SERVER),
do_accept(Sock)
end).
start_link(Port) ->
spawn_link(fun()->
{ok, Sock} = gen_tcp:listen(Port, ?TCP_OPTS_SERVER),
do_accept(Sock)
end).

do_accept(LSock) ->
case gen_tcp:accept(LSock) of

{ok, Sock} ->
?LOG(info, "handle accept",[]),
{ok, Pid} = playdartcp_conn:start(Sock, in),
Pid = spawn(fun()->handle(Sock)end),
gen_tcp:controlling_process(Sock, Pid),
do_accept(LSock);

{error, Err} ->
?LOG(warning, "Couldn't accept incoming connection: ~p", [Err]),
throw(playdartcp_listen_accept_error)
end.

% Is this a control connection, or a stream/filetransfer?
% once we know, delegate socket to correct type of process.
handle(Sock) ->
% rcv just 1 msg: the connection mode
case gen_tcp:recv(Sock, 0, 3000) of
{ok, Packet} ->
case (catch ?B2T(Packet)) of
{conntype, control} ->
?LOG(info, "handle accept, control connection",[]),
{ok, Pid} = playdartcp_conn:start(Sock, in),
gen_tcp:controlling_process(Sock, Pid);

{conntype, stream} ->
?LOG(info, "handle accept, stream connection",[]),
{ok, Pid} = playdartcp_stream:start(Sock),
gen_tcp:controlling_process(Sock, Pid);

Er ->
?LOG(warning, "Unknown socket mode, aborting connection: ~p", [Er]),
gen_tcp:close(Sock)
end;

{error, timeout} ->
?LOG(info, "Timeout waiting for connection mode packet", []),
gen_tcp:close(Sock);

{error, X} ->
?LOG(warning, "Connection failed: ~p", [X]),
gen_tcp:close(Sock)
end.
2 changes: 2 additions & 0 deletions playdar_modules/playdar-tcp/src/playdartcp.hrl
Expand Up @@ -5,3 +5,5 @@
{reuseaddr, true}]).

-define(TCP_OPTS_SERVER, [ {backlog, 10} | ?TCP_OPTS ]).
-define(T2B(T), term_to_binary(T)).
-define(B2T(T), binary_to_term(T)).
186 changes: 86 additions & 100 deletions playdar_modules/playdar-tcp/src/playdartcp_conn.erl
@@ -1,17 +1,16 @@
% this process owns a tcp socket connected to a remote playdar instance
-module(playdartcp_conn).
-include("playdar.hrl").
-include("playdartcp.hrl").
-behaviour(gen_server).
-define(T2B(T), term_to_binary(T)).
-define(B2T(T), binary_to_term(T)).
%% API
-export([start/2, start/3, send_msg/2, request_sid/4, stats/1, stats/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {sock, % pid: gen_tcp pid
-record(state, {sock, % pid: gen_tcp pid
authed, % bool: connection authed and ready?
name, % string: name of remote peer
props, % proplist of remote peer settings
Expand Down Expand Up @@ -40,21 +39,23 @@ stats(Pid, Opts) -> gen_server:call(Pid, {stats,Opts}).

%% gen_server callbacks


init([Sock, InOut, Share]) ->
{ok, {SockAddr, SockPort}} = inet:peername(Sock),
?LOG(info, "playdartcp_conn:init, inout:~w Remote:~p:~p", [InOut,SockAddr,SockPort]),
% kill the connection if not authed after 10 seconds:
% kill the connection if not authed fast enough:
timer:send_after(15000, self(), auth_timeout),
% if we are initiating the connection, ie it's outbound, send our auth:
case InOut of
out ->
Msg = ?T2B({auth, ?CONFVAL(name, "unknown"), [{share, Share}]}),
% Needs more DRY - props also created in response to auth packet below
PublicPort = ?CONFVAL({playdartcp,port},60211),
Props = [{share, Share}, {public_port, PublicPort}],
Msg = ?T2B({auth, ?CONFVAL(name, "unknown"), Props}),
ok = gen_tcp:send(Sock, Msg);
in ->
noop
end,
ok = inet:setopts(Sock, [{active, once}]),
ok = inet:setopts(Sock, [{active, once}]),
SQ = ets:new(seenqids,[]),
Transfers = ets:new(transfers,[]),
{ok, #state{ sock=Sock,
Expand Down Expand Up @@ -82,8 +83,8 @@ handle_call({send_msg, Msg}, _From, State) when is_tuple(Msg) ->
{reply, ok, State}.

handle_cast({request_sid, Sid, RecPid, Ref}, State) ->
ets:insert(State#state.transfers, {{Sid, Ref},RecPid}),
Msg = ?T2B({request_sid, Ref, Sid}),
playdartcp_router:register_transfer({Ref,Sid}, RecPid),
Msg = ?T2B({request_stream, Ref, Sid}),
ok = gen_tcp:send(State#state.sock, Msg),
{noreply, State};

Expand All @@ -98,17 +99,12 @@ handle_info(auth_timeout, State = #state{authed = false}) ->
% Incoming packet:
handle_info({tcp, Sock, Packet}, State = #state{sock=Sock}) ->
Term = ?B2T(Packet),
case Term of
{sid_response, _, _Sid, {data, _}} ->
ok;
%?LOG(info, "INCOMING (~p) data packet for ~p", [State#state.name, Sid]);
_ ->
?LOG(info, "INCOMING (~p)\n~p", [State#state.name, Term])
end,
?LOG(info, "INCOMING (~p)\n~p", [State#state.name, Term]),
{Reply, NewState} = handle_packet(Term, State),
ok = inet:setopts(Sock, [{active, once}]),
{Reply, NewState};


% Refwd query that originally arrived at this connection
% but only if not already solved!
handle_info({fwd_query, #qry{qid=Qid, obj=Q}}, State = #state{authed=true}) ->
Expand Down Expand Up @@ -138,8 +134,6 @@ handle_info({tcp, _Data, Sock}, State = #state{sock=Sock, authed=false}) ->

handle_info(_Msg, State) -> {noreply, State}.



terminate(_Reason, _State) ->
?LOG(info, "playdartcp connection terminated", []),
ok.
Expand All @@ -149,7 +143,6 @@ code_change(_OldVsn, State, _Extra) ->

%%% Internal functions


%% Incoming auth/peer ID:
handle_packet({auth, Name, Props}, State = #state{sock=Sock, authed=false}) when is_list(Name), is_list(Props) ->
?LOG(info, "new playdartcp connection authed as ~s, props: ~p", [Name, Props]),
Expand All @@ -162,7 +155,8 @@ handle_packet({auth, Name, Props}, State = #state{sock=Sock, authed=false}) when
in ->
DefName = "unknown-"++erlang:integer_to_list(random:uniform(9999999)), % HACK
% tell them if we are sharing content with them:
OurProps = [{share, State#state.weshare}],
PublicPort = ?CONFVAL({playdartcp,port},60211),
OurProps = [{share, State#state.weshare}, {public_port, PublicPort}],
M = ?T2B({auth, ?CONFVAL(name, DefName), OurProps}),
ok = gen_tcp:send(Sock, M),
{noreply, State#state{authed=true, name=Name, props=Props, theyshare=TheyShare}};
Expand Down Expand Up @@ -213,7 +207,8 @@ handle_packet({result, Qid, {struct, L}}, State = #state{authed=true}) when is_l
[{<<"url">>, list_to_binary(Url)}|L]}),
{noreply, State};

handle_packet({request_sid, Ref, Sid}, State = #state{authed=true, weshare=true}) ->
% request for us to send a file
handle_packet({request_stream, Ref, Sid}, State = #state{authed=true, weshare=true}) ->
case playdar_resolver:result(Sid) of
undefined ->
?LOG(info,"sid not found: ~p", [Sid]),
Expand All @@ -230,93 +225,84 @@ handle_packet({request_sid, Ref, Sid}, State = #state{authed=true, weshare=true}
ok = gen_tcp:send(State#state.sock, Msg),
{noreply, State};
_ ->
ConnPid = self(),
spawn(fun()->stream_result(Ref, Sid, State#state.sock, ConnPid)end),
handle_stream_request(send, Ref, Sid, State),
{noreply, State}
end
end;

% We asked for a file, which is ok, but we need to initiate the connection
handle_packet({request_stream_ready, Ref, Sid}, State = #state{authed=true}) ->
?LOG(info, "We are prompted to initiate a connection for ~s", [Sid]),
handle_stream_request(rcv, Ref, Sid, State),
{noreply, State};

% the only kind of sid_response possible here is an error.
% all headers, data, etc send in the stream tcp connection
handle_packet({sid_response, Ref, Sid, M}, State = #state{authed=true}) ->
case ets:lookup(State#state.transfers, {Sid, Ref}) of
[{{Sid,Ref},Pid}] ->
case M of
{headers, H}->
?LOG(info, "headers arrived: ~p", [H]),
Pid ! {Ref, headers, H};
{data, D} -> Pid ! {Ref, data, D};
{error, E} ->
?LOG(warn, "Error for transfer: ~p",[E]),
Pid ! {Ref, error, E};
eof -> Pid ! {Ref, eof}
end,
{noreply, State};
_ ->
?LOG(info, "sid_response arrived, but no registered handler",[]),
%TODO send cancel msg for this transfer
{noreply, State}
end;
case M of
{error, Code} ->
case playdartcp_router:consume_transfer({Ref,Sid}) of
undefined ->
?LOG(info, "sid_response error received for unknown transfer", []),
{noreply, State};
P when is_pid(P) ->
P ! {Ref, error, {inline_error_maybe_legacy, Code}},
{noreply, State};
_ ->
?LOG(info, "sid_response error received in wrong place", []),
{noreply, State}
end;
_ ->
?LOG(warn, "Unexpected sid_response packet, disconnecting", []),
gen_tcp:close(State#state.sock),
{stop, normal}
end;

handle_packet(Data, State) ->
io:format("GOT UNKNOWN, state: ~p ~p~n", [State, Data]),
{noreply, State}.


stream_result(Ref, Sid, Sock, ConnPid) ->
A = playdar_resolver:result(Sid),
case playdar_reader_registry:get_streamer(A, self(), Ref) of
undefined ->
Msg = ?T2B({sid_response, Ref, Sid, {error, 5031}}),
ok = gen_tcp:send(Sock, Msg),
ok;
Sfun ->
% we trap exits, so if the streaming fun crashes we can catch it
% and fwd an error message to the recipient
process_flag(trap_exit, true),
Sfun(),
receive
{Ref, headers, Headers} ->
M = ?T2B({sid_response, Ref, Sid, {headers, Headers}}),
ok = gen_tcp:send(Sock, M),
stream_result_body(Ref, Sid, Sock, ConnPid);

{'EXIT', _Pid, Reason} when Reason /= normal ->
?LOG(error, "Streamer fun crashed: ~p", [Reason]),
Msg = ?T2B({sid_response, Ref, Sid, {error, 999}}),
ok = gen_tcp:send(Sock, Msg),
ok

after 10000 ->
M = ?T2B({sid_response, Ref, Sid, {error, headers_timeout}}),
ok = gen_tcp:send(Sock, M),
ok
end
end.

stream_result_body(Ref, Sid, Sock, ConnPid) ->
receive
{'EXIT', _Pid, Reason} when Reason /= normal ->
?LOG(error, "Streamer fun crashed whilst streaming body: ~p", [Reason]),
Msg = ?T2B({sid_response, Ref, Sid, {error, 999}}),
ok = gen_tcp:send(Sock, Msg),
ok;

{Ref, data, Data} ->
Msg = ?T2B({sid_response, Ref, Sid, {data, Data}}),
ok = gen_tcp:send(Sock, Msg),
stream_result_body(Ref, Sid, Sock, ConnPid);

{Ref, error, _Reason} ->
Msg = ?T2B({sid_response, Ref, Sid, {error, -1}}),
ok = gen_tcp:send(Sock, Msg),
ok;

{Ref, eof} ->
Msg = ?T2B({sid_response, Ref, Sid, eof}),
ok = gen_tcp:send(Sock, Msg),
ok

after 10000 ->
Msg = ?T2B({sid_response, Ref, Sid, {error, timeout}}),
ok = gen_tcp:send(Sock, Msg),
ok
end.
% Handle stream creation if a peer requests a stream from us
% Direction: send/rcv, depending if we are asking-for or offering the file
handle_stream_request(Direction, Ref, Sid, State = #state{inout=out}) ->
{ok, {Address, _SockPort}} = inet:peername(State#state.sock),
Port = proplists:get_value(public_port, State#state.props),
?LOG(info, "handle_stream_request [~p] -> ~p:~p", [Direction, Address, Port]),
case gen_tcp:connect(Address, Port, ?TCP_OPTS, 5000) of
{ok, Sock} ->
ok = gen_tcp:send(Sock, ?T2B({conntype, stream})),
case Direction of
send ->
playdartcp_router:register_transfer({Ref,Sid}, Address), % only send to the correct peer IP
?LOG(info, "Sending {sending} header", []),
gen_tcp:send(Sock, ?T2B({sending, Ref, Sid})),
{ok, Pid} = playdartcp_stream:start(Sock, send),
gen_tcp:controlling_process(Sock, Pid);
rcv ->
?LOG(info, "Sending {requesting} header", []),
gen_tcp:send(Sock, ?T2B({requesting, Ref, Sid})),
{ok, Pid} = playdartcp_stream:start(Sock, recv),
gen_tcp:controlling_process(Sock, Pid)
end,
?LOG(info, "Created stream process for ~s to ~p:~p", [Sid, Address, Port]),
ok;
{error, timeout} ->
?LOG(warn, "Failing to connect to ~p:~p for streaming", [Address,Port]),
% TODO send an error back down the control channel
error;
{error, Reason} ->
?LOG(warn, "Failed to create stream process to ~p:~p Reason: ~p", [Address,Port,Reason]),
error
end;

handle_stream_request(send, Ref, Sid, State = #state{inout=in}) ->
% tell them we're ready, and they should connect to us
{ok, {Address, _SockPort}} = inet:peername(State#state.sock),
playdartcp_router:register_transfer({Ref,Sid}, Address), % only send to the correct peer IP
Msg = ?T2B({request_stream_ready, Ref, Sid}),
ok = gen_tcp:send(State#state.sock, Msg).




13 changes: 12 additions & 1 deletion playdar_modules/playdar-tcp/src/playdartcp_router.erl
Expand Up @@ -5,7 +5,9 @@

-export([start_link/1, register_connection/3, send_query_response/3,
connect/2, connect/3, peers/0, bytes/0, broadcast/1, broadcast/2, broadcast/3,
seen_qid/1, disconnect/1, sanitize_msg/1]).
seen_qid/1, disconnect/1, sanitize_msg/1,
register_transfer/2, consume_transfer/1
]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
Expand Down Expand Up @@ -33,6 +35,8 @@ broadcast(M, Except, SharersOnly) when is_tuple(M) -> gen_server:cast(?MODULE,

seen_qid(Qid) -> gen_server:cast(?MODULE, {seen_qid, Qid}).

register_transfer(Key, Pid) -> gen_server:cast(?MODULE, {register_transfer, Key, Pid}).
consume_transfer(Key) -> gen_server:call(?MODULE, {consume_transfer, Key}).

%% ====================================================================
%% Server functions
Expand Down Expand Up @@ -67,6 +71,7 @@ handle_call({disconnect, Name}, _From, State) ->
handle_call({connect, Ip, Port, Share}, _From, State) ->
case gen_tcp:connect(Ip, Port, ?TCP_OPTS, 10000) of
{ok, Sock} ->
ok = gen_tcp:send(Sock, ?T2B({conntype,control})),
{ok, Pid} = playdartcp_conn:start(Sock, out, Share),
gen_tcp:controlling_process(Sock, Pid),
?LOG(info, "New outbound connection to ~p:~p pid:~p, sharing:~w", [Ip, Port,Pid,Share]),
Expand All @@ -83,6 +88,8 @@ handle_call(peers, _From, State) ->
handle_call(bytes, _From, State) ->
{reply, ets:tab2list(State#state.piddb), State};

handle_call({consume_transfer, Key}, _From, State) -> {reply, erlang:erase(Key), State};

handle_call({register_connection, Pid, Name, Sharing = {WeShare, TheyShare}}, _From, State) ->
% TODO we should probably kick the old conn with this name
% so ppl can reconnect if their old conn lags out
Expand All @@ -103,6 +110,10 @@ handle_call({register_connection, Pid, Name, Sharing = {WeShare, TheyShare}}, _F

%%

handle_cast({register_transfer, Key, Pid}, State) ->
erlang:put(Key, Pid),
{noreply, State};

handle_cast({seen_qid, Qid}, State) ->
% TODO may want to use a bloom filter instead of ets here:
ets:insert(State#state.seenqids, {Qid, true}),
Expand Down

0 comments on commit 7bdd482

Please sign in to comment.