From 6d23f67c6515ebfeef972314b5c19bb02fc3d2ed Mon Sep 17 00:00:00 2001 From: Galen Warren <784517+galenwarren@users.noreply.github.com> Date: Mon, 24 Jan 2022 17:56:45 -0500 Subject: [PATCH 1/3] [FLINK-25790][flink-gs-fs-hadoop] Support RecoverableWriter auth via Hadoop config --- .../org/apache/flink/fs/gs/GSFileSystem.java | 68 ++++++++++-- .../flink/fs/gs/GSFileSystemFactory.java | 100 ++++++++++++++---- 2 files changed, 142 insertions(+), 26 deletions(-) diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java index 4ad382acd8582..e6d9244a535d9 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java @@ -24,12 +24,18 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.util.Preconditions; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Optional; + /** Provides recoverable-writer functionality for the standard GoogleHadoopFileSystem. */ class GSFileSystem extends HadoopFileSystem { @@ -39,17 +45,67 @@ class GSFileSystem extends HadoopFileSystem { GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) { super(Preconditions.checkNotNull(googleHadoopFileSystem)); - LOGGER.info("Creating GSFileSystem with options {}", options); - this.options = Preconditions.checkNotNull(options); + LOGGER.info("Creating GSFileSystem with options {}", options); } @Override - public RecoverableWriter createRecoverableWriter() { - LOGGER.info("Creating recoverable writer with options {}", options); + public RecoverableWriter createRecoverableWriter() throws IOException { + + // follow the same rules as for the Hadoop connector, i.e. 
+ // 1) only use service credentials at all if Hadoop + // "google.cloud.auth.service.account.enable" is true (default: true) + // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied + // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of + // credentials, if supplied + // 4) use no credentials + + // store any credentials we are to use, here + Optional credentialsPath = Optional.empty(); + + // only look for credentials if service account support is enabled + Configuration hadoopConfig = getHadoopFileSystem().getConf(); + boolean enableServiceAccount = + hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true); + if (enableServiceAccount) { + + // load google application credentials, and then fall back to + // "google.cloud.auth.service.account.json.keyfile" from Hadoop + credentialsPath = Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS")); + if (credentialsPath.isPresent()) { + LOGGER.info( + "Recoverable writer is using GOOGLE_APPLICATION_CREDENTIALS at {}", + credentialsPath.get()); + } else { + credentialsPath = + Optional.ofNullable( + hadoopConfig.get("google.cloud.auth.service.account.json.keyfile")); + credentialsPath.ifPresent( + path -> + LOGGER.info( + "Recoverable writer is using credentials from Hadoop at {}", + path)); + } + } - // create the Google storage service instance - Storage storage = StorageOptions.getDefaultInstance().getService(); + // construct the storage instance, using credentials if provided + Storage storage; + if (credentialsPath.isPresent()) { + LOGGER.info( + "Creating GSRecoverableWriter using credentials from {}", + credentialsPath.get()); + try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) { + GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream); + storage = + StorageOptions.newBuilder() + .setCredentials(credentials) + .build() + .getService(); + } + } else { + LOGGER.info("Creating GSRecoverableWriter using no credentials"); + storage = StorageOptions.newBuilder().build().getService(); + } // create the GS blob storage wrapper GSBlobStorageImpl blobStorage = new GSBlobStorageImpl(storage); diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java index b94f0c984d435..973e928977014 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java @@ -19,20 +19,26 @@ package org.apache.flink.fs.gs; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; import org.apache.flink.runtime.util.HadoopConfigLoader; import org.apache.flink.util.Preconditions; import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; import java.net.URI; import java.util.Collections; +import java.util.Map; +import java.util.Optional; /** * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for @@ -53,22 +59,13 @@ public class 
GSFileSystemFactory implements FileSystemFactory { private static final String FLINK_SHADING_PREFIX = ""; - private final HadoopConfigLoader hadoopConfigLoader; - @Nullable private Configuration flinkConfig; + @Nullable private org.apache.hadoop.conf.Configuration hadoopConfig; + /** Constructs the Google Storage file system factory. */ public GSFileSystemFactory() { LOGGER.info("Creating GSFileSystemFactory"); - - this.hadoopConfigLoader = - new HadoopConfigLoader( - FLINK_CONFIG_PREFIXES, - MIRRORED_CONFIG_KEYS, - HADOOP_CONFIG_PREFIX, - Collections.emptySet(), - Collections.emptySet(), - FLINK_SHADING_PREFIX); } @Override @@ -76,7 +73,9 @@ public void configure(Configuration flinkConfig) { LOGGER.info("Configuring GSFileSystemFactory with Flink configuration {}", flinkConfig); this.flinkConfig = Preconditions.checkNotNull(flinkConfig); - hadoopConfigLoader.setFlinkConfig(flinkConfig); + this.hadoopConfig = getHadoopConfiguration(flinkConfig); + + LOGGER.info("Using Hadoop configuration {}", serializeHadoopConfig(hadoopConfig)); } @Override @@ -90,15 +89,13 @@ public FileSystem create(URI fsUri) throws IOException { Preconditions.checkNotNull(fsUri); - // create and configure the Google Hadoop file system - org.apache.hadoop.conf.Configuration hadoopConfig = - hadoopConfigLoader.getOrLoadHadoopConfig(); - LOGGER.info( - "Creating GoogleHadoopFileSystem for uri {} with Hadoop config {}", - fsUri, - hadoopConfig); + // initialize the Google Hadoop filesystem GoogleHadoopFileSystem googleHadoopFileSystem = new GoogleHadoopFileSystem(); - googleHadoopFileSystem.initialize(fsUri, hadoopConfig); + try { + googleHadoopFileSystem.initialize(fsUri, hadoopConfig); + } catch (IOException ex) { + throw new IOException("Failed to initialize GoogleHadoopFileSystem", ex); + } // construct the file system options GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig); @@ -106,4 +103,67 @@ public FileSystem create(URI fsUri) throws IOException { // create the file system wrapper return new GSFileSystem(googleHadoopFileSystem, options); } + + /** + * Loads the hadoop configuration, in two steps. + * + *
<p>1) Find a hadoop conf dir using CoreOptions.FLINK_HADOOP_CONF_DIR or the HADOOP_CONF_DIR
+     * environment variable, and load core-default.xml and core-site.xml from that location
+     *
+     * <p>2) Load hadoop conf from the Flink config, with translations defined above
+     *
+     * <p>
... then merge together, such that keys from the second overwrite the first. + * + * @return The Hadoop configuration. + */ + private static org.apache.hadoop.conf.Configuration getHadoopConfiguration( + Configuration flinkConfig) { + + // create an empty hadoop configuration + org.apache.hadoop.conf.Configuration hadoopConfig = + new org.apache.hadoop.conf.Configuration(); + + // look for a hadoop configuration directory and load configuration from core-default.xml + // and core-site.xml + Optional hadoopConfigDir = + Optional.ofNullable(flinkConfig.get(CoreOptions.FLINK_HADOOP_CONF_DIR)); + if (!hadoopConfigDir.isPresent()) { + hadoopConfigDir = Optional.ofNullable(System.getenv("HADOOP_CONF_DIR")); + } + hadoopConfigDir.ifPresent( + configDir -> { + LOGGER.info("Loading system Hadoop config from {}", configDir); + hadoopConfig.addResource(new Path(configDir, "core-default.xml")); + hadoopConfig.addResource(new Path(configDir, "core-site.xml")); + hadoopConfig.reloadConfiguration(); + }); + + // now, load hadoop config from flink and copy key/value pairs into the base config + HadoopConfigLoader hadoopConfigLoader = + new HadoopConfigLoader( + FLINK_CONFIG_PREFIXES, + MIRRORED_CONFIG_KEYS, + HADOOP_CONFIG_PREFIX, + Collections.emptySet(), + Collections.emptySet(), + FLINK_SHADING_PREFIX); + hadoopConfigLoader.setFlinkConfig(flinkConfig); + org.apache.hadoop.conf.Configuration flinkHadoopConfig = + hadoopConfigLoader.getOrLoadHadoopConfig(); + for (Map.Entry entry : flinkHadoopConfig) { + hadoopConfig.set(entry.getKey(), entry.getValue()); + } + + return hadoopConfig; + } + + private String serializeHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) + throws RuntimeException { + try (Writer writer = new StringWriter()) { + org.apache.hadoop.conf.Configuration.dumpConfiguration(hadoopConfig, writer); + return writer.toString(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } } From 6d137f72504470a6be9d971b7e1bba1d8614cffa Mon Sep 17 00:00:00 2001 From: Galen Warren <784517+galenwarren@users.noreply.github.com> Date: Wed, 26 Jan 2022 11:40:10 -0500 Subject: [PATCH 2/3] [FLINK-25790][flink-gs-fs-hadoop] Refactor GSFileSystemFactory and GSFileSystem to support unit tests; add unit tests --- .../org/apache/flink/fs/gs/GSFileSystem.java | 83 +---- .../flink/fs/gs/GSFileSystemFactory.java | 138 ++++---- .../apache/flink/fs/gs/utils/ConfigUtils.java | 202 ++++++++++++ .../org/apache/flink/fs/gs/TestUtils.java | 34 ++ .../fs/gs/utils/ConfigUtilsHadoopTest.java | 303 ++++++++++++++++++ .../fs/gs/utils/ConfigUtilsStorageTest.java | 163 ++++++++++ 6 files changed, 773 insertions(+), 150 deletions(-) create mode 100644 flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java create mode 100644 flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java create mode 100644 flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java index e6d9244a535d9..f326562bc9f6c 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java @@ -24,93 +24,38 @@ import 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.util.Preconditions; -import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageOptions; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Optional; - -/** Provides recoverable-writer functionality for the standard GoogleHadoopFileSystem. */ +/** FileSystem implementation that wraps GoogleHadoopFileSystem and supports RecoverableWriter. */ class GSFileSystem extends HadoopFileSystem { private static final Logger LOGGER = LoggerFactory.getLogger(GSFileSystem.class); - private final GSFileSystemOptions options; + private final GSFileSystemOptions fileSystemOptions; + + private final Storage storage; - GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) { + GSFileSystem( + GoogleHadoopFileSystem googleHadoopFileSystem, + Storage storage, + GSFileSystemOptions fileSystemOptions) { super(Preconditions.checkNotNull(googleHadoopFileSystem)); - this.options = Preconditions.checkNotNull(options); - LOGGER.info("Creating GSFileSystem with options {}", options); + this.fileSystemOptions = Preconditions.checkNotNull(fileSystemOptions); + this.storage = Preconditions.checkNotNull(storage); + LOGGER.info("Creating GSFileSystem with file-system options {}", fileSystemOptions); } @Override - public RecoverableWriter createRecoverableWriter() throws IOException { - - // follow the same rules as for the Hadoop connector, i.e. - // 1) only use service credentials at all if Hadoop - // "google.cloud.auth.service.account.enable" is true (default: true) - // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied - // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of - // credentials, if supplied - // 4) use no credentials - - // store any credentials we are to use, here - Optional credentialsPath = Optional.empty(); - - // only look for credentials if service account support is enabled - Configuration hadoopConfig = getHadoopFileSystem().getConf(); - boolean enableServiceAccount = - hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true); - if (enableServiceAccount) { - - // load google application credentials, and then fall back to - // "google.cloud.auth.service.account.json.keyfile" from Hadoop - credentialsPath = Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS")); - if (credentialsPath.isPresent()) { - LOGGER.info( - "Recoverable writer is using GOOGLE_APPLICATION_CREDENTIALS at {}", - credentialsPath.get()); - } else { - credentialsPath = - Optional.ofNullable( - hadoopConfig.get("google.cloud.auth.service.account.json.keyfile")); - credentialsPath.ifPresent( - path -> - LOGGER.info( - "Recoverable writer is using credentials from Hadoop at {}", - path)); - } - } - - // construct the storage instance, using credentials if provided - Storage storage; - if (credentialsPath.isPresent()) { - LOGGER.info( - "Creating GSRecoverableWriter using credentials from {}", - credentialsPath.get()); - try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) { - GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream); - storage = - StorageOptions.newBuilder() - .setCredentials(credentials) - .build() - 
.getService(); - } - } else { - LOGGER.info("Creating GSRecoverableWriter using no credentials"); - storage = StorageOptions.newBuilder().build().getService(); - } + public RecoverableWriter createRecoverableWriter() { + LOGGER.info("Creating GSRecoverableWriter with file-system options {}", fileSystemOptions); // create the GS blob storage wrapper GSBlobStorageImpl blobStorage = new GSBlobStorageImpl(storage); // construct the recoverable writer with the blob storage wrapper and the options - return new GSRecoverableWriter(blobStorage, options); + return new GSRecoverableWriter(blobStorage, fileSystemOptions); } } diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java index 973e928977014..8b4b539816089 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java @@ -19,25 +19,24 @@ package org.apache.flink.fs.gs; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; -import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.fs.gs.utils.ConfigUtils; import org.apache.flink.util.Preconditions; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.FileInputStream; import java.io.IOException; -import java.io.StringWriter; -import java.io.Writer; import java.net.URI; -import java.util.Collections; -import java.util.Map; import java.util.Optional; /** @@ -51,17 +50,25 @@ public class GSFileSystemFactory implements FileSystemFactory { /** The scheme for the Google Storage file system. */ public static final String SCHEME = "gs"; - private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; - - private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; - - private static final String[][] MIRRORED_CONFIG_KEYS = {}; - - private static final String FLINK_SHADING_PREFIX = ""; + /** + * The Hadoop, formed by combining system Hadoop config with properties defined in Flink config. + */ + @Nullable private org.apache.hadoop.conf.Configuration hadoopConfig; - @Nullable private Configuration flinkConfig; + /** The options used for GSFileSystem and RecoverableWriter. */ + @Nullable private GSFileSystemOptions fileSystemOptions; - @Nullable private org.apache.hadoop.conf.Configuration hadoopConfig; + /** + * Though it isn't documented as clearly as one might expect, the methods on this object are + * threadsafe, so we can safely share a single instance among all file system instances. + * + *
<p>Issue that discusses pending docs is here:
+     * https://github.com/googleapis/google-cloud-java/issues/1238
+     *
+     * <p>
StackOverflow discussion: + * https://stackoverflow.com/questions/54516284/google-cloud-storage-java-client-pooling + */ + @Nullable private Storage storage; /** Constructs the Google Storage file system factory. */ public GSFileSystemFactory() { @@ -72,10 +79,18 @@ public GSFileSystemFactory() { public void configure(Configuration flinkConfig) { LOGGER.info("Configuring GSFileSystemFactory with Flink configuration {}", flinkConfig); - this.flinkConfig = Preconditions.checkNotNull(flinkConfig); - this.hadoopConfig = getHadoopConfiguration(flinkConfig); + Preconditions.checkNotNull(flinkConfig); + + ConfigUtils.ConfigContext configContext = new RuntimeConfigContext(); + + this.hadoopConfig = ConfigUtils.getHadoopConfiguration(flinkConfig, configContext); + LOGGER.info( + "Using Hadoop configuration {}", ConfigUtils.stringifyHadoopConfig(hadoopConfig)); - LOGGER.info("Using Hadoop configuration {}", serializeHadoopConfig(hadoopConfig)); + this.fileSystemOptions = new GSFileSystemOptions(flinkConfig); + LOGGER.info("Using file system options {}", fileSystemOptions); + + this.storage = ConfigUtils.getStorageOptions(hadoopConfig, configContext).getService(); } @Override @@ -89,7 +104,7 @@ public FileSystem create(URI fsUri) throws IOException { Preconditions.checkNotNull(fsUri); - // initialize the Google Hadoop filesystem + // create the Google Hadoop file system GoogleHadoopFileSystem googleHadoopFileSystem = new GoogleHadoopFileSystem(); try { googleHadoopFileSystem.initialize(fsUri, hadoopConfig); @@ -97,73 +112,34 @@ public FileSystem create(URI fsUri) throws IOException { throw new IOException("Failed to initialize GoogleHadoopFileSystem", ex); } - // construct the file system options - GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig); - - // create the file system wrapper - return new GSFileSystem(googleHadoopFileSystem, options); + // create the file system + return new GSFileSystem(googleHadoopFileSystem, storage, fileSystemOptions); } - /** - * Loads the hadoop configuration, in two steps. - * - *
<p>1) Find a hadoop conf dir using CoreOptions.FLINK_HADOOP_CONF_DIR or the HADOOP_CONF_DIR
-     * environment variable, and load core-default.xml and core-site.xml from that location
-     *
-     * <p>2) Load hadoop conf from the Flink config, with translations defined above
-     *
-     * <p>
... then merge together, such that keys from the second overwrite the first. - * - * @return The Hadoop configuration. - */ - private static org.apache.hadoop.conf.Configuration getHadoopConfiguration( - Configuration flinkConfig) { - - // create an empty hadoop configuration - org.apache.hadoop.conf.Configuration hadoopConfig = - new org.apache.hadoop.conf.Configuration(); - - // look for a hadoop configuration directory and load configuration from core-default.xml - // and core-site.xml - Optional hadoopConfigDir = - Optional.ofNullable(flinkConfig.get(CoreOptions.FLINK_HADOOP_CONF_DIR)); - if (!hadoopConfigDir.isPresent()) { - hadoopConfigDir = Optional.ofNullable(System.getenv("HADOOP_CONF_DIR")); - } - hadoopConfigDir.ifPresent( - configDir -> { - LOGGER.info("Loading system Hadoop config from {}", configDir); - hadoopConfig.addResource(new Path(configDir, "core-default.xml")); - hadoopConfig.addResource(new Path(configDir, "core-site.xml")); - hadoopConfig.reloadConfiguration(); - }); - - // now, load hadoop config from flink and copy key/value pairs into the base config - HadoopConfigLoader hadoopConfigLoader = - new HadoopConfigLoader( - FLINK_CONFIG_PREFIXES, - MIRRORED_CONFIG_KEYS, - HADOOP_CONFIG_PREFIX, - Collections.emptySet(), - Collections.emptySet(), - FLINK_SHADING_PREFIX); - hadoopConfigLoader.setFlinkConfig(flinkConfig); - org.apache.hadoop.conf.Configuration flinkHadoopConfig = - hadoopConfigLoader.getOrLoadHadoopConfig(); - for (Map.Entry entry : flinkHadoopConfig) { - hadoopConfig.set(entry.getKey(), entry.getValue()); + /** Config context implementation used at runtime. */ + private static class RuntimeConfigContext implements ConfigUtils.ConfigContext { + + @Override + public Optional getenv(String name) { + return Optional.ofNullable(System.getenv(name)); } - return hadoopConfig; - } + @Override + public void addHadoopResourcesFromDir( + org.apache.hadoop.conf.Configuration config, String configDir) { + config.addResource(new Path(configDir, "core-default.xml")); + config.addResource(new Path(configDir, "core-site.xml")); + } - private String serializeHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) - throws RuntimeException { - try (Writer writer = new StringWriter()) { - org.apache.hadoop.conf.Configuration.dumpConfiguration(hadoopConfig, writer); - return writer.toString(); - } catch (IOException ex) { - throw new RuntimeException(ex); + @Override + public void setStorageCredentialsFromFile( + StorageOptions.Builder storageOptionsBuilder, String credentialsPath) { + try (FileInputStream credentialsStream = new FileInputStream(credentialsPath)) { + GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream); + storageOptionsBuilder.setCredentials(credentials); + } catch (IOException ex) { + throw new RuntimeException(ex); + } } } } diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java new file mode 100644 index 0000000000000..8861e4afc2aa7 --- /dev/null +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.util.HadoopConfigLoader; + +import com.google.cloud.storage.StorageOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.Collections; +import java.util.Optional; + +/** Utilities class for configuration of Hadoop and Google Storage. */ +public class ConfigUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigUtils.class); + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + /** + * Loads the Hadoop configuration, by loading from a Hadoop conf dir (if one exists) and then + * overlaying properties derived from the Flink config. + * + * @param flinkConfig The Flink config + * @param configContext The config context. + * @return The Hadoop config. + */ + public static org.apache.hadoop.conf.Configuration getHadoopConfiguration( + Configuration flinkConfig, ConfigContext configContext) { + + // create a starting hadoop configuration + org.apache.hadoop.conf.Configuration hadoopConfig = + new org.apache.hadoop.conf.Configuration(); + + // look for a hadoop configuration directory and load configuration from it if found + Optional hadoopConfigDir = + Optional.ofNullable(flinkConfig.get(CoreOptions.FLINK_HADOOP_CONF_DIR)); + if (!hadoopConfigDir.isPresent()) { + hadoopConfigDir = configContext.getenv("HADOOP_CONF_DIR"); + } + hadoopConfigDir.ifPresent( + configDir -> { + LOGGER.info("Loading Hadoop config resources from {}", configDir); + configContext.addHadoopResourcesFromDir(hadoopConfig, configDir); + }); + + // now, load hadoop config from flink and add to base hadoop config + HadoopConfigLoader hadoopConfigLoader = + new HadoopConfigLoader( + FLINK_CONFIG_PREFIXES, + MIRRORED_CONFIG_KEYS, + HADOOP_CONFIG_PREFIX, + Collections.emptySet(), + Collections.emptySet(), + FLINK_SHADING_PREFIX); + hadoopConfigLoader.setFlinkConfig(flinkConfig); + org.apache.hadoop.conf.Configuration flinkHadoopConfig = + hadoopConfigLoader.getOrLoadHadoopConfig(); + hadoopConfig.addResource(flinkHadoopConfig); + + // reload the config resources and return it + hadoopConfig.reloadConfiguration(); + return hadoopConfig; + } + + /** + * Creates a StorageOptions instance for the given Hadoop config and environment. + * + * @param hadoopConfig The Hadoop config. + * @param configContext The config context. + * @return The StorageOptions instance. 
+ */ + public static StorageOptions getStorageOptions( + org.apache.hadoop.conf.Configuration hadoopConfig, ConfigContext configContext) { + + // follow the same rules as for the Hadoop connector, i.e. + // 1) only use service credentials at all if Hadoop + // "google.cloud.auth.service.account.enable" is true (default: true) + // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied + // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of + // credentials, if supplied + // 4) use no credentials + + // store any credentials we are to use, here + Optional credentialsPath = Optional.empty(); + + // only look for credentials if service account support is enabled + boolean enableServiceAccount = + hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true); + if (enableServiceAccount) { + + // load google application credentials, and then fall back to + // "google.cloud.auth.service.account.json.keyfile" from Hadoop + credentialsPath = configContext.getenv("GOOGLE_APPLICATION_CREDENTIALS"); + if (credentialsPath.isPresent()) { + LOGGER.info( + "GSRecoverableWriter is using GOOGLE_APPLICATION_CREDENTIALS at {}", + credentialsPath.get()); + } else { + credentialsPath = + Optional.ofNullable( + hadoopConfig.get("google.cloud.auth.service.account.json.keyfile")); + credentialsPath.ifPresent( + path -> + LOGGER.info( + "GSRecoverableWriter is using credentials from Hadoop at {}", + path)); + } + } + + // construct the storage options + StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder(); + if (credentialsPath.isPresent()) { + LOGGER.info( + "Creating GSRecoverableWriter using credentials from {}", + credentialsPath.get()); + configContext.setStorageCredentialsFromFile( + storageOptionsBuilder, credentialsPath.get()); + } else { + LOGGER.info("Creating GSRecoverableWriter using no credentials"); + } + + return storageOptionsBuilder.build(); + } + + /** + * Helper to serialize a Hadoop config to a string, for logging. + * + * @param hadoopConfig The Hadoop config. + * @return A string with the Hadoop properties. + * @throws RuntimeException On underlying IO failure + */ + public static String stringifyHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) + throws RuntimeException { + try (Writer writer = new StringWriter()) { + org.apache.hadoop.conf.Configuration.dumpConfiguration(hadoopConfig, writer); + return writer.toString(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Interface that provides context-specific config helper functions, factored out to support + * unit testing. * + */ + public interface ConfigContext { + /** + * Returns a named environment variable. + * + * @param name Name of variable + * @return Value of variable + */ + Optional getenv(String name); + + /** + * Adds resources to the Hadoop configuration for the provided Hadoop config dir directory. + * + * @param config The Hadoop configuration. + * @param configDir The Hadoop config directory. + */ + void addHadoopResourcesFromDir( + org.apache.hadoop.conf.Configuration config, String configDir); + + /** + * Assigns credentials to the storage options builder from credentials at the given path. + * + * @param storageOptionsBuilder The storage options builder. + * @param credentialsPath The path of the credentials file. 
+ */ + void setStorageCredentialsFromFile( + StorageOptions.Builder storageOptionsBuilder, String credentialsPath); + } +} diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/TestUtils.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/TestUtils.java index 8522f86b04e93..50dc75b7a9eb1 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/TestUtils.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/TestUtils.java @@ -18,8 +18,42 @@ package org.apache.flink.fs.gs; +import java.util.HashMap; +import java.util.Map; + /** Constants for testing purposes. */ public class TestUtils { public static final long RANDOM_SEED = 27; + + /** + * Helper to create a hadoop configuration from a map. + * + * @param values Map of values + * @return Hadoop config + */ + public static org.apache.hadoop.conf.Configuration hadoopConfigFromMap( + Map values) { + org.apache.hadoop.conf.Configuration hadoopConfig = + new org.apache.hadoop.conf.Configuration(); + for (Map.Entry entry : values.entrySet()) { + hadoopConfig.set(entry.getKey(), entry.getValue()); + } + return hadoopConfig; + } + + /** + * Helper to translate Hadoop config to a map. + * + * @param hadoopConfig The Hadoop config + * @return The map of keys/values + */ + public static Map hadoopConfigToMap( + org.apache.hadoop.conf.Configuration hadoopConfig) { + HashMap map = new HashMap<>(); + for (Map.Entry entry : hadoopConfig) { + map.put(entry.getKey(), entry.getValue()); + } + return map; + } } diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java new file mode 100644 index 0000000000000..98c313fcf823a --- /dev/null +++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.gs.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.fs.gs.TestUtils; + +import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference; +import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; + +import com.google.cloud.storage.StorageOptions; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Test construction of Hadoop config in GSFileSystemFactory. */ +@RunWith(Parameterized.class) +public class ConfigUtilsHadoopTest { + + /* The test case description. */ + @Parameterized.Parameter(value = 0) + public String description; + + /* The value to use for the HADOOP_CONF_DIR environment variable. */ + @Parameterized.Parameter(value = 1) + public @Nullable String envHadoopConfDir; + + /* The value to use for the Flink config. */ + @Parameterized.Parameter(value = 2) + public Configuration flinkConfig; + + /* The additional Hadoop resources to add to the hadoop config when hadoop conf dir is present. */ + @Parameterized.Parameter(value = 3) + public org.apache.hadoop.conf.Configuration additionalHadoopConfig; + + /* The expected Hadoop configuration. */ + @Parameterized.Parameter(value = 4) + public org.apache.hadoop.conf.Configuration expectedHadoopConfig; + + @Parameterized.Parameters(name = "description={0}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + { + "no env hadoop conf dir, no flink hadoop conf dir, no flink hadoop options, no hadoop options in conf dir", + null, + Configuration.fromMap(new HashMap<>()), + TestUtils.hadoopConfigFromMap(new HashMap<>()), + TestUtils.hadoopConfigFromMap(new HashMap<>()), + }, + { + "no env hadoop conf dir, no flink hadoop conf dir, no flink hadoop options, hadoop options in conf dir", + null, + Configuration.fromMap(new HashMap<>()), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + }), + TestUtils.hadoopConfigFromMap(new HashMap<>()), + }, + { + "env hadoop conf dir, no flink hadoop conf dir, no flink hadoop options, hadoop options in conf dir", + "/hadoop/conf", + Configuration.fromMap(new HashMap<>()), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + }), + }, + { + "no env hadoop conf dir, flink hadoop conf dir, no flink hadoop options, hadoop options in conf dir", + null, + Configuration.fromMap( + new HashMap() { + { + put("env.hadoop.conf.dir", "/hadoop/conf"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + }), + }, + { + "env hadoop conf dir, flink hadoop conf dir, no flink hadoop options, hadoop options in conf dir", + "/hadoop/conf1", + Configuration.fromMap( + new HashMap() { + { + put("env.hadoop.conf.dir", "/hadoop/conf2"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + 
}), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + }), + }, + { + "env hadoop conf dir, no flink hadoop conf dir, flink hadoop options, hadoop options in conf dir", + "/hadoop/conf", + Configuration.fromMap( + new HashMap() { + { + put("gs.block.size", "10000"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.block.size", "10000"); + put("fs.gs.project.id", "project-id"); + } + }), + }, + { + "env hadoop conf dir, no flink hadoop conf dir, flink hadoop options, hadoop options in conf dir, hadoop options overlap", + "/hadoop/conf", + Configuration.fromMap( + new HashMap() { + { + put("gs.project.id", "project-id-1"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id-2"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("fs.gs.project.id", "project-id-1"); + } + }), + }, + { + "env hadoop conf dir, no flink hadoop conf dir, no flink hadoop options, hadoop enabled auth options in conf dir", + "/hadoop/conf", + Configuration.fromMap(new HashMap<>()), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("google.cloud.auth.service.account.enable", "true"); + put( + "google.cloud.auth.service.account.json.keyfile", + "/opt/file.json"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("google.cloud.auth.service.account.enable", "true"); + put( + "google.cloud.auth.service.account.json.keyfile", + "/opt/file.json"); + } + }), + }, + { + "env hadoop conf dir, no flink hadoop conf dir, no flink hadoop options, hadoop disabled auth options in conf dir", + "/hadoop/conf", + Configuration.fromMap(new HashMap<>()), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("google.cloud.auth.service.account.enable", "false"); + put( + "google.cloud.auth.service.account.json.keyfile", + "/opt/file.json"); + } + }), + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("google.cloud.auth.service.account.enable", "false"); + put( + "google.cloud.auth.service.account.json.keyfile", + "/opt/file.json"); + } + }), + }, + }); + } + + private @Nullable String expectedHadoopConfigDir; + + @Before + public void before() { + + // determine which hadoop conf dir we expect to receive + expectedHadoopConfigDir = flinkConfig.get(CoreOptions.FLINK_HADOOP_CONF_DIR); + if (expectedHadoopConfigDir == null) { + expectedHadoopConfigDir = envHadoopConfDir; + } + } + + @Test + public void shouldProperlyCreateHadoopConfig() { + final String[] actualHadoopConfigDir = {null}; + + // get the hadoop configuration + org.apache.hadoop.conf.Configuration hadoopConfig = + ConfigUtils.getHadoopConfiguration( + flinkConfig, + new ConfigUtils.ConfigContext() { + @Override + public Optional getenv(String name) { + if ("HADOOP_CONF_DIR".equals(name)) { + return Optional.ofNullable(envHadoopConfDir); + } + return Optional.empty(); + } + + @Override + public void addHadoopResourcesFromDir( + org.apache.hadoop.conf.Configuration config, String configDir) { + actualHadoopConfigDir[0] = configDir; + config.addResource(additionalHadoopConfig); + } + + @Override + public void setStorageCredentialsFromFile( + StorageOptions.Builder storageOptionsBuilder, + String credentialsPath) { + throw new UnsupportedOperationException(); + } + }); + + assertEquals(expectedHadoopConfigDir, actualHadoopConfigDir[0]); + + Map expectedHadoopConfigMap = + 
TestUtils.hadoopConfigToMap(expectedHadoopConfig); + Map hadoopConfigMap = TestUtils.hadoopConfigToMap(hadoopConfig); + MapDifference difference = + Maps.difference(expectedHadoopConfigMap, hadoopConfigMap); + + assertEquals(Collections.EMPTY_MAP, difference.entriesDiffering()); + assertEquals(Collections.EMPTY_MAP, difference.entriesOnlyOnLeft()); + assertEquals(Collections.EMPTY_MAP, difference.entriesOnlyOnRight()); + } +} diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java new file mode 100644 index 0000000000000..fdbdb53781ba9 --- /dev/null +++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.utils; + +import org.apache.flink.fs.gs.TestUtils; + +import com.google.cloud.storage.StorageOptions; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** Test construction of Storage instance in GSFileSystemFactory. */ +@RunWith(Parameterized.class) +public class ConfigUtilsStorageTest { + + /* The test case description. */ + @Parameterized.Parameter(value = 0) + public String description; + + /* The value to use for the GOOGLE_APPLICATION_CREDENTIALS environment variable. */ + @Parameterized.Parameter(value = 1) + public @Nullable String envGoogleApplicationCredentials; + + /* The Hadoop config. */ + @Parameterized.Parameter(value = 2) + public org.apache.hadoop.conf.Configuration hadoopConfig; + + /* The expected credentials file to use. 
*/ + @Parameterized.Parameter(value = 3) + public @Nullable String expectedCredentialsFile; + + @Parameterized.Parameters(name = "description={0}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + { + "no GAC in env, no credentials in hadoop conf", + null, + TestUtils.hadoopConfigFromMap(new HashMap<>()), + null, + }, + { + "GAC in env, no credentials in hadoop conf", + "/opt/file.json", + TestUtils.hadoopConfigFromMap(new HashMap<>()), + "/opt/file.json", + }, + { + "no GAC in env, credentials in hadoop conf", + null, + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put( + "google.cloud.auth.service.account.json.keyfile", + "/opt/file.json"); + } + }), + "/opt/file.json", + }, + { + "GAC in env, credentials in hadoop conf, GAC should take precedence", + "/opt/file1.json", + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put( + "google.cloud.auth.service.account.json.keyfile", + "/opt/file2.json"); + } + }), + "/opt/file1.json", + }, + { + "GAC in env, no credentials in hadoop conf, service accounts disabled", + "/opt/file.json", + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("google.cloud.auth.service.account.enable", "false"); + } + }), + null, + }, + { + "no GAC in env, credentials in hadoop conf, service accounts disabled", + null, + TestUtils.hadoopConfigFromMap( + new HashMap() { + { + put("google.cloud.auth.service.account.enable", "false"); + put( + "google.cloud.auth.service.account.json.keyfile", + "/opt/file.json"); + } + }), + null, + }, + }); + } + + @Test + public void shouldProperlyCreateStorageOptions() { + + final String[] actualCredentialsPath = {null}; + + StorageOptions storageOptions = + ConfigUtils.getStorageOptions( + hadoopConfig, + new ConfigUtils.ConfigContext() { + @Override + public Optional getenv(String name) { + if ("GOOGLE_APPLICATION_CREDENTIALS".equals(name)) { + return Optional.ofNullable(envGoogleApplicationCredentials); + } + return Optional.empty(); + } + + @Override + public void addHadoopResourcesFromDir( + Configuration config, String configDir) { + throw new UnsupportedOperationException(); + } + + @Override + public void setStorageCredentialsFromFile( + StorageOptions.Builder storageOptionsBuilder, + String credentialsPath) { + actualCredentialsPath[0] = credentialsPath; + } + }); + + assertNotNull(storageOptions); + assertEquals(expectedCredentialsFile, actualCredentialsPath[0]); + } +} From c258789c884f9a5733c59e53f213ae635520a1e3 Mon Sep 17 00:00:00 2001 From: Galen Warren <784517+galenwarren@users.noreply.github.com> Date: Thu, 27 Jan 2022 11:18:46 -0500 Subject: [PATCH 3/3] [FLINK-25790][flink-gs-fs-hadoop] Improve ConfigContext and update associated unit tests --- .../org/apache/flink/fs/gs/GSFileSystem.java | 1 - .../flink/fs/gs/GSFileSystemFactory.java | 28 ++++--- .../apache/flink/fs/gs/utils/ConfigUtils.java | 48 ++++++----- .../fs/gs/utils/ConfigUtilsHadoopTest.java | 80 +++++++------------ .../fs/gs/utils/ConfigUtilsStorageTest.java | 61 ++++++-------- .../fs/gs/utils/TestingConfigContext.java | 59 ++++++++++++++ 6 files changed, 160 insertions(+), 117 deletions(-) create mode 100644 flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/TestingConfigContext.java diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java index f326562bc9f6c..3e9d2c66bf84b 100644 --- 
a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java @@ -45,7 +45,6 @@ class GSFileSystem extends HadoopFileSystem { super(Preconditions.checkNotNull(googleHadoopFileSystem)); this.fileSystemOptions = Preconditions.checkNotNull(fileSystemOptions); this.storage = Preconditions.checkNotNull(storage); - LOGGER.info("Creating GSFileSystem with file-system options {}", fileSystemOptions); } @Override diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java index 8b4b539816089..052b6aaddc39b 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java @@ -83,14 +83,21 @@ public void configure(Configuration flinkConfig) { ConfigUtils.ConfigContext configContext = new RuntimeConfigContext(); + // load Hadoop config this.hadoopConfig = ConfigUtils.getHadoopConfiguration(flinkConfig, configContext); LOGGER.info( "Using Hadoop configuration {}", ConfigUtils.stringifyHadoopConfig(hadoopConfig)); + // construct file-system options this.fileSystemOptions = new GSFileSystemOptions(flinkConfig); LOGGER.info("Using file system options {}", fileSystemOptions); - this.storage = ConfigUtils.getStorageOptions(hadoopConfig, configContext).getService(); + // get storage credentials and construct Storage instance + Optional credentials = + ConfigUtils.getStorageCredentials(hadoopConfig, configContext); + StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder(); + credentials.ifPresent(storageOptionsBuilder::setCredentials); + this.storage = storageOptionsBuilder.build().getService(); } @Override @@ -100,7 +107,7 @@ public String getScheme() { @Override public FileSystem create(URI fsUri) throws IOException { - LOGGER.info("Creating GS file system for uri {}", fsUri); + LOGGER.info("Creating GSFileSystem for uri {} with options {}", fsUri, fileSystemOptions); Preconditions.checkNotNull(fsUri); @@ -125,18 +132,19 @@ public Optional getenv(String name) { } @Override - public void addHadoopResourcesFromDir( - org.apache.hadoop.conf.Configuration config, String configDir) { - config.addResource(new Path(configDir, "core-default.xml")); - config.addResource(new Path(configDir, "core-site.xml")); + public org.apache.hadoop.conf.Configuration loadHadoopConfigFromDir(String configDir) { + org.apache.hadoop.conf.Configuration hadoopConfig = + new org.apache.hadoop.conf.Configuration(); + hadoopConfig.addResource(new Path(configDir, "core-default.xml")); + hadoopConfig.addResource(new Path(configDir, "core-site.xml")); + hadoopConfig.reloadConfiguration(); + return hadoopConfig; } @Override - public void setStorageCredentialsFromFile( - StorageOptions.Builder storageOptionsBuilder, String credentialsPath) { + public GoogleCredentials loadStorageCredentialsFromFile(String credentialsPath) { try (FileInputStream credentialsStream = new FileInputStream(credentialsPath)) { - GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream); - storageOptionsBuilder.setCredentials(credentials); + return GoogleCredentials.fromStream(credentialsStream); } catch (IOException ex) { throw new RuntimeException(ex); } diff --git 
a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java index 8861e4afc2aa7..904d78377720a 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java @@ -18,11 +18,12 @@ package org.apache.flink.fs.gs.utils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.util.HadoopConfigLoader; -import com.google.cloud.storage.StorageOptions; +import com.google.auth.oauth2.GoogleCredentials; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,14 @@ public class ConfigUtils { private static final String FLINK_SHADING_PREFIX = ""; + @VisibleForTesting + static final String HADOOP_OPTION_ENABLE_SERVICE_ACCOUNT = + "google.cloud.auth.service.account.enable"; + + @VisibleForTesting + static final String HADOOP_OPTION_SERVICE_ACCOUNT_JSON_KEYFILE = + "google.cloud.auth.service.account.json.keyfile"; + /** * Loads the Hadoop configuration, by loading from a Hadoop conf dir (if one exists) and then * overlaying properties derived from the Flink config. @@ -69,7 +78,7 @@ public static org.apache.hadoop.conf.Configuration getHadoopConfiguration( hadoopConfigDir.ifPresent( configDir -> { LOGGER.info("Loading Hadoop config resources from {}", configDir); - configContext.addHadoopResourcesFromDir(hadoopConfig, configDir); + hadoopConfig.addResource(configContext.loadHadoopConfigFromDir(configDir)); }); // now, load hadoop config from flink and add to base hadoop config @@ -92,13 +101,13 @@ public static org.apache.hadoop.conf.Configuration getHadoopConfiguration( } /** - * Creates a StorageOptions instance for the given Hadoop config and environment. + * Creates an (optional) GoogleCredentials instance for the given Hadoop config and environment. * * @param hadoopConfig The Hadoop config. * @param configContext The config context. - * @return The StorageOptions instance. + * @return The optional GoogleCredentials instance. */ - public static StorageOptions getStorageOptions( + public static Optional getStorageCredentials( org.apache.hadoop.conf.Configuration hadoopConfig, ConfigContext configContext) { // follow the same rules as for the Hadoop connector, i.e. 
@@ -114,7 +123,7 @@ public static StorageOptions getStorageOptions( // only look for credentials if service account support is enabled boolean enableServiceAccount = - hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true); + hadoopConfig.getBoolean(HADOOP_OPTION_ENABLE_SERVICE_ACCOUNT, true); if (enableServiceAccount) { // load google application credentials, and then fall back to @@ -127,7 +136,7 @@ public static StorageOptions getStorageOptions( } else { credentialsPath = Optional.ofNullable( - hadoopConfig.get("google.cloud.auth.service.account.json.keyfile")); + hadoopConfig.get(HADOOP_OPTION_SERVICE_ACCOUNT_JSON_KEYFILE)); credentialsPath.ifPresent( path -> LOGGER.info( @@ -136,19 +145,18 @@ public static StorageOptions getStorageOptions( } } - // construct the storage options - StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder(); + // if we have a credentials path, load and return the credentials; otherwise, return empty if (credentialsPath.isPresent()) { LOGGER.info( "Creating GSRecoverableWriter using credentials from {}", credentialsPath.get()); - configContext.setStorageCredentialsFromFile( - storageOptionsBuilder, credentialsPath.get()); + GoogleCredentials credentials = + configContext.loadStorageCredentialsFromFile(credentialsPath.get()); + return Optional.of(credentials); } else { LOGGER.info("Creating GSRecoverableWriter using no credentials"); + return Optional.empty(); } - - return storageOptionsBuilder.build(); } /** @@ -182,21 +190,19 @@ public interface ConfigContext { Optional getenv(String name); /** - * Adds resources to the Hadoop configuration for the provided Hadoop config dir directory. + * Loads the Hadoop configuration from a directory. * - * @param config The Hadoop configuration. * @param configDir The Hadoop config directory. + * @return The Hadoop configuration. */ - void addHadoopResourcesFromDir( - org.apache.hadoop.conf.Configuration config, String configDir); + org.apache.hadoop.conf.Configuration loadHadoopConfigFromDir(String configDir); /** - * Assigns credentials to the storage options builder from credentials at the given path. + * Loads the Google credentials from a file. * - * @param storageOptionsBuilder The storage options builder. * @param credentialsPath The path of the credentials file. + * @return The Google credentials. 
*/ - void setStorageCredentialsFromFile( - StorageOptions.Builder storageOptionsBuilder, String credentialsPath); + GoogleCredentials loadStorageCredentialsFromFile(String credentialsPath); } } diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java index 98c313fcf823a..032d5105be56d 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsHadoopTest.java @@ -19,14 +19,11 @@ package org.apache.flink.fs.gs.utils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.fs.gs.TestUtils; import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference; import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; -import com.google.cloud.storage.StorageOptions; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -38,7 +35,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -58,12 +54,16 @@ public class ConfigUtilsHadoopTest { @Parameterized.Parameter(value = 2) public Configuration flinkConfig; - /* The additional Hadoop resources to add to the hadoop config when hadoop conf dir is present. */ + /* The Hadoop resources to load from the config dir. */ @Parameterized.Parameter(value = 3) - public org.apache.hadoop.conf.Configuration additionalHadoopConfig; + public org.apache.hadoop.conf.Configuration loadedHadoopConfig; - /* The expected Hadoop configuration. */ + /* The expected Hadoop configuration directory. */ @Parameterized.Parameter(value = 4) + public String expectedHadoopConfigDir; + + /* The expected Hadoop configuration. 
+    @Parameterized.Parameter(value = 5)
     public org.apache.hadoop.conf.Configuration expectedHadoopConfig;

     @Parameterized.Parameters(name = "description={0}")
@@ -75,6 +75,7 @@ public static Collection<Object[]> data() {
                         null,
                         Configuration.fromMap(new HashMap<>()),
                         TestUtils.hadoopConfigFromMap(new HashMap<>()),
+                        null,
                         TestUtils.hadoopConfigFromMap(new HashMap<>()),
                     },
                     {
@@ -87,6 +88,7 @@ public static Collection<Object[]> data() {
                                         put("fs.gs.project.id", "project-id");
                                     }
                                 }),
+                        null,
                         TestUtils.hadoopConfigFromMap(new HashMap<>()),
                     },
                     {
@@ -99,6 +101,7 @@ public static Collection<Object[]> data() {
                                         put("fs.gs.project.id", "project-id");
                                     }
                                 }),
+                        "/hadoop/conf",
                         TestUtils.hadoopConfigFromMap(
                                 new HashMap<String, String>() {
                                     {
@@ -121,6 +124,7 @@ public static Collection<Object[]> data() {
                                         put("fs.gs.project.id", "project-id");
                                     }
                                 }),
+                        "/hadoop/conf",
                         TestUtils.hadoopConfigFromMap(
                                 new HashMap<String, String>() {
                                     {
@@ -143,6 +147,7 @@ public static Collection<Object[]> data() {
                                         put("fs.gs.project.id", "project-id");
                                     }
                                 }),
+                        "/hadoop/conf2",
                         TestUtils.hadoopConfigFromMap(
                                 new HashMap<String, String>() {
                                     {
@@ -165,6 +170,7 @@ public static Collection<Object[]> data() {
                                         put("fs.gs.project.id", "project-id");
                                     }
                                 }),
+                        "/hadoop/conf",
                         TestUtils.hadoopConfigFromMap(
                                 new HashMap<String, String>() {
                                     {
@@ -188,6 +194,7 @@ public static Collection<Object[]> data() {
                                         put("fs.gs.project.id", "project-id-2");
                                     }
                                 }),
+                        "/hadoop/conf",
                         TestUtils.hadoopConfigFromMap(
                                 new HashMap<String, String>() {
                                     {
@@ -208,6 +215,7 @@ public static Collection<Object[]> data() {
                                                 "/opt/file.json");
                                     }
                                 }),
+                        "/hadoop/conf",
                         TestUtils.hadoopConfigFromMap(
                                 new HashMap<String, String>() {
                                     {
@@ -231,6 +239,7 @@ public static Collection<Object[]> data() {
                                                 "/opt/file.json");
                                     }
                                 }),
+                        "/hadoop/conf",
                         TestUtils.hadoopConfigFromMap(
                                 new HashMap<String, String>() {
                                     {
@@ -240,62 +249,35 @@ public static Collection<Object[]> data() {
                                                 "/opt/file.json");
                                     }
                                 }),
-                    },
+                    }
                 });
     }

-    private @Nullable String expectedHadoopConfigDir;
-
-    @Before
-    public void before() {
-
-        // determine which hadoop conf dir we expect to receive
-        expectedHadoopConfigDir = flinkConfig.get(CoreOptions.FLINK_HADOOP_CONF_DIR);
-        if (expectedHadoopConfigDir == null) {
-            expectedHadoopConfigDir = envHadoopConfDir;
-        }
-    }
-
     @Test
     public void shouldProperlyCreateHadoopConfig() {
-        final String[] actualHadoopConfigDir = {null};
+
+        // construct the testing config context
+        HashMap<String, String> envs = new HashMap<>();
+        if (envHadoopConfDir != null) {
+            envs.put("HADOOP_CONF_DIR", envHadoopConfDir);
+        }
+        HashMap<String, org.apache.hadoop.conf.Configuration> hadoopConfigs = new HashMap<>();
+        if (expectedHadoopConfigDir != null) {
+            hadoopConfigs.put(expectedHadoopConfigDir, loadedHadoopConfig);
+        }
+        TestingConfigContext configContext =
+                new TestingConfigContext(envs, hadoopConfigs, new HashMap<>());

         // get the hadoop configuration
         org.apache.hadoop.conf.Configuration hadoopConfig =
-                ConfigUtils.getHadoopConfiguration(
-                        flinkConfig,
-                        new ConfigUtils.ConfigContext() {
-                            @Override
-                            public Optional<String> getenv(String name) {
-                                if ("HADOOP_CONF_DIR".equals(name)) {
-                                    return Optional.ofNullable(envHadoopConfDir);
-                                }
-                                return Optional.empty();
-                            }
-
-                            @Override
-                            public void addHadoopResourcesFromDir(
-                                    org.apache.hadoop.conf.Configuration config, String configDir) {
-                                actualHadoopConfigDir[0] = configDir;
-                                config.addResource(additionalHadoopConfig);
-                            }
-
-                            @Override
-                            public void setStorageCredentialsFromFile(
-                                    StorageOptions.Builder storageOptionsBuilder,
-                                    String credentialsPath) {
-                                throw new UnsupportedOperationException();
-                            }
-                        });
-
-        assertEquals(expectedHadoopConfigDir, actualHadoopConfigDir[0]);
+                ConfigUtils.getHadoopConfiguration(flinkConfig, configContext);

+        // compare to the expected hadoop configuration
         Map<String, String> expectedHadoopConfigMap =
                 TestUtils.hadoopConfigToMap(expectedHadoopConfig);
         Map<String, String> hadoopConfigMap = TestUtils.hadoopConfigToMap(hadoopConfig);
         MapDifference<String, String> difference =
                 Maps.difference(expectedHadoopConfigMap, hadoopConfigMap);
-
         assertEquals(Collections.EMPTY_MAP, difference.entriesDiffering());
         assertEquals(Collections.EMPTY_MAP, difference.entriesOnlyOnLeft());
         assertEquals(Collections.EMPTY_MAP, difference.entriesOnlyOnRight());
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java
index fdbdb53781ba9..695f6dd7d068e 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/ConfigUtilsStorageTest.java
@@ -20,8 +20,7 @@

 import org.apache.flink.fs.gs.TestUtils;

-import com.google.cloud.storage.StorageOptions;
-import org.apache.hadoop.conf.Configuration;
+import com.google.auth.oauth2.GoogleCredentials;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -34,7 +33,6 @@
 import java.util.Optional;

 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;

 /** Test construction of Storage instance in GSFileSystemFactory. */
 @RunWith(Parameterized.class)
@@ -54,7 +52,7 @@ public class ConfigUtilsStorageTest {

     /* The expected credentials file to use. */
     @Parameterized.Parameter(value = 3)
-    public @Nullable String expectedCredentialsFile;
+    public @Nullable String expectedCredentialsFilePath;

     @Parameterized.Parameters(name = "description={0}")
     public static Collection<Object[]> data() {
@@ -127,37 +125,28 @@ public static Collection<Object[]> data() {
     }

     @Test
-    public void shouldProperlyCreateStorageOptions() {
-
-        final String[] actualCredentialsPath = {null};
-
-        StorageOptions storageOptions =
-                ConfigUtils.getStorageOptions(
-                        hadoopConfig,
-                        new ConfigUtils.ConfigContext() {
-                            @Override
-                            public Optional<String> getenv(String name) {
-                                if ("GOOGLE_APPLICATION_CREDENTIALS".equals(name)) {
-                                    return Optional.ofNullable(envGoogleApplicationCredentials);
-                                }
-                                return Optional.empty();
-                            }
-
-                            @Override
-                            public void addHadoopResourcesFromDir(
-                                    Configuration config, String configDir) {
-                                throw new UnsupportedOperationException();
-                            }
-
-                            @Override
-                            public void setStorageCredentialsFromFile(
-                                    StorageOptions.Builder storageOptionsBuilder,
-                                    String credentialsPath) {
-                                actualCredentialsPath[0] = credentialsPath;
-                            }
-                        });
-
-        assertNotNull(storageOptions);
-        assertEquals(expectedCredentialsFile, actualCredentialsPath[0]);
+    public void shouldProperlyCreateStorageCredentials() {
+
+        // populate this if we store credentials in the testing context
+        Optional<GoogleCredentials> expectedCredentials = Optional.empty();
+
+        // construct the testing config context
+        HashMap<String, String> envs = new HashMap<>();
+        if (envGoogleApplicationCredentials != null) {
+            envs.put("GOOGLE_APPLICATION_CREDENTIALS", envGoogleApplicationCredentials);
+        }
+        HashMap<String, GoogleCredentials> credentials = new HashMap<>();
+        if (expectedCredentialsFilePath != null) {
+            expectedCredentials = Optional.of(GoogleCredentials.newBuilder().build());
+            credentials.put(expectedCredentialsFilePath, expectedCredentials.get());
+        }
+        TestingConfigContext configContext =
+                new TestingConfigContext(envs, new HashMap<>(), credentials);
+
+        // load the storage credentials
+        Optional<GoogleCredentials> loadedCredentials =
+                ConfigUtils.getStorageCredentials(hadoopConfig, configContext);
+
+        assertEquals(expectedCredentials, loadedCredentials);
     }
 }
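Both test classes above exercise ConfigUtils through the TestingConfigContext helper introduced in the new file below: environment variables, Hadoop config directories, and credential files all become plain map lookups, and any unregistered key fails fast with IllegalArgumentException. A short, purely illustrative usage sketch (the paths and values are made up for the example):

    package org.apache.flink.fs.gs.utils;

    import com.google.auth.oauth2.GoogleCredentials;

    import java.util.Collections;
    import java.util.HashMap;

    /** Illustrative use of the testing context; the paths and values are examples only. */
    final class TestingConfigContextExample {

        static void example() {
            TestingConfigContext context =
                    new TestingConfigContext(
                            Collections.singletonMap("HADOOP_CONF_DIR", "/hadoop/conf"),
                            new HashMap<>(),
                            Collections.singletonMap(
                                    "/opt/file.json", GoogleCredentials.newBuilder().build()));

            context.getenv("HADOOP_CONF_DIR"); // Optional.of("/hadoop/conf")
            context.loadStorageCredentialsFromFile("/opt/file.json"); // the canned credentials
            context.loadHadoopConfigFromDir("/other/dir"); // throws IllegalArgumentException
        }
    }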
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/TestingConfigContext.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/TestingConfigContext.java
new file mode 100644
index 0000000000000..270380ed77791
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/TestingConfigContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Implementation of ConfigUtils.ConfigContext for testing. */
+public class TestingConfigContext implements ConfigUtils.ConfigContext {
+
+    private final Map<String, String> envs;
+    private final Map<String, Configuration> hadoopConfigs;
+    private final Map<String, GoogleCredentials> credentials;
+
+    public TestingConfigContext(
+            Map<String, String> envs,
+            Map<String, Configuration> hadoopConfigs,
+            Map<String, GoogleCredentials> credentials) {
+        this.envs = envs;
+        this.hadoopConfigs = hadoopConfigs;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Optional<String> getenv(String name) {
+        return Optional.ofNullable(envs.get(name));
+    }
+
+    @Override
+    public Configuration loadHadoopConfigFromDir(String configDir) {
+        return Optional.ofNullable(hadoopConfigs.get(configDir))
+                .orElseThrow(IllegalArgumentException::new);
+    }
+
+    @Override
+    public GoogleCredentials loadStorageCredentialsFromFile(String credentialsPath) {
+        return Optional.ofNullable(credentials.get(credentialsPath))
+                .orElseThrow(IllegalArgumentException::new);
+    }
+}
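For completeness, a runtime ConfigContext has to perform the real work that TestingConfigContext stubs out. The patch's production implementation is not shown in this excerpt; the sketch below only illustrates the contract. The core-site.xml and hdfs-site.xml resource names and the unchecked-exception wrapping are assumptions for the example, not code from this patch.

    package org.apache.flink.fs.gs.utils;

    import com.google.auth.oauth2.GoogleCredentials;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Optional;

    /** Illustrative runtime ConfigContext; not the implementation shipped in this patch. */
    class RuntimeConfigContextSketch implements ConfigUtils.ConfigContext {

        @Override
        public Optional<String> getenv(String name) {
            return Optional.ofNullable(System.getenv(name));
        }

        @Override
        public Configuration loadHadoopConfigFromDir(String configDir) {
            // Assumption: the standard Hadoop resource files are picked up from the config dir.
            Configuration config = new Configuration();
            config.addResource(new Path(configDir, "core-site.xml"));
            config.addResource(new Path(configDir, "hdfs-site.xml"));
            return config;
        }

        @Override
        public GoogleCredentials loadStorageCredentialsFromFile(String credentialsPath) {
            // Read the JSON key file and turn any I/O failure into an unchecked exception,
            // since the interface method does not declare a checked exception.
            try (FileInputStream stream = new FileInputStream(credentialsPath)) {
                return GoogleCredentials.fromStream(stream);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }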