-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2512] Introduces TextIO.watchForNewFiles() and the Match transform #3607
Conversation
a8e180c
to
6e2a6f5
Compare
6e2a6f5
to
1f33b25
Compare
* Pipeline p = ...; | ||
* | ||
* PCollection<String> lines = p.apply(TextIO.read() | ||
* .from("/local/path/to/files/*") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imagine a case where a new directory is generated every day, and you then want to start watching for new files in that directory. Can we model this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so - I added the Match transform to support this. Everything looks cleaner now.
03b6b05
to
ebeaacf
Compare
Changes Unknown when pulling ebeaacf on jkff:textio-read-watch-new-files into ** on apache:master**. |
Changes Unknown when pulling ebeaacf on jkff:textio-read-watch-new-files into ** on apache:master**. |
Changes Unknown when pulling ebeaacf on jkff:textio-read-watch-new-files into ** on apache:master**. |
} | ||
} | ||
}; | ||
writer.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we start writer after the pipeline starts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - TestPipeline.run() waits for it to complete. The writer deliberately sleeps a bit in the beginning, to have a high-but-not-100% chance of the first poll hitting an empty set of files.
5ebd39c
to
c249a9d
Compare
Thanks! Refactored this into 3 commits for easier review, and cleaned up things a bit. |
Changes Unknown when pulling c249a9d on jkff:textio-read-watch-new-files into ** on apache:master**. |
Changes Unknown when pulling c249a9d on jkff:textio-read-watch-new-files into ** on apache:master**. |
Reuven verbally LGTMd this - I'm going to go ahead and merge. |
Part of http://s.apache.org/textio-sdf, based on http://s.apache.org/beam-watch-transform. The Match transform can be useful for users who want to write their own file-based connectors, or for advanced use cases such as: watch for new subdirectories to appear in a directory (using Match), and then start watching each subdirectory for new files and reading them (using TextIO.watchForNewFiles()). Additionally, finally makes it configurable whether TextIO.read/readAll() allow filepatterns matching no files. Normal reads disallow empty filepatterns (to preserve old behavior), readAll() allows them if the filepattern contains a wildcard (which seems a reasonable default behavior that read() should have had from the beginning, but we can't change it), and watchForNewFiles() allows them unconditionally (because files might appear later).
c249a9d
to
b54883e
Compare
https://issues.apache.org/jira/browse/BEAM-2512
Part of http://s.apache.org/textio-sdf, based on http://s.apache.org/beam-watch-transform.
The
Match
transform can be useful for users who want to write their own file-based connectors, or for advanced use cases such as: watch for new subdirectories to appear in a directory (usingMatch
), and then start watching each subdirectory for new files and reading them (usingTextIO.watchForNewFiles()
).Additionally, finally makes it configurable whether
TextIO.read/readAll()
allow filepatterns matching no files, and adds aFileSystems
utility for that.Normal reads disallow empty filepatterns (to preserve old behavior),
readAll()
allows them if the filepattern contains a wildcard (which seems a reasonable default behavior that read() should have had from the beginning, but we can't change it), andwatchForNewFiles()
allows them unconditionally (because files might appear later).R: @reuvenlax