Skip to content

Commit

Permalink
stream connections know directionality
Browse files Browse the repository at this point in the history
  • Loading branch information
RJ committed Jan 7, 2010
1 parent 0d257e7 commit 7953f0f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
9 changes: 5 additions & 4 deletions playdar_modules/playdar-tcp/src/listener_impl.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
%% API %% API
-export([start_link/1]). -export([start_link/1]).


start_link(Port) -> spawn_link(fun()-> start_link(Port) ->
{ok, Sock} = gen_tcp:listen(Port, ?TCP_OPTS_SERVER), spawn_link(fun()->
do_accept(Sock) {ok, Sock} = gen_tcp:listen(Port, ?TCP_OPTS_SERVER),
end). do_accept(Sock)
end).


do_accept(LSock) -> do_accept(LSock) ->
case gen_tcp:accept(LSock) of case gen_tcp:accept(LSock) of
Expand Down
10 changes: 6 additions & 4 deletions playdar_modules/playdar-tcp/src/playdartcp_conn.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -276,13 +276,15 @@ handle_stream_request(Direction, Ref, Sid, State = #state{inout=out}) ->
send -> send ->
playdartcp_router:register_transfer({Ref,Sid}, Address), % only send to the correct peer IP playdartcp_router:register_transfer({Ref,Sid}, Address), % only send to the correct peer IP
?LOG(info, "Sending {sending} header", []), ?LOG(info, "Sending {sending} header", []),
gen_tcp:send(Sock, ?T2B({sending, Ref, Sid})); gen_tcp:send(Sock, ?T2B({sending, Ref, Sid})),
{ok, Pid} = playdartcp_stream:start(Sock, send),
gen_tcp:controlling_process(Sock, Pid);
rcv -> rcv ->
?LOG(info, "Sending {requesting} header", []), ?LOG(info, "Sending {requesting} header", []),
gen_tcp:send(Sock, ?T2B({requesting, Ref, Sid})) gen_tcp:send(Sock, ?T2B({requesting, Ref, Sid})),
{ok, Pid} = playdartcp_stream:start(Sock, recv),
gen_tcp:controlling_process(Sock, Pid)
end, end,
{ok, Pid} = playdartcp_stream:start(Sock),
gen_tcp:controlling_process(Sock, Pid),
?LOG(info, "Created stream process for ~s to ~p:~p", [Sid, Address, Port]), ?LOG(info, "Created stream process for ~s to ~p:~p", [Sid, Address, Port]),
ok; ok;
{error, timeout} -> {error, timeout} ->
Expand Down
48 changes: 34 additions & 14 deletions playdar_modules/playdar-tcp/src/playdartcp_stream.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
-include("playdar.hrl"). -include("playdar.hrl").
-include("playdartcp.hrl"). -include("playdartcp.hrl").


-export([start/1]). -export([start/1, start/2]).


start(Sock) -> start(Sock) -> start(Sock, unknown).
Pid = spawn(fun()-> start_real(Sock) end),
start(Sock, Mode) ->
Pid = spawn(fun()-> start_real(Sock, Mode) end),
{ok, Pid}. {ok, Pid}.


% New connection established - the parent that setup this socket outbound % New connection established - the parent that setup this socket outbound
% will have sent the first sending/requesting packet to initiate everything. % will have sent the first sending/requesting packet to initiate everything.
start_real(Sock) -> start_real(Sock, Mode) ->
case gen_tcp:recv(Sock, 0, 3000) of case gen_tcp:recv(Sock, 0, 3000) of
{ok, Packet} -> {ok, Packet} ->
% get remote Ip address % get remote Ip address
Expand All @@ -21,20 +23,33 @@ start_real(Sock) ->
% is this transfer we are about to receive valid? % is this transfer we are about to receive valid?
case playdartcp_router:consume_transfer({Ref,Sid}) of case playdartcp_router:consume_transfer({Ref,Sid}) of
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
?LOG(info, "Responding to sending, with requesting", []), case Mode of
ok = gen_tcp:send(Sock, ?T2B({requesting, Ref, Sid})), recv ->
receive_stream(Ref, Sid, Sock, Pid); ?LOG(info, "receive_stream", []),
receive_stream(Ref, Sid, Sock, Pid);
unknown ->
ok = gen_tcp:send(Sock, ?T2B({requesting, Ref, Sid})),
?LOG(info, "receive_stream", []),
receive_stream(Ref, Sid, Sock, Pid)
end;

unknown -> unknown ->
?LOG(warn, "Invalid transfer key", []), ?LOG(warn, "Invalid transfer key", []),
gen_tcp:close(Sock) gen_tcp:close(Sock)
end; end;

{requesting, Ref, Sid} -> {requesting, Ref, Sid} ->
% is the transfer they are asking for permitted? % is the transfer they are asking for permitted?
case playdartcp_router:consume_transfer({Ref,Sid}) of case playdartcp_router:consume_transfer({Ref,Sid}) of
Ip when Ip == RemoteIp -> % should be Ip of client Ip when Ip == RemoteIp -> % should be Ip of client
ok = gen_tcp:send(Sock, ?T2B({sending, Ref, Sid})), case Mode of
send_stream(Ref, Sid, Sock); unknown ->
ok = gen_tcp:send(Sock, ?T2B({sending, Ref, Sid})),
send_stream(Ref, Sid, Sock);
send ->
send_stream(Ref, Sid, Sock)
end;

Else -> Else ->
?LOG(warn, "Not sending, request invalid Key: ~p, Transfertoken: ~p", [{Ref,Sid}, Else]) ?LOG(warn, "Not sending, request invalid Key: ~p, Transfertoken: ~p", [{Ref,Sid}, Else])
end; end;
Expand Down Expand Up @@ -75,14 +90,17 @@ receive_stream(Ref, Sid, Sock, Pid) ->
Pid ! {Ref, eof}, Pid ! {Ref, eof},
gen_tcp:close(Sock) gen_tcp:close(Sock)
end; end;
_ -> Wtf ->
?LOG(info, "Unhandled packet in receive_stream", []), ?LOG(info, "Unhandled packet in receive_stream: ~p", [Wtf]),
error error
end; end;


{error, closed} -> Pid ! {Ref, eof}; {error, closed} -> Pid ! {Ref, eof};


{error, Err} -> Pid ! {Ref, error, Err} {error, timeout} ->
?LOG(warn, "Timeout receiving packets", []),
Pid ! {Ref, error, timeout}

end. end.




Expand All @@ -92,8 +110,10 @@ send_stream(Ref, Sid, Sock) ->
A = playdar_resolver:result(Sid), A = playdar_resolver:result(Sid),
case playdar_reader_registry:get_streamer(A, self(), Ref) of case playdar_reader_registry:get_streamer(A, self(), Ref) of
undefined -> undefined ->
?LOG(warn, "No streamer available, aborting", []),
Msg = ?T2B({sid_response, Ref, Sid, {error, 5031}}), Msg = ?T2B({sid_response, Ref, Sid, {error, 5031}}),
ok = gen_tcp:send(Sock, Msg), gen_tcp:send(Sock, Msg),
gen_tcp:close(Sock),
ok; ok;
Sfun -> Sfun ->
% we trap exits, so if the streaming fun crashes we can catch it % we trap exits, so if the streaming fun crashes we can catch it
Expand Down

0 comments on commit 7953f0f

Please sign in to comment.