diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoReadFuture.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoReadFuture.java index 3a4e1c2c1..d2661dd2f 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoReadFuture.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoReadFuture.java @@ -26,12 +26,25 @@ * @author Apache MINA SSHD Project */ public interface IoReadFuture extends SshFuture, VerifiableFuture { + + /** + * Retrieves the buffer data was read into. + * + * @return the buffer, {@code null} if {@link #isDone()} {@code == false} + */ Buffer getBuffer(); + /** + * Retrieves the number of bytes read. + * + * @return The number of bytes read, or -1 if the source of the read has been exhausted (is at EOF), or zero if the + * read is not done yet ({@link #isDone()} {@code == false}) + */ int getRead(); /** - * Returns the cause of the read failure. + * Returns the cause of the read failure. An {@link java.io.EOFException} indicates that nothing was read because + * the source of the read is exhausted. * * @return {@code null} if the read operation is not finished yet, or if the read attempt is successful (use * {@link #isDone()} to distinguish between the two). diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 2b6ab4db1..666222968 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -18,6 +18,7 @@ */ package org.apache.sshd.client.channel; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -45,8 +46,12 @@ import org.apache.sshd.common.channel.RequestHandler; import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.channel.exception.SshChannelOpenException; +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.DefaultCloseFuture; +import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoInputStream; import org.apache.sshd.common.io.IoOutputStream; +import org.apache.sshd.common.io.IoReadFuture; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.EventNotifier; import org.apache.sshd.common.util.ExceptionUtils; @@ -61,6 +66,15 @@ * @author Apache MINA SSHD Project */ public abstract class AbstractClientChannel extends AbstractChannel implements ClientChannel { + + private static final InputStream NULL_INPUT_STREAM = new InputStream() { + + @Override + public int read() throws IOException { + return -1; + } + }; + protected final AtomicBoolean opened = new AtomicBoolean(); protected Streaming streaming; @@ -134,6 +148,9 @@ public IoInputStream getAsyncOut() { @Override public IoInputStream getAsyncErr() { + if (asyncErr == asyncOut) { + return NullIoInputStream.INSTANCE; + } return asyncErr; } @@ -167,6 +184,9 @@ public void setOut(OutputStream out) { @Override public InputStream getInvertedErr() { + if (invertedErr == invertedOut) { + return NULL_INPUT_STREAM; + } return invertedErr; } @@ -474,4 +494,48 @@ public Integer getExitStatus() { public String getExitSignal() { return exitSignalHolder.get(); } + + private enum NullIoInputStream implements IoInputStream { + + INSTANCE; + + private final CloseFuture closing = new DefaultCloseFuture("", null); + + NullIoInputStream() { + closing.setClosed(); + } + + @Override + public CloseFuture close(boolean immediately) { + return closing; + } + + @Override + public void addCloseFutureListener(SshFutureListener listener) { + closing.addListener(listener); + } + + @Override + public void removeCloseFutureListener(SshFutureListener listener) { + closing.removeListener(listener); + } + + @Override + public boolean isClosed() { + return true; + } + + @Override + public boolean isClosing() { + return true; + } + + @Override + public IoReadFuture read(Buffer buffer) { + ChannelAsyncInputStream.IoReadFutureImpl future = new ChannelAsyncInputStream.IoReadFutureImpl("", buffer); + future.setValue(new EOFException("Closed")); + return future; + } + + } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java index bbbbefe7d..41060452f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java @@ -18,11 +18,11 @@ */ package org.apache.sshd.common.channel; +import java.io.EOFException; import java.io.IOException; import java.util.Objects; import org.apache.sshd.common.RuntimeSshException; -import org.apache.sshd.common.SshException; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.DefaultVerifiableSshFuture; import org.apache.sshd.common.io.IoInputStream; @@ -69,12 +69,12 @@ public IoReadFuture read(Buffer buf) { throw new ReadPendingException("Previous pending read not handled"); } if (buffer.available() > 0) { - Buffer fb = future.getBuffer(); + Buffer fb = future.buffer; int nbRead = fb.putBuffer(buffer, false); buffer.compact(); future.setValue(nbRead); } else { - future.setValue(new IOException("Closed")); + future.setValue(new EOFException("Closed")); } } } else { @@ -94,7 +94,7 @@ protected void preClose() { synchronized (buffer) { if (buffer.available() == 0) { if (pending != null) { - pending.setValue(new SshException("Closed")); + pending.setValue(new EOFException("Closed")); } } } @@ -153,7 +153,8 @@ public String toString() { } public static class IoReadFutureImpl extends DefaultVerifiableSshFuture implements IoReadFuture { - private final Buffer buffer; + + final Buffer buffer; public IoReadFutureImpl(Object id, Buffer buffer) { super(id, null); @@ -162,7 +163,7 @@ public IoReadFutureImpl(Object id, Buffer buffer) { @Override public Buffer getBuffer() { - return buffer; + return isDone() ? buffer : null; } @Override @@ -180,14 +181,18 @@ public IoReadFuture verify(long timeoutMillis) throws IOException { @Override public int getRead() { Object v = getValue(); - if (v instanceof RuntimeException) { + if (v == null) { + return 0; + } else if (v instanceof Number) { + return ((Number) v).intValue(); + } else if (v instanceof EOFException) { + return -1; + } else if (v instanceof RuntimeException) { throw (RuntimeException) v; } else if (v instanceof Error) { throw (Error) v; } else if (v instanceof Throwable) { throw new RuntimeSshException("Error reading from channel.", (Throwable) v); - } else if (v instanceof Number) { - return ((Number) v).intValue(); } else { throw formatExceptionMessage( IllegalStateException::new, diff --git a/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java index 73a7a58db..adafc00b3 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.rmi.RemoteException; @@ -40,6 +41,7 @@ import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.SessionListener; import org.apache.sshd.common.util.GenericUtils; +import org.apache.sshd.common.util.io.IoUtils; import org.apache.sshd.core.CoreModuleProperties; import org.apache.sshd.server.SshServer; import org.apache.sshd.server.auth.keyboard.KeyboardInteractiveAuthenticator; @@ -369,4 +371,67 @@ private void writeResponse(OutputStream out, String rsp) throws IOException { assertTrue("Unexpected response remainders: " + values, values.isEmpty()); } + + @Test // SSHD-1303 + public void testRedirectCommandErrorStreamIsEmpty() throws Exception { + String expectedCommand = getCurrentTestName() + "-CMD"; + String expectedStdout = getCurrentTestName() + "-STDOUT"; + String expectedStderr = getCurrentTestName() + "-STDERR"; + sshd.setCommandFactory((session, command) -> new CommandExecutionHelper(command) { + private boolean cmdProcessed; + + @Override + protected boolean handleCommandLine(String command) throws Exception { + assertEquals("Mismatched incoming command", expectedCommand, command); + assertFalse("Duplicated command call", cmdProcessed); + writeResponse(getOutputStream(), expectedStdout); + writeResponse(getErrorStream(), expectedStderr); + cmdProcessed = true; + return false; + } + + private void writeResponse(OutputStream out, String rsp) throws IOException { + out.write(rsp.getBytes(StandardCharsets.US_ASCII)); + out.write((byte) '\n'); + out.flush(); + } + }); + + String response; + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(CONNECT_TIMEOUT) + .getSession()) { + session.addPasswordIdentity(getCurrentTestName()); + session.auth().verify(AUTH_TIMEOUT); + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + // NOTE !!! The LF is only because we are using a buffered reader on the server end to read the command + try (ClientChannel channel = session.createExecChannel(expectedCommand + '\n')) { + channel.setRedirectErrorStream(true); + + channel.open().verify(OPEN_TIMEOUT); + try (InputStream stderr = channel.getInvertedErr()) { + assertEquals(-1, stderr.read()); + } + try (InputStream stdout = channel.getInvertedOut()) { + IoUtils.copy(stdout, baos, 32); // Use a small buffer on purpose + } + } + byte[] bytes = baos.toByteArray(); + response = new String(bytes, StandardCharsets.US_ASCII); + } + } + + String[] lines = GenericUtils.split(response, '\n'); + assertEquals("Mismatched response lines count", 2, lines.length); + + Collection values = new ArrayList<>(Arrays.asList(lines)); + // We don't rely on the order the strings were written + for (String expected : new String[] { expectedStdout, expectedStderr }) { + if (!values.remove(expected)) { + fail(expected + " not in response=" + values); + } + } + + assertTrue("Unexpected response remainders: " + values, values.isEmpty()); + } }