
Conversation

galenwarren
Contributor

What is the purpose of the change

For the GCS FileSystem plugin, use the same authentication options for the RecoverableWriter portion as is done for the normal FileSystem portion. This means that it will use GOOGLE_APPLICATION_CREDENTIALS, if it exists, but will also use the google.cloud.auth.service.account.json.keyfile property from Hadoop config.

To have both portions of the plugin use the same rules, each of them will only consider using service credentials if the Hadoop property google.cloud.auth.service.account.enable is true or unspecified (i.e. the default value is true).
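
For reference, here is a minimal sketch of that resolution order (the class and method names are illustrative, not necessarily those used in the PR):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Optional;

import com.google.auth.oauth2.GoogleCredentials;
import org.apache.hadoop.conf.Configuration;

/** Illustrative sketch of the credential resolution order; not the PR's actual class. */
class CredentialResolutionSketch {

    static Optional<GoogleCredentials> resolveCredentials(Configuration hadoopConfig)
            throws IOException {
        // 1) only consider service credentials if the enable flag is true (default: true)
        if (!hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true)) {
            return Optional.empty();
        }

        // 2) prefer GOOGLE_APPLICATION_CREDENTIALS from the environment ...
        String credentialsPath = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");

        // 3) ... then fall back to the Hadoop keyfile property
        if (credentialsPath == null) {
            credentialsPath = hadoopConfig.get("google.cloud.auth.service.account.json.keyfile");
        }

        // 4) otherwise, use no credentials
        if (credentialsPath == null) {
            return Optional.empty();
        }
        try (FileInputStream stream = new FileInputStream(credentialsPath)) {
            return Optional.of(GoogleCredentials.fromStream(stream));
        }
    }
}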

Brief change log

  • Update GSFileSystemFactory to read Hadoop config from the location specified in CoreOptions.FLINK_HADOOP_CONF_DIR or in the HADOOP_CONF_DIR environment variable and to combine it with Hadoop config values from the Flink config (see the sketch after this list)
  • Update GSFileSystem to look for credentials in either GOOGLE_APPLICATION_CREDENTIALS or google.cloud.auth.service.account.json.keyfile, if google.cloud.auth.service.account.enable is not false, when constructing the Storage instance for the RecoverableWriter
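
A minimal sketch of the configuration-directory resolution from the first item above (again with illustrative names; the merge of Hadoop keys from the Flink config is only indicated by a comment):

import java.io.File;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch only; the actual logic lives in GSFileSystemFactory / ConfigUtils. */
class HadoopConfigLoadingSketch {

    static Configuration loadHadoopConfig(Optional<String> flinkHadoopConfDir) {
        Configuration hadoopConfig = new Configuration();

        // prefer CoreOptions.FLINK_HADOOP_CONF_DIR, then fall back to HADOOP_CONF_DIR
        String confDir = flinkHadoopConfDir.orElseGet(() -> System.getenv("HADOOP_CONF_DIR"));

        if (confDir != null) {
            File coreSite = new File(confDir, "core-site.xml");
            if (coreSite.exists()) {
                hadoopConfig.addResource(new Path(coreSite.getAbsolutePath()));
            }
        }

        // Hadoop keys supplied via the Flink config would then be set on top of this
        // (that merge step is omitted here).
        return hadoopConfig;
    }
}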

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no) No
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) No
  • The serializers: (yes / no / don't know) No
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) No
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) No
  • The S3 file system connector: (yes / no / don't know) No

Documentation

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 6d23f67 (Mon Jan 24 23:11:42 UTC 2022)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Jan 24, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Contributor

@xintongsong left a comment


Thanks @galenwarren. The changes look good in general. It might be better to have some test cases guarding our changes. I can think of two things that need to be verified: the configuration fallback logic, and the merging of Hadoop config from the Flink config and core-site.

Comment on lines 55 to 61
// follow the same rules as for the Hadoop connector, i.e.
// 1) only use service credentials at all if Hadoop
// "google.cloud.auth.service.account.enable" is true (default: true)
// 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
// 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
// credentials, if supplied
// 4) use no credentials
Contributor


Just trying to understand: what happens if a user configures none of these options? I.e., the service account is enabled by default, but no credential is provided.

Contributor Author

@galenwarren Jan 25, 2022


As it stands now, if the service account were enabled but no credentials were supplied via either GOOGLE_APPLICATION_CREDENTIALS or google.cloud.auth.service.account.json.keyfile, it would create a Storage instance with no credentials. If you were writing to a publicly writable GCS bucket, this would work, but it would fail if the bucket required credentials.

This is similar to what would happen (as far as I understand) wrt Hadoop config; even if service credentials are enabled (which they are by default), you still have to specify a credential of some kind or else it won't use one.

Contributor Author


Also, it's worth noting that there are more authentication options supported for Hadoop than just those two:

  • Private key and id
  • P12 certificate
  • Short-lived service-account impersonation

But the only ones that have been documented as supported in Flink so far are the two that are directly mentioned in the new docs, GOOGLE_APPLICATION_CREDENTIALS and google.cloud.auth.service.account.json.keyfile.

Do you think it's OK just to support those?

Contributor


I think it's fine to only support those mentioned in the documentation, as aligned with the previous behavior.

That leads to another question: shall we load the entire core-site/default.xml and merge it with configurations from the Flink config? Alternatively, we may only look for the configs we support.

Contributor Author


Please check me on this, but my thought was that if someone was using the gcs-connector before -- i.e. just as a Hadoop-backed FileSystem -- they would have been able to supply arbitrary config options in core-site/default.xml and expect them to be applied. So I was trying to preserve that behavior.

But, yes, if we support arbitrary config options in core-site/default.xml except for certain authentication-related options, that does seem a bit counterintuitive.

I suppose one option could be to continue to parse and pass through all the options, but to document that the only authentication options that will yield the proper behavior for all FileSystem operations are the two documented ones, and not the others (P12, etc.).

Contributor


Looking into this a bit more, I think it's probably fine as is.

The module currently consists of two parts leveraging different underlying libraries: the FileSystem that uses gcs-connector, and the RecoverableWriter that uses google-cloud-storage. Hadoop configurations (core-site/default.xml) can be applied directly to gcs-connector but not to google-cloud-storage.

In that sense, it makes sense to me that the RecoverableWriter/google-cloud-storage only supports selected Hadoop configurations. As a first step, the supported configurations include only account.enable and account.json.keyfile. We can add more if new demands emerge later.

WDYT?

Contributor Author


Yes, I agree.

I'll add a note to the docs (on the other PR) to call this out, and I'll work on some unit tests.

Comment on lines 92 to 108
Storage storage;
if (credentialsPath.isPresent()) {
    LOGGER.info(
            "Creating GSRecoverableWriter using credentials from {}",
            credentialsPath.get());
    try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) {
        GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream);
        storage =
                StorageOptions.newBuilder()
                        .setCredentials(credentials)
                        .build()
                        .getService();
    }
} else {
    LOGGER.info("Creating GSRecoverableWriter using no credentials");
    storage = StorageOptions.newBuilder().build().getService();
}
Contributor


I'd suggest minimizing what we do in the if-else branches, as follows:

        // construct the storage instance, using credentials if provided
        StorageOptions.Builder storageOptionBuilder = StorageOptions.newBuilder();
        if (credentialsPath.isPresent()) {
            LOGGER.info(
                    "Creating GSRecoverableWriter using credentials from {}",
                    credentialsPath.get());
            try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) {
                GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream);
                storageOptionBuilder.setCredentials(credentials);
            }
        } else {
            LOGGER.info("Creating GSRecoverableWriter using no credentials");
        }

        // create the GS blob storage wrapper
        GSBlobStorageImpl blobStorage =
                new GSBlobStorageImpl(storageOptionBuilder.build().getService());

Contributor Author


Good suggestion, I'll make that change.

Contributor Author


Done in 6d137f7 (and moved to ConfigUtils).

@galenwarren
Contributor Author

Please let me know if there's anything else you would suggest I change in terms of the implementation, besides what you've already suggested, and then I'll look at unit tests as the last piece.

…FileSystem to support unit tests; add unit tests
@galenwarren
Contributor Author

galenwarren commented Jan 26, 2022

@xintongsong

It might be better to have some test cases guarding our changes. I can think of two things that need to be verified: the configuration fallback logic, and the merging of Hadoop config from the Flink config and core-site.

I've added these in the latest commit, 6d137f7.

I had to refactor things a bit to make them easily testable; now, all the interesting code is in ConfigUtils, and GSFileSystemFactory and GSFileSystem are simplified.

I also consolidated things a bit. There was really no good reason for the configuration operations to be spread between GSFileSystemFactory and GSFileSystem before; now, all configuration is resolved in GSFileSystemFactory.

Please let me know if you have any other feedback, or let me know if it looks good and I'll squash. Thanks.

* Interface that provides context-specific config helper functions, factored out to support
* unit testing.
*/
public interface ConfigContext {
Contributor

@xintongsong Jan 27, 2022


I find it a bit hard to understand the responsibility of this interface. It seems several things are mixed together.

  1. It serves as a provider of context-related inputs: environment variables and files.
  2. It somehow also decides how context-related inputs are applied: overwriting the given config / storage options.
  3. In the tests, it's also used for recording which files the util is reading.

I think 1) alone should be good enough for providing different context-related inputs in production / tests. 2) is probably not a big deal, as the logic is as simple as passing things around. However, I'm not sure about 3), as it feels like we are checking that ConfigUtils is reading the correct input rather than providing the correct output.

Contributor


Another thing that doesn't feel right is having to provide different ConfigContext implementations for various test cases. And the UnsupportedOperationException indicates that we are assuming how ConfigContext is used by ConfigUtils internally, rather than treating the latter as a black box.

Contributor


I would suggest the following:

  1. Change the ConfigContext protocol to a mere input provider. Something like:
public interface ConfigContext {
    Optional<String> getenv(String name);
    org.apache.hadoop.conf.Configuration loadHadoopConfigFromDir(String configDir);
    GoogleCredentials loadStorageCredentialsFromFile(String credentialsPath);
}
  2. We can have a testing implementation like:
public class TestingConfigContext implements ConfigContext {
    Map<String, String> envs;
    Map<String, org.apache.hadoop.conf.Configuration> hadoopConfigs;
    Map<String, GoogleCredentials> credentials;

    public Optional<String> getenv(String name) {
        return Optional.ofNullable(envs.get(name));
    }
    public org.apache.hadoop.conf.Configuration loadHadoopConfigFromDir(String configDir) {
        return hadoopConfigs.get(configDir);
    }
    public GoogleCredentials loadStorageCredentialsFromFile(String credentialsPath) {
        return credentials.get(credentialsPath);
    }
}

In this way we can reuse the same TestingConfigContext implementation in different test cases, constructed with different parameters.

  3. The test cases can simply verify the outcome of ConfigUtils#getHadoopConfiguration and ConfigUtils#getStorageOptions (StorageOptions#getCredentials).

Contributor Author

@galenwarren Jan 27, 2022


@xintongsong

Good suggestions, done in c258789.

I had to make one more change. It turns out that, if one creates a StorageOptions instance via StorageOptions.Builder#build without having provided credentials to the builder, the builder will use credentials defined via GOOGLE_APPLICATION_CREDENTIALS -- and return them via getCredentials -- and there doesn't seem to be a way to prevent this. This isn't a problem at runtime, but it does create problems for the tests if GOOGLE_APPLICATION_CREDENTIALS is defined in the environment; specifically, StorageOptions that we would expect to have no credentials via getCredentials in fact do have credentials defined.

So, I changed ConfigUtils#getStorageOptions to be ConfigUtils#getStorageCredentials instead, i.e.:

    public static Optional<GoogleCredentials> getStorageCredentials(
            org.apache.hadoop.conf.Configuration hadoopConfig, ConfigContext configContext) {

... which allows me to properly validate in unit tests whether credentials were created or not, avoiding the need to read them back out of a StorageOptions instance.
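
For example, a test along these lines should now be possible (a sketch only -- it assumes a TestingConfigContext constructor that takes the three maps from the earlier suggestion, which may not match the PR exactly):

import static org.junit.Assert.assertFalse;

import java.util.Collections;
import java.util.Optional;

import com.google.auth.oauth2.GoogleCredentials;
import org.junit.Test;

/** Sketch of a fallback test against ConfigUtils#getStorageCredentials. */
public class GSConfigCredentialsSketchTest {

    @Test
    public void shouldReturnNoCredentialsWhenServiceAccountDisabled() {
        org.apache.hadoop.conf.Configuration hadoopConfig =
                new org.apache.hadoop.conf.Configuration();
        hadoopConfig.setBoolean("google.cloud.auth.service.account.enable", false);

        // no environment variables, no config dirs, no credential files
        TestingConfigContext configContext =
                new TestingConfigContext(
                        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());

        Optional<GoogleCredentials> credentials =
                ConfigUtils.getStorageCredentials(hadoopConfig, configContext);

        // a disabled service account should short-circuit to "no credentials"
        assertFalse(credentials.isPresent());
    }
}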

Contributor

@xintongsong left a comment


Thanks for addressing my comments, @galenwarren. LGTM.

I'm taking over from here and will merge both this and #18430. There's only one minor comment, which I'll address myself while merging, along with squashing the commits.

Comment on lines +49 to +55
@VisibleForTesting
static final String HADOOP_OPTION_ENABLE_SERVICE_ACCOUNT =
"google.cloud.auth.service.account.enable";

@VisibleForTesting
static final String HADOOP_OPTION_SERVICE_ACCOUNT_JSON_KEYFILE =
"google.cloud.auth.service.account.json.keyfile";
Contributor


Suggested change (drop the @VisibleForTesting annotations and make the constants private):

private static final String HADOOP_OPTION_ENABLE_SERVICE_ACCOUNT =
        "google.cloud.auth.service.account.enable";
private static final String HADOOP_OPTION_SERVICE_ACCOUNT_JSON_KEYFILE =
        "google.cloud.auth.service.account.json.keyfile";

@galenwarren
Contributor Author

@xintongsong Sounds great! With this and the associated docs, we're all done, right? I just want to make sure I'm not forgetting anything. Thanks again for all your help on this.

niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
gddezero pushed a commit to gddezero/flink that referenced this pull request Feb 17, 2023