
[FLINK-26015] Fixes object store bug #18692

Merged
merged 6 commits into from Feb 16, 2022

Conversation

XComp
Contributor

@XComp XComp commented Feb 9, 2022

What is the purpose of the change

FileSystem implementations based on object stores like S3 do not support directories. FileSystem.getFileStatus fails with a FileNotFoundException in such a case, even though a preceding FileSystem.mkdirs call doesn't fail.
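
For illustration, a minimal sketch of that failure mode against Flink's FileSystem API (the bucket path is hypothetical, and it assumes an S3 filesystem plugin and credentials are configured):

import java.io.FileNotFoundException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ObjectStoreDirectorySketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical base path; any path backed by an object store will do.
        final Path basePath = new Path("s3://my-bucket/job-result-store");
        final FileSystem fs = basePath.getFileSystem();

        // mkdirs() reports success even though object stores have no real directories ...
        System.out.println("mkdirs returned: " + fs.mkdirs(basePath));

        try {
            // ... but getFileStatus() on the still-empty "directory" can fail,
            // because no object exists under that key prefix.
            fs.getFileStatus(basePath);
        } catch (FileNotFoundException e) {
            System.out.println("getFileStatus failed despite mkdirs succeeding: " + e);
        }
    }
}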

Additionally, I created FLINK-26061 as a follow-up to cover the inconsistency of the FileSystem contract.

Brief change log

  • Removes directory check from FileSystemJobResultStore since it's actually obsolete
  • Adds MinioTestContainer to test the change

Verifying this change

  • An ITCase was added that runs a job with HA enabled and backed by S3, using the newly added MinioTestContainer
  • MinioTestContainerTest was added to test rudimentary functionality of the MinioTestContainer

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Collaborator

flinkbot commented Feb 9, 2022

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 6fc55ff (Wed Feb 09 19:40:50 UTC 2022)

Warnings:

  • 1 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Feb 9, 2022

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@XComp
Contributor Author

XComp commented Feb 10, 2022

The license checker failed because of the newly added curator-test dependency, which introduces a transitive dependency on guava. I excluded that dependency from flink-s3-fs-presto's dependency tree and force-pushed the changes, triggering a new build.


final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();

flinkCluster.runDetached(jobGraph);
Contributor Author

In case of the FileSystemJobResultStore issue covered by this PR, this test would run forever because the error appeared during initialization. I didn't find a better way to assert the state of the MiniCluster here. Checking the isRunning() method wouldn't help because it switches into the running state immediately.

IMHO, we should make the FatalErrorHandler available through the MiniClusterResource. Right now, it's instantiated internally and calls closeAsync() on the MiniCluster. Making it accessible in the test would help with adding an assertion on it. But that felt like a separate change.

Contributor Author

I'm planning to create a ticket to cover this functionality after my current approach has been reviewed, to make sure that I don't miss anything.

Contributor Author

@zentol I would like to get your opinion on this issue. Would you agree that exposing the FatalErrorHandler for testing purposes is a valid improvement?

Contributor

That'd be fine I think.

Contributor

Although I'm curious what event you'd be waiting on. The handler doesn't give you more information than running the job, does it?
If you disabled restarts and ran the job non-detached, we could simplify the test a bit.

Contributor Author

@XComp XComp Feb 14, 2022

The issue is that the MiniCluster is running in a separate thread. It fails while I'm submitting the job, which causes the job submission to never complete. But using the non-detached job submission is a good point: I added a timeout to it to make the test fail earlier. Otherwise, the test would run forever if there was an initialization error.

Initially, my idea was that I could wait for the FatalErrorHandler (if it were exposed outside of the MiniCluster) to report an error and fail in that case. Waiting for a reasonably long time with no error occurring would have been an indication that the initialization of the MiniCluster succeeded, and I could have continued with the test.

But that approach has the downside that the test has to wait even in cases where it actually succeeds, which is bad in terms of test runtime. Making the job submission fail after a specific timeout sounds like the better approach. I adapted the test accordingly.
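
A rough sketch of that idea, assuming direct access to a MiniCluster handle (class name, helper method, and timeout values are illustrative, not the actual test code):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;

public class NonDetachedSubmissionSketch {
    // Submit non-detached and bound every wait with a timeout so an initialization
    // failure surfaces as a test failure instead of the test hanging forever.
    static JobResult runAndWait(MiniCluster miniCluster, JobGraph jobGraph) throws Exception {
        final JobID jobId =
                miniCluster.submitJob(jobGraph).get(30, TimeUnit.SECONDS).getJobID();
        // Blocks until the job reaches a globally terminal state (or the timeout hits).
        return miniCluster.requestJobResult(jobId).get(5, TimeUnit.MINUTES);
    }
}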

@zentol zentol self-assigned this Feb 10, 2022
Contributor

@zentol zentol left a comment

There's a licensing issue with the PR.

@@ -45,4 +45,6 @@
public static final String PULSAR = "apachepulsar/pulsar:2.8.0";

public static final String CASSANDRA_3 = "cassandra:3.0";

public static final String MINIO = "minio/minio:edge";
Contributor

what stability does "edge" provide?

Contributor Author

minio/minio:edge seems to be an old image version, which caused weird errors with the Hadoop S3 filesystem. I updated the tag to the most recent release, which resolved the issue. @rmetzger used this old image in his test as well, which explains why we observed the same error with the Hadoop S3 file system.

Contributor Author

@XComp XComp Feb 14, 2022

I looked into the versioning again. edge seems to be a development version:

$ docker run --entrypoint="" \
  -p 9000:9000 \
  -p 9001:9001 \
  -v /tmp/minio/data2:/data \
  -e "MINIO_ROOT_USER=minio" \
  -e "MINIO_ROOT_PASSWORD=minio123" \
  -it minio/minio:edge /opt/bin/minio --version
minio version DEVELOPMENT.2021-11-23T00-07-23Z

It includes a bug that causes getFileStatus to fail on an empty directory. The observed behavior is also present in the released version RELEASE.2021-11-24T23-19-33Z but fixed in the subsequent release RELEASE.2021-12-09T06-19-41Z.

Contributor

does that mean all this was caused by minio being buggy?

Contributor Author

@XComp XComp Feb 14, 2022

Yes, it looks like the Hadoop S3 FS issue on the Flink side was caused by Minio... I'm still trying to understand it a bit better by digging through the diff on the Minio side.

Contributor Author

Debugging the behavior from the Flink side brings me to the conclusion that it's caused by the response coming from a listObjects call. It looks like Minio PR #13804, which went into RELEASE.2021-12-09T06-19-41Z, is the fix that's relevant for us here. The PR description also points to S3AFileSystem "utilizing the behavior that is fixed in that PR". I'll leave it at that without verifying it through a custom build, to not put more time into this investigation.

@rmetzger
Contributor

Sadly, the JRS still doesn't work on K8s, using a minio s3 implementation:

2022-02-10 12:20:23,679 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Starting the resource manager.
2022-02-10 12:20:23,765 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Start SessionDispatcherLeaderProcess.
2022-02-10 12:20:25,060 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
2022-02-10 12:20:25,164 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Stopping DefaultJobGraphStore.
2022-02-10 12:20:25,255 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_322]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) [?:1.8.0_322]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) [?:1.8.0_322]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:186) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:178) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_322]
	... 3 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3://path
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2344) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2226) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2160) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1961) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1940) ~[?:?]
	at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1940) ~[?:?]
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:158) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:178) ~[flink-dist-1.15-jrs-fix.jar:1.15-jrs-fix]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_322]
	... 3 more
2022-02-10 12:20:25,384 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..

The directory exists:

AWS_ACCESS_KEY_ID=admin AWS_SECRET_ACCESS_KEY=password aws --endpoint-url http://localhost:9000 s3 ls s3://path

s3://path is manually shortened by me.

@@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
@Override
public Set<JobResult> getDirtyResultsInternal() throws IOException {
final Set<JobResult> dirtyResults = new HashSet<>();
final FileStatus fs = fileSystem.getFileStatus(this.basePath);
if (fs.isDir()) {
FileStatus[] statuses = fileSystem.listStatus(this.basePath);
Contributor

I don't understand how this can fix anything, because S3AFileSystem#listStatus uses getFileStatus internally. The presto implementations of both methods also do the same request against the server (#listPrefix).

Are we sure we're drawing the right conclusions?

Contributor Author

@XComp XComp Feb 10, 2022

The presto test still fails with this change reverted. PrestoS3FileSystem does not support getFileStatus on empty directories. Removing the isDir check fixes that issue. I'll do another round over the Presto implementation tomorrow to understand why that is. FLINK-26061 covers the difference between the Presto and Hadoop S3 filesystems.

I updated the comment to make more sense out of it.

Contributor Author

To clarify: I looked through the code once more. The PrestoS3FileSystem does not support getFileStatus on empty directories. You're right when you said that the mkdirs call doesn't create anything (see PrestoS3FileSystem:520). But the getFileStatus method tries to get the FileStatus of the object at the given path. If that does not exist, it will look for objects having the path as a prefix (through listObjects). A FileNotFoundException is thrown if no objects live underneath the passed path, which corresponds to an empty directory (see PrestoS3FileSystem:361).
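
Paraphrased as a sketch (not the actual Presto source; the abstract helpers below are stand-ins for the real S3 calls), the described code path looks roughly like this:

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;

abstract class PrestoStyleGetFileStatusSketch {

    // Stand-in for a metadata lookup on the exact object key; null if no such object.
    abstract ObjectSummary headObject(String key) throws IOException;

    // Stand-in for a ListObjects call using the key as prefix.
    abstract Iterator<ObjectSummary> listPrefix(String key) throws IOException;

    Status getFileStatus(String key) throws IOException {
        final ObjectSummary object = headObject(key);
        if (object == null) {
            // No object at the key itself: fall back to a prefix listing.
            if (!listPrefix(key).hasNext()) {
                // Nothing underneath the prefix either, i.e. an "empty directory".
                throw new FileNotFoundException("File does not exist: " + key);
            }
            return Status.DIRECTORY; // at least one child object exists
        }
        return Status.FILE; // a regular object
    }

    enum Status { FILE, DIRECTORY }

    static final class ObjectSummary {}
}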

@zentol
Contributor

zentol commented Feb 10, 2022

#18692 (comment) would be the most pressing comment.

@XComp XComp force-pushed the FLINK-26015 branch 5 times, most recently from ad21c08 to e62d230 on February 10, 2022 22:37
@XComp
Contributor Author

XComp commented Feb 10, 2022

Sadly, the JRS still doesn't work on K8s, using a minio s3 implementation:

Apparently, the failure was caused by an old Docker image being used for the test, which I also used in my initial testing efforts. I addressed this in this comment above. In the meantime, @rmetzger ran his test with a new Minio version without problems. Additionally, he ran the test on AWS S3 without running into the issue. All tests were performed with the Hadoop S3 filesystem loaded.

@XComp
Contributor Author

XComp commented Feb 10, 2022

Thanks @zentol for your thorough review. I addressed/responded to all your comments. I squashed the commits and rebased the branch.

@XComp
Contributor Author

XComp commented Feb 11, 2022

@flinkbot run azure

@XComp
Contributor Author

XComp commented Feb 11, 2022

I created FLINK-26087 to cover the issue that AzureCI is not properly set up to run the S3-related tests (it looks like the environment variables are missing in AzureCI).

@XComp
Contributor Author

XComp commented Feb 14, 2022

@zentol I addressed your comments. Shall we proceed with squashing the PR or did I miss something?

@zentol
Contributor

zentol commented Feb 14, 2022

Well we no longer need the changes in the filestore#getDirtyResultsInternal then, correct?

@XComp
Contributor Author

XComp commented Feb 14, 2022

Well we no longer need the changes in the filestore#getDirtyResultsInternal then, correct?

We do. The Presto S3 Filesystem requires it. That's what FLINK-26061 is about.

@XComp
Contributor Author

XComp commented Feb 14, 2022

Thanks for the second review @zentol. I addressed/responded to your comments. I reorganized the commits and squashed the individual commits together after addressing the changes. Additionally, the branch was rebased.

@zentol
Contributor

zentol commented Feb 15, 2022

We do. The Presto S3 Filesystem requires it. That's what FLINK-26061 is about.

I'm wondering if that isn't yet again a minio issue. From everything I have read presto assumes that everything is an empty directory by default (hence why mkdirs doesn't do anything). So unless you check that a directory does not exist things should be fine. https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/directory_markers.html

Did you step through PrestoS3FileSystem to see which call fails? How does the test (FileSystemBehaviorTestSuite.testMkdirsCreatesParentDirectories) actually fail? Is the assertion failing, or is the FS throwing a FileNotFoundException?
I'm asking because FileSystem#exists only checks whether getFileStatus returns null, but I don't see a codepath in the presto filesystem where that can be the case.

@XComp
Contributor Author

XComp commented Feb 15, 2022

I'm wondering if that isn't yet again a minio issue. From everything I have read presto assumes that everything is an empty directory by default (hence why mkdirs doesn't do anything). So unless you check that a directory does not exist things should be fine. https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/directory_markers.html

After looking through the Presto code once, we can conclude that it's not a Minio issue and the change is required. See the explanation below.

Did you step through PrestoS3FileSystem to see which call fails? How does the test (FileSystemBehaviorTestSuite.testMkdirsCreatesParentDirectories) actually fail? Is the assertion failing, or is the FS throwing a FileNotFoundException?
I'm asking because FileSystem#exists only checks whether getFileStatus returns null, but I don't see a codepath in the presto filesystem where that can be the case.

I guess I see where the confusion is coming from: I missed the fact that the FileNotFoundException is handled in the hadoop-common FileSystem implementation (see FileSystem:1401). Hence, exists() doesn't cause problems, but the getFileStatus call used in the pre-PR implementation of FileSystemJobResultStore does (see my comment above for the code path).

Hence, the change in question (removing the directory check) is still necessary for PrestoS3FileSystem. Otherwise, we would run into FileNotFoundExceptions when the base directory is empty.
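
For reference, hadoop-common's FileSystem#exists translates that exception roughly like this (a simplified sketch, not an exact copy of the Hadoop source; the wrapping class is illustrative):

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

public abstract class ExistsSketch {

    abstract FileStatus getFileStatus(Path f) throws IOException;

    // Roughly what hadoop-common's FileSystem#exists does: the FileNotFoundException
    // from getFileStatus is swallowed and turned into `false`, which is why exists()
    // masks the problem while a direct getFileStatus call propagates the exception.
    public boolean exists(Path f) throws IOException {
        try {
            return getFileStatus(f) != null;
        } catch (FileNotFoundException e) {
            return false;
        }
    }
}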

@XComp XComp merged commit d62be2e into apache:master Feb 16, 2022