From 318bb199120cf621097fa39b4321dc2dbbd48376 Mon Sep 17 00:00:00 2001 From: Daniel Goertzen Date: Tue, 28 Aug 2012 15:05:34 -0500 Subject: [PATCH] abstract channel data send path so that eof can be accomodated. send_eof() now uses channel data send path so that any buffered data is sent before the eof is sent --- lib/ssh/src/ssh_connection.erl | 57 +++++++++++++++----------- lib/ssh/src/ssh_connection_manager.erl | 24 +++-------- 2 files changed, 40 insertions(+), 41 deletions(-) diff --git a/lib/ssh/src/ssh_connection.erl b/lib/ssh/src/ssh_connection.erl index c46f799b6d26..ba1550b28621 100644 --- a/lib/ssh/src/ssh_connection.erl +++ b/lib/ssh/src/ssh_connection.erl @@ -37,7 +37,7 @@ cancel_tcpip_forward/3, signal/3, exit_status/3, encode_ip/1, close/2, reply_request/4]). --export([channel_data/6, handle_msg/4, channel_eof_msg/1, +-export([channel_data/5, handle_msg/4, channel_eof_msg/1, channel_close_msg/1, channel_success_msg/1, channel_failure_msg/1, channel_adjust_window_msg/2, channel_data_msg/3, channel_open_msg/5, channel_open_confirmation_msg/4, @@ -308,25 +308,22 @@ cancel_tcpip_forward(ConnectionManager, BindIP, Port) -> %%-------------------------------------------------------------------- %%% Internal API %%-------------------------------------------------------------------- -channel_data(ChannelId, DataType, Data, Connection, ConnectionPid, From) +channel_data(ChannelId, {DataType, Data}, Connection, ConnectionPid, From) when is_list(Data)-> - channel_data(ChannelId, DataType, - list_to_binary(Data), Connection, ConnectionPid, From); + channel_data(ChannelId, {DataType, + list_to_binary(Data)}, Connection, ConnectionPid, From); -channel_data(ChannelId, DataType, Data, +channel_data(ChannelId, Cmd, #connection{channel_cache = Cache} = Connection, ConnectionPid, From) -> - case ssh_channel:cache_lookup(Cache, ChannelId) of #channel{remote_id = Id} = Channel0 -> - {SendList, Channel} = update_send_window(Channel0, DataType, - Data, Connection), + {SendList, Channel} = update_send_window(Channel0, Cmd, + Connection), Replies = - lists:map(fun({SendDataType, SendData}) -> + lists:map(fun(SendCmd) -> {connection_reply, ConnectionPid, - channel_data_msg(Id, - SendDataType, - SendData)} + bufitem_to_msg(Id, SendCmd)} end, SendList), FlowCtrlMsgs = flow_control(Replies, Channel#channel{flow_control = From}, @@ -336,6 +333,12 @@ channel_data(ChannelId, DataType, Data, {noreply, Connection} end. +bufitem_to_msg(ChannelId, {Type, Data}) -> + channel_data_msg(ChannelId, Type, Data); +bufitem_to_msg(ChannelId, eof) -> + channel_eof_msg(ChannelId). + + handle_msg(#ssh_msg_channel_open_confirmation{recipient_channel = ChannelId, sender_channel = RemoteId, initial_window_size = WindowSz, @@ -441,11 +444,11 @@ handle_msg(#ssh_msg_channel_window_adjust{recipient_channel = ChannelId, {SendList, Channel} = %% TODO: Datatype 0 ? update_send_window(Channel0#channel{send_window_size = Size + Add}, - 0, <<>>, Connection), + Connection), - Replies = lists:map(fun({Type, Data}) -> - {connection_reply, ConnectionPid, - channel_data_msg(ChannelId, Type, Data)} + RemoteId = Channel0#channel.remote_id, + Replies = lists:map(fun(SendCmd) -> {connection_reply, ConnectionPid, + bufitem_to_msg(RemoteId, SendCmd)} end, SendList), FlowCtrlMsgs = flow_control(Channel, Cache), {{replies, Replies ++ FlowCtrlMsgs}, Connection}; @@ -1074,13 +1077,17 @@ request_reply_or_data(#channel{local_id = ChannelId, user = ChannelPid}, {{channel_data, ChannelPid, Reply}, Connection} end. -update_send_window(Channel0, DataType, Data, - #connection{channel_cache = Cache}) -> - Buf0 = if Data == <<>> -> - Channel0#channel.send_buf; - true -> - Channel0#channel.send_buf ++ [{DataType, Data}] - end, +update_send_window(Channel, Connection) -> + update_send_window2(Channel, Channel#channel.send_buf, Connection). + +update_send_window(Channel, {_DataType, <<>>}, Connection) -> + update_send_window2(Channel, Channel#channel.send_buf, Connection); + +update_send_window(Channel, Cmd, Connection) -> + update_send_window2(Channel, Channel#channel.send_buf ++ [Cmd], Connection). + +update_send_window2(Channel0, Buf0, #connection{channel_cache = Cache}) -> + {Buf1, NewSz, Buf2} = get_window(Buf0, Channel0#channel.send_packet_size, Channel0#channel.send_window_size), @@ -1092,6 +1099,10 @@ update_send_window(Channel0, DataType, Data, get_window(Bs, PSz, WSz) -> get_window(Bs, PSz, WSz, []). + +get_window([eof | Bs], PSz, WSz, Acc) -> %% eof consumes no window and may be sent when window is 0 + get_window(Bs, PSz, WSz, [eof | Acc]); + get_window(Bs, _PSz, 0, Acc) -> {lists:reverse(Acc), 0, Bs}; get_window([B0 = {DataType, Bin} | Bs], PSz, WSz, Acc) -> diff --git a/lib/ssh/src/ssh_connection_manager.erl b/lib/ssh/src/ssh_connection_manager.erl index e53cd4f4f780..dd658f441fc8 100644 --- a/lib/ssh/src/ssh_connection_manager.erl +++ b/lib/ssh/src/ssh_connection_manager.erl @@ -160,10 +160,10 @@ stop(ConnectionManager) -> end. send(ConnectionManager, ChannelId, Type, Data, Timeout) -> - call(ConnectionManager, {data, ChannelId, Type, Data}, Timeout). + call(ConnectionManager, {data, ChannelId, {Type, Data}}, Timeout). send_eof(ConnectionManager, ChannelId) -> - cast(ConnectionManager, {eof, ChannelId}). + call(ConnectionManager, {data, ChannelId, eof}). %%==================================================================== %% gen_server callbacks @@ -288,11 +288,11 @@ handle_call({global_request, Pid, _, _, _} = Request, From, State = add_request(true, Channel#channel.local_id, From, State1), {noreply, State}; -handle_call({data, ChannelId, Type, Data}, From, +handle_call({data, ChannelId, Cmd}, From, #state{connection_state = #connection{channel_cache = _Cache} = Connection0, connection = ConnectionPid} = State) -> - channel_data(ChannelId, Type, Data, Connection0, ConnectionPid, From, + channel_data(ChannelId, Cmd, Connection0, ConnectionPid, From, State); handle_call({connection_info, Options}, From, @@ -453,18 +453,6 @@ handle_cast({adjust_window, ChannelId, Bytes}, end, {noreply, State}; -handle_cast({eof, ChannelId}, - #state{connection = Pid, connection_state = - #connection{channel_cache = Cache}} = State) -> - case ssh_channel:cache_lookup(Cache, ChannelId) of - #channel{remote_id = Id} -> - send_msg({connection_reply, Pid, - ssh_connection:channel_eof_msg(Id)}), - {noreply, State}; - undefined -> - {noreply, State} - end; - handle_cast({success, ChannelId}, #state{connection = Pid} = State) -> Msg = ssh_connection:channel_success_msg(ChannelId), send_msg({connection_reply, Pid, Msg}), @@ -567,8 +555,8 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -channel_data(Id, Type, Data, Connection0, ConnectionPid, From, State) -> - case ssh_connection:channel_data(Id, Type, Data, Connection0, +channel_data(Id, Cmd, Connection0, ConnectionPid, From, State) -> + case ssh_connection:channel_data(Id, Cmd, Connection0, ConnectionPid, From) of {{replies, Replies}, Connection} -> lists:foreach(fun send_msg/1, Replies),