Merge branch 'develop' into use_case/eauc_sansa_api
carstendraschner committed Feb 19, 2021
2 parents 7eaf1ca + 5082840 commit e1c3f8c
Showing 7 changed files with 158 additions and 4 deletions.
7 changes: 7 additions & 0 deletions pom.xml
@@ -1241,6 +1241,13 @@
<version>2.4.2</version>
</dependency>

<!-- JSQL parser -->
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
<version>4.0</version>
</dependency>

</dependencies>
</dependencyManagement>

@@ -13,6 +13,7 @@ import org.apache.jena.vocabulary.RDF

import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils
import net.sansa_stack.rdf.common.partition.utils.SQLUtils
import net.sansa_stack.rdf.spark.utils.ScalaUtils

/**
* An extractor for an ontology.
@@ -49,7 +50,7 @@ object OntologyExtractor {

// we have to unwrap the quote from H2 escape and also apply Spark SQL escape
val tn = SQLUtils.parseTableIdentifier(tableName)
- val to = sqlEscaper.escapeColumnName(o.stripPrefix("\"").stripSuffix("\""))
+ val to = sqlEscaper.escapeColumnName(ScalaUtils.unQuote(o))

s"SELECT DISTINCT $to AS $clsCol FROM $tn"

@@ -28,6 +28,7 @@ import org.semanticweb.owlapi.model.OWLOntology

import net.sansa_stack.rdf.common.partition.r2rml.R2rmlUtils
import net.sansa_stack.rdf.common.partition.utils.SQLUtils
import net.sansa_stack.rdf.spark.utils.ScalaUtils

trait SPARQL2SQLRewriter[T <: QueryRewrite] {
def createSQLQuery(sparqlQuery: String): T
@@ -182,7 +183,7 @@ class QueryEngineOntop(val spark: SparkSession,
// we have to unwrap the quote from H2 escape and also apply Spark SQL escape
val tn = SQLUtils.parseTableIdentifier(tableName)
- val to = sqlEscaper.escapeColumnName(o.stripPrefix("\"").stripSuffix("\""))
+ val to = sqlEscaper.escapeColumnName(ScalaUtils.unQuote(o))
val df = spark.sql(s"SELECT DISTINCT $to FROM $tn")
val df = spark.sql(s"SELECT DISTINCT $to FROM $tn")

val classes = df.collect().map(_.getString(0))
5 changes: 5 additions & 0 deletions sansa-rdf/sansa-rdf-common/pom.xml
@@ -122,6 +122,11 @@
<artifactId>r2rml-jena-plugin</artifactId>
</dependency>

<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
</dependency>

</dependencies>

<build>
@@ -4,6 +4,8 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.MethodSymbol

import net.sf.jsqlparser.schema.Table
import net.sf.jsqlparser.statement.select.Select
import org.aksw.r2rml.jena.arq.lib.R2rmlLib
import org.aksw.r2rml.jena.domain.api._
import org.aksw.r2rml.jena.vocab.RR
@@ -14,6 +16,9 @@ import org.apache.jena.rdf.model.{Model, Property, Resource, ResourceFactory}
import org.apache.jena.sparql.core.Var
import org.apache.jena.sparql.expr.ExprVar

import net.sf.jsqlparser.parser.CCJSqlParserUtil
import net.sf.jsqlparser.util.TablesNamesFinder

import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner, TermType}
import net.sansa_stack.rdf.common.partition.utils.SQLUtils

@@ -282,4 +287,128 @@ object R2rmlUtils {
.filter(tm =>
tm.getPredicateObjectMaps.asScala.exists(_.getPredicateMaps.asScala.exists(pm => Option(pm.getConstant).contains(predicate))))
}

/**
* Qualifies all table identifiers with the given database (schema) name.
*
* @param database the database schema name
* @param model the R2RML mappings
* @return the modified R2RML mappings
*/
def makeQualifiedTableIdentifiers(database: String, model: Model): Model = {
streamTriplesMaps(model).foreach(tm => {
val lt = tm.getOrSetLogicalTable()
if (lt.qualifiesAsBaseTableOrView()) {
lt.asBaseTableOrView().setTableName(database + "." + lt.asBaseTableOrView().getTableName)
} else {
val view = lt.asR2rmlView()
var query = view.getSqlQuery
query = makeQualifiedTableNames(database, query)
view.setSqlQuery(query)
}
})
model
}
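
A one-line usage sketch (r2rmlModel is a hypothetical, already loaded Jena Model holding the R2RML mappings):

    // qualify every table referenced by the mappings with the "sansa" database
    val qualified = R2rmlUtils.makeQualifiedTableIdentifiers("sansa", r2rmlModel)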

private def makeQualifiedTableNames(qualifier: String, query: String): String = {
val statement = CCJSqlParserUtil.parse(query)
val selectStatement = statement.asInstanceOf[Select]
val tablesNamesFinder = new TablesNamesFinder {
override def visit(tableName: Table): Unit = {
tableName.setSchemaName(qualifier)
}
}
selectStatement.accept(tablesNamesFinder)
statement.toString
}
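
As a standalone sketch of what the JSQLParser-based helper above does (query string and qualifier are made up for illustration):

    import net.sf.jsqlparser.parser.CCJSqlParserUtil
    import net.sf.jsqlparser.schema.Table
    import net.sf.jsqlparser.statement.select.Select
    import net.sf.jsqlparser.util.TablesNamesFinder

    val stmt = CCJSqlParserUtil.parse("SELECT s, o FROM triples")
    val finder = new TablesNamesFinder {
      override def visit(table: Table): Unit = table.setSchemaName("db")
    }
    stmt.asInstanceOf[Select].accept(finder)
    println(stmt) // SELECT s, o FROM db.triples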

val escapeChars = Seq('"', '`')
/**
* Unescapes all SQL identifiers, i.e. the table and column names.
*
* @param model the R2RML mappings
* @return the modified R2RML mappings
*/
def unescapeIdentifiers(model: Model): Model = {
escapeChars.foreach(c => replaceEscapeChars(model, s"$c", ""))
model
}
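
A minimal usage sketch (assumptions: the tiny mapping below is picked up by streamTriplesMaps, and Jena is on the classpath):

    import java.io.StringReader
    import org.apache.jena.rdf.model.ModelFactory

    // table and column names carry H2-style double quotes
    val ttl =
      """PREFIX rr: <http://www.w3.org/ns/r2rml#>
        |<#SPO> rr:logicalTable [ rr:tableName "\"SPO\"" ] ;
        |       rr:subjectMap   [ rr:column "\"s\"" ] .
        |""".stripMargin

    val model = ModelFactory.createDefaultModel()
    model.read(new StringReader(ttl), "http://example.org/", "TURTLE")

    R2rmlUtils.unescapeIdentifiers(model) // rr:tableName -> SPO, rr:column -> s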

/**
* Replaces the escape chars of all SQL identifiers, i.e. the table and column names.
*
* @param model the R2RML mappings
* @param oldEscapeChar the old escape char
* @param newEscapeChar the new escape char
* @return the modified R2RML mappings
*/
def replaceEscapeChars(model: Model, oldEscapeChar: String, newEscapeChar: String): Model = {
streamTriplesMaps(model).foreach(tm => {
val lt = tm.getOrSetLogicalTable()

if (lt.qualifiesAsBaseTableOrView()) { // tables
val tn = lt.asBaseTableOrView().getTableName
lt.asBaseTableOrView().setTableName(replaceIdentifier(tn, oldEscapeChar, newEscapeChar))
} else { // views
val view = lt.asR2rmlView()
val query = view.getSqlQuery
view.setSqlQuery(replaceQueryIdentifiers(query, oldEscapeChar, newEscapeChar))
}

// column names
// s (subject map column)
val sm = tm.getSubjectMap
if (sm != null) {
val col = sm.getColumn
if (col != null) {
sm.setColumn(replaceIdentifier(col, oldEscapeChar, newEscapeChar))
}
}

tm.getPredicateObjectMaps.forEach(pm => {
// p (predicate map columns)
val pms = pm.getPredicateMaps
if (pms != null) {
pms.forEach(pm => {
val col = pm.getColumn
if (col != null) {
pm.setColumn(replaceIdentifier(col, oldEscapeChar, newEscapeChar))
}
})
}

// o (object map columns)
val oms = pm.getObjectMaps
if (oms != null) {
oms.forEach(om => {
if (om.qualifiesAsTermMap()) {
val tm = om.asTermMap()
val col = tm.getColumn
if (col != null) {
tm.setColumn(replaceIdentifier(col, oldEscapeChar, newEscapeChar))
}
}
})
}
})
})

model
}

private def replaceIdentifier(identifier: String, oldEscapeChar: String, newEscapeChar: String): String = {
identifier.replace(oldEscapeChar, newEscapeChar)
}

private def replaceQueryIdentifiers(query: String, oldEscapeChar: String, newEscapeChar: String): String = {
val statement = CCJSqlParserUtil.parse(query)
val selectStatement = statement.asInstanceOf[Select]
val tablesNamesFinder = new TablesNamesFinder {
override def visit(tableName: Table): Unit = {
tableName.setName(tableName.getName.replace(oldEscapeChar, newEscapeChar))
}
}
selectStatement.accept(tablesNamesFinder)
statement.toString
}
}
@@ -16,10 +16,11 @@ import net.sansa_stack.rdf.common.partition.core.RdfPartitionStateDefault
object SQLUtils {

import scala.util.matching.Regex
- val qualifiedTableNameRegex: Regex = "^(\"(.*)\".)?\"(.*)\"$".r
+ val qualifiedTableNameDoubleQuotesRegex: Regex = "^(\"(.*)\".)?\"(.*)\"$".r
+ val qualifiedTableNameBackticksRegex: Regex = "^(`(.*)`.)?`(.*)`$".r

def parseTableIdentifier(tableName: String): TableIdentifier = {
- qualifiedTableNameRegex.findFirstMatchIn(tableName) match {
+ qualifiedTableNameDoubleQuotesRegex.findFirstMatchIn(tableName) match {
case Some(i) =>
val tn = i.group(3)
val dn = Option(i.group(2))
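
For illustration, the new double-quotes regex accepts an optionally schema-qualified, H2-escaped identifier (the value below is made up):

    SQLUtils.qualifiedTableNameDoubleQuotesRegex
      .findFirstMatchIn("\"db\".\"triples\"")
      .map(m => (Option(m.group(2)), m.group(3)))
    // => Some((Some(db), triples))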
@@ -69,4 +69,14 @@ object ScalaUtils extends Logging {
}
}
}

/**
* Removes the given quote character from both ends of the string, but only if it occurs at both ends.
*
* @see org.apache.commons.lang3.StringUtils#unwrap
* @param s the string to unquote
* @param quoteChar the quote character to strip (default: double quote)
* @return the unquoted string
*/
def unQuote(s: String, quoteChar: Char = '"'): String = {
org.apache.commons.lang3.StringUtils.unwrap(s, quoteChar)
}
}
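
A quick behavioral sketch of unQuote, based on the documented semantics of StringUtils.unwrap (the character is only stripped when it occurs at both ends):

    ScalaUtils.unQuote("\"foo\"")    // foo
    ScalaUtils.unQuote("\"foo")      // "foo  -- unchanged, not wrapped on both ends
    ScalaUtils.unQuote("foo")        // foo   -- already unquoted
    ScalaUtils.unQuote("`foo`", '`') // foo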
