Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19715][Structured Streaming] Option to Strip Paths in FileSource #17120

Closed
wants to merge 6 commits into from

Conversation

lw-lin
Copy link
Contributor

@lw-lin lw-lin commented Mar 1, 2017

What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (i.e. changing s3n to s3a).

This patch adds an option fileNameOnly that causes the new file check to be based only on the filename (but still store the whole path in the log).

Usage

spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...

How was this patch tested?

Added a test case

@steveloughran
Copy link
Contributor

steveloughran commented Mar 1, 2017

-1, non binding

I understand the rationale for this, to aid migration from s3/s3n to s3a, but given the need is schema independence, you should be using the full path name from Path.getUri().getPath() instead of 'Path.getName()`, which means only the filename, the last entry in the path element list, is checked.

match only on name and the two files

s3a://bucket/incoming/dataset.avro
s3a://bucket/2015/12/dataset.avro

will be mistaken for the same file, even when they aren't. If this scenario arises then someone will end up fielding support calls about missing data, or worse, incorrect query results.

If you use the full path, that problem goes away and the filtering is only on schema and filesystem/bucket name.

@SparkQA
Copy link

SparkQA commented Mar 1, 2017

Test build #73691 has finished for PR 17120 at commit aeb10d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean)

@lw-lin
Copy link
Contributor Author

lw-lin commented Mar 1, 2017

@steveloughran thanks for the comments and your concern also looks reasonable to me. I'm open to both approaches.

@marmbrus @zsxwing it'd be great if you could share some thoughts!

@marmbrus
Copy link
Contributor

marmbrus commented Mar 3, 2017

The use case here is when you have truly unique filenames (i.e. they contain a guid). This is actually pretty common in my experience. We definitely shouldn't turn this on by default, but as implemented I think the semantics are pretty clear and this option is useful.

@steveloughran
Copy link
Contributor

I know that's the current use case, but I'm thinking about future confusion, especially as the use case you espoused, "move from s3n to s3a within the same window" isn't likely to be that common in a running app, is it?
At the very least, the documentation needs to be explicit about what works and what doesn't here.

@marmbrus
Copy link
Contributor

marmbrus commented Mar 3, 2017

Note streams can be very long running, so this isn't about some short window. It could even be that I'm moving to a different bucket (but don't want to loose my exactly once guarantees of a very long running stream).

I agree the documentation should be explicit about the expectations of the filename for this parameter.

· "file:///dataset.txt"<br/>
· "s3://a/dataset.txt"<br/>
· "s3n://a/b/dataset.txt"<br/>
· "s3a://a/b/c/dataset.txt"<br/>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the incidents of a <li> does not look pretty, so I'm using a dot here

@lw-lin
Copy link
Contributor Author

lw-lin commented Mar 4, 2017

Thank you @marmbrus @steveloughran for the feedback. Added some explicit docs. Here's a screenshot of the affected section from the programming guide:

snip20170304_5

Please take a look again.

@SparkQA
Copy link

SparkQA commented Mar 4, 2017

Test build #73884 has finished for PR 17120 at commit c59f35f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 4, 2017

Test build #73885 has finished for PR 17120 at commit 2354ae6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


map.add("file:///a/b/c/d", 5)
map.add("file:///a/b/c/e", 5)
assert(map.size == 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recommend === for better error reporting

Copy link
Contributor Author

@lw-lin lw-lin Mar 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, thanks!

@SparkQA
Copy link

SparkQA commented Mar 4, 2017

Test build #73900 has finished for PR 17120 at commit f9e525e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Copy link
Contributor Author

lw-lin commented Mar 7, 2017

Jenkins retest this please

@SparkQA
Copy link

SparkQA commented Mar 7, 2017

Test build #74062 has finished for PR 17120 at commit f9e525e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@zsxwing zsxwing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. Left some minor comments.

@@ -75,7 +77,7 @@ class FileStreamSource(

/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs, sourceOptions.fileNameOnly)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add a warning when fileNameOnly is true. How about

logWarning("fileNameOnly is enabled. Make user your file names are unique (e.g., using UUID), otherwise, files using the same name will be considered as the same file and causes data lost")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added. thanks!

/**
* Note when `fileNameOnly` is true, each entry would be (file name, timestamp) rather than
* (full path, timestamp).
*/
def allEntries: Seq[(String, Timestamp)] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not used. Could you just delete it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted :)

# Conflicts:
#
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStr
eamSource.scala
@SparkQA
Copy link

SparkQA commented Mar 9, 2017

Test build #74236 has finished for PR 17120 at commit 7da2a9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (fileNameOnly) {
logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
"UUID), otherwise, files using the same name will be considered as the same file and causes" +
" data lost")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if I may, this message sounds a bit odd.

files using the same name will be considered as the same file and causes data lost

could we say
files with the same name but under different paths will be considered the same and causes data lost

Copy link
Contributor Author

@lw-lin lw-lin Mar 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

udpated -- thank you!

@SparkQA
Copy link

SparkQA commented Mar 9, 2017

Test build #74245 has started for PR 17120 at commit aab7554.

@lw-lin
Copy link
Contributor Author

lw-lin commented Mar 9, 2017

Jenkins retest this please

@SparkQA
Copy link

SparkQA commented Mar 9, 2017

Test build #74260 has finished for PR 17120 at commit aab7554.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Mar 9, 2017

LGTM. Merging to master.

@asfgit asfgit closed this in 40da4d1 Mar 9, 2017
@lw-lin lw-lin deleted the filename-only branch March 10, 2017 02:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants