Skip to content

Commit

Permalink
More consolidation of the r2rml/virtual knowledge graph subsystem (sp…
Browse files Browse the repository at this point in the history
…arqlify / ontop)
  • Loading branch information
Aklakan committed Jan 12, 2021
1 parent eaced7c commit e498234
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 82 deletions.
@@ -1,46 +1,22 @@
package net.sansa_stack.rdf.common.partition.core

import org.aksw.r2rml.jena.arq.lib.R2rmlLib
import org.aksw.r2rml.jena.domain.api.TriplesMap
import org.aksw.r2rml.jena.vocab.RR
import org.apache.jena.rdf.model.{Model, ModelFactory}

import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils

import scala.collection.JavaConverters._
import org.apache.jena.rdf.model.Model

/**
 * Helpers for converting between RDF partition states and R2RML mappings.
 *
 * @author Lorenz Buehmann
 */
object RdfPartitionImportExport {

  /**
   * Exports the RDF partition states as R2RML.
   *
   * @param partitioner the RDF partitioner
   * @param partitions the RDF partition states
   * @param explodeLanguageTags if `true` a separate mapping/TriplesMap will be created for each language tag,
   *                            otherwise a mapping to a column for the language tag represented by
   *                            `rr:langColumn` property will be used (note, this is an extension of R2RML)
   * @return the model containing the RDF partition states in R2RML syntax
   */
  def exportAsR2RML(partitioner: RdfPartitioner[RdfPartitionStateDefault],
                    partitions: Seq[RdfPartitionStateDefault],
                    explodeLanguageTags: Boolean = false): Model = {
    // All triples maps are written into one shared model.
    val model = ModelFactory.createDefaultModel()

    // Called for its side effect on `model`; the returned mappings are not needed here.
    R2rmlUtils.createR2rmlMappings(partitioner, partitions, model, explodeLanguageTags)

    model
  }

  /**
   * Imports the RDF partition states as `TriplesMap` from the given RDF data model.
   *
   * The triples maps are those reported by `R2rmlLib.streamTriplesMaps` for the model.
   *
   * @param model the model
   * @return the RDF partition states as `TriplesMap`
   */
  def importFromR2RML(model: Model): Seq[TriplesMap] = {
    import collection.JavaConverters._

    // `toList` materialises the result eagerly. On Scala 2.12 `Iterator.toSeq` yields a lazy
    // Stream, which would keep reading from the model's iterator after this method returned.
    R2rmlLib.streamTriplesMaps(model).iterator().asScala.toList
  }
}
@@ -1,21 +1,15 @@
package net.sansa_stack.rdf.common.partition.r2rml

import java.util

import com.google.common.collect.ImmutableMap

import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner}
import net.sansa_stack.rdf.common.partition.model.sparqlify.SparqlifyUtils2.createExprForNode
import org.aksw.jena_sparql_api.utils.Vars
import org.aksw.obda.domain.api.Constraint
import org.aksw.obda.domain.impl.LogicalTableTableName
import org.aksw.r2rml.jena.domain.api.{ObjectMap, PredicateObjectMap, SubjectMap, TermMap, TriplesMap}
import org.aksw.r2rml.jena.domain.api._
import org.aksw.r2rml.jena.vocab.RR
import org.aksw.r2rmlx.domain.api.TermMapX
import org.aksw.sparqlify.core.sql.common.serialization.{SqlEscaper, SqlEscaperBacktick}
import org.apache.jena.graph.NodeFactory
import org.apache.jena.rdf.model.{Model, ModelFactory, ResourceFactory}
import org.apache.jena.sparql.core.{Quad, Var}
import org.apache.jena.sparql.expr.{Expr, ExprVar}
import org.apache.jena.sparql.core.Var
import org.apache.jena.sparql.expr.ExprVar

import scala.reflect.runtime.universe.MethodSymbol

object R2rmlUtils {
Expand All @@ -27,86 +21,93 @@ object R2rmlUtils {
attrName
}


/**
 * Creates the R2RML mappings for a single partition state using this object's defaults:
 * table names are derived via `createDefaultTableName` and SQL escaping is done by a
 * `SqlEscaperBacktick`.
 *
 * @param partitioner the RDF partitioner
 * @param partition the partition state to map
 * @param model the model handed to the six-argument overload as output model
 * @param explodeLanguageTags forwarded unchanged to the six-argument overload
 * @return the triples maps returned by the six-argument overload
 */
def createR2rmlMappings(partitioner: RdfPartitioner[RdfPartitionStateDefault],
                        partition: RdfPartitionStateDefault,
                        model: Model,
                        explodeLanguageTags: Boolean): Seq[TriplesMap] = {
  // Maps the partition to a table name
  val defaultTableName: RdfPartitionStateDefault => String = createDefaultTableName(_)
  val backtickEscaper: SqlEscaper = new SqlEscaperBacktick

  createR2rmlMappings(partitioner, partition, defaultTableName, backtickEscaper, model, explodeLanguageTags)
}

/**
 * Creates the R2RML mappings for several partition states by applying the single-partition
 * overload to each of them and concatenating the results in input order.
 *
 * @param partitioner the RDF partitioner
 * @param partitions the partition states to map
 * @param model the model passed on to the single-partition overload for every partition
 * @param explodeLanguageTags forwarded unchanged to the single-partition overload
 * @return the triples maps of all partitions
 */
def createR2rmlMappings(partitioner: RdfPartitioner[RdfPartitionStateDefault],
                        partitions: Seq[RdfPartitionStateDefault],
                        model: Model,
                        explodeLanguageTags: Boolean): Seq[TriplesMap] = {
  // Exactly one flatMap stage: each partition state is mapped once. A second chained stage
  // would receive TriplesMap values where partition states are expected.
  partitions.flatMap(p => createR2rmlMappings(partitioner, p, model, explodeLanguageTags))
}


/**
* Transform a RdfPartitionStateDefault into a sequence of R2RML mappings.
* TODO Clarify whether the returned mappings share the same Model or are backed by individual ones
* If the language handling strategy demands a dedicated column for language tags then the
* resulting R2RML contains the non-standard 'rr:langColumn' property.
*
* FIXME Creating mappings per language tag needs yet to be implemented
*
* @param partitioner
* @param p
* @return
* @param partitioner The partitioner
* @param partitionState The partition state generated by the partitioner
* @param extractTableName A function to obtain a table name from the partition state
* @param sqlEscaper SQL escaping policies for table names, column names, string literals and aliases
* @param outModel The output model
* @param explodeLanguageTags If true then a mapping is generated for each language tag listed in the partition state.
* Otherwise a generic language column is introduced
* @return The set of {@link TriplesMap}s added to the output model
*/
def createR2rmlMappings(partitioner: RdfPartitioner[RdfPartitionStateDefault],
p: RdfPartitionStateDefault,
model: Model,
partitionState: RdfPartitionStateDefault,
extractTableName: RdfPartitionStateDefault => String,
sqlEscaper: SqlEscaper,
outModel: Model,
explodeLanguageTags: Boolean): Seq[TriplesMap] = {
// val basicTableInfo = basicTableInfoProvider.getBasicTableInfo(sqlQueryStr)
// println("Result schema: " + basicTableInfoProvider.getBasicTableInfo(sqlQueryStr))

// items.foreach(x => println("Item: " + x))
val p = partitionState // Shorthand
val t = partitioner.determineLayout(partitionState).schema

val t = partitioner.determineLayout(p).schema
// val t = p.layout.schema
val attrNames = t.members.sorted.collect({ case m: MethodSymbol if m.isCaseAccessor => m.name.toString })

// println("Counting the dataset: " + ds.count())
val pred = p.predicate

// For now let's just use the full predicate as the uri
// val predPart = pred.substring(pred.lastIndexOf("/") + 1)
val predPart = pred
val pn = NodeFactory.createURI(p.predicate)

val dt = p.datatype
val dtPart = if (dt != null && dt.nonEmpty) "_" + dt.substring(dt.lastIndexOf("/") + 1) else ""
val langPart = if (p.langTagPresent) "_lang" else ""

val sTermTypePart = if (p.subjectType == 0) "sbn" else ""
val oTermTypePart = if (p.objectType == 0) "obn" else ""

val tableName = predPart + dtPart + langPart + sTermTypePart + oTermTypePart
// .replace("#", "__").replace("-", "_")

// TODO Ensure tableName is safe
val predicateIri: String = partitionState.predicate
val tableName = extractTableName(partitionState)

if (explodeLanguageTags && attrNames.length == 3) {
val langCol = attrNames(2)
val columns = attrNames.slice(0, 2).mkString(", ")
val langColSql = sqlEscaper.escapeColumnName(attrNames(2))
val columnsSql = attrNames.slice(0, 2).map(sqlEscaper.escapeColumnName(_)).mkString(", ")

p.languages.map(lang => {
val tm: TriplesMap = model.createResource.as(classOf[TriplesMap])
val tableNameSql = sqlEscaper.escapeTableName(tableName)
val langSql = sqlEscaper.escapeStringLiteral(lang)

val tm: TriplesMap = outModel.createResource.as(classOf[TriplesMap])

// create subject map
val sm: SubjectMap = tm.getOrSetSubjectMap()
setTermMapForNode(sm, 0, attrNames, p.subjectType, "", false)

val pom: PredicateObjectMap = tm.addNewPredicateObjectMap()
pom.getPredicates.add(tm.getModel.wrapAsResource(pn))
pom.addPredicate(predicateIri)

val om: ObjectMap = pom.addNewObjectMap()
om.setColumn(attrNames(1))
om.setLanguage(lang)

tm.getOrSetLogicalTable().setSqlQuery(s"SELECT $columns FROM $tableName WHERE $langCol = '$lang'")

tm.getOrSetLogicalTable().setSqlQuery(s"SELECT $columnsSql FROM $tableNameSql WHERE $langColSql = $langSql")

tm
}).toSeq
} else {
val tm: TriplesMap = ModelFactory.createDefaultModel.createResource.as(classOf[TriplesMap])
val pom: PredicateObjectMap = tm.addNewPredicateObjectMap()
pom.getPredicates.add(tm.getModel.wrapAsResource(pn))
pom.addPredicate(predicateIri)

val sm: SubjectMap = tm.getOrSetSubjectMap()
val om: ObjectMap = pom.addNewObjectMap()
Expand Down Expand Up @@ -144,4 +145,32 @@ object R2rmlUtils {

target
}


/**
 * Creates a SQL table name for a partition.
 *
 * The name is the concatenation of, in this order:
 *  - the full predicate IRI,
 *  - `_` followed by the part of the datatype IRI after its last `/` (only if a datatype is set),
 *  - `_lang` if the partition carries language tags,
 *  - `sbn` if the subject type is 0,
 *  - `obn` if the object type is 0.
 *
 * Example: predicate `http://xmlns.com/foaf/0.1/givenName` with datatype
 * `http://www.w3.org/2001/XMLSchema#string` and language tags present yields
 * `http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang`.
 *
 * The result is returned as-is, without any SQL escaping.
 *
 * FIXME Consolidate with SQLUtils in the spark module?
 *
 * @param p the RDF partition
 * @return the unescaped table name for the partition
 */
def createDefaultTableName(p: RdfPartitionStateDefault): String = {
  // For now the full predicate IRI is used rather than only its local name.
  val predPart = p.predicate

  // A null or empty datatype contributes nothing to the name.
  val dtPart = Option(p.datatype)
    .filter(_.nonEmpty)
    .map(dt => "_" + dt.substring(dt.lastIndexOf("/") + 1))
    .getOrElse("")
  val langPart = if (p.langTagPresent) "_lang" else ""

  // NOTE(review): term type 0 presumably denotes blank nodes ("bn") - confirm against the partitioner.
  val sTermTypePart = if (p.subjectType == 0) "sbn" else ""
  val oTermTypePart = if (p.objectType == 0) "obn" else ""

  predPart + dtPart + langPart + sTermTypePart + oTermTypePart
}
}
Expand Up @@ -88,7 +88,7 @@ class RdfPartitionerDefaultTests extends FunSuite {
RDFDataMgr.read(expected, new ByteArrayInputStream(
"""
| @base <http://www.w3.org/ns/r2rml#> .
|[ <#logicalTable> [ <#sqlQuery> "SELECT s, o FROM http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang WHERE l = 'fr'" ] ;
|[ <#logicalTable> [ <#sqlQuery> "SELECT `s`, `o` FROM `http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang` WHERE `l` = 'fr'" ] ;
| <#predicateObjectMap> [ <#objectMap> [ <#column> "o" ;
| <#language> "fr"
| ] ;
Expand All @@ -99,7 +99,7 @@ class RdfPartitionerDefaultTests extends FunSuite {
| ]
|] .
|
|[ <#logicalTable> [ <#sqlQuery> "SELECT s, o FROM http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang WHERE l = 'de'" ] ;
|[ <#logicalTable> [ <#sqlQuery> "SELECT `s`, `o` FROM `http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang` WHERE `l` = 'de'" ] ;
| <#predicateObjectMap> [ <#objectMap> [ <#column> "o" ;
| <#language> "de"
| ] ;
Expand All @@ -110,7 +110,7 @@ class RdfPartitionerDefaultTests extends FunSuite {
| ]
|] .
|
|[ <#logicalTable> [ <#sqlQuery> "SELECT s, o FROM http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang WHERE l = 'en'" ] ;
|[ <#logicalTable> [ <#sqlQuery> "SELECT `s`, `o` FROM `http://xmlns.com/foaf/0.1/givenName_XMLSchema#string_lang` WHERE `l` = 'en'" ] ;
| <#predicateObjectMap> [ <#objectMap> [ <#column> "o" ;
| <#language> "en"
| ] ;
Expand All @@ -131,6 +131,8 @@ class RdfPartitionerDefaultTests extends FunSuite {

val triplesMaps = R2rmlUtils.createR2rmlMappings(RdfPartitionerDefault, partitionState, actual, true)

RDFDataMgr.write(System.out, actual, RDFFormat.TURTLE_PRETTY)

assert(triplesMaps.size == languages.size)

assert(expected.isIsomorphicWith(actual))
Expand All @@ -141,11 +143,14 @@ class RdfPartitionerDefaultTests extends FunSuite {
val partitionState = RdfPartitionStateDefault(1, "http://xmlns.com/foaf/0.1/givenName",
2, "http://www.w3.org/2001/XMLSchema#string", true, Set("en", "de", "fr"))

val triplesMaps = R2rmlUtils.createR2rmlMappings(RdfPartitionerDefault, partitionState, ModelFactory.createDefaultModel(), true)
val exportModel = ModelFactory.createDefaultModel()
val triplesMaps = R2rmlUtils.createR2rmlMappings(RdfPartitionerDefault, partitionState, exportModel, true)

val exportModel = RdfPartitionImportExport.exportAsR2RML(RdfPartitionerDefault, Seq(partitionState), true)
// val exportModel = exportModel // RdfPartitionImportExport.exportAsR2RML(RdfPartitionerDefault, partitionState, true)
exportModel.write(System.out, "Turtle", "http://www.w3.org/ns/r2rml#")

// TODO Add serialization and deserialization

val importedTriplesMaps = RdfPartitionImportExport.importFromR2RML(exportModel)

assert(triplesMaps.size == importedTriplesMaps.size)
Expand Down
@@ -0,0 +1,41 @@
package net.sansa_stack.rdf.spark.utils

import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner}
import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils
import net.sansa_stack.rdf.spark.partition.core.{BlankNodeStrategy, SQLUtils}
import org.aksw.sparqlify.core.sql.common.serialization.SqlEscaperBacktick
import org.apache.jena.rdf.model.{Model, ModelFactory}

/**
 * Spark-flavoured entry points for exporting RDF partition states.
 *
 * @author Lorenz Buehmann
 */
object PartitionLib {
  /**
   * Exports the RDF partition states as R2RML.
   * Uses table name and sql escaping configuration suitable for spark:
   * table names come from `SQLUtils.createTableName` with `BlankNodeStrategy.Table`
   * and identifiers are escaped by a `SqlEscaperBacktick`.
   *
   * @param partitioner the RDF partitioner
   * @param partitions the RDF partition states
   * @param explodeLanguageTags if `true` a separate mapping/TriplesMap will be created for each language tag,
   *                            otherwise a mapping to a column for the language tag represented by
   *                            `rr:langColumn` property will be used (note, this is an extension of R2RML)
   * @return the model containing the RDF partition states in R2RML syntax
   */
  def exportAsR2RML(partitioner: RdfPartitioner[RdfPartitionStateDefault],
                    partitions: Seq[RdfPartitionStateDefault],
                    explodeLanguageTags: Boolean = false): Model = {
    // put all triple maps into a single model
    val model = ModelFactory.createDefaultModel()

    // Only the side effect on `model` matters here, so iterate with foreach
    // instead of building (and discarding) a sequence of the returned mappings.
    partitions.foreach(partition =>
      R2rmlUtils.createR2rmlMappings(
        partitioner,
        partition,
        p => SQLUtils.createTableName(p, BlankNodeStrategy.Table),
        new SqlEscaperBacktick,
        model,
        explodeLanguageTags)
    )

    model
  }
}

0 comments on commit e498234

Please sign in to comment.