SparqlifyUtils3.scala

package net.sansa_stack.query.spark.sparqlify

import net.sansa_stack.rdf.common.partition.core.RdfPartitionDefault
import net.sansa_stack.rdf.common.partition.model.sparqlify.SparqlifyUtils2
import org.aksw.obda.domain.impl.LogicalTableTableName
import org.aksw.sparqlify.backend.postgres.DatatypeToStringCast
import org.aksw.sparqlify.config.syntax.Config
import org.aksw.sparqlify.core.algorithms.{CandidateViewSelectorSparqlify, ViewDefinitionNormalizerImpl}
import org.aksw.sparqlify.core.interfaces.SparqlSqlStringRewriter
import org.aksw.sparqlify.core.sql.common.serialization.SqlEscaperBase
import org.aksw.sparqlify.util.{SparqlifyCoreInit, SparqlifyUtils, SqlBackendConfig}
import org.aksw.sparqlify.validation.LoggerCount
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
object SparqlifyUtils3 { // extends StrictLogging
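  /**
   * Creates a Sparqlify SPARQL-to-SQL rewriter for the given RDF partitions:
   * each partition's RDD is registered as a Spark temporary view, and a
   * matching Sparqlify view definition is added to the rewriter's config.
   */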
  def createSparqlSqlRewriter(sparkSession: SparkSession, partitions: Map[RdfPartitionDefault, RDD[Row]]): SparqlSqlStringRewriter = {
    val config = new Config()
    // val loggerCount = new LoggerCount(logger.underlying)

    // Backtick escaping matches Spark SQL's identifier quoting
    val backendConfig = new SqlBackendConfig(new DatatypeToStringCast(), new SqlEscaperBase("`", "`")) // new SqlEscaperBacktick()
    val sqlEscaper = backendConfig.getSqlEscaper()
    val typeSerializer = backendConfig.getTypeSerializer()
    val sqlFunctionMapping = SparqlifyCoreInit.loadSqlFunctionDefinitions("functions-spark.xml")

    val ers = SparqlifyUtils.createDefaultExprRewriteSystem()
    val mappingOps = SparqlifyUtils.createDefaultMappingOps(ers)
    val candidateViewSelector = new CandidateViewSelectorSparqlify(mappingOps, new ViewDefinitionNormalizerImpl())
    // Register each partition as a Spark temp view and record its Sparqlify view definition.
    // The mapping is performed only for its side effects, so foreach is used instead of map.
    partitions.foreach {
      case (p, rdd) =>
        // logger.debug("Processing RdfPartition: " + p)
        val vd = SparqlifyUtils2.createViewDefinition(p)
        // logger.debug("Created view definition: " + vd)
        val tableName = vd.getLogicalTable match {
          case o: LogicalTableTableName => o.getTableName
          case _ => throw new RuntimeException("Table name required - instead got: " + vd)
        }
        // Derive the Spark schema from the partition's Scala type layout
        val scalaSchema = p.layout.schema
        val sparkSchema = ScalaReflection.schemaFor(scalaSchema).dataType.asInstanceOf[StructType]
        val df = sparkSession.createDataFrame(rdd, sparkSchema)
        df.createOrReplaceTempView(sqlEscaper.escapeTableName(tableName))
        config.getViewDefinitions.add(vd)
    }
    val basicTableInfoProvider = new BasicTableInfoProviderSpark(sparkSession)

    val rewriter = SparqlifyUtils.createDefaultSparqlSqlStringRewriter(basicTableInfoProvider, null, config, typeSerializer, sqlEscaper, sqlFunctionMapping)
    // val rewrite = rewriter.rewrite(QueryFactory.create("Select * { <http://dbpedia.org/resource/Guy_de_Maupassant> ?p ?o }"))
    // val rewrite = rewriter.rewrite(QueryFactory.create("Select * { ?s <http://xmlns.com/foaf/0.1/givenName> ?o ; <http://dbpedia.org/ontology/deathPlace> ?d }"))
    rewriter
  }
}
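
// Minimal usage sketch: assumes a live SparkSession and a partition map such
// as the one produced by sansa-rdf's RdfPartitionUtilsSpark.partitionGraph.
// The getSqlQueryString accessor follows the Sparqlify rewrite API as used
// elsewhere in SANSA; treat this as an illustrative sketch, not a tested
// entry point.
object SparqlifyUtils3UsageSketch {
  import org.apache.jena.query.QueryFactory

  def example(sparkSession: SparkSession, partitions: Map[RdfPartitionDefault, RDD[Row]]): Unit = {
    val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(sparkSession, partitions)
    // Rewrite a SPARQL query into a SQL string over the registered temp views
    val rewrite = rewriter.rewrite(QueryFactory.create("SELECT * { ?s ?p ?o }"))
    // Execute the generated SQL with Spark and print a sample of the result
    sparkSession.sql(rewrite.getSqlQueryString).show()
  }
}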