Skip to content

Commit

Permalink
get rid of over-zealous sending of channel EOF & close messages which…
Browse files Browse the repository at this point in the history
… was implemented with questionable synchronization

fixes #105

also relevant to #126 since AbstractChannel does not synchronize on
'this' anymore
  • Loading branch information
shikhar committed Jun 24, 2014
1 parent 2a7278d commit 5ee2f0a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ public abstract class AbstractChannel
protected final Event<ConnectionException> openEvent;
/** Channel close event */
protected final Event<ConnectionException> closeEvent;

/* Access to these fields should be synchronized using this object */
private boolean eofSent;
private boolean eofGot;
/** Whether we have already sent a CHANNEL_CLOSE request to the server */
private boolean closeRequested;

/** Local window */
Expand Down Expand Up @@ -282,20 +279,22 @@ public void join(int timeout, TimeUnit unit)
closeEvent.await(timeout, unit);
}

protected synchronized void sendClose()
protected void sendClose()
throws TransportException {
openCloseLock.lock();
try {
if (!closeRequested) {
log.debug("Sending close");
trans.write(newBuffer(Message.CHANNEL_CLOSE));
}
} finally {
closeRequested = true;
openCloseLock.unlock();
}
}

@Override
public synchronized boolean isOpen() {
public boolean isOpen() {
openCloseLock.lock();
try {
return openEvent.isSet() && !closeEvent.isSet() && !closeRequested;
Expand Down Expand Up @@ -405,36 +404,17 @@ private void gotResponse(boolean success)
}
}

private synchronized void gotEOF()
private void gotEOF()
throws TransportException {
log.debug("Got EOF");
eofGot = true;
eofInputStreams();
if (eofSent)
sendClose();
}

/** Called when EOF has been received. Subclasses can override but must call super. */
protected void eofInputStreams() {
in.eof();
}

@Override
public synchronized void sendEOF()
throws TransportException {
try {
if (!closeRequested && !eofSent) {
log.debug("Sending EOF");
trans.write(newBuffer(Message.CHANNEL_EOF));
if (eofGot)
sendClose();
}
} finally {
eofSent = true;
out.setClosed();
}
}

@Override
public String toString() {
return "< " + type + " channel: id=" + id + ", recipient=" + recipient + ", localWin=" + lwin + ", remoteWin="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,6 @@ void close()
/** @return whether the channel is open. */
boolean isOpen();

/**
* Sends an EOF message to the server for this channel; indicating that no more data will be sent by us. The {@code
* OutputStream} for this channel will be closed and no longer usable.
*
* @throws TransportException if there is an error sending the EOF message
*/
void sendEOF()
throws TransportException;

/**
* Set whether local window should automatically expand when data is received, irrespective of whether data has been
* read from that stream. This is useful e.g. when a remote command produces a lot of output that would fill the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int write(byte[] data, int off, int len)
throws TransportException, ConnectionException {
final int bufferSize = packet.wpos() - dataOffset;
if (bufferSize >= win.getMaxPacketSize()) {
flush(bufferSize);
flush(bufferSize, true);
return 0;
} else {
final int n = Math.min(len, win.getMaxPacketSize() - bufferSize);
Expand All @@ -93,18 +93,23 @@ int write(byte[] data, int off, int len)
}
}

void flush()
boolean flush(boolean canAwaitExpansion)
throws TransportException, ConnectionException {
flush(packet.wpos() - dataOffset);
return flush(packet.wpos() - dataOffset, canAwaitExpansion);
}
void flush(int bufferSize)

boolean flush(int bufferSize, boolean canAwaitExpansion)
throws TransportException, ConnectionException {
while (bufferSize > 0) {

long remoteWindowSize = win.getSize();
if (remoteWindowSize == 0)
remoteWindowSize = win.awaitExpansion(remoteWindowSize);
if (remoteWindowSize == 0) {
if (canAwaitExpansion) {
remoteWindowSize = win.awaitExpansion(remoteWindowSize);
} else {
return false;
}
}

// We can only write the min. of
// a) how much data we have
Expand Down Expand Up @@ -136,6 +141,8 @@ void flush(int bufferSize)

bufferSize = leftOverBytes;
}

return true;
}

}
Expand Down Expand Up @@ -184,18 +191,14 @@ public synchronized void close()
throws IOException {
if (!closed) {
try {
buffer.flush();
chan.sendEOF();
buffer.flush(false);
trans.write(new SSHPacket(Message.CHANNEL_EOF).putUInt32(chan.getRecipient()));
} finally {
setClosed();
closed = true;
}
}
}

public synchronized void setClosed() {
closed = true;
}

/**
* Send all data currently buffered. If window space is exhausted in the process, this will block
* until it is expanded by the server.
Expand All @@ -206,7 +209,7 @@ public synchronized void setClosed() {
public synchronized void flush()
throws IOException {
checkClose();
buffer.flush();
buffer.flush(true);
}

@Override
Expand Down

0 comments on commit 5ee2f0a

Please sign in to comment.