Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
frensjan committed Oct 31, 2023
1 parent d013711 commit 4d9fad5
Show file tree
Hide file tree
Showing 18 changed files with 494 additions and 394 deletions.
4 changes: 4 additions & 0 deletions core/common/iterator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package org.eclipse.rdf4j.common.iteration;

import reactor.core.publisher.Flux;

import java.util.Iterator;
import java.util.stream.Stream;

Expand All @@ -33,6 +35,10 @@
*/
public interface CloseableIteration<E> extends Iterator<E>, AutoCloseable {

default Flux<E> flux() {
return Iterations.flux(this);
}

/**
* Convert the results to a Java 8 Stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

package org.eclipse.rdf4j.common.iteration;

import org.eclipse.rdf4j.common.reactor.FluxIteration;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -82,20 +85,24 @@ public static <E, C extends Collection<E>> C addAll(CloseableIteration<? extends
* @return a sequential {@link Stream} object which can be used to process the data from the source iteration.
*/
public static <T> Stream<T> stream(CloseableIteration<T> iteration) {
return StreamSupport
.stream(new CloseableIterationSpliterator<>(iteration), false)
.onClose(() -> {
try {
iteration.close();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
if (iteration instanceof FluxIteration) {
return iteration.flux().toStream();
} else {
return StreamSupport
.stream(new CloseableIterationSpliterator<>(iteration), false)
.onClose(() -> {
try {
iteration.close();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(e);
}
throw new RuntimeException(e);
}
});
});
}
}

/**
Expand Down Expand Up @@ -155,4 +162,12 @@ public static <E> Set<E> asSet(CloseableIteration<? extends E> iteration,
}
}

public static <T> Flux<T> flux(CloseableIteration<T> iter) {
if (iter instanceof FluxIteration) {
return iter.flux();
} else {
return Flux.fromIterable(() -> iter)
.doOnTerminate(iter::close);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.eclipse.rdf4j.common.reactor;

import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration;
import reactor.core.publisher.Flux;

import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Wraps a {@link Flux} reactive stream into a {@link org.eclipse.rdf4j.common.iteration.CloseableIteration}.
*
* @param <T> The type of elements iterated over.
* @param <X> The exception type that can be thrown from {@link #hasNext()}, {@link #next()} and other methods.
*/
@SuppressWarnings("deprecation")
public class CloseableFluxIteration<T> extends AbstractCloseableIteration<T> implements FluxIteration<T> {

private final Flux<T> flux;
private Iterator<T> iter;

/**
* Creates a new IterationWrapper that operates on the supplied Iteration.
*
* @param flux The wrapped Iteration for this <var>IterationWrapper</var>, must not be <var>null</var>.
*/
public CloseableFluxIteration(Flux<T> flux) {
this.flux = flux;
}

@Override
public Flux<T> flux() {
return flux;
}

@Override
public boolean hasNext() {
if (iter == null) {
iter = flux.toIterable().iterator();
}

return iter.hasNext();
}

@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return iter.next();
}

@Override
protected void handleClose() {
// TODO interrupt any thread blocking on the iterator from the flux
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.eclipse.rdf4j.common.reactor;

import reactor.core.publisher.Flux;

public interface FluxIteration<T> {

Flux<T> flux();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.IterationWrapper;
import org.eclipse.rdf4j.common.reactor.FluxIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.TupleQueryResult;
import reactor.core.publisher.Flux;

/**
* An iterating implementation of the {@link TupleQueryResult} interface.
*/
public class IteratingTupleQueryResult extends IterationWrapper<BindingSet>
implements TupleQueryResult {
implements TupleQueryResult, FluxIteration<BindingSet> {

/*-----------*
* Variables *
Expand Down Expand Up @@ -71,6 +73,11 @@ public IteratingTupleQueryResult(List<String> bindingNames,
* Methods *
*---------*/

@Override
public Flux<BindingSet> flux() {
return (Flux<BindingSet>) wrappedIter.flux();
}

@Override
public List<String> getBindingNames() throws QueryEvaluationException {
return bindingNames;
Expand Down
4 changes: 4 additions & 0 deletions core/queryalgebra/evaluation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.eclipse.rdf4j.query.algebra.evaluation;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.reactor.CloseableFluxIteration;
import org.eclipse.rdf4j.query.BindingSet;
import reactor.core.publisher.Flux;

public interface ReactiveQueryEvaluationStep extends QueryEvaluationStep {

Flux<BindingSet> transform(Flux<BindingSet> input);

default Flux<BindingSet> transform(BindingSet input) {
return transform(Flux.just(input));
}

@Override
default CloseableIteration<BindingSet> evaluate(BindingSet bindings) {
Flux<BindingSet> input = Flux.just(bindings);
Flux<BindingSet> output = transform(input);
return new CloseableFluxIteration<>(output);
}

static ReactiveQueryEvaluationStep from(QueryEvaluationStep step) {
if (step instanceof ReactiveQueryEvaluationStep) {
return (ReactiveQueryEvaluationStep) step;
} else {
return input -> input.flatMapSequential(
bindings -> step.evaluate(bindings).flux(),
1,1
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.eclipse.rdf4j.query.algebra.evaluation;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import reactor.core.publisher.Flux;

public interface ReactiveTripleSource {

Flux<? extends Statement> getStatements(Resource subj, IRI pred, Value obj, Resource... contexts)
throws QueryEvaluationException;

static ReactiveTripleSource from(TripleSource source) {
return (subj, pred, obj, contexts) -> source.getStatements(subj, pred, obj, contexts).flux();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,61 @@
*******************************************************************************/
package org.eclipse.rdf4j.query.algebra.evaluation.impl.evaluationsteps;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.ReactiveQueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.ServiceJoinIterator;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator;
import org.eclipse.rdf4j.query.algebra.helpers.TupleExprs;
import reactor.core.publisher.Flux;

public class JoinQueryEvaluationStep implements QueryEvaluationStep {

private final java.util.function.Function<BindingSet, CloseableIteration<BindingSet>> eval;

public JoinQueryEvaluationStep(EvaluationStrategy strategy, Join join, QueryEvaluationContext context) {
// efficient computation of a SERVICE join using vectored evaluation
// TODO maybe we can create a ServiceJoin node already in the parser?
QueryEvaluationStep leftPrepared = strategy.precompile(join.getLeftArg(), context);
QueryEvaluationStep rightPrepared = strategy.precompile(join.getRightArg(), context);
if (join.getRightArg() instanceof Service) {
eval = bindings -> new ServiceJoinIterator(leftPrepared.evaluate(bindings),
(Service) join.getRightArg(), bindings,
strategy);
join.setAlgorithm(ServiceJoinIterator.class.getSimpleName());
} else if (isOutOfScopeForLeftArgBindings(join.getRightArg())) {
String[] joinAttributes = HashJoinIteration.hashJoinAttributeNames(join);
eval = bindings -> new HashJoinIteration(leftPrepared, rightPrepared, bindings, false,
joinAttributes, context);
join.setAlgorithm(HashJoinIteration.class.getSimpleName());
} else {
eval = bindings -> JoinIterator.getInstance(leftPrepared, rightPrepared, bindings);
join.setAlgorithm(JoinIterator.class.getSimpleName());
}
}

@Override
public CloseableIteration<BindingSet> evaluate(BindingSet bindings) {
return eval.apply(bindings);
}

private static boolean isOutOfScopeForLeftArgBindings(TupleExpr expr) {
return TupleExprs.isVariableScopeChange(expr) || TupleExprs.containsSubquery(expr);
}
public class JoinQueryEvaluationStep implements ReactiveQueryEvaluationStep {

private final java.util.function.Function<Flux<BindingSet>, Flux<BindingSet>> eval;

private final ReactiveQueryEvaluationStep leftPrepared;
private final ReactiveQueryEvaluationStep rightPrepared;

public JoinQueryEvaluationStep(EvaluationStrategy strategy, Join join, QueryEvaluationContext context) {
// efficient computation of a SERVICE join using vectored evaluation
// TODO maybe we can create a ServiceJoin node already in the parser?
leftPrepared = ReactiveQueryEvaluationStep.from(strategy.precompile(join.getLeftArg(), context));
rightPrepared = ReactiveQueryEvaluationStep.from(strategy.precompile(join.getRightArg(), context));
if (join.getRightArg() instanceof Service) {
eval = input -> input.flatMapSequential(bindings -> Iterations.flux(
new ServiceJoinIterator(leftPrepared.evaluate(bindings),
(Service) join.getRightArg(), bindings,
strategy)
));
join.setAlgorithm(ServiceJoinIterator.class.getSimpleName());
} else if (isOutOfScopeForLeftArgBindings(join.getRightArg())) {
String[] joinAttributes = HashJoinIteration.hashJoinAttributeNames(join);
eval = input -> input.flatMapSequential(bindings -> Iterations.flux(
new HashJoinIteration(leftPrepared, rightPrepared, bindings, false,
joinAttributes, context)
));
join.setAlgorithm(HashJoinIteration.class.getSimpleName());
} else {
eval = input -> leftPrepared.transform(input)
.flatMapSequential(rightPrepared::transform,
1,1);
join.setAlgorithm(JoinIterator.class.getSimpleName());
}
}

@Override
public Flux<BindingSet> transform(Flux<BindingSet> input) {
return eval.apply(input);
}

private static boolean isOutOfScopeForLeftArgBindings(TupleExpr expr) {
return TupleExprs.isVariableScopeChange(expr) || TupleExprs.containsSubquery(expr);
}

}
Loading

0 comments on commit 4d9fad5

Please sign in to comment.