Significant reworking of RecordReaderGenericBase for #144; when collecting the split's flow to lists all tests succeed; otherwise there is a 'MissingBackPressure' exception.

Maybe still a bug in FlowableOperatorSequentialGroupBy?
Aklakan committed Feb 27, 2021
1 parent 26cb153 commit 42ebefa
Showing 9 changed files with 278 additions and 27 deletions.
7 changes: 7 additions & 0 deletions pom.xml
@@ -393,6 +393,13 @@
<version>${aksw-commons.version}</version>
</dependency>

<!-- Framework for building nestable aggregators; used in SchemaMapper and RecordReaderGenericBase -->
<dependency>
<groupId>org.aksw.commons</groupId>
<artifactId>aksw-commons-collectors</artifactId>
<version>${aksw-commons.version}</version>
</dependency>

<dependency>
<groupId>org.aksw.commons</groupId>
<artifactId>aksw-commons-io-utils</artifactId>
6 changes: 6 additions & 0 deletions sansa-hadoop-jena/pom.xml
@@ -50,6 +50,12 @@
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>

<!-- Aggregator framework -->
<dependency>
<groupId>org.aksw.commons</groupId>
<artifactId>aksw-commons-collectors</artifactId>
</dependency>

<dependency>
<groupId>org.aksw.jena-sparql-api</groupId>
<artifactId>jena-sparql-api-rx</artifactId>
@@ -0,0 +1,41 @@
package net.sansa_stack.hadoop.generic;

public interface Accumulating<T, G, A, U> {
/** Derive the group key of an item */
G classify(T item);
/** Create a fresh accumulator for the given group key */
A createAccumulator(G groupKey);
/** Add an item to the accumulator of its group */
void accumulate(A accumulator, T item);
/** Obtain the final record value from an accumulator */
U accumulatedValue(A accumulator);


/**
* Identity accumulator - turns each item into a group that contains only the item and whose value is the item
*
* @param <T> the item type
* @return an accumulating strategy whose group key, accumulator and value are the item itself
*/
static <T> Accumulating<T, T, T, T> identity() {
return new Accumulating<T, T, T, T>() {
@Override
public T classify(T item) {
return item;
}

@Override
public T createAccumulator(T groupKey) {
return groupKey;
}

@Override
public void accumulate(T accumulator, T item) {
/* nothing to do */
}

@Override
public T accumulatedValue(T accumulator) {
return accumulator;
}
};
}
}
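
For illustration only (not part of this commit), a non-identity implementation of the interface above might group consecutive strings by their first character and collect each group into a list. The class below is a hypothetical sketch and assumes it lives in the same package as Accumulating:

import java.util.ArrayList;
import java.util.List;

class FirstCharGrouping implements Accumulating<String, Character, List<String>, List<String>> {
    @Override
    public Character classify(String item) {
        // The group key is the first character ('\0' for empty strings)
        return item.isEmpty() ? '\0' : item.charAt(0);
    }

    @Override
    public List<String> createAccumulator(Character groupKey) {
        // Start a new, empty group
        return new ArrayList<>();
    }

    @Override
    public void accumulate(List<String> accumulator, String item) {
        accumulator.add(item);
    }

    @Override
    public List<String> accumulatedValue(List<String> accumulator) {
        // The final record value is the collected list itself
        return accumulator;
    }
}

A record reader parameterized with this strategy would then emit one List<String> record per consecutive run of strings that share a first character.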
@@ -3,6 +3,7 @@
import com.google.common.base.StandardSystemProperty;
import io.reactivex.rxjava3.core.Flowable;
import net.sansa_stack.hadoop.util.*;
import org.aksw.commons.rx.op.FlowableOperatorSequentialGroupBy;
import org.aksw.jena_sparql_api.io.binseach.BufferFromInputStream;
import org.aksw.jena_sparql_api.io.binseach.CharSequenceFromSeekable;
import org.aksw.jena_sparql_api.io.binseach.Seekable;
@@ -86,7 +87,7 @@
* @author Claus Stadler
* @author Lorenz Buehmann
*/
public abstract class RecordReaderGenericBase<T>
public abstract class RecordReaderGenericBase<U, G, A, T>
extends RecordReader<LongWritable, T> // TODO use CombineFileInputFormat?
{
private static final Logger logger = LoggerFactory.getLogger(RecordReaderGenericBase.class);
@@ -104,6 +105,7 @@ public abstract class RecordReaderGenericBase<T>
*/
protected final Pattern recordStartPattern;

protected final Accumulating<U, G, A, T> accumulating;

protected String baseIri;
protected long maxRecordLength;
@@ -139,13 +141,15 @@ public RecordReaderGenericBase(
String probeRecordCountKey,
Pattern recordStartPattern,
String baseIriKey,
String headerBytesKey) {
String headerBytesKey,
Accumulating<U, G, A, T> accumulating) {
this.minRecordLengthKey = minRecordLengthKey;
this.maxRecordLengthKey = maxRecordLengthKey;
this.probeRecordCountKey = probeRecordCountKey;
this.recordStartPattern = recordStartPattern;
this.baseIriKey = baseIriKey;
this.headerBytesKey = headerBytesKey;
this.accumulating = accumulating;
}

/**
@@ -218,15 +222,15 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
}
}

public void initDatasetFlow() throws IOException {
public void initRecordFlow() throws IOException {


// val sw = Stopwatch.createStarted()
// val (arr, extraLength) = readToBuffer(stream, isEncoded, splitStart, splitEnd, desiredExtraBytes)

// println("TRIGREADER READ " + arr.length + " bytes (including " + desiredExtraBytes + " extra) in " + sw.elapsed(TimeUnit.MILLISECONDS) + " ms")

Flowable<T> tmp = createDatasetFlow();
Flowable<T> tmp = createRecordFlow();
datasetFlow = tmp.blockingIterable().iterator();
}

@@ -241,8 +245,7 @@ public void initDatasetFlow() throws IOException {
* Taken streams should be closed by the client code.
* @return
*/
protected abstract Flowable<T> parse(Callable<InputStream> inputStreamSupplier);

protected abstract Flowable<U> parse(Callable<InputStream> inputStreamSupplier);

/**
* Seek to a given offset and prepare to read up to the 'end' position (exclusive)
@@ -335,7 +338,116 @@ public static void logUnexpectedClose(String str) {
throw new RuntimeException("Unexpected close");
}

protected Flowable<T> createDatasetFlow() throws IOException {
/**
* Flowable transform that skips the first n groups of a flowable
*
* @param skipGroupCount the number of leading groups to skip
* @param classifier derives the group key of an item
* @param <I> the item type
* @param <K> the group key type
* @return
*/
/*
public static <I, K> FlowableTransformer<I, I> skipGroups(long skipGroupCount, Function<? super I, ? extends K> classifier) {
return upstream -> {
long[] numGroupsSkipped = {0};
boolean[] firstItemSeen = {false};
Object[] lastSeenKey = {null};
return upstream.filter(item -> {
boolean r;
if (numGroupsSkipped[0] >= skipGroupCount) {
r = true;
} else {
Object key = classifier.apply(item);
if (!firstItemSeen[0]) {
lastSeenKey[0] = key;
firstItemSeen[0] = true;
} else {
boolean isEqual = Objects.equals(lastSeenKey[0], key);
if (!isEqual) {
++numGroupsSkipped[0];
lastSeenKey[0] = key;
}
}
r = numGroupsSkipped[0] >= skipGroupCount;
}
return r;
});
};
}
public static <I, K> FlowableTransformer<I, I> takeGroups(long takeGroupCount, Function<? super I, ? extends K> classifier) {
return upstream -> {
long[] numGroupsTaken = {0};
boolean[] firstItemSeen = {false};
Object[] lastSeenKey = {null};
return upstream.filter(item -> {
boolean r;
if (numGroupsTaken[0] >= takeGroupCount) {
r = false;
} else {
Object key = classifier.apply(item);
if (!firstItemSeen[0]) {
lastSeenKey[0] = key;
firstItemSeen[0] = true;
} else {
boolean isEqual = Objects.equals(lastSeenKey[0], key);
if (!isEqual) {
++numGroupsTaken[0];
lastSeenKey[0] = key;
}
}
r = numGroupsTaken[0] < takeGroupCount;
}
return r;
});
};
}
*/

/**
* Modify a flow to perform aggregation of items
* into records according to the accumulating specification.
* The complex part here is to correctly combine the two flows:
* - The first group of the split flow needs to be skipped, as it is handled by the previous split's processor
* - If there are no further groups in the split flow then no items are emitted at all (because all items belong to the previous split)
* - ONLY if the split flow contained at least one group: the first group of the tail flow needs to be emitted
*
* @param isFirstSplit If true then the first record is included in the output; otherwise it is skipped
* @param splitFlow The flow of items obtained from the split
* @param tailItems The items of the first group in the next split
* @return
*/
protected Flowable<T> aggregate(boolean isFirstSplit, Flowable<U> splitFlow, List<U> tailItems) {
// We need to be careful not to return null as a flow item:
// FlowableOperatorSequentialGroupBy returns a stream of (key, accumulator) pairs
// Returning a null accumulator is ok as long as it resides within the pair

Flowable<T> result = splitFlow
.concatWith(Flowable.fromIterable(tailItems))
.lift(FlowableOperatorSequentialGroupBy.create(
accumulating::classify,
(accNum, groupKey) -> !isFirstSplit && accNum == 0
? null
: accumulating.createAccumulator(groupKey),
accumulating::accumulate))
.compose(flowable -> isFirstSplit ? flowable : flowable.skip(1))
.map(e -> accumulating.accumulatedValue(e.getValue()));

// Workaround (see commit message): collecting the flow into a list here makes all tests succeed;
// otherwise a 'MissingBackPressure' exception occurs, possibly due to a remaining bug in FlowableOperatorSequentialGroupBy
List<T> tmp = result.toList().blockingGet();
result = Flowable.fromIterable(tmp);
return result;
}
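
As a standalone illustration of the grouping step above (not part of this commit): the sketch below assumes the FlowableOperatorSequentialGroupBy.create(classifier, accumulatorSupplier, accumulate) overload and the Map.Entry-shaped output already relied on in this class, and the demo class name is made up. For a non-first split, the first emitted group is dropped because the previous split's reader completes that group.

import io.reactivex.rxjava3.core.Flowable;
import org.aksw.commons.rx.op.FlowableOperatorSequentialGroupBy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class SequentialGroupByDemo {
    public static void main(String[] args) {
        // Consecutive runs of equal values form the groups: [1, 1], [2, 2, 2], [3]
        List<List<Integer>> groups = Flowable.just(1, 1, 2, 2, 2, 3)
                .lift(FlowableOperatorSequentialGroupBy.create(
                        (Integer i) -> i,                               // classify: the item is its own group key
                        () -> (List<Integer>) new ArrayList<Integer>(), // fresh accumulator per group
                        Collection::add))                               // add each item to its group's list
                .map(Map.Entry::getValue)
                .skip(1) // a non-first split skips its first group; the previous split's reader owns it
                .toList()
                .blockingGet();

        System.out.println(groups); // expected output: [[2, 2, 2], [3]]
    }
}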

protected Flowable<T> createRecordFlow() throws IOException {

String splitName = split.getPath().getName();
String splitId = splitName + ":" + splitStart + "+" + splitLength;

// System.err.println(s"Processing split $absSplitStart: $splitStart - $splitEnd | --+$actualExtraBytes--> $dataRegionEnd")

@@ -358,10 +470,10 @@ protected Flowable<T> createDatasetFlow() throws IOException {

// Predicate<T> isNonEmptyDataset = t -> !isEmptyRecord(t);

Function<Seekable, Flowable<T>> parserHelper = seekable -> {
Function<Seekable, Flowable<U>> parseFromSeekable = seekable -> {
Callable<InputStream> inSupp = () -> effectiveInputStreamSupp.apply(seekable);

Flowable<T> r = parse(inSupp);
Flowable<U> r = parse(inSupp);
return r;
};

@@ -374,7 +486,7 @@ protected Flowable<T> createDatasetFlow() throws IOException {
}

// printSeekable(seekable)
long recordCount = parserHelper.apply(seekable)
long recordCount = parseFromSeekable.apply(seekable)
// .doOnError(t -> t.printStackTrace())
.take(probeRecordCount)
.count()
@@ -428,6 +540,19 @@ protected Flowable<T> createDatasetFlow() throws IOException {
// starts anymore
long tailBytes = tmp < 0 ? tailBuffer.getKnownDataSize() : Ints.checkedCast(tmp);


// Now that we have found an offset in the tail region, read out the items of one more complete group
// Note that these items may need to be aggregated with items from the current split - that's why we retain them as a list
tailNav.setPos(tailBytes);
List<U> tailItems = parseFromSeekable.apply(tailNav)
.lift(FlowableOperatorSequentialGroupBy.create(accumulating::classify, () -> (List<U>)new ArrayList(), Collection::add))
.map(Map.Entry::getValue)
.first(Collections.emptyList())
// .firstElement()
.blockingGet();

logger.info(String.format("In split %s got %d tail items", splitId, tailItems.size()));

// Set the stream to the start of the split and get the head buffer
// Note that we will use the stream in its state to read the body part
long knownDecodedDataLength[] = new long[]{-1};
@@ -499,7 +624,8 @@ protected Flowable<T> createDatasetFlow() throws IOException {
BufferFromInputStream headBuffer = BufferFromInputStream.create(new BoundedInputStream(splitBoundedHeadStream, desiredExtraBytes), 1024 * 1024);
Seekable headNav = headBuffer.newChannel();

int headBytes = splitStart == 0
boolean isFirstSplit = splitStart == 0;
int headBytes = isFirstSplit
? 0
: Ints.checkedCast(skipToNthRecord(skipRecordCount, headNav, 0, 0, maxRecordLength, desiredExtraBytes, posValidator, prober));

@@ -617,7 +743,7 @@ protected Flowable<T> createDatasetFlow() throws IOException {
boolean writeOutSegments = false;

if (writeOutSegments) {
String splitName = split.getPath().getName();
// String splitName = split.getPath().getName();

Path basePath = Paths.get(StandardSystemProperty.JAVA_IO_TMPDIR.value()).toAbsolutePath();

@@ -647,7 +773,7 @@ protected Flowable<T> createDatasetFlow() throws IOException {

Flowable<T> result = null;
if (headBytes >= 0) {
result = parse(() -> fullStream);
result = aggregate(isFirstSplit, parse(() -> fullStream), tailItems);

// val parseLength = effectiveRecordRangeEnd - effectiveRecordRangeStart
// nav.setPos(effectiveRecordRangeStart - splitStart)
@@ -824,10 +950,9 @@ public static long findFirstPositionWithProbeSuccess(
break;
}

// Artificially create errors
// absPos += 5;
// Set the seekable to the found position ...
seekable.setPos(absPos);

// .. which may reveal that we have actually reached the end
isEndReached = seekable.isPosAfterEnd();

if (!isEndReached) {
@@ -852,7 +977,7 @@
@Override
public boolean nextKeyValue() throws IOException {
if (datasetFlow == null) {
initDatasetFlow();
initRecordFlow();
}

boolean result;
@@ -1,12 +1,13 @@
package net.sansa_stack.hadoop.jena.rdf.base;

import net.sansa_stack.hadoop.generic.Accumulating;
import net.sansa_stack.hadoop.generic.RecordReaderGenericBase;
import org.apache.jena.riot.Lang;

import java.util.regex.Pattern;

public abstract class RecordReaderGenericRdfBase<T>
extends RecordReaderGenericBase<T>
public abstract class RecordReaderGenericRdfBase<U, G, A, T>
extends RecordReaderGenericBase<U, G, A, T>
{
protected Lang lang;

@@ -18,13 +19,15 @@ public RecordReaderGenericRdfBase(
String probeRecordCountKey,
String prefixesMaxLengthKey,
Pattern recordSearchPattern,
Lang lang) {
Lang lang,
Accumulating<U, G, A, T> accumulating) {
super(minRecordLengthKey,
maxRecordLengthKey,
probeRecordCountKey,
recordSearchPattern,
FileInputFormatRdfBase.BASE_IRI_KEY,
FileInputFormatRdfBase.PREFIXES_KEY);
FileInputFormatRdfBase.PREFIXES_KEY,
accumulating);
this.lang = lang;
this.prefixesMaxLengthKey = prefixesMaxLengthKey;
}
