
[FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support #15599

Conversation

galenwarren
Contributor

What is the purpose of the change

The goal here is to add RecoverableWriter support for Google Storage (GS), to allow writing to GS buckets from Flink's StreamingFileSink. To do this, we must implement and register a FileSystem implementation for GS and then provide a RecoverableWriter implementation via createRecoverableWriter.

Fortunately, Google supplies a Hadoop FileSystem implementation via GoogleHadoopFileSystem, which can already be used with Flink. So we can wrap this with Flink's HadoopFileSystem to implement the core file-system functionality.

At a high level, to implement a recoverable writer, one must provide a RecoverableWriter implementation that does the following (the relevant Flink interfaces are sketched after the list):

  • Creates an output data stream from a URI (e.g. gs://bucket/foo/bar), i.e. an implementation of FSDataOutputStream
  • Allows writing to the output data stream in multiple chunks
  • Allows persisting the state of the output data stream while the stream is open
  • Allows recovering the persisted state of the output data stream, enabling writes to resume from that point
  • Supports atomic commit of the final file once the stream is closed
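
For orientation, the Flink interfaces involved look roughly like this (a simplified sketch based on the org.apache.flink.core.fs package; consult the actual sources for the exact signatures):

    // Simplified sketch of the interfaces a recoverable writer must provide.
    // Types assumed from org.apache.flink.core.fs; not a verbatim copy of Flink's API.
    public interface RecoverableWriter {
        RecoverableFsDataOutputStream open(Path path) throws IOException;              // start a new write
        RecoverableFsDataOutputStream recover(ResumeRecoverable r) throws IOException; // resume from persisted state
        RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable r) throws IOException;
        boolean requiresCleanupOfRecoverableState();
        boolean cleanupRecoverableState(ResumeRecoverable r) throws IOException;       // release temporary resources
    }

    public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {
        public abstract ResumeRecoverable persist() throws IOException; // checkpointable state of the open stream
        public abstract Committer closeForCommit() throws IOException;  // close and prepare the atomic publish

        public interface Committer {
            void commit() throws IOException;              // publish the final file
            void commitAfterRecovery() throws IOException; // publish, tolerating a partially completed commit
        }
    }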

This implementation accomplishes this for GS by writing files to a temporary location as data is written to the output stream, and then combining the various temporary files together upon commit (and deleting the temporary files). Each temporary file is written to GS using the resumable upload API. The recoverable writer state (GSRecoverableWriterState) keeps track of which temporary files, in what order, should be combined together to form the final file. (The recoverable writer state also keeps track of the final file location, the number of bytes written so far, and whether the output stream has been closed.)

We considered but rejected the idea of using Google's resumable upload support to support the entire process of writing a temporary file, i.e. a design in which there would be exactly one temporary file uploaded for every final file written. We rejected this approach for two reasons:

  • Doing that would have required us to depend on Java serialization to persist the WriteChannel associated with the resumable upload
  • There is a nonconfigurable two-week limit on the duration of a single resumable upload, and we didn't want to have to live with that constraint

Instead, our approach (potentially) writes multiple temporary files associated with each recoverable write; each time persist is called, any ongoing resumable upload is closed, causing a temporary file to be committed, and a new resumable upload is started if/when more bytes are written. We thus avoid having to persist WriteChannels and we avoid the two-week limit for a recoverable write. Note that each individual temporary file must be written within two weeks, which means that checkpoints need to be taken at least that frequently, but that doesn't seem like a problematic limitation in practice.
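
As a rough illustration of that persist behavior (the names below are illustrative, not necessarily the exact ones used in this PR):

    // Illustrative sketch only. Each persist() finishes the in-flight resumable
    // upload so that its temporary blob becomes durable, records that blob in the
    // writer state, and returns the state as the resume point. The next write()
    // lazily starts a fresh resumable upload into a new temporary blob.
    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
        if (currentUploadChannel != null) {
            currentUploadChannel.close();              // commits the current temporary blob
            state.addComponentBlob(currentTempBlobId); // remember it, in write order (hypothetical helper)
            currentUploadChannel = null;               // a new upload starts on the next write()
        }
        return state.copy();                           // snapshot handed to Flink for checkpointing
    }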

When a recoverable write is cleaned up, either on commit or after a failure, all of the temporary files associated with that recoverable write are deleted. The naming scheme for the temporary files associated with a recoverable write is such that we can be sure to delete all temporary files -- even orphaned ones that might result from restarting from an earlier save/checkpoint.

To simplify accessing the Google Storage API and to make it mockable for unit testing, this code includes a BlobStorage abstraction. This is implemented against GS in GSBlobStorage and against an in-memory store in MockBlobStorage.
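
The abstraction is roughly of the following shape (method names are approximate, not a verbatim copy of the PR; BlobId and WriteChannel are the google-cloud-storage client types):

    // Approximate shape of the BlobStorage abstraction used by the recoverable writer.
    // Imports assumed: com.google.cloud.WriteChannel, com.google.cloud.storage.BlobId, java.util.List.
    public interface BlobStorage {
        WriteChannel startResumableUpload(BlobId blobId) throws IOException;       // begin writing a temporary blob
        List<BlobId> list(String bucket, String partialName) throws IOException;   // list blobs by name prefix
        List<Boolean> delete(List<BlobId> blobIds) throws IOException;             // per-blob deletion results
        void compose(List<BlobId> sourceBlobIds, BlobId targetBlobId) throws IOException; // combine temp blobs into the final blob
        boolean exists(BlobId blobId) throws IOException;                          // used by the committer
    }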

Brief change log

Main changes are:

  • fa060e9 Add flink-gs-fs-hadoop project: Add new project for GS file system and recoverable writer with FileSystemFactory wireup.
  • 4b8c0d5 Add BlobStorage abstraction: Add interfaces to abstract away direct access to the Google Storage API, both to simplify that access and to make it mockable.
  • f8bf558 Implement BlobStorage for Google Storage: Add GSBlobStorage, an implementation of BlobStorage against the Google Storage API.
  • f2399cd Add utility functions: Add some utility functions used by the recoverable writer. Includes unit tests.
  • 0f20b24 Implement BlobStorage for unit testing: Add MockBlobStorage, an implementation of BlobStorage against an in-memory store.
  • 1aac728 Implement recoverable writer: Implements RecoverableWriter against the BlobStorage abstraction. Includes unit tests.

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests for utility functions
  • Added unit tests for the recoverable writer against a mock in-memory BlobStorage implementation (MockBlobStorage), to validate various success and failure recoverable-write scenarios.

Note that there are currently no unit tests that validate that GSBlobStorage (the concrete implementation of BlobStorage against the GS API) properly invokes the underlying API. This API is difficult to mock, as many return values are classes that can't be created or extended outside the package. Unit tests would be much easier here if we were to use something like Mockito, but that is discouraged in the coding guidelines, so I'm looking for some guidance here.

Also, I haven't implemented the FileSystemBehaviorTestSuite, since it seems to be testing the underlying FileSystem behavior which is provided directly by Flink's HadoopFileSystem wrapped around Google's GoogleHadoopFileSystem, and not really by any code in this PR. But if this should be added, I can do that -- just let me know.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
    Yes, this adds dependencies on google-cloud-storage and gcs-connector in the flink-gs-fs-hadoop project. These dependencies require a newer version of guava than is present in flink-fs-hadoop-shaded, so this project pulls in a newer version of guava and shades it.
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
    No
  • The serializers: (yes / no / don't know)
    Yes, adds a new serializer (GsRecoverableWriterStateSerializer) to serialize persisted state of a recoverable write.
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    No
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
    No, except that we'll need to document how to properly deploy and use this new file system.
  • The S3 file system connector: (yes / no / don't know)
    No

Documentation

  • Does this pull request introduce a new feature? (yes / no)
    Yes.
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    We will need some sort of GS documentation like this documentation for S3. I intend to provide that in a follow-up commit once any changes settle down as part of the code review.

@galenwarren
Contributor Author

Sorry for the long delay. I do have a couple of additional questions that came up:

  • Does the license NOTICE file get generated automatically during build/deploy, or is that something I need to generate? I saw a script called collect_license_files.sh in the project, but I wasn't sure how to use it. Right now, there is no NOTICE.
  • RecoverableFsDataOutputStream.Committer contains both commit and commitAfterRecovery, and the descriptions say that the latter should tolerate situations where, say, the file has already been committed, which suggests that the former should not. I've implemented it that way. But in thinking through some possible scenarios, it seems possible for a file to be written and committed (which deletes the temp files), and then for processing to be restarted from an earlier check/savepoint at which the recoverable write was still in progress. In that case, temporary files would continue to be written from that point on, but at commit time the commit would fail because some of the temporary files had already been deleted. So I wasn't sure whether it would make more sense to always check for the presence of the final file when committing with either method -- not just with commitAfterRecovery -- so that the commit would not fail in that case. The cost would be an extra file read on every commit, to see whether the commit had already completed.

@xintongsong , thanks for your help so far and looking forward to your feedback.

@flinkbot
Collaborator

flinkbot commented Apr 13, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit e751250 (Thu Sep 23 18:01:00 UTC 2021)

Warnings:

  • 2 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Apr 13, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure to re-run the last Azure build

@xintongsong
Contributor

Thanks for preparing this PR, @galenwarren. I'll try to take a look asap.

Quick response to your questions.

  • The licensing issues are described here. In short, you need to manually create the NOTICE file. The script is mainly used for generating the root NOTICE file from those of the sub-modules.
  • I think commit should fail if the file is already committed. The contract of this interface says it publishes the file, making it visible. We should not allow a job restarted from an earlier checkpoint/savepoint to overwrite a published file.

BTW, AZP failed during compiling. Please take a look.

@xintongsong xintongsong self-assigned this Apr 14, 2021
@galenwarren
Contributor Author

Thanks @xintongsong, take your time, of course. And sounds good re: commit, I'll just leave it as is.

The first AZP build error looks to be related to the license/notice files we've been discussing, so when I add that I expect that will be fixed.

The second is a bit more puzzling, it's complaining that:

Failed to execute goal on project flink-gs-fs-hadoop: Could not resolve dependencies for project org.apache.flink:flink-gs-fs-hadoop:jar:1.13-SNAPSHOT: Failed to collect dependencies at com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.0 -> com.google.cloud.bigdataoss:gcsio:jar:2.2.0 -> io.grpc:grpc-alts:jar:1.34.1 -> io.grpc:grpc-grpclb:jar:1.34.1 -> io.grpc:grpc-core:jar:[1.34.1]: No versions available for io.grpc:grpc-core:jar:[1.34.1] within specified range -> [Help 1]

... but that dependency does seem to exist (https://mvnrepository.com/artifact/io.grpc/grpc-core/1.34.1), and mvn clean package -DskipTests succeeds locally. Is there anything obvious I'm missing that would explain why that dependency wouldn't resolve in AZP? I'm wondering if maybe it was some transient problem, I think I'll rerun the AZP build to see if we get the same result.

@galenwarren
Contributor Author

@flinkbot run azure

@xintongsong xintongsong left a comment (Contributor)

Thanks for preparing this PR, @galenwarren.

First of all, I must say the quality and readability of the code are very impressive. It's a pleasant experience reviewing this PR.

In this first round of review, I skipped the pom file and some test cases. There are already some comments that would be good to take a look at. Some of them relate to Flink's code style preferences. Among all the comments, I think the following issues need particular attention.

  • The clean-up logic, which might break recoverability
  • The responsibility and mutability of GSRecoverableWriterState
  • I think we should also add some logging for this component

flink-filesystems/flink-gs-fs-hadoop/pom.xml (outdated review thread, resolved)
Comment on lines 98 to 110
List<BlobId> foundTempBlobIds =
        storage.list(temporaryBucketName, temporaryObjectPartialName);
if (!foundTempBlobIds.isEmpty()) {

    // delete all the temp blobs, and populate the set with ones that were actually deleted
    // normalize in case the blob came back with a generation populated
    List<Boolean> deleteResults = storage.delete(foundTempBlobIds);
    for (int i = 0; i < deleteResults.size(); i++) {
        if (deleteResults.get(i)) {
            deletedBlobIds.add(BlobUtils.normalizeBlobId(foundTempBlobIds.get(i)));
        }
    }
}
Contributor

This doesn't seem right.

For each file being written, there could be a series of snapshots (s1, s2, s3, ...) taken before it's committed. Calling cleanupRecoverableState(s2) means we no longer need to recover the file writing from s2, and probably not from s1 either, which was taken before s2. However, we can still recover from s3. Removing all the component blobs of s2 means some of the component blobs of s3 are also removed.

Contributor Author

Oh, I see. I wasn't understanding that cleanupRecoverableState might be called multiple times during a single recoverable write operation, to clean up some but not all of the temporary blobs -- rather, I was thinking it would just be called once, at the end of a failed recoverable write operation, to clean up everything.

I agree, the scenario you describe could result in needed files being missing. So, to make sure I'm on the same page, is the right thing to do here to just delete those temp blobs directly referenced by the supplied ResumeRecoverable? That would actually simplify some things, e.g. there would be no need to support the list method on BlobStorage anymore, the BlobId normalization we discussed elsewhere wouldn't be needed ...

Contributor

Contract-wise, cleanupRecoverableState is expected to clean up for a resumable rather than for a target file being written. Once a checkpoint is successfully completed, nothing that happened before that checkpoint is expected to be replayed. That means there's no need to recover from a resumable generated before that checkpoint, so the resources for recovering from that resumable can be released.

I think the problem in our case is: are there any temporary blobs that are referenced only by a given resumable? Currently, all the temporary blobs referenced by a previous resumable are also referenced by all subsequent resumables. Consequently, we might not be able to remove anything at all.

Contributor Author

Reworked in 34ad465, according to the discussion below.

}

// clean up after commit
writer.cleanupRecoverableState(state);
Contributor

I'm not entirely sure about cleaning the temp blobs here.

Ideally, for each Recoverable that has been created, there should be a call to cleanupRecoverableState that cleans up the corresponding state.

I think doing a cleanup on commit serves as a good safety net, if the cleaning doesn't cause any problems. However, as you have mentioned, it is possible that the job restarts from an early checkpoint. I think we should not overwrite an existing committed file in storage. But what if the file is manually cleaned up and the job is intentionally restarted from an early checkpoint?

Maybe GSRecoverableWriterCommitter should not try to be overly smart. We might tolerate some uncleaned temp files, and it should be StreamingFileSink's responsibility to make sure cleanupRecoverableState is always called.

Contributor Author

I wasn't sure exactly when cleanupRecoverableState was called in the normal flow of things, so I ran a test where I set a breakpoint on that method and wrote data through a StreamingFileSink. What I observed was that cleanupRecoverableState did not get called as blobs were successfully committed, so no cleanup was occurring.

From the description of the method, it seemed plausible to me that this was only called on failure:

Frees up any resources that were previously occupied in order to be able to recover from a (potential) failure

... but I wasn't sure.

But yes, I agree, it would make more sense to me if cleanupRecoverableState were called on both successes and failures, and in that case the call in commit should be removed.

It's certainly possible that I made a mistake in my testing. Should cleanupRecoverableState be getting called after successful commits?

Contributor

I think cleanupRecoverableState is not called immediately when blobs are committed, but rather on the next successful checkpoint.
I've talked to Guowei offline. He is also a Flink committer and one of the main contributors to the streaming file sink. According to him, there are some known issues where cleanupRecoverableState may not always get called, e.g., if the next successful checkpoint never happens. Efforts to fix those issues are still in progress.

Contributor Author

I see. I don't actually recall seeing cleanupRecoverableState being called at any point on successful writes, even after the next checkpoint completed, but it's possible I'm wrong about that. I'll run another quick check to make sure I do see cleanupRecoverableState being called when expected, and if so, I'll remove that call during commit. I'll report back what I find.

Contributor

On second thought, if we cannot clean anything in cleanupRecoverableState, committing might be our only chance to clean the temporary blobs. This might have a higher priority than supporting manual recovery from an early checkpoint.

I think the following issues are closely related and really need to be clarified consistently.

  • What are the relationships between snapshots (resumables) and temporary blobs
  • Actions for cleanupRecoverableState
  • Actions for commit

Contributor Author

One more thought on recovering from earlier checkpoints.

Another way to make this possible would be not to do any cleanup in commit or cleanupRecoverableState and leave the responsibility for cleanup of temporary blobs to the caller. The most obvious way for the caller to do this would be to specify a separate bucket for temporary blobs and then to apply a TTL.

This would make things nice and simple -- temporary files would stay around for the TTL period, and it would be the responsibility of the caller to choose an appropriate TTL to balance recovery and storage needs. The ability to recover wouldn't depend on whether commit or cleanupRecoverableState or any other method had been called at some point, it would just depend on the age of the temporary blobs. This actually strikes me as a nice configuration, one that I might like to use.

The downside, of course, would be that if this were the default configuration, then callers who write to GS files without supplying any additional configuration (i.e. no options) would leak temporary blobs. If this were not the default configuration, then we'd be looking at some sort of option to enable this "no cleanup" mode.

Contributor

> How to fix depends on how we resolve the commit/cleanup issue:
>
>   • If we think that commit should perform its own cleanup, then I could remove the cleanup code from cleanupRecoverableState and put it somewhere where commit can call it.
>   • If we think that cleanupRecoverableState should be called for the commit recoverables, and we feel we can change the existing behavior, then we could leave the code where it is but only perform the cleanup if the supplied recoverable is, in fact, a commit recoverable (not a resume recoverable).

I totally agree with your analysis regarding commit and cleanupRecoverableState. I double-checked with @gaoyunhaii, one of the original contributors to the streaming file sink. According to him, commit should indeed clean up the temporary files, if any. So my previous comments were incorrect; sorry for the confusion.

> It was actually this sort of scenario that made me consider an alternate way of handling commit, before, which was to compose the final blob from the temporary blobs only if the final blob didn't already exist. My thought was that, if we assume idempotency, it should be ok to skip recreating the final blob if it already exists. And this would allow restoring from earlier checkpoints without failure even after the temporary blobs were deleted on commit.

I'm not entirely sure about the idempotency. Ideally, a file should not be committed twice. IIUC, the only case where a file gets committed twice is when the job fails after committing the file and before the next successful checkpoint. In that case, the recovered job does not know whether the file has been successfully committed and would have to retry committing it. In such cases, the content of the file is guaranteed to be identical, and commitAfterRecovery should be called. Apart from that case, I think we should fail when a job tries to commit a file twice, as that likely indicates something is wrong and there's no guarantee of idempotency.

To be specific, I think we should take no action in commitAfterRecovery and fail in commit if the final blob already exists (and/or the temporary blobs do not exist).

> Another way to make this possible would be not to do any cleanup in commit or cleanupRecoverableState and leave the responsibility for cleanup of temporary blobs to the caller. The most obvious way for the caller to do this would be to specify a separate bucket for temporary blobs and then to apply a TTL.

I'd suggest not relying on the TTL for cleaning up temporary blobs. In addition to what you've mentioned, that this requires additional user configuration, I have some other concerns.

  • If the user does not specify a dedicated bucket for the temporary blobs, the temporary blobs will stay as long as the final blobs, since there's no way to specify different TTLs for temporary and final blobs.
  • The temporary bucket may take much more space. A user may specify a relatively large TTL so that a small fraction of suspended jobs can be resumed within that time. If the unused temporary files are not cleaned up, we would need enough space to keep all temporary blobs generated during that time, which is undesirable.

Contributor Author

Great -- that all sounds good. Thanks for walking through the failure scenario; that was the scenario I was worried about, i.e. whether there would be some way to get into a nonrecoverable state. Agreed that, in those scenarios, commitAfterRecovery should be called and it should allow recovery.

So, to confirm, we'll go with:

  • commit
    • If the final blob already exists, fail
    • If the final blob doesn't exist, compose it from the temp blobs (failing if the temp blobs don't exist)
    • Last, delete all temp blobs associated with the final blob, if no failure in prior steps
  • commitAfterRecovery
    • If the final blob already exists, do nothing
    • If the final blob doesn't exist, compose it from the temp blobs (failing if the temp blobs don't exist)
    • Last, delete all temp blobs associated with the final blob, if no failure in prior steps
  • cleanupRecoverableState
    • Do nothing

If that's right, then I think we have a plan here (a rough sketch of that logic follows).
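
For reference, a sketch of the agreed behavior (illustrative names, written against the BlobStorage-style abstraction discussed earlier; not the exact PR code):

    public void commit() throws IOException {
        if (storage.exists(finalBlobId)) {
            throw new IOException("Final blob already exists: " + finalBlobId); // a plain commit must not run twice
        }
        storage.compose(state.getComponentBlobIds(), finalBlobId); // fails if any temp blob is missing
        storage.delete(state.getComponentBlobIds());               // delete temp blobs only after success
    }

    public void commitAfterRecovery() throws IOException {
        if (!storage.exists(finalBlobId)) {
            storage.compose(state.getComponentBlobIds(), finalBlobId); // fails if temp blobs are missing
        }
        storage.delete(state.getComponentBlobIds()); // tolerate the already-committed case
    }

    // cleanupRecoverableState: intentionally a no-op under this plan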

Contributor

Thanks for the summary. Yes, I think we are on the same page.

Contributor Author

Done in 34ad465.

@galenwarren
Contributor Author

@flinkbot run azure

@galenwarren
Contributor Author

Thanks for the thorough review and the thoughtful comments, @xintongsong. I'll have some time to review these early next week, so I should have something back to you soon.

I've also responded to a few of the specific questions, above.

Last, I'm still a bit puzzled by this Azure error:

Failed to execute goal on project flink-gs-fs-hadoop: Could not resolve dependencies for project org.apache.flink:flink-gs-fs-hadoop:jar:1.13-SNAPSHOT: Failed to collect dependencies at com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.0 -> com.google.cloud.bigdataoss:gcsio:jar:2.2.0 -> io.grpc:grpc-alts:jar:1.34.1 -> io.grpc:grpc-grpclb:jar:1.34.1 -> io.grpc:grpc-core:jar:[1.34.1]: No versions available for io.grpc:grpc-core:jar:[1.34.1] within specified range

Curious, when you review the code, do you pull it down and build, or do you just look at it on GitHub? If the former, are you having any problem resolving that dependency locally?

Thanks.

@xintongsong
Contributor

@galenwarren,

In most cases, I pull the code to review it, but don't necessarily build it. This time, I tried building your code locally and did not run into any problems.

I checked the logs, and it seems there's a connectivity issue between the Azure worker and the Google Maven mirror.

This is a bit complicated. In short, IIUC, the problem should be gone once we fix the license issues.

  • We have two worker pools for our CI build.
    • The compile_ci stage is executed on Alibaba-hosted machines, downloading artifacts from the Alibaba mirror. All artifacts can be properly downloaded, so the license checking fails after 20+ minutes, which is after downloading the grpc dependencies.
    • The e2e_ci stage is executed on Azure-hosted machines, downloading artifacts from the Google mirror. The error happens after about 5 minutes, complaining about not finding the dependency. I think it's a connectivity issue because I tried downloading the same artifact from the same Google mirror on my local machine and it succeeded.
  • We leverage the Azure cache for sharing Maven dependency artifacts across builds. That means that all dependencies downloaded previously are fetched from the Azure cache rather than downloaded again from the Maven mirror. That's probably why the e2e_ci build does not fail for other artifacts. This can be confirmed from the Maven log, where no "Downloading xxx" lines appear until the failure.

I'll find people to look into the connectivity issue between the Google mirror and the Azure workers. In the meantime, I think that once we fix the license issues and the compile_ci stage passes, the new dependencies should be uploaded to the Azure cache, and we should no longer see this error.

@rmetzger
Contributor

I cannot explain what's happening here with Azure. I deployed this PR to my personal Azure account, and it was able to download the dependency.
Here's the run: https://dev.azure.com/rmetzger/Flink/_build/results?buildId=9061&view=logs&j=9401bf33-03c4-5a24-83fe-e51d75db73ef&t=3965ec8d-ff2e-54b6-c0a6-70172f3e6f55

I'll further investigate the issue ...

@xintongsong
Contributor

@rmetzger, thanks for help looking into this. :)

@rmetzger
Contributor

Yeah, I'm happy to help, I also think I know what's causing this, but I don't know how to resolve it 😢

I pushed this PR:

  • onto my personal CI, where it passed without problems.
  • onto Flink's CI, where it passed without problems.
  • I also manually restarted this PR's CI, and it broke again.

The only difference between all these runs is that the broken PR CI run is downloading data from the Maven cache on Azure (i.e. the local .m2 directory already contains artifacts in the failure case).

@galenwarren Could you do me a favor and add a "-U" argument here: https://github.com/apache/flink/blob/master/tools/ci/maven-utils.sh#L107. This COULD resolve the issue.

> This can be confirmed from the maven log, where no "Downloading xxx" can be found until the failure.

The reason for the missing log statements is that we've disabled the download messages from maven: https://github.com/apache/flink/blob/master/tools/ci/maven-utils.sh#L103

@galenwarren
Contributor Author

@rmetzger Thanks for your help diagnosing the build issue.

@xintongsong
Contributor

@galenwarren
FYI, next week is a public holiday in China, so I may be less responsive.

@galenwarren
Contributor Author

@xintongsong

I think we've made it through your comments, so far, and I understand how to proceed. For any comment I didn't reply to, I'll just take your suggestion. If there's anything still outstanding from your perspective, please let me know.

The one thing we haven't talked about is adding some logging. I agree that would be a good idea. Are there any conventions I should be following here? I didn't see anything specific in the coding guidelines. I'm happy to just take a crack at it, but if there are some guidelines I should be following, just let me know.

Enjoy the holiday. I'll try to have something for you the week after next, after the holiday week.

@xintongsong
Contributor

Thank you, @galenwarren.
I'm not aware of any guidelines on logging either.

@rmetzger
Contributor

rmetzger commented May 3, 2021

Note: There is some related activity in contributing another GCS FS implementation: https://issues.apache.org/jira/browse/FLINK-19481?focusedCommentId=17338198&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17338198 Feel free to add to the discussion in the Jira ticket!

@xintongsong
Contributor

@rmetzger, thanks for the pointer :)

@galenwarren galenwarren force-pushed the create-gs-file-system-with-recoverable-writer branch from 1aac728 to b423fee on May 16, 2021 14:46
@galenwarren
Contributor Author

Hi @xintongsong, I've rebased and pushed commits to address some of your feedback. I've noted the relevant commits in some open conversations above. More commits coming soon for the rest of the feedback ...

@galenwarren
Contributor Author

@xintongsong Just pushed a commit, I think this addresses your comments so far except for logging (which I'll still add). Also, I'd like to rework some of the unit tests but haven't been able to do so yet, so please don't spend too much time there :)

@xintongsong
Contributor

Thanks for the updates, @galenwarren. I'll try to give it another pass this week.

@xintongsong
Contributor

@galenwarren,
Sorry, I haven't finished the review yet; I've been distracted by some other work. I'm halfway there and should be able to finish early next week.

In the meantime, I did manage to look into the license issue. The problem is that flink-gs-fs-hadoop depends on javax.annotation-api (indirectly, via google-cloud-storage) at compile scope. javax.annotation-api has a GPLv2 license, which means that per the Apache guidelines we cannot bundle it into any Apache Flink artifacts.

Could you try excluding javax.annotation-api in the pom and see if that fixes the CI failure? Moreover, could you check whether the feature works without bundling this dependency? If not, users would need to manually download this dependency into the lib folder, and we would need to mention this in the documentation.
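
For reference, the exclusion would look roughly like this in the flink-gs-fs-hadoop pom (assuming the dependency is pulled in via google-cloud-storage; the version property is a placeholder):

    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-storage</artifactId>
        <version>${google.cloud.storage.version}</version>
        <exclusions>
            <exclusion>
                <groupId>javax.annotation</groupId>
                <artifactId>javax.annotation-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>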

@galenwarren
Contributor Author

galenwarren commented May 28, 2021

@xintongsong Thanks for the update and no worries on timing. I'll see what happens if we exclude javax.annotation-api.

@sap1ens

sap1ens commented Jan 20, 2022

Thank you, @galenwarren! We've been using this in our internal fork for many months.

@sap1ens

sap1ens commented Jan 20, 2022

Oh, one thing that I'd expect to see is missing: shouldn't it be included in flink-dist? I proposed some changes here: #15599 (comment) @xintongsong

@xintongsong
Contributor

@sap1ens,
Thanks for the heads-up, and sorry that your previous comment was overlooked among the many comments here. I think you're correct that this should be included in flink-dist. I'll fix it with a hotfix commit.

@sap1ens

sap1ens commented Jan 20, 2022

Thank you 👍

@galenwarren
Contributor Author

galenwarren commented Jan 20, 2022

@sap1ens I'm glad that you've found this useful! I wanted to give you a heads up about one thing, though, which is that there could be a problem with the final version of the code reading save/checkpoint data written by earlier versions of the code.

Without going into too much detail, there are two types that get written into save/checkpoint data -- ResumeRecoverable and CommitRecoverable. In the original implementation, these were implemented by the same class, used the same serializer, and so had the same serialized format. However, later in the project, these were changed to be implemented by separate classes, with separate serializers and separate (though related) serialized formats. And, while the SimpleVersionedSerializer does support versioning, I did not bump the version of the serializers in the final code -- they are still at version 0. So it would not be simple to determine, during deserialization, whether save/checkpoint data for these recoverables were serialized the old way vs. the new way.

I'm not sure exactly how you're using this now -- if you have the ability to stop jobs and restart them without starting from a savepoint, then there should be no problem.

If you will need to start from a savepoint when moving to the new version of the code, you could have a problem reading the savepoint data for any in-progress writes to GCS. One pretty simple thing we could do to make this easier would be to bump the serializer versions in the to-be-released code to 1, instead of 0. I don't think that would make any difference to new users, but it would allow you to distinguish the two formats in a fork so that you could read the old format properly (with a bit of extra code).
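
To illustrate what the version bump would allow (sketch only; serialize() is omitted and the deserialize helpers are hypothetical):

    // Imports assumed: org.apache.flink.core.io.SimpleVersionedSerializer, java.io.IOException.
    public class GSCommitRecoverableSerializer
            implements SimpleVersionedSerializer<GSCommitRecoverable> {

        private static final int SERIALIZER_VERSION = 1; // bumped from 0 for the released format

        @Override
        public int getVersion() {
            return SERIALIZER_VERSION;
        }

        @Override
        public GSCommitRecoverable deserialize(int version, byte[] serialized) throws IOException {
            switch (version) {
                case 0:
                    return deserializeV0(serialized); // hypothetical helper: pre-release (fork) format
                case 1:
                    return deserializeV1(serialized); // hypothetical helper: released format
                default:
                    throw new IOException("Unrecognized serializer version: " + version);
            }
        }

        // serialize(), deserializeV0(), deserializeV1() omitted for brevity
    }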

I'd have to run this by @xintongsong, though, since he's already merged the code.

@sap1ens

sap1ens commented Jan 20, 2022

Nice, thanks for the explanation. So far we've been using savepoints to redeploy the jobs without issues. We're still on 1.13.2, though, with a very early version of your PR, but we're currently upgrading to 1.14.2 using the latest changes here.

We can run some tests to confirm the issue. Bumping serializer versions would be nice 👍

@galenwarren
Contributor Author

@xintongsong Please let me know what you think. The change I'd propose to help @sap1ens (or anyone else who used the pre-released code) would be to change 0 to 1 in the following lines in GSCommitRecoverableSerializer and GSResumeRecoverableSerializer:

private static final int SERIALIZER_VERSION = 0;

Preconditions.checkArgument(version >= 0);

I'd be happy to create a PR if we want to do this.

@galenwarren
Contributor Author

@sap1ens I thought of another way to potentially address this issue, which could be done entirely in your fork. But please check me on this.

Instead of changing the master Flink code to write version 1 for the serializer version, you could change your local fork to write version -1 for the serializer versions (and also change the Precondition check, above). Any savepoints you'd take after that point would have recoverables serialized with version -1, and then when you switched to use the upcoming Flink code, you could tell the two serialized formats apart with a bit of extra code (which you'll have to add to your fork in any case).

@sap1ens

sap1ens commented Jan 21, 2022

Thank you for the suggestion 👍 Yep, it seems fairly straightforward, we'll add the change.

@galenwarren
Contributor Author

Sounds good, and let me know if I can help with anything.

@galenwarren
Contributor Author

galenwarren commented Jan 21, 2022

@xintongsong I've realized something as I'm working through the docs and checking things.

The existing GCS documentation describes authentication methods used to access GCS. The first is via changes to core-site.xml and flink-conf.yaml; the second is by setting the GOOGLE_APPLICATION_CREDENTIALS environment variable.

So far, I've been doing all my work using the second method, and I think that's Google's generally preferred method at this point; setting that environment variable seems to be the standard way that works across all Google libraries. Right now, if someone were to set credentials in the Hadoop configuration as documented, that would work for the FileSystem part of the GS file system but not for the RecoverableWriter part.

So, I can see two paths forward:

  • Remove the documentation for how to set credentials in the Hadoop config and simply require users to set the GOOGLE_APPLICATION_CREDENTIALS environment variable, if needed, for authentication
  • Change the RecoverableWriter implementation to look for credentials in the Hadoop config and use them, if they exist

The first would be easiest, obviously, but if someone were already using the Hadoop config to access GCS, they would need to change that in order to use flink-gs-fs-hadoop.

Any thoughts? Thanks.

EDIT: For now, I've gone ahead and just documented the one way to specify credentials, via GOOGLE_APPLICATION_CREDENTIALS.

@xintongsong
Contributor

@galenwarren, sorry for the late response. I was OOO the past few days.

> The change I'd propose to help @sap1ens (or anyone else who used the pre-released code) would be to change 0 to 1 in the following lines in GSCommitRecoverableSerializer and GSResumeRecoverableSerializer

We don't usually provide compatibility guarantees for pre-release code. However, in this specific case, I see barely any cost in making life easier for users of the pre-release code. So +1 for changing the serializer version. I've already opened a hotfix PR #18409 for including flink-gs-fs-hadoop in flink-dist, and I can change the serializer version there.

Concerning the authentication, do you have an idea how much effort the 2nd approach may require?

  • I think it would be ideal if we could fall back to core-site.xml when it contains the credentials AND GOOGLE_APPLICATION_CREDENTIALS is not specified. This would be my top preference, if feasible (a rough sketch follows this list).
  • If the above approach requires too much effort / time (as we are approaching the feature freeze date), I'd also be fine with documenting this as a known limitation (that core-site.xml does not work with the RecoverableWriter part) and addressing it as a follow-up issue in the next release cycle.
  • I would suggest not excluding the core-site.xml approach from the documentation unless we decide to deprecate / remove it for good. Otherwise, it might confuse users as to whether the approach is still available. And as far as I can see, there's no strong reason yet for making such an incompatible change.
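
A rough sketch of that fallback (the Hadoop key shown is the gcs-connector's service-account keyfile option, assumed here; the actual implementation may differ):

    // Imports assumed: com.google.auth.oauth2.GoogleCredentials,
    // org.apache.hadoop.conf.Configuration, java.io.FileInputStream, java.io.IOException.
    private static GoogleCredentials resolveCredentials(Configuration hadoopConfig) throws IOException {
        if (System.getenv("GOOGLE_APPLICATION_CREDENTIALS") != null) {
            // standard Google mechanism: env var pointing at a service-account JSON key
            return GoogleCredentials.getApplicationDefault();
        }
        // fall back to credentials configured in core-site.xml, if present (assumed key name)
        String keyFile = hadoopConfig.get("google.cloud.auth.service.account.json.keyfile");
        if (keyFile != null) {
            try (FileInputStream in = new FileInputStream(keyFile)) {
                return GoogleCredentials.fromStream(in);
            }
        }
        // nothing explicit: let application-default credentials resolve (e.g. GCE metadata server)
        return GoogleCredentials.getApplicationDefault();
    }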

@galenwarren
Contributor Author

@xintongsong No worries, I hope you had a nice time away from work.

Regarding the version-bump issue -- I did suggest an alternative solution that I think would work and doesn't require bumping the version in the main Flink code. So I don't think it's necessary at this point, but it also wouldn't hurt and would probably be easier for others if you want to bump the version. So I'll leave that up to you to decide.

Regarding the authentication, I don't think it should be that much work. I'm willing to give it a try. Should I do it in this PR or create a new one?

@galenwarren
Contributor Author

@xintongsong
I went ahead and created a new JIRA and will link the PR there: https://issues.apache.org/jira/browse/FLINK-25790.

@galenwarren
Contributor Author

PR created: #18489

xintongsong added a commit to xintongsong/flink that referenced this pull request Jan 25, 2022
The purpose of this change is to prevent serialization/deserialization problems
between the PR and the final merged versions, as the PR has been opened for a
long time and we noticed it already has many users before being merged.

This commit should change nothing for new users of the officially merged version.

See the following discussion for more details:
apache#15599 (comment)
xintongsong added a commit to xintongsong/flink that referenced this pull request Jan 25, 2022
The purpose of this change is to prevent serialization/deserialization problems
between the PR and the final merged versions, as the PR has been opened for a
long time and we noticed it already has many users before being merged.

This commit should change nothing for new users of the officially merged version.

See the following discussion for more details:
apache#15599 (comment)
xintongsong added a commit that referenced this pull request Jan 26, 2022
The purpose of this change is to prevent serialization/deserialization problems
between the PR and the final merged versions, as the PR has been opened for a
long time and we noticed it already has many users before being merged.

This commit should change nothing for new users of the officially merged version.

See the following discussion for more details:
#15599 (comment)

This closes #18409
niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
…bleWriter support

Add RecoverableWriter support for Google Storage, to allow Flink to write to GS buckets via StreamingFileSink. This involves registering a FileSystem implementation for GS, utilizing the Google-provided GoogleHadoopFileSystem, and then implementing the RecoverableWriter interface.

The RecoverableWriter implementation writes temporary files to blobs in GS when the recoverable stream is persisted, and composes those temporary blobs together on commit.

This closes apache#15599
niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
The purpose of this change is to prevent serialization/deserialization problems
between the PR and the final merged versions, as the PR has been opened for a
long time and we noticed it already has many users before being merged.

This commit should change nothing for new users of the officially merged version.

See the following discussion for more details:
apache#15599 (comment)

This closes apache#18409
@galenwarren
Contributor Author

@xintongsong I noticed this in today's email titled "Flink 1.15 Stabilisation Weekly Update":

In case your feature has not yet a manual testing issue please create one asap.

I haven't created a manual testing issue, should I? Would that just be an issue that requests testing and points to the original JIRA and PR?

@xintongsong
Contributor

@galenwarren,

Yes, I think we should create one ASAP. Sorry, I should have brought this up earlier. I haven't been closely following the release progress these days, having been distracted by some other work.

A manual testing issue should provide enough information for an arbitrary 3rd party to test the feature.

  • Links to the documentation
  • Suggestions on how to verify the feature
  • In our case, we may want to add a notice that a Google Cloud / Storage account is needed for the testing

Moreover, in order for the issue to be properly tracked, the following fields should be set.

  • Fix Version/s: 1.15.0
  • Priority: Blocker
  • Labels: release-testing

You may find examples here.

@galenwarren
Contributor Author

@xintongsong How does this look? https://issues.apache.org/jira/browse/FLINK-26451

@xintongsong
Contributor

Perfect!

gddezero pushed a commit to gddezero/flink that referenced this pull request Feb 17, 2023
…bleWriter support

Add RecoverableWriter support for Google Storage, to allow Flink to write to GS buckets via StreamingFileSink. This involves registering a FileSystem implementation for GS, utilizing the Google-provided GoogleHadoopFileSystem, and then implementing the RecoverableWriter interface.

The RecoverableWriter implementation writes temporary files to blobs in GS when the recoverable stream is persisted, and composes those temporary blobs together on commit.

This closes apache#15599

(cherry picked from commit 0e4a666)
gddezero pushed a commit to gddezero/flink that referenced this pull request Feb 17, 2023
The purpose of this change is to prevent serialization/deserialization problems
between the PR and the final merged versions, as the PR has been opened for a
long time and we noticed it already has many users before being merged.

This commit should change nothing for new users of the officially merged version.

See the following discussion for more details:
apache#15599 (comment)

This closes apache#18409

(cherry picked from commit 19eb5f3)