Merge pull request #14 from SANSA-Stack/feature/flink-jena
Feature/flink jena
GezimSejdiu committed Jun 28, 2019
2 parents (0f9741f + 50260fe) · commit 8de2064
Showing 32 changed files with 1,024 additions and 523 deletions.
JenaTripleToNTripleString.scala
@@ -1,33 +1,21 @@
package net.sansa_stack.inference.utils

import org.apache.jena.graph.Triple
import org.apache.jena.shared.PrefixMapping
import org.apache.jena.sparql.util.FmtUtils


/**
* Convert a Jena Triple to an N-Triples string.
*
* @note it turns out that it might be more efficient to use the Jena stream-based writer API per partition.
*
* @author Lorenz Buehmann
*/
class JenaTripleToNTripleString
extends Function[Triple, String]
with java.io.Serializable {
override def apply(t: Triple): String = {
val subStr =
if (t.getSubject.isBlank) {
s"_:${t.getSubject.getBlankNodeLabel}"
} else {
s"<${t.getSubject.getURI}>"
}

val objStr =
if (t.getObject.isLiteral) {
t.getObject
} else if (t.getObject.isBlank) {
s"_:${t.getObject}"
} else {
s"<${t.getObject}>"
}
s"$subStr <${t.getPredicate}> $objStr ."
}
override def apply(t: Triple): String = s"${FmtUtils.stringForTriple(t, null.asInstanceOf[PrefixMapping])} ."
}
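As an aside, here is a minimal usage sketch of the new FmtUtils-based formatter; it is illustrative only (not part of the commit) and the URIs are made up:

import org.apache.jena.graph.{NodeFactory, Triple}
import net.sansa_stack.inference.utils.JenaTripleToNTripleString

// build a sample triple and serialize it as one N-Triples line
val t = Triple.create(
  NodeFactory.createURI("http://example.org/s"),
  NodeFactory.createURI("http://example.org/p"),
  NodeFactory.createURI("http://example.org/o"))
val toNT = new JenaTripleToNTripleString()
// prints: <http://example.org/s> <http://example.org/p> <http://example.org/o> .
println(toNT(t))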

NTriplesStringToRDFTriple.scala
@@ -2,7 +2,6 @@ package net.sansa_stack.inference.utils

import java.io.ByteArrayInputStream

import org.apache.jena.graph.Triple
import org.apache.jena.riot.{Lang, RDFDataMgr}

import net.sansa_stack.inference.data.RDFTriple
@@ -13,7 +12,7 @@ import net.sansa_stack.inference.data.RDFTriple
* @author Lorenz Buehmann
*/
class NTriplesStringToRDFTriple
extends Function1[String, Option[RDFTriple]]
extends ((String) => Option[RDFTriple])
with java.io.Serializable {
override def apply(s: String): Option[RDFTriple] = {
val t = RDFDataMgr.createIteratorTriples(new ByteArrayInputStream(s.getBytes), Lang.NTRIPLES, null).next()
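A quick usage sketch (assumed, not from the commit) of how the parser maps a single N-Triples line to an Option[RDFTriple]:

import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple

val parse = new NTriplesStringToRDFTriple()
// the line below is a made-up example triple in N-Triples syntax
val triple = parse("<http://example.org/s> <http://example.org/p> <http://example.org/o> .")
println(triple) // Some(RDFTriple(...)) if the line parses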
PredicateUtils.scala (new file)
@@ -0,0 +1,40 @@
package net.sansa_stack.inference.utils

/**
* Some utils for logical combinations of boolean functions.
*/
object PredicateUtils {

implicit class RichPredicate[A](f: A => Boolean) extends (A => Boolean) {
def apply(v: A): Boolean = f(v)

/**
* Logical 'and'.
*
* @param g the predicate to combine with this one
* @return a predicate that holds iff both predicates hold
*/
def &&(g: A => Boolean): A => Boolean = { x: A =>
f(x) && g(x)
}

/**
* Logical 'or'.
*
* @param g the predicate to combine with this one
* @return a predicate that holds iff at least one of the two predicates holds
*/
def ||(g: A => Boolean): A => Boolean = { x: A =>
f(x) || g(x)
}

/**
* Logical 'not'.
*
* @return a predicate that holds iff this predicate does not hold
*/
def unary_! : A => Boolean = { x: A =>
!f(x)
}
}
}
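A usage sketch (illustrative, not part of the commit) showing how the implicit conversion lets plain A => Boolean functions be composed:

import net.sansa_stack.inference.utils.PredicateUtils._

val isPositive: Int => Boolean = _ > 0
val isEven: Int => Boolean = _ % 2 == 0

// RichPredicate supplies &&, || and unary_! via the implicit conversion
val positiveAndEven = isPositive && isEven
val positiveOrOdd = isPositive || (!isEven)

println(positiveAndEven(4)) // true
println(positiveOrOdd(-3))  // true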
33 changes: 20 additions & 13 deletions sansa-inference-flink/pom.xml
@@ -45,19 +45,26 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Apache Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
</dependency>
<!-- RDF Layer -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<groupId>net.sansa-stack</groupId>
<artifactId>sansa-rdf-flink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Apache Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
<!-- </dependency>-->

<!-- Apache JENA 3.x-->
<dependency>
@@ -517,12 +524,12 @@ under the License.
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>org.apache.httpcomponents:httpclient</exclude>
<exclude>org.apache.httpcomponents:httpcore</exclude>
<!-- <exclude>org.apache.httpcomponents:httpclient</exclude>-->
<!-- <exclude>org.apache.httpcomponents:httpcore</exclude>-->
<exclude>commons-codec:commons-codec</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
<!--<exclude>com.fasterxml.jackson.core:jackson-core</exclude>-->
<!--<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>-->
<!--<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>-->
<exclude>org.codehaus.jettison:jettison</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
RDFGraphMaterializer.scala
@@ -21,7 +21,8 @@ import net.sansa_stack.inference.rules.ReasoningProfile._
import net.sansa_stack.inference.rules.{RDFSLevel, ReasoningProfile}

/**
* The class to compute the RDFS materialization of a given RDF graph.
* A class to compute the materialization of a given RDF graph for a given reasoning profile.
* It serves as the main entry point for inference.
*
* @author Lorenz Buehmann
*
@@ -66,6 +67,7 @@ object RDFGraphMaterializer {

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// and disable logging to standard out
env.getConfig.disableSysoutLogging()
// env.setParallelism(4)

@@ -74,7 +76,6 @@

// load triples from disk
val graph = RDFGraphLoader.loadFromDisk(input, env)
// println(s"|G| = ${graph.size()}")

// create reasoner
val reasoner = profile match {
@@ -90,17 +91,22 @@

// compute inferred graph
val inferredGraph = reasoner.apply(graph)
println(s"|G_inf| = ${inferredGraph.size()}")
// println(s"|G_inf| = ${inferredGraph.size}")

// println(env.getExecutionPlan())

// write triples to disk
// RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)
RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)

// println(env.getExecutionPlan())
// println(env.getExecutionPlan())

val jn = if (jobName.isEmpty) s"${profile} Reasoning" else jobName
val jn = if (jobName.isEmpty) s"$profile Reasoning" else jobName

// run the program
env.execute(jn)



}

// the config object
Expand All @@ -119,7 +125,7 @@ object RDFGraphMaterializer {

// the CLI parser
val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
head("RDFGraphMaterializer", "0.1.0")
head("RDFGraphMaterializer", "0.6.0")

// opt[Seq[File]]('i', "input").required().valueName("<path1>,<path2>,...").
// action((x, c) => c.copy(in = x)).
@@ -128,7 +134,7 @@
.required()
.valueName("<path>")
.action((x, c) => c.copy(in = x))
.text("path to file or directory that contains the input files (in N-Triple format)")
.text("path to file or directory that contains the input files (in N-Triples format)")

opt[URI]('o', "out")
.required()
RDFGraph.scala
@@ -1,9 +1,8 @@
package net.sansa_stack.inference.flink.data

import net.sansa_stack.inference.flink.utils.DataSetUtils
import org.apache.flink.api.scala.{DataSet, _}
import org.apache.jena.graph.Triple
import net.sansa_stack.inference.data.RDFTriple
import org.apache.jena.graph.{Node, Triple}

import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps

/**
@@ -12,7 +11,7 @@ import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps
* @author Lorenz Buehmann
*
*/
case class RDFGraph(triples: DataSet[RDFTriple]) {
case class RDFGraph(triples: DataSet[Triple]) {

/**
* Returns a DataSet of triples that match with the given input.
@@ -22,11 +21,11 @@ case class RDFGraph(triples: DataSet[RDFTriple]) {
* @param o the object
* @return DataSet of triples
*/
def find(s: Option[String] = None, p: Option[String] = None, o: Option[String] = None): DataSet[RDFTriple] = {
def find(s: Option[Node] = None, p: Option[Node] = None, o: Option[Node] = None): DataSet[Triple] = {
triples.filter(t =>
(s == None || t.s == s.get) &&
(p == None || t.p == p.get) &&
(o == None || t.o == o.get)
(s.isEmpty || t.subjectMatches(s.get)) &&
(p.isEmpty || t.predicateMatches(p.get)) &&
(o.isEmpty || t.objectMatches(o.get))
)
}

@@ -35,11 +34,11 @@
*
* @return DataSet of triples
*/
def find(triple: Triple): DataSet[RDFTriple] = {
def find(triple: Triple): DataSet[Triple] = {
find(
if (triple.getSubject.isVariable) None else Option(triple.getSubject.toString),
if (triple.getPredicate.isVariable) None else Option(triple.getPredicate.toString),
if (triple.getObject.isVariable) None else Option(triple.getObject.toString)
if (triple.getSubject.isVariable) None else Option(triple.getSubject),
if (triple.getPredicate.isVariable) None else Option(triple.getPredicate),
if (triple.getObject.isVariable) None else Option(triple.getObject)
)
}

@@ -68,7 +67,5 @@
*
* @return the number of triples
*/
def size(): Long = {
triples.count()
}
lazy val size: Long = triples.count()
}
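To illustrate the switch from String to Jena Node patterns, a hedged sketch (the graph value and URIs are hypothetical, not from the commit):

import org.apache.jena.graph.NodeFactory
import org.apache.jena.vocabulary.RDF

// assuming `graph` is an RDFGraph as defined above:
// all rdf:type triples, any subject and object
val typeTriples = graph.find(p = Some(RDF.`type`.asNode()))
// all triples about a specific resource
val alice = NodeFactory.createURI("http://example.org/Alice")
val aboutAlice = graph.find(s = Some(alice))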
RDFGraphLoader.scala
@@ -1,15 +1,12 @@
package net.sansa_stack.inference.flink.data

import java.io.File
import java.net.URI

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import net.sansa_stack.inference.data.RDFTriple
import org.apache.flink.configuration.Configuration
import scala.language.implicitConversions

import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
import net.sansa_stack.rdf.flink.io.ntriples.NTriplesReader
import org.apache.flink.api.scala.{ExecutionEnvironment, _}


/**
* @author Lorenz Buehmann
@@ -18,39 +15,31 @@ object RDFGraphLoader {

implicit def pathURIsConverter(uris: Seq[URI]): String = uris.map(p => p.toString).mkString(",")

def loadFromFile(path: String, env: ExecutionEnvironment): RDFGraph = {
val triples = env.readTextFile(path)
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple

RDFGraph(triples)
def loadFromDisk(path: String, env: ExecutionEnvironment): RDFGraph = {
loadFromDisk(URI.create(path), env)
}

def loadFromDisk(path: URI, env: ExecutionEnvironment): RDFGraph = {
// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
val triples = env.readTextFile(path.toString).withParameters(parameters)
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2)))
.name("triples") // tokens to triple

RDFGraph(triples)
loadFromDisk(Seq(path), env)
}

def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
RDFGraph(NTriplesReader.load(env, paths).name("triples"))
}

def main(args: Array[String]): Unit = {
if (args.length == 0) println("Usage: RDFGraphLoader <PATH_TO_FILE>")

val tmp: List[String] = paths.map(path => path.toString).toList
val path = args(0)

val converter = new NTriplesStringToRDFTriple()
// val env = ExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)

val triples = tmp.map(f => env.readTextFile(f).flatMap(line => converter.apply(line))).reduce(_ union _).name("triples")
val ds = RDFGraphLoader.loadFromDisk(path, env).triples

RDFGraph(triples)
println(s"size:${ds.count}")
println("sample data:\n" + ds.first(10).map { _.toString.replaceAll("[\\x00-\\x1f]", "???")}.collect().mkString("\n"))
}

}
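Finally, a hedged sketch (the paths are placeholders) of loading several N-Triples sources at once via the new Seq[URI] overload and reading the graph size through the new lazy val:

import java.net.URI
import org.apache.flink.api.scala.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
// any mix of files or directories containing N-Triples data
val graph = RDFGraphLoader.loadFromDisk(
  Seq(URI.create("hdfs:///data/part1.nt"), URI.create("file:///tmp/part2.nt")), env)
println(s"|G| = ${graph.size}")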