cntd Ontop integration
LorenzBuehmann committed Jan 17, 2021
1 parent 3078340 commit 3387704
Showing 7 changed files with 133 additions and 74 deletions.
@@ -11,6 +11,23 @@ import org.apache.spark.rdd.RDD
*/
trait QueryEngineFactory {

/**
* Creates a query execution factory for a pre-partitioned dataset. The RDF data is distributed among Spark
* tables located either in the given database or in the default database. The mapping from triples to tables
* is provided by the mapping model.
*
* @param database the database that holds the tables for the RDF data; if `null`, the default database will be used
* @param mappingModel the model containing the mappings
* @return a query execution factory
*/
def create(database: String, mappingModel: Model): QueryExecutionFactorySpark

/**
* Creates a query execution factory for the given RDD of triples.
*
* @param triples the RDD of triples
* @return a query execution factory
*/
def create(triples: RDD[Triple]): QueryExecutionFactorySpark

}
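
A minimal usage sketch of the trait (hypothetical: it assumes an existing SparkSession `spark`, an `RDD[Triple]` named `triples`, Jena's QueryFactory, and a factory implementation such as the Ontop-based one further down; passing a Query to createQueryExecution mirrors the test code below):

// hypothetical usage of QueryEngineFactory
val factory: QueryEngineFactory = new QueryEngineFactoryOntop(spark)
val qef: QueryExecutionFactorySpark = factory.create(triples)
val qe = qef.createQueryExecution(QueryFactory.create("SELECT * { ?s ?p ?o } LIMIT 10"))
val rs = qe.execSelect() // a standard Jena ResultSet
while (rs.hasNext) println(rs.next())
qe.close()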
@@ -1,12 +1,59 @@
package net.sansa_stack.query.spark.api.impl

import org.aksw.sparqlify.core.sql.common.serialization.{SqlEscaper, SqlEscaperBacktick, SqlEscaperDoubleQuote}
import org.apache.jena.graph
import org.apache.jena.rdf.model.ModelFactory
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import net.sansa_stack.query.spark.api.domain.QueryEngineFactory
import net.sansa_stack.query.spark.api.domain.{QueryEngineFactory, QueryExecutionFactorySpark}
import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner, RdfPartitionerComplex, RdfPartitionerDefault}
import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils
import net.sansa_stack.rdf.spark.partition.core.{RdfPartitionUtilsSpark, SQLUtils, SparkTableGenerator}

/**
* @author Lorenz Buehmann
*/
abstract class QueryEngineFactoryBase(spark: SparkSession) extends QueryEngineFactory {

protected def createWithPartitioning(triples: RDD[graph.Triple],
partitioner: RdfPartitioner[RdfPartitionStateDefault] = RdfPartitionerDefault,
explodeLanguageTags: Boolean = false,
sqlEscaper: SqlEscaper = new SqlEscaperBacktick(),
escapeIdentifiers: Boolean = false): QueryExecutionFactorySpark = {
// apply vertical partitioning
val partitions2RDD = RdfPartitionUtilsSpark.partitionGraph(triples, partitioner)

val tableNameFn: RdfPartitionStateDefault => String = p => SQLUtils.escapeTablename(R2rmlUtils.createDefaultTableName(p))

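// debug output: print each partition state and its rows (note: collects every RDD to the driver)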
partitions2RDD.foreach {
case (p, rdd) =>
println(p)
rdd.collect().foreach(println)
}

// create the Spark tables
SparkTableGenerator(spark).createAndRegisterSparkTables(partitioner,
partitions2RDD,
extractTableName = tableNameFn)

// create the mappings model
val mappingsModel = ModelFactory.createDefaultModel()
R2rmlUtils.createR2rmlMappings(
partitioner,
partitions2RDD.keySet.toSeq,
tableNameFn,
sqlEscaper,
mappingsModel,
explodeLanguageTags,
escapeIdentifiers)

create(null, mappingsModel)

}

override def create(triples: RDD[graph.Triple]): QueryExecutionFactorySpark = {
createWithPartitioning(triples)
}

}
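
A hypothetical subclass sketch showing the extension point of the base class (the class name is invented; a real implementation would wire a concrete SPARQL-to-SQL rewriter in create, as the Ontop factory below does):

// hypothetical subclass: reuses the partitioning pipeline from QueryEngineFactoryBase
class QueryEngineFactoryExample(spark: SparkSession) extends QueryEngineFactoryBase(spark) {
override def create(database: String, mappingModel: Model): QueryExecutionFactorySpark = {
??? // plug in a concrete query engine over the tables described by mappingModel
}
}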
@@ -1,18 +1,41 @@
package net.sansa_stack.query.spark.ontop

import org.aksw.sparqlify.core.sql.common.serialization.{SqlEscaper, SqlEscaperDoubleQuote}
import org.apache.jena.graph
import org.apache.jena.rdf.model.Model
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.semanticweb.owlapi.model.OWLOntology

import net.sansa_stack.query.spark.api.domain.QueryExecutionFactorySpark
import net.sansa_stack.query.spark.api.impl.QueryEngineFactoryBase
import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner, RdfPartitionerComplex}

/**
* A query engine factory that uses Ontop as the SPARQL-to-SQL rewriter.
*
* @author Lorenz Buehmann
*/
class QueryEngineFactoryOntop(spark: SparkSession) extends QueryEngineFactoryBase(spark) {

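// note: the caller-supplied partitioner, escaper and flags are deliberately replaced
// by the Ontop-specific settings below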
override protected def createWithPartitioning(triples: RDD[graph.Triple],
partitioner: RdfPartitioner[RdfPartitionStateDefault],
explodeLanguageTags: Boolean,
sqlEscaper: SqlEscaper,
escapeIdentifiers: Boolean): QueryExecutionFactorySpark = {
super.createWithPartitioning(triples,
partitioner = new RdfPartitionerComplex(),
explodeLanguageTags = true,
sqlEscaper = new SqlEscaperDoubleQuote(),
escapeIdentifiers = true)
}

override def create(database: String, mappingModel: Model): QueryExecutionFactorySpark = {
create(database, mappingModel, null)
}

def create(database: String, mappingModel: Model, ontology: OWLOntology): QueryExecutionFactorySpark = {
val ontop: QueryEngineOntop = QueryEngineOntop(spark, database, mappingModel, Option(ontology))

new QueryExecutionFactorySparkOntop(spark, ontop)
}
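
A hypothetical sketch of the pre-partitioned entry point (the database name "rdf_data" and the `mappingsModel` variable are assumptions; the tables and R2RML mappings would have been created beforehand, e.g. by the base class above):

// hypothetical usage of create(database, mappingModel)
val qef = new QueryEngineFactoryOntop(spark).create("rdf_data", mappingsModel)
val qe = qef.createQueryExecution(QueryFactory.create("ASK { ?s ?p ?o }"))
println(qe.execAsk())
qe.close()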
Expand Up @@ -126,7 +126,7 @@ class QueryEngineOntop(val spark: SparkSession,
val mappingsModel: Model,
var ontology: Option[OWLOntology]) {

private val logger = com.typesafe.scalalogging.Logger[OntopSPARQLEngine]
private val logger = com.typesafe.scalalogging.Logger[QueryEngineOntop]

// set the current Spark SQL database if given
if (databaseName != null && databaseName.trim.nonEmpty) {
@@ -156,6 +156,10 @@ class QueryEngineOntop(val spark: SparkSession,
val rdfDatatype2SQLCastName = DatatypeMappings(typeFactory)


// debug output: dump the R2RML mappings and show the content of all registered Spark tables
mappingsModel.write(System.out, "Turtle")
spark.catalog.listTables().collect().foreach(t => spark.table(sqlEscaper.escapeTableName(t.name)).show(false))


/**
* Creates an ontology from the given partitions.
@@ -2,24 +2,15 @@ package net.sansa_stack.query.spark.compliance

import java.util.Properties

import com.google.common.collect.Sets
import it.unibz.inf.ontop.exception.OntopInternalBugException
import it.unibz.inf.ontop.model.term._
import net.sansa_stack.query.spark.ontop.OntopSPARQLEngine
import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitionerComplex}
import net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark
import scala.collection.JavaConverters._

import org.apache.jena.query._
import org.apache.jena.rdf.model.{Model, ModelFactory}
import org.apache.jena.sparql.engine.ResultSetStream
import org.apache.jena.sparql.engine.binding.Binding
import org.apache.jena.sparql.graph.GraphFactory
import org.apache.jena.rdf.model.Model
import org.apache.jena.sparql.resultset.SPARQLResult
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.scalatest.tags.Slow
import org.scalatest.{ConfigMap, DoNotDiscover}

import scala.collection.JavaConverters._
import net.sansa_stack.query.spark.ontop.QueryEngineFactoryOntop


/**
@@ -84,73 +75,49 @@ class SPARQL11TestSuiteRunnerSparkOntop
)


override lazy val IGNORE_FILTER = t => t.name.startsWith("HOUR")
override lazy val IGNORE_FILTER = t => t.name.startsWith("CONTAINS")

var ontopProperties: Properties = _
var engineFactory: QueryEngineFactoryOntop = _

override def beforeAll(configMap: ConfigMap): Unit = {
super.beforeAll(configMap)
ontopProperties = new Properties()
ontopProperties.load(getClass.getClassLoader.getResourceAsStream("ontop-spark.properties"))
engineFactory = new QueryEngineFactoryOntop(spark)
}

override def runQuery(query: Query, data: Model): SPARQLResult = {
// distribute on Spark
val triplesRDD = spark.sparkContext.parallelize(data.getGraph.find().toList.asScala)

val partitioner = new RdfPartitionerComplex(distinguishStringLiterals = false)
// we create a Spark database here to keep the implicit partitioning separate
val db = "TEST"
spark.sql(s"CREATE DATABASE IF NOT EXISTS $db")
spark.sql(s"USE $db")

// do partitioning here
val partitions: Map[RdfPartitionStateDefault, RDD[Row]] =
RdfPartitionUtilsSpark.partitionGraph(triplesRDD, partitioner)

// create the query engine
val queryEngine = OntopSPARQLEngine(spark, partitioner, partitions, ontology = None)
val qef = engineFactory.create(triplesRDD)
val qe = qef.createQueryExecution(query)

// produce result based on query type
val result = if (query.isSelectType) { // SELECT
// convert to bindings
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[Binding]
val bindings = queryEngine.execSelect(query.toString()).collect()

// we have to get the vars from the initial query as bindings are just hashmaps without preserving order
val vars = query.getResultVars
val bindingVars = bindings.flatMap(_.vars().asScala).distinct.map(_.getVarName).toList.asJava
// sanity check if there is a difference in the projected vars
if (!Sets.symmetricDifference(Sets.newHashSet(vars), Sets.newHashSet(bindingVars)).isEmpty) {
println(s"projected vars do not match\nexpected: $vars\ngot:$bindingVars")
}

// create the SPARQL result
val model = ModelFactory.createDefaultModel()

val resultsActual = new ResultSetStream(vars, model, bindings.toList.asJava.iterator())
new SPARQLResult(resultsActual)
val rs = qe.execSelect()
new SPARQLResult(rs)
} else if (query.isAskType) { // ASK
// map DF entry to boolean
val b = queryEngine.execAsk(query.toString())
val b = qe.execAsk()
new SPARQLResult(b)
} else if (query.isConstructType) { // CONSTRUCT
val triples = queryEngine.execConstruct(query.toString()).collect()
// create the SPARQL result
val g = GraphFactory.createDefaultGraph()
triples.foreach(g.add)
val model = ModelFactory.createModelForGraph(g)
new SPARQLResult(model)

val triples = qe.execConstruct()
new SPARQLResult(triples)
} else { // DESCRIBE todo
fail("unsupported query type: DESCRIBE")
null
}
// clean up
queryEngine.clear()
// we drop the Spark database to remove all tables
spark.sql(s"DROP DATABASE IF EXISTS $db")
qe.close()

result
}

class InvalidTermAsResultException(term: ImmutableTerm) extends OntopInternalBugException("Term " + term + " does not evaluate to a constant")
class InvalidConstantTypeInResultException(message: String) extends OntopInternalBugException(message)

}

import org.scalatest.Tag
@@ -83,7 +83,11 @@ abstract class RdfPartitionerBase(distinguishStringLiterals: Boolean = false,
|| (!distinguishStringLiterals && isPlainLiteral(o)))


val lang = if (langTagPresent && partitionPerLangTag && o.getLiteralLanguage.nonEmpty) Set(o.getLiteralLanguage) else Set.empty[String]
val lang = if (langTagPresent && partitionPerLangTag) {
Set(o.getLiteralLanguage)
} else {
Set.empty[String]
}

RdfPartitionStateDefault(subjectType, predicate, objectType, datatype, langTagPresent, lang)
}
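
For illustration, a hypothetical round trip through this method (assuming it is the partitioner's fromTriple, as the matches override in the next hunk suggests, and a `partitioner` instance with partitionPerLangTag enabled):

// a language-tagged literal yields a partition state whose languages set carries the tag
import org.apache.jena.graph.{NodeFactory, Triple}
val t = Triple.create(
NodeFactory.createURI("http://example.org/s"),
NodeFactory.createURI("http://www.w3.org/2000/01/rdf-schema#label"),
NodeFactory.createLiteral("hello", "en"))
val state = partitioner.fromTriple(t) // state.languages == Set("en")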
@@ -130,21 +134,18 @@ abstract class RdfPartitionerBase(distinguishStringLiterals: Boolean = false,
}.toSeq ++ partitions.filter(_.languages.isEmpty)
}

/*
def determineLayout(t: RdfPartitionDefault): TripleLayout = {
val oType = t.objectType
val layout = oType match {
case 0 => TripleLayoutString
case 1 => TripleLayoutString
case 2 => if (isPlainLiteralDatatype(t.datatype)) TripleLayoutStringLang else determineLayoutDatatype(t.datatype)
// if (!t.langTagPresent) TripleLayoutString else TripleLayoutStringLang
case _ => throw new RuntimeException("Unsupported object type: " + t)
}
layout
}
*/

// we have to override this method because a partition state can now hold multiple languages,
// so we check for containment instead of equality
override def matches(partition: RdfPartitionStateDefault, triple: Triple): Boolean = {
val newPartition = fromTriple(triple)

partition.predicate == newPartition.predicate &&
partition.subjectType == newPartition.subjectType &&
partition.objectType == newPartition.objectType &&
partition.langTagPresent == newPartition.langTagPresent &&
partition.datatype == newPartition.datatype &&
newPartition.languages.subsetOf(partition.languages)
}
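
For example (hypothetical values): a partition state covering Set("en", "de") still matches a triple whose own state yields Set("en"), because Set("en").subsetOf(Set("en", "de")) holds, whereas an equality check would reject it.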

protected val intDTypeURIs: Set[String] = Set(XSDDatatype.XSDnegativeInteger, XSDDatatype.XSDpositiveInteger,
XSDDatatype.XSDnonNegativeInteger, XSDDatatype.XSDnonPositiveInteger,
@@ -129,7 +129,7 @@ object R2rmlUtils {
// and the object map
val om: ObjectMap = pom.addNewObjectMap()
om.setColumn(escapedColumns(1))
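// the guard below skips empty language tags (plain literals can yield an empty tag)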
om.setLanguage(p.languages.head)
if (p.languages.head.trim.nonEmpty) om.setLanguage(p.languages.head)

tm.getOrSetLogicalTable().asBaseTableOrView().setTableName(tableName)

@@ -150,7 +150,7 @@

val om: ObjectMap = pom.addNewObjectMap()
om.setColumn(escapedColumns(1))
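// same guard as above: skip empty language tags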
om.setLanguage(lang)
if (lang.trim.nonEmpty) om.setLanguage(lang)

tm.getOrSetLogicalTable().asR2rmlView().setSqlQuery(s"SELECT $columnsSql FROM $tableNameSql WHERE $langColSql = $langSql")

