
[SPARK-38605][SS] Retrying on file manager open operation in HDFSMetadataLog #35994

Closed (wants to merge 5 commits)

Conversation

viirya (Member) commented Mar 28, 2022

What changes were proposed in this pull request?

This patch adds retrying to the file open operation in HDFSMetadataLog.
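For illustration, a minimal sketch of the idea (not the PR's exact code): a hypothetical openWithRetry helper that wraps CheckpointFileManager.open and retries a bounded number of times on non-fatal errors. The helper name and signature are assumptions for this sketch; in the PR the retry would live inside HDFSMetadataLog itself.

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FSDataInputStream, Path}

import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Hypothetical helper, not the PR's actual implementation: retry
// fileManager.open up to `numRetries` additional times before giving up.
def openWithRetry(
    fileManager: CheckpointFileManager,
    path: Path,
    numRetries: Int): FSDataInputStream = {
  var attempt = 0
  while (true) {
    try {
      return fileManager.open(path)
    } catch {
      case NonFatal(e) if attempt < numRetries =>
        // Transient failures (e.g. network blips) get another attempt; once the
        // budget is exhausted the exception propagates as before.
        attempt += 1
    }
  }
  throw new IllegalStateException("unreachable")
}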

Why are the changes needed?

Currently, HDFSMetadataLog uses CheckpointFileManager for file operations such as opening metadata files. These operations are easily affected by network blips, which cause the streaming query to fail. We have experienced such failures; although the streaming query can be restarted, recovery takes extra time. It would be better to make the streaming query more resilient to such temporary failures by retrying.

Does this PR introduce any user-facing change?

Yes, users can configure a retry mechanism for the HDFSMetadataLog file open operation.

How was this patch tested?

Existing tests.

@@ -138,7 +142,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = {
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
val input = fileManager.open(batchMetadataFile)
viirya (Member, Author) commented on this line:

We encountered network blips on this open operation; it also seems the safest place to add retrying. I don't add retrying to other operations, to avoid changing current behavior.

dongjoon-hyun (Member) left a comment:

+1, LGTM.

val STREAMING_HDFS_METADATALOG_FS_NUM_RETRIES =
buildConf("spark.sql.streaming.hdfsMetadataLog.fsNumRetries")
.internal()
.doc("The maximum number of retrying when opening file in HDFSMetadataLog for streaming queries")
A reviewer (Member) commented on this line:

This second commit seems to bring a linter failure.
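For context, Spark's Scala style checks limit lines to 100 characters, which the long .doc(...) line above likely trips. A hedged sketch of how the conf could look with the doc string wrapped and the definition completed; the .intConf type and default of 0 are assumptions for illustration, not necessarily the PR's final values.

  val STREAMING_HDFS_METADATALOG_FS_NUM_RETRIES =
    buildConf("spark.sql.streaming.hdfsMetadataLog.fsNumRetries")
      .internal()
      .doc("The maximum number of retries when opening a file in HDFSMetadataLog " +
        "for streaming queries.")
      .intConf
      .createWithDefault(0)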

viirya (Member, Author) commented Mar 29, 2022

There are some unrelated Python errors:

python/pyspark/pandas/ml.py:62: error: Incompatible types in assignment (expression has type "Index", variable has type "MultiIndex")  [assignment]
python/pyspark/pandas/frame.py:6133: error: unused "type: ignore" comment
python/pyspark/pandas/frame.py:6292: error: Incompatible types in assignment (expression has type "Index", variable has type "MultiIndex")  [assignment]

zsxwing (Member) commented Mar 29, 2022

> These operations are easily affected by network blips, which cause the streaming query to fail.

Is this an issue unique to HDFS? If so, I'm surprised that the HDFS client cannot survive transient errors. Is Spark the right layer to fix this issue?

In addition, will the same issue happen after open? For example, when reading the file content? Do we need to worry about other places as well? This is not the only place that touches FileSystem in the driver.

Could you also add a unit test to verify the retry code? For example, you can use a fake file system to simulate the errors from open.
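For what it's worth, a self-contained sketch of that testing idea (all names hypothetical, and a flaky function stands in for a real fake file system): the "open" operation fails twice with IOException, and a generic retry wrapper is expected to absorb the failures.

import java.io.IOException

// Hypothetical generic retry helper (stand-in for the PR's retry logic).
def withRetry[T](numRetries: Int)(op: => T): T = {
  var attempt = 0
  while (true) {
    try {
      return op
    } catch {
      case _: IOException if attempt < numRetries => attempt += 1
    }
  }
  throw new IllegalStateException("unreachable")
}

// Simulate an `open` that fails twice before succeeding.
var calls = 0
def flakyOpen(): String = {
  calls += 1
  if (calls <= 2) throw new IOException("simulated network blip")
  "metadata contents"
}

// The wrapper should succeed on the third attempt.
assert(withRetry(numRetries = 3)(flakyOpen()) == "metadata contents")
assert(calls == 3)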

zsxwing (Member) commented Mar 29, 2022

By the way, is it possible to share the stack trace of this issue? I'm curious why the client cannot handle transient errors and the stack trace may have some clues.

viirya (Member, Author) commented Mar 29, 2022

> Is this an issue unique to HDFS? If so, I'm surprised that the HDFS client cannot survive transient errors. Is Spark the right layer to fix this issue?

The error happened when opening files on S3. I'm not familiar with the HDFS or S3 client internals (by the way, I think it's relatively hard to expect every implementation to have retrying), but I don't see retrying happening there. We have similar retry mechanisms in several places in Spark; if you ask me whether Spark is exactly the right layer to fix this, I'm not sure, but it seems like a consistent approach.

> In addition, will the same issue happen after open? For example, when reading the file content? Do we need to worry about other places as well? This is not the only place that touches FileSystem in the driver.

I wouldn't exclude those possibilities. Currently this is constrained to open only, as that is where the issue occurred. It also seems the safest place to retry, since I don't want to change behavior unexpectedly.

> Could you also add a unit test to verify the retry code? For example, you can use a fake file system to simulate the errors from open.

Okay. I'll try to add one.

HeartSaVioR (Contributor) commented Mar 30, 2022

Probably there is another good argument that the FS API is the better layer for the fix.

From Spark's point of view, there is no way to distinguish whether a failure is retryable or not (we have to deal with NonFatal since we have no context; even IOException may be too general), whereas the FS client layer has more context to decide. Once the client layer decides that a failure is retryable without changing semantics (e.g. the operation is idempotent), it can do the retry transparently to Spark.

zsxwing (Member) commented Mar 30, 2022

> The error happened when opening files on S3.

Are you using S3AFileSystem? If so, I think it already defines a fine-grained retry policy in S3ARetryPolicy. It would be great if you could share the stack trace so that we can understand the issue better.
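For reference, the S3A connector's own retry behavior is tunable through Hadoop configuration such as fs.s3a.retry.limit and fs.s3a.retry.interval; a sketch of passing these through a Spark session (the values and app name are illustrative assumptions, not recommendations):

import org.apache.spark.sql.SparkSession

// Sketch: tune the S3A client's retry policy via spark.hadoop.* options
// instead of adding retries inside Spark itself.
val spark = SparkSession.builder()
  .appName("s3a-retry-tuning")
  .master("local[*]") // for local experimentation only
  .config("spark.hadoop.fs.s3a.retry.limit", "7")        // max attempts for retryable errors
  .config("spark.hadoop.fs.s3a.retry.interval", "500ms") // pause between attempts
  .getOrCreate()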

zsxwing (Member) commented Mar 30, 2022

> Probably there is another good argument that the FS API is the better layer for the fix.

+1. If we can fix it in the FS layer, all places calling the FS API will benefit from the fix.

viirya (Member, Author) commented Mar 30, 2022

I cannot give the exact stack trace here, but the exceptions are AWSClientIOException. Thanks for the hint. I re-checked the stack trace against the S3AFileSystem codebase, and it seems the open is already retried there.

I don't think this belongs at the FS API layer, as different FS implementations might behave differently; some might not be able to retry at all.

Personally, I think a config in Spark is much easier to follow from an end-user perspective. It is harder to handle each FS implementation separately, as they might each have a different way to configure this behavior. Users will also look for Spark configs rather than configs in the underlying libraries.

As for this case (S3A FS), it seems the open is already retried, so there is no urgent need for this config on my side now. I'm fine with closing this if you still think it should not be added.

viirya closed this Mar 30, 2022
HeartSaVioR (Contributor) commented
Thanks for your patience and understanding in taking this back. I should have commented in the JIRA when you asked me for input, but I didn't understand the intention (it was unclear to me), and I also don't have enough expertise on this issue to envision a better fix. Sorry about that.

viirya (Member, Author) commented Mar 31, 2022

I personally have no interest in touching this, but was asked by a customer due to their issue. Since retrying is actually performed by the S3 client in this case, I'm happy to take this back without pushing it further. Thanks for the inputs.

viirya deleted the fs_retry branch December 27, 2023 18:26