Permalink
Browse files

TIKA-2662 add a streaming writer for the RecursiveParserWrapper

and integrate throughout
  • Loading branch information...
tballison committed Jun 7, 2018
1 parent 7a69da1 commit c844fc3928516f710a1e62ab99c4eb3f8ab27410
@@ -63,6 +63,7 @@
description="output directory for output"/> <!-- do we want to make this mandatory -->
<option opt="recursiveParserWrapper"
description="use the RecursiveParserWrapper or not (default = false)"/>
<option opt="streamOut" description="stream the output of the RecursiveParserWrapper (default = false)"/>
<option opt="handleExisting" hasArg="true"
description="if an output file already exists, do you want to: overwrite, rename or skip"/>
<option opt="basicHandlerType" hasArg="true"
@@ -20,25 +20,41 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.logging.Handler;
import org.apache.commons.io.FileUtils;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.serialization.JsonMetadataList;
import org.apache.tika.metadata.serialization.JsonStreamingSerializer;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
public class TikaCLIBatchIntegrationTest {
@@ -113,6 +129,27 @@ public void testJsonRecursiveBatchIntegration() throws Exception {
}
}
@Test
public void testStreamingJsonRecursiveBatchIntegration() throws Exception {
    //run the CLI in batch mode with the streaming RecursiveParserWrapper writer
    String[] args = new String[]{
            "-i", testInputDirForCommandLine,
            "-o", tempOutputDirForCommandLine,
            "-numConsumers", "10",
            "-J", //recursive Json
            "-t", //plain text in content
            "-streamOut"
    };
    TikaCLI.main(args);

    Path streamedJson = tempOutputDir.resolve("test_recursive_embedded.docx.json");
    try (Reader jsonReader = Files.newBufferedReader(streamedJson, UTF_8)) {
        List<Metadata> metadataList = JsonMetadataList.fromJson(jsonReader);
        assertEquals(12, metadataList.size());
        assertTrue(metadataList.get(6)
                .get(AbstractRecursiveParserWrapperHandler.TIKA_CONTENT)
                .contains("human events"));
        //the streaming serializer writes the container document last;
        //JsonMetadataList.fromJson() should have bumped it back to index 0
        assertNull(metadataList.get(0)
                .get(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH));
    }
}
@Test
public void testProcessLogFileConfig() throws Exception {
String[] params = {"-i", testInputDirForCommandLine,
@@ -171,5 +208,42 @@ private void assertFileExists(Path path) {
Files.isRegularFile(path));
}
/**
 * Smoke test: runs the RecursiveParserWrapper over the embedded-docx test
 * fixture and streams the results through a {@link JsonStreamingSerializer}.
 * <p>
 * Previously wrote to a hard-coded, machine-specific path
 * ({@code C:/data/tika_tmp.json}), which fails on any other machine;
 * now writes to a temp file that is deleted afterwards.
 */
@Test
public void oneOff() throws Exception {
    Parser p = new AutoDetectParser();
    RecursiveParserWrapper w = new RecursiveParserWrapper(p);
    Path tmp = Files.createTempFile("tika-stream-out-", ".json");
    try {
        try (JsonStreamingSerializer writer = new JsonStreamingSerializer(
                new BufferedWriter(new OutputStreamWriter(
                        Files.newOutputStream(tmp), StandardCharsets.UTF_8)))) {
            ContentHandler contentHandler = new WriteoutRPWHandler(
                    new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
                    writer);
            try (InputStream is = getClass().getResourceAsStream("/test-data/test_recursive_embedded.docx")) {
                w.parse(is, contentHandler, new Metadata(), new ParseContext());
            }
        }
    } finally {
        //best-effort cleanup of the scratch file
        Files.deleteIfExists(tmp);
    }
}
private class WriteoutRPWHandler extends AbstractRecursiveParserWrapperHandler {
private final JsonStreamingSerializer jsonWriter;
public WriteoutRPWHandler(ContentHandlerFactory contentHandlerFactory, JsonStreamingSerializer writer) {
super(contentHandlerFactory);
this.jsonWriter = writer;
}
@Override
public void endEmbeddedDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
metadata.add(RecursiveParserWrapperHandler.TIKA_CONTENT, contentHandler.toString());
try {
jsonWriter.add(metadata);
} catch (IOException e) {
throw new SAXException(e);
}
}
@Override
public void endDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
endEmbeddedDocument(contentHandler, metadata);
}
}
}
@@ -42,11 +42,8 @@
import org.xml.sax.helpers.DefaultHandler;
/**
 * This runs a RecursiveParserWrapper against an input file
 * and outputs the json metadata to an output file.
 */
public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
@@ -55,34 +52,21 @@
private final OutputStreamFactory fsOSFactory;
private String outputEncoding = "UTF-8";
/**
 * Builds a {@link RecursiveParserWrapper} internally around the parser
 * produced by {@code parserFactory}.
 *
 * @deprecated use {@link RecursiveParserWrapperFSConsumer#RecursiveParserWrapperFSConsumer(ArrayBlockingQueue, Parser, ContentHandlerFactory, OutputStreamFactory)}
 *
 * @param queue queue of file resources to consume
 * @param parserFactory factory that builds the parser to be wrapped
 * @param contentHandlerFactory factory for per-document content handlers
 * @param fsOSFactory factory for the output stream each result is written to
 * @param config Tika configuration passed to the parser factory
 */
public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
ParserFactory parserFactory,
ContentHandlerFactory contentHandlerFactory,
OutputStreamFactory fsOSFactory, TikaConfig config) {
super(queue);
this.contentHandlerFactory = contentHandlerFactory;
this.fsOSFactory = fsOSFactory;
Parser parserToWrap = parserFactory.getParser(config);
this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
}
/**
 * Preferred constructor: the caller supplies the (already wrapped) parser.
 * <p>
 * NOTE(review): the original span was a diff artifact containing both the
 * removed {@code Parser parserToWrap} parameter and the added
 * {@code Parser parser} parameter, plus both field assignments, which is
 * not valid Java; this resolves it to the intended post-commit form.
 *
 * @param queue queue of file resources to consume
 * @param parser must be a RecursiveParserWrapper or a ForkParser that
 *               wraps a RecursiveParserWrapper
 * @param contentHandlerFactory factory for per-document content handlers
 * @param fsOSFactory factory for the output stream each result is written to
 */
public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
                                        Parser parser,
                                        ContentHandlerFactory contentHandlerFactory,
                                        OutputStreamFactory fsOSFactory) {
    super(queue);
    this.contentHandlerFactory = contentHandlerFactory;
    this.fsOSFactory = fsOSFactory;
    this.parser = parser;
}
@Override
@@ -115,24 +99,10 @@ public boolean processFileResource(FileResource fileResource) {
try {
parse(fileResource.getResourceId(), parser, is, handler,
containerMetadata, context);
metadataList = handler.getMetadataList();
} catch (Throwable t) {
thrown = t;
metadataList = handler.getMetadataList();
if (metadataList == null) {
metadataList = new LinkedList<>();
}
Metadata m = null;
if (metadataList.size() == 0) {
m = containerMetadata;
} else {
//take the top metadata item
m = metadataList.remove(0);
}
String stackTrace = ExceptionUtils.getFilteredStackTrace(t);
m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
metadataList.add(0, m);
} finally {
metadataList = handler.getMetadataList();
IOUtils.closeQuietly(is);
}
@@ -152,6 +122,8 @@ public boolean processFileResource(FileResource fileResource) {
if (thrown != null) {
if (thrown instanceof Error) {
throw (Error) thrown;
} else if (thrown instanceof SecurityException) {
throw (SecurityException)thrown;
} else {
return false;
}
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.batch.fs;
import org.apache.commons.io.IOUtils;
import org.apache.tika.batch.FileResource;
import org.apache.tika.batch.OutputStreamFactory;
import org.apache.tika.batch.ParserFactory;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonStreamingSerializer;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.apache.tika.utils.ExceptionUtils;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * This uses the {@link JsonStreamingSerializer} to write out a
 * single metadata object at a time: each embedded document's metadata is
 * serialized as soon as that document finishes parsing, rather than
 * buffering the full metadata list in memory.
 */
public class StreamOutRPWFSConsumer extends AbstractFSConsumer {

    private final Parser parser;
    private final ContentHandlerFactory contentHandlerFactory;
    private final OutputStreamFactory fsOSFactory;
    //NOTE(review): exposed via get/setOutputEncoding, but the writer below
    //is hard-coded to UTF-8 -- confirm whether the setter should be honored
    private String outputEncoding = "UTF-8";

    /**
     * @param queue queue of file resources to process
     * @param parser must be a RecursiveParserWrapper or a ForkParser that
     *               wraps a RecursiveParserWrapper
     * @param contentHandlerFactory factory for per-document content handlers
     * @param fsOSFactory factory for the output stream the json is written to
     */
    public StreamOutRPWFSConsumer(ArrayBlockingQueue<FileResource> queue,
                                  Parser parser,
                                  ContentHandlerFactory contentHandlerFactory,
                                  OutputStreamFactory fsOSFactory) {
        super(queue);
        this.contentHandlerFactory = contentHandlerFactory;
        this.fsOSFactory = fsOSFactory;
        this.parser = parser;
    }

    /**
     * Parses a single file resource and streams the per-document metadata
     * to the json output file as the parse progresses.
     *
     * @param fileResource resource to parse
     * @return true if the resource was fully processed, false if it was
     *         skipped or the parse threw a non-fatal throwable
     */
    @Override
    public boolean processFileResource(FileResource fileResource) {

        ParseContext context = new ParseContext();

        //try to open outputstream first
        OutputStream os = getOutputStream(fsOSFactory, fileResource);

        if (os == null) {
            LOG.debug("Skipping: {}", fileResource.getMetadata().get(FSProperties.FS_REL_PATH));
            return false;
        }

        //try to open the inputstream before the parse.
        //if the parse hangs or throws a nasty exception, at least there will
        //be a zero byte file there so that the batchrunner can skip that problematic
        //file during the next run.
        InputStream is = getInputStream(fileResource);
        if (is == null) {
            IOUtils.closeQuietly(os);
            return false;
        }

        Metadata containerMetadata = fileResource.getMetadata();
        JsonStreamingSerializer writer = new JsonStreamingSerializer(
                new OutputStreamWriter(os, StandardCharsets.UTF_8));

        WriteoutRPWHandler handler = new WriteoutRPWHandler(contentHandlerFactory, writer);
        Throwable thrown = null;
        try {
            parse(fileResource.getResourceId(), parser, is, handler,
                    containerMetadata, context);
        } catch (Throwable t) {
            //hold the throwable until the writer/stream are closed,
            //then decide below whether to rethrow or report failure
            thrown = t;
        } finally {
            try {
                writer.close();
            } catch (IOException e) {
                //this is a stop the world kind of thing
                LOG.error("{}", getXMLifiedLogMsg(IO_OS + "json", fileResource.getResourceId(), e));
                throw new RuntimeException(e);
            } finally {
                IOUtils.closeQuietly(is);
            }
        }
        if (thrown != null) {
            if (thrown instanceof Error) {
                throw (Error) thrown;
            } else if (thrown instanceof SecurityException) {
                throw (SecurityException) thrown;
            } else {
                return false;
            }
        }
        return true;
    }

    public String getOutputEncoding() {
        return outputEncoding;
    }

    public void setOutputEncoding(String outputEncoding) {
        this.outputEncoding = outputEncoding;
    }

    //extend AbstractRPWH instead of RecursiveParserWrapperHandler so that
    //if we use the ForkParser, the output will not have to be streamed
    //back to the proxy, but can
    //be written straight to disk.
    //Declared static: it never references the enclosing consumer instance,
    //so a hidden outer reference would only add leak risk.
    private static class WriteoutRPWHandler extends AbstractRecursiveParserWrapperHandler {
        private final JsonStreamingSerializer jsonWriter;

        public WriteoutRPWHandler(ContentHandlerFactory contentHandlerFactory, JsonStreamingSerializer writer) {
            super(contentHandlerFactory);
            this.jsonWriter = writer;
        }

        /**
         * Adds the extracted content to the metadata object and writes the
         * metadata out through the json serializer.
         *
         * @throws SAXException wrapping any IOException from the writer
         */
        @Override
        public void endEmbeddedDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
            metadata.add(RecursiveParserWrapperHandler.TIKA_CONTENT, contentHandler.toString());
            try {
                jsonWriter.add(metadata);
            } catch (IOException e) {
                throw new SAXException(e);
            }
        }

        //the container document is written the same way as an embedded one
        @Override
        public void endDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
            endEmbeddedDocument(contentHandler, metadata);
        }
    }
}
Oops, something went wrong.

0 comments on commit c844fc3

Please sign in to comment.