package net.sansa_stack.examples.spark.inference
import org.apache.jena.graph.{ Node, NodeFactory }
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import net.sansa_stack.inference.rules.ReasoningProfile._
import net.sansa_stack.inference.rules.{ RDFSLevel, ReasoningProfile }
import net.sansa_stack.inference.spark.forwardchaining.triples.{ForwardRuleReasonerOWLHorst, ForwardRuleReasonerRDFS, TransitiveReasoner}
object RDFGraphInference {
def main(args: Array[String]) {
parser.parse(args, Config()) match {
case Some(config) =>
run(, config.out, config.profile,, config.writeToSingleFile, config.sortedOutput, config.parallelism)
case None =>
def run(input: Seq[URI], output: URI, profile: ReasoningProfile, properties: Seq[Node] = Seq(),
writeToSingleFile: Boolean, sortedOutput: Boolean, parallelism: Int): Unit = {
// the SPARK config
val spark = SparkSession.builder
.appName(s"SPARK $profile Reasoning")
.config("spark.hadoop.validateOutputSpecs", "false") // override output files
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", parallelism)
.config("spark.ui.showConsoleProgress", "false")
.config("spark.sql.shuffle.partitions", parallelism)
// load triples from disk
val graph = RDFGraphLoader.loadFromDisk(spark, input, parallelism)
println(s"|G| = ${graph.size()}")
// create reasoner
val reasoner = profile match {
case TRANSITIVE => new TransitiveReasoner(spark.sparkContext, properties, parallelism)
case RDFS => new ForwardRuleReasonerRDFS(spark.sparkContext, parallelism)
val r = new ForwardRuleReasonerRDFS(spark.sparkContext, parallelism)
r.level = RDFSLevel.SIMPLE
case OWL_HORST => new ForwardRuleReasonerOWLHorst(spark.sparkContext)
// compute inferred graph
val inferredGraph = reasoner.apply(graph)
println(s"|G_inf| = ${inferredGraph.size()}")
// write triples to disk
RDFGraphWriter.writeToDisk(inferredGraph, output.toString, writeToSingleFile, sortedOutput)
case class Config(
in: Seq[URI] = Seq(),
out: URI = new URI("."),
properties: Seq[Node] = Seq(),
profile: ReasoningProfile = ReasoningProfile.RDFS,
writeToSingleFile: Boolean = false,
sortedOutput: Boolean = false,
parallelism: Int = 4)
// read ReasoningProfile enum
implicit val profilesRead: scopt.Read[ReasoningProfile.Value] =
scopt.Read.reads(ReasoningProfile forName _.toLowerCase())
// read ReasoningProfile enum
implicit val nodeRead: scopt.Read[Node] =
// the CLI parser
val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
head("RDFGraphMaterializer", "0.1.0")
opt[Seq[URI]]('i', "input").required().valueName("<path1>,<path2>,...").
action((x, c) => c.copy(in = x)).
text("path to file or directory that contains the input files (in N-Triples format)")
opt[URI]('o', "out").required().valueName("<directory>").
action((x, c) => c.copy(out = x)).
text("the output directory")
action((x, c) => {
c.copy(properties = x)
text("list of properties for which the transitive closure will be computed (used only for profile 'transitive')")
opt[ReasoningProfile]('p', "profile").required().valueName("{rdfs | rdfs-simple | owl-horst | transitive}").
action((x, c) => c.copy(profile = x)).
text("the reasoning profile")
opt[Unit]("single-file").optional().action((_, c) =>
c.copy(writeToSingleFile = true)).text("write the output to a single file in the output directory")
opt[Unit]("sorted").optional().action((_, c) =>
c.copy(sortedOutput = true)).text("sorted output of the triples (per file)")
opt[Int]("parallelism").optional().action((x, c) =>
c.copy(parallelism = x)).text("the degree of parallelism, i.e. the number of Spark partitions used in the Spark operations")
help("help").text("prints this usage text")
checkConfig(c =>
if (c.profile == TRANSITIVE && failure("Option --properties must not be empty if profile 'transitive' is set")
else success)