From 3b63f0e6a11527bc5b3f967ccc1c7d256892fea0 Mon Sep 17 00:00:00 2001
From: Claus Stadler
Date: Thu, 4 Feb 2021 12:46:19 +0100
Subject: [PATCH] More fiddling with suddenly closed streams (apparently jena closes streams in LangBase:44)

---
 .../common/InputStreamWithCloseLogging.java  | 34 +++++++++++++++++++
 .../common/io/hadoop/TrigRecordReader.scala  | 28 ++++++++++-----
 2 files changed, 53 insertions(+), 9 deletions(-)
 create mode 100644 sansa-rdf/sansa-rdf-common/src/main/java/net/sansa_stack/rdf/common/InputStreamWithCloseLogging.java

diff --git a/sansa-rdf/sansa-rdf-common/src/main/java/net/sansa_stack/rdf/common/InputStreamWithCloseLogging.java b/sansa-rdf/sansa-rdf-common/src/main/java/net/sansa_stack/rdf/common/InputStreamWithCloseLogging.java
new file mode 100644
index 000000000..ceb84da4d
--- /dev/null
+++ b/sansa-rdf/sansa-rdf-common/src/main/java/net/sansa_stack/rdf/common/InputStreamWithCloseLogging.java
@@ -0,0 +1,34 @@
+package net.sansa_stack.rdf.common;
+
+import org.apache.commons.io.input.ProxyInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/** Util class to debug a stream already closed exception */
+public class InputStreamWithCloseLogging
+    extends ProxyInputStream
+{
+    protected Consumer<Throwable> stackTraceConsumer;
+
+    public InputStreamWithCloseLogging(InputStream proxy, Consumer<Throwable> stackTraceConsumer) {
+        super(proxy);
+        this.stackTraceConsumer = stackTraceConsumer;
+    }
+
+    @Override
+    public void close() throws IOException {
+        stackTraceConsumer.accept(new Throwable());
+        super.close();
+    }
+
+    /** Convenience method for e.g. .wrap(inputStream, ExceptionUtils::getFullStackTrace, logger::info) */
+    public static InputStream wrap(InputStream proxy, Function<Throwable, String> toString, Consumer<String> logger) {
+        return new InputStreamWithCloseLogging(proxy, t -> {
+            String str = toString.apply(t);
+            logger.accept(str);
+        });
+    }
+}
diff --git a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/io/hadoop/TrigRecordReader.scala b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/io/hadoop/TrigRecordReader.scala
index f95b25c69..73bfc0252 100644
--- a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/io/hadoop/TrigRecordReader.scala
+++ b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/io/hadoop/TrigRecordReader.scala
@@ -10,10 +10,11 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util.function.Predicate
 import java.util.regex.{Matcher, Pattern}
 import io.reactivex.rxjava3.core.Flowable
-import net.sansa_stack.rdf.common.{InterruptingReadableByteChannel, ReadableByteChannelWithConditionalBound, SeekableInputStream}
+import net.sansa_stack.rdf.common.{InputStreamWithCloseLogging, InterruptingReadableByteChannel, ReadableByteChannelWithConditionalBound, SeekableInputStream}
 import org.aksw.jena_sparql_api.io.binseach._
 import org.aksw.jena_sparql_api.rx.RDFDataMgrRx
 import org.apache.commons.io.input.{BoundedInputStream, CloseShieldInputStream}
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.compress._
@@ -197,7 +198,11 @@ class TrigRecordReader
     var result: (Long, Long) = null

     if (null != codec) {
-      val decompressor = CodecPool.getDecompressor(codec)
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor)
+      }
+
+      decompressor = CodecPool.getDecompressor(codec)

       if (codec.isInstanceOf[SplittableCompressionCodec]) {
         val scc = codec.asInstanceOf[SplittableCompressionCodec]
@@ -216,7 +221,7 @@ class TrigRecordReader
 //        val rawPos = rawStream.getPos
 //        println(s"Adjusted: [$start, $end[ -> [$adjustedStart, $adjustedEnd[ - raw pos: $rawPos" )

-        stream = tmp
+        stream = new SeekableInputStream(new CloseShieldInputStream(tmp), tmp)
         result = (adjustedStart, adjustedEnd)

 //      } catch {
@@ -230,7 +235,7 @@ class TrigRecordReader
 //      stream = rawStream
 //      stream.seek(start)
       stream = new SeekableInputStream(
-        Channels.newInputStream(new InterruptingReadableByteChannel(rawStream, rawStream, end)),
+        new CloseShieldInputStream(Channels.newInputStream(new InterruptingReadableByteChannel(rawStream, rawStream, end))),
         rawStream);

       result = (start, end)
@@ -339,9 +344,14 @@ class TrigRecordReader

     // Set up the body stream whose read method returns
     // -1 upon reaching the split boundry
-    var splitBoundedBodyStream: InputStream =
-      new CloseShieldInputStream(Channels.newInputStream(new ReadableByteChannelWithConditionalBound[ReadableByteChannel](Channels.newChannel(stream),
-        xstream => hitSplitBound(stream, adjustedSplitEnd))))
+
+//    var splitBoundedBodyStream: InputStream = InputStreamWithCloseLogging.wrap(
+//      new CloseShieldInputStream(Channels.newInputStream(new ReadableByteChannelWithConditionalBound[ReadableByteChannel](Channels.newChannel(stream),
+//        xstream => hitSplitBound(stream, adjustedSplitEnd)))), ExceptionUtils.getStackTrace(_), logger.info(_))
+
+    var splitBoundedBodyStream: InputStream = InputStreamWithCloseLogging.wrap(
+      Channels.newInputStream(new ReadableByteChannelWithConditionalBound[ReadableByteChannel](Channels.newChannel(stream),
+        xstream => hitSplitBound(stream, adjustedSplitEnd))), ExceptionUtils.getStackTrace(_), logger.info(_))

     // Find the second record in the next split - i.e. after splitEnd (inclusive)
     // This is to detect record parts that although cleanly separated by the split boundary still need to be aggregated,
@@ -411,8 +421,8 @@ class TrigRecordReader

     }

-    var fullStream: InputStream = new SequenceInputStream(Collections.enumeration(
-      util.Arrays.asList(prefixStream, headStream, splitBoundedBodyStream, tailStream)))
+    var fullStream: InputStream = InputStreamWithCloseLogging.wrap(new SequenceInputStream(Collections.enumeration(
+      util.Arrays.asList(prefixStream, headStream, splitBoundedBodyStream, tailStream))), ExceptionUtils.getStackTrace(_), logger.info(_))

     var result: Flowable[Dataset] = null
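
Usage note (not part of the patch above): a minimal, hypothetical sketch of how the new InputStreamWithCloseLogging.wrap helper is intended to be used to track down who closes a stream prematurely. It assumes an SLF4J logger and commons-lang3's ExceptionUtils are on the classpath; the class name CloseLoggingSketch, the logger setup, and the byte-array input are invented for illustration only.

// Hypothetical example, not from the patch: every close() on the wrapper hands a
// Throwable capturing the caller's stack trace to the consumer, which renders and
// logs it, so the offending close site (e.g. Jena's LangBase) shows up in the log.
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import net.sansa_stack.rdf.common.InputStreamWithCloseLogging;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloseLoggingSketch {
    private static final Logger logger = LoggerFactory.getLogger(CloseLoggingSketch.class);

    public static void main(String[] args) throws Exception {
        InputStream raw = new ByteArrayInputStream("some data".getBytes());

        // Wrap the raw stream; the stack trace formatter and the log sink are
        // supplied as plain functional arguments.
        InputStream wrapped = InputStreamWithCloseLogging.wrap(
                raw, ExceptionUtils::getStackTrace, logger::info);

        wrapped.read();
        wrapped.close(); // the stack trace of this call is logged at info level
    }
}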