Skip to content

Commit

Permalink
Attempt to bump TRL of trig reader from 4 (works under laboratory con…
Browse files Browse the repository at this point in the history
…ditions) to 6/7 (works in a relevant setup)...

Seems to work for decompressed data so far.
  • Loading branch information
Aklakan committed Jan 31, 2021
1 parent 41d0e31 commit 3b661ae
Show file tree
Hide file tree
Showing 6 changed files with 468 additions and 116 deletions.
Expand Up @@ -75,6 +75,11 @@ object RddOfBindingOps {
}


/**
 * Placeholder for a natural join of two RDDs of bindings.
 *
 * Not implemented yet: the body is `???`, so every invocation throws
 * [[scala.NotImplementedError]].
 *
 * NOTE: this operation might better fit into ResultSetSparkOps.
 *
 * @param rdd1 first input RDD
 * @param rdd2 second input RDD
 * @param commonVars presumably the variables shared by both inputs that the join
 *                   would be keyed on -- TODO confirm once implemented
 * @return never returns normally at present
 */
def naturalJoin(
    rdd1: RDD[Binding],
    rdd2: RDD[Binding],
    commonVars: util.Set[Var]): RDD[Binding] = ???

def group(rdd: RDD[Binding], groupVars: VarExprList, aggregators: util.List[ExprAggregator]): RDD[Binding] = {
import collection.JavaConverters._

Expand Down
@@ -0,0 +1,69 @@
package net.sansa_stack.rdf.common;

import com.google.common.primitives.Ints;
import org.apache.hadoop.fs.Seekable;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/**
 * {@link ReadableByteChannel} whose {@code read} method is guaranteed to return at a specific
 * position (before crossing it), such as a split boundary.
 *
 * <p>The current position is obtained from the supplied {@link Seekable}; the bytes themselves
 * are read from the supplied {@link InputStream}. This class assumes the two describe the same
 * underlying source (i.e. that reading from the stream advances the seekable's position) --
 * NOTE(review): confirm against callers.
 *
 * <p>Behavior around the interrupt position:
 * <ul>
 *   <li>A read that starts before the interrupt position requests at most the number of bytes
 *       remaining up to that position, so it never delivers bytes beyond it.</li>
 *   <li>The first read that starts exactly at the interrupt position requests zero bytes from
 *       the underlying stream and marks the channel as interrupted.</li>
 *   <li>Once interrupted (or when the position is already past the interrupt position), reads
 *       are limited only by the target buffer and the internal transfer buffer.</li>
 * </ul>
 */
public class InterruptingReadableByteChannel
    implements ReadableByteChannel, SeekableBase
{
    /** Intermediate transfer buffer; a single read never moves more than this many bytes. */
    protected byte[] buffer = new byte[1024 * 1024];

    /** Set once a read has been issued while the position equaled {@link #interruptPos}. */
    protected boolean interrupted = false;

    /** Whether this channel is still open; cleared by {@link #close()}. */
    protected boolean open = true;

    protected InputStream in;
    protected Seekable seekable;
    protected long interruptPos;

    /**
     * @param in the stream the bytes are read from
     * @param seekable the source of the current position (and target of seek operations)
     * @param interruptPos the position that a single read must not cross
     */
    public InterruptingReadableByteChannel(
            InputStream in,
            Seekable seekable,
            long interruptPos) {
        this.in = in;
        this.seekable = seekable;
        this.interruptPos = interruptPos;
    }

    /**
     * Reads bytes into the given buffer without crossing the interrupt position.
     *
     * @param byteBuffer the buffer to transfer bytes into
     * @return the number of bytes transferred (possibly 0), or the underlying stream's
     *         end-of-stream indicator
     * @throws java.nio.channels.ClosedChannelException if this channel has been closed
     * @throws IOException if obtaining the position or reading from the stream fails
     */
    @Override
    public int read(ByteBuffer byteBuffer) throws IOException {
        if (!open) {
            // Fully qualified to avoid requiring an additional import in this file.
            throw new java.nio.channels.ClosedChannelException();
        }

        long pos = seekable.getPos();

        // Before the boundary the request is capped to the bytes left up to it;
        // afterwards (or once interrupted) there is no positional cap.
        int remainingUntilInterrupt = !interrupted && pos <= interruptPos
                ? Ints.saturatedCast(interruptPos - pos)
                : Integer.MAX_VALUE;

        int toRead = Math.min(byteBuffer.remaining(), remainingUntilInterrupt);
        toRead = Math.min(toRead, buffer.length);

        int contrib = in.read(buffer, 0, toRead);

        // A negative result signals end of stream; nothing to copy in that case.
        if (contrib >= 0) {
            byteBuffer.put(buffer, 0, contrib);
        }

        // The read that started exactly on the boundary is the "interrupting" one;
        // subsequent reads are unrestricted.
        if (pos == interruptPos) {
            interrupted = true;
        }

        return contrib;
    }

    /**
     * @return {@code true} until {@link #close()} has been invoked
     */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * Closes the underlying input stream. Invoking this method on an already closed channel
     * has no effect.
     *
     * @throws IOException if closing the underlying stream fails
     */
    @Override
    public void close() throws IOException {
        if (open) {
            // Mark closed first so the channel reports closed even if in.close() throws.
            open = false;
            in.close();
        }
    }

    @Override
    public Seekable getSeekable() {
        return seekable;
    }
}
@@ -0,0 +1,26 @@
package net.sansa_stack.rdf.common;

import org.apache.hadoop.fs.Seekable;

import java.io.IOException;

/**
 * Mixin-style interface that implements every {@link Seekable} operation by forwarding it to
 * the {@link Seekable} returned from {@link #getSeekable()}.
 *
 * <p>Implementors only need to supply {@link #getSeekable()}.
 */
public interface SeekableBase
    extends Seekable
{
    /**
     * @return the delegate that all default {@link Seekable} methods forward to
     */
    Seekable getSeekable();

    /** Forwards to {@code getSeekable().seek(targetPos)}. */
    @Override
    default void seek(long targetPos) throws IOException {
        final Seekable delegate = getSeekable();
        delegate.seek(targetPos);
    }

    /** Forwards to {@code getSeekable().getPos()}. */
    @Override
    default long getPos() throws IOException {
        final Seekable delegate = getSeekable();
        return delegate.getPos();
    }

    /** Forwards to {@code getSeekable().seekToNewSource(targetPos)}. */
    @Override
    default boolean seekToNewSource(long targetPos) throws IOException {
        final Seekable delegate = getSeekable();
        return delegate.seekToNewSource(targetPos);
    }
}
@@ -0,0 +1,28 @@
package net.sansa_stack.rdf.common;

import org.apache.commons.io.input.ProxyInputStream;
import org.apache.hadoop.fs.Seekable;

import java.io.InputStream;

/**
 * Combines a {@link Seekable} and an {@link InputStream} into one object.
 *
 * <p>Stream operations are delegated to the wrapped stream via {@link ProxyInputStream};
 * seek and position operations are delegated to the supplied {@link Seekable} through the
 * default methods of {@link SeekableBase}. This class does nothing itself to keep the two
 * in sync.
 */
public class SeekableInputStream
    extends ProxyInputStream
    implements SeekableBase
{
    /** Delegate for all {@link Seekable} operations. */
    protected Seekable seekable;

    /**
     * Creates a stream that reads from {@code proxy} and seeks via {@code seekable}.
     *
     * @param proxy the InputStream to delegate read operations to
     * @param seekable the Seekable to delegate seek and position operations to
     */
    public SeekableInputStream(InputStream proxy, Seekable seekable) {
        super(proxy);
        this.seekable = seekable;
    }

    /**
     * @return the {@link Seekable} passed to the constructor
     */
    @Override
    public Seekable getSeekable() {
        return this.seekable;
    }
}

0 comments on commit 3b661ae

Please sign in to comment.