[SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files #24128

Closed
wants to merge 2 commits

Conversation

@HeartSaVioR (Contributor) commented Mar 18, 2019

What changes were proposed in this pull request?

This patch proposes to provide a new option to specify time-to-live (TTL) for output file entries in FileStreamSink.

The metadata log greatly helps to achieve exactly-once semantics, but since the output path is open to arbitrary readers, there is no way to prune the metadata log, so the metadata files keep growing as the query runs for a long time, especially the compacted batch files.
(There are some reports from end users which include their workarounds: SPARK-24295)

This patch filters out outdated output files from the metadata while compacting batches, which keeps the metadata from growing linearly; the filtered-out files will "eventually" no longer be seen by reader queries that leverage File(Stream)Source.
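For illustration, a minimal sketch of how such a sink-side option might be set (the option name "retention" and its value format are assumptions for this sketch, not a confirmed API):

```scala
import org.apache.spark.sql.SparkSession

object RetentionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("retention-sketch").getOrCreate()
    val events = spark.readStream.format("rate").load()

    // Hypothetical per-sink retention/TTL: output-file entries older than the TTL
    // would be dropped from _spark_metadata at compaction time.
    val query = events.writeStream
      .format("parquet")
      .option("path", "/tmp/stream-output")
      .option("checkpointLocation", "/tmp/stream-checkpoint")
      .option("retention", "7d") // hypothetical TTL for output file entries
      .start()

    query.awaitTermination()
  }
}
```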

How was this patch tested?

Added unit tests.

<code>disableMetadata</code>: whether to disable metadata log files or not (default: false)
The metadata log grows incrementally while the streaming query runs, which affects query execution time as well as disk space.
Disabling the metadata log greatly helps remedy that impact, but it changes the fault-tolerance guarantee of FileStreamSink to at-least-once.
@HeartSaVioR (Contributor, Author) commented Mar 18, 2019

This might be an arguable topic: if the default batch-query implementation (SQLHadoopMapReduceCommitProtocol) is used as the commit protocol, which fault-tolerance semantics can it guarantee? I'm not sure, so I mention 'at-least-once' here to be safe, but we can change it if we feel it's still 'exactly-once'.

@HeartSaVioR (Contributor, Author) commented Mar 18, 2019

http://mail-archives.apache.org/mod_mbox/spark-user/201706.mbox/%3CCA+AHuKmDDRCr4ZjvXOaaX+9QjLfMhF7Xy1qyUL3VH2Q3ziAQwg@mail.gmail.com%3E

I found a relevant thread on the mailing list where @tdas answered a related question. Quoting here.

> The Query creates an additional dir “_spark_metadata” under the Destination dir and this causes the select statements against the Parquet table to fail as it is expecting only the parquet files under the destination location. Is there a config to avoid the creation of this dir?

The _spark_metadata directory holds the metadata information (a write-ahead log) of which files in the directory are the correct, complete files generated by the streaming query. This is how we actually get exactly-once guarantees when writing to a file system, so this directory is very important. In fact, if you want to run queries on this directory, Spark is aware of this metadata and will read the correct files, ignoring temporary/incomplete files that may have been generated by failed/duplicate tasks. Querying that directory from Hive may lead to duplicate data as it will read those temp/duplicate files as well.

My understanding of the non-metadata commit protocol is that each task writes all of its files to a staging directory, and the driver then moves the files from the staging directory to the output directory, so it is "kind of" exactly-once. But I also think that trick can break in some cases, so it's safer to say it's 'at-least-once'.
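For illustration, a rough sketch of that staging-directory pattern (this is not Spark's actual FileCommitProtocol API; the helper names and layout are assumptions):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper illustrating "tasks write to staging, driver moves files".
object StagingCommitSketch {
  // Each task writes its output under a per-task directory inside the staging dir.
  def taskOutputPath(staging: Path, taskId: Int, fileName: String): Path =
    new Path(new Path(staging, s"task-$taskId"), fileName)

  // On job commit, the driver moves every staged file into the final output dir.
  // If the driver fails midway, some files are already visible and the job may be
  // retried, which is why this is closer to at-least-once than exactly-once.
  def commitJob(conf: Configuration, staging: Path, output: Path): Unit = {
    val fs = FileSystem.get(staging.toUri, conf)
    fs.listStatus(staging).foreach { taskDir =>
      fs.listStatus(taskDir.getPath).foreach { file =>
        fs.rename(file.getPath, new Path(output, file.getPath.getName))
      }
    }
    fs.delete(staging, true) // clean up the staging directory
  }
}
```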

@@ -40,14 +40,14 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   // Track the list of files added by a task, only used on the executors.
   @transient private var addedFiles: ArrayBuffer[String] = _

-  @transient private var fileLog: FileStreamSinkLog = _
+  @transient private var fileLog: MetadataLog[Array[SinkFileStatus]] = _
@HeartSaVioR (Contributor, Author):

This is necessary to avoid branching on the mode, but we could still apply asInstanceOf (with require) and keep this as it is. Please let me know what our preference is.


  private val fileCommitProtocolClass = {
    if (disableMetadata) {
      // if metadata is disabled, just pick up same file commit protocol as batch query
@HeartSaVioR (Contributor, Author):

Would we want to use SQLHadoopMapReduceCommitProtocol directly, or rely on the batch query's configured file commit protocol, as I'm proposing here?
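For context, a minimal sketch of what that selection could look like (the disableMetadata flag comes from this PR's earlier revision; the wiring below is simplified and illustrative, not the actual patch):

```scala
import org.apache.spark.sql.SparkSession

object CommitProtocolChoiceSketch {
  // Sketch only: pick a commit protocol class name depending on whether the
  // metadata log is disabled. "spark.sql.sources.commitProtocolClass" is the
  // conf used by batch file writes (it defaults to SQLHadoopMapReduceCommitProtocol).
  def chooseCommitProtocolClass(spark: SparkSession, disableMetadata: Boolean): String = {
    if (disableMetadata) {
      // fall back to whatever the batch query would use
      spark.conf.get("spark.sql.sources.commitProtocolClass")
    } else {
      // keep the streaming-specific protocol that maintains _spark_metadata
      "org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol"
    }
  }
}
```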

@SparkQA commented Mar 18, 2019

Test build #103609 has finished for PR 24128 at commit 3e82d95.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor):

The FileStreamSink was designed to work with the metadata log. I'm not comfortable adding an option to just turn it off; there are all sorts of ways that could cause more subtle issues than at-least-once semantics.

@HeartSaVioR (Contributor, Author) commented Mar 18, 2019

> I'm not comfortable adding an option to just turn it off; there are all sorts of ways that could cause more subtle issues than at-least-once semantics.

I totally understand the discomfort with disabling the metadata, but as I described in the JIRA issue and the PR description, there's no workaround other than letting end users handle the dirty work by hand. I'd give another try to letting FileStreamSink check for deleted output files in the background (files which would be deleted by end users via some retention policy) and exclude them when compacting metadata (I guess that's the ideal way to go), but that definitely brings overhead and maybe more complicated configuration as well.

Regarding subtle issues, it would be better for us to share the concrete issues we can imagine (instead of 'something might happen'): it would help steer us in the right direction.

@HeartSaVioR (Contributor, Author):

Btw, my 2 cents: ideally the output of FileStreamSink should also be readable by systems other than Spark, which means the metadata should only affect semantics. End users may try to do it: the mail thread in my previous comment is one example - querying the output with Hive.

@HeartSaVioR (Contributor, Author) commented Mar 18, 2019

Another possible workaround is an alternative to #23840: add an option to force FileStreamSink to filter out all entries (or leave only the last batch) when compacting batches. It would definitely need a source-side option forcing File(Stream)Sources to ignore the metadata. I'm not sure that option would be very different from this proposal, though.

@HeartSaVioR (Contributor, Author) commented Mar 18, 2019

There's also another possible workaround: if we really feel OK excluding old output files from the metadata even when they're not actually deleted from the directory, we can expose an option to set a retention policy (essentially a time-to-live) and force FileStreamSink to filter out entries which become old based on the TTL. Readers would then not see some output files even though they aren't deleted, but it's a policy set by end users, so that might be OK. We may still want to expose an option for File(Stream)Sources to ignore the metadata.

So we have some alternatives for this patch, and all of them have trade-offs. What's our preference? I believe this is an ongoing issue end users are already struggling with, so we have to take one of these approaches even if it's not ideal.
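To make the last alternative concrete, here is a minimal sketch of the TTL-based filtering that could run during compaction (the entry shape, field name, and function name are assumptions for this sketch):

```scala
import scala.concurrent.duration._

object RetentionFilterSketch {
  // Hypothetical entry shape: what matters is a timestamp recorded when the file
  // was committed to the sink.
  case class SinkFileEntry(path: String, commitTimeMs: Long)

  // Sketch of the retention filter applied while compacting batches: entries
  // whose commit time falls before (now - TTL) are dropped from the compact log.
  def applyRetention(
      entries: Seq[SinkFileEntry],
      ttl: Duration,
      nowMs: Long = System.currentTimeMillis()): Seq[SinkFileEntry] = {
    val cutoff = nowMs - ttl.toMillis
    entries.filter(_.commitTimeMs >= cutoff)
  }

  // Example: applyRetention(allEntries, 7.days) keeps only entries committed
  // within the last week, so the compact file stops growing without bound.
}
```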

@jose-torres (Contributor):

I'm not sure I agree with the premise. Adding a feature, even behind a flag, represents a judgment by the Spark community that the feature works well and an ongoing obligation for Spark committers to maintain and fix bugs in the feature. We shouldn't implement ad-hoc changes with unclear behavior and semantics just to fix an immediate symptom some end-users face.

A retention policy generally makes sense to me, although I don't personally know enough about how Hadoop-compatible filesystems deal with timestamps to effectively review such a change.

@HeartSaVioR (Contributor, Author) commented Mar 19, 2019

This is a commit based on master for the last alternative (exclude old output files in metadata):

HeartSaVioR@8564ab4

I guess this is the least intrusive and simplest approach. One thing to note is that excluding output files only takes effect when compacting batches, so it depends on the compaction interval as well as the batch interval. We can safely document it as something that "eventually happens".

@jose-torres What do you think about this approach?

EDIT: I just saw your comment after commenting.

@HeartSaVioR (Contributor, Author) commented Mar 19, 2019

> We shouldn't implement ad-hoc changes with unclear behavior and semantics just to fix an immediate symptom some end-users face.

I tend to agree with your point: what I generally want to do is investigate production issues end users are facing and provide possible solutions, even if they may be seen as workarounds. (That's why I enumerate all the approaches I can imagine instead of pushing only this one.) We can still reject any approach which doesn't feel safe and clear. We should then share why it isn't safe/clear, so that we don't go that way again.

@HeartSaVioR HeartSaVioR changed the title [SPARK-27188][SS] FileStreamSink: provide a new option to disable metadata log [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files Mar 19, 2019
    blockReplication: Int,
    blockSize: Long,
    action: String) {
  // use modification time if we don't know about exact commit time
@HeartSaVioR (Contributor, Author) commented Mar 19, 2019

If we're concerned about dealing with Hadoop timestamps / timezones, we can just set it to Long.MaxValue. This means existing entries in the metadata will not be affected by the change.

@HeartSaVioR (Contributor, Author):

Just changed it. The retention logic now relies entirely on the commit timestamp and is not related to the individual file's modification time.

@@ -62,7 +76,8 @@ object SinkFileStatus {
       modificationTime = f.getModificationTime,
       blockReplication = f.getReplication,
       blockSize = f.getBlockSize,
-      action = FileStreamSinkLog.ADD_ACTION)
+      action = FileStreamSinkLog.ADD_ACTION,
+      commitTime = f.getModificationTime)
@HeartSaVioR (Contributor, Author):

Same here: we can still set this to Long.MaxValue to avoid concerns.
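A small sketch of that fallback, using the field names visible in the diff above (the surrounding class is simplified and hypothetical):

```scala
object CommitTimeFallbackSketch {
  // Sketch: entries written by older Spark versions have no commitTime, so the
  // field defaults to Long.MaxValue and such entries are never expired by the
  // retention filter; newly written entries record the modification time instead.
  case class SinkFileStatusSketch(
      path: String,
      size: Long,
      action: String,
      commitTime: Long = Long.MaxValue)

  // A legacy entry keeps Long.MaxValue and always survives retention,
  // while a fresh entry can eventually expire.
  val legacyEntry = SinkFileStatusSketch("/out/part-00000.parquet", 1024L, "add")
  val freshEntry = SinkFileStatusSketch("/out/part-00001.parquet", 2048L, "add",
    commitTime = System.currentTimeMillis())
}
```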

@HeartSaVioR (Contributor, Author) commented Mar 19, 2019

Rebased onto the new approach: applying retention. Also updated the JIRA and the PR description.

@SparkQA commented Mar 19, 2019

Test build #103643 has finished for PR 24128 at commit 8564ab4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 19, 2019

Test build #103656 has finished for PR 24128 at commit db2fa60.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

retest this, please

@SparkQA commented Mar 19, 2019

Test build #103662 has finished for PR 24128 at commit db2fa60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

Could I kindly ask for a review of the new approach? It would not be intrusive unless end users configure the retention badly.

@HeartSaVioR (Contributor, Author):

Kind reminder.

@HeartSaVioR (Contributor, Author):

Ping again, now that Spark+AI Summit 2019 in SF has ended.

@HeartSaVioR (Contributor, Author):

@tdas @zsxwing @jose-torres @gaborgsomogyi Kind reminder.

@HeartSaVioR (Contributor, Author):

Ping.

@HyukjinKwon (Member):

ok to test

@SparkQA commented Sep 17, 2019

Test build #110688 has finished for PR 24128 at commit db2fa60.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 17, 2019

Test build #110713 has finished for PR 24128 at commit 2a3bad5.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

retest this, please

@SparkQA commented Sep 24, 2019

Test build #111279 has finished for PR 24128 at commit 2a3bad5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

retest this, please

@SparkQA commented Sep 24, 2019

Test build #111307 has finished for PR 24128 at commit 2a3bad5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author) commented Nov 22, 2019

UPDATE: SPARK-29995 was just filed by another end user, describing the same issues as SPARK-24295.

@uncleGen (Contributor) commented Nov 25, 2019

There are many end users facing this problem, so I give a +1 to resolving it. But the TTL option only solves some scenarios. It does not cover scenarios where end users do not set a TTL, or where they want to read files from the earliest offset with a TTL set.

@HeartSaVioR (Contributor, Author):

Maybe we can differentiate two major cases (a sketch of the safe deletion order for case 1 follows this comment):

  1. The downstream query reading the output directory is also Spark (and leverages the metadata).

In this case, technically we can never delete any entries in the metadata if we want to ensure the downstream query produces the same result across multiple runs (unless inputs are only ever added in real time).

We know that's only an ideal: if the streaming query runs for a long time and writes a gigantic number/size of files, we would want to get rid of some of them to gain speed and save storage, fully understanding that we are throwing away some inputs, which will affect the result of the query.

Assume we decide to get rid of some output files. How do we do it safely? The only safe way is to get rid of them in the metadata first, and then delete the actual files. (The downstream query relies on the metadata to get the list of files, so if we don't make sure to delete them from the metadata first, the downstream query will try to read files which no longer exist and fail, depending on the options.)

That means the running streaming query has to deal with the deletion, since we don't have any official offline tool to modify the metadata, and it is hard to tell the streaming query which files to delete. That's why I simply picked "retention", which is a generally accepted approach (Kafka also applies a retention policy by default).

  2. We never let Spark read the output directory - we let other frameworks read it.

In this case we don't need to build the metadata - though this means end users will need to deal with the "at-least-once" guarantee. Given that the file sink doesn't overwrite files, it may also leave corrupted records in partial output. If that's acceptable, we may be able to add an option to "disable" the metadata, though there were comments worried about doing so: #24128 (comment)

So I guess there aren't many options here, and I think I picked the viable one, but I'd really appreciate more ideas!
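To illustrate the ordering in case 1, here is a rough sketch of "drop from metadata first, delete files second" (the helper and its arguments are hypothetical, not an existing Spark API):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SafeExpireSketch {
  // Hypothetical: physically delete files that the retention filter has already
  // removed from the compacted metadata. Because the compact batch was written
  // without the expired entries first, readers that trust the metadata never see
  // a path that points at a deleted file.
  def deleteExpiredFiles(
      conf: Configuration,
      previouslyListed: Set[String], // paths present before this compaction
      surviving: Set[String]): Unit = { // paths kept by the retention filter
    val expired = previouslyListed -- surviving
    expired.foreach { p =>
      val path = new Path(p)
      val fs = FileSystem.get(path.toUri, conf)
      fs.delete(path, false)
    }
  }
}
```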

@uncleGen (Contributor) commented Nov 26, 2019

IMHO, the core problem is that the compact metadata log grows bigger and bigger, and compacting the metadata log is time-consuming work because it reads the old compact log file and then writes a new compact log file. So why not limit the size of the compact metadata log? I mean we can split the compact log into multiple log files. We would only need to write a limited number of sink file paths into a single compact log, and every 10 batches we just compact the delta logs with the latest compact log.

batch N:

10 11 12.compact

batch N+10:

10 11 12.compact 13 14 ... 22.compact.1 22.compact
---------------------------^^^^^^^^^^^^-----------------

batch N+20:

10 11 12.compact 13 14 ... 22.compact.1 22.compact 23 24 ... 32.compact.1 32.compact

batch N+30:

10 11 12.compact 13 14 ... 22.compact.1 22.compact 23 24 ... 32.compact.1 32.compact 33 34 ... 42.compact.2 42.compact.1 42.compact

In batch N+10, we rename 12.compact to 22.compact and compact the delta logs (13, 14, ..., 21) into 22.compact. If the size of 22.compact goes beyond the limit, we rename 22.compact to 22.compact.1 and then create a new 22.compact. The cost of a rename is low, certainly not higher than writing a new file, and we only need to compact the delta logs with the latest compact log file. The biggest gain is that the time cost of compacting becomes predictable and no longer scales linearly as the log grows.

But this may break compatibility when an older version of Spark reads the stream sink files. Well, we can also add an option to control whether this feature is enabled or disabled. These are just my rough ideas; please point out any mistakes.
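For illustration, a rough sketch of the rolling naming described above (the size threshold, helper name, and file-name pattern are assumptions for this sketch, not an existing Spark mechanism):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RollingCompactSketch {
  // When the current compact file exceeds the size limit, roll it over to
  // <batch>.compact.<n> and let the next compaction start a fresh <batch>.compact,
  // so each compaction only rewrites a bounded amount of data.
  def rollCompactIfNeeded(
      conf: Configuration,
      metadataDir: Path,
      batchId: Long,
      maxCompactBytes: Long): Path = {
    val fs = FileSystem.get(metadataDir.toUri, conf)
    val current = new Path(metadataDir, s"$batchId.compact")
    if (fs.exists(current) && fs.getFileStatus(current).getLen > maxCompactBytes) {
      // find the next free suffix: <batchId>.compact.1, .2, ...
      val next = Iterator.from(1)
        .find(i => !fs.exists(new Path(metadataDir, s"$batchId.compact.$i")))
        .get
      fs.rename(current, new Path(metadataDir, s"$batchId.compact.$next"))
    }
    current // subsequent delta logs are compacted into this (possibly new) file
  }
}
```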

@HeartSaVioR (Contributor, Author) commented Nov 27, 2019

> IMHO, the core problem is that the compact metadata log grows bigger and bigger, and compacting the metadata log is time-consuming work because it reads the old compact log file and then writes a new compact log file.

I agree with you that the problem is that the compact metadata log mostly just grows, though the time spent building the metadata log is only one of several major issues. The other major issue is that the cost of reading the metadata log won't decrease unless we optimize the file format or get rid of entries, as this patch proposes.

One thing we have to consider is that when the compact phase happens, Spark is able to get rid of some entries which already exist - that's the feature this patch leverages. That requires a full read and rewrite of the entries on each compact phase, which is why we can't simply add up two compact files.

It looks like CompactibleFileStreamLog was introduced to avoid the "small files problem", and it seems possible to tweak it a bit so that it maintains a "ranged delta" (say, a compacted delta covering a range of batches), which might be closer to what you proposed. That would no longer be a "snapshot", and it might lose the ability (or become inefficient) to get rid of entries, but in most cases entries are not removed, so it also makes sense to me. I expect the logic to be more complicated than the current one, but that might be acceptable given how badly this issue has been affecting end users.

@HeartSaVioR (Contributor, Author):

@tdas @zsxwing @jose-torres @gaborgsomogyi Kind reminder.

@HeartSaVioR (Contributor, Author):

@uncleGen
Hi, do you plan to go ahead with your idea? I have been thinking about this issue, and your idea seems to be a realistic solution which doesn't introduce too many changes. While we may also want to find a solution which covers most of the concerns, for now it would be great to have even just your idea.

Otherwise - if you're not planning to do it - would you mind if I pick your idea up?

@uncleGen (Contributor):

@HeartSaVioR OK

@HeartSaVioR (Contributor, Author):

Would you mind if I ask you to elaborate on your answer? IMHO it's not clear which one (or both?) you are OK with.

@uncleGen (Contributor):

@HeartSaVioR Sorry for the confusion! Please feel free to pick up my idea.

@github-actions:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
