From 13bdfa39a4ed34967bf137bc91f6ea63eb530fce Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Thu, 6 Sep 2018 12:58:34 +0100 Subject: [PATCH 1/9] Ref ANY23-396 - add unit test with expected solution --- .../apache/any23/cli/ExtractorsFlowTest.java | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java diff --git a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java new file mode 100644 index 000000000..7131bda90 --- /dev/null +++ b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.any23.cli; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FileUtils; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.model.impl.TreeModelFactory; +import org.eclipse.rdf4j.model.vocabulary.RDF; +import org.eclipse.rdf4j.model.vocabulary.XMLSchema; +import org.eclipse.rdf4j.rio.Rio; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileReader; +import java.util.Arrays; +import java.util.function.Function; + +/** + * This is example for task ANY23-396 + */ +public class ExtractorsFlowTest extends ToolTestBase { + + public ExtractorsFlowTest() { + super(Rover.class); + } + + private static final String testingDatafile = "/org/apache/any23/extractor/csv/test-comma.csv"; + private static final ValueFactory vf = SimpleValueFactory.getInstance(); + private Logger log = LoggerFactory.getLogger(getClass()); + + /* + Domain ontology & data model + */ + private static final String NAMESPACE = "http://supercustom.net/ontology/"; + private static final IRI PERSON = vf.createIRI(NAMESPACE, "Person"); + private static final IRI FULL_NAME = vf.createIRI(NAMESPACE, "fullName"); + private static final IRI HASH = vf.createIRI(NAMESPACE, "hash"); + + private static final String DATA_NAMESPACE = "http://rdf.supercustom.net/data/"; + + // domain ontology person IRI factory + private Function personIRIFactory = (String s) -> { + return vf.createIRI(DATA_NAMESPACE, DigestUtils.sha1Hex(s)); + }; + + + + /** + * Emulates action described in description of issue ANY23-396. 
+ * @throws Exception + */ + @Test + public void runTestFor396() throws Exception { + File outputFile = File.createTempFile("mockdata-", ".ttl", tempDirectory); + File logFile = File.createTempFile("log-exec-", ".txt", tempDirectory); + + runTool(String.format("-l %s -o %s -f turtle -e csv,people -d %s %s", + logFile.getAbsolutePath(), + outputFile.getAbsolutePath(), + "urn:dataser:raw/", + copyResourceToTempFile(testingDatafile).getAbsolutePath())); + + // create some statement of expected model + Model expected = new TreeModelFactory().createEmptyModel(); + String[] fullNames = new String[] {"Davide Palmisano", "Michele Mostarda", "Giovanni Tummarello"}; + + // populate expected model + Arrays.asList(fullNames).stream().forEach( fullN -> { + IRI person = personIRIFactory.apply(fullN); + expected.add(person, RDF.TYPE, PERSON); + expected.add(person, FULL_NAME, vf.createLiteral(fullN)); + expected.add(person, HASH, vf.createLiteral(DigestUtils.sha1Hex(fullN), XMLSchema.HEXBINARY)); + }); + + log.info("\n\nlog file content:\n{}", FileUtils.readFileToString(logFile, "utf-8")); + log.info("\n\nData file: \n{}", FileUtils.readFileToString(outputFile, "utf-8")); + + Assert.assertTrue(assertCompareModels(expected, outputFile)); + } + + + + /** + * Compare expected model and received from input File + * @param expected + * @param received + * @return + */ + public boolean assertCompareModels(Model expected, File received) throws Exception { + Model receivedModel = Rio.parse(new FileReader(received), + received.toURI().toString(), + Rio.getParserFormatForFileName(received.toPath().getFileName().toString()).get()); + + return receivedModel.containsAll(expected); + } + +} From 6d756b8b873deb781729860255be539db463ee33 Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Thu, 6 Sep 2018 13:18:56 +0100 Subject: [PATCH 2/9] Ref ANY23-396 - add --workflows command line argument to test --- cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java index 7131bda90..a4986f6f6 100644 --- a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java +++ b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java @@ -75,7 +75,7 @@ public void runTestFor396() throws Exception { File outputFile = File.createTempFile("mockdata-", ".ttl", tempDirectory); File logFile = File.createTempFile("log-exec-", ".txt", tempDirectory); - runTool(String.format("-l %s -o %s -f turtle -e csv,people -d %s %s", + runTool(String.format("-l %s --workflows -o %s -f turtle -e csv,people -d %s %s", logFile.getAbsolutePath(), outputFile.getAbsolutePath(), "urn:dataser:raw/", From ed195e5c9bf4f93067cefc82138800a6fe41eef7 Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Thu, 6 Sep 2018 13:42:33 +0100 Subject: [PATCH 3/9] Simplify models assertion function --- .../org/apache/any23/cli/ExtractorsFlowTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java index a4986f6f6..9e5efa1c9 100644 --- a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java +++ b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java @@ -16,6 +16,7 @@ */ package org.apache.any23.cli; +import org.apache.any23.rdf.RDFUtils; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; import org.eclipse.rdf4j.model.IRI; @@ -31,7 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileReader; import java.util.Arrays; import java.util.function.Function; @@ -108,9 +111,12 @@ public void runTestFor396() throws Exception { * @return */ public boolean assertCompareModels(Model 
expected, File received) throws Exception { - Model receivedModel = Rio.parse(new FileReader(received), - received.toURI().toString(), - Rio.getParserFormatForFileName(received.toPath().getFileName().toString()).get()); + Model receivedModel = new TreeModelFactory().createEmptyModel(); + receivedModel.addAll(Arrays.asList(RDFUtils.parseRDF( + Rio.getParserFormatForFileName(received.getName()).get(), + new BufferedInputStream(new FileInputStream(received)), + received.toURI().toString() + ))); return receivedModel.containsAll(expected); } From 51fd987bd5a4f5081e57d0d5e6908ef37d5d8603 Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Thu, 6 Sep 2018 14:55:34 +0100 Subject: [PATCH 4/9] Add myself to authors list --- pom.xml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pom.xml b/pom.xml index 2aa62785f..769df39f7 100644 --- a/pom.xml +++ b/pom.xml @@ -189,6 +189,15 @@ <role>Mentor</role> </roles> </developer> + <developer> + <id>jgrzebyta</id> + <name>Jacek Grzebyta</name> + <email>jgrzebyta[at]apache[dot]org</email> + <roles> + <role>Committer</role> + <role>PMC Member</role> + </roles> + </developer> From 4b09a900f248ba31ced50295da5d857614a69ef4 Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Thu, 6 Sep 2018 17:44:40 +0100 Subject: [PATCH 5/9] Ref ANY23-396 Add lower level classes & update Rover.class - add BufferedTripleHandler - bind global model within ThreadLocal - add --workflows option to rover --- .../main/java/org/apache/any23/cli/Rover.java | 14 +-- .../apache/any23/cli/ExtractorsFlowTest.java | 5 +- .../any23/writer/BufferedTripleHandler.java | 93 +++++++++++++++++++ 3 files changed, 104 insertions(+), 8 deletions(-) create mode 100644 core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java diff --git a/cli/src/main/java/org/apache/any23/cli/Rover.java b/cli/src/main/java/org/apache/any23/cli/Rover.java index 5b49b393c..d638325a8 100644 --- a/cli/src/main/java/org/apache/any23/cli/Rover.java +++ b/cli/src/main/java/org/apache/any23/cli/Rover.java @@ -30,12 +30,7 @@ import org.apache.any23.filter.IgnoreAccidentalRDFa; import 
org.apache.any23.filter.IgnoreTitlesOfEmptyDocuments; import org.apache.any23.source.DocumentSource; -import org.apache.any23.writer.BenchmarkTripleHandler; -import org.apache.any23.writer.LoggingTripleHandler; -import org.apache.any23.writer.ReportingTripleHandler; -import org.apache.any23.writer.TripleHandler; -import org.apache.any23.writer.TripleHandlerException; -import org.apache.any23.writer.WriterFactoryRegistry; +import org.apache.any23.writer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +100,9 @@ public class Rover extends BaseTool { @Parameter(names = { "-d", "--defaultns" }, description = "Override the default namespace used to produce statements.") private String defaultns; + @Parameter(names = "--workflows", description = "Run extractors in workflow.") + private boolean workflows = false; + // non parameters private TripleHandler tripleHandler; @@ -139,6 +137,10 @@ protected void configure() { ); } + if (workflows) { + tripleHandler = new BufferedTripleHandler(tripleHandler); + } + if (logFile != null) { try { tripleHandler = new LoggingTripleHandler(tripleHandler, new PrintWriter(logFile)); diff --git a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java index 9e5efa1c9..467e062e8 100644 --- a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java +++ b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java @@ -35,12 +35,13 @@ import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileReader; import java.util.Arrays; import java.util.function.Function; /** * This is example for task ANY23-396 + * + * @author Jacek Grzebyta (jgrzebyta@apache.org) */ public class ExtractorsFlowTest extends ToolTestBase { @@ -105,7 +106,7 @@ public void runTestFor396() throws Exception { /** - * Compare expected model and received from input File + * Compare expected model and received from input File. 
* @param expected * @param received * @return diff --git a/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java b/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java new file mode 100644 index 000000000..f1beeb484 --- /dev/null +++ b/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java @@ -0,0 +1,93 @@ +package org.apache.any23.writer; + +import com.google.common.base.Throwables; +import org.apache.any23.extractor.ExtractionContext; +import org.eclipse.rdf4j.model.*; +import org.eclipse.rdf4j.model.impl.SimpleNamespace; +import org.eclipse.rdf4j.model.impl.TreeModelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Collects all statements until end document. + * + * All statements are kept within {@link Model}. + * + * @author Jacek Grzebyta (jgrzebyta@apache.org) + */ +public class BufferedTripleHandler implements TripleHandler { + + private final Logger log = LoggerFactory.getLogger(BufferedTripleHandler.class); + private Model buffer; + + private final TripleHandler underlying; + private ExtractionContext extractionContext; + + private static final ThreadLocal globalModel = new ThreadLocal<>(); + + public static Model getModel() { + return BufferedTripleHandler.globalModel.get(); + } + + public BufferedTripleHandler(TripleHandler underlying, Model buffer) { + this.buffer = buffer; + this.underlying = underlying; + + // hide model in the thread + globalModel.set(buffer); + } + + public BufferedTripleHandler(TripleHandler underlying) { + this(underlying, new TreeModelFactory().createEmptyModel()); + } + + @Override + public void startDocument(IRI documentIRI) throws TripleHandlerException { + underlying.startDocument(documentIRI); + } + + @Override + public void openContext(ExtractionContext context) throws TripleHandlerException { + this.extractionContext = context; + underlying.openContext(context); + } + + @Override + public void receiveTriple(Resource s, IRI p, Value o, IRI g, 
ExtractionContext context) throws TripleHandlerException { + buffer.add(s, p, o, g); + } + + @Override + public void receiveNamespace(String prefix, String uri, ExtractionContext context) throws TripleHandlerException { + buffer.setNamespace(new SimpleNamespace(prefix, uri)); + underlying.receiveNamespace(prefix, uri, context); + } + + @Override + public void closeContext(ExtractionContext context) throws TripleHandlerException { + underlying.closeContext(context); + } + + @Override + public void endDocument(IRI documentIRI) throws TripleHandlerException { + // final populate underlying rdf handler. + buffer.stream().forEach(st->{ + try { + underlying.receiveTriple(st.getSubject(), st.getPredicate(), st.getObject(), (IRI) st.getContext(), extractionContext); + } catch (TripleHandlerException e) { + Throwables.propagateIfPossible(e, RuntimeException.class); + } + }); + underlying.endDocument(documentIRI); + } + + @Override + public void setContentLength(long contentLength) { + underlying.setContentLength(contentLength); + } + + @Override + public void close() throws TripleHandlerException { + underlying.close(); + } +} From 3a7d72a1bd8c467698500d24967d94672c67de35 Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Fri, 7 Sep 2018 16:20:11 +0100 Subject: [PATCH 6/9] Fix ANY23-396 Add ModelExtractor interface - add Model aware interface --- .../main/java/org/apache/any23/extractor/Extractor.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/api/src/main/java/org/apache/any23/extractor/Extractor.java b/api/src/main/java/org/apache/any23/extractor/Extractor.java index 2d34084e0..42c50d2a8 100644 --- a/api/src/main/java/org/apache/any23/extractor/Extractor.java +++ b/api/src/main/java/org/apache/any23/extractor/Extractor.java @@ -18,6 +18,7 @@ package org.apache.any23.extractor; import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; import org.w3c.dom.Document; import java.io.IOException; @@ -60,6 +61,14 @@ public interface 
ContentExtractor extends Extractor { public interface TagSoupDOMExtractor extends Extractor { } + + /** + * This interface specializes an {@link Extractor} able to handle + * {@link org.eclipse.rdf4j.model.Model} as input format + */ + public interface ModelExtractor extends Extractor { + } + /** * Executes the extractor. Will be invoked only once, extractors are * not reusable. From cca21079e3c451baee0e86bbfd97dc91d6873e17 Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Fri, 7 Sep 2018 16:24:46 +0100 Subject: [PATCH 7/9] Fix ANY23-396 Implement extractors workflow - add workflow flag and adding the value to extraction parameters - add support for new extractor type in SingleDocumentExtraction - implement Model wrapping BufferedTripleHandler --- .../any23/extractor/ExtractionParameters.java | 2 + .../default-configuration.properties | 3 + .../main/java/org/apache/any23/cli/Rover.java | 8 +- .../extractor/SingleDocumentExtraction.java | 21 ++++- .../any23/writer/BufferedTripleHandler.java | 81 +++++++++++++------ 5 files changed, 83 insertions(+), 32 deletions(-) diff --git a/api/src/main/java/org/apache/any23/extractor/ExtractionParameters.java b/api/src/main/java/org/apache/any23/extractor/ExtractionParameters.java index 96a6218f9..5c9a59272 100644 --- a/api/src/main/java/org/apache/any23/extractor/ExtractionParameters.java +++ b/api/src/main/java/org/apache/any23/extractor/ExtractionParameters.java @@ -46,6 +46,8 @@ public class ExtractionParameters { public static final String EXTRACTION_CONTEXT_IRI_PROPERTY = "any23.extraction.context.iri"; + public static final String EXTRACTION_WORKFLOWS_FLAG = "any23.extraction.workflows"; + /** * Constructor. 
* diff --git a/api/src/main/resources/default-configuration.properties b/api/src/main/resources/default-configuration.properties index 4f68586d3..1c93e242f 100644 --- a/api/src/main/resources/default-configuration.properties +++ b/api/src/main/resources/default-configuration.properties @@ -76,3 +76,6 @@ any23.extraction.csv.comment=# # A confidence threshold for the OpenIE extractions # Any extractions below this value will not be processed. any23.extraction.openie.confidence.threshold=0.5 + +# Enables (on) or disables (off) the workflow feature +any23.extraction.workflows=off \ No newline at end of file diff --git a/cli/src/main/java/org/apache/any23/cli/Rover.java b/cli/src/main/java/org/apache/any23/cli/Rover.java index d638325a8..bc7eccc5e 100644 --- a/cli/src/main/java/org/apache/any23/cli/Rover.java +++ b/cli/src/main/java/org/apache/any23/cli/Rover.java @@ -100,8 +100,8 @@ public class Rover extends BaseTool { @Parameter(names = { "-d", "--defaultns" }, description = "Override the default namespace used to produce statements.") private String defaultns; - @Parameter(names = "--workflows", description = "Run extractors in workflow.") - private boolean workflows = false; + @Parameter(names = "--workflow", description = "Run extractors in workflow.") + private boolean workflow = false; // non parameters @@ -137,7 +137,7 @@ protected void configure() { ); } - if (workflows) { + if (workflow) { tripleHandler = new BufferedTripleHandler(tripleHandler); } @@ -174,6 +174,8 @@ protected void configure() { defaultns); } + extractionParameters.setFlag(ExtractionParameters.EXTRACTION_WORKFLOWS_FLAG, workflow); + any23 = (extractors.isEmpty()) ? 
new Any23() : new Any23(extractors.toArray(new String[extractors.size()])); any23.setHTTPUserAgent(Any23.DEFAULT_HTTP_CLIENT_USER_AGENT + "/" + Any23.VERSION); diff --git a/core/src/main/java/org/apache/any23/extractor/SingleDocumentExtraction.java b/core/src/main/java/org/apache/any23/extractor/SingleDocumentExtraction.java index 77ed28cfb..d8cbbd45d 100644 --- a/core/src/main/java/org/apache/any23/extractor/SingleDocumentExtraction.java +++ b/core/src/main/java/org/apache/any23/extractor/SingleDocumentExtraction.java @@ -35,15 +35,14 @@ import org.apache.any23.validator.EmptyValidationReport; import org.apache.any23.validator.ValidatorException; import org.apache.any23.vocab.SINDICE; -import org.apache.any23.writer.CompositeTripleHandler; -import org.apache.any23.writer.CountingTripleHandler; -import org.apache.any23.writer.TripleHandler; -import org.apache.any23.writer.TripleHandlerException; +import org.apache.any23.writer.*; import org.apache.any23.extractor.Extractor.BlindExtractor; import org.apache.any23.extractor.Extractor.ContentExtractor; import org.apache.any23.extractor.Extractor.TagSoupDOMExtractor; +import org.apache.any23.extractor.Extractor.ModelExtractor; import org.eclipse.rdf4j.model.BNode; import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.model.impl.SimpleValueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,6 +294,12 @@ public SingleDocumentExtractionReport run(ExtractionParameters extractionParamet } finally { try { output.endDocument(documentIRI); + + // in case of workflow flag release data from model + if (extractionParameters.getFlag(ExtractionParameters.EXTRACTION_WORKFLOWS_FLAG)) { + BufferedTripleHandler.releaseModel(); + } + } catch (TripleHandlerException e) { log.error(String.format("Error ending document with IRI %s", documentIRI)); throw new ExtractionException(String.format("Error ending document with IRI %s", documentIRI), @@ -483,6 +488,14 @@ private 
SingleExtractionReport runExtractor( documentReport.getDocument(), extractionResult ); + } else if (extractor instanceof ModelExtractor) { + final ModelExtractor modelExtractor = (ModelExtractor) extractor; + final Model singleModel = BufferedTripleHandler.getModel(); + modelExtractor.run( + extractionParameters, + extractionContext, + singleModel, + extractionResult); } else { throw new IllegalStateException("Extractor type not supported: " + extractor.getClass()); } diff --git a/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java b/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java index f1beeb484..94f750616 100644 --- a/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java +++ b/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java @@ -2,7 +2,10 @@ import com.google.common.base.Throwables; import org.apache.any23.extractor.ExtractionContext; -import org.eclipse.rdf4j.model.*; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Value; import org.eclipse.rdf4j.model.impl.SimpleNamespace; import org.eclipse.rdf4j.model.impl.TreeModelFactory; import org.slf4j.Logger; @@ -17,16 +20,21 @@ */ public class BufferedTripleHandler implements TripleHandler { - private final Logger log = LoggerFactory.getLogger(BufferedTripleHandler.class); + private static final Logger log = LoggerFactory.getLogger(BufferedTripleHandler.class); private Model buffer; - - private final TripleHandler underlying; - private ExtractionContext extractionContext; - - private static final ThreadLocal globalModel = new ThreadLocal<>(); - - public static Model getModel() { - return BufferedTripleHandler.globalModel.get(); + private TripleHandler underlying; + private static boolean isDocumentFinish = false; + + private static class WorkflowContext { + WorkflowContext(TripleHandler underlying, Model model) { + this.model = model; + 
this.rootHandler = underlying; + } + + Model model; + ExtractionContext context = null; + IRI documentIRI = null; + TripleHandler rootHandler ; } public BufferedTripleHandler(TripleHandler underlying, Model buffer) { @@ -34,22 +42,28 @@ public BufferedTripleHandler(TripleHandler underlying, Model buffer) { this.underlying = underlying; // hide model in the thread - globalModel.set(buffer); + WorkflowContext wc = new WorkflowContext(underlying, buffer); + BufferedTripleHandler.workflowContext.set(wc); } public BufferedTripleHandler(TripleHandler underlying) { this(underlying, new TreeModelFactory().createEmptyModel()); } + private static final ThreadLocal workflowContext = new ThreadLocal<>(); + + public static Model getModel() { + return BufferedTripleHandler.workflowContext.get().model; + } + @Override public void startDocument(IRI documentIRI) throws TripleHandlerException { - underlying.startDocument(documentIRI); + BufferedTripleHandler.workflowContext.get().documentIRI = documentIRI; } @Override public void openContext(ExtractionContext context) throws TripleHandlerException { - this.extractionContext = context; - underlying.openContext(context); + BufferedTripleHandler.workflowContext.get().context = context; } @Override @@ -60,25 +74,16 @@ public void receiveTriple(Resource s, IRI p, Value o, IRI g, ExtractionContext c @Override public void receiveNamespace(String prefix, String uri, ExtractionContext context) throws TripleHandlerException { buffer.setNamespace(new SimpleNamespace(prefix, uri)); - underlying.receiveNamespace(prefix, uri, context); } @Override public void closeContext(ExtractionContext context) throws TripleHandlerException { - underlying.closeContext(context); + // } @Override public void endDocument(IRI documentIRI) throws TripleHandlerException { - // final populate underlying rdf handler. 
- buffer.stream().forEach(st->{ - try { - underlying.receiveTriple(st.getSubject(), st.getPredicate(), st.getObject(), (IRI) st.getContext(), extractionContext); - } catch (TripleHandlerException e) { - Throwables.propagateIfPossible(e, RuntimeException.class); - } - }); - underlying.endDocument(documentIRI); + BufferedTripleHandler.isDocumentFinish = true; } @Override @@ -90,4 +95,30 @@ public void setContentLength(long contentLength) { public void close() throws TripleHandlerException { underlying.close(); } + + /** + * Releases content of the model into underlying writer. + */ + public static void releaseModel() throws TripleHandlerException { + if(!BufferedTripleHandler.isDocumentFinish) { + throw new RuntimeException("Before releasing document should be finished."); + } + + WorkflowContext workflowContext = BufferedTripleHandler.workflowContext.get(); + Model buffer = workflowContext.model; + TripleHandler underlying = workflowContext.rootHandler; + + + // final populate underlying rdf handler. 
+ underlying.startDocument(workflowContext.documentIRI); + buffer.stream().forEach(st -> { + try { + underlying.receiveTriple(st.getSubject(), st.getPredicate(), st.getObject(), (IRI) st.getContext(), workflowContext.context); + } catch (TripleHandlerException e) { + Throwables.propagateIfPossible(e, RuntimeException.class); + } + }); + underlying.endDocument(workflowContext.documentIRI); + underlying.close(); + } } From f2cfb24a748527c3f361000fcafbd8a0df85335e Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Fri, 7 Sep 2018 16:26:17 +0100 Subject: [PATCH 8/9] ANY23-396 Full implementation of the proof of concept - add custom extractor & factory --- .../apache/any23/cli/ExtractorsFlowTest.java | 14 +-- .../any23/cli/flows/PeopleExtractor.java | 109 ++++++++++++++++++ .../cli/flows/PeopleExtractorFactory.java | 47 ++++++++ .../apache/any23/cli/flows/package-info.java | 24 ++++ ...rg.apache.any23.extractor.ExtractorFactory | 1 + 5 files changed, 188 insertions(+), 7 deletions(-) create mode 100644 cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java create mode 100644 cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractorFactory.java create mode 100644 cli/src/test/java/org/apache/any23/cli/flows/package-info.java create mode 100644 cli/src/test/resources/META-INF/services/org.apache.any23.extractor.ExtractorFactory diff --git a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java index 467e062e8..a017b2069 100644 --- a/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java +++ b/cli/src/test/java/org/apache/any23/cli/ExtractorsFlowTest.java @@ -56,15 +56,15 @@ public ExtractorsFlowTest() { /* Domain ontology & data model */ - private static final String NAMESPACE = "http://supercustom.net/ontology/"; - private static final IRI PERSON = vf.createIRI(NAMESPACE, "Person"); - private static final IRI FULL_NAME = vf.createIRI(NAMESPACE, "fullName"); - private static 
final IRI HASH = vf.createIRI(NAMESPACE, "hash"); + public static final String NAMESPACE = "http://supercustom.net/ontology/"; + public static final IRI PERSON = vf.createIRI(NAMESPACE, "Person"); + public static final IRI FULL_NAME = vf.createIRI(NAMESPACE, "fullName"); + public static final IRI HASH = vf.createIRI(NAMESPACE, "hash"); - private static final String DATA_NAMESPACE = "http://rdf.supercustom.net/data/"; + public static final String DATA_NAMESPACE = "http://rdf.supercustom.net/data/"; // domain ontology person IRI factory - private Function personIRIFactory = (String s) -> { + public static Function personIRIFactory = (String s) -> { return vf.createIRI(DATA_NAMESPACE, DigestUtils.sha1Hex(s)); }; @@ -79,7 +79,7 @@ public void runTestFor396() throws Exception { File outputFile = File.createTempFile("mockdata-", ".ttl", tempDirectory); File logFile = File.createTempFile("log-exec-", ".txt", tempDirectory); - runTool(String.format("-l %s --workflows -o %s -f turtle -e csv,people -d %s %s", + runTool(String.format("-l %s --workflow -o %s -f turtle -e csv,people -d %s %s", logFile.getAbsolutePath(), outputFile.getAbsolutePath(), "urn:dataser:raw/", diff --git a/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java b/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java new file mode 100644 index 000000000..be93ef3ef --- /dev/null +++ b/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.any23.cli.flows; + + +import org.apache.any23.cli.ExtractorsFlowTest; +import org.apache.any23.extractor.*; +import org.apache.any23.vocab.CSV; +import org.apache.commons.codec.digest.DigestUtils; +import org.eclipse.rdf4j.model.*; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.model.util.Models; +import org.eclipse.rdf4j.model.vocabulary.RDF; +import org.eclipse.rdf4j.model.vocabulary.XMLSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Proof of concept for ANY23-396 example. 
+ */ +public class PeopleExtractor implements Extractor.ModelExtractor { + + private Logger log = LoggerFactory.getLogger(PeopleExtractor.class); + + private static final String RAW_NS = "urn:dataser:raw/"; + private CSV csv = CSV.getInstance(); + private ValueFactory vf = SimpleValueFactory.getInstance(); + + @Override + public void run(ExtractionParameters extractionParameters, ExtractionContext context, Model in, ExtractionResult out) throws IOException, ExtractionException { + if (in.isEmpty()) { + throw new ExtractionException("model is empty "); + } + + + //for reach row + Set subjects = in.filter(null, RDF.TYPE, csv.rowType) + .stream() + .map( s -> {return s.getSubject(); }) // get subjects from each triple + .collect(Collectors.toSet()); + + log.debug("List of rows: {}", subjects); + + subjects.stream() + .forEach( rowId -> { + String firstName = ""; + Optional firstNameO = Models.objectLiteral(in.filter(rowId, vf.createIRI(RAW_NS, "FirstName"), null)); + if (firstNameO.isPresent()) { + firstName = firstNameO.get().stringValue(); + } + + String lastName = ""; + Optional lastNameO = Models.objectLiteral(in.filter(rowId, vf.createIRI(RAW_NS, "LastName"), null)); + if (lastNameO.isPresent()) { + lastName = lastNameO.get().stringValue(); + } + + String fullName = firstName + " " + lastName; + + IRI personID = ExtractorsFlowTest.personIRIFactory.apply(fullName); + in.add(personID, RDF.TYPE, ExtractorsFlowTest.PERSON); + in.add(personID, ExtractorsFlowTest.FULL_NAME, vf.createLiteral(fullName)); + in.add(personID, ExtractorsFlowTest.HASH, vf.createLiteral(DigestUtils.sha1Hex(fullName), XMLSchema.HEXBINARY)); + + // clean model + in.remove(rowId, null, null); + }); + + // remove description + Set collumns = in.filter(null, csv.columnPosition, null).stream().map(s -> { + return s.getSubject(); + }).collect(Collectors.toSet()); + + collumns.stream() + .forEach(s -> { + in.remove(s, null, null); + }); + + // remove metadata + Resource datasetId = in.filter(null, 
csv.numberOfColumns, null).iterator().next().getSubject(); + in.remove(datasetId, null, null); + + log.info("Display model: \n\n{}", in); + } + + @Override + public ExtractorDescription getDescription() { + return PeopleExtractorFactory.getDescriptionInstance(); + } +} diff --git a/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractorFactory.java b/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractorFactory.java new file mode 100644 index 000000000..a6835df61 --- /dev/null +++ b/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractorFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.any23.cli.flows; + +import org.apache.any23.extractor.ExtractorDescription; +import org.apache.any23.extractor.ExtractorFactory; +import org.apache.any23.extractor.SimpleExtractorFactory; +import org.apache.any23.rdf.Prefixes; + +import java.util.Arrays; + +public class PeopleExtractorFactory extends SimpleExtractorFactory + implements ExtractorFactory { + + public static final String NAME = "people"; + + public static final Prefixes prefixes = null; + + private static final ExtractorDescription descriptionInstance = new PeopleExtractorFactory(); + + public PeopleExtractorFactory() { + super(NAME, prefixes, Arrays.asList("*/*"), null); + } + + @Override + public PeopleExtractor createExtractor() { + return new PeopleExtractor(); + } + + public static ExtractorDescription getDescriptionInstance() { + return descriptionInstance; + } +} diff --git a/cli/src/test/java/org/apache/any23/cli/flows/package-info.java b/cli/src/test/java/org/apache/any23/cli/flows/package-info.java new file mode 100644 index 000000000..37133a7c6 --- /dev/null +++ b/cli/src/test/java/org/apache/any23/cli/flows/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +/** + * This package contains the proof of concept for running extractors in flow. + * + * @see ANY23-396 + */ +package org.apache.any23.cli.flows; diff --git a/cli/src/test/resources/META-INF/services/org.apache.any23.extractor.ExtractorFactory b/cli/src/test/resources/META-INF/services/org.apache.any23.extractor.ExtractorFactory new file mode 100644 index 000000000..899f1b2b3 --- /dev/null +++ b/cli/src/test/resources/META-INF/services/org.apache.any23.extractor.ExtractorFactory @@ -0,0 +1 @@ +org.apache.any23.cli.flows.PeopleExtractorFactory \ No newline at end of file From 7a14f025be61b2e4f9760cda524353c876ad8458 Mon Sep 17 00:00:00 2001 From: Jacek Grzebyta Date: Mon, 10 Sep 2018 12:04:35 +0100 Subject: [PATCH 9/9] ANY23-396 Return last extractor dataset - separate all extractors outcomes --- .../main/java/org/apache/any23/cli/Rover.java | 8 +- .../any23/cli/flows/PeopleExtractor.java | 24 +----- .../any23/writer/BufferedTripleHandler.java | 79 ++++++++++++++----- 3 files changed, 65 insertions(+), 46 deletions(-) diff --git a/cli/src/main/java/org/apache/any23/cli/Rover.java b/cli/src/main/java/org/apache/any23/cli/Rover.java index bc7eccc5e..b4a56d0f2 100644 --- a/cli/src/main/java/org/apache/any23/cli/Rover.java +++ b/cli/src/main/java/org/apache/any23/cli/Rover.java @@ -137,10 +137,6 @@ protected void configure() { ); } - if (workflow) { - tripleHandler = new BufferedTripleHandler(tripleHandler); - } - if (logFile != null) { try { tripleHandler = new LoggingTripleHandler(tripleHandler, new PrintWriter(logFile)); @@ -160,6 +156,10 @@ protected void configure() { ); } + if (workflow) { + tripleHandler = new BufferedTripleHandler(tripleHandler); + } + reportingTripleHandler = new ReportingTripleHandler(tripleHandler); final Configuration configuration = DefaultConfiguration.singleton(); diff --git a/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java b/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java index 
be93ef3ef..eedbc1deb 100644 --- a/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java +++ b/cli/src/test/java/org/apache/any23/cli/flows/PeopleExtractor.java @@ -77,29 +77,11 @@ public void run(ExtractionParameters extractionParameters, ExtractionContext con String fullName = firstName + " " + lastName; IRI personID = ExtractorsFlowTest.personIRIFactory.apply(fullName); - in.add(personID, RDF.TYPE, ExtractorsFlowTest.PERSON); - in.add(personID, ExtractorsFlowTest.FULL_NAME, vf.createLiteral(fullName)); - in.add(personID, ExtractorsFlowTest.HASH, vf.createLiteral(DigestUtils.sha1Hex(fullName), XMLSchema.HEXBINARY)); - - // clean model - in.remove(rowId, null, null); + out.writeTriple(personID, RDF.TYPE, ExtractorsFlowTest.PERSON); + out.writeTriple(personID, ExtractorsFlowTest.FULL_NAME, vf.createLiteral(fullName)); + out.writeTriple(personID, ExtractorsFlowTest.HASH, vf.createLiteral(DigestUtils.sha1Hex(fullName), XMLSchema.HEXBINARY)); }); - // remove description - Set collumns = in.filter(null, csv.columnPosition, null).stream().map(s -> { - return s.getSubject(); - }).collect(Collectors.toSet()); - - collumns.stream() - .forEach(s -> { - in.remove(s, null, null); - }); - - // remove metadata - Resource datasetId = in.filter(null, csv.numberOfColumns, null).iterator().next().getSubject(); - in.remove(datasetId, null, null); - - log.info("Display model: \n\n{}", in); } @Override diff --git a/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java b/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java index 94f750616..b9165e205 100644 --- a/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java +++ b/core/src/main/java/org/apache/any23/writer/BufferedTripleHandler.java @@ -6,11 +6,15 @@ import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.model.Resource; import org.eclipse.rdf4j.model.Value; -import org.eclipse.rdf4j.model.impl.SimpleNamespace; +import 
org.eclipse.rdf4j.model.impl.LinkedHashModelFactory; import org.eclipse.rdf4j.model.impl.TreeModelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.Stack; +import java.util.TreeMap; + /** * Collects all statements until end document. * @@ -21,39 +25,51 @@ public class BufferedTripleHandler implements TripleHandler { private static final Logger log = LoggerFactory.getLogger(BufferedTripleHandler.class); - private Model buffer; private TripleHandler underlying; private static boolean isDocumentFinish = false; + private static class ContextHandler { + ContextHandler(ExtractionContext ctx, Model m) { + extractionContext = ctx; + extractionModel = m; + } + ExtractionContext extractionContext; + Model extractionModel; + } + private static class WorkflowContext { - WorkflowContext(TripleHandler underlying, Model model) { - this.model = model; + WorkflowContext(TripleHandler underlying) { this.rootHandler = underlying; } - Model model; - ExtractionContext context = null; + + Stack extractors = new Stack<>(); + Map modelMap = new TreeMap<>(); IRI documentIRI = null; TripleHandler rootHandler ; } - public BufferedTripleHandler(TripleHandler underlying, Model buffer) { - this.buffer = buffer; + public BufferedTripleHandler(TripleHandler underlying) { this.underlying = underlying; // hide model in the thread - WorkflowContext wc = new WorkflowContext(underlying, buffer); + WorkflowContext wc = new WorkflowContext(underlying); BufferedTripleHandler.workflowContext.set(wc); } - public BufferedTripleHandler(TripleHandler underlying) { - this(underlying, new TreeModelFactory().createEmptyModel()); - } - private static final ThreadLocal workflowContext = new ThreadLocal<>(); + /** + * Returns model which contains all other models. 
+ * @return + */ public static Model getModel() { - return BufferedTripleHandler.workflowContext.get().model; + return BufferedTripleHandler.workflowContext.get().modelMap.values().stream() + .map(ch -> ch.extractionModel) + .reduce(new LinkedHashModelFactory().createEmptyModel(), (mf, exm) -> { + mf.addAll(exm); + return mf; + }); } @Override @@ -63,17 +79,17 @@ public void startDocument(IRI documentIRI) throws TripleHandlerException { @Override public void openContext(ExtractionContext context) throws TripleHandlerException { - BufferedTripleHandler.workflowContext.get().context = context; + // } @Override public void receiveTriple(Resource s, IRI p, Value o, IRI g, ExtractionContext context) throws TripleHandlerException { - buffer.add(s, p, o, g); + getModelForContext(context).add(s,p,o,g); } @Override public void receiveNamespace(String prefix, String uri, ExtractionContext context) throws TripleHandlerException { - buffer.setNamespace(new SimpleNamespace(prefix, uri)); + getModelForContext(context).setNamespace(prefix, uri); } @Override @@ -105,20 +121,41 @@ public static void releaseModel() throws TripleHandlerException { } WorkflowContext workflowContext = BufferedTripleHandler.workflowContext.get(); - Model buffer = workflowContext.model; - TripleHandler underlying = workflowContext.rootHandler; + String lastExtractor = ((Stack) workflowContext.extractors).peek(); + + Map models = workflowContext.modelMap; + TripleHandler underlying = workflowContext.rootHandler; // final populate underlying rdf handler. 
underlying.startDocument(workflowContext.documentIRI); - buffer.stream().forEach(st -> { + + ExtractionContext outContext = models.get(lastExtractor).extractionContext; + Model outModel = models.get(lastExtractor).extractionModel; + + outModel.stream().forEach( st -> { try { - underlying.receiveTriple(st.getSubject(), st.getPredicate(), st.getObject(), (IRI) st.getContext(), workflowContext.context); + underlying.receiveTriple(st.getSubject(), st.getPredicate(), st.getObject(), (IRI) st.getContext(), outContext); } catch (TripleHandlerException e) { Throwables.propagateIfPossible(e, RuntimeException.class); } }); + underlying.endDocument(workflowContext.documentIRI); underlying.close(); } + + private static Model getModelForContext(ExtractionContext ctx) { + Map modelMap = BufferedTripleHandler.workflowContext.get().modelMap; + Stack extractors = BufferedTripleHandler.workflowContext.get().extractors; + + if (modelMap.containsKey(ctx.getUniqueID())) { + return modelMap.get(ctx.getUniqueID()).extractionModel; + } else { + Model empty = new TreeModelFactory().createEmptyModel(); + modelMap.put(ctx.getUniqueID(), new ContextHandler(ctx, empty)); + extractors.push(ctx.getUniqueID()); + return empty; + } + } }