From b914a8f5c4c214240b6fbea6a2d4e69d193825ae Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Mon, 11 Sep 2017 16:11:10 +0100 Subject: [PATCH 01/17] Fixing the doc typo in TikaOptions --- .../src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java index fb97678a9187..1934778d8ae2 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java @@ -60,7 +60,7 @@ public interface TikaOptions extends PipelineOptions { Long getQueueMaxPollTime(); void setQueueMaxPollTime(Long value); - @Description("Minumin text fragment length for Tika Parser to report") + @Description("Minimum text fragment length for Tika Parser to report") @Default.Integer(0) Integer getMinimumTextLength(); void setMinimumTextLength(Integer value); From eb2b7c4b038c3e7a2b2e9648defe862311e6b0bb Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Mon, 11 Sep 2017 16:29:09 +0100 Subject: [PATCH 02/17] Fixing the doc typos in TikaSource --- .../src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 4876dcfdb75f..490dcca80715 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -173,7 +173,7 @@ public Read withInputMetadata(Metadata metadata) { } /** - * Returns a new transform which will report the metadata. + * Returns a new transform which will report the file metadata. */ public Read withReadOutputMetadata(Boolean value) { return toBuilder().setReadOutputMetadata(value).build(); @@ -209,7 +209,7 @@ public Read withParseSynchronously(Boolean value) { } /** - * Path to Tika configuration resource. + * Returns a new transform which will use TikaOptions. */ public Read withOptions(TikaOptions options) { checkNotNull(options, "TikaOptions cannot be empty."); From 30fabd807b4e0717fc1c0a9f7a8694cbec076716 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Thu, 28 Sep 2017 15:38:11 +0100 Subject: [PATCH 03/17] Preparing for a move from Bounded Source and Reader --- .../apache/beam/sdk/io/tika/ParseResult.java | 120 +++++++++++ .../org/apache/beam/sdk/io/tika/TikaIO.java | 105 ++-------- .../apache/beam/sdk/io/tika/TikaOptions.java | 25 --- .../apache/beam/sdk/io/tika/TikaSource.java | 191 ++++-------------- .../apache/beam/sdk/io/tika/TikaIOTest.java | 170 +++++----------- .../beam/sdk/io/tika/TikaReaderTest.java | 82 -------- .../beam/sdk/io/tika/TikaSourceTest.java | 73 ------- 7 files changed, 216 insertions(+), 550 deletions(-) create mode 100644 sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java delete mode 100644 sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java delete mode 100644 sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java new file mode 100644 index 000000000000..da9fcc31303c --- /dev/null +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.tika; + +import java.io.Serializable; + +import org.apache.tika.metadata.Metadata; + +/** + * Tika parse result containing the file location, metadata + * and content converted to String. + */ +public class ParseResult implements Serializable { + private static final long serialVersionUID = 6133510503781405912L; + private String content; + private Metadata metadata; + private String fileLocation; + + public ParseResult() { + } + + public ParseResult(String fileLocation, String content) { + this(fileLocation, content, new Metadata()); + } + + public ParseResult(String fileLocation, String content, Metadata metadata) { + this.fileLocation = fileLocation; + this.content = content; + this.metadata = metadata; + } + + /** + * Gets a file content. + */ + public String getContent() { + return content; + } + + /** + * Sets a file content. + */ + public void setContent(String content) { + this.content = content; + } + + /** + * Gets a file metadata. + */ + public Metadata getMetadata() { + return metadata; + } + + /** + * Sets a file metadata. + */ + public void setMetadata(Metadata metadata) { + this.metadata = metadata; + } + + /** + * Gets a file location. + */ + public String getFileLocation() { + return fileLocation; + } + + /** + * Sets a file location. + */ + public void setFileLocation(String fileLocation) { + this.fileLocation = fileLocation; + } + + @Override + public int hashCode() { + return fileLocation.hashCode() + 37 * content.hashCode() + 37 * metadata.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ParseResult)) { + return false; + } + + ParseResult pr = (ParseResult) obj; + return this.fileLocation.equals(pr.fileLocation) + && this.content.equals(pr.content) + && isMetadataEqual(this.metadata, pr.metadata); + } + + private static boolean isMetadataEqual(Metadata m1, Metadata m2) { + String[] names = m1.names(); + if (names.length != m2.names().length) { + return false; + } + for (String n : names) { + String v1 = m1.get(n); + String v2 = m2.get(n); + if (!v1.equals(v2)) { + return false; + } + } + return true; + } +} diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 490dcca80715..f359706f46d2 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -21,9 +21,10 @@ import com.google.auto.value.AutoValue; import javax.annotation.Nullable; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -34,6 +35,9 @@ import org.apache.tika.metadata.Metadata; + + + /** * {@link PTransform} for parsing arbitrary files using Apache Tika. * Files in many well known text, binary or scientific formats can be processed. @@ -65,26 +69,17 @@ public class TikaIO { */ public static Read read() { return new AutoValue_TikaIO_Read.Builder() - .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME) - .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME) .build(); } /** Implementation of {@link #read}. */ @AutoValue - public abstract static class Read extends PTransform> { + public abstract static class Read extends PTransform> { private static final long serialVersionUID = 2198301984784351829L; - public static final long DEFAULT_QUEUE_POLL_TIME = 50L; - public static final long DEFAULT_QUEUE_MAX_POLL_TIME = 3000L; @Nullable abstract ValueProvider getFilepattern(); @Nullable abstract ValueProvider getTikaConfigPath(); @Nullable abstract Metadata getInputMetadata(); - @Nullable abstract Boolean getReadOutputMetadata(); - @Nullable abstract Long getQueuePollTime(); - @Nullable abstract Long getQueueMaxPollTime(); - @Nullable abstract Integer getMinimumTextLength(); - @Nullable abstract Boolean getParseSynchronously(); abstract Builder toBuilder(); @@ -93,11 +88,6 @@ abstract static class Builder { abstract Builder setFilepattern(ValueProvider filepattern); abstract Builder setTikaConfigPath(ValueProvider tikaConfigPath); abstract Builder setInputMetadata(Metadata metadata); - abstract Builder setReadOutputMetadata(Boolean value); - abstract Builder setQueuePollTime(Long value); - abstract Builder setQueueMaxPollTime(Long value); - abstract Builder setMinimumTextLength(Integer value); - abstract Builder setParseSynchronously(Boolean value); abstract Read build(); } @@ -124,8 +114,6 @@ public Read from(ValueProvider filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder() .setFilepattern(filepattern) - .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME) - .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME) .build(); } @@ -172,53 +160,13 @@ public Read withInputMetadata(Metadata metadata) { return toBuilder().setInputMetadata(inputMetadata).build(); } - /** - * Returns a new transform which will report the file metadata. - */ - public Read withReadOutputMetadata(Boolean value) { - return toBuilder().setReadOutputMetadata(value).build(); - } - - /** - * Returns a new transform which will use the specified queue poll time. - */ - public Read withQueuePollTime(Long value) { - return toBuilder().setQueuePollTime(value).build(); - } - - /** - * Returns a new transform which will use the specified queue max poll time. - */ - public Read withQueueMaxPollTime(Long value) { - return toBuilder().setQueueMaxPollTime(value).build(); - } - - /** - * Returns a new transform which will operate on the text blocks with the - * given minimum text length. - */ - public Read withMinimumTextlength(Integer value) { - return toBuilder().setMinimumTextLength(value).build(); - } - - /** - * Returns a new transform which will use the synchronous reader. - */ - public Read withParseSynchronously(Boolean value) { - return toBuilder().setParseSynchronously(value).build(); - } - /** * Returns a new transform which will use TikaOptions. */ public Read withOptions(TikaOptions options) { checkNotNull(options, "TikaOptions cannot be empty."); Builder builder = toBuilder(); - builder.setFilepattern(StaticValueProvider.of(options.getInput())) - .setQueuePollTime(options.getQueuePollTime()) - .setQueueMaxPollTime(options.getQueueMaxPollTime()) - .setMinimumTextLength(options.getMinimumTextLength()) - .setParseSynchronously(options.getParseSynchronously()); + builder.setFilepattern(StaticValueProvider.of(options.getInput())); if (options.getContentTypeHint() != null) { Metadata metadata = this.getInputMetadata(); if (metadata == null) { @@ -230,17 +178,14 @@ public Read withOptions(TikaOptions options) { if (options.getTikaConfigPath() != null) { builder.setTikaConfigPath(StaticValueProvider.of(options.getTikaConfigPath())); } - if (Boolean.TRUE.equals(options.getReadOutputMetadata())) { - builder.setReadOutputMetadata(options.getReadOutputMetadata()); - } return builder.build(); } @Override - public PCollection expand(PBegin input) { + public PCollection expand(PBegin input) { checkNotNull(this.getFilepattern(), "Filepattern cannot be empty."); - final Bounded read = org.apache.beam.sdk.io.Read.from(new TikaSource(this)); - PCollection pcol = input.getPipeline().apply(read); + final Bounded read = org.apache.beam.sdk.io.Read.from(new TikaSource(this)); + PCollection pcol = input.getPipeline().apply(read); pcol.setCoder(getDefaultOutputCoder()); return pcol; } @@ -275,36 +220,12 @@ public void populateDisplayData(DisplayData.Builder builder) { .add(DisplayData.item("inputMetadata", sb.toString()) .withLabel("Input Metadata")); } - if (Boolean.TRUE.equals(getParseSynchronously())) { - builder - .add(DisplayData.item("parseMode", "synchronous") - .withLabel("Parse Mode")); - } else { - builder - .add(DisplayData.item("parseMode", "asynchronous") - .withLabel("Parse Mode")); - builder - .add(DisplayData.item("queuePollTime", getQueuePollTime().toString()) - .withLabel("Queue Poll Time")) - .add(DisplayData.item("queueMaxPollTime", getQueueMaxPollTime().toString()) - .withLabel("Queue Max Poll Time")); - } - Integer minTextLen = getMinimumTextLength(); - if (minTextLen != null && minTextLen > 0) { - builder - .add(DisplayData.item("minTextLen", getMinimumTextLength().toString()) - .withLabel("Minimum Text Length")); - } - if (Boolean.TRUE.equals(getReadOutputMetadata())) { - builder - .add(DisplayData.item("readOutputMetadata", "true") - .withLabel("Read Output Metadata")); - } } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - protected Coder getDefaultOutputCoder() { - return StringUtf8Coder.of(); + protected Coder getDefaultOutputCoder() { + return SerializableCoder.of((Class) ParseResult.class); } } } diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java index 1934778d8ae2..24349574054f 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java @@ -40,31 +40,6 @@ public interface TikaOptions extends PipelineOptions { String getContentTypeHint(); void setContentTypeHint(String value); - @Description("Metadata report status") - @Default.Boolean(false) - Boolean getReadOutputMetadata(); - void setReadOutputMetadata(Boolean value); - - @Description("Optional use of the synchronous reader") - @Default.Boolean(false) - Boolean getParseSynchronously(); - void setParseSynchronously(Boolean value); - - @Description("Tika Parser queue poll time in milliseconds") - @Default.Long(TikaIO.Read.DEFAULT_QUEUE_POLL_TIME) - Long getQueuePollTime(); - void setQueuePollTime(Long value); - - @Description("Tika Parser queue maximum poll time in milliseconds") - @Default.Long(TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME) - Long getQueueMaxPollTime(); - void setQueueMaxPollTime(Long value); - - @Description("Minimum text fragment length for Tika Parser to report") - @Default.Integer(0) - Integer getMinimumTextLength(); - void setMinimumTextLength(Integer value); - @Description("Pipeline name") @Default.String("TikaRead") String getPipelineName(); diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java index 7c8852beca77..9b221aa01131 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java @@ -28,22 +28,16 @@ import java.io.InputStream; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.Source; @@ -56,18 +50,19 @@ import org.apache.tika.parser.AutoDetectParser; import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; +import org.apache.tika.sax.ToTextContentHandler; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; -import org.xml.sax.helpers.DefaultHandler; + /** * Implementation detail of {@link TikaIO.Read}. * *

A {@link Source} which can represent the content of the files parsed by Apache Tika. */ -class TikaSource extends BoundedSource { +class TikaSource extends BoundedSource { private static final long serialVersionUID = -509574062910491122L; private static final Logger LOG = LoggerFactory.getLogger(TikaSource.class); @@ -95,7 +90,7 @@ public enum Mode { } @Override - public BoundedReader createReader(PipelineOptions options) throws IOException { + public BoundedReader createReader(PipelineOptions options) throws IOException { this.validate(); checkState(spec.getFilepattern().isAccessible(), "Cannot create a Tika reader without access to the file" @@ -143,9 +138,10 @@ public TikaIO.Read getTikaInputRead() { return spec; } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public Coder getDefaultOutputCoder() { - return StringUtf8Coder.of(); + public Coder getDefaultOutputCoder() { + return SerializableCoder.of((Class) ParseResult.class); } @Override @@ -198,7 +194,7 @@ private static List expandFilePattern(String fileOrPattern) throws IOE * TODO: This is mostly a copy of FileBasedSource internal file-pattern reader * so that code would need to be generalized as part of the future contribution */ - static class FilePatternTikaReader extends BoundedReader { + static class FilePatternTikaReader extends BoundedReader { private final TikaSource source; final ListIterator fileReadersIterator; TikaReader currentReader = null; @@ -234,7 +230,7 @@ private boolean startNextNonemptyReader() throws IOException { } @Override - public String getCurrent() throws NoSuchElementException { + public ParseResult getCurrent() throws NoSuchElementException { return currentReader.getCurrent(); } @@ -259,15 +255,14 @@ public TikaSource getCurrentSource() { } } - static class TikaReader extends BoundedReader { - private ExecutorService execService; + static class TikaReader extends BoundedReader { private final ContentHandlerImpl tikaHandler = new ContentHandlerImpl(); private String current; private TikaSource source; private String filePath; private TikaIO.Read spec; private org.apache.tika.metadata.Metadata tikaMetadata; - private Iterator metadataIterator; + private volatile boolean docParsed; TikaReader(TikaSource source, String filePath) { this.source = source; @@ -293,35 +288,12 @@ public boolean start() throws IOException { tikaMetadata = spec.getInputMetadata() != null ? spec.getInputMetadata() : new org.apache.tika.metadata.Metadata(); - if (spec.getMinimumTextLength() != null) { - tikaHandler.setMinTextLength(spec.getMinimumTextLength()); - } - - if (!Boolean.TRUE.equals(spec.getParseSynchronously())) { - // Try to parse the file on the executor thread to make the best effort - // at letting the pipeline thread advancing over the file content - // without immediately parsing all of it - execService = Executors.newFixedThreadPool(1); - execService.submit(new Runnable() { - public void run() { - try { - parser.parse(is, tikaHandler, tikaMetadata, context); - is.close(); - } catch (Exception ex) { - tikaHandler.setParseException(ex); - } - } - }); - } else { - // Some parsers might not be able to report the content in chunks. - // It does not make sense to create extra threads in such cases - try { - parser.parse(is, tikaHandler, tikaMetadata, context); - } catch (Exception ex) { - throw new IOException(ex); - } finally { - is.close(); - } + try { + parser.parse(is, tikaHandler, tikaMetadata, context); + } catch (Exception ex) { + throw new IOException(ex); + } finally { + is.close(); } return advanceToNext(); } @@ -333,134 +305,41 @@ public boolean advance() throws IOException { } protected boolean advanceToNext() throws IOException { - current = null; - // The content is reported first - if (metadataIterator == null) { - // Check if some content is already available - current = tikaHandler.getCurrent(); - - if (current == null && !Boolean.TRUE.equals(spec.getParseSynchronously())) { - long maxPollTime = 0; - long configuredMaxPollTime = spec.getQueueMaxPollTime() == null - ? TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME : spec.getQueueMaxPollTime(); - long configuredPollTime = spec.getQueuePollTime() == null - ? TikaIO.Read.DEFAULT_QUEUE_POLL_TIME : spec.getQueuePollTime(); - - // Poll the queue till the next piece of data is available - while (current == null && maxPollTime < configuredMaxPollTime) { - boolean docEnded = tikaHandler.waitForNext(configuredPollTime); - current = tikaHandler.getCurrent(); - // End of Document ? - if (docEnded) { - break; - } - maxPollTime += spec.getQueuePollTime(); - } - } - // No more content ? - if (current == null && Boolean.TRUE.equals(spec.getReadOutputMetadata())) { - // Time to report the metadata - metadataIterator = Arrays.asList(tikaMetadata.names()).iterator(); - } - } - - if (metadataIterator != null && metadataIterator.hasNext()) { - String key = metadataIterator.next(); - // The metadata name/value separator can be configured if needed - current = key + "=" + tikaMetadata.get(key); + if (!docParsed) { + current = tikaHandler.toString().trim(); + docParsed = true; + return true; + } else { + return false; } - return current != null; } @Override - public String getCurrent() throws NoSuchElementException { + public ParseResult getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return current; + ParseResult result = new ParseResult(); + result.setContent(current); + result.setMetadata(tikaMetadata); + result.setFileLocation(filePath); + return result; } @Override - public void close() throws IOException { - if (execService != null) { - execService.shutdown(); - } - } - - ExecutorService getExecutorService() { - return execService; + public BoundedSource getCurrentSource() { + return source; } @Override - public BoundedSource getCurrentSource() { - return source; + public void close() throws IOException { + // complete } } /** * Tika Parser Content Handler. */ - static class ContentHandlerImpl extends DefaultHandler { - private Queue queue = new ConcurrentLinkedQueue<>(); - private volatile boolean documentEnded; - private volatile Exception parseException; - private volatile String current; - private int minTextLength; - - @Override - public void characters(char ch[], int start, int length) throws SAXException { - String value = new String(ch, start, length).trim(); - if (!value.isEmpty()) { - if (minTextLength <= 0) { - queue.add(value); - } else { - current = current == null ? value : current + " " + value; - if (current.length() >= minTextLength) { - queue.add(current); - current = null; - } - } - } - } - - public void setParseException(Exception ex) { - this.parseException = ex; - } - - public synchronized boolean waitForNext(long pollTime) throws IOException { - if (!documentEnded) { - try { - wait(pollTime); - } catch (InterruptedException ex) { - // continue; - } - } - return documentEnded; - } - - @Override - public synchronized void endDocument() throws SAXException { - this.documentEnded = true; - notify(); - } - - public String getCurrent() throws IOException { - checkParseException(); - String value = queue.poll(); - if (value == null && documentEnded) { - return current; - } else { - return value; - } - } - public void checkParseException() throws IOException { - if (parseException != null) { - throw new IOException(parseException); - } - } - - public void setMinTextLength(int minTextLength) { - this.minTextLength = minTextLength; - } + static class ContentHandlerImpl extends ToTextContentHandler { } } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index 40ff56990103..3bfb63540975 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.tika.exception.TikaException; +import org.apache.tika.metadata.Metadata; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -41,45 +42,30 @@ * Tests TikaInput. */ public class TikaIOTest { - private static final String[] PDF_FILE = new String[] { - "Combining", "can help to ingest", "Apache Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika" - }; - private static final String[] PDF_ZIP_FILE = new String[] { - "Combining", "can help to ingest", "Apache Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika", - "apache-beam-tika.pdf" - }; - private static final String[] ODT_FILE = new String[] { - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika" - }; - private static final String[] ODT_FILE_WITH_METADATA = new String[] { - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika", - "Author=BeamTikaUser" - }; - private static final String[] ODT_FILE_WITH_MIN_TEXT_LEN = new String[] { - "Combining Apache Beam", "and Apache Tika can help to ingest", "in most known formats.", - "the content from the files" - }; - private static final String[] ODT_FILES = new String[] { - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika", - "Open Office", "Text", "PDF", "Excel", "Scientific", - "and other formats", "are supported." - }; + private static final String PDF_FILE = + "Combining\n\nApache Beam\n\nand\n\nApache Tika\n\ncan help to ingest\n\n" + + "the content from the files\n\nin most known formats."; + + private static final String PDF_ZIP_FILE = + "apache-beam-tika.pdf\n\n\nCombining\n\n\nApache Beam\n\n\nand\n\n\nApache Tika\n\n\n" + + "can help to ingest\n\n\nthe content from the files\n\n\nin most known formats."; + + private static final String ODT_FILE = + "Combining\nApache Beam\nand\nApache Tika\ncan help to ingest\n" + + "the content from the files\nin most known formats."; + + private static final String ODT_FILE2 = + "Open Office\nPDF\nExcel\nText\nScientific\nand other formats\nare supported."; @Rule public TestPipeline p = TestPipeline.create(); - @Ignore @Test public void testReadPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - doTestReadFiles(resourcePath, PDF_FILE); + doTestReadFiles(resourcePath, new ParseResult(resourcePath, PDF_FILE)); } @Test @@ -87,7 +73,7 @@ public void testReadZipPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika-pdf.zip").getPath(); - doTestReadFiles(resourcePath, PDF_ZIP_FILE); + doTestReadFiles(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); } @Test @@ -95,74 +81,35 @@ public void testReadOdtFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - doTestReadFiles(resourcePath, ODT_FILE); + doTestReadFiles(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); } @Test public void testReadOdtFiles() throws IOException { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - resourcePath = resourcePath.replace("apache-beam-tika1", "*"); + String resourcePath1 = getClass().getResource("/apache-beam-tika1.odt").getPath(); + String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); + String resourcePath = resourcePath1.replace("apache-beam-tika1", "*"); - doTestReadFiles(resourcePath, ODT_FILES); + doTestReadFiles(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), + new ParseResult(resourcePath2, ODT_FILE2)); } - private void doTestReadFiles(String resourcePath, String[] expected) throws IOException { - PCollection output = p.apply("ParseFiles", TikaIO.read().from(resourcePath)); - PAssert.that(output).containsInAnyOrder(expected); - p.run(); - } - - @Test - public void testReadOdtFileWithMetadata() throws IOException { - - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - - PCollection output = p.apply("ParseOdtFile", - TikaIO.read().from(resourcePath).withReadOutputMetadata(true)) + private void doTestReadFiles(String resourcePath, ParseResult... expectedResults) + throws IOException { + PCollection output = p.apply("ParseFiles", + TikaIO.read().from(resourcePath)) .apply(ParDo.of(new FilterMetadataFn())); - PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_METADATA); - p.run(); - } - - @Test - public void testReadOdtFileWithMinTextLength() throws IOException { - - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - - PCollection output = p.apply("ParseOdtFile", - TikaIO.read().from(resourcePath).withMinimumTextlength(20)); - PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_MIN_TEXT_LEN); - p.run(); - } - - @Test - public void testReadPdfFileSync() throws IOException { - - String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - - PCollection output = p.apply("ParsePdfFile", - TikaIO.read().from(resourcePath).withParseSynchronously(true)); - PAssert.that(output).containsInAnyOrder(PDF_FILE); + PAssert.that(output).containsInAnyOrder(expectedResults); p.run(); } @Test public void testReadDamagedPdfFile() throws IOException { - doTestReadDamagedPdfFile(false); - } - - @Test - public void testReadDamagedPdfFileSync() throws IOException { - doTestReadDamagedPdfFile(true); - } - - private void doTestReadDamagedPdfFile(boolean sync) throws IOException { - String resourcePath = getClass().getResource("/damaged.pdf").getPath(); p.apply("ParseInvalidPdfFile", - TikaIO.read().from(resourcePath).withParseSynchronously(sync)); + TikaIO.read().from(resourcePath)); try { p.run(); fail("Transform failure is expected"); @@ -176,9 +123,7 @@ public void testReadDisplayData() { TikaIO.Read read = TikaIO.read() .from("foo.*") .withTikaConfigPath("tikaconfigpath") - .withContentTypeHint("application/pdf") - .withMinimumTextlength(100) - .withReadOutputMetadata(true); + .withContentTypeHint("application/pdf"); DisplayData displayData = DisplayData.from(read); @@ -186,25 +131,7 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("tikaConfigPath", "tikaconfigpath")); assertThat(displayData, hasDisplayItem("inputMetadata", "[Content-Type=application/pdf]")); - assertThat(displayData, hasDisplayItem("readOutputMetadata", "true")); - assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); - assertThat(displayData, hasDisplayItem("queuePollTime", "50")); - assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000")); - assertThat(displayData, hasDisplayItem("minTextLen", "100")); - assertEquals(8, displayData.items().size()); - } - - @Test - public void testReadDisplayDataSyncMode() { - TikaIO.Read read = TikaIO.read() - .from("foo.*") - .withParseSynchronously(true); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); - assertThat(displayData, hasDisplayItem("parseMode", "synchronous")); - assertEquals(2, displayData.items().size()); + assertEquals(3, displayData.items().size()); } @Test @@ -215,32 +142,22 @@ public void testReadDisplayDataWithDefaultOptions() { DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); - assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); - assertThat(displayData, hasDisplayItem("queuePollTime", "50")); - assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000")); - assertEquals(4, displayData.items().size()); + assertEquals(1, displayData.items().size()); } @Test public void testReadDisplayDataWithCustomOptions() { final String[] args = new String[]{"--input=/input/tika.pdf", "--tikaConfigPath=/tikaConfigPath", - "--queuePollTime=10", - "--queueMaxPollTime=1000", - "--contentTypeHint=application/pdf", - "--readOutputMetadata=true"}; + "--contentTypeHint=application/pdf"}; TikaIO.Read read = TikaIO.read().withOptions(createOptions(args)); DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); assertThat(displayData, hasDisplayItem("tikaConfigPath", "/tikaConfigPath")); - assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); - assertThat(displayData, hasDisplayItem("queuePollTime", "10")); - assertThat(displayData, hasDisplayItem("queueMaxPollTime", "1000")); assertThat(displayData, hasDisplayItem("inputMetadata", "[Content-Type=application/pdf]")); - assertThat(displayData, hasDisplayItem("readOutputMetadata", "true")); - assertEquals(7, displayData.items().size()); + assertEquals(3, displayData.items().size()); } private static TikaOptions createOptions(String[] args) { @@ -248,16 +165,25 @@ private static TikaOptions createOptions(String[] args) { .withValidation().as(TikaOptions.class); } - static class FilterMetadataFn extends DoFn { + static class FilterMetadataFn extends DoFn { private static final long serialVersionUID = 6338014219600516621L; @ProcessElement public void processElement(ProcessContext c) { - String word = c.element(); - if (word.contains("=") && !word.startsWith("Author")) { - return; + ParseResult result = c.element(); + ParseResult newResult = new ParseResult(result.getFileLocation(), result.getContent()); + Metadata m = new Metadata(); + if (result.getFileLocation().endsWith("apache-beam-tika1.odt")) { + m.set("Author", result.getMetadata().get("Author")); } - c.output(word); + newResult.setMetadata(m); + c.output(newResult); } } + + static Metadata getOdtMetadata() { + Metadata m = new Metadata(); + m.set("Author", "BeamTikaUser"); + return m; + } } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java deleted file mode 100644 index 5c4e7542444a..000000000000 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.tika; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -import org.apache.beam.sdk.io.tika.TikaSource.TikaReader; -import org.junit.Test; - -/** - * Tests TikaReader. - */ -public class TikaReaderTest { - private static final List ODT_FILE = Arrays.asList( - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika"); - - @Test - public void testOdtFileAsyncReader() throws Exception { - doTestOdtFileReader(false); - } - @Test - public void testOdtFileSyncReader() throws Exception { - doTestOdtFileReader(true); - } - private void doTestOdtFileReader(boolean sync) throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - TikaSource source = new TikaSource(TikaIO.read() - .withParseSynchronously(sync) - .from(resourcePath)); - TikaReader reader = (TikaReader) source.createReader(null); - - List content = new LinkedList(); - for (boolean available = reader.start(); available; available = reader.advance()) { - content.add(reader.getCurrent()); - } - assertTrue(content.containsAll(ODT_FILE)); - if (!sync) { - assertNotNull(reader.getExecutorService()); - } else { - assertNull(reader.getExecutorService()); - } - reader.close(); - } - - @Test - public void testOdtFilesReader() throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - String filePattern = resourcePath.replace("apache-beam-tika1", "*"); - - TikaSource source = new TikaSource(TikaIO.read().from(filePattern)); - TikaSource.FilePatternTikaReader reader = - (TikaSource.FilePatternTikaReader) source.createReader(null); - List content = new LinkedList(); - for (boolean available = reader.start(); available; available = reader.advance()) { - content.add(reader.getCurrent()); - } - assertTrue(content.containsAll(ODT_FILE)); - reader.close(); - } -} diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java deleted file mode 100644 index 550f4695734d..000000000000 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.tika; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.tika.TikaSource.TikaReader; -import org.junit.Test; - -/** - * Tests TikaSource. - */ -public class TikaSourceTest { - - @Test - public void testOdtFileSource() throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - TikaSource source = new TikaSource(TikaIO.read().from(resourcePath)); - assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder()); - - assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode()); - assertTrue(source.createReader(null) instanceof TikaReader); - - List sources = source.split(1, null); - assertEquals(1, sources.size()); - TikaSource nextSource = sources.get(0); - assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode()); - assertEquals(resourcePath, nextSource.getSingleFileMetadata().resourceId().toString()); - } - - @Test - public void testOdtFilesSource() throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); - String filePattern = resourcePath.replace("apache-beam-tika1", "*"); - - TikaSource source = new TikaSource(TikaIO.read().from(filePattern)); - assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder()); - - assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode()); - assertTrue(source.createReader(null) instanceof TikaSource.FilePatternTikaReader); - - List sources = source.split(1, null); - assertEquals(2, sources.size()); - TikaSource nextSource = sources.get(0); - assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode()); - String nextSourceResource = nextSource.getSingleFileMetadata().resourceId().toString(); - TikaSource nextSource2 = sources.get(1); - assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource2.getMode()); - String nextSourceResource2 = nextSource2.getSingleFileMetadata().resourceId().toString(); - assertTrue(nextSourceResource.equals(resourcePath) && nextSourceResource2.equals(resourcePath2) - || nextSourceResource.equals(resourcePath2) && nextSourceResource2.equals(resourcePath)); - } -} From 6b314493c128773a63bc9f4811e6416e8c8e6df3 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Mon, 2 Oct 2017 11:26:16 +0100 Subject: [PATCH 04/17] Addressing PR comments from Eugene --- .../apache/beam/sdk/io/tika/ParseResult.java | 53 ++++++------------- .../org/apache/beam/sdk/io/tika/TikaIO.java | 14 +---- .../apache/beam/sdk/io/tika/TikaSource.java | 25 ++------- .../apache/beam/sdk/io/tika/TikaIOTest.java | 3 +- 4 files changed, 23 insertions(+), 72 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java index da9fcc31303c..05325ccf7fab 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.tika; import java.io.Serializable; +import java.util.Objects; import org.apache.tika.metadata.Metadata; @@ -31,9 +32,6 @@ public class ParseResult implements Serializable { private Metadata metadata; private String fileLocation; - public ParseResult() { - } - public ParseResult(String fileLocation, String content) { this(fileLocation, content, new Metadata()); } @@ -51,25 +49,11 @@ public String getContent() { return content; } - /** - * Sets a file content. - */ - public void setContent(String content) { - this.content = content; - } - /** * Gets a file metadata. */ public Metadata getMetadata() { - return metadata; - } - - /** - * Sets a file metadata. - */ - public void setMetadata(Metadata metadata) { - this.metadata = metadata; + return getMetadataCopy(); } /** @@ -79,16 +63,9 @@ public String getFileLocation() { return fileLocation; } - /** - * Sets a file location. - */ - public void setFileLocation(String fileLocation) { - this.fileLocation = fileLocation; - } - @Override public int hashCode() { - return fileLocation.hashCode() + 37 * content.hashCode() + 37 * metadata.hashCode(); + return fileLocation.hashCode() + 37 * content.hashCode() + 37 * getMetadataHashCode(); } @Override @@ -100,21 +77,23 @@ public boolean equals(Object obj) { ParseResult pr = (ParseResult) obj; return this.fileLocation.equals(pr.fileLocation) && this.content.equals(pr.content) - && isMetadataEqual(this.metadata, pr.metadata); + && this.metadata.equals(pr.metadata); } - private static boolean isMetadataEqual(Metadata m1, Metadata m2) { - String[] names = m1.names(); - if (names.length != m2.names().length) { - return false; + private int getMetadataHashCode() { + int hashCode = 0; + for (String name : metadata.names()) { + hashCode += name.hashCode() ^ Objects.hashCode(metadata.getValues(name)); } - for (String n : names) { - String v1 = m1.get(n); - String v2 = m2.get(n); - if (!v1.equals(v2)) { - return false; + return hashCode; + } + private Metadata getMetadataCopy() { + Metadata metadataCopy = new Metadata(); + for (String name : metadata.names()) { + for (String value : metadata.getValues(name)) { + metadataCopy.add(name, value); } } - return true; + return metadataCopy; } } diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index f359706f46d2..cee348b32ce5 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -23,9 +23,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; @@ -184,10 +181,7 @@ public Read withOptions(TikaOptions options) { @Override public PCollection expand(PBegin input) { checkNotNull(this.getFilepattern(), "Filepattern cannot be empty."); - final Bounded read = org.apache.beam.sdk.io.Read.from(new TikaSource(this)); - PCollection pcol = input.getPipeline().apply(read); - pcol.setCoder(getDefaultOutputCoder()); - return pcol; + return input.apply(org.apache.beam.sdk.io.Read.from(new TikaSource(this))); } @Override @@ -221,11 +215,5 @@ public void populateDisplayData(DisplayData.Builder builder) { .withLabel("Input Metadata")); } } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - protected Coder getDefaultOutputCoder() { - return SerializableCoder.of((Class) ParseResult.class); - } } } diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java index 9b221aa01131..5cc8f3a25290 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java @@ -140,7 +140,7 @@ public TikaIO.Read getTikaInputRead() { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public Coder getDefaultOutputCoder() { + public Coder getOutputCoder() { return SerializableCoder.of((Class) ParseResult.class); } @@ -262,7 +262,6 @@ static class TikaReader extends BoundedReader { private String filePath; private TikaIO.Read spec; private org.apache.tika.metadata.Metadata tikaMetadata; - private volatile boolean docParsed; TikaReader(TikaSource source, String filePath) { this.source = source; @@ -295,23 +294,13 @@ public boolean start() throws IOException { } finally { is.close(); } - return advanceToNext(); + current = tikaHandler.toString().trim(); + return true; } @Override public boolean advance() throws IOException { - checkState(current != null, "Call start() before advance()"); - return advanceToNext(); - } - - protected boolean advanceToNext() throws IOException { - if (!docParsed) { - current = tikaHandler.toString().trim(); - docParsed = true; - return true; - } else { - return false; - } + return false; } @Override @@ -319,11 +308,7 @@ public ParseResult getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - ParseResult result = new ParseResult(); - result.setContent(current); - result.setMetadata(tikaMetadata); - result.setFileLocation(filePath); - return result; + return new ParseResult(filePath, current, tikaMetadata); } @Override diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index 3bfb63540975..c4ecdeeda42e 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -171,12 +171,11 @@ static class FilterMetadataFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) { ParseResult result = c.element(); - ParseResult newResult = new ParseResult(result.getFileLocation(), result.getContent()); Metadata m = new Metadata(); if (result.getFileLocation().endsWith("apache-beam-tika1.odt")) { m.set("Author", result.getMetadata().get("Author")); } - newResult.setMetadata(m); + ParseResult newResult = new ParseResult(result.getFileLocation(), result.getContent(), m); c.output(newResult); } } From 4d60ec7eefaa59f1c12494e7895d1102813f7a03 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Tue, 3 Oct 2017 12:32:57 +0100 Subject: [PATCH 05/17] Minor optimizations to ParseResult, removing ContentHandler extension for now --- .../apache/beam/sdk/io/tika/ParseResult.java | 18 ++++++++++++------ .../apache/beam/sdk/io/tika/TikaSource.java | 9 ++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java index 05325ccf7fab..93566fe52a45 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -28,9 +28,10 @@ */ public class ParseResult implements Serializable { private static final long serialVersionUID = 6133510503781405912L; - private String content; - private Metadata metadata; - private String fileLocation; + private final String fileLocation; + private final String content; + private final Metadata metadata; + private final String[] metadataNames; public ParseResult(String fileLocation, String content) { this(fileLocation, content, new Metadata()); @@ -40,6 +41,7 @@ public ParseResult(String fileLocation, String content, Metadata metadata) { this.fileLocation = fileLocation; this.content = content; this.metadata = metadata; + this.metadataNames = metadata.names(); } /** @@ -65,7 +67,11 @@ public String getFileLocation() { @Override public int hashCode() { - return fileLocation.hashCode() + 37 * content.hashCode() + 37 * getMetadataHashCode(); + int hashCode = 1; + hashCode = 31 * hashCode + fileLocation.hashCode(); + hashCode = 31 * hashCode + content.hashCode(); + hashCode = 31 * hashCode + getMetadataHashCode(); + return hashCode; } @Override @@ -82,14 +88,14 @@ public boolean equals(Object obj) { private int getMetadataHashCode() { int hashCode = 0; - for (String name : metadata.names()) { + for (String name : metadataNames) { hashCode += name.hashCode() ^ Objects.hashCode(metadata.getValues(name)); } return hashCode; } private Metadata getMetadataCopy() { Metadata metadataCopy = new Metadata(); - for (String name : metadata.names()) { + for (String name : metadataNames) { for (String value : metadata.getValues(name)) { metadataCopy.add(name, value); } diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java index 5cc8f3a25290..aa55b617a803 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java @@ -54,6 +54,7 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; @@ -256,7 +257,7 @@ public TikaSource getCurrentSource() { } static class TikaReader extends BoundedReader { - private final ContentHandlerImpl tikaHandler = new ContentHandlerImpl(); + private final ContentHandler tikaHandler = new ToTextContentHandler(); private String current; private TikaSource source; private String filePath; @@ -321,10 +322,4 @@ public void close() throws IOException { // complete } } - - /** - * Tika Parser Content Handler. - */ - static class ContentHandlerImpl extends ToTextContentHandler { - } } From 26af36cbc5a1fd3eef89b9796ab14faf73078152 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Wed, 4 Oct 2017 09:57:04 +0100 Subject: [PATCH 06/17] Removing unused import in TikaIOTest --- .../src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index c4ecdeeda42e..408c48452088 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; From 9fbaa45b0247cb521b6b26b06f156f29dd7803b3 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Thu, 5 Oct 2017 17:45:55 +0100 Subject: [PATCH 07/17] Removing TikaSource, more updates to follow --- sdks/java/io/tika/pom.xml | 10 - .../org/apache/beam/sdk/io/tika/TikaIO.java | 119 ++++--- .../apache/beam/sdk/io/tika/TikaSource.java | 325 ------------------ .../apache/beam/sdk/io/tika/TikaIOTest.java | 42 ++- 4 files changed, 97 insertions(+), 399 deletions(-) delete mode 100644 sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java diff --git a/sdks/java/io/tika/pom.xml b/sdks/java/io/tika/pom.xml index b8f7ecefcdde..d7f7e42d595f 100644 --- a/sdks/java/io/tika/pom.xml +++ b/sdks/java/io/tika/pom.xml @@ -53,16 +53,6 @@ jsr305 - - joda-time - joda-time - - - - org.slf4j - slf4j-api - - org.apache.tika tika-core diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index cee348b32ce5..ced97f71ca83 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -20,18 +20,31 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; + import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.tika.config.TikaConfig; +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; - - +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.parser.Parser; +import org.apache.tika.sax.ToTextContentHandler; +import org.xml.sax.ContentHandler; +import org.xml.sax.SAXException; @@ -64,14 +77,15 @@ public class TikaIO { * A {@link PTransform} that parses one or more files and returns a bounded {@link PCollection} * containing one element for each sequence of characters reported by Apache Tika SAX Parser. */ - public static Read read() { - return new AutoValue_TikaIO_Read.Builder() + public static ParseAll parseAll() { + return new AutoValue_TikaIO_ParseAll.Builder() .build(); } /** Implementation of {@link #read}. */ @AutoValue - public abstract static class Read extends PTransform> { + public abstract static class ParseAll extends + PTransform, PCollection> { private static final long serialVersionUID = 2198301984784351829L; @Nullable abstract ValueProvider getFilepattern(); @@ -86,7 +100,7 @@ abstract static class Builder { abstract Builder setTikaConfigPath(ValueProvider tikaConfigPath); abstract Builder setInputMetadata(Metadata metadata); - abstract Read build(); + abstract ParseAll build(); } /** @@ -101,13 +115,13 @@ abstract static class Builder { *

Standard Java * Filesystem glob patterns ("*", "?", "[..]") are supported. */ - public Read from(String filepattern) { + public ParseAll from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */ - public Read from(ValueProvider filepattern) { + public ParseAll from(ValueProvider filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder() .setFilepattern(filepattern) @@ -117,13 +131,13 @@ public Read from(ValueProvider filepattern) { /** * Returns a new transform which will use the custom TikaConfig. */ - public Read withTikaConfigPath(String tikaConfigPath) { + public ParseAll withTikaConfigPath(String tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return withTikaConfigPath(StaticValueProvider.of(tikaConfigPath)); } /** Same as {@code with(tikaConfigPath)}, but accepting a {@link ValueProvider}. */ - public Read withTikaConfigPath(ValueProvider tikaConfigPath) { + public ParseAll withTikaConfigPath(ValueProvider tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return toBuilder() .setTikaConfigPath(tikaConfigPath) @@ -134,7 +148,7 @@ public Read withTikaConfigPath(ValueProvider tikaConfigPath) { * Returns a new transform which will use the provided content type hint * to make the file parser detection more efficient. */ - public Read withContentTypeHint(String contentType) { + public ParseAll withContentTypeHint(String contentType) { checkNotNull(contentType, "ContentType cannot be empty."); Metadata metadata = new Metadata(); metadata.add(Metadata.CONTENT_TYPE, contentType); @@ -145,7 +159,7 @@ public Read withContentTypeHint(String contentType) { * Returns a new transform which will use the provided input metadata * for parsing the files. */ - public Read withInputMetadata(Metadata metadata) { + public ParseAll withInputMetadata(Metadata metadata) { Metadata inputMetadata = this.getInputMetadata(); if (inputMetadata != null) { for (String name : metadata.names()) { @@ -157,42 +171,20 @@ public Read withInputMetadata(Metadata metadata) { return toBuilder().setInputMetadata(inputMetadata).build(); } - /** - * Returns a new transform which will use TikaOptions. - */ - public Read withOptions(TikaOptions options) { - checkNotNull(options, "TikaOptions cannot be empty."); - Builder builder = toBuilder(); - builder.setFilepattern(StaticValueProvider.of(options.getInput())); - if (options.getContentTypeHint() != null) { - Metadata metadata = this.getInputMetadata(); - if (metadata == null) { - metadata = new Metadata(); - } - metadata.add(Metadata.CONTENT_TYPE, options.getContentTypeHint()); - builder.setInputMetadata(metadata); - } - if (options.getTikaConfigPath() != null) { - builder.setTikaConfigPath(StaticValueProvider.of(options.getTikaConfigPath())); - } - return builder.build(); - } - @Override - public PCollection expand(PBegin input) { - checkNotNull(this.getFilepattern(), "Filepattern cannot be empty."); - return input.apply(org.apache.beam.sdk.io.Read.from(new TikaSource(this))); + public PCollection expand(PCollection input) { + return input.apply(ParDo.of(new ParseToStringFn(this))); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String filepatternDisplay = getFilepattern().isAccessible() - ? getFilepattern().get() : getFilepattern().toString(); - builder - .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) - .withLabel("File Pattern")); + //String filepatternDisplay = getFilepattern().isAccessible() + // ? getFilepattern().get() : getFilepattern().toString(); + //builder + // .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) + // .withLabel("File Pattern")); if (getTikaConfigPath() != null) { String tikaConfigPathDisplay = getTikaConfigPath().isAccessible() ? getTikaConfigPath().get() : getTikaConfigPath().toString(); @@ -215,5 +207,48 @@ public void populateDisplayData(DisplayData.Builder builder) { .withLabel("Input Metadata")); } } + + private static class ParseToStringFn extends DoFn { + + private static final long serialVersionUID = 6837207505313720989L; + private TikaIO.ParseAll spec; + ParseToStringFn(TikaIO.ParseAll spec) { + this.spec = spec; + } + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + ReadableFile file = c.element(); + InputStream stream = Channels.newInputStream(file.open()); + + final InputStream is = TikaInputStream.get(stream); + TikaConfig tikaConfig = null; + if (spec.getTikaConfigPath() != null) { + try { + tikaConfig = new TikaConfig(spec.getTikaConfigPath().get()); + } catch (TikaException | SAXException e) { + throw new IOException(e); + } + } + final Parser parser = tikaConfig == null ? new AutoDetectParser() + : new AutoDetectParser(tikaConfig); + final ParseContext context = new ParseContext(); + context.set(Parser.class, parser); + org.apache.tika.metadata.Metadata tikaMetadata = spec.getInputMetadata() != null + ? spec.getInputMetadata() : new org.apache.tika.metadata.Metadata(); + + ContentHandler tikaHandler = new ToTextContentHandler(); + try { + parser.parse(is, tikaHandler, tikaMetadata, context); + } catch (Exception ex) { + throw new IOException(ex); + } finally { + is.close(); + } + + String content = tikaHandler.toString().trim(); + String filePath = file.getMetadata().resourceId().toString(); + c.output(new ParseResult(filePath, content, tikaMetadata)); + } + } } } diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java deleted file mode 100644 index aa55b617a803..000000000000 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.tika; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.tika.config.TikaConfig; -import org.apache.tika.exception.TikaException; -import org.apache.tika.io.TikaInputStream; -import org.apache.tika.parser.AutoDetectParser; -import org.apache.tika.parser.ParseContext; -import org.apache.tika.parser.Parser; -import org.apache.tika.sax.ToTextContentHandler; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.xml.sax.ContentHandler; -import org.xml.sax.SAXException; - - -/** - * Implementation detail of {@link TikaIO.Read}. - * - *

A {@link Source} which can represent the content of the files parsed by Apache Tika. - */ -class TikaSource extends BoundedSource { - private static final long serialVersionUID = -509574062910491122L; - private static final Logger LOG = LoggerFactory.getLogger(TikaSource.class); - - @Nullable - private MatchResult.Metadata singleFileMetadata; - private final Mode mode; - private final TikaIO.Read spec; - - /** - * Source mode. - */ - public enum Mode { - FILEPATTERN, SINGLE_FILE - } - - TikaSource(TikaIO.Read spec) { - this.mode = Mode.FILEPATTERN; - this.spec = spec; - } - - TikaSource(Metadata fileMetadata, TikaIO.Read spec) { - mode = Mode.SINGLE_FILE; - this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata"); - this.spec = spec; - } - - @Override - public BoundedReader createReader(PipelineOptions options) throws IOException { - this.validate(); - checkState(spec.getFilepattern().isAccessible(), - "Cannot create a Tika reader without access to the file" - + " or pattern specification: {}.", spec.getFilepattern()); - if (spec.getTikaConfigPath() != null) { - checkState(spec.getTikaConfigPath().isAccessible(), - "Cannot create a Tika reader without access to its configuration", - spec.getTikaConfigPath()); - } - - String fileOrPattern = spec.getFilepattern().get(); - if (mode == Mode.FILEPATTERN) { - List fileMetadata = expandFilePattern(fileOrPattern); - List fileReaders = new ArrayList<>(); - for (Metadata metadata : fileMetadata) { - fileReaders.add(new TikaReader(this, metadata.resourceId().toString())); - } - if (fileReaders.size() == 1) { - return fileReaders.get(0); - } - return new FilePatternTikaReader(this, fileReaders); - } else { - return new TikaReader(this, singleFileMetadata.resourceId().toString()); - } - - } - - @Override - public List split(long desiredBundleSizeBytes, PipelineOptions options) - throws Exception { - if (mode == Mode.SINGLE_FILE) { - return ImmutableList.of(this); - } else { - List fileMetadata = expandFilePattern(spec.getFilepattern().get()); - - List splitResults = new LinkedList<>(); - for (Metadata metadata : fileMetadata) { - splitResults.add(new TikaSource(metadata, spec)); - } - return splitResults; - } - } - - public TikaIO.Read getTikaInputRead() { - return spec; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public Coder getOutputCoder() { - return SerializableCoder.of((Class) ParseResult.class); - } - - @Override - public void validate() { - switch (mode) { - case FILEPATTERN: - checkArgument(this.singleFileMetadata == null, - "Unexpected initialized singleFileMetadata value"); - break; - case SINGLE_FILE: - checkNotNull(this.singleFileMetadata, - "Unexpected null singleFileMetadata value"); - break; - default: - throw new IllegalStateException("Unknown mode: " + mode); - } - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - long totalSize = 0; - List fileMetadata = expandFilePattern(spec.getFilepattern().get()); - for (Metadata metadata : fileMetadata) { - totalSize += metadata.sizeBytes(); - } - return totalSize; - } - - Mode getMode() { - return this.mode; - } - - Metadata getSingleFileMetadata() { - return this.singleFileMetadata; - } - - private static List expandFilePattern(String fileOrPattern) throws IOException { - MatchResult matches = Iterables.getOnlyElement( - FileSystems.match(Collections.singletonList(fileOrPattern))); - LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPattern); - List metadata = ImmutableList.copyOf(matches.metadata()); - checkArgument(!metadata.isEmpty(), - "Unable to find any files matching %s", fileOrPattern); - - return metadata; - } - - /** - * FilePatternTikaReader. - * TODO: This is mostly a copy of FileBasedSource internal file-pattern reader - * so that code would need to be generalized as part of the future contribution - */ - static class FilePatternTikaReader extends BoundedReader { - private final TikaSource source; - final ListIterator fileReadersIterator; - TikaReader currentReader = null; - - public FilePatternTikaReader(TikaSource source, List fileReaders) { - this.source = source; - this.fileReadersIterator = fileReaders.listIterator(); - } - - @Override - public boolean start() throws IOException { - return startNextNonemptyReader(); - } - - @Override - public boolean advance() throws IOException { - checkState(currentReader != null, "Call start() before advance()"); - if (currentReader.advance()) { - return true; - } - return startNextNonemptyReader(); - } - - private boolean startNextNonemptyReader() throws IOException { - while (fileReadersIterator.hasNext()) { - currentReader = fileReadersIterator.next(); - if (currentReader.start()) { - return true; - } - currentReader.close(); - } - return false; - } - - @Override - public ParseResult getCurrent() throws NoSuchElementException { - return currentReader.getCurrent(); - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return currentReader.getCurrentTimestamp(); - } - - @Override - public void close() throws IOException { - if (currentReader != null) { - currentReader.close(); - } - while (fileReadersIterator.hasNext()) { - fileReadersIterator.next().close(); - } - } - - @Override - public TikaSource getCurrentSource() { - return source; - } - } - - static class TikaReader extends BoundedReader { - private final ContentHandler tikaHandler = new ToTextContentHandler(); - private String current; - private TikaSource source; - private String filePath; - private TikaIO.Read spec; - private org.apache.tika.metadata.Metadata tikaMetadata; - - TikaReader(TikaSource source, String filePath) { - this.source = source; - this.filePath = filePath; - this.spec = source.getTikaInputRead(); - } - - @Override - public boolean start() throws IOException { - final InputStream is = TikaInputStream.get(Paths.get(filePath)); - TikaConfig tikaConfig = null; - if (spec.getTikaConfigPath() != null) { - try { - tikaConfig = new TikaConfig(spec.getTikaConfigPath().get()); - } catch (TikaException | SAXException e) { - throw new IOException(e); - } - } - final Parser parser = tikaConfig == null ? new AutoDetectParser() - : new AutoDetectParser(tikaConfig); - final ParseContext context = new ParseContext(); - context.set(Parser.class, parser); - tikaMetadata = spec.getInputMetadata() != null ? spec.getInputMetadata() - : new org.apache.tika.metadata.Metadata(); - - try { - parser.parse(is, tikaHandler, tikaMetadata, context); - } catch (Exception ex) { - throw new IOException(ex); - } finally { - is.close(); - } - current = tikaHandler.toString().trim(); - return true; - } - - @Override - public boolean advance() throws IOException { - return false; - } - - @Override - public ParseResult getCurrent() throws NoSuchElementException { - if (current == null) { - throw new NoSuchElementException(); - } - return new ParseResult(filePath, current, tikaMetadata); - } - - @Override - public BoundedSource getCurrentSource() { - return source; - } - - @Override - public void close() throws IOException { - // complete - } - } -} diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index 408c48452088..66a3537587b6 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -25,6 +25,8 @@ import java.io.IOException; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -34,6 +36,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -94,9 +97,11 @@ public void testReadOdtFiles() throws IOException { } private void doTestReadFiles(String resourcePath, ParseResult... expectedResults) - throws IOException { - PCollection output = p.apply("ParseFiles", - TikaIO.read().from(resourcePath)) + throws IOException { + PCollection output = + p.apply("ParseFiles", FileIO.match().filepattern(resourcePath)) + .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) + .apply(TikaIO.parseAll()) .apply(ParDo.of(new FilterMetadataFn())); PAssert.that(output).containsInAnyOrder(expectedResults); p.run(); @@ -107,8 +112,9 @@ public void testReadDamagedPdfFile() throws IOException { String resourcePath = getClass().getResource("/damaged.pdf").getPath(); - p.apply("ParseInvalidPdfFile", - TikaIO.read().from(resourcePath)); + p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath)) + .apply(FileIO.readMatches()) + .apply(TikaIO.parseAll()); try { p.run(); fail("Transform failure is expected"); @@ -118,8 +124,9 @@ public void testReadDamagedPdfFile() throws IOException { } @Test + @Ignore public void testReadDisplayData() { - TikaIO.Read read = TikaIO.read() + TikaIO.ParseAll read = TikaIO.parseAll() .from("foo.*") .withTikaConfigPath("tikaconfigpath") .withContentTypeHint("application/pdf"); @@ -134,21 +141,17 @@ public void testReadDisplayData() { } @Test - public void testReadDisplayDataWithDefaultOptions() { - final String[] args = new String[]{"--input=/input/tika.pdf"}; - TikaIO.Read read = TikaIO.read().withOptions(createOptions(args)); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); - assertEquals(1, displayData.items().size()); - } - @Test + @Ignore public void testReadDisplayDataWithCustomOptions() { final String[] args = new String[]{"--input=/input/tika.pdf", "--tikaConfigPath=/tikaConfigPath", "--contentTypeHint=application/pdf"}; - TikaIO.Read read = TikaIO.read().withOptions(createOptions(args)); + TikaOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation().as(TikaOptions.class); + TikaIO.ParseAll read = TikaIO.parseAll() + .from(options.getInput()) + .withTikaConfigPath(options.getTikaConfigPath()) + .withContentTypeHint(options.getContentTypeHint()); DisplayData displayData = DisplayData.from(read); @@ -159,11 +162,6 @@ public void testReadDisplayDataWithCustomOptions() { assertEquals(3, displayData.items().size()); } - private static TikaOptions createOptions(String[] args) { - return PipelineOptionsFactory.fromArgs(args) - .withValidation().as(TikaOptions.class); - } - static class FilterMetadataFn extends DoFn { private static final long serialVersionUID = 6338014219600516621L; From 2672b8720b0e0eac8f4cac96f13abcf093e93d5f Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Fri, 6 Oct 2017 16:24:02 +0100 Subject: [PATCH 08/17] Continuing the refactoring, addressing some of the latest review comments --- .../org/apache/beam/sdk/io/tika/TikaIO.java | 101 +++++------------- .../apache/beam/sdk/io/tika/TikaIOTest.java | 35 +++--- 2 files changed, 44 insertions(+), 92 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index ced97f71ca83..85ee1e4d6ef6 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; -import java.io.IOException; import java.io.InputStream; import java.nio.channels.Channels; @@ -28,6 +27,8 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.DoFn; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.tika.config.TikaConfig; -import org.apache.tika.exception.TikaException; import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.AutoDetectParser; @@ -44,7 +44,7 @@ import org.apache.tika.parser.Parser; import org.apache.tika.sax.ToTextContentHandler; import org.xml.sax.ContentHandler; -import org.xml.sax.SAXException; + @@ -88,7 +88,6 @@ public abstract static class ParseAll extends PTransform, PCollection> { private static final long serialVersionUID = 2198301984784351829L; - @Nullable abstract ValueProvider getFilepattern(); @Nullable abstract ValueProvider getTikaConfigPath(); @Nullable abstract Metadata getInputMetadata(); @@ -96,38 +95,12 @@ public abstract static class ParseAll extends @AutoValue.Builder abstract static class Builder { - abstract Builder setFilepattern(ValueProvider filepattern); abstract Builder setTikaConfigPath(ValueProvider tikaConfigPath); abstract Builder setInputMetadata(Metadata metadata); abstract ParseAll build(); } - /** - * A {@link PTransform} that parses one or more files with the given filename - * or filename pattern and returns a bounded {@link PCollection} containing - * one element for each sequence of characters reported by Apache Tika SAX Parser. - * - *

Filepattern can be a local path (if running locally), or a Google Cloud Storage - * filename or filename pattern of the form {@code "gs:///"} - * (if running locally or using remote execution service). - * - *

Standard Java - * Filesystem glob patterns ("*", "?", "[..]") are supported. - */ - public ParseAll from(String filepattern) { - checkNotNull(filepattern, "Filepattern cannot be empty."); - return from(StaticValueProvider.of(filepattern)); - } - - /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */ - public ParseAll from(ValueProvider filepattern) { - checkNotNull(filepattern, "Filepattern cannot be empty."); - return toBuilder() - .setFilepattern(filepattern) - .build(); - } - /** * Returns a new transform which will use the custom TikaConfig. */ @@ -180,11 +153,6 @@ public PCollection expand(PCollection input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - //String filepatternDisplay = getFilepattern().isAccessible() - // ? getFilepattern().get() : getFilepattern().toString(); - //builder - // .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) - // .withLabel("File Pattern")); if (getTikaConfigPath() != null) { String tikaConfigPathDisplay = getTikaConfigPath().isAccessible() ? getTikaConfigPath().get() : getTikaConfigPath().toString(); @@ -193,17 +161,8 @@ public void populateDisplayData(DisplayData.Builder builder) { } Metadata metadata = getInputMetadata(); if (metadata != null) { - StringBuilder sb = new StringBuilder(); - sb.append('['); - for (String name : metadata.names()) { - if (sb.length() > 1) { - sb.append(','); - } - sb.append(name).append('=').append(metadata.get(name)); - } - sb.append(']'); builder - .add(DisplayData.item("inputMetadata", sb.toString()) + .add(DisplayData.item("inputMetadata", metadata.toString().trim()) .withLabel("Input Metadata")); } } @@ -216,38 +175,34 @@ private static class ParseToStringFn extends DoFn { this.spec = spec; } @ProcessElement - public void processElement(ProcessContext c) throws IOException { + public void processElement(ProcessContext c) throws Exception { ReadableFile file = c.element(); InputStream stream = Channels.newInputStream(file.open()); - - final InputStream is = TikaInputStream.get(stream); - TikaConfig tikaConfig = null; - if (spec.getTikaConfigPath() != null) { - try { - tikaConfig = new TikaConfig(spec.getTikaConfigPath().get()); - } catch (TikaException | SAXException e) { - throw new IOException(e); + try (InputStream tikaStream = TikaInputStream.get(stream)) { + + TikaConfig tikaConfig = null; + if (spec.getTikaConfigPath() != null) { + ResourceId configResource = + FileSystems.matchSingleFileSpec(spec.getTikaConfigPath().get()).resourceId(); + tikaConfig = new TikaConfig( + Channels.newInputStream(FileSystems.open(configResource))); } - } - final Parser parser = tikaConfig == null ? new AutoDetectParser() - : new AutoDetectParser(tikaConfig); - final ParseContext context = new ParseContext(); - context.set(Parser.class, parser); - org.apache.tika.metadata.Metadata tikaMetadata = spec.getInputMetadata() != null - ? spec.getInputMetadata() : new org.apache.tika.metadata.Metadata(); - - ContentHandler tikaHandler = new ToTextContentHandler(); - try { - parser.parse(is, tikaHandler, tikaMetadata, context); - } catch (Exception ex) { - throw new IOException(ex); - } finally { - is.close(); - } - String content = tikaHandler.toString().trim(); - String filePath = file.getMetadata().resourceId().toString(); - c.output(new ParseResult(filePath, content, tikaMetadata)); + final Parser parser = tikaConfig == null + ? new AutoDetectParser() : new AutoDetectParser(tikaConfig); + + final ParseContext context = new ParseContext(); + context.set(Parser.class, parser); + org.apache.tika.metadata.Metadata tikaMetadata = spec.getInputMetadata() != null + ? spec.getInputMetadata() : new org.apache.tika.metadata.Metadata(); + + ContentHandler tikaHandler = new ToTextContentHandler(); + parser.parse(tikaStream, tikaHandler, tikaMetadata, context); + + c.output(new ParseResult(file.getMetadata().resourceId().toString(), + tikaHandler.toString(), + tikaMetadata)); + } } } } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index 66a3537587b6..ff05ee066ad4 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -45,19 +44,23 @@ */ public class TikaIOTest { private static final String PDF_FILE = - "Combining\n\nApache Beam\n\nand\n\nApache Tika\n\ncan help to ingest\n\n" - + "the content from the files\n\nin most known formats."; + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + + "Combining\n\nApache Beam\n\nand\n\nApache Tika\n\ncan help to ingest\n\n" + + "the content from the files\n\nin most known formats.\n\n\n"; private static final String PDF_ZIP_FILE = - "apache-beam-tika.pdf\n\n\nCombining\n\n\nApache Beam\n\n\nand\n\n\nApache Tika\n\n\n" - + "can help to ingest\n\n\nthe content from the files\n\n\nin most known formats."; + "\n\n\n\n\n\n\n\napache-beam-tika.pdf\n\n\nCombining\n\n\nApache Beam\n\n\n" + + "and\n\n\nApache Tika\n\n\ncan help to ingest\n\n\nthe content from the files\n\n\n" + + "in most known formats.\n\n\n\n\n\n\n"; private static final String ODT_FILE = - "Combining\nApache Beam\nand\nApache Tika\ncan help to ingest\n" - + "the content from the files\nin most known formats."; + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + + "Combining\nApache Beam\nand\nApache Tika\ncan help to ingest\nthe content from the" + + " files\nin most known formats.\n"; private static final String ODT_FILE2 = - "Open Office\nPDF\nExcel\nText\nScientific\nand other formats\nare supported."; + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + + "Open Office\nPDF\nExcel\nText\nScientific\nand other formats\nare supported.\n"; @Rule public TestPipeline p = TestPipeline.create(); @@ -119,29 +122,25 @@ public void testReadDamagedPdfFile() throws IOException { p.run(); fail("Transform failure is expected"); } catch (RuntimeException ex) { - assertTrue(ex.getCause().getCause() instanceof TikaException); + assertTrue(ex.getCause() instanceof TikaException); } } @Test - @Ignore public void testReadDisplayData() { TikaIO.ParseAll read = TikaIO.parseAll() - .from("foo.*") .withTikaConfigPath("tikaconfigpath") .withContentTypeHint("application/pdf"); DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); assertThat(displayData, hasDisplayItem("tikaConfigPath", "tikaconfigpath")); assertThat(displayData, hasDisplayItem("inputMetadata", - "[Content-Type=application/pdf]")); - assertEquals(3, displayData.items().size()); + "Content-Type=application/pdf")); + assertEquals(2, displayData.items().size()); } @Test - @Ignore public void testReadDisplayDataWithCustomOptions() { final String[] args = new String[]{"--input=/input/tika.pdf", "--tikaConfigPath=/tikaConfigPath", @@ -149,17 +148,15 @@ public void testReadDisplayDataWithCustomOptions() { TikaOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation().as(TikaOptions.class); TikaIO.ParseAll read = TikaIO.parseAll() - .from(options.getInput()) .withTikaConfigPath(options.getTikaConfigPath()) .withContentTypeHint(options.getContentTypeHint()); DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); assertThat(displayData, hasDisplayItem("tikaConfigPath", "/tikaConfigPath")); assertThat(displayData, hasDisplayItem("inputMetadata", - "[Content-Type=application/pdf]")); - assertEquals(3, displayData.items().size()); + "Content-Type=application/pdf")); + assertEquals(2, displayData.items().size()); } static class FilterMetadataFn extends DoFn { From 361363c759da977248e34d26e453c6eeab7a1152 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Tue, 10 Oct 2017 14:52:13 +0100 Subject: [PATCH 09/17] Updating ParseResult.hashCode to use Arrays.hashCode --- .../apache/beam/sdk/io/tika/ParseResult.java | 4 +- .../beam/sdk/io/tika/ParseResultTest.java | 40 +++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java index 93566fe52a45..580ec0fe2aa8 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.io.tika; import java.io.Serializable; -import java.util.Objects; +import java.util.Arrays; import org.apache.tika.metadata.Metadata; @@ -89,7 +89,7 @@ public boolean equals(Object obj) { private int getMetadataHashCode() { int hashCode = 0; for (String name : metadataNames) { - hashCode += name.hashCode() ^ Objects.hashCode(metadata.getValues(name)); + hashCode += name.hashCode() ^ Arrays.hashCode(metadata.getValues(name)); } return hashCode; } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java new file mode 100644 index 000000000000..fd5d30270cb3 --- /dev/null +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.tika; + +import static org.junit.Assert.assertEquals; +import org.apache.tika.metadata.Metadata; +import org.junit.Test; + +/** + * Tests ParseResult. + */ +public class ParseResultTest { + @Test + public void testEqualsAndHashCode() { + ParseResult p1 = new ParseResult("a.txt", "hello", getMetadata()); + ParseResult p2 = new ParseResult("a.txt", "hello", getMetadata()); + assertEquals(p1, p2); + } + + static Metadata getMetadata() { + Metadata m = new Metadata(); + m.set("Author", "BeamTikaUser"); + return m; + } +} From 739f2111dc7ddc3a4eb62b4cbb280ae8b7c008af Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Wed, 11 Oct 2017 16:11:45 +0100 Subject: [PATCH 10/17] Reacting to the most of the latest review comments --- .../apache/beam/sdk/io/tika/ParseResult.java | 2 + .../org/apache/beam/sdk/io/tika/TikaIO.java | 5 +- .../apache/beam/sdk/io/tika/TikaOptions.java | 53 ------------------- .../beam/sdk/io/tika/ParseResultTest.java | 5 +- .../apache/beam/sdk/io/tika/TikaIOTest.java | 25 +++------ 5 files changed, 17 insertions(+), 73 deletions(-) delete mode 100644 sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java index 580ec0fe2aa8..a4dc9ef4787b 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -86,6 +86,8 @@ public boolean equals(Object obj) { && this.metadata.equals(pr.metadata); } + //TODO: + // Remove this function and use metadata.hashCode() once Apache Tika 1.17 gets released. private int getMetadataHashCode() { int hashCode = 0; for (String name : metadataNames) { diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 85ee1e4d6ef6..587d1719ba53 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -161,6 +161,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } Metadata metadata = getInputMetadata(); if (metadata != null) { + //TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released builder .add(DisplayData.item("inputMetadata", metadata.toString().trim()) .withLabel("Input Metadata")); @@ -170,7 +171,7 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class ParseToStringFn extends DoFn { private static final long serialVersionUID = 6837207505313720989L; - private TikaIO.ParseAll spec; + private final TikaIO.ParseAll spec; ParseToStringFn(TikaIO.ParseAll spec) { this.spec = spec; } @@ -193,7 +194,7 @@ public void processElement(ProcessContext c) throws Exception { final ParseContext context = new ParseContext(); context.set(Parser.class, parser); - org.apache.tika.metadata.Metadata tikaMetadata = spec.getInputMetadata() != null + Metadata tikaMetadata = spec.getInputMetadata() != null ? spec.getInputMetadata() : new org.apache.tika.metadata.Metadata(); ContentHandler tikaHandler = new ToTextContentHandler(); diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java deleted file mode 100644 index 24349574054f..000000000000 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.tika; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.Validation; - -/** - * TikaInput Options to support the command-line applications. - */ -public interface TikaOptions extends PipelineOptions { - - @Description("Input path") - @Validation.Required - String getInput(); - void setInput(String value); - - @Description("Tika Config path") - String getTikaConfigPath(); - void setTikaConfigPath(String value); - - @Description("Tika Parser Content Type hint") - String getContentTypeHint(); - void setContentTypeHint(String value); - - @Description("Pipeline name") - @Default.String("TikaRead") - String getPipelineName(); - void setPipelineName(String value); - - @Description("Output path") - @Default.String("/tmp/tika/out") - String getOutput(); - void setOutput(String value); - -} diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java index fd5d30270cb3..fd86152c6888 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java @@ -30,11 +30,14 @@ public void testEqualsAndHashCode() { ParseResult p1 = new ParseResult("a.txt", "hello", getMetadata()); ParseResult p2 = new ParseResult("a.txt", "hello", getMetadata()); assertEquals(p1, p2); + assertEquals(p1.hashCode(), p2.hashCode()); } static Metadata getMetadata() { Metadata m = new Metadata(); - m.set("Author", "BeamTikaUser"); + m.add("Author", "BeamTikaUser"); + m.add("Author", "BeamTikaUser2"); + m.add("Date", "2017-09-01"); return m; } } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index ff05ee066ad4..d79a943bb919 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -18,16 +18,14 @@ package org.apache.beam.sdk.io.tika; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; @@ -38,6 +36,7 @@ import org.apache.tika.metadata.Metadata; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** * Tests TikaInput. @@ -64,6 +63,8 @@ public class TikaIOTest { @Rule public TestPipeline p = TestPipeline.create(); + @Rule + public ExpectedException thrown = ExpectedException.none(); @Test public void testReadPdfFile() throws IOException { @@ -112,18 +113,13 @@ private void doTestReadFiles(String resourcePath, ParseResult... expectedResults @Test public void testReadDamagedPdfFile() throws IOException { - + thrown.expectCause(isA(TikaException.class)); String resourcePath = getClass().getResource("/damaged.pdf").getPath(); p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath)) .apply(FileIO.readMatches()) .apply(TikaIO.parseAll()); - try { - p.run(); - fail("Transform failure is expected"); - } catch (RuntimeException ex) { - assertTrue(ex.getCause() instanceof TikaException); - } + p.run(); } @Test @@ -142,14 +138,9 @@ public void testReadDisplayData() { @Test public void testReadDisplayDataWithCustomOptions() { - final String[] args = new String[]{"--input=/input/tika.pdf", - "--tikaConfigPath=/tikaConfigPath", - "--contentTypeHint=application/pdf"}; - TikaOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation().as(TikaOptions.class); TikaIO.ParseAll read = TikaIO.parseAll() - .withTikaConfigPath(options.getTikaConfigPath()) - .withContentTypeHint(options.getContentTypeHint()); + .withTikaConfigPath("/tikaConfigPath") + .withContentTypeHint("application/pdf"); DisplayData displayData = DisplayData.from(read); From 4856b6b727847848a757a627bb94512db4462df0 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Mon, 16 Oct 2017 17:43:02 +0100 Subject: [PATCH 11/17] Continuing cleaning up TikaIO --- .../apache/beam/sdk/io/tika/ParseResult.java | 13 ++----- .../org/apache/beam/sdk/io/tika/TikaIO.java | 24 +++++++------ .../apache/beam/sdk/io/tika/TikaIOTest.java | 35 ++++++++++--------- 3 files changed, 34 insertions(+), 38 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java index a4dc9ef4787b..0a77491cdfdb 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -26,8 +26,8 @@ * Tika parse result containing the file location, metadata * and content converted to String. */ +@SuppressWarnings("serial") public class ParseResult implements Serializable { - private static final long serialVersionUID = 6133510503781405912L; private final String fileLocation; private final String content; private final Metadata metadata; @@ -55,7 +55,7 @@ public String getContent() { * Gets a file metadata. */ public Metadata getMetadata() { - return getMetadataCopy(); + return metadata; } /** @@ -95,13 +95,4 @@ private int getMetadataHashCode() { } return hashCode; } - private Metadata getMetadataCopy() { - Metadata metadataCopy = new Metadata(); - for (String name : metadataNames) { - for (String value : metadata.getValues(name)) { - metadataCopy.add(name, value); - } - } - return metadataCopy; - } } diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 587d1719ba53..5a3a9f0bf60e 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -52,20 +52,22 @@ * {@link PTransform} for parsing arbitrary files using Apache Tika. * Files in many well known text, binary or scientific formats can be processed. * - *

To read a {@link PCollection} from one or more files - * use {@link TikaIO.Read#from(String)} - * to specify the path of the file(s) to be read. + *

Combine {@link TikaIO.ParseAll} with {@link FileIO.Match} + * and {@link FileIO.ReadMatches} to read and parse the files. * - *

{@link TikaIO.Read} returns a bounded {@link PCollection} of {@link String Strings}, - * each corresponding to a sequence of characters reported by Apache Tika SAX Parser. + *

{@link TikaIO.ParseAll} returns a bounded bounded {@link PCollection} + * containing one {@link ParseResult} per each file. * *

Example: * *

{@code
  * Pipeline p = ...;
  *
- * // A simple Read of a local PDF file (only runs locally):
- * PCollection content = p.apply(TikaInput.from("/local/path/to/file.pdf"));
+ * // A simple parse of a local PDF file (only runs locally):
+ * PCollection results =
+ *   p.apply(FileIO.match().filepattern("/local/path/to/file.pdf"))
+ *    .apply(FileIO.readMatches())
+      .apply(TikaIO.parseAll());
  * }
* * Warning: the API of this IO is likely to change in the next release. @@ -74,19 +76,19 @@ public class TikaIO { /** - * A {@link PTransform} that parses one or more files and returns a bounded {@link PCollection} - * containing one element for each sequence of characters reported by Apache Tika SAX Parser. + * A {@link PTransform} that parses one or more files and returns a bounded + * {@link PCollection} containing one {@link ParseResult} per each file. */ public static ParseAll parseAll() { return new AutoValue_TikaIO_ParseAll.Builder() .build(); } - /** Implementation of {@link #read}. */ + /** Implementation of {@link #parseAll}. */ + @SuppressWarnings("serial") @AutoValue public abstract static class ParseAll extends PTransform, PCollection> { - private static final long serialVersionUID = 2198301984784351829L; @Nullable abstract ValueProvider getTikaConfigPath(); @Nullable abstract Metadata getInputMetadata(); diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index d79a943bb919..98080c334586 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -67,40 +67,40 @@ public class TikaIOTest { public ExpectedException thrown = ExpectedException.none(); @Test - public void testReadPdfFile() throws IOException { + public void testParseAllPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - doTestReadFiles(resourcePath, new ParseResult(resourcePath, PDF_FILE)); + doTestParseAllFiles(resourcePath, new ParseResult(resourcePath, PDF_FILE)); } @Test - public void testReadZipPdfFile() throws IOException { + public void testParseAllZipPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika-pdf.zip").getPath(); - doTestReadFiles(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); + doTestParseAllFiles(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); } @Test - public void testReadOdtFile() throws IOException { + public void testParseAllOdtFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - doTestReadFiles(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); + doTestParseAllFiles(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); } @Test - public void testReadOdtFiles() throws IOException { + public void testParseAllOdtFiles() throws IOException { String resourcePath1 = getClass().getResource("/apache-beam-tika1.odt").getPath(); String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); String resourcePath = resourcePath1.replace("apache-beam-tika1", "*"); - doTestReadFiles(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), + doTestParseAllFiles(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), new ParseResult(resourcePath2, ODT_FILE2)); } - private void doTestReadFiles(String resourcePath, ParseResult... expectedResults) + private void doTestParseAllFiles(String resourcePath, ParseResult... expectedResults) throws IOException { PCollection output = p.apply("ParseFiles", FileIO.match().filepattern(resourcePath)) @@ -112,7 +112,7 @@ private void doTestReadFiles(String resourcePath, ParseResult... expectedResults } @Test - public void testReadDamagedPdfFile() throws IOException { + public void testParseAllDamagedPdfFile() throws IOException { thrown.expectCause(isA(TikaException.class)); String resourcePath = getClass().getResource("/damaged.pdf").getPath(); @@ -123,12 +123,12 @@ public void testReadDamagedPdfFile() throws IOException { } @Test - public void testReadDisplayData() { - TikaIO.ParseAll read = TikaIO.parseAll() + public void testParseAllDisplayData() { + TikaIO.ParseAll parseAll = TikaIO.parseAll() .withTikaConfigPath("tikaconfigpath") .withContentTypeHint("application/pdf"); - DisplayData displayData = DisplayData.from(read); + DisplayData displayData = DisplayData.from(parseAll); assertThat(displayData, hasDisplayItem("tikaConfigPath", "tikaconfigpath")); assertThat(displayData, hasDisplayItem("inputMetadata", @@ -137,12 +137,12 @@ public void testReadDisplayData() { } @Test - public void testReadDisplayDataWithCustomOptions() { - TikaIO.ParseAll read = TikaIO.parseAll() + public void testParseAllDisplayDataWithCustomOptions() { + TikaIO.ParseAll parseAll = TikaIO.parseAll() .withTikaConfigPath("/tikaConfigPath") .withContentTypeHint("application/pdf"); - DisplayData displayData = DisplayData.from(read); + DisplayData displayData = DisplayData.from(parseAll); assertThat(displayData, hasDisplayItem("tikaConfigPath", "/tikaConfigPath")); assertThat(displayData, hasDisplayItem("inputMetadata", @@ -157,6 +157,9 @@ static class FilterMetadataFn extends DoFn { public void processElement(ProcessContext c) { ParseResult result = c.element(); Metadata m = new Metadata(); + // Files contain many metadata properties. This function drops all but the"Author" + // property manually added to "apache-beam-tika1.odt" resource only to make + // the tests simpler if (result.getFileLocation().endsWith("apache-beam-tika1.odt")) { m.set("Author", result.getMetadata().get("Author")); } From f441d26a3e52f849775b30e741b63de80bf25237 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Mon, 16 Oct 2017 18:04:26 +0100 Subject: [PATCH 12/17] Minor update to the docs --- .../tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 5a3a9f0bf60e..c87724ae14e5 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -55,7 +55,7 @@ *

Combine {@link TikaIO.ParseAll} with {@link FileIO.Match} * and {@link FileIO.ReadMatches} to read and parse the files. * - *

{@link TikaIO.ParseAll} returns a bounded bounded {@link PCollection} + *

{@link TikaIO.ParseAll} returns a bounded {@link PCollection} * containing one {@link ParseResult} per each file. * *

Example: From 683f27fab766109fea718a495286b2914061e0e7 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Tue, 17 Oct 2017 13:30:05 +0100 Subject: [PATCH 13/17] Initializing TikaConfig in ParseToStringFn setup() --- .../org/apache/beam/sdk/io/tika/TikaIO.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index c87724ae14e5..8f94169ddd9d 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -174,23 +174,28 @@ private static class ParseToStringFn extends DoFn { private static final long serialVersionUID = 6837207505313720989L; private final TikaIO.ParseAll spec; + private TikaConfig tikaConfig; + ParseToStringFn(TikaIO.ParseAll spec) { this.spec = spec; } + + @Setup + public void setup() throws Exception { + if (spec.getTikaConfigPath() != null) { + ResourceId configResource = + FileSystems.matchSingleFileSpec(spec.getTikaConfigPath().get()).resourceId(); + tikaConfig = new TikaConfig( + Channels.newInputStream(FileSystems.open(configResource))); + } + } + @ProcessElement public void processElement(ProcessContext c) throws Exception { ReadableFile file = c.element(); InputStream stream = Channels.newInputStream(file.open()); try (InputStream tikaStream = TikaInputStream.get(stream)) { - TikaConfig tikaConfig = null; - if (spec.getTikaConfigPath() != null) { - ResourceId configResource = - FileSystems.matchSingleFileSpec(spec.getTikaConfigPath().get()).resourceId(); - tikaConfig = new TikaConfig( - Channels.newInputStream(FileSystems.open(configResource))); - } - final Parser parser = tikaConfig == null ? new AutoDetectParser() : new AutoDetectParser(tikaConfig); From 99148f06f01af6a6f54a26ac498e8baab44a88cd Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Wed, 18 Oct 2017 17:10:16 +0100 Subject: [PATCH 14/17] Renaming ParseAll to ParseFiles, introducing ParseAll shortcut --- .../org/apache/beam/sdk/io/tika/TikaIO.java | 100 ++++++++++++++---- .../apache/beam/sdk/io/tika/TikaIOTest.java | 51 ++++++--- 2 files changed, 114 insertions(+), 37 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 8f94169ddd9d..637d808bb7fa 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.tika; import static com.google.common.base.Preconditions.checkNotNull; + import com.google.auto.value.AutoValue; import java.io.InputStream; @@ -26,6 +27,8 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; @@ -35,6 +38,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.tika.config.TikaConfig; import org.apache.tika.io.TikaInputStream; @@ -49,14 +53,14 @@ /** - * {@link PTransform} for parsing arbitrary files using Apache Tika. + * A collection of {@link PTransform} transforms for parsing arbitrary files using Apache Tika. * Files in many well known text, binary or scientific formats can be processed. * - *

Combine {@link TikaIO.ParseAll} with {@link FileIO.Match} - * and {@link FileIO.ReadMatches} to read and parse the files. + *

{@link TikaIO.ParseAll} and {@link TikaIO.ParseFiles} parse the files and return + * a bounded {@link PCollection} containing one {@link ParseResult} per each file. * - *

{@link TikaIO.ParseAll} returns a bounded {@link PCollection} - * containing one {@link ParseResult} per each file. + *

Combine {@link TikaIO.ParseFiles} with {@link FileIO.Match} + * and {@link FileIO.ReadMatches} to match, read and parse the files. * *

Example: * @@ -67,7 +71,19 @@ * PCollection results = * p.apply(FileIO.match().filepattern("/local/path/to/file.pdf")) * .apply(FileIO.readMatches()) - .apply(TikaIO.parseAll()); + * .apply(TikaIO.parseFiles()); + * } + * + *

Use {@link TikaIO.ParseAll} to match, read and parse the files in simple cases. + * + *

Example: + * + *

{@code
+ * Pipeline p = ...;
+ *
+ * // A simple parse of a local PDF file (only runs locally):
+ * PCollection results =
+ *   p.apply(TikaIO.parseAll().filepattern("/local/path/to/file.pdf"));
  * }
* * Warning: the API of this IO is likely to change in the next release. @@ -76,18 +92,62 @@ public class TikaIO { /** - * A {@link PTransform} that parses one or more files and returns a bounded - * {@link PCollection} containing one {@link ParseResult} per each file. + * A {@link PTransform} that matches and parses the files + * and returns a bounded {@link PCollection} of {@link ParseResult}. */ - public static ParseAll parseAll() { - return new AutoValue_TikaIO_ParseAll.Builder() + public static ParseAll parseAll() { + return new AutoValue_TikaIO_ParseAll.Builder() .build(); - } + } - /** Implementation of {@link #parseAll}. */ + /** + * A {@link PTransform} that accepts a bounded {@link PCollection} of {@link ReadableFile} + * and returns a bounded {@link PCollection} of {@link ParseResult}. + */ + public static ParseFiles parseFiles() { + return new AutoValue_TikaIO_ParseFiles.Builder() + .build(); + } + + /** Implementation of {@link #parseAll}. */ @SuppressWarnings("serial") @AutoValue - public abstract static class ParseAll extends + public abstract static class ParseAll extends PTransform> { + @Nullable + abstract ValueProvider getFilepattern(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFilepattern(ValueProvider filepattern); + + abstract ParseAll build(); + } + + /** Matches the given filepattern. */ + public ParseAll filepattern(String filepattern) { + return this.filepattern(ValueProvider.StaticValueProvider.of(filepattern)); + } + + /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */ + public ParseAll filepattern(ValueProvider filepattern) { + return toBuilder().setFilepattern(filepattern).build(); + } + + @Override + public PCollection expand(PBegin input) { + return input + .apply(FileIO.match().filepattern(getFilepattern())) + .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) + .apply(parseFiles()); + } + } + + /** Implementation of {@link #parseFiles}. */ + @SuppressWarnings("serial") + @AutoValue + public abstract static class ParseFiles extends PTransform, PCollection> { @Nullable abstract ValueProvider getTikaConfigPath(); @@ -100,19 +160,19 @@ abstract static class Builder { abstract Builder setTikaConfigPath(ValueProvider tikaConfigPath); abstract Builder setInputMetadata(Metadata metadata); - abstract ParseAll build(); + abstract ParseFiles build(); } /** * Returns a new transform which will use the custom TikaConfig. */ - public ParseAll withTikaConfigPath(String tikaConfigPath) { + public ParseFiles withTikaConfigPath(String tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return withTikaConfigPath(StaticValueProvider.of(tikaConfigPath)); } /** Same as {@code with(tikaConfigPath)}, but accepting a {@link ValueProvider}. */ - public ParseAll withTikaConfigPath(ValueProvider tikaConfigPath) { + public ParseFiles withTikaConfigPath(ValueProvider tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return toBuilder() .setTikaConfigPath(tikaConfigPath) @@ -123,7 +183,7 @@ public ParseAll withTikaConfigPath(ValueProvider tikaConfigPath) { * Returns a new transform which will use the provided content type hint * to make the file parser detection more efficient. */ - public ParseAll withContentTypeHint(String contentType) { + public ParseFiles withContentTypeHint(String contentType) { checkNotNull(contentType, "ContentType cannot be empty."); Metadata metadata = new Metadata(); metadata.add(Metadata.CONTENT_TYPE, contentType); @@ -134,7 +194,7 @@ public ParseAll withContentTypeHint(String contentType) { * Returns a new transform which will use the provided input metadata * for parsing the files. */ - public ParseAll withInputMetadata(Metadata metadata) { + public ParseFiles withInputMetadata(Metadata metadata) { Metadata inputMetadata = this.getInputMetadata(); if (inputMetadata != null) { for (String name : metadata.names()) { @@ -173,10 +233,10 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class ParseToStringFn extends DoFn { private static final long serialVersionUID = 6837207505313720989L; - private final TikaIO.ParseAll spec; + private final TikaIO.ParseFiles spec; private TikaConfig tikaConfig; - ParseToStringFn(TikaIO.ParseAll spec) { + ParseToStringFn(TikaIO.ParseFiles spec) { this.spec = spec; } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index 98080c334586..b6104172d4dd 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -71,64 +71,81 @@ public void testParseAllPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - doTestParseAllFiles(resourcePath, new ParseResult(resourcePath, PDF_FILE)); + doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_FILE)); + } + + private void doTestParseAll(String resourcePath, ParseResult... expectedResults) + throws IOException { + PCollection output = + p.apply("ParseAll", TikaIO.parseAll().filepattern(resourcePath)) + .apply(ParDo.of(new FilterMetadataFn())); + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); + } + + @Test + public void testParseFilesPdfFile() throws IOException { + + String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); + + doTestParseFiles(resourcePath, new ParseResult(resourcePath, PDF_FILE)); } @Test - public void testParseAllZipPdfFile() throws IOException { + public void testParseFilesZipPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika-pdf.zip").getPath(); - doTestParseAllFiles(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); + doTestParseFiles(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); } @Test - public void testParseAllOdtFile() throws IOException { + public void testParseFilesOdtFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - doTestParseAllFiles(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); + doTestParseFiles(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); } @Test - public void testParseAllOdtFiles() throws IOException { + public void testParseFilesOdtFiles() throws IOException { String resourcePath1 = getClass().getResource("/apache-beam-tika1.odt").getPath(); String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); String resourcePath = resourcePath1.replace("apache-beam-tika1", "*"); - doTestParseAllFiles(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), + doTestParseFiles(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), new ParseResult(resourcePath2, ODT_FILE2)); } - private void doTestParseAllFiles(String resourcePath, ParseResult... expectedResults) + private void doTestParseFiles(String resourcePath, ParseResult... expectedResults) throws IOException { PCollection output = p.apply("ParseFiles", FileIO.match().filepattern(resourcePath)) .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) - .apply(TikaIO.parseAll()) + .apply(TikaIO.parseFiles()) .apply(ParDo.of(new FilterMetadataFn())); PAssert.that(output).containsInAnyOrder(expectedResults); p.run(); } @Test - public void testParseAllDamagedPdfFile() throws IOException { + public void testParseFilesDamagedPdfFile() throws IOException { thrown.expectCause(isA(TikaException.class)); String resourcePath = getClass().getResource("/damaged.pdf").getPath(); p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath)) .apply(FileIO.readMatches()) - .apply(TikaIO.parseAll()); + .apply(TikaIO.parseFiles()); p.run(); } @Test - public void testParseAllDisplayData() { - TikaIO.ParseAll parseAll = TikaIO.parseAll() + public void testParseFilesDisplayData() { + TikaIO.ParseFiles parseFiles = TikaIO.parseFiles() .withTikaConfigPath("tikaconfigpath") .withContentTypeHint("application/pdf"); - DisplayData displayData = DisplayData.from(parseAll); + DisplayData displayData = DisplayData.from(parseFiles); assertThat(displayData, hasDisplayItem("tikaConfigPath", "tikaconfigpath")); assertThat(displayData, hasDisplayItem("inputMetadata", @@ -137,12 +154,12 @@ public void testParseAllDisplayData() { } @Test - public void testParseAllDisplayDataWithCustomOptions() { - TikaIO.ParseAll parseAll = TikaIO.parseAll() + public void testParseFilesDisplayDataWithCustomOptions() { + TikaIO.ParseFiles parseFiles = TikaIO.parseFiles() .withTikaConfigPath("/tikaConfigPath") .withContentTypeHint("application/pdf"); - DisplayData displayData = DisplayData.from(parseAll); + DisplayData displayData = DisplayData.from(parseFiles); assertThat(displayData, hasDisplayItem("tikaConfigPath", "/tikaConfigPath")); assertThat(displayData, hasDisplayItem("inputMetadata", From cea903e5e1c858da51fc0e7ec67006a34f296ce9 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Wed, 18 Oct 2017 17:19:24 +0100 Subject: [PATCH 15/17] Adding ParseAll DisplayData test --- .../main/java/org/apache/beam/sdk/io/tika/TikaIO.java | 9 +++++++++ .../java/org/apache/beam/sdk/io/tika/TikaIOTest.java | 10 ++++++++++ 2 files changed, 19 insertions(+) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 637d808bb7fa..cf89fe9bb4db 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -135,6 +135,15 @@ public ParseAll filepattern(ValueProvider filepattern) { return toBuilder().setFilepattern(filepattern).build(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull( + DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern")); + } + @Override public PCollection expand(PBegin input) { return input diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index b6104172d4dd..f551ea496ba1 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -139,6 +139,16 @@ public void testParseFilesDamagedPdfFile() throws IOException { p.run(); } + @Test + public void testParseAllDisplayData() { + TikaIO.ParseAll parseAll = TikaIO.parseAll().filepattern("file.pdf"); + + DisplayData displayData = DisplayData.from(parseAll); + + assertThat(displayData, hasDisplayItem("filePattern", "file.pdf")); + assertEquals(1, displayData.items().size()); + } + @Test public void testParseFilesDisplayData() { TikaIO.ParseFiles parseFiles = TikaIO.parseFiles() From 8bc5ee2f0a0187f539342871ea849316a97b9bf0 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Fri, 20 Oct 2017 10:55:39 +0100 Subject: [PATCH 16/17] Renaming ParseAll to Parse and ParseFiles to the original ParseAll, minor doc tweaks --- .../org/apache/beam/sdk/io/tika/TikaIO.java | 50 +++++++++--------- .../apache/beam/sdk/io/tika/TikaIOTest.java | 52 +++++++++---------- 2 files changed, 51 insertions(+), 51 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index cf89fe9bb4db..32353e1c0685 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -56,10 +56,10 @@ * A collection of {@link PTransform} transforms for parsing arbitrary files using Apache Tika. * Files in many well known text, binary or scientific formats can be processed. * - *

{@link TikaIO.ParseAll} and {@link TikaIO.ParseFiles} parse the files and return - * a bounded {@link PCollection} containing one {@link ParseResult} per each file. + *

{@link TikaIO.Parse} and {@link TikaIO.ParseAll} parse the files and return + * a {@link PCollection} containing one {@link ParseResult} per each file. * - *

Combine {@link TikaIO.ParseFiles} with {@link FileIO.Match} + *

Combine {@link TikaIO.ParseAll} with {@link FileIO.Match} * and {@link FileIO.ReadMatches} to match, read and parse the files. * *

Example: @@ -74,7 +74,7 @@ * .apply(TikaIO.parseFiles()); * } * - *

Use {@link TikaIO.ParseAll} to match, read and parse the files in simple cases. + *

Use {@link TikaIO.Parse} to match, read and parse the files in simple cases. * *

Example: * @@ -95,24 +95,24 @@ public class TikaIO { * A {@link PTransform} that matches and parses the files * and returns a bounded {@link PCollection} of {@link ParseResult}. */ - public static ParseAll parseAll() { - return new AutoValue_TikaIO_ParseAll.Builder() + public static Parse parse() { + return new AutoValue_TikaIO_Parse.Builder() .build(); } /** - * A {@link PTransform} that accepts a bounded {@link PCollection} of {@link ReadableFile} - * and returns a bounded {@link PCollection} of {@link ParseResult}. + * A {@link PTransform} that accepts a {@link PCollection} of {@link ReadableFile} + * and returns a {@link PCollection} of {@link ParseResult}. */ - public static ParseFiles parseFiles() { - return new AutoValue_TikaIO_ParseFiles.Builder() + public static ParseAll parseAll() { + return new AutoValue_TikaIO_ParseAll.Builder() .build(); } - /** Implementation of {@link #parseAll}. */ + /** Implementation of {@link #parse}. */ @SuppressWarnings("serial") @AutoValue - public abstract static class ParseAll extends PTransform> { + public abstract static class Parse extends PTransform> { @Nullable abstract ValueProvider getFilepattern(); @@ -122,16 +122,16 @@ public abstract static class ParseAll extends PTransform filepattern); - abstract ParseAll build(); + abstract Parse build(); } /** Matches the given filepattern. */ - public ParseAll filepattern(String filepattern) { + public Parse filepattern(String filepattern) { return this.filepattern(ValueProvider.StaticValueProvider.of(filepattern)); } /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */ - public ParseAll filepattern(ValueProvider filepattern) { + public Parse filepattern(ValueProvider filepattern) { return toBuilder().setFilepattern(filepattern).build(); } @@ -149,14 +149,14 @@ public PCollection expand(PBegin input) { return input .apply(FileIO.match().filepattern(getFilepattern())) .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) - .apply(parseFiles()); + .apply(parseAll()); } } - /** Implementation of {@link #parseFiles}. */ + /** Implementation of {@link #parseAll}. */ @SuppressWarnings("serial") @AutoValue - public abstract static class ParseFiles extends + public abstract static class ParseAll extends PTransform, PCollection> { @Nullable abstract ValueProvider getTikaConfigPath(); @@ -169,19 +169,19 @@ abstract static class Builder { abstract Builder setTikaConfigPath(ValueProvider tikaConfigPath); abstract Builder setInputMetadata(Metadata metadata); - abstract ParseFiles build(); + abstract ParseAll build(); } /** * Returns a new transform which will use the custom TikaConfig. */ - public ParseFiles withTikaConfigPath(String tikaConfigPath) { + public ParseAll withTikaConfigPath(String tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return withTikaConfigPath(StaticValueProvider.of(tikaConfigPath)); } /** Same as {@code with(tikaConfigPath)}, but accepting a {@link ValueProvider}. */ - public ParseFiles withTikaConfigPath(ValueProvider tikaConfigPath) { + public ParseAll withTikaConfigPath(ValueProvider tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return toBuilder() .setTikaConfigPath(tikaConfigPath) @@ -192,7 +192,7 @@ public ParseFiles withTikaConfigPath(ValueProvider tikaConfigPath) { * Returns a new transform which will use the provided content type hint * to make the file parser detection more efficient. */ - public ParseFiles withContentTypeHint(String contentType) { + public ParseAll withContentTypeHint(String contentType) { checkNotNull(contentType, "ContentType cannot be empty."); Metadata metadata = new Metadata(); metadata.add(Metadata.CONTENT_TYPE, contentType); @@ -203,7 +203,7 @@ public ParseFiles withContentTypeHint(String contentType) { * Returns a new transform which will use the provided input metadata * for parsing the files. */ - public ParseFiles withInputMetadata(Metadata metadata) { + public ParseAll withInputMetadata(Metadata metadata) { Metadata inputMetadata = this.getInputMetadata(); if (inputMetadata != null) { for (String name : metadata.names()) { @@ -242,10 +242,10 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class ParseToStringFn extends DoFn { private static final long serialVersionUID = 6837207505313720989L; - private final TikaIO.ParseFiles spec; + private final TikaIO.ParseAll spec; private TikaConfig tikaConfig; - ParseToStringFn(TikaIO.ParseFiles spec) { + ParseToStringFn(TikaIO.ParseAll spec) { this.spec = spec; } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index f551ea496ba1..a985b0a62146 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -67,95 +67,95 @@ public class TikaIOTest { public ExpectedException thrown = ExpectedException.none(); @Test - public void testParseAllPdfFile() throws IOException { + public void testParsePdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_FILE)); + doTestParse(resourcePath, new ParseResult(resourcePath, PDF_FILE)); } - private void doTestParseAll(String resourcePath, ParseResult... expectedResults) + private void doTestParse(String resourcePath, ParseResult... expectedResults) throws IOException { PCollection output = - p.apply("ParseAll", TikaIO.parseAll().filepattern(resourcePath)) + p.apply("ParseAll", TikaIO.parse().filepattern(resourcePath)) .apply(ParDo.of(new FilterMetadataFn())); PAssert.that(output).containsInAnyOrder(expectedResults); p.run(); } @Test - public void testParseFilesPdfFile() throws IOException { + public void testParseAllPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - doTestParseFiles(resourcePath, new ParseResult(resourcePath, PDF_FILE)); + doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_FILE)); } @Test - public void testParseFilesZipPdfFile() throws IOException { + public void testParseAllZipPdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika-pdf.zip").getPath(); - doTestParseFiles(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); + doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); } @Test - public void testParseFilesOdtFile() throws IOException { + public void testParseAllOdtFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - doTestParseFiles(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); + doTestParseAll(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); } @Test - public void testParseFilesOdtFiles() throws IOException { + public void testParseAllOdtFiles() throws IOException { String resourcePath1 = getClass().getResource("/apache-beam-tika1.odt").getPath(); String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); String resourcePath = resourcePath1.replace("apache-beam-tika1", "*"); - doTestParseFiles(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), + doTestParseAll(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), new ParseResult(resourcePath2, ODT_FILE2)); } - private void doTestParseFiles(String resourcePath, ParseResult... expectedResults) + private void doTestParseAll(String resourcePath, ParseResult... expectedResults) throws IOException { PCollection output = p.apply("ParseFiles", FileIO.match().filepattern(resourcePath)) .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) - .apply(TikaIO.parseFiles()) + .apply(TikaIO.parseAll()) .apply(ParDo.of(new FilterMetadataFn())); PAssert.that(output).containsInAnyOrder(expectedResults); p.run(); } @Test - public void testParseFilesDamagedPdfFile() throws IOException { + public void testParseAllDamagedPdfFile() throws IOException { thrown.expectCause(isA(TikaException.class)); String resourcePath = getClass().getResource("/damaged.pdf").getPath(); p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath)) .apply(FileIO.readMatches()) - .apply(TikaIO.parseFiles()); + .apply(TikaIO.parseAll()); p.run(); } @Test - public void testParseAllDisplayData() { - TikaIO.ParseAll parseAll = TikaIO.parseAll().filepattern("file.pdf"); + public void testParseDisplayData() { + TikaIO.Parse parse = TikaIO.parse().filepattern("file.pdf"); - DisplayData displayData = DisplayData.from(parseAll); + DisplayData displayData = DisplayData.from(parse); assertThat(displayData, hasDisplayItem("filePattern", "file.pdf")); assertEquals(1, displayData.items().size()); } @Test - public void testParseFilesDisplayData() { - TikaIO.ParseFiles parseFiles = TikaIO.parseFiles() + public void testParseAllDisplayData() { + TikaIO.ParseAll parseAll = TikaIO.parseAll() .withTikaConfigPath("tikaconfigpath") .withContentTypeHint("application/pdf"); - DisplayData displayData = DisplayData.from(parseFiles); + DisplayData displayData = DisplayData.from(parseAll); assertThat(displayData, hasDisplayItem("tikaConfigPath", "tikaconfigpath")); assertThat(displayData, hasDisplayItem("inputMetadata", @@ -164,12 +164,12 @@ public void testParseFilesDisplayData() { } @Test - public void testParseFilesDisplayDataWithCustomOptions() { - TikaIO.ParseFiles parseFiles = TikaIO.parseFiles() + public void testParseAllDisplayDataWithCustomOptions() { + TikaIO.ParseAll parseAll = TikaIO.parseAll() .withTikaConfigPath("/tikaConfigPath") .withContentTypeHint("application/pdf"); - DisplayData displayData = DisplayData.from(parseFiles); + DisplayData displayData = DisplayData.from(parseAll); assertThat(displayData, hasDisplayItem("tikaConfigPath", "/tikaConfigPath")); assertThat(displayData, hasDisplayItem("inputMetadata", @@ -184,7 +184,7 @@ static class FilterMetadataFn extends DoFn { public void processElement(ProcessContext c) { ParseResult result = c.element(); Metadata m = new Metadata(); - // Files contain many metadata properties. This function drops all but the"Author" + // Files contain many metadata properties. This function drops all but the "Author" // property manually added to "apache-beam-tika1.odt" resource only to make // the tests simpler if (result.getFileLocation().endsWith("apache-beam-tika1.odt")) { From 95e75db1ad26e9b04c1cd4c03274655bbe4ed52f Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Thu, 26 Oct 2017 11:12:38 +0100 Subject: [PATCH 17/17] Making sure TikaIO does not break the pipeline in case of the parsing errors --- .../apache/beam/sdk/io/tika/ParseResult.java | 82 +++++++++++++++++-- .../org/apache/beam/sdk/io/tika/TikaIO.java | 15 +++- .../beam/sdk/io/tika/ParseResultTest.java | 8 ++ .../apache/beam/sdk/io/tika/TikaIOTest.java | 10 ++- 4 files changed, 102 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java index 0a77491cdfdb..3f5a0e22d87f 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -17,9 +17,13 @@ */ package org.apache.beam.sdk.io.tika; +import java.io.IOException; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.Arrays; +import javax.annotation.Nullable; + import org.apache.tika.metadata.Metadata; /** @@ -28,10 +32,47 @@ */ @SuppressWarnings("serial") public class ParseResult implements Serializable { + + private static final class SerializableThrowable implements Serializable { + private final Throwable throwable; + private final StackTraceElement[] stackTrace; + + private SerializableThrowable(Throwable t) { + this.throwable = t; + this.stackTrace = t.getStackTrace(); + } + + private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException { + is.defaultReadObject(); + throwable.setStackTrace(stackTrace); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof SerializableThrowable)) { + return false; + } + SerializableThrowable sr = (SerializableThrowable) obj; + return this.throwable.getClass().equals(sr.throwable.getClass()) + && (this.throwable.getCause() == null && sr.throwable.getCause() == null + || this.throwable.getCause().getClass().equals(sr.throwable.getCause().getClass())); + } + + @Override + public int hashCode() { + int hashCode = 1; + hashCode = 31 * hashCode + throwable.getClass().hashCode(); + return hashCode; + } + } + private final String fileLocation; - private final String content; + @Nullable + private String content; private final Metadata metadata; private final String[] metadataNames; + @Nullable + private SerializableThrowable throwable; public ParseResult(String fileLocation, String content) { this(fileLocation, content, new Metadata()); @@ -44,15 +85,24 @@ public ParseResult(String fileLocation, String content, Metadata metadata) { this.metadataNames = metadata.names(); } + public ParseResult(String fileLocation, Metadata metadata, Throwable t) { + this.fileLocation = fileLocation; + this.metadata = metadata; + this.metadataNames = metadata.names(); + this.throwable = new SerializableThrowable(t); + } + /** - * Gets a file content. + * Gets a file content which can be set to null + * if a parsing exception has occurred. */ public String getContent() { return content; } /** - * Gets a file metadata. + * Gets a file metadata which can be only be partially populated + * if a parsing exception has occurred. */ public Metadata getMetadata() { return metadata; @@ -65,12 +115,24 @@ public String getFileLocation() { return fileLocation; } + /** + * Gets a parse exception. + */ + public Throwable getThrowable() { + return throwable == null ? null : throwable.throwable; + } + @Override public int hashCode() { int hashCode = 1; hashCode = 31 * hashCode + fileLocation.hashCode(); - hashCode = 31 * hashCode + content.hashCode(); + if (content != null) { + hashCode = 31 * hashCode + content.hashCode(); + } hashCode = 31 * hashCode + getMetadataHashCode(); + if (throwable != null) { + hashCode = 31 * hashCode + throwable.hashCode(); + } return hashCode; } @@ -81,9 +143,17 @@ public boolean equals(Object obj) { } ParseResult pr = (ParseResult) obj; + if (this.content == null && pr.content != null + || this.content != null && pr.content == null + || this.throwable == null && pr.throwable != null + || this.throwable != null && pr.throwable == null) { + return false; + } + return this.fileLocation.equals(pr.fileLocation) - && this.content.equals(pr.content) - && this.metadata.equals(pr.metadata); + && (this.content == null && pr.content == null || this.content.equals(pr.content)) + && this.metadata.equals(pr.metadata) + && (this.throwable == null && pr.throwable == null || this.throwable.equals(pr.throwable)); } //TODO: diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 32353e1c0685..b2adbe84b95a 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -262,6 +262,7 @@ public void setup() throws Exception { @ProcessElement public void processElement(ProcessContext c) throws Exception { ReadableFile file = c.element(); + InputStream stream = Channels.newInputStream(file.open()); try (InputStream tikaStream = TikaInputStream.get(stream)) { @@ -274,11 +275,17 @@ public void processElement(ProcessContext c) throws Exception { ? spec.getInputMetadata() : new org.apache.tika.metadata.Metadata(); ContentHandler tikaHandler = new ToTextContentHandler(); - parser.parse(tikaStream, tikaHandler, tikaMetadata, context); + String fileLocation = file.getMetadata().resourceId().toString(); + + ParseResult parseResult = null; + try { + parser.parse(tikaStream, tikaHandler, tikaMetadata, context); + parseResult = new ParseResult(fileLocation, tikaHandler.toString(), tikaMetadata); + } catch (Throwable t) { + parseResult = new ParseResult(fileLocation, tikaMetadata, t); + } - c.output(new ParseResult(file.getMetadata().resourceId().toString(), - tikaHandler.toString(), - tikaMetadata)); + c.output(parseResult); } } } diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java index fd86152c6888..63de9129b7e1 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java @@ -33,6 +33,14 @@ public void testEqualsAndHashCode() { assertEquals(p1.hashCode(), p2.hashCode()); } + @Test + public void testEqualsAndHashCodeWithException() { + ParseResult p1 = new ParseResult("a.txt", getMetadata(), new RuntimeException()); + ParseResult p2 = new ParseResult("a.txt", getMetadata(), new RuntimeException()); + assertEquals(p1, p2); + assertEquals(p1.hashCode(), p2.hashCode()); + } + static Metadata getMetadata() { Metadata m = new Metadata(); m.add("Author", "BeamTikaUser"); diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index a985b0a62146..2419683b34ea 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -129,13 +129,14 @@ private void doTestParseAll(String resourcePath, ParseResult... expectedResults) } @Test - public void testParseAllDamagedPdfFile() throws IOException { + public void testParseAllDamagedPdfFile() throws Exception { thrown.expectCause(isA(TikaException.class)); String resourcePath = getClass().getResource("/damaged.pdf").getPath(); p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath)) .apply(FileIO.readMatches()) - .apply(TikaIO.parseAll()); + .apply(TikaIO.parseAll()) + .apply(ParDo.of(new FilterMetadataFn())); p.run(); } @@ -181,8 +182,11 @@ static class FilterMetadataFn extends DoFn { private static final long serialVersionUID = 6338014219600516621L; @ProcessElement - public void processElement(ProcessContext c) { + public void processElement(ProcessContext c) throws Throwable { ParseResult result = c.element(); + if (result.getThrowable() != null) { + throw result.getThrowable(); + } Metadata m = new Metadata(); // Files contain many metadata properties. This function drops all but the "Author" // property manually added to "apache-beam-tika1.odt" resource only to make