diff --git a/src/main/java/org/codehaus/plexus/util/cli/CommandLineUtils.java b/src/main/java/org/codehaus/plexus/util/cli/CommandLineUtils.java index b2ac489e..f7cf36d5 100644 --- a/src/main/java/org/codehaus/plexus/util/cli/CommandLineUtils.java +++ b/src/main/java/org/codehaus/plexus/util/cli/CommandLineUtils.java @@ -146,13 +146,13 @@ public void run() { @Override public Integer call() throws CommandLineException { - StreamFeeder inputFeeder = null; + StreamPollFeeder inputFeeder = null; StreamPumper outputPumper = null; StreamPumper errorPumper = null; boolean success = false; try { if (systemIn != null) { - inputFeeder = new StreamFeeder(systemIn, p.getOutputStream()); + inputFeeder = new StreamPollFeeder(systemIn, p.getOutputStream()); inputFeeder.start(); } @@ -288,11 +288,11 @@ private static void handleException(final StreamPumper streamPumper, final Strin } } - private static void handleException(final StreamFeeder streamFeeder, final String streamName) + private static void handleException(final StreamPollFeeder streamPollFeeder, final String streamName) throws CommandLineException { - if (streamFeeder.getException() != null) { + if (streamPollFeeder.getException() != null) { throw new CommandLineException( - String.format("Failure processing %s.", streamName), streamFeeder.getException()); + String.format("Failure processing %s.", streamName), streamPollFeeder.getException()); } } diff --git a/src/main/java/org/codehaus/plexus/util/cli/StreamFeeder.java b/src/main/java/org/codehaus/plexus/util/cli/StreamFeeder.java index 81289044..f5e11451 100644 --- a/src/main/java/org/codehaus/plexus/util/cli/StreamFeeder.java +++ b/src/main/java/org/codehaus/plexus/util/cli/StreamFeeder.java @@ -24,8 +24,9 @@ * Read from an InputStream and write the output to an OutputStream. * * @author Trygve Laugstøl - * + * @deprecated Use {@link StreamPollFeeder} instead. This class can block when used with System.in */ +@Deprecated public class StreamFeeder extends AbstractStreamHandler { private InputStream input; diff --git a/src/main/java/org/codehaus/plexus/util/cli/StreamPollFeeder.java b/src/main/java/org/codehaus/plexus/util/cli/StreamPollFeeder.java new file mode 100644 index 00000000..8bd4f97a --- /dev/null +++ b/src/main/java/org/codehaus/plexus/util/cli/StreamPollFeeder.java @@ -0,0 +1,133 @@ +package org.codehaus.plexus.util.cli; + +/* + * Copyright The Codehaus Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Poll InputStream for available data and write the output to an OutputStream. + *
+ * This class is designed to avoid blocking when reading from streams like System.in. + * It polls the input stream for available data instead of blocking on read operations. + * + * @author Trygve Laugstøl + */ +public class StreamPollFeeder extends AbstractStreamHandler { + + public static final int BUF_LEN = 80; + + private InputStream input; + + private OutputStream output; + + private volatile Throwable exception = null; + + private final Object lock = new Object(); + + /** + * Create a new StreamPollFeeder + * + * @param input Stream to read from + * @param output Stream to write to + */ + public StreamPollFeeder(InputStream input, OutputStream output) { + super(); + this.input = input; + this.output = output; + } + + @Override + public void run() { + byte[] buf = new byte[BUF_LEN]; + + try { + while (!isDone()) { + if (input.available() > 0) { + int i = input.read(buf); + if (i > 0) { + output.write(buf, 0, i); + output.flush(); + } else { + setDone(); + } + } else { + synchronized (lock) { + if (!isDone()) { + lock.wait(100); + } + } + } + } + } catch (IOException e) { + exception = e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + close(); + } + } + + public void close() { + if (input != null) { + synchronized (input) { + try { + input.close(); + } catch (IOException ex) { + if (exception == null) { + exception = ex; + } + } + + input = null; + } + } + + if (output != null) { + synchronized (output) { + try { + output.close(); + } catch (IOException ex) { + if (exception == null) { + exception = ex; + } + } + + output = null; + } + } + } + + /** + * @since 3.1.0 + * @return the Exception + */ + public Throwable getException() { + return exception; + } + + @Override + public synchronized void waitUntilDone() throws InterruptedException { + synchronized (lock) { + setDone(); + lock.notifyAll(); + } + + join(); + } +} diff --git a/src/test/java/org/codehaus/plexus/util/cli/StreamPollFeederTest.java b/src/test/java/org/codehaus/plexus/util/cli/StreamPollFeederTest.java new file mode 100644 index 00000000..19ea6376 --- /dev/null +++ b/src/test/java/org/codehaus/plexus/util/cli/StreamPollFeederTest.java @@ -0,0 +1,57 @@ +package org.codehaus.plexus.util.cli; + +/* + * Copyright The Codehaus Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class StreamPollFeederTest { + + @Test + public void dataShouldBeCopied() throws InterruptedException, IOException { + + StringBuilder TEST_DATA = new StringBuilder(); + for (int i = 0; i < 100; i++) { + TEST_DATA.append("TestData"); + } + + ByteArrayInputStream inputStream = + new ByteArrayInputStream(TEST_DATA.toString().getBytes()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + StreamPollFeeder streamPollFeeder = new StreamPollFeeder(inputStream, outputStream); + + streamPollFeeder.start(); + + // wait until all data from steam will be read + while (outputStream.size() < TEST_DATA.length()) { + Thread.sleep(100); + } + + // wait until process finish + streamPollFeeder.waitUntilDone(); + assertNull(streamPollFeeder.getException()); + + assertEquals(TEST_DATA.toString(), outputStream.toString()); + } +}