More fiddling with suddenly closed streams (apparently jena closes streams in LangBase:44)
Aklakan committed Feb 4, 2021
1 parent 2862c47 commit 3b63f0e
Showing 2 changed files with 53 additions and 9 deletions.
net/sansa_stack/rdf/common/InputStreamWithCloseLogging.java (new file)
@@ -0,0 +1,34 @@
package net.sansa_stack.rdf.common;

import org.apache.commons.io.input.ProxyInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;
import java.util.function.Function;

/** Utility class for debugging a "stream already closed" exception. */
public class InputStreamWithCloseLogging
    extends ProxyInputStream
{
    protected Consumer<? super Throwable> stackTraceConsumer;

    public InputStreamWithCloseLogging(InputStream proxy, Consumer<? super Throwable> stackTraceConsumer) {
        super(proxy);
        this.stackTraceConsumer = stackTraceConsumer;
    }

    /** Pass the close() call site to the consumer before closing the underlying stream. */
    @Override
    public void close() throws IOException {
        stackTraceConsumer.accept(new Throwable());
        super.close();
    }

    /** Convenience method, e.g. wrap(inputStream, ExceptionUtils::getStackTrace, logger::info) */
    public static InputStream wrap(InputStream proxy, Function<? super Throwable, String> toString, Consumer<? super String> logger) {
        return new InputStreamWithCloseLogging(proxy, t -> {
            String str = toString.apply(t);
            logger.accept(str);
        });
    }
}
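For illustration, a minimal usage sketch of the new utility. The SLF4J logger and the data.trig path are assumptions for the example, not part of the commit; the point is that every close() on the wrapped stream logs the stack trace of its caller, which is how a premature close (such as the one attributed to Jena's LangBase) can be located.

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.commons.lang3.exception.ExceptionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import net.sansa_stack.rdf.common.InputStreamWithCloseLogging;

    public class CloseLoggingDemo {
        private static final Logger logger = LoggerFactory.getLogger(CloseLoggingDemo.class);

        public static void main(String[] args) throws Exception {
            // data.trig is a placeholder path; any InputStream works.
            InputStream in = InputStreamWithCloseLogging.wrap(
                    Files.newInputStream(Paths.get("data.trig")),
                    ExceptionUtils::getStackTrace,  // render the Throwable as a stack trace
                    logger::info);                  // emit it to the log
            in.read();
            in.close(); // logs the stack trace of this call site before closing
        }
    }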
TrigRecordReader.scala
@@ -10,10 +10,11 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.function.Predicate
import java.util.regex.{Matcher, Pattern}
import io.reactivex.rxjava3.core.Flowable
-import net.sansa_stack.rdf.common.{InterruptingReadableByteChannel, ReadableByteChannelWithConditionalBound, SeekableInputStream}
+import net.sansa_stack.rdf.common.{InputStreamWithCloseLogging, InterruptingReadableByteChannel, ReadableByteChannelWithConditionalBound, SeekableInputStream}
import org.aksw.jena_sparql_api.io.binseach._
import org.aksw.jena_sparql_api.rx.RDFDataMgrRx
import org.apache.commons.io.input.{BoundedInputStream, CloseShieldInputStream}
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.compress._
@@ -197,7 +198,11 @@ class TrigRecordReader
var result: (Long, Long) = null

if (null != codec) {
-  val decompressor = CodecPool.getDecompressor(codec)
+  if (decompressor != null) {
+    CodecPool.returnDecompressor(decompressor)
+  }
+
+  decompressor = CodecPool.getDecompressor(codec)

if (codec.isInstanceOf[SplittableCompressionCodec]) {
val scc = codec.asInstanceOf[SplittableCompressionCodec]
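As an aside (an illustration, not part of the diff): the change above follows the usual Hadoop CodecPool discipline of returning a previously borrowed Decompressor before acquiring a new one, so repeated initialization does not leak pooled instances. A sketch of that borrow/return pattern, with codec and rawIn as stand-ins for the reader's actual fields:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.Decompressor;

    public class CodecPoolDemo {
        // Sketch: borrow a pooled Decompressor, use it, and always return it.
        static long countBytes(CompressionCodec codec, InputStream rawIn) throws IOException {
            Decompressor decompressor = CodecPool.getDecompressor(codec);
            try (InputStream in = codec.createInputStream(rawIn, decompressor)) {
                long n = 0;
                while (in.read() >= 0) n++;
                return n;
            } finally {
                CodecPool.returnDecompressor(decompressor); // hand the instance back to the pool
            }
        }
    }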
@@ -216,7 +221,7 @@
// val rawPos = rawStream.getPos
// println(s"Adjusted: [$start, $end[ -> [$adjustedStart, $adjustedEnd[ - raw pos: $rawPos" )

-stream = tmp
+stream = new SeekableInputStream(new CloseShieldInputStream(tmp), tmp)

result = (adjustedStart, adjustedEnd)
// } catch {
@@ -230,7 +235,7 @@
// stream = rawStream
// stream.seek(start)
stream = new SeekableInputStream(
-  Channels.newInputStream(new InterruptingReadableByteChannel(rawStream, rawStream, end)),
+  new CloseShieldInputStream(Channels.newInputStream(new InterruptingReadableByteChannel(rawStream, rawStream, end))),
  rawStream);

result = (start, end)
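For context, a minimal demonstration of the CloseShieldInputStream semantics from commons-io that both changes above rely on: closing the shield does not propagate to the wrapped stream, so the seekable Hadoop stream stays usable even after a consumer closes the handle given to it. The tracking wrapper below is illustrative only:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.commons.io.input.CloseShieldInputStream;
    import org.apache.commons.io.input.ProxyInputStream;

    public class CloseShieldDemo {
        public static void main(String[] args) throws IOException {
            final boolean[] closed = {false};
            // A stream that records whether close() ever reached it.
            InputStream raw = new ProxyInputStream(new ByteArrayInputStream(new byte[]{42})) {
                @Override
                public void close() throws IOException {
                    closed[0] = true;
                    super.close();
                }
            };
            InputStream shielded = new CloseShieldInputStream(raw);
            shielded.close();                 // swallowed: never reaches 'raw'
            System.out.println(closed[0]);    // false
            System.out.println(raw.read());   // 42, the stream is still usable
        }
    }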
@@ -339,9 +344,14 @@

// Set up the body stream whose read method returns
// -1 upon reaching the split boundary
-var splitBoundedBodyStream: InputStream =
-  new CloseShieldInputStream(Channels.newInputStream(new ReadableByteChannelWithConditionalBound[ReadableByteChannel](Channels.newChannel(stream),
-    xstream => hitSplitBound(stream, adjustedSplitEnd))))
+
+// var splitBoundedBodyStream: InputStream = InputStreamWithCloseLogging.wrap(
+//   new CloseShieldInputStream(Channels.newInputStream(new ReadableByteChannelWithConditionalBound[ReadableByteChannel](Channels.newChannel(stream),
+//     xstream => hitSplitBound(stream, adjustedSplitEnd)))), ExceptionUtils.getStackTrace(_), logger.info(_))
+
+var splitBoundedBodyStream: InputStream = InputStreamWithCloseLogging.wrap(
+  Channels.newInputStream(new ReadableByteChannelWithConditionalBound[ReadableByteChannel](Channels.newChannel(stream),
+    xstream => hitSplitBound(stream, adjustedSplitEnd))), ExceptionUtils.getStackTrace(_), logger.info(_))

// Find the second record in the next split - i.e. after splitEnd (inclusive)
// This is to detect record parts that although cleanly separated by the split boundary still need to be aggregated,
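The comment above describes the mechanism: the body channel fakes end-of-stream at the split boundary instead of being closed. As a rough, assumption-laden sketch of the idea behind ReadableByteChannelWithConditionalBound (the real class lives in net.sansa_stack.rdf.common and may differ):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;
    import java.util.function.Predicate;

    // Simplified stand-in: a channel that reports EOF (-1) as soon as a
    // caller-supplied condition holds, e.g. "past the split boundary".
    public class ConditionallyBoundedChannel implements ReadableByteChannel {
        private final ReadableByteChannel delegate;
        private final Predicate<ReadableByteChannel> hitBound;

        public ConditionallyBoundedChannel(ReadableByteChannel delegate,
                                           Predicate<ReadableByteChannel> hitBound) {
            this.delegate = delegate;
            this.hitBound = hitBound;
        }

        @Override
        public int read(ByteBuffer dst) throws IOException {
            if (hitBound.test(delegate)) {
                return -1; // simulate end of stream at the bound; close nothing
            }
            return delegate.read(dst);
        }

        @Override
        public boolean isOpen() {
            return delegate.isOpen();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }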
@@ -411,8 +421,8 @@
}


-var fullStream: InputStream = new SequenceInputStream(Collections.enumeration(
-  util.Arrays.asList(prefixStream, headStream, splitBoundedBodyStream, tailStream)))
+var fullStream: InputStream = InputStreamWithCloseLogging.wrap(new SequenceInputStream(Collections.enumeration(
+  util.Arrays.asList(prefixStream, headStream, splitBoundedBodyStream, tailStream))), ExceptionUtils.getStackTrace(_), logger.info(_))


var result: Flowable[Dataset] = null
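One JDK detail makes the wrapping strategy hang together: SequenceInputStream.close() closes all of its remaining component streams. When Jena's parser closes fullStream, that close fans out to the prefix, head, body, and tail streams; the CloseShieldInputStream layers are what keep the underlying seekable stream alive, while the logging wrapper records where the close originated. A small illustrative demo using the commit's own utility:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.io.SequenceInputStream;
    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.commons.lang3.exception.ExceptionUtils;

    import net.sansa_stack.rdf.common.InputStreamWithCloseLogging;

    public class SequenceCloseDemo {
        public static void main(String[] args) throws Exception {
            InputStream head = InputStreamWithCloseLogging.wrap(
                    new ByteArrayInputStream("head".getBytes()),
                    ExceptionUtils::getStackTrace, System.out::println);
            InputStream body = InputStreamWithCloseLogging.wrap(
                    new ByteArrayInputStream("body".getBytes()),
                    ExceptionUtils::getStackTrace, System.out::println);
            InputStream full = new SequenceInputStream(
                    Collections.enumeration(Arrays.asList(head, body)));
            // Closing the sequence closes its remaining components, so two
            // stack traces are printed, both rooted at this close() call.
            full.close();
        }
    }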
