
Commit 6d23f67 (1 parent: 8fc657b)

[FLINK-25790][flink-gs-fs-hadoop] Support RecoverableWriter auth via Hadoop config

2 files changed: +142 -26 lines

flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java

Lines changed: 62 additions & 6 deletions
@@ -24,12 +24,18 @@
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
 
+import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
 /** Provides recoverable-writer functionality for the standard GoogleHadoopFileSystem. */
 class GSFileSystem extends HadoopFileSystem {
 
@@ -39,17 +45,67 @@ class GSFileSystem extends HadoopFileSystem {
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        //    "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        //    credentials, if supplied
+        // 4) use no credentials
+
+        // store any credentials we are to use, here
+        Optional<String> credentialsPath = Optional.empty();
+
+        // only look for credentials if service account support is enabled
+        Configuration hadoopConfig = getHadoopFileSystem().getConf();
+        boolean enableServiceAccount =
+                hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true);
+        if (enableServiceAccount) {
+
+            // load google application credentials, and then fall back to
+            // "google.cloud.auth.service.account.json.keyfile" from Hadoop
+            credentialsPath = Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
+            if (credentialsPath.isPresent()) {
+                LOGGER.info(
+                        "Recoverable writer is using GOOGLE_APPLICATION_CREDENTIALS at {}",
+                        credentialsPath.get());
+            } else {
+                credentialsPath =
+                        Optional.ofNullable(
+                                hadoopConfig.get("google.cloud.auth.service.account.json.keyfile"));
+                credentialsPath.ifPresent(
+                        path ->
+                                LOGGER.info(
+                                        "Recoverable writer is using credentials from Hadoop at {}",
+                                        path));
+            }
+        }
 
-        // create the Google storage service instance
-        Storage storage = StorageOptions.getDefaultInstance().getService();
+        // construct the storage instance, using credentials if provided
+        Storage storage;
+        if (credentialsPath.isPresent()) {
+            LOGGER.info(
+                    "Creating GSRecoverableWriter using credentials from {}",
+                    credentialsPath.get());
+            try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) {
+                GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream);
+                storage =
+                        StorageOptions.newBuilder()
+                                .setCredentials(credentials)
+                                .build()
+                                .getService();
+            }
+        } else {
+            LOGGER.info("Creating GSRecoverableWriter using no credentials");
+            storage = StorageOptions.newBuilder().build().getService();
+        }
 
         // create the GS blob storage wrapper
         GSBlobStorageImpl blobStorage = new GSBlobStorageImpl(storage);
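
For context, the lookup order implemented above mirrors the gcs-connector's own service-account rules. Below is a minimal, self-contained sketch of the same fallback chain; the class CredentialsResolutionSketch and its method names are illustrative only and do not appear in the commit:

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.hadoop.conf.Configuration;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Optional;

/** Illustrative sketch of the credential fallback chain used by createRecoverableWriter. */
class CredentialsResolutionSketch {

    /** Resolve a credentials path: env var first, then the Hadoop keyfile property. */
    static Optional<String> resolveCredentialsPath(Configuration hadoopConfig) {
        // rule 1: service-account support can be disabled entirely
        if (!hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true)) {
            return Optional.empty();
        }
        // rule 2: prefer GOOGLE_APPLICATION_CREDENTIALS if set
        Optional<String> path =
                Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
        if (path.isPresent()) {
            return path;
        }
        // rule 3: fall back to the Hadoop keyfile property
        return Optional.ofNullable(
                hadoopConfig.get("google.cloud.auth.service.account.json.keyfile"));
    }

    /** Build a Storage client from the resolved path, or with no credentials (rule 4). */
    static Storage createStorage(Optional<String> credentialsPath) throws IOException {
        if (credentialsPath.isPresent()) {
            try (FileInputStream stream = new FileInputStream(credentialsPath.get())) {
                return StorageOptions.newBuilder()
                        .setCredentials(GoogleCredentials.fromStream(stream))
                        .build()
                        .getService();
            }
        }
        return StorageOptions.newBuilder().build().getService();
    }
}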

flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java

Lines changed: 80 additions & 20 deletions
@@ -19,20 +19,26 @@
 package org.apache.flink.fs.gs;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.Preconditions;
 
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
 import java.net.URI;
 import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
@@ -53,30 +59,23 @@ public class GSFileSystemFactory implements FileSystemFactory {
 
     private static final String FLINK_SHADING_PREFIX = "";
 
-    private final HadoopConfigLoader hadoopConfigLoader;
-
     @Nullable private Configuration flinkConfig;
 
+    @Nullable private org.apache.hadoop.conf.Configuration hadoopConfig;
+
     /** Constructs the Google Storage file system factory. */
     public GSFileSystemFactory() {
         LOGGER.info("Creating GSFileSystemFactory");
-
-        this.hadoopConfigLoader =
-                new HadoopConfigLoader(
-                        FLINK_CONFIG_PREFIXES,
-                        MIRRORED_CONFIG_KEYS,
-                        HADOOP_CONFIG_PREFIX,
-                        Collections.emptySet(),
-                        Collections.emptySet(),
-                        FLINK_SHADING_PREFIX);
     }
 
     @Override
     public void configure(Configuration flinkConfig) {
         LOGGER.info("Configuring GSFileSystemFactory with Flink configuration {}", flinkConfig);
 
         this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
-        hadoopConfigLoader.setFlinkConfig(flinkConfig);
+        this.hadoopConfig = getHadoopConfiguration(flinkConfig);
+
+        LOGGER.info("Using Hadoop configuration {}", serializeHadoopConfig(hadoopConfig));
     }
 
     @Override
@@ -90,20 +89,81 @@ public FileSystem create(URI fsUri) throws IOException {
 
         Preconditions.checkNotNull(fsUri);
 
-        // create and configure the Google Hadoop file system
-        org.apache.hadoop.conf.Configuration hadoopConfig =
-                hadoopConfigLoader.getOrLoadHadoopConfig();
-        LOGGER.info(
-                "Creating GoogleHadoopFileSystem for uri {} with Hadoop config {}",
-                fsUri,
-                hadoopConfig);
+        // initialize the Google Hadoop filesystem
         GoogleHadoopFileSystem googleHadoopFileSystem = new GoogleHadoopFileSystem();
-        googleHadoopFileSystem.initialize(fsUri, hadoopConfig);
+        try {
+            googleHadoopFileSystem.initialize(fsUri, hadoopConfig);
+        } catch (IOException ex) {
+            throw new IOException("Failed to initialize GoogleHadoopFileSystem", ex);
+        }
 
         // construct the file system options
         GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig);
 
         // create the file system wrapper
         return new GSFileSystem(googleHadoopFileSystem, options);
     }
+
+    /**
+     * Loads the hadoop configuration, in two steps.
+     *
+     * <p>1) Find a hadoop conf dir using CoreOptions.FLINK_HADOOP_CONF_DIR or the HADOOP_CONF_DIR
+     * environment variable, and load core-default.xml and core-site.xml from that location
+     *
+     * <p>2) Load hadoop conf from the Flink config, with translations defined above
+     *
+     * <p>... then merge together, such that keys from the second overwrite the first.
+     *
+     * @return The Hadoop configuration.
+     */
+    private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(
+            Configuration flinkConfig) {
+
+        // create an empty hadoop configuration
+        org.apache.hadoop.conf.Configuration hadoopConfig =
+                new org.apache.hadoop.conf.Configuration();
+
+        // look for a hadoop configuration directory and load configuration from core-default.xml
+        // and core-site.xml
+        Optional<String> hadoopConfigDir =
+                Optional.ofNullable(flinkConfig.get(CoreOptions.FLINK_HADOOP_CONF_DIR));
+        if (!hadoopConfigDir.isPresent()) {
+            hadoopConfigDir = Optional.ofNullable(System.getenv("HADOOP_CONF_DIR"));
+        }
+        hadoopConfigDir.ifPresent(
+                configDir -> {
+                    LOGGER.info("Loading system Hadoop config from {}", configDir);
+                    hadoopConfig.addResource(new Path(configDir, "core-default.xml"));
+                    hadoopConfig.addResource(new Path(configDir, "core-site.xml"));
+                    hadoopConfig.reloadConfiguration();
+                });
+
+        // now, load hadoop config from flink and copy key/value pairs into the base config
+        HadoopConfigLoader hadoopConfigLoader =
+                new HadoopConfigLoader(
+                        FLINK_CONFIG_PREFIXES,
+                        MIRRORED_CONFIG_KEYS,
+                        HADOOP_CONFIG_PREFIX,
+                        Collections.emptySet(),
+                        Collections.emptySet(),
+                        FLINK_SHADING_PREFIX);
+        hadoopConfigLoader.setFlinkConfig(flinkConfig);
+        org.apache.hadoop.conf.Configuration flinkHadoopConfig =
+                hadoopConfigLoader.getOrLoadHadoopConfig();
+        for (Map.Entry<String, String> entry : flinkHadoopConfig) {
+            hadoopConfig.set(entry.getKey(), entry.getValue());
+        }
+
+        return hadoopConfig;
+    }
+
+    private String serializeHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig)
+            throws RuntimeException {
+        try (Writer writer = new StringWriter()) {
+            org.apache.hadoop.conf.Configuration.dumpConfiguration(hadoopConfig, writer);
+            return writer.toString();
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
 }
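
One consequence of the merge order in getHadoopConfiguration is that keys derived from the Flink configuration overwrite keys loaded from core-site.xml. A small sketch of that overwrite behavior, assuming only hadoop-common on the classpath (the key and values are chosen purely for illustration):

import org.apache.hadoop.conf.Configuration;

import java.util.Map;

/** Illustrative sketch: Flink-derived Hadoop keys overwrite keys from core-site.xml. */
class ConfigMergeSketch {
    public static void main(String[] args) {
        // stand-in for the base config loaded from core-default.xml / core-site.xml
        Configuration base = new Configuration(false);
        base.set("google.cloud.auth.service.account.enable", "false");

        // stand-in for the config produced by HadoopConfigLoader from the Flink config
        Configuration fromFlink = new Configuration(false);
        fromFlink.set("google.cloud.auth.service.account.enable", "true");

        // the same copy loop the factory uses: the second config wins on duplicate keys
        for (Map.Entry<String, String> entry : fromFlink) {
            base.set(entry.getKey(), entry.getValue());
        }

        System.out.println(base.get("google.cloud.auth.service.account.enable")); // prints "true"
    }
}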
