
Commit

Sparqlify now ported to the new API towards unified virtual graph handling
Aklakan committed Jan 14, 2021
1 parent cef54b8 commit 5fd1c74
Showing 19 changed files with 229 additions and 49 deletions.
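
The gist of the change: the Sparqlify-specific QueryExecutionFactorySparqlifySpark becomes JavaQueryExecutionFactorySparqlifySpark and is now reached through the unified query API (QueryEngineFactorySparqlify plus the sparqlify() implicit on R2RML-partitioned data). A minimal Scala sketch of the new call path, pieced together from the changes below; the query string is only illustrative and graphRdd: RDD[Triple] is assumed to be in scope:

import net.sansa_stack.rdf.common.partition.core.RdfPartitionerDefault
import net.sansa_stack.rdf.spark.partition._
import net.sansa_stack.query.spark._

// Vertically partition the triples and obtain a Sparqlify-backed
// QueryExecutionFactorySpark via the new implicits, then run a query.
val qef = graphRdd.verticalPartition(RdfPartitionerDefault).sparqlify
val rs = qef.createQueryExecution("SELECT * WHERE { ?s ?p ?o } LIMIT 10").execSelect()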
@@ -3,7 +3,7 @@ package net.sansa_stack.examples.spark.query
import java.awt.Desktop
import java.net.URI

import net.sansa_stack.query.spark.sparqlify.{QueryExecutionFactorySparqlifySpark, SparqlifyUtils3}
import net.sansa_stack.query.spark.sparqlify.{JavaQueryExecutionFactorySparqlifySpark, SparqlifyUtils3}
import net.sansa_stack.rdf.common.partition.core.RdfPartitionerDefault
import net.sansa_stack.rdf.spark.io._
import net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark
@@ -66,7 +66,7 @@ object Sparklify {
val partitions = RdfPartitionUtilsSpark.partitionGraph(graphRdd, partitioner)
val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(spark, partitioner, partitions)

val qef = new QueryExecutionFactorySparqlifySpark(spark, rewriter)
val qef = new JavaQueryExecutionFactorySparqlifySpark(spark, rewriter)
val server = FactoryBeanSparqlServer.newInstance.setSparqlServiceFactory(qef).setPort(config.port).create()
if (Desktop.isDesktopSupported) {
Desktop.getDesktop.browse(URI.create("http://localhost:" + config.port + "/sparql"))
@@ -0,0 +1,15 @@
package net.sansa_stack.query.spark.api.domain;

import org.aksw.jena_sparql_api.core.QueryExecutionFactory;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryExecution;

public interface JavaQueryExecutionFactorySpark
extends QueryExecutionFactory
{
@Override
JavaQueryExecutionSpark createQueryExecution(Query query);

@Override
JavaQueryExecutionSpark createQueryExecution(String queryString);
}
@@ -1,17 +1,19 @@
package net.sansa_stack.query.spark.sparqlify;

import net.sansa_stack.query.spark.api.domain.JavaQueryExecutionFactorySpark;
import org.aksw.jena_sparql_api.core.QueryExecutionFactoryBackQuery;
import org.aksw.sparqlify.core.interfaces.SparqlSqlStringRewriter;
import org.apache.jena.query.Query;
import org.apache.spark.sql.SparkSession;

public class QueryExecutionFactorySparqlifySpark
public class JavaQueryExecutionFactorySparqlifySpark
extends QueryExecutionFactoryBackQuery
implements JavaQueryExecutionFactorySpark
{
protected SparkSession sparkSession;
protected SparqlSqlStringRewriter sparqlSqlRewriter;

public QueryExecutionFactorySparqlifySpark(SparkSession sparkSession, SparqlSqlStringRewriter sparqlSqlRewriter) {
public JavaQueryExecutionFactorySparqlifySpark(SparkSession sparkSession, SparqlSqlStringRewriter sparqlSqlRewriter) {
super();
this.sparkSession = sparkSession;
this.sparqlSqlRewriter = sparqlSqlRewriter;
@@ -25,6 +27,11 @@ public JavaQueryExecutionSparkSparqlify createQueryExecution(Query query) {
return result;
}

@Override
public JavaQueryExecutionSparkSparqlify createQueryExecution(String queryString) {
return (JavaQueryExecutionSparkSparqlify)super.createQueryExecution(queryString);
}

@Override
public String getId() {
return "spark";
@@ -0,0 +1,13 @@
package net.sansa_stack.query.spark.api.impl

import net.sansa_stack.query.spark.api.domain.{JavaQueryExecutionFactorySpark, QueryExecutionFactorySpark, QueryExecutionSpark}
import org.aksw.jena_sparql_api.core.QueryExecutionFactoryDecoratorBase
import org.apache.jena.query.Query

class QueryExecutionFactorySparkJavaWrapper(delegate: JavaQueryExecutionFactorySpark)
extends QueryExecutionFactoryDecoratorBase[JavaQueryExecutionFactorySpark](delegate)
with QueryExecutionFactorySpark
{
override def createQueryExecution(query: Query): QueryExecutionSpark = new QueryExecutionSparkJavaWrapper(decoratee.createQueryExecution(query))
override def createQueryExecution(queryString: String): QueryExecutionSpark = new QueryExecutionSparkJavaWrapper(decoratee.createQueryExecution(queryString))
}
@@ -11,7 +11,7 @@ import org.apache.spark.rdd.RDD
*
* @param decoratee The JavaQueryExecutionSpark instance to wrap
*/
class QueryExecutionSparkFromJava(decoratee: JavaQueryExecutionSpark)
class QueryExecutionSparkJavaWrapper(decoratee: JavaQueryExecutionSpark)
extends QueryExecutionDecoratorBase[JavaQueryExecutionSpark](decoratee)
with QueryExecutionSpark
{
@@ -1,13 +1,17 @@
package net.sansa_stack.query.spark

import com.google.common.cache.{CacheBuilder, CacheLoader}
import net.sansa_stack.query.spark.api.domain.QueryExecutionFactorySpark
import net.sansa_stack.query.spark.datalake.DataLakeEngine
import net.sansa_stack.query.spark.ontop.OntopSPARQLEngine
import net.sansa_stack.query.spark.semantic.QuerySystem
import net.sansa_stack.query.spark.sparqlify.{QueryExecutionSpark, SparkRowMapperSparqlify, SparqlifyUtils3}
import net.sansa_stack.query.spark.sparqlify.{QueryEngineFactorySparqlify, QueryExecutionSpark, SparkRowMapperSparqlify, SparqlifyUtils3}
import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner, RdfPartitionerComplex, RdfPartitionerDefault}
import net.sansa_stack.rdf.common.partition.r2rml.R2rmlMappingCollection
import net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark
import net.sansa_stack.rdf.spark.utils.kryo.io.JavaKryoSerializationWrapper
import org.aksw.sparqlify.core.domain.input.SparqlSqlStringRewrite
import org.aksw.sparqlify.core.interfaces.SparqlSqlStringRewriter
import org.apache.jena.graph.Triple
import org.apache.jena.query.QueryFactory
import org.apache.jena.sparql.engine.binding.Binding
@@ -148,8 +152,21 @@ package object query {
override def sparqlRDD(sparqlQuery: String): RDD[Binding] = sparqlEngine.execSelect(sparqlQuery)
}

implicit class SparqlifySPARQLExecutor2(val partitions: R2rmlMappingCollection) {
// extends QueryExecutor
// with Serializable {

def sparqlify(): QueryExecutionFactorySpark = {
val sparkSession = SparkSession.builder().getOrCreate()

val engineFactory = new QueryEngineFactorySparqlify(sparkSession)
engineFactory.create(null, partitions.getR2rmlModel)
}
}

/**
* A Sparqlify backed SPARQL executor working on the given RDF partitions.
* FIXME Needs porting
*
* @param partitions the RDF partitions to work on
*/
@@ -170,7 +187,7 @@ package object query {

val spark = SparkSession.builder().getOrCreate()

val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(spark, RdfPartitionerDefault, partitions)
val rewriter: SparqlSqlStringRewriter = null // SparqlifyUtils3.createSparqlSqlRewriter(spark, RdfPartitionerDefault, partitions)

override def sparql(sparqlQuery: String): DataFrame = {
val query = QueryFactory.create(sparqlQuery)
@@ -198,6 +215,7 @@ package object query {

}


implicit class Semantic(partitions: RDD[String]) extends Serializable {

/**
@@ -0,0 +1,18 @@
package net.sansa_stack.query.spark.sparqlify

import net.sansa_stack.query.spark.api.domain.QueryExecutionFactorySpark
import net.sansa_stack.query.spark.api.impl.{QueryEngineFactoryBase, QueryExecutionFactorySparkJavaWrapper}
import org.aksw.sparqlify.core.interfaces.SparqlSqlStringRewriter
import org.apache.jena.rdf.model.Model
import org.apache.spark.sql.SparkSession

class QueryEngineFactorySparqlify(sparkSession: SparkSession) extends QueryEngineFactoryBase(sparkSession) {
override def create(databaseName: String, mappingModel: Model): QueryExecutionFactorySpark = {
val rewriter: SparqlSqlStringRewriter = SparqlifyUtils3.createSparqlSqlRewriter(sparkSession, databaseName, mappingModel)

val result = new QueryExecutionFactorySparkJavaWrapper(
new JavaQueryExecutionFactorySparqlifySpark(sparkSession, rewriter))

result
}
}
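
For callers that already hold an R2RML mapping model, the factory added here can be used directly rather than via the sparqlify() implicit. A hedged sketch, assuming sparkSession and a Jena Model named r2rmlModel are in scope; the implicit in the query package object passes null as the database name in the same way:

// Build a Sparqlify-backed query execution factory straight from an R2RML model.
val engineFactory = new QueryEngineFactorySparqlify(sparkSession)
val qef = engineFactory.create(null, r2rmlModel)
val rs = qef.createQueryExecution("SELECT * WHERE { ?s ?p ?o }").execSelect()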
@@ -3,19 +3,57 @@ package net.sansa_stack.query.spark.sparqlify
import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner}
import net.sansa_stack.rdf.common.partition.model.sparqlify.SparqlifyUtils2
import org.aksw.obda.domain.impl.LogicalTableTableName
import org.aksw.obda.jena.r2rml.impl.R2rmlImporter
import org.aksw.sparqlify.backend.postgres.DatatypeToStringCast
import org.aksw.sparqlify.config.syntax.Config
import org.aksw.sparqlify.core.algorithms.{CandidateViewSelectorSparqlify, ViewDefinitionNormalizerImpl}
import org.aksw.sparqlify.core.interfaces.SparqlSqlStringRewriter
import org.aksw.sparqlify.core.sql.common.serialization.SqlEscaperBase
import org.aksw.sparqlify.util.{SparqlifyCoreInit, SparqlifyUtils, SqlBackendConfig}
import org.apache.jena.rdf.model.Model
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}

object SparqlifyUtils3 // extends StrictLogging
{
/**
* Create a SPARQL-to-SQL rewriter using the Sparqlify engine.
* Engine initialization attempts to retrieve metadata for all table names and SQL queries mentioned
* in the R2RML mappings. Hence, the database schema must be present before calling this function.
*
* @param sparkSession the Spark session used to look up table metadata
* @param databaseName the database name (callers such as the sparqlify() implicit pass null)
* @param r2rmlModel the R2RML mapping model providing the view definitions
* @return a SparqlSqlStringRewriter configured for the given mappings
*/
def createSparqlSqlRewriter(sparkSession: SparkSession, databaseName: String, r2rmlModel: Model): SparqlSqlStringRewriter = {
val backendConfig = new SqlBackendConfig(new DatatypeToStringCast(), new SqlEscaperBase("`", "`")) // new SqlEscaperBacktick())
val sqlEscaper = backendConfig.getSqlEscaper()
val typeSerializer = backendConfig.getTypeSerializer()
val sqlFunctionMapping = SparqlifyCoreInit.loadSqlFunctionDefinitions("functions-spark.xml")

val ers = SparqlifyUtils.createDefaultExprRewriteSystem()
val mappingOps = SparqlifyUtils.createDefaultMappingOps(ers)

val candidateViewSelector = new CandidateViewSelectorSparqlify(mappingOps, new ViewDefinitionNormalizerImpl());

val basicTableInfoProvider = new BasicTableInfoProviderSpark(sparkSession)

val config = new Config()
// val loggerCount = new LoggerCount(logger.underlying)
val r2rmlImporter = new R2rmlImporter

val viewDefinitions = r2rmlImporter.read(r2rmlModel)
config.getViewDefinitions.addAll(viewDefinitions)

val rewriter = SparqlifyUtils.createDefaultSparqlSqlStringRewriter(basicTableInfoProvider, null, config, typeSerializer, sqlEscaper, sqlFunctionMapping)

rewriter
}

// FIXME Delete once ported
def createSparqlSqlRewriter(sparkSession: SparkSession, partitioner: RdfPartitioner[RdfPartitionStateDefault], partitions: Map[RdfPartitionStateDefault, RDD[Row]]): SparqlSqlStringRewriter = {
val config = new Config()
// val loggerCount = new LoggerCount(logger.underlying)
@@ -61,4 +99,5 @@ object SparqlifyUtils3 // extends StrictLogging
rewriter
}


}
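
SparqlifyUtils3 now carries two rewriter builders: the new one driven by an R2RML model (used by QueryEngineFactorySparqlify above) and the old partition-map overload, kept per the FIXME until the remaining callers are ported. A sketch of the legacy path that the BSBM example and the test below still follow; triples: RDD[Triple] and spark are assumed in scope:

// Legacy path: build partitions explicitly, create the rewriter,
// then wrap it in the renamed Java factory.
val partitioner = RdfPartitionerDefault
val partitions: Map[RdfPartitionStateDefault, RDD[Row]] = RdfPartitionUtilsSpark.partitionGraph(triples, partitioner)
val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(spark, partitioner, partitions)
val qef = new JavaQueryExecutionFactorySparqlifySpark(spark, rewriter)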
@@ -2,7 +2,7 @@ package net.sansa_stack.query.spark.sparqlify.server

import java.io.File

import net.sansa_stack.query.spark.sparqlify.{QueryExecutionFactorySparqlifySpark, SparqlifyUtils3}
import net.sansa_stack.query.spark.sparqlify.{JavaQueryExecutionFactorySparqlifySpark, SparqlifyUtils3}
import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitionerDefault}
import net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark
import org.aksw.jena_sparql_api.server.utils.FactoryBeanSparqlServer
@@ -14,6 +14,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConverters._
import net.sansa_stack.rdf.spark.partition._
import net.sansa_stack.query.spark._
import net.sansa_stack.query.spark.query.SparqlifySPARQLExecutor2

object MainSansaSparqlServer {

@@ -66,13 +69,15 @@ object MainSansaSparqlServer {
// it.foreach { x => println("GOT: " + (if(x.getObject.isLiteral) x.getObject.getLiteralLanguage else "-")) }
val graphRdd = sparkSession.sparkContext.parallelize(it)

// val map = graphRdd.partitionGraphByPredicates
val partitioner = RdfPartitionerDefault

val partitions: Map[RdfPartitionStateDefault, RDD[Row]] = RdfPartitionUtilsSpark.partitionGraph(graphRdd, partitioner)
val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(sparkSession, partitioner, partitions)
val qef = graphRdd.verticalPartition(RdfPartitionerDefault).sparqlify

val qef = new QueryExecutionFactorySparqlifySpark(sparkSession, rewriter)
// val map = graphRdd.partitionGraphByPredicates
// val partitioner = RdfPartitionerDefault
//
// val partitions: Map[RdfPartitionStateDefault, RDD[Row]] = RdfPartitionUtilsSpark.partitionGraph(graphRdd, partitioner)
// val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(sparkSession, partitioner, partitions)
//
// val qef = new JavaQueryExecutionFactorySparqlifySpark(sparkSession, rewriter)

val server = FactoryBeanSparqlServer.newInstance.setSparqlServiceFactory(qef).create
server.join()
@@ -68,7 +68,7 @@ object MainSansaBSBM {

// Spark SQL does not support OFFSET - so for testing just remove it from the query
val conn = new SparqlQueryConnectionJsa(FluentQueryExecutionFactory
.from(new QueryExecutionFactorySparqlifySpark(sparkSession, rewriter))
.from(new JavaQueryExecutionFactorySparqlifySpark(sparkSession, rewriter))
.config()
.withQueryTransform(q => { q.setOffset(Query.NOLIMIT); q })
.withParser(q => QueryFactory.create(q))
@@ -153,7 +153,7 @@ class SparklifyQueryEngineTests extends FunSuite with DataFrameSuiteBase {
val partitioner = RdfPartitionerDefault
val partitions: Map[RdfPartitionStateDefault, RDD[Row]] = RdfPartitionUtilsSpark.partitionGraph(triples, RdfPartitionerDefault)
val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(spark, partitioner, partitions)
val qef = new QueryExecutionFactorySparqlifySpark(spark, rewriter)
val qef = new JavaQueryExecutionFactorySparqlifySpark(spark, rewriter)
val str = ResultSetFormatter.asText(qef.createQueryExecution(queryStr).execSelect())
.toLowerCase
assert(!str.contains("null"))

This file was deleted.

@@ -16,6 +16,7 @@ import org.apache.jena.graph.NodeFactory
import org.apache.jena.sparql.core.{Quad, Var}
import org.apache.jena.sparql.expr.{Expr, ExprVar, NodeValue}

// This is now all unified in R2rmlUtils
object SparqlifyUtils2 {
implicit def newExprVar(varName: String): ExprVar = new ExprVar(Var.alloc(varName))
implicit def newExprVar(varId: Int): ExprVar = "_" + varId
@@ -0,0 +1,13 @@
package net.sansa_stack.rdf.common.partition.r2rml

import org.apache.jena.rdf.model.Model

/**
* A dedicated type for an R2RML model such that
* implicit functions can be defined for it
*/
trait R2rmlMappingCollection { // TODO Consider renaming to R2rmlBasedPartitioning

def getR2rmlModel(): Model

}
@@ -0,0 +1,8 @@
package net.sansa_stack.rdf.common.partition.r2rml

import org.apache.jena.rdf.model.Model

class R2rmlMappingCollectionImpl(val r2rmlModel: Model)
extends R2rmlMappingCollection with Serializable {
override def getR2rmlModel(): Model = r2rmlModel
}
@@ -1,6 +1,7 @@
package net.sansa_stack.rdf.common.partition.r2rml

import net.sansa_stack.rdf.common.partition.core.{RdfPartitionStateDefault, RdfPartitioner}
import org.aksw.r2rml.jena.arq.lib.R2rmlLib
import org.aksw.r2rml.jena.domain.api._
import org.aksw.r2rml.jena.vocab.RR
import org.aksw.r2rmlx.domain.api.TermMapX
@@ -105,7 +106,7 @@ object R2rmlUtils {
tm
}).toSeq
} else {
val tm: TriplesMap = ModelFactory.createDefaultModel.createResource.as(classOf[TriplesMap])
val tm: TriplesMap = outModel.createResource.as(classOf[TriplesMap])
val pom: PredicateObjectMap = tm.addNewPredicateObjectMap()
pom.addPredicate(predicateIri)

@@ -173,4 +174,15 @@ object R2rmlUtils {

tableName
}

/**
* Streams the TriplesMap resources (e.g. those exported from RDF partition states) contained in the given model.
*
* @param model the R2RML model
* @return an iterator over the model's TriplesMaps
*/
def streamTriplesMaps(model: Model): Iterator[TriplesMap] = {
import collection.JavaConverters._
R2rmlLib.streamTriplesMaps(model).iterator().asScala.toIterator
}
}
@@ -151,7 +151,7 @@ class RdfPartitionerDefaultTests extends FunSuite {

// TODO Add serialization and deserialization

val importedTriplesMaps = RdfPartitionImportExport.importFromR2RML(exportModel)
val importedTriplesMaps = R2rmlUtils.streamTriplesMaps(exportModel).toSeq

assert(triplesMaps.size == importedTriplesMaps.size)

