
[BEAM-2994] Prepare for a switch to ParDo based implementation #3835

Closed
wants to merge 17 commits from sberyozkin:tikaio into apache:master

Conversation

@sberyozkin (Contributor)

No description provided.

@coveralls

Coverage Status

Coverage decreased (-0.01%) to 69.673% when pulling acd7aae on sberyozkin:tikaio into d60b29f on apache:master.

@coveralls

Coverage Status

Coverage decreased (-0.02%) to 69.543% when pulling 07805a5 on sberyozkin:tikaio into a92c45f on apache:master.

@coveralls

Coverage Status

Coverage decreased (-0.02%) to 69.522% when pulling 16352c7 on sberyozkin:tikaio into aa2604a on apache:master.

@sberyozkin changed the title from "Fix for the minor TikaIO doc typos" to "[BEAM-2994] Prepare for a switch to ParDo based implementation" on Sep 29, 2017
@jkff (Contributor) left a comment:

Thanks! This PR is a huge improvement over the previous code. WDYT about just evolving this PR incrementally into the final form, i.e. addressing the rest of the comments in it too? The direction so far looks good.

```java
/**
 * Sets a file content.
 */
public void setContent(String content) {
```

jkff (Contributor):

I would recommend to get rid of the setters and of the default constructor. PCollection elements need to be immutable.
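To illustrate the suggestion, here is a minimal sketch of an immutable value class, assuming the field names from the diff context; Tika's Metadata type is simplified to a String map for illustration, so this is not the actual Beam ParseResult:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of the immutable shape being suggested: final fields,
// constructor-only initialization, no setters and no default constructor.
final class ParseResult {
    private final String fileLocation;
    private final String content;
    private final Map<String, String> metadata;

    ParseResult(String fileLocation, String content, Map<String, String> metadata) {
        this.fileLocation = fileLocation;
        this.content = content;
        // Defensive copy at construction time, so later mutation of the
        // caller's map cannot change this element once it is in a PCollection.
        this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    String getFileLocation() { return fileLocation; }
    String getContent() { return content; }
    Map<String, String> getMetadata() { return metadata; }
}
```

With this shape, mutating the map passed to the constructor afterwards has no effect on the element.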


```java
@Override
public int hashCode() {
  return fileLocation.hashCode() + 37 * content.hashCode() + 37 * metadata.hashCode();
```

jkff (Contributor):

Metadata doesn't implement a proper hashCode, seems like you'll need to implement one.

```java
ParseResult pr = (ParseResult) obj;
return this.fileLocation.equals(pr.fileLocation)
    && this.content.equals(pr.content)
    && isMetadataEqual(this.metadata, pr.metadata);
```

jkff (Contributor):

Curiously enough, Metadata does provide an equals() so this one you don't need :)

```java
final Bounded<String> read = org.apache.beam.sdk.io.Read.from(new TikaSource(this));
PCollection<String> pcol = input.getPipeline().apply(read);
final Bounded<ParseResult> read = org.apache.beam.sdk.io.Read.from(new TikaSource(this));
PCollection<ParseResult> pcol = input.getPipeline().apply(read);
```

jkff (Contributor):

Just: `return input.apply(...io.Read.from(new TikaSource(this)));`

```java
@Override
protected Coder<String> getDefaultOutputCoder() {
  return StringUtf8Coder.of();
protected Coder<ParseResult> getDefaultOutputCoder() {
```

jkff (Contributor):

Delete this method - it is deprecated and unnecessary.

```java
@Override
public Coder<String> getDefaultOutputCoder() {
  return StringUtf8Coder.of();
public Coder<ParseResult> getDefaultOutputCoder() {
```

jkff (Contributor):

It's called getOutputCoder() now.

```java
String key = metadataIterator.next();
// The metadata name/value separator can be configured if needed
current = key + "=" + tikaMetadata.get(key);
if (!docParsed) {
```

jkff (Contributor):

Just make start() say `current = ...; return true;`, make advance() say `return false;`, and remove the docParsed variable.

sberyozkin (Author):

Thanks for the review; sure, I'll deal with the above comments first.

@sberyozkin (Author) commented Oct 2, 2017:

I've dealt with the above review comments. (I'm a bit surprised, though, that I can't find my last comment where I committed to dealing with them; maybe I got confused, but I'm nearly certain I saw it on this page :-).)

TIKA-2472 will address the missing Metadata#hashCode.

For the moment, ParseResult returns a copy of Metadata to satisfy the immutability constraint. I wasn't sure whether overriding all the Metadata setters would be the better option, as it could introduce sync issues with future Tika releases adding new setters; that said, it's probably unlikely to happen soon, so if returning a copy doesn't look great then I can follow that approach instead...

thanks

@jkff (Contributor) commented Oct 2, 2017:

To clarify: elements that go into a PCollection must not be modified, even though they can be in principle modifiable Java types. So returning a copy is reasonable.

@jkff (Contributor) left a comment:

Thanks - I'd suggest to proceed with removing the source/reader classes and replacing them with a DoFn.

I'd recommend the following API:

```java
class TikaIO {
  public static ParseAllToString parseAllToString() {..}
  class ParseAllToString extends PTransform<PCollection<ReadableFile>, PCollection<ParseResult>> {
    ...configuration properties...
    expand {
      return input.apply(ParDo.of(new ParseToStringFn))
    }
    class ParseToStringFn extends DoFn<...> {...}
  }
}
```

and document usage as:

```java
PCollection<ParseResult> res = p.apply(FileIO.match().filepattern("..."))
    .apply(FileIO.readMatches())
    .apply(TikaIO.parseAllToString()...configuration...);
```

```java
 * Gets a file metadata.
 */
public Metadata getMetadata() {
  return getMetadataCopy();
```

jkff (Contributor):

It seems more reasonable to make a copy in the constructor, rather than create a fresh copy in the getter every time.

@sberyozkin (Author), Oct 3, 2017:

Actually, if it's OK to do it in the constructor, then a copy can simply be avoided: ParseResult owns the Metadata instance passed to it at construction time, so copying it doesn't really achieve anything, do you agree? Or do you suggest keeping both the original Metadata instance and a copy? I'm not sure that's necessary.
Thinking more about it, I can imagine getMetadata() being called only once in most cases, so returning a copy in getMetadata(), plus optimizing around its names as you suggested below, seems reasonable; let me commit and see what you think...

jkff (Contributor):

Yeah if the Metadata is not actually modified after being wrapped into the ParseResult, then it's fine to not make a copy.

sberyozkin (Author):

I kept making a copy in place (in case some function tries to set something there), on the assumption that it will be accessed once, maybe twice, and thus won't cause any performance side effects, while memoizing the names at construction time.

jkff (Contributor):

I think it's better to be consistent with other Beam code - other code doesn't do copy-on-read for all potentially-mutable fields. It is common to do copy-on-construction, but in this case I think even that is unnecessary.

```java
 */
public class ParseResult implements Serializable {
  private static final long serialVersionUID = 6133510503781405912L;
  private String content;
```

jkff (Contributor):

These can be final


```java
@Override
public int hashCode() {
  return fileLocation.hashCode() + 37 * content.hashCode() + 37 * getMetadataHashCode();
```

jkff (Contributor):

No big deal here, but this hash code is suboptimal in quality: it should be more like

```java
int hashCode = 1;
hashCode = 31 * hashCode + fileLocation.hashCode();
hashCode = 31 * hashCode + content.hashCode();
hashCode = 31 * hashCode + getMetadataHashCode();
return hashCode;
```

(i.e. do "multiply by 31 and add the next hash" 3 times, whereas the current code is equivalent to `fileLocation.hashCode() + 37 * (content.hashCode() + getMetadataHashCode())`, i.e. it doesn't do the multiply/shift for the two latter hashes)
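For illustration, here is a runnable version of that pattern; HashDemo is a hypothetical stand-in with the three component hashes passed in as plain ints:

```java
// The "multiply by 31 and add the next hash" pattern discussed above.
// Unlike a plain sum, the polynomial form is sensitive to component
// order, so swapping two field hashes changes the result.
class HashDemo {
    static int polyHash(int h1, int h2, int h3) {
        int hashCode = 1;
        hashCode = 31 * hashCode + h1;
        hashCode = 31 * hashCode + h2;
        hashCode = 31 * hashCode + h3;
        return hashCode;
    }
}
```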


```java
private int getMetadataHashCode() {
  int hashCode = 0;
  for (String name : metadata.names()) {
```

jkff (Contributor):

names() makes a copy of the key set - this should probably be treated as a bug in Metadata, but until it's fixed, probably a good idea to memoize the result.
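A sketch of the memoization idea; FakeMetadata is a hypothetical stand-in for Tika's Metadata, whose names() allocates a fresh array on every call:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Stand-in for Tika's Metadata: names() returns a fresh copy each time.
class FakeMetadata {
    private final Map<String, String[]> values = new HashMap<>();
    void set(String name, String... v) { values.put(name, v); }
    String[] getValues(String name) { return values.get(name); }
    String[] names() { return values.keySet().toArray(new String[0]); }
}

// Pay for the key-set copy only once, then reuse it in every
// subsequent hashCode() computation.
class MetadataHasher {
    private final FakeMetadata metadata;
    private String[] cachedNames; // memoized on first use

    MetadataHasher(FakeMetadata metadata) { this.metadata = metadata; }

    int metadataHashCode() {
        if (cachedNames == null) {
            cachedNames = metadata.names();
        }
        int hashCode = 0;
        for (String name : cachedNames) {
            hashCode += name.hashCode() ^ Arrays.hashCode(metadata.getValues(name));
        }
        return hashCode;
    }
}
```

The hash combination mirrors the one used later in this PR; the memoization is safe here because the element is treated as immutable once constructed.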

```java
}
return advanceToNext();
current = tikaHandler.toString().trim();
```

jkff (Contributor):

If tikaHandler decided to generate a string with some leading/trailing whitespace, then according to this handler the whitespace is probably significant. The user can trim the string manually if they would like to - it doesn't feel like responsibility of TikaIO.

sberyozkin (Author):

This is the only comment I'm not comfortable with. I cannot imagine spaces before or after some meaningful content ever being significant, given that we are talking about TikaIO helping with practical cases. Tika parsers only report whatever they can; if it is some presentation with N leading or trailing spaces, then the parsers will report them. You are right that users can strip them, but that is why we are working on TikaIO: to let users do less when it comes to processing the content...

jkff (Contributor):

If they were insignificant, why did the parser return them? If "the parser doesn't trim the string" should be considered a bug in the parser, then I don't think it's TikaIO's job to fix the bug; if it shouldn't be considered a bug in the parser and rather part of its intended behavior, then it's not TikaIO's job to change the intended behavior of the parser the user asked for either. I think it's better to have TikaIO do one job - "glue Beam to the Tika library", rather than two jobs - "glue Beam to the Tika library and do minor fixups on Tika's output".

Let's approach it from this viewpoint:

  • Might a user ever file a bug "I need TikaIO to trim the string returned by a parser, but it didn't"?
  • Might a user ever file a bug "I need TikaIO to return the string returned by the parser verbatim, but it trimmed the string"?

I think bullet 1 is extremely unlikely, because, if for the user's purposes it really matters that the string must be trimmed (I suspect in most cases it doesn't matter at all, e.g. if they're splitting the string into words for text indexing, it doesn't matter if it's trimmed or not), all the user needs to do is type the 6 characters .trim() wherever they use parseResult.getContent().

Bullet 2 is also quite unlikely, but possible: whitespace can be significant in some documents. E.g. imagine that the user is ingesting Word documents with Shakespeare plays and converting them to text files. The Word documents may be indented using whitespace, such as here http://www.textfiles.com/etext/AUTHORS/SHAKESPEARE/shakespeare-romeo-48.txt - it starts with a bunch of whitespace. Trimming the string is unrecoverably losing the indentation.
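The indentation-loss point is easy to demonstrate; the sample string below is a made-up stand-in for a parser's output, not actual Tika output:

```java
// Why trimming in the IO layer is lossy: once leading whitespace is
// stripped, the original layout cannot be reconstructed downstream.
class TrimDemo {
    static String parserOutput() {
        // e.g. a centered title line from a converted play script
        return "          THE TRAGEDY OF ROMEO AND JULIET\n";
    }
}
```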

sberyozkin (Author):

Sure, referring to the Shakespeare play is most convincing, so I'll drop trim() :-)

```java
public void setMinTextLength(int minTextLength) {
  this.minTextLength = minTextLength;
}
static class ContentHandlerImpl extends ToTextContentHandler {
```

jkff (Contributor):

This class doesn't do anything, just use ToTextContentHandler, or even better use Tika.toString()

sberyozkin (Author):

I kept a dedicated extension because I imagined there could be a need to customize it: stripping off or replacing some characters (say, replacing runs of insignificant whitespace with a single space, or dropping characters once the buffer has reached some maximum size, etc.). But I'm OK with dropping the custom extension until we have concrete ideas on what to customize...

@jkff (Contributor) commented Oct 2, 2017:

Also would recommend to create a Coder for ParseResult rather than keeping it Serializable. Java serialization is wicked slow, so this is another benefit that TikaIO can provide for a user. Create an AtomicCoder for it, and register it using something akin to https://github.com/apache/beam/blob/master/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
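A stdlib-only sketch of the encode/decode logic such a ParseResult coder would implement; the field order and length-prefixed string encoding here are assumptions, and a real Beam AtomicCoder would receive the streams via its encode(value, outStream)/decode(inStream) methods instead:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

// Hand-rolled binary encoding for a ParseResult-like value, i.e. what a
// custom coder would do instead of Java serialization. Metadata is
// simplified to a String->String map for illustration.
class ParseResultCoding {
    static void encode(String fileLocation, String content,
                       Map<String, String> metadata, OutputStream out) throws IOException {
        DataOutputStream d = new DataOutputStream(out);
        d.writeUTF(fileLocation);
        d.writeUTF(content);
        d.writeInt(metadata.size());
        for (Map.Entry<String, String> e : metadata.entrySet()) {
            d.writeUTF(e.getKey());
            d.writeUTF(e.getValue());
        }
        d.flush();
    }

    // Returns {fileLocation, content, metadata} decoded in the same order.
    static Object[] decode(InputStream in) throws IOException {
        DataInputStream d = new DataInputStream(in);
        String fileLocation = d.readUTF();
        String content = d.readUTF();
        int n = d.readInt();
        Map<String, String> metadata = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            metadata.put(d.readUTF(), d.readUTF());
        }
        return new Object[] {fileLocation, content, metadata};
    }
}
```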

@sberyozkin (Author):

Thanks for continuing doing the high quality reviews and reminding me of some effective Java ideas, I'll keep ploughing along...

@jkff (Contributor) left a comment:

Thanks, here's the next round.

```java
builder
    .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
    .withLabel("File Pattern"));
//String filepatternDisplay = getFilepattern().isAccessible()
```

jkff (Contributor):

Remove commented-out code

sberyozkin (Author):

Done

```java
TikaConfig tikaConfig = null;
if (spec.getTikaConfigPath() != null) {
  try {
    tikaConfig = new TikaConfig(spec.getTikaConfigPath().get());
```

jkff (Contributor):

This still needs to be fixed to use FileSystems API to read the config, because tikaConfigPath will not be accessible via regular java file APIs when the pipeline runs on anything but a local machine.

sberyozkin (Author):

Tried to, please check if that is correct...

```java
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
  ReadableFile file = c.element();
  InputStream stream = Channels.newInputStream(file.open());
```

jkff (Contributor):

We might fail to close this on many code paths below. Better wrap it in try-with-resources right here.
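A stdlib-only sketch of the try-with-resources shape being suggested; the channel-backed stream is replaced here by a plain InputStream and a copy loop, so the names are illustrative only:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Open the stream in try-with-resources right where it is created, so
// every exit path (including parser exceptions) closes it.
class StreamHandling {
    static String readAll(InputStream source) throws IOException {
        try (InputStream stream = source) { // closed on all code paths
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = stream.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```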

sberyozkin (Author):

I started from TikaInputStream; auto-closing it should close the original one?

```java
org.apache.tika.metadata.Metadata tikaMetadata = spec.getInputMetadata() != null
    ? spec.getInputMetadata() : new org.apache.tika.metadata.Metadata();

ContentHandler tikaHandler = new ToTextContentHandler();
```

jkff (Contributor):

Can a lot of this be abbreviated to Tika.parseToString(is) ?

sberyozkin (Author):

I've reviewed that code; IMHO we should retain the control in TikaIO. That function uses
https://github.com/apache/tika/blob/master/tika-core/src/main/java/org/apache/tika/sax/BodyContentHandler.java,
which is apparently optimized for reading XML content, and the goal was also to let users provide custom content handlers.

Tim also recommended the other day supporting the recursive ParserWrapper,
https://github.com/apache/tika/blob/master/tika-example/src/main/java/org/apache/tika/example/ParsingExample.java#L138,
so having a single Metadata in ParseResult may not be enough. I suppose we can still keep ParseResult.getMetadata (the first one in the list) but later add ParseResult.getEmbeddedMetadata too, once we enhance the code to use the recursive parser.

```java
try {
  parser.parse(is, tikaHandler, tikaMetadata, context);
} catch (Exception ex) {
  throw new IOException(ex);
```

jkff (Contributor):

Just make the processElement method throw Exception?

sberyozkin (Author):

Done

```
@@ -40,31 +40,6 @@
String getContentTypeHint();
void setContentTypeHint(String value);

@Description("Metadata report status")
```

jkff (Contributor):

I suppose a next step would be to remove support for this Options class.

sberyozkin (Author):

I'd suggest reviewing it once the configuration support at the API level is in place; perhaps it may still prove useful for users...

jkff (Contributor):

It seems that you already removed Read.withOptions(), so it is time to remove this class - it is unused.

```java
// ? getFilepattern().get() : getFilepattern().toString();
//builder
//    .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
//    .withLabel("File Pattern"));
```

jkff (Contributor):

Below: does a simple getInputMetadata().toString() give unacceptable output?

sberyozkin (Author):

Done, but I added trim() as it returns a trailing space for a single-entry map, and this extra space is definitely ignorable :-)

@jkff (Contributor) left a comment:

Thanks! Almost converged; likely at most 1 more round remaining :)

```java
private int getMetadataHashCode() {
  int hashCode = 0;
  for (String name : metadataNames) {
    hashCode += name.hashCode() ^ Arrays.hashCode(metadata.getValues(name));
```

jkff (Contributor):

Add a TODO somewhere here to remove this and use metadata.hashCode(), once a Tika release with the fix happens?

```java
builder
    .add(DisplayData.item("inputMetadata", sb.toString())
    .add(DisplayData.item("inputMetadata", metadata.toString().trim())
```

jkff (Contributor):

Consider making a fix in Tika to make this trim() unnecessary

```java
private static class ParseToStringFn extends DoFn<ReadableFile, ParseResult> {

  private static final long serialVersionUID = 6837207505313720989L;
  private TikaIO.ParseAll spec;
```

jkff (Contributor):

final


```java
final ParseContext context = new ParseContext();
context.set(Parser.class, parser);
org.apache.tika.metadata.Metadata tikaMetadata = spec.getInputMetadata() != null
```

jkff (Contributor):

Is the fully qualified name necessary?


```java
@Test
public void testReadPdfFile() throws IOException {

  String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath();

  doTestReadFiles(resourcePath, PDF_FILE);
  doTestReadFiles(resourcePath, new ParseResult(resourcePath, PDF_FILE));
```

jkff (Contributor):

All these tests exercise the same codepath in TikaIO, so they are redundant - there's nothing TikaIO could realistically break to make some of these tests fail without making all of them fail. It's sufficient to keep just a single test that reads a single file.

sberyozkin (Author):

Well, I wanted some basic POC variation (PDF vs ODT), though maybe a test covering 2 ODT files indeed does not add anything new.

sberyozkin (Author):

Though even with a 2-file test: I understand FileIO is expected to work right, but I'm not sure it is really redundant to show at the test level how 2 files are processed by TikaIO...

```java
try {
  p.run();
  fail("Transform failure is expected");
} catch (RuntimeException ex) {
  assertTrue(ex.getCause().getCause() instanceof TikaException);
  assertTrue(ex.getCause() instanceof TikaException);
```

jkff (Contributor):

Use @Rule ExpectedException instead

```java
TikaIO.read().from(resourcePath).withParseSynchronously(sync));
p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath))
    .apply(FileIO.readMatches())
    .apply(TikaIO.parseAll());
```

jkff (Contributor):

Hm, given the use cases of TikaIO, I think we can do better than failing the whole pipeline on a single malformed file - malformed files are probably the norm. How about, say, making ParseResult be either a success (with current fields) or a failure (with a filename and exception)? You can mimic the serialization logic in the class SuccessOrFailure in the SDK for Throwable.
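One possible shape of the success-or-failure idea, as a plain-Java sketch; the class and method names here are hypothetical, not the API this PR ends up with:

```java
// A result type that carries either parsed content or the failure for a
// given file, so one malformed document does not fail the whole pipeline.
class FileParseOutcome {
    private final String fileLocation;
    private final String content;        // null on failure
    private final String failureMessage; // null on success

    private FileParseOutcome(String fileLocation, String content, String failureMessage) {
        this.fileLocation = fileLocation;
        this.content = content;
        this.failureMessage = failureMessage;
    }

    static FileParseOutcome success(String fileLocation, String content) {
        return new FileParseOutcome(fileLocation, content, null);
    }

    // Throwables don't serialize portably, so store the exception as a
    // string (the same trick SuccessOrFailure in the SDK uses).
    static FileParseOutcome failure(String fileLocation, Exception e) {
        return new FileParseOutcome(fileLocation, null, e.toString());
    }

    boolean isSuccess() { return failureMessage == null; }
    String getFileLocation() { return fileLocation; }
    String getContent() { return content; }
    String getFailureMessage() { return failureMessage; }
}
```

Downstream, a user can split the output into a "parsed" branch and a "failed" branch instead of catching a pipeline-level RuntimeException.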

sberyozkin (Author):

I'll have a look

```java
"--readOutputMetadata=true"};
TikaIO.Read read = TikaIO.read().withOptions(createOptions(args));
"--contentTypeHint=application/pdf"};
TikaOptions options = PipelineOptionsFactory.fromArgs(args)
```

jkff (Contributor):

Argument parsing seems unnecessary in this test - the configured transform doesn't use options, you can just say TikaIO.parseAll().withTikaConfigPath("/tikaConfigPath").withContentTypeHint("application/pdf")

```java
return;
ParseResult result = c.element();
Metadata m = new Metadata();
if (result.getFileLocation().endsWith("apache-beam-tika1.odt")) {
```

jkff (Contributor):

Not sure I'm following what this code is trying to do?

sberyozkin (Author):

I manually created that file and added that specific piece of metadata (Author); the metadata usually contains some other pieces of info which will also vary between test files, so I wanted to test that the metadata I added myself was captured.

jkff (Contributor):

Please add a comment about this

@jkff (Contributor) left a comment:

OK great! This is the final round of comments, thanks for bearing with me. OK to merge afterwards @jbonofre

We're 800 lines lighter now :)


```java
return new AutoValue_TikaIO_Read.Builder()
    .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME)
    .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME)
public static ParseAll parseAll() {
```

jkff (Contributor):

Please update Javadoc for this method and for the whole class.

```java
    .build();
}

/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
public abstract static class ParseAll extends
```

jkff (Contributor):

Line 85: should reference parseAll instead

jkff (Contributor):

Also, serialVersionUIDs throughout this PR are unnecessary and generally not used in most Beam code (with the exception of some low-level classes). These objects are not going to be serialized and then deserialized with a different version of the code.

```java
try (InputStream tikaStream = TikaInputStream.get(stream)) {

  TikaConfig tikaConfig = null;
  if (spec.getTikaConfigPath() != null) {
```

jkff (Contributor):

It might be a good idea to do this in @Setup rather than on every element.

sberyozkin (Author):

Do you suggest creating TikaConfig in the ParseToStringFn constructor, or even earlier, in ParseAll, in withTikaConfigPath?

jkff (Contributor):

I'm suggesting to do this in the @Setup method of this DoFn, which is invoked once for every created instance of this DoFn (there'll of course be many instances; at least 1 per worker per thread). It's commonly used to establish connections, cache configuration etc. See e.g. https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L473

This shouldn't be done in constructor or builder method, because those methods are invoked when the pipeline is constructed, rather than when it is executed. When a user specifies a path to the config, they probably expect that the config will be read at execution time, rather than at construction time. This matters really a lot of if we're talking about a template pipeline, which is constructed once but may be executed many times.

I suppose it might make sense to allow the user to pass an explicitly specified config at construction time (as a Java string containing the XML), if they don't want to bother copying the Tika config onto a distributed filesystem just so it's accessible to the pipeline and if they prefer instead to inline it into the pipeline. You might want to add this feature.
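The lifecycle point can be sketched without Beam: the constructor only captures the config path (cheap, runs at pipeline-construction time), while a once-per-instance setup hook loads the config at execution time. ParseFnSketch and loadConfig are hypothetical stand-ins for ParseToStringFn and the TikaConfig loading discussed above:

```java
// Plain-Java sketch of the @Setup idea for a DoFn-like class.
class ParseFnSketch {
    private final String tikaConfigPath;   // captured at construction
    private transient String tikaConfig;   // loaded once per instance
    static int loadCount = 0;              // instrumentation for this sketch

    ParseFnSketch(String tikaConfigPath) { this.tikaConfigPath = tikaConfigPath; }

    // Corresponds to DoFn's @Setup: invoked once per created instance.
    void setup() { tikaConfig = loadConfig(tikaConfigPath); }

    // Corresponds to @ProcessElement: reuses the cached config per element.
    String process(String element) { return element + " parsed with " + tikaConfig; }

    private static String loadConfig(String path) {
        loadCount++; // a real implementation would read via the FileSystems API
        return "config@" + path;
    }
}
```

However many elements are processed, the config is loaded exactly once per instance, which is the point of moving the work out of processElement.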

sberyozkin (Author):

I see, thanks; I did not know about DoFn having setup/teardown, just updated... I'm planning another PR at the next stage which will address all of the configuration-related improvements (custom content handlers, etc., and now including the possibility of passing the XML configuration fragment as you suggested).
Re the shortcut and ParseResult success/failure: I've no problem continuing to look into it in this PR, but maybe it will be easier, especially for the reviewers, to merge what is already available. This IO is still Experimental, so I guess it will be safe enough, but it's up to the team.

```java
"--contentTypeHint=application/pdf",
"--readOutputMetadata=true"};
TikaIO.Read read = TikaIO.read().withOptions(createOptions(args));
TikaIO.ParseAll read = TikaIO.parseAll()
```

jkff (Contributor):

Rename "read" to "parseAll" in variables and test method names throughout this test?

@jkff (Contributor) left a comment:

Thanks! I think it's fine to either merge this PR as-is (modulo my last comment), or proceed with adding the shortcuts for match/readMatches and adding failure reporting in ParseResult.

try (InputStream tikaStream = TikaInputStream.get(stream)) {

TikaConfig tikaConfig = null;
if (spec.getTikaConfigPath() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm suggesting to do this in the @Setup method of this DoFn, which is invoked once for every created instance of this DoFn (there'll of course be many instances; at least 1 per worker per thread). It's commonly used to establish connections, cache configuration etc. See e.g. https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L473

This shouldn't be done in the constructor or a builder method, because those methods are invoked when the pipeline is constructed, rather than when it is executed. When a user specifies a path to the config, they probably expect that the config will be read at execution time, rather than at construction time. This matters a lot if we're talking about a template pipeline, which is constructed once but may be executed many times.

I suppose it might make sense to allow the user to pass an explicitly specified config at construction time (as a Java string containing the XML), if they don't want to bother copying the Tika config onto a distributed filesystem just so it's accessible to the pipeline and if they prefer instead to inline it into the pipeline. You might want to add this feature.
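The construction-time vs execution-time distinction can be illustrated outside Beam. Below is a minimal plain-Java sketch of the pattern being recommended, not the actual TikaIO or Beam code: the object captures only a configuration reference (here an inline string, standing in for Tika's XML config) at construction time, and parses it lazily in a setup() method that plays the role of a DoFn's @Setup. The ConfigHolder class name and the key=value format are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the @Setup pattern: the object is constructed with only a
// *reference* to its configuration (a path or an inline string); the config
// itself is loaded lazily, once per instance, when setup() runs.
public class ConfigHolder {
    private final String inlineConfig;            // captured at construction (pipeline build) time
    private transient Map<String, String> parsed; // populated at execution time

    public ConfigHolder(String inlineConfig) {
        // Construction time: no I/O, no parsing; just record what to load later.
        this.inlineConfig = inlineConfig;
    }

    // Analogous to a DoFn's @Setup method: runs once per instance before processing.
    public void setup() {
        parsed = new HashMap<>();
        // Trivial key=value "parsing", standing in for a real Tika XML config parse.
        for (String line : inlineConfig.split("\n")) {
            String[] kv = line.split("=", 2);
            if (kv.length == 2) {
                parsed.put(kv[0].trim(), kv[1].trim());
            }
        }
    }

    public String get(String key) {
        if (parsed == null) {
            throw new IllegalStateException("setup() was not called");
        }
        return parsed.get(key);
    }

    public static void main(String[] args) {
        ConfigHolder holder = new ConfigHolder("parser=pdf\nencoding=UTF-8");
        holder.setup();
        System.out.println(holder.get("parser")); // prints "pdf"
    }
}
```

Inlining the config string into the pipeline this way also sidesteps copying a config file onto a distributed filesystem, which is the trade-off discussed above.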

@sberyozkin
Contributor Author

Hi Eugene, I've typed up the shortcut code (ParseAll -> ParseFiles) as you suggested earlier; a shortcut looks nice for the simple cases. I suppose I'd need to introduce something similar to MatchConfiguration to propagate it from ParseAll to ParseFiles?

@sberyozkin
Contributor Author

Or maybe we can just keep the shortcut as simple as possible for the moment: if users want to customize, they'd start with an expanded ParseFiles, and I guess something like ParseConfiguration can easily be introduced later on without affecting anything. So, unless you think introducing ParseConfiguration should be done earlier rather than later, I'll consider the only major pending item to be making sure a ParseResult can be reported in the case of parsing failures, with the configuration improvements to follow at some later stage.

Contributor
@jkff jkff left a comment

Thanks, I think it's fine to submit as-is, modulo the last rename comment, and address parse failures and configuration improvements later.

return new AutoValue_TikaIO_Read.Builder()
.setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME)
.setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME)
public static ParseAll parseAll() {
Contributor

"SomethingAll" tends to refer in the Beam codebase to something that takes a PCollection of inputs, e.g. TextIO.readAll() reads a PCollection of filepatterns, whereas TextIO.read() reads a single filepattern. I suggest to rename this to TikaIO.parse().

Contributor Author

No problem. I'll then also rename parseFiles back to parseAll, its name before I introduced the shortcut, as that will be a bit more consistent with the 'All' naming pattern used everywhere in Beam; hope that works for you as well.

public static final long DEFAULT_QUEUE_MAX_POLL_TIME = 3000L;
/**
* A {@link PTransform} that accepts a bounded {@link PCollection} of {@link ReadableFile}
* and returns a bounded {@link PCollection} of {@link ParseResult}.
Contributor

This is not necessarily true: the collection can be unbounded, depending on whether the input collection is bounded. Just say that it returns a collection of ParseResult.

@jkff
Contributor

jkff commented Oct 19, 2017

Don't bother with MatchConfiguration. The purpose of Parse is to provide a shortcut for one common use case - it saves only 2 lines of code. Adding more configuration to it negates the advantage.

@sberyozkin
Contributor Author

Yeah, I was slowly getting to a similar conclusion myself...

Contributor
@jkff jkff left a comment

Okay, I'm going to do a couple of minor final touch-ups and merge this. Thanks!

@sberyozkin
Contributor Author

sberyozkin commented Oct 26, 2017

Hi Eugene, thanks. And sorry, I could not resist squeezing one more push into tikaio :-). It makes the pipeline not fail in case of parse errors, as you suggested earlier; I thought that was a breaking kind of change, so it would be better done now, while the configuration improvements can be applied incrementally later. Please have a look, I hope it looks reasonable enough. FYI, I'm away from this evening till Monday evening, so I won't be able to react to comments till next Tuesday. If my last commit only requires minor tweaks to get right, then maybe combine them with the other touch-ups you've had in mind.
(Note: if Tika throws a parsing exception, the metadata may already contain some or all of the file metadata, depending on how a given parser operates; hence ParseResult still accepts Metadata together with a Throwable.)
Thanks.

@jkff
Contributor

jkff commented Oct 26, 2017

Yeah, I actually also implemented failure handling similarly to what you did while merging the PR, but didn't finish it yesterday. It requires a bit more twiddling to get equality and hash code to work properly, because Throwable doesn't implement either of these things properly. I'm finishing this up right now and will merge soon.
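The "twiddling" is needed because Throwable inherits identity-based equals()/hashCode() from Object, so a result type carrying a failure has to define value equality manually, for example by comparing the throwable's printed stack trace. A minimal plain-Java sketch of that idea (the SimpleParseResult class is a hypothetical stand-in, not the ParseResult code merged in this PR):

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Objects;

// Sketch of value equality for a result type that may carry a Throwable.
// Throwable does not override equals()/hashCode(), so two results holding
// "identical" failures would otherwise never compare equal; here we compare
// the printed stack trace string instead. Note that stack traces include
// line numbers, so only the same throwable (or ones with identical traces)
// will compare equal.
public class SimpleParseResult {
    private final String fileLocation;
    private final String content;   // null on failure
    private final Throwable error;  // null on success

    public SimpleParseResult(String fileLocation, String content, Throwable error) {
        this.fileLocation = fileLocation;
        this.content = content;
        this.error = error;
    }

    private static String stackTraceOf(Throwable t) {
        if (t == null) {
            return null;
        }
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleParseResult)) {
            return false;
        }
        SimpleParseResult other = (SimpleParseResult) o;
        return Objects.equals(fileLocation, other.fileLocation)
            && Objects.equals(content, other.content)
            && Objects.equals(stackTraceOf(error), stackTraceOf(other.error));
    }

    @Override
    public int hashCode() {
        return Objects.hash(fileLocation, content, stackTraceOf(error));
    }
}
```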

@asfgit asfgit closed this in 0c22113 Oct 26, 2017
3 participants