From cd1615229abcf5f1673a027192085278cc848861 Mon Sep 17 00:00:00 2001 From: Jesse Hatfield Date: Thu, 21 Dec 2017 17:32:47 -0500 Subject: [PATCH] RYA-416 A new query node type to represent a MongoDB aggregation pipeline whose results can be converted to binding sets, and tools for optionally transforming some SPARQL expressions into such a node. --- dao/mongodb.rya/pom.xml | 5 + ...bstractMongoDBRdfConfigurationBuilder.java | 16 + .../rya/mongodb/MongoDBRdfConfiguration.java | 40 +- .../AggregationPipelineQueryNode.java | 856 ++++++++++++++++++ .../AggregationPipelineQueryOptimizer.java | 73 ++ .../aggregation/PipelineResultIteration.java | 135 +++ .../SparqlToPipelineTransformVisitor.java | 196 ++++ .../dao/SimpleMongoDBStorageStrategy.java | 21 +- .../AggregationPipelineQueryNodeTest.java | 331 +++++++ .../mongodb/aggregation/PipelineQueryIT.java | 421 +++++++++ .../PipelineResultIterationTest.java | 152 ++++ .../SparqlToPipelineTransformVisitorTest.java | 207 +++++ 12 files changed, 2446 insertions(+), 7 deletions(-) create mode 100644 dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java create mode 100644 dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java create mode 100644 dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java create mode 100644 dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java create mode 100644 dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java create mode 100644 dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java create mode 100644 dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java create mode 100644 dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java diff --git a/dao/mongodb.rya/pom.xml b/dao/mongodb.rya/pom.xml index 0803aa8d1..0afac815f 100644 --- a/dao/mongodb.rya/pom.xml +++ b/dao/mongodb.rya/pom.xml @@ -86,5 +86,10 @@ Tests will fail with the following error when using 32bit JVM on either Linux or junit test + + org.mockito + mockito-all + test + diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java index bb14a39c1..369f7a0d8 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java @@ -43,6 +43,7 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder> getOptimizers() { + List> optimizers = super.getOptimizers(); + if (getUseAggregationPipeline()) { + Class cl = AggregationPipelineQueryOptimizer.class; + @SuppressWarnings("unchecked") + Class optCl = (Class) cl; + optimizers.add(optCl); + } + return optimizers; + } +} diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java new file mode 100644 index 000000000..7a84f5def --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.domain.StatementMetadata; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.mongodb.MongoDbRdfConstants; +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.apache.rya.mongodb.document.operators.query.ConditionalOperators; +import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.Literal; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Compare; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import 
com.google.common.collect.HashBiMap;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Projections;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Represents a portion of a query tree as a MongoDB aggregation pipeline.
+ * Should be built bottom-up: start with a statement pattern implemented as a
+ * $match step, then add steps to the pipeline to handle higher levels of the
+ * query tree. Methods are provided to add certain supported query operations
+ * to the end of the internal pipeline. In some cases, specific arguments may
+ * be unsupported, in which case the pipeline is unchanged and the method
+ * returns false.
+ */
+public class AggregationPipelineQueryNode extends ExternalSet {
+    /**
+     * An aggregation result corresponding to a solution should map this key
+     * to an object which itself maps variable names to variable values.
+     */
+    static final String VALUES = "<VALUES>";
+
+    /**
+     * An aggregation result corresponding to a solution should map this key
+     * to an object which itself maps variable names to the corresponding
+     * hashes of their values.
+     */
+    static final String HASHES = "<HASHES>";
+
+    /**
+     * An aggregation result corresponding to a solution should map this key
+     * to an object which itself maps variable names to their datatypes, if any.
+     */
+    static final String TYPES = "<TYPES>";
+
+    private static final String LEVEL = "derivation_level";
+    private static final String[] FIELDS = { VALUES, HASHES, TYPES, LEVEL, TIMESTAMP };
+
+    private static final String JOINED_TRIPLE = "<JOINED_TRIPLE>";
+    private static final String FIELDS_MATCH = "<FIELDS_MATCH>";
+
+    private static final MongoDBStorageStrategy<RyaStatement> strategy = new SimpleMongoDBStorageStrategy();
+
+    private static final Bson DEFAULT_TYPE = new Document("$literal", XMLSchema.ANYURI.stringValue());
+    private static final Bson DEFAULT_CONTEXT = new Document("$literal", "");
+    private static final Bson DEFAULT_DV = DocumentVisibilityAdapter.toDBObject(MongoDbRdfConstants.EMPTY_DV);
+    private static final Bson DEFAULT_METADATA = new Document("$literal",
+            StatementMetadata.EMPTY_METADATA.toString());
+
+    private static boolean isValidFieldName(String name) {
+        return !(name == null || name.contains(".") || name.contains("$")
+                || name.equals("_id"));
+    }
+
+    /**
+     * For a given statement pattern, represents a mapping from query variables
+     * to their corresponding parts of matching triples. If necessary, also
+     * substitutes variable names containing invalid characters with temporary
+     * replacements, while producing a map back to the original names.
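+     * <p>
+     * For example (the generated name here is illustrative; real replacements
+     * use a random UUID): a variable named {@code x.y} is not a valid MongoDB
+     * field name, so it would be stored internally under a generated name such
+     * as {@code field-6b54...}, with the BiMap recording the pair so results
+     * can be reported under the original name {@code x.y}.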
+     */
+    private static class StatementVarMapping {
+        private final Map<String, String> varToTripleValue = new HashMap<>();
+        private final Map<String, String> varToTripleHash = new HashMap<>();
+        private final Map<String, String> varToTripleType = new HashMap<>();
+        private final BiMap<String, String> varToOriginalName;
+
+        String valueField(String varName) {
+            return varToTripleValue.get(varName);
+        }
+        String hashField(String varName) {
+            return varToTripleHash.get(varName);
+        }
+        String typeField(String varName) {
+            return varToTripleType.get(varName);
+        }
+
+        Set<String> varNames() {
+            return varToTripleValue.keySet();
+        }
+
+        private String replace(String original) {
+            if (varToOriginalName.containsValue(original)) {
+                return varToOriginalName.inverse().get(original);
+            }
+            else {
+                String replacement = "field-" + UUID.randomUUID();
+                varToOriginalName.put(replacement, original);
+                return replacement;
+            }
+        }
+
+        private String sanitize(String name) {
+            if (varToOriginalName.containsValue(name)) {
+                return varToOriginalName.inverse().get(name);
+            }
+            else if (name != null && !isValidFieldName(name)) {
+                return replace(name);
+            }
+            return name;
+        }
+
+        StatementVarMapping(StatementPattern sp, BiMap<String, String> varToOriginalName) {
+            this.varToOriginalName = varToOriginalName;
+            if (sp.getSubjectVar() != null && !sp.getSubjectVar().hasValue()) {
+                String name = sanitize(sp.getSubjectVar().getName());
+                varToTripleValue.put(name, SUBJECT);
+                varToTripleHash.put(name, SUBJECT_HASH);
+            }
+            if (sp.getPredicateVar() != null && !sp.getPredicateVar().hasValue()) {
+                String name = sanitize(sp.getPredicateVar().getName());
+                varToTripleValue.put(name, PREDICATE);
+                varToTripleHash.put(name, PREDICATE_HASH);
+            }
+            if (sp.getObjectVar() != null && !sp.getObjectVar().hasValue()) {
+                String name = sanitize(sp.getObjectVar().getName());
+                varToTripleValue.put(name, OBJECT);
+                varToTripleHash.put(name, OBJECT_HASH);
+                varToTripleType.put(name, OBJECT_TYPE);
+            }
+            if (sp.getContextVar() != null && !sp.getContextVar().hasValue()) {
+                String name = sanitize(sp.getContextVar().getName());
+                varToTripleValue.put(name, CONTEXT);
+            }
+        }
+
+        Bson getProjectExpression() {
+            return getProjectExpression(new LinkedList<>(), str -> "$" + str);
+        }
+
+        Bson getProjectExpression(Iterable<String> alsoInclude,
+                Function<String, Object> getFieldExpr) {
+            Document values = new Document();
+            Document hashes = new Document();
+            Document types = new Document();
+            for (String varName : varNames()) {
+                values.append(varName, getFieldExpr.apply(valueField(varName)));
+                if (varToTripleHash.containsKey(varName)) {
+                    hashes.append(varName, getFieldExpr.apply(hashField(varName)));
+                }
+                if (varToTripleType.containsKey(varName)) {
+                    types.append(varName, getFieldExpr.apply(typeField(varName)));
+                }
+            }
+            for (String varName : alsoInclude) {
+                values.append(varName, 1);
+                hashes.append(varName, 1);
+                types.append(varName, 1);
+            }
+            List<Bson> fields = new LinkedList<>();
+            fields.add(Projections.excludeId());
+            fields.add(Projections.computed(VALUES, values));
+            fields.add(Projections.computed(HASHES, hashes));
+            if (!types.isEmpty()) {
+                fields.add(Projections.computed(TYPES, types));
+            }
+            fields.add(Projections.computed(LEVEL, new Document("$max",
+                    Arrays.asList("$" + LEVEL, getFieldExpr.apply(LEVEL), 0))));
+            fields.add(Projections.computed(TIMESTAMP, new Document("$max",
+                    Arrays.asList("$" + TIMESTAMP, getFieldExpr.apply(TIMESTAMP), 0))));
+            return Projections.fields(fields);
+        }
+    }
+
+    /**
+     * Given a StatementPattern, generate an object representing the arguments
+     * to a "$match" command that will find matching triples.
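+     * <p>
+     * For instance, the pattern {@code ?s <urn:p> ?o} would yield $match
+     * arguments that constrain only the predicate, as built by the storage
+     * strategy's query for the corresponding partial RyaStatement; the
+     * subject and object fields are left free to match any triple.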
+     * @param sp The StatementPattern to search for
+     * @param path If given, specify the field that should be matched against
+     *  the statement pattern, using an ordered list of field names for a
+     *  nested field. E.g. to match records {"x": {"y": <statement>}}, pass
+     *  "x" followed by "y".
+     */
+    private static BasicDBObject getMatchExpression(StatementPattern sp, String ... path) {
+        final Var subjVar = sp.getSubjectVar();
+        final Var predVar = sp.getPredicateVar();
+        final Var objVar = sp.getObjectVar();
+        final Var contextVar = sp.getContextVar();
+        RyaStatement rs = new RyaStatement();
+        if (subjVar != null && subjVar.getValue() instanceof Resource) {
+            rs.setSubject(RdfToRyaConversions.convertResource((Resource) subjVar.getValue()));
+        }
+        if (predVar != null && predVar.getValue() instanceof URI) {
+            rs.setPredicate(RdfToRyaConversions.convertURI((URI) predVar.getValue()));
+        }
+        if (objVar != null && objVar.getValue() != null) {
+            rs.setObject(RdfToRyaConversions.convertValue(objVar.getValue()));
+        }
+        if (contextVar != null && contextVar.getValue() instanceof URI) {
+            rs.setContext(RdfToRyaConversions.convertURI((URI) contextVar.getValue()));
+        }
+        DBObject obj = strategy.getQuery(rs);
+        // Prefix each field name if the statement is nested inside another field
+        if (path.length > 0) {
+            StringBuilder sb = new StringBuilder();
+            for (String str : path) {
+                sb.append(str).append(".");
+            }
+            String prefix = sb.toString();
+            Set<String> originalKeys = new HashSet<>(obj.keySet());
+            originalKeys.forEach(key -> {
+                Object value = obj.removeField(key);
+                obj.put(prefix + key, value);
+            });
+        }
+        return (BasicDBObject) obj;
+    }
+
+    private static String valueFieldExpr(String varName) {
+        return "$" + VALUES + "." + varName;
+    }
+    private static String hashFieldExpr(String varName) {
+        return "$" + HASHES + "." + varName;
+    }
+    private static String typeFieldExpr(String varName) {
+        return "$" + TYPES + "." + varName;
+    }
+    private static String joinFieldExpr(String triplePart) {
+        return "$" + JOINED_TRIPLE + "." + triplePart;
+    }
+
+    /**
+     * Get an object representing the value field of some value expression, or
+     * return null if the expression isn't supported.
+     */
+    private Object valueFieldExpr(ValueExpr expr) {
+        if (expr instanceof Var) {
+            return valueFieldExpr(((Var) expr).getName());
+        }
+        else if (expr instanceof ValueConstant) {
+            return new Document("$literal", ((ValueConstant) expr).getValue().stringValue());
+        }
+        else {
+            return null;
+        }
+    }
+
+    private final List<Bson> pipeline;
+    private final MongoCollection<Document> collection;
+    private final Set<String> assuredBindingNames;
+    private final Set<String> bindingNames;
+    private final BiMap<String, String> varToOriginalName;
+
+    private String replace(String original) {
+        if (varToOriginalName.containsValue(original)) {
+            return varToOriginalName.inverse().get(original);
+        }
+        else {
+            String replacement = "field-" + UUID.randomUUID();
+            varToOriginalName.put(replacement, original);
+            return replacement;
+        }
+    }
+
+    /**
+     * Create a pipeline query node based on a StatementPattern.
+     * @param collection The collection of triples to query.
+     * @param baseSP The leaf node in the query tree.
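+     * <p>
+     * A minimal usage sketch (the collection is assumed to hold Rya triples):
+     * <pre>{@code
+     * StatementPattern sp = new StatementPattern(
+     *         new Var("s"), new Var("p"), new Var("o"));
+     * AggregationPipelineQueryNode node =
+     *         new AggregationPipelineQueryNode(collection, sp);
+     * // The initial pipeline has two stages: a $match for the pattern and a
+     * // $project into the intermediate solution representation.
+     * }</pre>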
+ */ + public AggregationPipelineQueryNode(MongoCollection collection, StatementPattern baseSP) { + this.collection = Preconditions.checkNotNull(collection); + Preconditions.checkNotNull(baseSP); + this.varToOriginalName = HashBiMap.create(); + StatementVarMapping mapping = new StatementVarMapping(baseSP, varToOriginalName); + this.assuredBindingNames = new HashSet<>(mapping.varNames()); + this.bindingNames = new HashSet<>(mapping.varNames()); + this.pipeline = new LinkedList<>(); + this.pipeline.add(Aggregates.match(getMatchExpression(baseSP))); + this.pipeline.add(Aggregates.project(mapping.getProjectExpression())); + } + + AggregationPipelineQueryNode(MongoCollection collection, + List pipeline, Set assuredBindingNames, + Set bindingNames, BiMap varToOriginalName) { + this.collection = Preconditions.checkNotNull(collection); + this.pipeline = Preconditions.checkNotNull(pipeline); + this.assuredBindingNames = Preconditions.checkNotNull(assuredBindingNames); + this.bindingNames = Preconditions.checkNotNull(bindingNames); + this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof AggregationPipelineQueryNode) { + AggregationPipelineQueryNode other = (AggregationPipelineQueryNode) o; + if (this.collection.equals(other.collection) + && this.assuredBindingNames.equals(other.assuredBindingNames) + && this.bindingNames.equals(other.bindingNames) + && this.varToOriginalName.equals(other.varToOriginalName) + && this.pipeline.size() == other.pipeline.size()) { + // Check pipeline steps for equality -- underlying types don't + // have well-behaved equals methods, so check for equivalent + // string representations. + for (int i = 0; i < this.pipeline.size(); i++) { + Bson doc1 = this.pipeline.get(i); + Bson doc2 = other.pipeline.get(i); + if (!doc1.toString().equals(doc2.toString())) { + return false; + } + } + return true; + } + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(collection, pipeline, assuredBindingNames, + bindingNames, varToOriginalName); + } + + @Override + public CloseableIteration evaluate(BindingSet bindings) + throws QueryEvaluationException { + return new PipelineResultIteration(collection.aggregate(pipeline), varToOriginalName, bindings); + } + + @Override + public Set getAssuredBindingNames() { + Set names = new HashSet<>(); + for (String name : assuredBindingNames) { + names.add(varToOriginalName.getOrDefault(name, name)); + } + return names; + } + + @Override + public Set getBindingNames() { + Set names = new HashSet<>(); + for (String name : bindingNames) { + names.add(varToOriginalName.getOrDefault(name, name)); + } + return names; + } + + @Override + public AggregationPipelineQueryNode clone() { + return new AggregationPipelineQueryNode(collection, + new LinkedList<>(pipeline), + new HashSet<>(assuredBindingNames), + new HashSet<>(bindingNames), + HashBiMap.create(varToOriginalName)); + } + + @Override + public String getSignature() { + super.getSignature(); + Set assured = getAssuredBindingNames(); + Set any = getBindingNames(); + StringBuilder sb = new StringBuilder("AggregationPipelineQueryNode (binds: "); + sb.append(String.join(", ", assured)); + if (any.size() > assured.size()) { + Set optionalBindingNames = any; + optionalBindingNames.removeAll(assured); + sb.append(" [") + .append(String.join(", ", optionalBindingNames)) + .append("]"); + } + sb.append(")\n"); + for (Bson doc : pipeline) { + 
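+            // Render each pipeline stage's document on its own line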
sb.append(doc.toString()).append("\n"); + } + return sb.toString(); + } + + /** + * Get the internal list of aggregation pipeline steps. Note that documents + * resulting from this pipeline will be structured using an internal + * intermediate representation. For documents representing triples, see + * {@link #getTriplePipeline}, and for query solutions, see + * {@link #evaluate}. + * @return The current internal pipeline. + */ + List getPipeline() { + return pipeline; + } + + /** + * Add a join with an individual {@link StatementPattern} to the pipeline. + * @param sp The statement pattern to join with + * @return true if the join was successfully added to the pipeline. + */ + public boolean joinWith(StatementPattern sp) { + Preconditions.checkNotNull(sp); + // 1. Determine shared variables and new variables + StatementVarMapping spMap = new StatementVarMapping(sp, varToOriginalName); + NavigableSet sharedVars = new ConcurrentSkipListSet<>(spMap.varNames()); + sharedVars.retainAll(assuredBindingNames); + // 2. Join on one shared variable + String joinKey = sharedVars.pollFirst(); + String collectionName = collection.getNamespace().getCollectionName(); + Bson join; + if (joinKey == null) { + return false; + } + else { + join = Aggregates.lookup(collectionName, + HASHES + "." + joinKey, + spMap.hashField(joinKey), + JOINED_TRIPLE); + } + pipeline.add(join); + // 3. Unwind the joined triples so each document represents a binding + // set (solution) from the base branch and a triple that may match. + pipeline.add(Aggregates.unwind("$" + JOINED_TRIPLE)); + // 4. (Optional) If there are any shared variables that weren't used as + // the join key, project all existing fields plus a new field that + // tests the equality of those shared variables. + BasicDBObject matchOpts = getMatchExpression(sp, JOINED_TRIPLE); + if (!sharedVars.isEmpty()) { + List eqTests = new LinkedList<>(); + for (String varName : sharedVars) { + String oldField = valueFieldExpr(varName); + String newField = joinFieldExpr(spMap.valueField(varName)); + Bson eqTest = new Document("$eq", Arrays.asList(oldField, newField)); + eqTests.add(eqTest); + } + Bson eqProjectOpts = Projections.fields( + Projections.computed(FIELDS_MATCH, Filters.and(eqTests)), + Projections.include(JOINED_TRIPLE, VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)); + pipeline.add(Aggregates.project(eqProjectOpts)); + matchOpts.put(FIELDS_MATCH, true); + } + // 5. Filter for solutions whose triples match the joined statement + // pattern, and, if applicable, whose additional shared variables + // match the current solution. + pipeline.add(Aggregates.match(matchOpts)); + // 6. Project the results to include variables from the new SP (with + // appropriate renaming) and variables referenced only in the base + // pipeline (with previous names). + Bson finalProjectOpts = new StatementVarMapping(sp, varToOriginalName) + .getProjectExpression(assuredBindingNames, + str -> joinFieldExpr(str)); + assuredBindingNames.addAll(spMap.varNames()); + bindingNames.addAll(spMap.varNames()); + pipeline.add(Aggregates.project(finalProjectOpts)); + return true; + } + + /** + * Add a SPARQL projection or multi-projection operation to the pipeline. + * The number of documents produced by the pipeline after this operation + * will be the number of documents entering this stage (the number of + * intermediate results) multiplied by the number of + * {@link ProjectionElemList}s supplied here. + * @param projections One or more projections, i.e. 
mappings from the result + * at this stage of the query into a set of variables. + * @return true if the projection(s) were added to the pipeline. + */ + public boolean project(Iterable projections) { + if (projections == null || !projections.iterator().hasNext()) { + return false; + } + List projectOpts = new LinkedList<>(); + Set bindingNamesUnion = new HashSet<>(); + Set bindingNamesIntersection = null; + for (ProjectionElemList projection : projections) { + Document valueDoc = new Document(); + Document hashDoc = new Document(); + Document typeDoc = new Document(); + Set projectionBindingNames = new HashSet<>(); + for (ProjectionElem elem : projection.getElements()) { + String to = elem.getTargetName(); + // If the 'to' name is invalid, replace it internally + if (!isValidFieldName(to)) { + to = replace(to); + } + String from = elem.getSourceName(); + // If the 'from' name is invalid, use the internal substitute + if (varToOriginalName.containsValue(from)) { + from = varToOriginalName.inverse().get(from); + } + projectionBindingNames.add(to); + if (to.equals(from)) { + valueDoc.append(to, 1); + hashDoc.append(to, 1); + typeDoc.append(to, 1); + } + else { + valueDoc.append(to, valueFieldExpr(from)); + hashDoc.append(to, hashFieldExpr(from)); + typeDoc.append(to, typeFieldExpr(from)); + } + } + bindingNamesUnion.addAll(projectionBindingNames); + if (bindingNamesIntersection == null) { + bindingNamesIntersection = new HashSet<>(projectionBindingNames); + } + else { + bindingNamesIntersection.retainAll(projectionBindingNames); + } + projectOpts.add(new Document() + .append(VALUES, valueDoc) + .append(HASHES, hashDoc) + .append(TYPES, typeDoc) + .append(LEVEL, "$" + LEVEL) + .append(TIMESTAMP, "$" + TIMESTAMP)); + } + if (projectOpts.size() == 1) { + pipeline.add(Aggregates.project(projectOpts.get(0))); + } + else { + String listKey = "PROJECTIONS"; + Bson projectIndividual = Projections.fields( + Projections.computed(VALUES, "$" + listKey + "." + VALUES), + Projections.computed(HASHES, "$" + listKey + "." + HASHES), + Projections.computed(TYPES, "$" + listKey + "." + TYPES), + Projections.include(LEVEL), + Projections.include(TIMESTAMP)); + pipeline.add(Aggregates.project(Projections.computed(listKey, projectOpts))); + pipeline.add(Aggregates.unwind("$" + listKey)); + pipeline.add(Aggregates.project(projectIndividual)); + } + assuredBindingNames.clear(); + bindingNames.clear(); + assuredBindingNames.addAll(bindingNamesIntersection); + bindingNames.addAll(bindingNamesUnion); + return true; + } + + /** + * Add a SPARQL extension to the pipeline, if possible. An extension adds + * some number of variables to the result. Adds a "$project" step to the + * pipeline, but differs from the SPARQL project operation in that + * 1) pre-existing variables are always kept, and 2) values of new variables + * are defined by expressions, which may be more complex than simply + * variable names. Not all expressions are supported. If unsupported + * expression types are used in the extension, the pipeline will remain + * unchanged and this method will return false. + * @param extensionElements A list of new variables and their expressions + * @return True if the extension was successfully converted into a pipeline + * step, false otherwise. 
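+     * <p>
+     * For example, a SPARQL {@code BIND(<urn:data> AS ?c)} arrives as an
+     * ExtensionElem binding {@code c} to a ValueConstant, and would add a
+     * stage that keeps every existing field and computes the new one, along
+     * the lines of (other fields abbreviated):
+     * <pre>{@code
+     * { $project: { <VALUES>: { x: 1, ..., c: { $literal: "urn:data" } },
+     *               <HASHES>: { ... }, <TYPES>: { ... }, ... } }
+     * }</pre>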
+ */ + public boolean extend(Iterable extensionElements) { + List valueFields = new LinkedList<>(); + List hashFields = new LinkedList<>(); + List typeFields = new LinkedList<>(); + for (String varName : bindingNames) { + valueFields.add(Projections.include(varName)); + hashFields.add(Projections.include(varName)); + typeFields.add(Projections.include(varName)); + } + Set newVarNames = new HashSet<>(); + for (ExtensionElem elem : extensionElements) { + String name = elem.getName(); + if (!isValidFieldName(name)) { + // If the field name is invalid, replace it internally + name = replace(name); + } + // We can only handle certain kinds of value expressions; return + // failure for any others. + ValueExpr expr = elem.getExpr(); + final Object valueField; + final Object hashField; + final Object typeField; + if (expr instanceof Var) { + String varName = ((Var) expr).getName(); + valueField = "$" + varName; + hashField = "$" + varName; + typeField = "$" + varName; + } + else if (expr instanceof ValueConstant) { + Value val = ((ValueConstant) expr).getValue(); + valueField = new Document("$literal", val.stringValue()); + hashField = new Document("$literal", SimpleMongoDBStorageStrategy.hash(val.stringValue())); + if (val instanceof Literal) { + typeField = new Document("$literal", ((Literal) val).getDatatype().stringValue()); + } + else { + typeField = null; + } + } + else { + // if not understood, return failure + return false; + } + valueFields.add(Projections.computed(name, valueField)); + hashFields.add(Projections.computed(name, hashField)); + if (typeField != null) { + typeFields.add(Projections.computed(name, typeField)); + } + newVarNames.add(name); + } + assuredBindingNames.addAll(newVarNames); + bindingNames.addAll(newVarNames); + Bson projectOpts = Projections.fields( + Projections.computed(VALUES, Projections.fields(valueFields)), + Projections.computed(HASHES, Projections.fields(hashFields)), + Projections.computed(TYPES, Projections.fields(typeFields)), + Projections.include(LEVEL), + Projections.include(TIMESTAMP)); + pipeline.add(Aggregates.project(projectOpts)); + return true; + } + + /** + * Add a SPARQL filter to the pipeline, if possible. A filter eliminates + * results that don't satisfy a given condition. Not all conditional + * expressions are supported. If unsupported expressions are used in the + * filter, the pipeline will remain unchanged and this method will return + * false. Currently only supports binary {@link Compare} conditions among + * variables and/or literals. + * @param condition The filter condition + * @return True if the filter was successfully converted into a pipeline + * step, false otherwise. 
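+     * <p>
+     * For example, {@code FILTER(?x = ?y)} is a Compare with operator EQ and
+     * would append three stages, roughly:
+     * <pre>{@code
+     * { $project: { FILTER: { $eq: ["$<VALUES>.x", "$<VALUES>.y"] }, ... } }
+     * { $match: { FILTER: true } }
+     * { $project: { ... } }   // drop the temporary FILTER field
+     * }</pre>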
+     */
+    public boolean filter(ValueExpr condition) {
+        if (condition instanceof Compare) {
+            Compare compare = (Compare) condition;
+            Compare.CompareOp operator = compare.getOperator();
+            Object leftArg = valueFieldExpr(compare.getLeftArg());
+            Object rightArg = valueFieldExpr(compare.getRightArg());
+            if (leftArg == null || rightArg == null) {
+                // unsupported value expression, can't convert filter
+                return false;
+            }
+            final String opFunc;
+            switch (operator) {
+            case EQ:
+                opFunc = "$eq";
+                break;
+            case NE:
+                opFunc = "$ne";
+                break;
+            case LT:
+                opFunc = "$lt";
+                break;
+            case LE:
+                // MongoDB's aggregation operator for <= is "$lte", not "$le"
+                opFunc = "$lte";
+                break;
+            case GT:
+                opFunc = "$gt";
+                break;
+            case GE:
+                // MongoDB's aggregation operator for >= is "$gte", not "$ge"
+                opFunc = "$gte";
+                break;
+            default:
+                // unrecognized comparison operator, can't convert filter
+                return false;
+            }
+            Document compareDoc = new Document(opFunc, Arrays.asList(leftArg, rightArg));
+            pipeline.add(Aggregates.project(Projections.fields(
+                    Projections.computed("FILTER", compareDoc),
+                    Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
+            pipeline.add(Aggregates.match(new Document("FILTER", true)));
+            pipeline.add(Aggregates.project(Projections.fields(
+                    Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Add a $group step to filter out redundant solutions.
+     * @return True if the distinct operation was successfully appended.
+     */
+    public boolean distinct() {
+        List<String> key = new LinkedList<>();
+        for (String varName : bindingNames) {
+            key.add(hashFieldExpr(varName));
+        }
+        List<BsonField> reduceOps = new LinkedList<>();
+        for (String field : FIELDS) {
+            reduceOps.add(new BsonField(field, new Document("$first", "$" + field)));
+        }
+        pipeline.add(Aggregates.group(new Document("$concat", key), reduceOps));
+        return true;
+    }
+
+    /**
+     * Add a step to the end of the current pipeline which prunes the results
+     * according to the recorded derivation level of their sources. At least one
+     * triple that was used to construct the result must have a derivation level
+     * at least as high as the parameter, indicating that it was derived via
+     * that many steps from the original data. (A value of zero is equivalent to
+     * input data that was not derived at all.) Use in conjunction with
+     * getTriplePipeline (which sets the source level for generated triples) to
+     * avoid repeatedly deriving the same results.
+     * @param requiredLevel Required derivation depth. Reject a solution to the
+     * query if all of the triples involved in producing that solution have a
+     * lower derivation depth than this. If zero, does nothing.
+     */
+    public void requireSourceDerivationDepth(int requiredLevel) {
+        if (requiredLevel > 0) {
+            pipeline.add(Aggregates.match(new Document(LEVEL,
+                    new Document("$gte", requiredLevel))));
+        }
+    }
+
+    /**
+     * Add a step to the end of the current pipeline which prunes the results
+     * according to the timestamps of their sources. At least one triple that
+     * was used to construct the result must have a timestamp at least as
+     * recent as the parameter. Use in iterative applications to avoid deriving
+     * solutions that would have been generated in an earlier iteration.
+     * @param t Minimum required timestamp. Reject a solution to the query if
+     * all of the triples involved in producing that solution have an earlier
+     * timestamp than this.
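+     * <p>
+     * The appended stage is a single {@code $match} on the timestamp field:
+     * roughly {@code { $match: { <timestamp>: { $gte: t } } }}, where
+     * {@code <timestamp>} is the field name given by {@code TIMESTAMP}.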
+ */ + public void requireSourceTimestamp(long t) { + pipeline.add(Aggregates.match(new Document(TIMESTAMP, + new Document("$gte", t)))); + } + + /** + * Given that the current state of the pipeline produces data that can be + * interpreted as triples, add a project step to map each result from the + * intermediate result structure to a structure that can be stored in the + * triple store. Does not modify the internal pipeline, which will still + * produce intermediate results suitable for query evaluation. + * @param timestamp Attach this timestamp to the resulting triples. + * @param requireNew If true, add an additional step to check constructed + * triples against existing triples and only include new ones in the + * result. Adds a potentially expensive $lookup step. + * @throws IllegalStateException if the results produced by the current + * pipeline do not have variable names allowing them to be interpreted as + * triples (i.e. "subject", "predicate", and "object"). + */ + public List getTriplePipeline(long timestamp, boolean requireNew) { + if (!assuredBindingNames.contains(SUBJECT) + || !assuredBindingNames.contains(PREDICATE) + || !assuredBindingNames.contains(OBJECT)) { + throw new IllegalStateException("Current pipeline does not produce " + + "records that can be converted into triples.\n" + + "Required variable names: <" + SUBJECT + ", " + PREDICATE + + ", " + OBJECT + ">\nCurrent variable names: " + + assuredBindingNames); + } + List triplePipeline = new LinkedList<>(pipeline); + List fields = new LinkedList<>(); + fields.add(Projections.computed(SUBJECT, valueFieldExpr(SUBJECT))); + fields.add(Projections.computed(SUBJECT_HASH, hashFieldExpr(SUBJECT))); + fields.add(Projections.computed(PREDICATE, valueFieldExpr(PREDICATE))); + fields.add(Projections.computed(PREDICATE_HASH, hashFieldExpr(PREDICATE))); + fields.add(Projections.computed(OBJECT, valueFieldExpr(OBJECT))); + fields.add(Projections.computed(OBJECT_HASH, hashFieldExpr(OBJECT))); + fields.add(Projections.computed(OBJECT_TYPE, + ConditionalOperators.ifNull(typeFieldExpr(OBJECT), DEFAULT_TYPE))); + fields.add(Projections.computed(CONTEXT, DEFAULT_CONTEXT)); + fields.add(Projections.computed(STATEMENT_METADATA, DEFAULT_METADATA)); + fields.add(DEFAULT_DV); + fields.add(Projections.computed(TIMESTAMP, new Document("$literal", timestamp))); + fields.add(Projections.computed(LEVEL, new Document("$add", Arrays.asList("$" + LEVEL, 1)))); + triplePipeline.add(Aggregates.project(Projections.fields(fields))); + if (requireNew) { + // Prune any triples that already exist in the data store + String collectionName = collection.getNamespace().getCollectionName(); + Bson includeAll = Projections.include(SUBJECT, SUBJECT_HASH, + PREDICATE, PREDICATE_HASH, OBJECT, OBJECT_HASH, + OBJECT_TYPE, CONTEXT, STATEMENT_METADATA, + DOCUMENT_VISIBILITY, TIMESTAMP, LEVEL); + List eqTests = new LinkedList<>(); + eqTests.add(new Document("$eq", Arrays.asList("$$this." + PREDICATE_HASH, "$" + PREDICATE_HASH))); + eqTests.add(new Document("$eq", Arrays.asList("$$this." 
+ OBJECT_HASH, "$" + OBJECT_HASH))); + Bson redundantFilter = new Document("$filter", new Document("input", "$" + JOINED_TRIPLE) + .append("as", "this").append("cond", new Document("$and", eqTests))); + triplePipeline.add(Aggregates.lookup(collectionName, SUBJECT_HASH, + SUBJECT_HASH, JOINED_TRIPLE)); + String numRedundant = "REDUNDANT"; + triplePipeline.add(Aggregates.project(Projections.fields(includeAll, + Projections.computed(numRedundant, new Document("$size", redundantFilter))))); + triplePipeline.add(Aggregates.match(Filters.eq(numRedundant, 0))); + triplePipeline.add(Aggregates.project(Projections.fields(includeAll))); + } + return triplePipeline; + } +} diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java new file mode 100644 index 000000000..fb1f558ba --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryOptimizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * MongoDB-specific query optimizer that replaces part or all of a SPARQL query + * tree with a MongoDB aggregation pipeline. + *
<p>
+ * Transforms query trees using {@link SparqlToPipelineTransformVisitor}. If + * possible, this visitor will replace portions of the query tree, or the entire + * query, with an equivalent aggregation pipeline (contained in an + * {@link AggregationPipelineQueryNode}), thereby allowing query logic to be + * evaluated by the MongoDB server rather than by the client. + */ +public class AggregationPipelineQueryOptimizer implements QueryOptimizer, Configurable { + private StatefulMongoDBRdfConfiguration conf; + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) { + SparqlToPipelineTransformVisitor pipelineVisitor = new SparqlToPipelineTransformVisitor(conf); + try { + tupleExpr.visit(pipelineVisitor); + } catch (Exception e) { + logger.error("Error attempting to transform query using the aggregation pipeline", e); + } + } + + /** + * @throws IllegalArgumentException if conf is not a {@link StatefulMongoDBRdfConfiguration}. + */ + @Override + public void setConf(Configuration conf) { + Preconditions.checkNotNull(conf); + Preconditions.checkArgument(conf instanceof StatefulMongoDBRdfConfiguration, + "Expected an instance of %s; received %s", + StatefulMongoDBRdfConfiguration.class.getName(), conf.getClass().getName()); + this.conf = (StatefulMongoDBRdfConfiguration) conf; + } + + @Override + public StatefulMongoDBRdfConfiguration getConf() { + return conf; + } +} diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java new file mode 100644 index 000000000..c533efce6 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Map; + +import org.bson.Document; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Preconditions; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCursor; + +import info.aduna.iteration.CloseableIteration; + +/** + * An iterator that converts the documents resulting from an + * {@link AggregationPipelineQueryNode} into {@link BindingSet}s. 
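+ * <p>
+ * For example (a hypothetical result document): a document mapping the
+ * internal VALUES key to {@code { "x": "urn:Alice", "age": "42" }} and the
+ * internal TYPES key to {@code { "age": ".../XMLSchema#integer" }} would be
+ * converted to a solution binding {@code x} to the URI {@code urn:Alice}
+ * (absent type information is treated as a URI) and {@code age} to the
+ * typed literal {@code "42"^^xsd:integer}.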
+ */ +public class PipelineResultIteration implements CloseableIteration { + private static final int BATCH_SIZE = 1000; + private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + + private final MongoCursor cursor; + private final Map varToOriginalName; + private final BindingSet bindings; + private BindingSet nextSolution = null; + + /** + * Constructor. + * @param aggIter Iterator of documents in AggregationPipelineQueryNode's + * intermediate solution representation. + * @param varToOriginalName A mapping from field names in the pipeline + * result documents to equivalent variable names in the original query. + * Where an entry does not exist for a field, the field name and variable + * name are assumed to be the same. + * @param bindings A partial solution. May be empty. + */ + public PipelineResultIteration(AggregateIterable aggIter, + Map varToOriginalName, + BindingSet bindings) { + this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName); + this.bindings = Preconditions.checkNotNull(bindings); + Preconditions.checkNotNull(aggIter); + aggIter.batchSize(BATCH_SIZE); + this.cursor = aggIter.iterator(); + } + + private void lookahead() { + while (nextSolution == null && cursor.hasNext()) { + nextSolution = docToBindingSet(cursor.next()); + } + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + lookahead(); + return nextSolution != null; + } + + @Override + public BindingSet next() throws QueryEvaluationException { + lookahead(); + BindingSet solution = nextSolution; + nextSolution = null; + return solution; + } + + /** + * @throws UnsupportedOperationException always. + */ + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException("remove() undefined for query result iteration"); + } + + @Override + public void close() throws QueryEvaluationException { + cursor.close(); + } + + private QueryBindingSet docToBindingSet(Document result) { + QueryBindingSet bindingSet = new QueryBindingSet(bindings); + Document valueSet = result.get(AggregationPipelineQueryNode.VALUES, Document.class); + Document typeSet = result.get(AggregationPipelineQueryNode.TYPES, Document.class); + if (valueSet != null) { + for (Map.Entry entry : valueSet.entrySet()) { + String fieldName = entry.getKey(); + String valueString = entry.getValue().toString(); + String typeString = typeSet == null ? null : typeSet.getString(fieldName); + String varName = varToOriginalName.getOrDefault(fieldName, fieldName); + Value varValue; + if (typeString == null || typeString.equals(XMLSchema.ANYURI.stringValue())) { + varValue = VF.createURI(valueString); + } + else { + varValue = VF.createLiteral(valueString, VF.createURI(typeString)); + } + Binding existingBinding = bindingSet.getBinding(varName); + // If this variable is not already bound, add it. + if (existingBinding == null) { + bindingSet.addBinding(varName, varValue); + } + // If it's bound to something else, the solutions are incompatible. 
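+                // Returning null here causes lookahead() to skip this document.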
+ else if (!existingBinding.getValue().equals(varValue)) { + return null; + } + } + } + return bindingSet; + } +} diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java new file mode 100644 index 000000000..b7f5a67b8 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; + +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.bson.Document; +import org.openrdf.query.algebra.Distinct; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.Reduced; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.google.common.base.Preconditions; +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +/** + * Visitor that transforms a SPARQL query tree by replacing as much of the tree + * as possible with one or more {@code AggregationPipelineQueryNode}s. + *
<p>
+ * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation + * pipeline which is equivalent to the replaced portion of the original query. + * Evaluating this node executes the pipeline and converts the results into + * query solutions. If only part of the query was transformed, the remaining + * query logic (higher up in the query tree) can be applied to those + * intermediate solutions as normal. + *
<p>
+ * In general, processes the tree in bottom-up order: A leaf node + * ({@link StatementPattern}) is replaced with a pipeline that matches the + * corresponding statements. Then, if the parent node's semantics are supported + * by the visitor, stages are appended to the pipeline and the subtree at the + * parent node is replaced with the extended pipeline. This continues up the + * tree until reaching a node that cannot be transformed, in which case that + * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf + * node) instead of the previous subtree, or until the entire tree has been + * subsumed into a single pipeline node. + *
<p>
+ * Nodes which are transformed into pipeline stages: + *
<ul>
+ *   <li>A {@code StatementPattern} node forms the beginning of each pipeline.</li>
+ *   <li>Single-argument operations {@link Projection}, {@link MultiProjection},
+ *   {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed
+ *   into pipeline stages whenever the child {@link TupleExpr} represents a
+ *   pipeline.</li>
+ *   <li>A {@link Filter} operation will be appended to the pipeline when its
+ *   child {@code TupleExpr} represents a pipeline and the filter condition is a
+ *   type of {@link ValueExpr} understood by {@code AggregationPipelineQueryNode}.</li>
+ *   <li>A {@link Join} operation will be appended to the pipeline when one child
+ *   is a {@code StatementPattern} and the other is an
+ *   {@code AggregationPipelineQueryNode}.</li>
+ * </ul>
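+ * <p>
+ * For example (prefixes omitted), a query like
+ * <pre>{@code
+ * SELECT ?x WHERE {
+ *   ?x rdf:type <urn:T> .
+ *   ?x <urn:p> ?y .
+ * }
+ * }</pre>
+ * parses to a Projection over a Join of two StatementPatterns, all of which
+ * are supported here, so the visitor can replace the entire tree with a
+ * single {@code AggregationPipelineQueryNode}.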
+ */ +public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase { + private final MongoCollection inputCollection; + + /** + * Instantiate a visitor directly from a {@link MongoCollection}. + * @param inputCollection Stores triples. + */ + public SparqlToPipelineTransformVisitor(MongoCollection inputCollection) { + this.inputCollection = Preconditions.checkNotNull(inputCollection); + } + + /** + * Instantiate a visitor from a {@link MongoDBRdfConfiguration}. + * @param conf Contains database connection information. + */ + public SparqlToPipelineTransformVisitor(StatefulMongoDBRdfConfiguration conf) { + Preconditions.checkNotNull(conf); + MongoClient mongo = conf.getMongoClient(); + MongoDatabase db = mongo.getDatabase(conf.getMongoDBName()); + this.inputCollection = db.getCollection(conf.getTriplesCollectionName()); + } + + @Override + public void meet(StatementPattern sp) { + sp.replaceWith(new AggregationPipelineQueryNode(inputCollection, sp)); + } + + @Override + public void meet(Join join) throws Exception { + // If one branch is a single statement pattern, then try replacing the + // other with a pipeline. + AggregationPipelineQueryNode pipelineNode = null; + StatementPattern joinWithSP = null; + if (join.getRightArg() instanceof StatementPattern) { + join.getLeftArg().visit(this); + if (join.getLeftArg() instanceof AggregationPipelineQueryNode) { + pipelineNode = (AggregationPipelineQueryNode) join.getLeftArg(); + joinWithSP = (StatementPattern) join.getRightArg(); + } + } + else if (join.getLeftArg() instanceof StatementPattern) { + join.getRightArg().visit(this); + if (join.getRightArg() instanceof AggregationPipelineQueryNode) { + pipelineNode = (AggregationPipelineQueryNode) join.getRightArg(); + joinWithSP = (StatementPattern) join.getLeftArg(); + } + } + else { + // Otherwise, visit the children to try to replace smaller subtrees + join.visitChildren(this); + } + // If this is now a join between a pipeline node and a statement + // pattern, add the join step at the end of the pipeline, and replace + // this node with the extended pipeline node. 
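+        // (If joinWith() fails -- e.g. the patterns share no variable to join
+        // on -- the original Join node is left in place.)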
+ if (pipelineNode != null && joinWithSP != null && pipelineNode.joinWith(joinWithSP)) { + join.replaceWith(pipelineNode); + } + } + + @Override + public void meet(Projection projectionNode) throws Exception { + projectionNode.visitChildren(this); + if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg(); + if (pipelineNode.project(Arrays.asList(projectionNode.getProjectionElemList()))) { + projectionNode.replaceWith(pipelineNode); + } + } + } + + @Override + public void meet(MultiProjection projectionNode) throws Exception { + projectionNode.visitChildren(this); + if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg(); + if (pipelineNode.project(projectionNode.getProjections())) { + projectionNode.replaceWith(pipelineNode); + } + } + } + + @Override + public void meet(Extension extensionNode) throws Exception { + extensionNode.visitChildren(this); + if (extensionNode.getArg() instanceof AggregationPipelineQueryNode && extensionNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) extensionNode.getArg(); + if (pipelineNode.extend(extensionNode.getElements())) { + extensionNode.replaceWith(pipelineNode); + } + } + } + + @Override + public void meet(Reduced reducedNode) throws Exception { + reducedNode.visitChildren(this); + if (reducedNode.getArg() instanceof AggregationPipelineQueryNode && reducedNode.getParentNode() != null) { + reducedNode.replaceWith(reducedNode.getArg()); + } + } + + @Override + public void meet(Distinct distinctNode) throws Exception { + distinctNode.visitChildren(this); + if (distinctNode.getArg() instanceof AggregationPipelineQueryNode && distinctNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) distinctNode.getArg(); + pipelineNode.distinct(); + distinctNode.replaceWith(pipelineNode); + } + } + + @Override + public void meet(Filter filterNode) throws Exception { + filterNode.visitChildren(this); + if (filterNode.getArg() instanceof AggregationPipelineQueryNode && filterNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) filterNode.getArg(); + if (pipelineNode.filter(filterNode.getCondition())) { + filterNode.replaceWith(pipelineNode); + } + } + } +} diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java index db331817b..ecad9c686 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java @@ -63,6 +63,15 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy collection; + + @Before + @SuppressWarnings("unchecked") + public void setUp() { + collection = Mockito.mock(MongoCollection.class); + Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection")); + } + + @Test + public void testEquals() { + final AggregationPipelineQueryNode node1 = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + 
Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + final AggregationPipelineQueryNode node2 = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + Assert.assertEquals(node1, node2); + Assert.assertEquals(node1.hashCode(), node2.hashCode()); + final AggregationPipelineQueryNode diff1 = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y"), + HashBiMap.create()); + final AggregationPipelineQueryNode diff2 = new AggregationPipelineQueryNode( + collection, + Arrays.asList(new Document()), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + HashBiMap varMapping = HashBiMap.create(); + varMapping.put("field-x", "x"); + final AggregationPipelineQueryNode diff3 = new AggregationPipelineQueryNode( + collection, + Arrays.asList(new Document()), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + varMapping); + Assert.assertNotEquals(diff1, node1); + Assert.assertNotEquals(diff2, node1); + Assert.assertNotEquals(diff3, node1); + node1.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c"))); + node2.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c"))); + Assert.assertEquals(node1, node2); + node2.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c"))); + Assert.assertNotEquals(node1, node2); + } + + @Test + public void testClone() { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + final AggregationPipelineQueryNode copy = base.clone(); + Assert.assertEquals(base, copy); + copy.getPipeline().add(new Document("$project", new Document())); + Assert.assertNotEquals(base, copy); + base.getPipeline().add(new Document("$project", new Document())); + Assert.assertEquals(base, copy); + } + + @Test + public void testStatementPattern() throws Exception { + // All variables + StatementPattern sp = new StatementPattern(new Var("s"), new Var("p"), new Var("o")); + AggregationPipelineQueryNode node = new AggregationPipelineQueryNode(collection, sp); + Assert.assertEquals(Sets.newHashSet("s", "p", "o"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("s", "p", "o"), node.getAssuredBindingNames()); + Assert.assertEquals(2, node.getPipeline().size()); + // All constants + sp = new StatementPattern(constant(VF.createURI("urn:Alice")), constant(RDF.TYPE), constant(UNDERGRAD)); + node = new AggregationPipelineQueryNode(collection, sp); + Assert.assertEquals(Sets.newHashSet(), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet(), node.getAssuredBindingNames()); + Assert.assertEquals(2, node.getPipeline().size()); + // Mixture + sp = new StatementPattern(new Var("student"), constant(RDF.TYPE), constant(UNDERGRAD)); + node = new AggregationPipelineQueryNode(collection, sp); + Assert.assertEquals(Sets.newHashSet("student"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("student"), node.getAssuredBindingNames()); + Assert.assertEquals(2, node.getPipeline().size()); + } + + @Test + public void testJoin() throws Exception { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + // Join on 
one shared variable + AggregationPipelineQueryNode node = base.clone(); + boolean success = node.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c"))); + Assert.assertTrue(success); + Assert.assertEquals(Sets.newHashSet("x", "y", "c", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y", "c"), node.getAssuredBindingNames()); + Assert.assertEquals(4, node.getPipeline().size()); + // Join on multiple shared variables + node = base.clone(); + success = node.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("y"))); + Assert.assertTrue(success); + Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames()); + Assert.assertEquals(5, node.getPipeline().size()); + } + + @Test + public void testProject() { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + // Add a single projection + ProjectionElemList singleProjection = new ProjectionElemList(); + singleProjection.addElement(new ProjectionElem("x", "z")); + singleProjection.addElement(new ProjectionElem("y", "y")); + List projections = Arrays.asList(singleProjection); + AggregationPipelineQueryNode node = base.clone(); + boolean success = node.project(projections); + Assert.assertTrue(success); + Assert.assertEquals(1, node.getPipeline().size()); + Assert.assertEquals(Sets.newHashSet("z", "y"), + node.getAssuredBindingNames()); + Assert.assertEquals(Sets.newHashSet("z", "y"), + node.getBindingNames()); + // Add a multi-projection + ProjectionElemList p1 = new ProjectionElemList(); + p1.addElement(new ProjectionElem("x", "solution")); + ProjectionElemList p2 = new ProjectionElemList(); + p2.addElement(new ProjectionElem("y", "solution")); + ProjectionElemList p3 = new ProjectionElemList(); + p3.addElement(new ProjectionElem("x", "x")); + p3.addElement(new ProjectionElem("x", "solution")); + p3.addElement(new ProjectionElem("y", "y")); + projections = Arrays.asList(p1, p2, p3); + node = base.clone(); + success = node.project(projections); + Assert.assertTrue(success); + Assert.assertEquals(3, node.getPipeline().size()); + Assert.assertEquals(Sets.newHashSet("solution"), + node.getAssuredBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y", "solution"), + node.getBindingNames()); + // Add no projections + node = base.clone(); + success = node.project(Arrays.asList()); + Assert.assertFalse(success); + Assert.assertEquals(base, node); + } + + @Test + public void testExtend() { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + // Extend with a mix of variables and constants + List extensionElements = Arrays.asList( + new ExtensionElem(new Var("x"), "subject"), + new ExtensionElem(new ValueConstant(RDF.TYPE), "predicate"), + new ExtensionElem(new Var("y"), "object")); + AggregationPipelineQueryNode node = base.clone(); + boolean success = node.extend(extensionElements); + Assert.assertTrue(success); + Assert.assertEquals(1, node.getPipeline().size()); + Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object"), + node.getAssuredBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object", "opt"), + 
node.getBindingNames()); + // Attempt to extend with an unsupported expression + extensionElements = Arrays.asList( + new ExtensionElem(new Var("x"), "subject"), + new ExtensionElem(new Not(new ValueConstant(VF.createLiteral(true))), "notTrue")); + node = base.clone(); + success = node.extend(extensionElements); + Assert.assertFalse(success); + Assert.assertEquals(base, node); + } + + @Test + public void testDistinct() { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + AggregationPipelineQueryNode node = base.clone(); + boolean success = node.distinct(); + Assert.assertTrue(success); + Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames()); + Assert.assertEquals(1, node.getPipeline().size()); + } + + @Test + public void testFilter() { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + // Extend with a supported filter + AggregationPipelineQueryNode node = base.clone(); + boolean success = node.filter(new Compare(new Var("x"), new Var("y"), Compare.CompareOp.EQ)); + Assert.assertTrue(success); + Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames()); + Assert.assertEquals(3, node.getPipeline().size()); + // Extend with an unsupported filter + node = base.clone(); + success = node.filter(new IsLiteral(new Var("opt"))); + Assert.assertFalse(success); + Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames()); + Assert.assertEquals(0, node.getPipeline().size()); + } + + @Test + public void testRequireSourceDerivationLevel() throws Exception { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + // Extend with a level greater than zero + AggregationPipelineQueryNode node = base.clone(); + node.requireSourceDerivationDepth(3); + Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames()); + Assert.assertEquals(1, node.getPipeline().size()); + // Extend with a level of zero (no effect) + node = base.clone(); + node.requireSourceDerivationDepth(0); + Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames()); + Assert.assertEquals(0, node.getPipeline().size()); + } + + @Test + public void testRequireSourceTimestamp() { + final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode( + collection, + new LinkedList<>(), + Sets.newHashSet("x", "y"), + Sets.newHashSet("x", "y", "opt"), + HashBiMap.create()); + // Extend with a level greater than zero + AggregationPipelineQueryNode node = base.clone(); + node.requireSourceTimestamp(System.currentTimeMillis()); + Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames()); + Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames()); + 
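        // Requiring a source timestamp should append exactly one filtering stage, leaving the binding names unchanged: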
Assert.assertEquals(1, node.getPipeline().size()); + } +} diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java new file mode 100644 index 000000000..45855a0eb --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.mongodb.MongoDBRyaDAO; +import org.apache.rya.mongodb.MongoITBase; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.Literal; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.FOAF; +import org.openrdf.model.vocabulary.OWL; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.model.vocabulary.RDFS; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.impl.ListBindingSet; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; +import com.mongodb.DBObject; +import com.mongodb.util.JSON; + +import info.aduna.iteration.CloseableIteration; + +public class PipelineQueryIT extends MongoITBase { + + private static ValueFactory VF = ValueFactoryImpl.getInstance(); + private static SPARQLParser PARSER = new SPARQLParser(); + + private MongoDBRyaDAO dao; + + @Before + @Override + public void setupTest() throws Exception { + super.setupTest(); + dao = new MongoDBRyaDAO(); + dao.setConf(conf); + dao.init(); + } + + private void insert(Resource subject, URI predicate, Value object) throws RyaDAOException { + insert(subject, predicate, object, 0); + } + + private void insert(Resource subject, URI predicate, Value object, int derivationLevel) throws RyaDAOException { + 
final RyaStatementBuilder builder = new RyaStatementBuilder(); + builder.setSubject(RdfToRyaConversions.convertResource(subject)); + builder.setPredicate(RdfToRyaConversions.convertURI(predicate)); + builder.setObject(RdfToRyaConversions.convertValue(object)); + final RyaStatement rstmt = builder.build(); + if (derivationLevel > 0) { + DBObject obj = new SimpleMongoDBStorageStrategy().serialize(builder.build()); + obj.put("derivation_level", derivationLevel); + getRyaDbCollection().insert(obj); + } + else { + dao.add(rstmt); + } + } + + private void testPipelineQuery(String query, Multiset expectedSolutions) throws Exception { + // Prepare query and convert to pipeline + QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr()); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection()); + queryTree.visit(visitor); + // Execute pipeline and verify results + Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg(); + Multiset solutions = HashMultiset.create(); + CloseableIteration iter = pipelineNode.evaluate(new QueryBindingSet()); + while (iter.hasNext()) { + solutions.add(iter.next()); + } + Assert.assertEquals(expectedSolutions, solutions); + } + + @Test + public void testSingleStatementPattern() throws Exception { + // Insert data + insert(OWL.THING, RDF.TYPE, OWL.CLASS); + insert(FOAF.PERSON, RDF.TYPE, OWL.CLASS, 1); + insert(FOAF.PERSON, RDFS.SUBCLASSOF, OWL.THING); + insert(VF.createURI("urn:Alice"), RDF.TYPE, FOAF.PERSON); + dao.flush(); + // Define query and expected results + final String query = "SELECT * WHERE {\n" + + " ?individual a ?type .\n" + + "}"; + List varNames = Arrays.asList("individual", "type"); + Multiset expectedSolutions = HashMultiset.create(); + expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, OWL.CLASS)); + expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, OWL.CLASS)); + expectedSolutions.add(new ListBindingSet(varNames, VF.createURI("urn:Alice"), FOAF.PERSON)); + // Execute pipeline and verify results + testPipelineQuery(query, expectedSolutions); + } + + @Test + public void testJoinTwoSharedVariables() throws Exception { + // Insert data + URI person = VF.createURI("urn:Person"); + URI livingThing = VF.createURI("urn:LivingThing"); + URI human = VF.createURI("urn:Human"); + URI programmer = VF.createURI("urn:Programmer"); + URI thing = VF.createURI("urn:Thing"); + insert(programmer, RDFS.SUBCLASSOF, person); + insert(person, RDFS.SUBCLASSOF, FOAF.PERSON); + insert(FOAF.PERSON, RDFS.SUBCLASSOF, person); + insert(person, OWL.EQUIVALENTCLASS, human); + insert(person, RDFS.SUBCLASSOF, livingThing); + insert(livingThing, RDFS.SUBCLASSOF, thing); + insert(thing, RDFS.SUBCLASSOF, OWL.THING); + insert(OWL.THING, RDFS.SUBCLASSOF, thing); + dao.flush(); + // Define query and expected results + final String query = "SELECT ?A ?B WHERE {\n" + + " ?A rdfs:subClassOf ?B .\n" + + " ?B rdfs:subClassOf ?A .\n" + + "}"; + List varNames = Arrays.asList("A", "B"); + Multiset expectedSolutions = HashMultiset.create(); + expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON)); + expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person)); + expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING)); + expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing)); + // Execute query and verify results + 
        testPipelineQuery(query, expectedSolutions);
+    }
+
+    @Test
+    public void testVariableRename() throws Exception {
+        // Insert data
+        URI alice = VF.createURI("urn:Alice");
+        URI bob = VF.createURI("urn:Bob");
+        URI carol = VF.createURI("urn:Carol");
+        URI dan = VF.createURI("urn:Dan");
+        URI eve = VF.createURI("urn:Eve");
+        URI friend = VF.createURI("urn:friend");
+        insert(alice, friend, bob);
+        insert(alice, friend, carol);
+        insert(bob, friend, eve);
+        insert(carol, friend, eve);
+        insert(dan, friend, carol);
+        insert(eve, friend, alice);
+        // Define non-distinct query and expected results
+        final String query1 = "SELECT ?x (?z as ?friendOfFriend) WHERE {\n"
+                + "  ?x <urn:friend> ?y .\n"
+                + "  ?y <urn:friend> ?z .\n"
+                + "}";
+        Multiset<BindingSet> expectedSolutions1 = HashMultiset.create();
+        List<String> varNames = Arrays.asList("x", "friendOfFriend");
+        expectedSolutions1.add(new ListBindingSet(varNames, alice, eve));
+        expectedSolutions1.add(new ListBindingSet(varNames, alice, eve));
+        expectedSolutions1.add(new ListBindingSet(varNames, bob, alice));
+        expectedSolutions1.add(new ListBindingSet(varNames, carol, alice));
+        expectedSolutions1.add(new ListBindingSet(varNames, dan, eve));
+        expectedSolutions1.add(new ListBindingSet(varNames, eve, bob));
+        expectedSolutions1.add(new ListBindingSet(varNames, eve, carol));
+        // Define distinct query and expected results
+        final String query2 = "SELECT DISTINCT ?x (?z as ?friendOfFriend) WHERE {\n"
+                + "  ?x <urn:friend> ?y .\n"
+                + "  ?y <urn:friend> ?z .\n"
+                + "}";
+        Multiset<BindingSet> expectedSolutions2 = HashMultiset.create();
+        expectedSolutions2.add(new ListBindingSet(varNames, alice, eve));
+        expectedSolutions2.add(new ListBindingSet(varNames, bob, alice));
+        expectedSolutions2.add(new ListBindingSet(varNames, carol, alice));
+        expectedSolutions2.add(new ListBindingSet(varNames, dan, eve));
+        expectedSolutions2.add(new ListBindingSet(varNames, eve, bob));
+        expectedSolutions2.add(new ListBindingSet(varNames, eve, carol));
+        // Execute and verify results
+        testPipelineQuery(query1, expectedSolutions1);
+        testPipelineQuery(query2, expectedSolutions2);
+    }
+
+    @Test
+    public void testFilterQuery() throws Exception {
+        // Insert data
+        URI alice = VF.createURI("urn:Alice");
+        URI bob = VF.createURI("urn:Bob");
+        URI eve = VF.createURI("urn:Eve");
+        URI relatedTo = VF.createURI("urn:relatedTo");
+        insert(alice, FOAF.KNOWS, bob);
+        insert(alice, FOAF.KNOWS, alice);
+        insert(alice, FOAF.KNOWS, eve);
+        insert(alice, relatedTo, bob);
+        insert(bob, FOAF.KNOWS, eve);
+        insert(bob, relatedTo, bob);
+        dao.flush();
+        // Define query 1 and expected results
+        final String query1 = "SELECT * WHERE {\n"
+                + "  ?x <" + FOAF.KNOWS.stringValue() + "> ?y1 .\n"
+                + "  ?x <" + relatedTo.stringValue() + "> ?y2 .\n"
+                + "  FILTER (?y1 != ?y2) .\n"
+                + "}";
+        final List<String> varNames = Arrays.asList("x", "y1", "y2");
+        final Multiset<BindingSet> expected1 = HashMultiset.create();
+        expected1.add(new ListBindingSet(varNames, alice, alice, bob));
+        expected1.add(new ListBindingSet(varNames, alice, eve, bob));
+        expected1.add(new ListBindingSet(varNames, bob, eve, bob));
+        // Define query 2 and expected results
+        final String query2 = "SELECT * WHERE {\n"
+                + "  ?x <" + FOAF.KNOWS.stringValue() + "> ?y1 .\n"
+                + "  ?x <" + relatedTo.stringValue() + "> ?y2 .\n"
+                + "  FILTER (?y1 = ?y2) .\n"
+                + "}";
+        final Multiset<BindingSet> expected2 = HashMultiset.create();
+        expected2.add(new ListBindingSet(varNames, alice, bob, bob));
+        // Execute and verify results
+        testPipelineQuery(query1, expected1);
+        testPipelineQuery(query2, expected2);
+    }
+
+    @Test
+    public void testMultiConstruct() throws Exception {
+        // Insert data
+        URI alice = VF.createURI("urn:Alice");
+        URI bob = VF.createURI("urn:Bob");
+        URI eve = VF.createURI("urn:Eve");
+        URI friend = VF.createURI("urn:friend");
+        URI knows = VF.createURI("urn:knows");
+        URI person = VF.createURI("urn:Person");
+        insert(alice, friend, bob);
+        insert(bob, knows, eve);
+        insert(eve, knows, alice);
+        // Define query and expected results
+        final String query = "CONSTRUCT {\n"
+                + "  ?x rdf:type owl:Thing .\n"
+                + "  ?x rdf:type <urn:Person> .\n"
+                + "} WHERE { ?x <urn:knows> ?y }";
+        final Multiset<BindingSet> expected = HashMultiset.create();
+        List<String> varNames = Arrays.asList("subject", "predicate", "object");
+        expected.add(new ListBindingSet(varNames, bob, RDF.TYPE, OWL.THING));
+        expected.add(new ListBindingSet(varNames, bob, RDF.TYPE, person));
+        expected.add(new ListBindingSet(varNames, eve, RDF.TYPE, OWL.THING));
+        expected.add(new ListBindingSet(varNames, eve, RDF.TYPE, person));
+        // Test query
+        testPipelineQuery(query, expected);
+    }
+
+    @Test
+    public void testTriplePipeline() throws Exception {
+        URI alice = VF.createURI("urn:Alice");
+        URI bob = VF.createURI("urn:Bob");
+        URI eve = VF.createURI("urn:Eve");
+        URI friend = VF.createURI("urn:friend");
+        URI knows = VF.createURI("urn:knows");
+        URI year = VF.createURI("urn:year");
+        Literal yearLiteral = VF.createLiteral("2017", XMLSchema.GYEAR);
+        final String query = "CONSTRUCT {\n"
+                + "  ?x <urn:knows> ?y .\n"
+                + "  ?x <urn:year> \"2017\"^^<" + XMLSchema.GYEAR + "> .\n"
+                + "} WHERE { ?x <urn:friend> ?y }";
+        insert(alice, friend, bob);
+        insert(bob, knows, eve);
+        insert(eve, knows, alice);
+        // Prepare query and convert to pipeline
+        QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr());
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection());
+        queryTree.visit(visitor);
+        // Get pipeline, add triple conversion, and verify that the result is a
+        // properly serialized statement
+        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+        List<Bson> triplePipeline = pipelineNode.getTriplePipeline(System.currentTimeMillis(), false);
+        SimpleMongoDBStorageStrategy strategy = new SimpleMongoDBStorageStrategy();
+        List<Statement> results = new LinkedList<>();
+        for (Document doc : getRyaCollection().aggregate(triplePipeline)) {
+            final DBObject dbo = (DBObject) JSON.parse(doc.toJson());
+            RyaStatement rstmt = strategy.deserializeDBObject(dbo);
+            Statement stmt = RyaToRdfConversions.convertStatement(rstmt);
+            results.add(stmt);
+        }
+        Assert.assertEquals(2, results.size());
+        Assert.assertTrue(results.contains(VF.createStatement(alice, knows, bob)));
+        Assert.assertTrue(results.contains(VF.createStatement(alice, year, yearLiteral)));
+    }
+
+    @Test
+    public void testRequiredDerivationLevel() throws Exception {
+        // Insert data
+        URI person = VF.createURI("urn:Person");
+        URI livingThing = VF.createURI("urn:LivingThing");
+        URI human = VF.createURI("urn:Human");
+        URI programmer = VF.createURI("urn:Programmer");
+        URI thing = VF.createURI("urn:Thing");
+        insert(programmer, RDFS.SUBCLASSOF, person);
+        insert(person, RDFS.SUBCLASSOF, FOAF.PERSON);
+        insert(FOAF.PERSON, RDFS.SUBCLASSOF, person);
+        insert(person, OWL.EQUIVALENTCLASS, human);
+        insert(person, RDFS.SUBCLASSOF, livingThing);
+        insert(livingThing, RDFS.SUBCLASSOF, thing);
+        insert(thing, RDFS.SUBCLASSOF, OWL.THING, 1);
+        insert(OWL.THING, RDFS.SUBCLASSOF, thing);
+        dao.flush();
+        // Define
query and expected results + final String query = "SELECT ?A ?B WHERE {\n" + + " ?A rdfs:subClassOf ?B .\n" + + " ?B rdfs:subClassOf ?A .\n" + + "}"; + List varNames = Arrays.asList("A", "B"); + Multiset expectedSolutions = HashMultiset.create(); + expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON)); + expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person)); + expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING)); + expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing)); + // Prepare query and convert to pipeline + QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr()); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection()); + queryTree.visit(visitor); + Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg(); + // Extend the pipeline by requiring a derivation level of zero (should have no effect) + pipelineNode.requireSourceDerivationDepth(0); + Multiset solutions = HashMultiset.create(); + CloseableIteration iter = pipelineNode.evaluate(new QueryBindingSet()); + while (iter.hasNext()) { + solutions.add(iter.next()); + } + Assert.assertEquals(expectedSolutions, solutions); + // Extend the pipeline by requiring a derivation level of one (should produce the thing/thing pair) + expectedSolutions = HashMultiset.create(); + expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING)); + expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing)); + pipelineNode.requireSourceDerivationDepth(1); + solutions = HashMultiset.create(); + iter = pipelineNode.evaluate(new QueryBindingSet()); + while (iter.hasNext()) { + solutions.add(iter.next()); + } + Assert.assertEquals(expectedSolutions, solutions); + } + + @Test + public void testRequiredTimestamp() throws Exception { + // Insert data + URI person = VF.createURI("urn:Person"); + URI livingThing = VF.createURI("urn:LivingThing"); + URI human = VF.createURI("urn:Human"); + URI programmer = VF.createURI("urn:Programmer"); + URI thing = VF.createURI("urn:Thing"); + insert(programmer, RDFS.SUBCLASSOF, person); + insert(person, RDFS.SUBCLASSOF, FOAF.PERSON, 2); + insert(FOAF.PERSON, RDFS.SUBCLASSOF, person); + insert(person, OWL.EQUIVALENTCLASS, human); + insert(person, RDFS.SUBCLASSOF, livingThing); + insert(livingThing, RDFS.SUBCLASSOF, thing); + insert(thing, RDFS.SUBCLASSOF, OWL.THING); + insert(OWL.THING, RDFS.SUBCLASSOF, thing); + dao.flush(); + // Define query and expected results + final String query = "SELECT ?A ?B WHERE {\n" + + " ?A rdfs:subClassOf ?B .\n" + + " ?B rdfs:subClassOf ?A .\n" + + "}"; + List varNames = Arrays.asList("A", "B"); + Multiset expectedSolutions = HashMultiset.create(); + expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON)); + expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person)); + expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING)); + expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing)); + // Prepare query and convert to pipeline + QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr()); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection()); + queryTree.visit(visitor); + Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode pipelineNode = 
(AggregationPipelineQueryNode) queryTree.getArg(); + // Extend the pipeline by requiring a timestamp of zero (should have no effect) + pipelineNode.requireSourceTimestamp(0); + Multiset solutions = HashMultiset.create(); + CloseableIteration iter = pipelineNode.evaluate(new QueryBindingSet()); + while (iter.hasNext()) { + solutions.add(iter.next()); + } + Assert.assertEquals(expectedSolutions, solutions); + // Extend the pipeline by requiring a future timestamp (should produce no results) + long delta = 1000 * 60 * 60 * 24; + pipelineNode.requireSourceTimestamp(System.currentTimeMillis() + delta); + iter = pipelineNode.evaluate(new QueryBindingSet()); + Assert.assertFalse(iter.hasNext()); + } +} diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java new file mode 100644 index 000000000..67752352e --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; + +import org.bson.Document; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.impl.ListBindingSet; + +import com.google.common.collect.Sets; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCursor; + +public class PipelineResultIterationTest { + ValueFactory VF = ValueFactoryImpl.getInstance(); + + @SuppressWarnings("unchecked") + private AggregateIterable documentIterator(Document ... 
documents) {
+        Iterator<Document> docIter = Arrays.asList(documents).iterator();
+        MongoCursor<Document> cursor = Mockito.mock(MongoCursor.class);
+        Mockito.when(cursor.hasNext()).thenAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws Throwable {
+                return docIter.hasNext();
+            }
+        });
+        Mockito.when(cursor.next()).thenAnswer(new Answer<Document>() {
+            @Override
+            public Document answer(InvocationOnMock invocation) throws Throwable {
+                return docIter.next();
+            }
+        });
+        AggregateIterable<Document> aggIter = Mockito.mock(AggregateIterable.class);
+        Mockito.when(aggIter.iterator()).thenReturn(cursor);
+        return aggIter;
+    }
+
+    @Test
+    public void testIteration() throws QueryEvaluationException {
+        HashMap<String, String> nameMap = new HashMap<>();
+        nameMap.put("bName", "b");
+        nameMap.put("eName", "e");
+        PipelineResultIteration iter = new PipelineResultIteration(
+                documentIterator(
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Bob")),
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Beth")),
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("bName", "urn:Bob")),
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("c", "urn:Carol")),
+                        new Document("<VALUES>", new Document("cName", "urn:Carol").append("d", "urn:Dan"))),
+                nameMap,
+                new QueryBindingSet());
+        Assert.assertTrue(iter.hasNext());
+        BindingSet bs = iter.next();
+        Assert.assertEquals(Sets.newHashSet("a", "b"), bs.getBindingNames());
+        Assert.assertEquals("urn:Alice", bs.getBinding("a").getValue().stringValue());
+        Assert.assertEquals("urn:Bob", bs.getBinding("b").getValue().stringValue());
+        Assert.assertTrue(iter.hasNext());
+        bs = iter.next();
+        Assert.assertEquals(Sets.newHashSet("a", "b"), bs.getBindingNames());
+        Assert.assertEquals("urn:Alice", bs.getBinding("a").getValue().stringValue());
+        Assert.assertEquals("urn:Beth", bs.getBinding("b").getValue().stringValue());
+        Assert.assertTrue(iter.hasNext());
+        bs = iter.next();
+        Assert.assertEquals(Sets.newHashSet("a", "b"), bs.getBindingNames());
+        Assert.assertEquals("urn:Alice", bs.getBinding("a").getValue().stringValue());
+        Assert.assertEquals("urn:Bob", bs.getBinding("b").getValue().stringValue());
+        Assert.assertTrue(iter.hasNext());
+        bs = iter.next();
+        Assert.assertEquals(Sets.newHashSet("a", "c"), bs.getBindingNames());
+        Assert.assertEquals("urn:Alice", bs.getBinding("a").getValue().stringValue());
+        Assert.assertEquals("urn:Carol", bs.getBinding("c").getValue().stringValue());
+        bs = iter.next();
+        Assert.assertEquals(Sets.newHashSet("cName", "d"), bs.getBindingNames());
+        Assert.assertEquals("urn:Carol", bs.getBinding("cName").getValue().stringValue());
+        Assert.assertEquals("urn:Dan", bs.getBinding("d").getValue().stringValue());
+        Assert.assertFalse(iter.hasNext());
+    }
+
+    @Test
+    public void testIterationGivenBindingSet() throws QueryEvaluationException {
+        BindingSet solution = new ListBindingSet(Arrays.asList("b", "c"),
+                VF.createURI("urn:Bob"), VF.createURI("urn:Charlie"));
+        HashMap<String, String> nameMap = new HashMap<>();
+        nameMap.put("bName", "b");
+        nameMap.put("cName", "c");
+        nameMap.put("c", "cName");
+        PipelineResultIteration iter = new PipelineResultIteration(
+                documentIterator(
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Bob")),
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Beth")),
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("bName", "urn:Bob")),
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("bName", "urn:Beth")),
+                        new Document("<VALUES>", new Document("a", "urn:Alice").append("cName", "urn:Carol")),
+                        new Document("<VALUES>", new Document("c", "urn:Carol").append("d", "urn:Dan"))),
+                nameMap,
+                solution);
+        Assert.assertTrue(iter.hasNext());
+        BindingSet bs = iter.next();
+        // Add 'c=Charlie' to first result ('b=Bob' matches)
+        Assert.assertEquals(Sets.newHashSet("a", "b", "c"), bs.getBindingNames());
+        Assert.assertEquals("urn:Alice", bs.getBinding("a").getValue().stringValue());
+        Assert.assertEquals("urn:Bob", bs.getBinding("b").getValue().stringValue());
+        Assert.assertEquals("urn:Charlie", bs.getBinding("c").getValue().stringValue());
+        Assert.assertTrue(iter.hasNext());
+        bs = iter.next();
+        // Skip second result ('b=Beth' incompatible with 'b=Bob')
+        // Add 'c=Charlie' to third result ('bName=Bob' resolves to 'b=Bob', matches)
+        Assert.assertEquals(Sets.newHashSet("a", "b", "c"), bs.getBindingNames());
+        Assert.assertEquals("urn:Alice", bs.getBinding("a").getValue().stringValue());
+        Assert.assertEquals("urn:Bob", bs.getBinding("b").getValue().stringValue());
+        Assert.assertEquals("urn:Charlie", bs.getBinding("c").getValue().stringValue());
+        Assert.assertTrue(iter.hasNext());
+        bs = iter.next();
+        // Skip fourth result ('bName=Beth' resolves to 'b=Beth', incompatible)
+        // Skip fifth result ('cName=Carol' resolves to 'c=Carol', incompatible with 'c=Charlie')
+        // Add 'b=Bob' and 'c=Charlie' to sixth result ('c=Carol' resolves to 'cName=Carol', compatible)
+        Assert.assertEquals(Sets.newHashSet("b", "c", "cName", "d"), bs.getBindingNames());
+        Assert.assertEquals("urn:Bob", bs.getBinding("b").getValue().stringValue());
+        Assert.assertEquals("urn:Charlie", bs.getBinding("c").getValue().stringValue());
+        Assert.assertEquals("urn:Carol", bs.getBinding("cName").getValue().stringValue());
+        Assert.assertEquals("urn:Dan", bs.getBinding("d").getValue().stringValue());
+        Assert.assertFalse(iter.hasNext());
+    }
+}
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java
new file mode 100644
index 000000000..cc9349b45
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; +import java.util.List; + +import org.bson.Document; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Not; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.Var; + +import com.google.common.collect.Sets; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoCollection; + +public class SparqlToPipelineTransformVisitorTest { + + private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + + private static final String LUBM = "urn:lubm"; + private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent"); + private static final URI PROFESSOR = VF.createURI(LUBM, "Professor"); + private static final URI COURSE = VF.createURI(LUBM, "Course"); + private static final URI TAKES = VF.createURI(LUBM, "takesCourse"); + private static final URI TEACHES = VF.createURI(LUBM, "teachesCourse"); + + private static Var constant(URI value) { + return new Var(value.stringValue(), value); + } + + MongoCollection collection; + + @Before + @SuppressWarnings("unchecked") + public void setUp() { + collection = Mockito.mock(MongoCollection.class); + Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection")); + } + + @Test + public void testStatementPattern() throws Exception { + QueryRoot query = new QueryRoot(new StatementPattern( + new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD))); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); + query.visit(visitor); + Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg(); + Assert.assertEquals(Sets.newHashSet("x"), pipelineNode.getAssuredBindingNames()); + } + + @Test + public void testJoin() throws Exception { + QueryRoot query = new QueryRoot(new Join( + new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)), + new StatementPattern(new Var("x"), constant(TAKES), new Var("course")))); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); + query.visit(visitor); + Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg(); + Assert.assertEquals(Sets.newHashSet("x", "course"), pipelineNode.getAssuredBindingNames()); + } + + @Test + public void testNestedJoins() throws Exception { + StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)); + StatementPattern isProfessor = new StatementPattern(new Var("y"), constant(RDF.TYPE), constant(PROFESSOR)); + StatementPattern takesCourse = new StatementPattern(new 
Var("x"), constant(TAKES), new Var("c")); + StatementPattern teachesCourse = new StatementPattern(new Var("y"), constant(TEACHES), new Var("c")); + QueryRoot queryTree = new QueryRoot(new Join( + isProfessor, + new Join( + new Join(isUndergrad, takesCourse), + teachesCourse))); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); + queryTree.visit(visitor); + Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg(); + Assert.assertEquals(Sets.newHashSet("x", "y", "c"), pipelineNode.getAssuredBindingNames()); + } + + @Test + public void testComplexJoin() throws Exception { + StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)); + StatementPattern isProfessor = new StatementPattern(new Var("y"), constant(RDF.TYPE), constant(PROFESSOR)); + StatementPattern takesCourse = new StatementPattern(new Var("x"), constant(TAKES), new Var("c")); + StatementPattern teachesCourse = new StatementPattern(new Var("y"), constant(TEACHES), new Var("c")); + QueryRoot queryTree = new QueryRoot(new Join( + new Join(isUndergrad, takesCourse), + new Join(isProfessor, teachesCourse))); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); + queryTree.visit(visitor); + Assert.assertTrue(queryTree.getArg() instanceof Join); + Join topJoin = (Join) queryTree.getArg(); + Assert.assertTrue(topJoin.getLeftArg() instanceof AggregationPipelineQueryNode); + Assert.assertTrue(topJoin.getRightArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode leftPipeline = (AggregationPipelineQueryNode) topJoin.getLeftArg(); + AggregationPipelineQueryNode rightPipeline = (AggregationPipelineQueryNode) topJoin.getRightArg(); + Assert.assertEquals(Sets.newHashSet("x", "c"), leftPipeline.getAssuredBindingNames()); + Assert.assertEquals(Sets.newHashSet("y", "c"), rightPipeline.getAssuredBindingNames()); + } + + @Test + public void testProjection() throws Exception { + StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)); + StatementPattern isCourse = new StatementPattern(new Var("course"), constant(RDF.TYPE), constant(COURSE)); + StatementPattern hasEdge = new StatementPattern(new Var("x"), new Var("p"), new Var("course")); + ProjectionElemList projectionElements = new ProjectionElemList( + new ProjectionElem("p", "relation"), + new ProjectionElem("course")); + QueryRoot queryTree = new QueryRoot(new Projection( + new Join(new Join(isCourse, hasEdge), isUndergrad), + projectionElements)); + SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); + queryTree.visit(visitor); + Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode); + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg(); + Assert.assertEquals(Sets.newHashSet("relation", "course"), pipelineNode.getAssuredBindingNames()); + } + + @Test + public void testMultiProjection() throws Exception { + StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)); + StatementPattern isCourse = new StatementPattern(new Var("course"), constant(RDF.TYPE), constant(COURSE)); + StatementPattern hasEdge = new StatementPattern(new Var("x"), new Var("p"), new Var("course")); + ProjectionElemList courseHasRelation = new 
ProjectionElemList(
+                new ProjectionElem("p", "relation"),
+                new ProjectionElem("course"));
+        ProjectionElemList studentHasRelation = new ProjectionElemList(
+                new ProjectionElem("p", "relation"),
+                new ProjectionElem("x", "student"));
+        QueryRoot queryTree = new QueryRoot(new MultiProjection(
+                new Join(new Join(isCourse, hasEdge), isUndergrad),
+                Arrays.asList(courseHasRelation, studentHasRelation)));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+        Assert.assertEquals(Sets.newHashSet("relation"), pipelineNode.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("relation", "course", "student"), pipelineNode.getBindingNames());
+    }
+
+    @Test
+    public void testExtension() throws Exception {
+        QueryRoot queryTree = new QueryRoot(new Extension(
+                new StatementPattern(new Var("x"), constant(TAKES), new Var("c")),
+                new ExtensionElem(new Var("x"), "renamed"),
+                new ExtensionElem(new ValueConstant(TAKES), "constant")));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+        Assert.assertEquals(Sets.newHashSet("x", "c", "renamed", "constant"), pipelineNode.getAssuredBindingNames());
+    }
+
+    @Test
+    public void testUnsupportedExtension() throws Exception {
+        StatementPattern sp = new StatementPattern(new Var("x"), constant(TAKES), new Var("c"));
+        List<ExtensionElem> elements = Arrays.asList(new ExtensionElem(new Var("x"), "renamed"),
+                new ExtensionElem(new Not(new ValueConstant(VF.createLiteral(true))), "notTrue"),
+                new ExtensionElem(new ValueConstant(TAKES), "constant"));
+        Extension extensionNode = new Extension(sp, elements);
+        QueryRoot queryTree = new QueryRoot(extensionNode);
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof Extension);
+        Assert.assertEquals(elements, ((Extension) queryTree.getArg()).getElements());
+        TupleExpr innerQuery = ((Extension) queryTree.getArg()).getArg();
+        Assert.assertTrue(innerQuery instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) innerQuery;
+        Assert.assertEquals(Sets.newHashSet("x", "c"), pipelineNode.getAssuredBindingNames());
+    }
+}
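
Usage sketch: the tests above all reduce to the following pattern, which is how the new node is driven when the optimizer is not installed. Here `sparql` and `ryaCollection` are illustrative stand-ins for a SPARQL query string and the MongoCollection<Document> of serialized Rya statements; everything else appears in the tests above.

    // Parse the query, then let the visitor rewrite supported subtrees into pipeline nodes.
    QueryRoot queryTree = new QueryRoot(new SPARQLParser().parseQuery(sparql, null).getTupleExpr());
    queryTree.visit(new SparqlToPipelineTransformVisitor(ryaCollection));
    if (queryTree.getArg() instanceof AggregationPipelineQueryNode) {
        // The whole query was converted; evaluate it as a single MongoDB aggregation.
        AggregationPipelineQueryNode node = (AggregationPipelineQueryNode) queryTree.getArg();
        CloseableIteration<BindingSet, QueryEvaluationException> results =
                node.evaluate(new QueryBindingSet());
        while (results.hasNext()) {
            System.out.println(results.next());
        }
    }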