From 453ea4a9e96ae195d7848f715484ba32be42884f Mon Sep 17 00:00:00 2001 From: Sami Nurminen Date: Sat, 19 Dec 2015 13:48:27 +0200 Subject: [PATCH] CAMEL-9375: TarSplitter includes one extra empty entry at the end. --- .../dataformat/tarfile/TarFileDataFormat.java | 2 +- .../camel/dataformat/tarfile/TarIterator.java | 84 +++++++++---------- .../camel/dataformat/tarfile/TarSplitter.java | 2 +- 3 files changed, 42 insertions(+), 46 deletions(-) diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java index 5465edcc681d1..49b1a825fc7a4 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java @@ -86,7 +86,7 @@ public void marshal(final Exchange exchange, final Object graph, final OutputStr @Override public Object unmarshal(final Exchange exchange, final InputStream stream) throws Exception { if (usingIterator) { - return new TarIterator(exchange.getIn(), stream); + return new TarIterator(exchange, stream); } else { BufferedInputStream bis = new BufferedInputStream(stream); TarArchiveInputStream tis = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, bis); diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java index 91933c08f8018..c5c85ce8cde95 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java @@ -26,6 +26,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.StreamCache; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.util.IOHelper; import org.apache.commons.compress.archivers.ArchiveException; @@ -51,11 +52,13 @@ public class TarIterator implements Iterator, Closeable { private final Message inputMessage; private TarArchiveInputStream tarInputStream; - private Message parent; + private Message nextMessage; - public TarIterator(Message inputMessage, InputStream inputStream) { - this.inputMessage = inputMessage; - //InputStream inputStream = inputMessage.getBody(InputStream.class); + private Exchange exchange; + + public TarIterator(Exchange exchange, InputStream inputStream) { + this.exchange = exchange; + this.inputMessage = exchange.getIn(); if (inputStream instanceof TarArchiveInputStream) { tarInputStream = (TarArchiveInputStream) inputStream; @@ -67,47 +70,38 @@ public TarIterator(Message inputMessage, InputStream inputStream) { throw new RuntimeException(e.getMessage(), e); } } - parent = null; + nextMessage = null; } @Override public boolean hasNext() { - try { - if (tarInputStream == null) { - return false; - } - boolean availableDataInCurrentEntry = tarInputStream.available() > 0; - if (!availableDataInCurrentEntry) { - // advance to the next entry. - parent = getNextElement(); - if (parent == null) { - tarInputStream.close(); - availableDataInCurrentEntry = false; - } else { - availableDataInCurrentEntry = true; - } - } - return availableDataInCurrentEntry; - } catch (IOException exception) { - //Just wrap the IOException as CamelRuntimeException - throw new RuntimeCamelException(exception); - } + tryAdvanceToNext(); + + return this.nextMessage != null; } @Override public Message next() { - if (parent == null) { - parent = getNextElement(); - } + tryAdvanceToNext(); + + //consume element + Message next = this.nextMessage; + this.nextMessage = null; + return next; + } - Message answer = parent; - parent = null; - checkNullAnswer(answer); - return answer; + private void tryAdvanceToNext() { + //return current next + if (this.nextMessage != null) { + return; + } + + this.nextMessage = createNextMessage(); + checkNullAnswer(this.nextMessage); } - private Message getNextElement() { + private Message createNextMessage() { if (tarInputStream == null) { return null; } @@ -122,7 +116,10 @@ private Message getNextElement() { answer.setHeader(TARFILE_ENTRY_NAME_HEADER, current.getName()); answer.setHeader(Exchange.FILE_NAME, current.getName()); if (current.getSize() > 0) { - answer.setBody(new TarElementInputStreamWrapper(tarInputStream)); + //Have to cache current entry's portion of tarInputStream here, because getNextTarEntry + //advances tarInputStream beyond current entry + answer.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(StreamCache.class, exchange, + new TarElementInputStreamWrapper(tarInputStream))); } else { // Workaround for the case when the entry is zero bytes big answer.setBody(new ByteArrayInputStream(new byte[0])); @@ -132,16 +129,16 @@ private Message getNextElement() { LOGGER.trace("Closed tarInputStream"); return null; } - } catch (IOException exception) { - //Just wrap the IOException as CamelRuntimeException + } catch (Exception exception) { + this.close(); + //Just wrap the Exception as CamelRuntimeException throw new RuntimeCamelException(exception); } } public void checkNullAnswer(Message answer) { - if (answer == null && tarInputStream != null) { - IOHelper.close(tarInputStream); - tarInputStream = null; + if (answer == null) { + this.close(); } } @@ -163,10 +160,9 @@ public void remove() { } @Override - public void close() throws IOException { - if (tarInputStream != null) { - tarInputStream.close(); - tarInputStream = null; - } + public void close() { + //suppress any exceptions from closing + IOHelper.close(tarInputStream); + tarInputStream = null; } } \ No newline at end of file diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java index 132dd5583cca5..3ee5371a6a071 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java @@ -33,7 +33,7 @@ public TarSplitter() { public Object evaluate(Exchange exchange) { Message inputMessage = exchange.getIn(); - return new TarIterator(inputMessage, inputMessage.getBody(InputStream.class)); + return new TarIterator(exchange, inputMessage.getBody(InputStream.class)); } @Override