Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
Align Semantic-based query example with the latest changes on SANSA-Query
Browse files Browse the repository at this point in the history
  • Loading branch information
GezimSejdiu committed Apr 9, 2019
1 parent 9fad3c9 commit afe695d
Showing 1 changed file with 7 additions and 51 deletions.
Expand Up @@ -4,13 +4,13 @@ import java.io.IOException
import java.nio.file.{ Files, FileVisitResult, Path, Paths, SimpleFileVisitor }
import java.nio.file.attribute.BasicFileAttributes

import net.sansa_stack.query.spark.semantic.QuerySystem
import net.sansa_stack.rdf.spark.io._
import net.sansa_stack.rdf.spark.partition._
import org.apache.jena.graph.Triple
import org.apache.jena.riot.Lang
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import net.sansa_stack.query.spark.semantic.QuerySystem

/**
* Run SPARQL queries over Spark using Semantic partitioning approach.
Expand All @@ -22,13 +22,13 @@ object Semantic {
def main(args: Array[String]) {
parser.parse(args, Config()) match {
case Some(config) =>
run(config.in, config.queries, config.out)
run(config.in, config.queries)
case None =>
println(parser.usage)
}
}

def run(input: String, queries: String, output: String): Unit = {
def run(input: String, queries: String): Unit = {

println("===========================================")
println("| SANSA - Semantic Partioning example |")
Expand All @@ -43,56 +43,16 @@ object Semantic {
val lang = Lang.NTRIPLES
val triples = spark.rdf(lang)(input)

println("----------------------")
println("Phase 1: RDF Partition")
println("----------------------")

val partitionData = triples.partitionGraphAsSemantic()

// count total number of N-Triples
countNTriples(Left(triples))
countNTriples(Right(partitionData))

println("----------------------")
println("Phase 2: SPARQL System")
println("----------------------")

val qs = new QuerySystem(
partitionData,
input,
output,
numOfFilesPartition = 1)
qs.run()
val result = new QuerySystem(partitionData, queries).run()
result.take(5).foreach(println)

spark.close()

}

/**
 * Recursively deletes `root` together with everything beneath it.
 *
 * Does nothing when `root` does not exist. Files are deleted as they are visited and
 * each directory is deleted once its entries have been visited.
 *
 * @param root file or directory to remove
 * @throws java.io.IOException if an entry cannot be deleted, or if iterating over a
 *                             directory's entries ended with an I/O error
 */
def removePathFiles(root: Path): Unit = {
  if (Files.exists(root)) {
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        Files.delete(file)
        FileVisitResult.CONTINUE
      }

      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
        // `exc` is non-null when the iteration of `dir` completed prematurely because of
        // an I/O error; rethrow it instead of hiding it behind a follow-up delete failure.
        Option(exc).foreach(e => throw e)
        Files.delete(dir)
        FileVisitResult.CONTINUE
      }
    })
  }
}

/**
 * Prints the number of distinct N-Triples held by the given data set.
 *
 * @param dataRDD `Left` is reported as the data before partitioning,
 *                `Right` as the data after partitioning
 */
def countNTriples(dataRDD: Either[RDD[Triple], RDD[String]]): Unit = {
  val report = dataRDD.fold(
    raw => s"Number of N-Triples before partition: ${raw.distinct.count}",
    partitioned => s"Number of N-Triples after partition: ${partitioned.distinct.count}")
  println(report)
}
case class Config(in: String = "", queries: String = "", out: String = "")
case class Config(in: String = "", queries: String = "")

val parser = new scopt.OptionParser[Config]("SANSA - Semantic Partioning example") {

Expand All @@ -104,11 +64,7 @@ object Semantic {

opt[String]('q', "queries").required().valueName("<directory>").
action((x, c) => c.copy(queries = x)).
text("the SPARQL query list")

opt[String]('o', "out").required().valueName("<directory>").
action((x, c) => c.copy(out = x)).
text("the output directory")
text("path to the file containing the SPARQL query")

help("help").text("prints this usage text")
}
Expand Down

0 comments on commit afe695d

Please sign in to comment.