From e498234a9117158b65e641cf365a0549b0f2913b Mon Sep 17 00:00:00 2001 From: Claus Stadler Date: Tue, 12 Jan 2021 19:01:46 +0100 Subject: [PATCH] More consolidation of the r2rml/virtual knowledge graph subsystem (sparqlify / ontop) --- .../core/RdfPartitionImportExport.scala | 32 +---- .../common/partition/r2rml/R2rmlUtils.scala | 127 +++++++++++------- .../core/RdfPartitionerDefaultTests.scala | 15 ++- .../rdf/spark/utils/PartitionLib.scala | 41 ++++++ 4 files changed, 133 insertions(+), 82 deletions(-) create mode 100644 sansa-rdf/sansa-rdf-spark/src/main/scala/net/sansa_stack/rdf/spark/utils/PartitionLib.scala diff --git a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionImportExport.scala b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionImportExport.scala index ef4761358..49ab741f5 100644 --- a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionImportExport.scala +++ b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionImportExport.scala @@ -1,39 +1,14 @@ package net.sansa_stack.rdf.common.partition.core +import org.aksw.r2rml.jena.arq.lib.R2rmlLib import org.aksw.r2rml.jena.domain.api.TriplesMap -import org.aksw.r2rml.jena.vocab.RR -import org.apache.jena.rdf.model.{Model, ModelFactory} - -import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils - -import scala.collection.JavaConverters._ +import org.apache.jena.rdf.model.Model /** * @author Lorenz Buehmann */ object RdfPartitionImportExport { - /** - * Exports the RDF partition states as R2RML. - * - * @param partitioner the RDF partitioner - * @param partitions the RDF partition states - * @param explodeLanguageTags if `true` a separate mapping/TriplesMap will be created for each language tag, - * otherwise a mapping to a column for the language tag represented by - * `rr:langColumn` property will be used (note, this is an extension of R2RML) - * @return the model containing the RDF partition states as as R2RML syntax - */ - def exportAsR2RML(partitioner: RdfPartitioner[RdfPartitionStateDefault], - partitions: Seq[RdfPartitionStateDefault], - explodeLanguageTags: Boolean = false): Model = { - // put all triple maps into a single model - val model = ModelFactory.createDefaultModel() - - R2rmlUtils.createR2rmlMappings(partitioner, partitions, model, explodeLanguageTags) - - model - } - /** * Imports the RDF partition states as `TriplesMap` from the given RDF data model. 
* @@ -41,6 +16,7 @@ object RdfPartitionImportExport { * @return the RDF partition states as `TriplesMap` */ def importFromR2RML(model: Model): Seq[TriplesMap] = { - model.listResourcesWithProperty(RR.logicalTable).mapWith(_.as(classOf[TriplesMap])).asScala.toSeq + import collection.JavaConverters._ + R2rmlLib.streamTriplesMaps(model).iterator().asScala.toSeq } } diff --git a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala index 813621946..f4a9d97c2 100644 --- a/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala +++ b/sansa-rdf/sansa-rdf-common/src/main/scala/net/sansa_stack/rdf/common/partition/r2rml/R2rmlUtils.scala @@ -1,21 +1,15 @@ package net.sansa_stack.rdf.common.partition.r2rml -import java.util - -import com.google.common.collect.ImmutableMap - import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner} -import net.sansa_stack.rdf.common.partition.model.sparqlify.SparqlifyUtils2.createExprForNode -import org.aksw.jena_sparql_api.utils.Vars -import org.aksw.obda.domain.api.Constraint -import org.aksw.obda.domain.impl.LogicalTableTableName -import org.aksw.r2rml.jena.domain.api.{ObjectMap, PredicateObjectMap, SubjectMap, TermMap, TriplesMap} +import org.aksw.r2rml.jena.domain.api._ import org.aksw.r2rml.jena.vocab.RR import org.aksw.r2rmlx.domain.api.TermMapX +import org.aksw.sparqlify.core.sql.common.serialization.{SqlEscaper, SqlEscaperBacktick} import org.apache.jena.graph.NodeFactory import org.apache.jena.rdf.model.{Model, ModelFactory, ResourceFactory} -import org.apache.jena.sparql.core.{Quad, Var} -import org.apache.jena.sparql.expr.{Expr, ExprVar} +import org.apache.jena.sparql.core.Var +import org.apache.jena.sparql.expr.ExprVar + import scala.reflect.runtime.universe.MethodSymbol object R2rmlUtils { @@ -27,86 +21,93 @@ object R2rmlUtils { attrName } + + def createR2rmlMappings(partitioner: RdfPartitioner[RdfPartitionStateDefault], + partition: RdfPartitionStateDefault, + model: Model, + explodeLanguageTags: Boolean): Seq[TriplesMap] = { + createR2rmlMappings( + partitioner, + partition, + x => createDefaultTableName(x), // Map the partition to a name + new SqlEscaperBacktick, + model, + explodeLanguageTags) + } + def createR2rmlMappings(partitioner: RdfPartitioner[RdfPartitionStateDefault], partitions: Seq[RdfPartitionStateDefault], model: Model, explodeLanguageTags: Boolean): Seq[TriplesMap] = { partitions - .flatMap(p => createR2rmlMappings(partitioner, p, model, explodeLanguageTags)) + .flatMap(p => createR2rmlMappings( + partitioner, + p, + model, + explodeLanguageTags)) } /** * Transform a RdfPartitionStateDefault into a sequence of R2RML mappings. - * TODO Clarify whether the returned mappings share the same Model or are backed by individual ones * If the language handling strategy demands a dedicated column for language tags then the * resulting R2RML contains the non-standard 'rr:langColumn' property. 
* * FIXME Creating mappings per language tag needs yet to be implemented * - * @param partitioner - * @param p - * @return + * @param partitioner The partitioner + * @param partitionState The partition state generated by the partitioner + * @param extractTableName A function to obtain a table name from the partition state + * @param sqlEscaper SQL escaping policies for table names, column names, string literals and aliases + * @param outModel The output model + * @param explodeLanguageTags If true then a mapping is generated for each language tag listed in the partition state. + * Otherwise a generic language column is introduces + * @return The set of {@link TriplesMap}s added to the output model */ def createR2rmlMappings(partitioner: RdfPartitioner[RdfPartitionStateDefault], - p: RdfPartitionStateDefault, - model: Model, + partitionState: RdfPartitionStateDefault, + extractTableName: RdfPartitionStateDefault => String, + sqlEscaper: SqlEscaper, + outModel: Model, explodeLanguageTags: Boolean): Seq[TriplesMap] = { - // val basicTableInfo = basicTableInfoProvider.getBasicTableInfo(sqlQueryStr) - // println("Result schema: " + basicTableInfoProvider.getBasicTableInfo(sqlQueryStr)) - - // items.foreach(x => println("Item: " + x)) + val p = partitionState // Shorthand + val t = partitioner.determineLayout(partitionState).schema - val t = partitioner.determineLayout(p).schema - // val t = p.layout.schema val attrNames = t.members.sorted.collect({ case m: MethodSymbol if m.isCaseAccessor => m.name.toString }) - // println("Counting the dataset: " + ds.count()) - val pred = p.predicate - - // For now let's just use the full predicate as the uri - // val predPart = pred.substring(pred.lastIndexOf("/") + 1) - val predPart = pred - val pn = NodeFactory.createURI(p.predicate) - - val dt = p.datatype - val dtPart = if (dt != null && dt.nonEmpty) "_" + dt.substring(dt.lastIndexOf("/") + 1) else "" - val langPart = if (p.langTagPresent) "_lang" else "" - - val sTermTypePart = if (p.subjectType == 0) "sbn" else "" - val oTermTypePart = if (p.objectType == 0) "obn" else "" - - val tableName = predPart + dtPart + langPart + sTermTypePart + oTermTypePart - // .replace("#", "__").replace("-", "_") - - // TODO Ensure tableName is safe + val predicateIri: String = partitionState.predicate + val tableName = extractTableName(partitionState) if (explodeLanguageTags && attrNames.length == 3) { - val langCol = attrNames(2) - val columns = attrNames.slice(0, 2).mkString(", ") + val langColSql = sqlEscaper.escapeColumnName(attrNames(2)) + val columnsSql = attrNames.slice(0, 2).map(sqlEscaper.escapeColumnName(_)).mkString(", ") p.languages.map(lang => { - val tm: TriplesMap = model.createResource.as(classOf[TriplesMap]) + val tableNameSql = sqlEscaper.escapeTableName(tableName) + val langSql = sqlEscaper.escapeStringLiteral(lang) + + val tm: TriplesMap = outModel.createResource.as(classOf[TriplesMap]) // create subject map val sm: SubjectMap = tm.getOrSetSubjectMap() setTermMapForNode(sm, 0, attrNames, p.subjectType, "", false) val pom: PredicateObjectMap = tm.addNewPredicateObjectMap() - pom.getPredicates.add(tm.getModel.wrapAsResource(pn)) + pom.addPredicate(predicateIri) val om: ObjectMap = pom.addNewObjectMap() om.setColumn(attrNames(1)) om.setLanguage(lang) - tm.getOrSetLogicalTable().setSqlQuery(s"SELECT $columns FROM $tableName WHERE $langCol = '$lang'") + + tm.getOrSetLogicalTable().setSqlQuery(s"SELECT $columnsSql FROM $tableNameSql WHERE $langColSql = $langSql") tm }).toSeq } else { val tm: TriplesMap = 
ModelFactory.createDefaultModel.createResource.as(classOf[TriplesMap]) val pom: PredicateObjectMap = tm.addNewPredicateObjectMap() - pom.getPredicates.add(tm.getModel.wrapAsResource(pn)) + pom.addPredicate(predicateIri) val sm: SubjectMap = tm.getOrSetSubjectMap() val om: ObjectMap = pom.addNewObjectMap() @@ -144,4 +145,32 @@ object R2rmlUtils { target } + + + /** + * Creates a SQL table name for a partition. + * + * FIXME Consolidate with SQLUtils in the spark module? + * + * @param p the RDF partition + * @return + */ + def createDefaultTableName(p: RdfPartitionStateDefault): String = { + + // For now let's just use the full predicate as the uri + // val predPart = pred.substring(pred.lastIndexOf("/") + 1) + val predPart = p.predicate + val pn = NodeFactory.createURI(p.predicate) + + val dt = p.datatype + val dtPart = if (dt != null && dt.nonEmpty) "_" + dt.substring(dt.lastIndexOf("/") + 1) else "" + val langPart = if (p.langTagPresent) "_lang" else "" + + val sTermTypePart = if (p.subjectType == 0) "sbn" else "" + val oTermTypePart = if (p.objectType == 0) "obn" else "" + + val tableName = predPart + dtPart + langPart + sTermTypePart + oTermTypePart + + tableName + } } diff --git a/sansa-rdf/sansa-rdf-common/src/test/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerDefaultTests.scala b/sansa-rdf/sansa-rdf-common/src/test/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerDefaultTests.scala index e341a8716..88ecacf46 100644 --- a/sansa-rdf/sansa-rdf-common/src/test/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerDefaultTests.scala +++ b/sansa-rdf/sansa-rdf-common/src/test/scala/net/sansa_stack/rdf/common/partition/core/RdfPartitionerDefaultTests.scala @@ -88,7 +88,7 @@ class RdfPartitionerDefaultTests extends FunSuite { RDFDataMgr.read(expected, new ByteArrayInputStream( """ | @base . - |[ <#logicalTable> [ <#sqlQuery> "SELECT s, o FROM http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang WHERE l = 'fr'" ] ; + |[ <#logicalTable> [ <#sqlQuery> "SELECT `s`, `o` FROM `http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang` WHERE `l` = 'fr'" ] ; | <#predicateObjectMap> [ <#objectMap> [ <#column> "o" ; | <#language> "fr" | ] ; @@ -99,7 +99,7 @@ class RdfPartitionerDefaultTests extends FunSuite { | ] |] . | - |[ <#logicalTable> [ <#sqlQuery> "SELECT s, o FROM http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang WHERE l = 'de'" ] ; + |[ <#logicalTable> [ <#sqlQuery> "SELECT `s`, `o` FROM `http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang` WHERE `l` = 'de'" ] ; | <#predicateObjectMap> [ <#objectMap> [ <#column> "o" ; | <#language> "de" | ] ; @@ -110,7 +110,7 @@ class RdfPartitionerDefaultTests extends FunSuite { | ] |] . 
| - |[ <#logicalTable> [ <#sqlQuery> "SELECT s, o FROM http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang WHERE l = 'en'" ] ; + |[ <#logicalTable> [ <#sqlQuery> "SELECT `s`, `o` FROM `http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang` WHERE `l` = 'en'" ] ; | <#predicateObjectMap> [ <#objectMap> [ <#column> "o" ; | <#language> "en" | ] ; @@ -131,6 +131,8 @@ class RdfPartitionerDefaultTests extends FunSuite { val triplesMaps = R2rmlUtils.createR2rmlMappings(RdfPartitionerDefault, partitionState, actual, true) + RDFDataMgr.write(System.out, actual, RDFFormat.TURTLE_PRETTY) + assert(triplesMaps.size == languages.size) assert(expected.isIsomorphicWith(actual)) @@ -141,11 +143,14 @@ class RdfPartitionerDefaultTests extends FunSuite { val partitionState = RdfPartitionStateDefault(1, "http://xmlns.com/foaf/0.1/givenName", 2, "http://www.w3.org/2001/XMLSchema#string", true, Set("en", "de", "fr")) - val triplesMaps = R2rmlUtils.createR2rmlMappings(RdfPartitionerDefault, partitionState, ModelFactory.createDefaultModel(), true) + val exportModel = ModelFactory.createDefaultModel() + val triplesMaps = R2rmlUtils.createR2rmlMappings(RdfPartitionerDefault, partitionState, exportModel, true) - val exportModel = RdfPartitionImportExport.exportAsR2RML(RdfPartitionerDefault, Seq(partitionState), true) + // val exportModel = exportModel // RdfPartitionImportExport.exportAsR2RML(RdfPartitionerDefault, partitionState, true) exportModel.write(System.out, "Turtle", "http://www.w3.org/ns/r2rml#") + // TODO Add serialization and deserialization + val importedTriplesMaps = RdfPartitionImportExport.importFromR2RML(exportModel) assert(triplesMaps.size == importedTriplesMaps.size) diff --git a/sansa-rdf/sansa-rdf-spark/src/main/scala/net/sansa_stack/rdf/spark/utils/PartitionLib.scala b/sansa-rdf/sansa-rdf-spark/src/main/scala/net/sansa_stack/rdf/spark/utils/PartitionLib.scala new file mode 100644 index 000000000..271661732 --- /dev/null +++ b/sansa-rdf/sansa-rdf-spark/src/main/scala/net/sansa_stack/rdf/spark/utils/PartitionLib.scala @@ -0,0 +1,41 @@ +package net.sansa_stack.rdf.spark.utils + +import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner} +import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils +import net.sansa_stack.rdf.spark.partition.core.{BlankNodeStrategy, SQLUtils} +import org.aksw.sparqlify.core.sql.common.serialization.SqlEscaperBacktick +import org.apache.jena.rdf.model.{Model, ModelFactory} + +/** + * @author Lorenz Buehmann + */ +object PartitionLib { + /** + * Exports the RDF partition states as R2RML. + * Uses table name and sql escaping configuration suitable for spark. 
+ * + * @param partitioner the RDF partitioner + * @param partitions the RDF partition states + * @param explodeLanguageTags if `true` a separate mapping/TriplesMap will be created for each language tag, + * otherwise a mapping to a column for the language tag represented by + * `rr:langColumn` property will be used (note, this is an extension of R2RML) + * @return the model containing the RDF partition states as as R2RML syntax + */ + def exportAsR2RML(partitioner: RdfPartitioner[RdfPartitionStateDefault], + partitions: Seq[RdfPartitionStateDefault], + explodeLanguageTags: Boolean = false): Model = { + // put all triple maps into a single model + val model = ModelFactory.createDefaultModel() + + partitions.flatMap(partition => + R2rmlUtils.createR2rmlMappings( + partitioner, + partition, + p => SQLUtils.createTableName(p, BlankNodeStrategy.Table), + new SqlEscaperBacktick, + model, explodeLanguageTags) + ) + + model + } +}