This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Feature/flink jena #14

Merged
Merged 28 commits on Jun 28, 2019
Changes from all commits
28 commits
44067c4
Flink reasoning with Jena datastructures.
LorenzBuehmann Jun 26, 2018
24d641b
Merge branch 'develop' into feature/flink-jena
LorenzBuehmann Jun 27, 2018
2df2c0c
Cont. Flink RDF
LorenzBuehmann Jun 27, 2018
7d231ec
Minor Pom changes
LorenzBuehmann Dec 9, 2016
272f0e7
Flink reasoning with Jena datastructures.
LorenzBuehmann Jun 26, 2018
69624cf
Cont. Flink RDF
LorenzBuehmann Jun 27, 2018
8719a31
Use RDF layer for loading/writing
LorenzBuehmann Jun 16, 2019
ec65954
Merge branch 'feature/flink-jena' of github.com:SANSA-Stack/SANSA-Inf…
LorenzBuehmann Jun 16, 2019
4d99e4f
Key type wrapper.
LorenzBuehmann Jun 17, 2019
fc07298
Flink needs either key type or key selector function for join()
LorenzBuehmann Jun 17, 2019
be1bf6f
Minor changes in main entry class, e.g. always write to disk and bump…
LorenzBuehmann Jun 17, 2019
3a34c4a
Merge branch 'develop' into feature/flink-jena
LorenzBuehmann Jun 18, 2019
095b5a0
boolean function utils.
LorenzBuehmann Jun 27, 2019
04effc9
POM cleanup
LorenzBuehmann Jun 27, 2019
3dd33b9
Merge remote-tracking branch 'origin/feature/0.6.0-SNAPSHOT' into fea…
LorenzBuehmann Jun 27, 2019
d5e6b6c
Utils simplified
LorenzBuehmann Jun 27, 2019
7cf311c
Extended test output if test fails.
LorenzBuehmann Jun 27, 2019
f66a9e0
Simplified conversion from RDD[Triple] to Jena Model
LorenzBuehmann Jun 27, 2019
79e2e3a
minor changes in I/O
LorenzBuehmann Jun 27, 2019
9da7479
subtraction operation for Flink DataSet with Jena Triple
LorenzBuehmann Jun 27, 2019
2e84e5b
Improved Flink conformance test setup
LorenzBuehmann Jun 27, 2019
89394b0
log output of test base class
LorenzBuehmann Jun 27, 2019
af1ff94
Flink conformance test clean up
LorenzBuehmann Jun 28, 2019
8a4d6d3
Flink reasoning on Triple DataSet
LorenzBuehmann Jun 28, 2019
634099a
Added missing deps for dist package
LorenzBuehmann Jun 28, 2019
2ddb2f9
Minor
LorenzBuehmann Jun 28, 2019
b5e03b2
Scalastyle cleanup
LorenzBuehmann Jun 28, 2019
50260fe
Merge branch 'develop' into feature/flink-jena
GezimSejdiu Jun 28, 2019
@@ -1,33 +1,21 @@
package net.sansa_stack.inference.utils

import org.apache.jena.graph.Triple
import org.apache.jena.shared.PrefixMapping
import org.apache.jena.sparql.util.FmtUtils


/**
* Convert a Jena Triple to an N-Triples string.
*
* @note it turns out, that it might be more efficient to use the Jena stream based writer API per partition.
*
* @author Lorenz Buehmann
*/
class JenaTripleToNTripleString
extends Function[Triple, String]
with java.io.Serializable {
override def apply(t: Triple): String = {
val subStr =
if (t.getSubject.isBlank) {
s"_:${t.getSubject.getBlankNodeLabel}"
} else {
s"<${t.getSubject.getURI}>"
}

val objStr =
if (t.getObject.isLiteral) {
t.getObject
} else if (t.getObject.isBlank) {
s"_:${t.getObject}"
} else {
s"<${t.getObject}>"
}
s"$subStr <${t.getPredicate}> $objStr ."
}
override def apply(t: Triple): String = s"${FmtUtils.stringForTriple(t, null.asInstanceOf[PrefixMapping])} ."
}
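
A quick sanity check of the new FmtUtils-based one-liner, as a minimal sketch (the triple values below are made up for illustration; Jena 3.x on the classpath is assumed):

import org.apache.jena.graph.{NodeFactory, Triple}

import net.sansa_stack.inference.utils.JenaTripleToNTripleString

object JenaTripleToNTripleStringExample {
  def main(args: Array[String]): Unit = {
    // a hypothetical triple, only for illustration
    val t = Triple.create(
      NodeFactory.createURI("http://example.org/s"),
      NodeFactory.createURI("http://example.org/p"),
      NodeFactory.createLiteral("42"))

    val toNT = new JenaTripleToNTripleString()
    // prints roughly: <http://example.org/s> <http://example.org/p> "42" .
    println(toNT(t))
  }
}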

@@ -2,7 +2,6 @@ package net.sansa_stack.inference.utils

import java.io.ByteArrayInputStream

import org.apache.jena.graph.Triple
import org.apache.jena.riot.{Lang, RDFDataMgr}

import net.sansa_stack.inference.data.RDFTriple
@@ -13,7 +12,7 @@ import net.sansa_stack.inference.data.RDFTriple
* @author Lorenz Buehmann
*/
class NTriplesStringToRDFTriple
extends Function1[String, Option[RDFTriple]]
extends ((String) => Option[RDFTriple])
with java.io.Serializable {
override def apply(s: String): Option[RDFTriple] = {
val t = RDFDataMgr.createIteratorTriples(new ByteArrayInputStream(s.getBytes), Lang.NTRIPLES, null).next()
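
A minimal usage sketch of the converter in its new function-literal form (the N-Triples line below is made up for illustration):

import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple

object NTriplesStringToRDFTripleExample {
  def main(args: Array[String]): Unit = {
    val parse = new NTriplesStringToRDFTriple()
    // a single, hypothetical N-Triples line
    val line = "<http://example.org/s> <http://example.org/p> <http://example.org/o> ."
    // yields an Option[RDFTriple] parsed from the line
    println(parse(line))
  }
}
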
@@ -0,0 +1,40 @@
package net.sansa_stack.inference.utils

/**
* Some utils for logical combinations of boolean functions.
*/
object PredicateUtils {

implicit class RichPredicate[A](f: A => Boolean) extends (A => Boolean) {
def apply(v: A): Boolean = f(v)

/**
* Logical 'and'.
*
* @param g
* @return
*/
def &&(g: A => Boolean): A => Boolean = { x: A =>
f(x) && g(x)
}

/**
* Logical 'or'.
*
* @param g
* @return
*/
def ||(g: A => Boolean): A => Boolean = { x: A =>
f(x) || g(x)
}

/**
* Logical 'not'
*
* @return
*/
def unary_! : A => Boolean = { x: A =>
!f(x)
}
}
}
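
A short sketch of how the enriched operators compose plain Scala predicates (the example predicates are made up):

import net.sansa_stack.inference.utils.PredicateUtils._

object PredicateUtilsExample {
  def main(args: Array[String]): Unit = {
    val even: Int => Boolean = _ % 2 == 0
    val positive: Int => Boolean = _ > 0

    // combine predicates via the implicit RichPredicate operators
    val evenAndPositive = even && positive
    val evenOrPositive = even || positive
    val odd = !even

    println(evenAndPositive(4))   // true
    println(evenOrPositive(-3))   // false
    println(odd(3))               // true
  }
}
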
33 changes: 20 additions & 13 deletions sansa-inference-flink/pom.xml
@@ -45,19 +45,26 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Apache Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
</dependency>
<!-- RDF Layer -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<groupId>net.sansa-stack</groupId>
<artifactId>sansa-rdf-flink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Apache Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
<!-- </dependency>-->

<!-- Apache JENA 3.x-->
<dependency>
@@ -517,12 +524,12 @@ under the License.
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>org.apache.httpcomponents:httpclient</exclude>
<exclude>org.apache.httpcomponents:httpcore</exclude>
<!-- <exclude>org.apache.httpcomponents:httpclient</exclude>-->
<!-- <exclude>org.apache.httpcomponents:httpcore</exclude>-->
<exclude>commons-codec:commons-codec</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
<!--<exclude>com.fasterxml.jackson.core:jackson-core</exclude>-->
<!--<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>-->
<!--<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>-->
<exclude>org.codehaus.jettison:jettison</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
@@ -21,7 +21,8 @@ import net.sansa_stack.inference.rules.ReasoningProfile._
import net.sansa_stack.inference.rules.{RDFSLevel, ReasoningProfile}

/**
* The class to compute the RDFS materialization of a given RDF graph.
* A class to compute the materialization of a given RDF graph for a given reasoning profile.
* Basically, used as the main class for inference.
*
* @author Lorenz Buehmann
*
@@ -66,6 +67,7 @@ object RDFGraphMaterializer {

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// and disable logging to standard out
env.getConfig.disableSysoutLogging()
// env.setParallelism(4)

@@ -74,7 +76,6 @@

// load triples from disk
val graph = RDFGraphLoader.loadFromDisk(input, env)
// println(s"|G| = ${graph.size()}")

// create reasoner
val reasoner = profile match {
@@ -90,17 +91,22 @@

// compute inferred graph
val inferredGraph = reasoner.apply(graph)
println(s"|G_inf| = ${inferredGraph.size()}")
// println(s"|G_inf| = ${inferredGraph.size}")

// println(env.getExecutionPlan())

// write triples to disk
// RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)
RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)

// println(env.getExecutionPlan())
// println(env.getExecutionPlan())

val jn = if (jobName.isEmpty) s"${profile} Reasoning" else jobName
val jn = if (jobName.isEmpty) s"$profile Reasoning" else jobName

// run the program
env.execute(jn)



}

// the config object
@@ -119,7 +125,7 @@

// the CLI parser
val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
head("RDFGraphMaterializer", "0.1.0")
head("RDFGraphMaterializer", "0.6.0")

// opt[Seq[File]]('i', "input").required().valueName("<path1>,<path2>,...").
// action((x, c) => c.copy(in = x)).
@@ -128,7 +134,7 @@
.required()
.valueName("<path>")
.action((x, c) => c.copy(in = x))
.text("path to file or directory that contains the input files (in N-Triple format)")
.text("path to file or directory that contains the input files (in N-Triples format)")

opt[URI]('o', "out")
.required()
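
Condensed, the materializer flow above boils down to the following sketch (the input path is a placeholder, and the reasoner construction plus the RDFGraphWriter call are elided here):

import org.apache.flink.api.scala.ExecutionEnvironment

import net.sansa_stack.inference.flink.data.RDFGraphLoader

object MaterializerFlowSketch {
  def main(args: Array[String]): Unit = {
    // set up the execution environment and disable logging to standard out
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    // load triples from disk via the RDF layer (placeholder path)
    val graph = RDFGraphLoader.loadFromDisk("/path/to/input.nt", env)

    // reasoner.apply(graph) and RDFGraphWriter.writeToDisk(...) would follow here

    println(s"|G| = ${graph.size}")
  }
}
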
@@ -1,9 +1,8 @@
package net.sansa_stack.inference.flink.data

import net.sansa_stack.inference.flink.utils.DataSetUtils
import org.apache.flink.api.scala.{DataSet, _}
import org.apache.jena.graph.Triple
import net.sansa_stack.inference.data.RDFTriple
import org.apache.jena.graph.{Node, Triple}

import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps

/**
@@ -12,7 +11,7 @@ import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps
* @author Lorenz Buehmann
*
*/
case class RDFGraph(triples: DataSet[RDFTriple]) {
case class RDFGraph(triples: DataSet[Triple]) {

/**
* Returns a DataSet of triples that match with the given input.
@@ -22,11 +21,11 @@ case class RDFGraph(triples: DataSet[RDFTriple]) {
* @param o the object
* @return DataSet of triples
*/
def find(s: Option[String] = None, p: Option[String] = None, o: Option[String] = None): DataSet[RDFTriple] = {
def find(s: Option[Node] = None, p: Option[Node] = None, o: Option[Node] = None): DataSet[Triple] = {
triples.filter(t =>
(s == None || t.s == s.get) &&
(p == None || t.p == p.get) &&
(o == None || t.o == o.get)
(s.isEmpty || t.subjectMatches(s.get)) &&
(p.isEmpty || t.predicateMatches(p.get)) &&
(o.isEmpty || t.objectMatches(o.get))
)
}

@@ -35,11 +34,11 @@
*
* @return DataSet of triples
*/
def find(triple: Triple): DataSet[RDFTriple] = {
def find(triple: Triple): DataSet[Triple] = {
find(
if (triple.getSubject.isVariable) None else Option(triple.getSubject.toString),
if (triple.getPredicate.isVariable) None else Option(triple.getPredicate.toString),
if (triple.getObject.isVariable) None else Option(triple.getObject.toString)
if (triple.getSubject.isVariable) None else Option(triple.getSubject),
if (triple.getPredicate.isVariable) None else Option(triple.getPredicate),
if (triple.getObject.isVariable) None else Option(triple.getObject)
)
}

@@ -68,7 +67,5 @@
*
* @return the number of triples
*/
def size(): Long = {
triples.count()
}
lazy val size: Long = triples.count()
}
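
A small sketch of the pattern-matching API after the switch from RDFTriple to Jena's Node/Triple (the URIs and the input path are placeholders):

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.jena.graph.{NodeFactory, Triple}

import net.sansa_stack.inference.flink.data.RDFGraphLoader

object RDFGraphFindSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val graph = RDFGraphLoader.loadFromDisk("/path/to/input.nt", env)

    val rdfType = NodeFactory.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")

    // all triples whose predicate is rdf:type
    val typed = graph.find(None, Some(rdfType), None)

    // the same query expressed as a triple pattern with variables
    val pattern = Triple.create(
      NodeFactory.createVariable("s"), rdfType, NodeFactory.createVariable("o"))
    val typed2 = graph.find(pattern)

    println(typed.count())
    println(typed2.count())
  }
}
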
@@ -1,15 +1,12 @@
package net.sansa_stack.inference.flink.data

import java.io.File
import java.net.URI

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import net.sansa_stack.inference.data.RDFTriple
import org.apache.flink.configuration.Configuration
import scala.language.implicitConversions

import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
import net.sansa_stack.rdf.flink.io.ntriples.NTriplesReader
import org.apache.flink.api.scala.{ExecutionEnvironment, _}


/**
* @author Lorenz Buehmann
@@ -18,39 +15,31 @@ object RDFGraphLoader {

implicit def pathURIsConverter(uris: Seq[URI]): String = uris.map(p => p.toString).mkString(",")

def loadFromFile(path: String, env: ExecutionEnvironment): RDFGraph = {
val triples = env.readTextFile(path)
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple

RDFGraph(triples)
def loadFromDisk(path: String, env: ExecutionEnvironment): RDFGraph = {
loadFromDisk(URI.create(path), env)
}

def loadFromDisk(path: URI, env: ExecutionEnvironment): RDFGraph = {
// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
val triples = env.readTextFile(path.toString).withParameters(parameters)
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2)))
.name("triples") // tokens to triple

RDFGraph(triples)
loadFromDisk(Seq(path), env)
}

def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
RDFGraph(NTriplesReader.load(env, paths).name("triples"))
}

def main(args: Array[String]): Unit = {
if (args.length == 0) println("Usage: RDFGraphLoader <PATH_TO_FILE>")

val tmp: List[String] = paths.map(path => path.toString).toList
val path = args(0)

val converter = new NTriplesStringToRDFTriple()
// val env = ExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)

val triples = tmp.map(f => env.readTextFile(f).flatMap(line => converter.apply(line))).reduce(_ union _).name("triples")
val ds = RDFGraphLoader.loadFromDisk(path, env).triples

RDFGraph(triples)
println(s"size:${ds.count}")
println("sample data:\n" + ds.first(10).map { _.toString.replaceAll("[\\x00-\\x1f]", "???")}.collect().mkString("\n"))
}

}
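
For completeness, a sketch of loading several input files at once via the new Seq[URI] overload (the paths are placeholders):

import java.net.URI

import org.apache.flink.api.scala.ExecutionEnvironment

import net.sansa_stack.inference.flink.data.RDFGraphLoader

object MultiPathLoadSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)

    // two hypothetical N-Triples files
    val paths = Seq(URI.create("/data/part-1.nt"), URI.create("/data/part-2.nt"))

    val graph = RDFGraphLoader.loadFromDisk(paths, env)
    println(s"size: ${graph.size}")
  }
}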