diff --git a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/domain/QueryEngineFactory.scala b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/domain/QueryEngineFactory.scala
index df70f1379..ff467af70 100644
--- a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/domain/QueryEngineFactory.scala
+++ b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/domain/QueryEngineFactory.scala
@@ -11,6 +11,23 @@ import org.apache.spark.rdd.RDD
  */
 trait QueryEngineFactory {
 
+  /**
+   * Creates a query execution factory on a pre-partitioned dataset. The RDF data is distributed among Spark
+   * tables located either in the given database or in the default database. Mappings from triples to tables are
+   * provided by the mapping model.
+   *
+   * @param database the database that holds the tables for the RDF data; if `null`, the default database will be used
+   * @param mappingModel the model containing the mappings
+   * @return a query execution factory
+   */
   def create(database: String, mappingModel: Model): QueryExecutionFactorySpark
 
+  /**
+   * Creates a query execution factory for the given RDD of triples.
+   *
+   * @param triples the RDD of triples
+   * @return a query execution factory
+   */
+  def create(triples: RDD[Triple]): QueryExecutionFactorySpark
+
 }
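The new `create(triples: RDD[Triple])` overload goes straight from an RDD of Jena triples to a SPARQL-ready query execution factory. A minimal usage sketch with the Ontop implementation introduced further down; the driver object and sample data are illustrative, not part of this patch:

    import org.apache.jena.graph.{NodeFactory, Triple}
    import org.apache.jena.query.QueryFactory
    import org.apache.spark.sql.SparkSession

    import net.sansa_stack.query.spark.ontop.QueryEngineFactoryOntop

    // illustrative driver: build a tiny RDD[Triple] and query it via Ontop
    object CreateFromTriplesExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("query-engine-from-rdd")
          .master("local[*]")
          .getOrCreate()

        val triples = spark.sparkContext.parallelize(Seq(
          Triple.create(
            NodeFactory.createURI("http://example.org/Alice"),
            NodeFactory.createURI("http://xmlns.com/foaf/0.1/name"),
            NodeFactory.createLiteral("Alice"))))

        // vertical partitioning, Spark table registration and R2RML mapping
        // generation all happen behind this single call
        val qef = new QueryEngineFactoryOntop(spark).create(triples)

        val qe = qef.createQueryExecution(QueryFactory.create("SELECT * { ?s ?p ?o }"))
        val rs = qe.execSelect()
        while (rs.hasNext) println(rs.next())
        qe.close()
      }
    }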
diff --git a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/impl/QueryEngineFactoryBase.scala b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/impl/QueryEngineFactoryBase.scala
index f0bd7043d..85b0f7073 100644
--- a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/impl/QueryEngineFactoryBase.scala
+++ b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/api/impl/QueryEngineFactoryBase.scala
@@ -1,12 +1,59 @@
 package net.sansa_stack.query.spark.api.impl
 
+import org.aksw.sparqlify.core.sql.common.serialization.{SqlEscaper, SqlEscaperBacktick, SqlEscaperDoubleQuote}
+import org.apache.jena.graph
+import org.apache.jena.rdf.model.ModelFactory
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 
-import net.sansa_stack.query.spark.api.domain.QueryEngineFactory
+import net.sansa_stack.query.spark.api.domain.{QueryEngineFactory, QueryExecutionFactorySpark}
+import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner, RdfPartitionerComplex, RdfPartitionerDefault}
+import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils
+import net.sansa_stack.rdf.spark.partition.core.{RdfPartitionUtilsSpark, SQLUtils, SparkTableGenerator}
 
 /**
  * @author Lorenz Buehmann
 */
 abstract class QueryEngineFactoryBase(spark: SparkSession) extends QueryEngineFactory {
 
+  protected def createWithPartitioning(triples: RDD[graph.Triple],
+                                       partitioner: RdfPartitioner[RdfPartitionStateDefault] = RdfPartitionerDefault,
+                                       explodeLanguageTags: Boolean = false,
+                                       sqlEscaper: SqlEscaper = new SqlEscaperBacktick(),
+                                       escapeIdentifiers: Boolean = false): QueryExecutionFactorySpark = {
+    // apply vertical partitioning
+    val partitions2RDD = RdfPartitionUtilsSpark.partitionGraph(triples, partitioner)
+
+    val tableNameFn: RdfPartitionStateDefault => String = p => SQLUtils.escapeTablename(R2rmlUtils.createDefaultTableName(p))
+
+    partitions2RDD.foreach {
+      case (p, rdd) =>
+        println(p)
+        rdd.collect().foreach(println)
+    }
+
+    // create the Spark tables
+    SparkTableGenerator(spark).createAndRegisterSparkTables(partitioner,
+                                                            partitions2RDD,
+                                                            extractTableName = tableNameFn)
+
+    // create the mappings model
+    val mappingsModel = ModelFactory.createDefaultModel()
+    R2rmlUtils.createR2rmlMappings(
+      partitioner,
+      partitions2RDD.keySet.toSeq,
+      tableNameFn,
+      sqlEscaper,
+      mappingsModel,
+      explodeLanguageTags,
+      escapeIdentifiers)
+
+    create(null, mappingsModel)
+  }
+
+  override def create(triples: RDD[graph.Triple]): QueryExecutionFactorySpark = {
+    createWithPartitioning(triples)
+  }
+
 }
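The vertical partitioning step that `createWithPartitioning` performs first can also be driven by hand, e.g. to inspect which partition states the data yields. A sketch reusing the same helpers, assuming `triples` is an `RDD[Triple]`:

    import net.sansa_stack.rdf.common.partition.core.RdfPartitionerDefault
    import net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark

    // one (partition state -> RDD[Row]) entry per predicate/term-type combination
    val partitions2RDD = RdfPartitionUtilsSpark.partitionGraph(triples, RdfPartitionerDefault)
    partitions2RDD.keys.foreach(println)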
diff --git a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineFactoryOntop.scala b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineFactoryOntop.scala
index ec47d2be1..10cbf9e3a 100644
--- a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineFactoryOntop.scala
+++ b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineFactoryOntop.scala
@@ -1,18 +1,41 @@
 package net.sansa_stack.query.spark.ontop
 
+import org.aksw.sparqlify.core.sql.common.serialization.{SqlEscaper, SqlEscaperDoubleQuote}
+import org.apache.jena.graph
 import org.apache.jena.rdf.model.Model
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
+import org.semanticweb.owlapi.model.OWLOntology
 
 import net.sansa_stack.query.spark.api.domain.QueryExecutionFactorySpark
 import net.sansa_stack.query.spark.api.impl.QueryEngineFactoryBase
+import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner, RdfPartitionerComplex}
 
 /**
+ * A query engine factory with Ontop as SPARQL-to-SQL rewriter.
+ *
  * @author Lorenz Buehmann
  */
 class QueryEngineFactoryOntop(spark: SparkSession) extends QueryEngineFactoryBase(spark) {
 
+  override protected def createWithPartitioning(triples: RDD[graph.Triple],
+                                                partitioner: RdfPartitioner[RdfPartitionStateDefault],
+                                                explodeLanguageTags: Boolean,
+                                                sqlEscaper: SqlEscaper,
+                                                escapeIdentifiers: Boolean): QueryExecutionFactorySpark = {
+    super.createWithPartitioning(triples,
+      new RdfPartitionerComplex(),
+      explodeLanguageTags = true,
+      new SqlEscaperDoubleQuote(),
+      escapeIdentifiers = true)
+  }
+
   override def create(database: String, mappingModel: Model): QueryExecutionFactorySpark = {
-    val ontop: QueryEngineOntop = null
+    create(database, mappingModel, null)
+  }
+
+  def create(database: String, mappingModel: Model, ontology: OWLOntology): QueryExecutionFactorySpark = {
+    val ontop: QueryEngineOntop = QueryEngineOntop(spark, database, mappingModel, Option(ontology))
 
     new QueryExecutionFactorySparkOntop(spark, ontop)
   }
diff --git a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineOntop.scala b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineOntop.scala
index 11cf38134..1920973e2 100644
--- a/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineOntop.scala
+++ b/sansa-query/sansa-query-spark/src/main/scala/net/sansa_stack/query/spark/ontop/QueryEngineOntop.scala
@@ -126,7 +126,7 @@ class QueryEngineOntop(val spark: SparkSession,
                        val mappingsModel: Model,
                        var ontology: Option[OWLOntology]) {
 
-  private val logger = com.typesafe.scalalogging.Logger[OntopSPARQLEngine]
+  private val logger = com.typesafe.scalalogging.Logger[QueryEngineOntop]
 
   // set the current Spark SQL database if given
   if (databaseName != null && databaseName.trim.nonEmpty) {
@@ -156,6 +156,10 @@ class QueryEngineOntop(val spark: SparkSession,
 
   val rdfDatatype2SQLCastName = DatatypeMappings(typeFactory)
 
+  // some debug stuff
+  mappingsModel.write(System.out, "Turtle")
+  spark.catalog.listTables().collect().foreach(t => spark.table(sqlEscaper.escapeTableName(t.name)).show(false))
+
   /**
    * creates an ontology from the given partitions.
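Besides the two-argument `create`, the Ontop factory now also accepts an OWL ontology, which Ontop can exploit during query rewriting. A sketch of the three-argument overload; the mapping and ontology file names are illustrative:

    import java.io.File

    import org.apache.jena.rdf.model.ModelFactory
    import org.semanticweb.owlapi.apibinding.OWLManager

    import net.sansa_stack.query.spark.ontop.QueryEngineFactoryOntop

    // previously generated R2RML mappings plus an ontology loaded via the OWL API
    val mappingsModel = ModelFactory.createDefaultModel()
    mappingsModel.read("r2rml-mappings.ttl")
    val ontology = OWLManager.createOWLOntologyManager()
      .loadOntologyFromOntologyDocument(new File("ontology.owl"))

    // passing null as database means the default Spark database is used
    val qef = new QueryEngineFactoryOntop(spark).create(null, mappingsModel, ontology)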
diff --git a/sansa-query/sansa-query-spark/src/test/scala/net/sansa_stack/query/spark/compliance/SPARQL11TestSuiteRunnerSparkOntop.scala b/sansa-query/sansa-query-spark/src/test/scala/net/sansa_stack/query/spark/compliance/SPARQL11TestSuiteRunnerSparkOntop.scala
index 98a7cdb16..7257face4 100644
--- a/sansa-query/sansa-query-spark/src/test/scala/net/sansa_stack/query/spark/compliance/SPARQL11TestSuiteRunnerSparkOntop.scala
+++ b/sansa-query/sansa-query-spark/src/test/scala/net/sansa_stack/query/spark/compliance/SPARQL11TestSuiteRunnerSparkOntop.scala
@@ -2,24 +2,15 @@ package net.sansa_stack.query.spark.compliance
 
 import java.util.Properties
 
-import com.google.common.collect.Sets
-import it.unibz.inf.ontop.exception.OntopInternalBugException
-import it.unibz.inf.ontop.model.term._
-import net.sansa_stack.query.spark.ontop.OntopSPARQLEngine
-import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitionerComplex}
-import net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark
+import scala.collection.JavaConverters._
+
 import org.apache.jena.query._
-import org.apache.jena.rdf.model.{Model, ModelFactory}
-import org.apache.jena.sparql.engine.ResultSetStream
-import org.apache.jena.sparql.engine.binding.Binding
-import org.apache.jena.sparql.graph.GraphFactory
+import org.apache.jena.rdf.model.Model
 import org.apache.jena.sparql.resultset.SPARQLResult
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.scalatest.tags.Slow
 import org.scalatest.{ConfigMap, DoNotDiscover}
-import scala.collection.JavaConverters._
+
+import net.sansa_stack.query.spark.ontop.QueryEngineFactoryOntop
 
 
 /**
@@ -84,73 +75,49 @@ class SPARQL11TestSuiteRunnerSparkOntop
   )
 
-  override lazy val IGNORE_FILTER = t => t.name.startsWith("HOUR")
+  override lazy val IGNORE_FILTER = t => t.name.startsWith("CONTAINS")
 
-  var ontopProperties: Properties = _
+  var engineFactory: QueryEngineFactoryOntop = _
 
   override def beforeAll(configMap: ConfigMap): Unit = {
     super.beforeAll(configMap)
-    ontopProperties = new Properties()
-    ontopProperties.load(getClass.getClassLoader.getResourceAsStream("ontop-spark.properties"))
+    engineFactory = new QueryEngineFactoryOntop(spark)
   }
 
   override def runQuery(query: Query, data: Model): SPARQLResult = {
     // distribute on Spark
     val triplesRDD = spark.sparkContext.parallelize(data.getGraph.find().toList.asScala)
 
-    val partitioner = new RdfPartitionerComplex(distinguishStringLiterals = false)
-
-    // do partitioning here
-    val partitions: Map[RdfPartitionStateDefault, RDD[Row]] =
-      RdfPartitionUtilsSpark.partitionGraph(triplesRDD, partitioner)
-
-    // create the query engine
-    val queryEngine = OntopSPARQLEngine(spark, partitioner, partitions, ontology = None)
+    // we create a Spark database here to keep the implicit partitioning separate
+    val db = "TEST"
+    spark.sql(s"CREATE DATABASE IF NOT EXISTS $db")
+    spark.sql(s"USE $db")
+
+    val qef = engineFactory.create(triplesRDD)
+    val qe = qef.createQueryExecution(query)
 
     // produce result based on query type
     val result = if (query.isSelectType) { // SELECT
-      // convert to bindings
-      implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[Binding]
-      val bindings = queryEngine.execSelect(query.toString()).collect()
-
-      // we have to get the vars from the initial query as bindings are just hashmaps without preserving order
-      val vars = query.getResultVars
-      val bindingVars = bindings.flatMap(_.vars().asScala).distinct.map(_.getVarName).toList.asJava
-      // sanity check if there is a difference in the projected vars
-      if (!Sets.symmetricDifference(Sets.newHashSet(vars), Sets.newHashSet(bindingVars)).isEmpty) {
-        println(s"projected vars do not match\nexpected: $vars\ngot:$bindingVars")
-      }
-
-      // create the SPARQL result
-      val model = ModelFactory.createDefaultModel()
-
-      val resultsActual = new ResultSetStream(vars, model, bindings.toList.asJava.iterator())
-      new SPARQLResult(resultsActual)
+      val rs = qe.execSelect()
+      new SPARQLResult(rs)
     } else if (query.isAskType) { // ASK
-      // map DF entry to boolean
-      val b = queryEngine.execAsk(query.toString())
+      val b = qe.execAsk()
       new SPARQLResult(b)
     } else if (query.isConstructType) { // CONSTRUCT
-      val triples = queryEngine.execConstruct(query.toString()).collect()
-      // create the SPARQL result
-      val g = GraphFactory.createDefaultGraph()
-      triples.foreach(g.add)
-      val model = ModelFactory.createModelForGraph(g)
-      new SPARQLResult(model)
-
+      val triples = qe.execConstruct()
+      new SPARQLResult(triples)
    } else { // DESCRIBE todo
       fail("unsupported query type: DESCRIBE")
       null
     }
 
     // clean up
-    queryEngine.clear()
+    // we drop the Spark database (CASCADE also removes its tables)
+    spark.sql(s"DROP DATABASE IF EXISTS $db CASCADE")
+    qe.close()
 
     result
   }
 
-  class InvalidTermAsResultException(term: ImmutableTerm) extends OntopInternalBugException("Term " + term + " does not evaluate to a constant")
-  class InvalidConstantTypeInResultException(message: String) extends OntopInternalBugException(message)
-
 }
 
 import org.scalatest.Tag
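The test now scopes each run to its own Spark database so that tables created by the implicit partitioning cannot leak between test cases. The same pattern works for any throwaway workload; a sketch (database name illustrative; switching back to default first avoids issues with dropping the database currently in use, and CASCADE is needed for a non-empty one):

    val db = "scratch"
    spark.sql(s"CREATE DATABASE IF NOT EXISTS $db")
    spark.sql(s"USE $db")
    try {
      // ... register tables, run queries ...
    } finally {
      spark.sql("USE default")
      // CASCADE also removes the tables inside the database
      spark.sql(s"DROP DATABASE IF EXISTS $db CASCADE")
    }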
diff --git a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerBase.scala b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerBase.scala
index c410d7a86..8400912f9 100644
--- a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerBase.scala
+++ b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerBase.scala
@@ -83,7 +83,11 @@ abstract class RdfPartitionerBase(distinguishStringLiterals: Boolean = false,
       || (!distinguishStringLiterals && isPlainLiteral(o)))
 
-    val lang = if (langTagPresent && partitionPerLangTag && o.getLiteralLanguage.nonEmpty) Set(o.getLiteralLanguage) else Set.empty[String]
+    val lang = if (langTagPresent && partitionPerLangTag) {
+      Set(o.getLiteralLanguage)
+    } else {
+      Set.empty[String]
+    }
 
     RdfPartitionStateDefault(subjectType, predicate, objectType, datatype, langTagPresent, lang)
   }
@@ -130,21 +134,18 @@ abstract class RdfPartitionerBase(distinguishStringLiterals: Boolean = false,
     }.toSeq ++ partitions.filter(_.languages.isEmpty)
   }
 
-  /*
-  def determineLayout(t: RdfPartitionDefault): TripleLayout = {
-    val oType = t.objectType
-
-    val layout = oType match {
-      case 0 => TripleLayoutString
-      case 1 => TripleLayoutString
-      case 2 => if (isPlainLiteralDatatype(t.datatype)) TripleLayoutStringLang else determineLayoutDatatype(t.datatype)
-      // if(!t.langTagPresent)
-      //   TripleLayoutString else TripleLayoutStringLang
-      case _ => throw new RuntimeException("Unsupported object type: " + t)
-    }
-    layout
+  // we have to override this method here because a partition state can now hold multiple languages,
+  // so we have to check for containment instead of equality
+  override def matches(partition: RdfPartitionStateDefault, triple: Triple): Boolean = {
+    val newPartition = fromTriple(triple)
+
+    partition.predicate == newPartition.predicate &&
+      partition.subjectType == newPartition.subjectType &&
+      partition.objectType == newPartition.objectType &&
+      partition.langTagPresent == newPartition.langTagPresent &&
+      partition.datatype == newPartition.datatype &&
+      newPartition.languages.subsetOf(partition.languages)
   }
-  */
 
   protected val intDTypeURIs: Set[String] = Set(XSDDatatype.XSDnegativeInteger, XSDDatatype.XSDpositiveInteger,
     XSDDatatype.XSDnonNegativeInteger, XSDDatatype.XSDnonPositiveInteger,
diff --git a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala
index e37adf84d..92872240e 100644
--- a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala
+++ b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala
@@ -129,7 +129,7 @@ object R2rmlUtils {
         // and the object map
         val om: ObjectMap = pom.addNewObjectMap()
         om.setColumn(escapedColumns(1))
-        om.setLanguage(p.languages.head)
+        if (p.languages.head.trim.nonEmpty) om.setLanguage(p.languages.head)
 
         tm.getOrSetLogicalTable().asBaseTableOrView().setTableName(tableName)
 
@@ -150,7 +150,7 @@ object R2rmlUtils {
 
           val om: ObjectMap = pom.addNewObjectMap()
           om.setColumn(escapedColumns(1))
-          om.setLanguage(lang)
+          if (lang.trim.nonEmpty) om.setLanguage(lang)
 
           tm.getOrSetLogicalTable().asR2rmlView().setSqlQuery(s"SELECT $columnsSql FROM $tableNameSql WHERE $langColSql = $langSql")
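With the two guards above, partitions whose language set contains only the empty string no longer emit an empty `rr:language` term. For reference, the mapping generation can be invoked standalone; this sketch reuses the names introduced in QueryEngineFactoryBase above (`partitioner`, `partitions2RDD`, `tableNameFn`):

    import org.aksw.sparqlify.core.sql.common.serialization.SqlEscaperBacktick
    import org.apache.jena.rdf.model.ModelFactory

    import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils

    val mappingsModel = ModelFactory.createDefaultModel()
    R2rmlUtils.createR2rmlMappings(
      partitioner,                  // e.g. RdfPartitionerDefault
      partitions2RDD.keySet.toSeq,  // partition states from the vertical partitioning
      tableNameFn,                  // partition state -> Spark table name
      new SqlEscaperBacktick(),
      mappingsModel,
      false,                        // explodeLanguageTags
      false)                        // escapeIdentifiers
    mappingsModel.write(System.out, "Turtle") // inspect the generated R2RML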