Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,35 @@

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Provides recoverable-writer functionality for the standard GoogleHadoopFileSystem. */
/** FileSystem implementation that wraps GoogleHadoopFileSystem and supports RecoverableWriter. */
class GSFileSystem extends HadoopFileSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(GSFileSystem.class);

private final GSFileSystemOptions options;
private final GSFileSystemOptions fileSystemOptions;

GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
super(Preconditions.checkNotNull(googleHadoopFileSystem));
LOGGER.info("Creating GSFileSystem with options {}", options);
private final Storage storage;

this.options = Preconditions.checkNotNull(options);
GSFileSystem(
GoogleHadoopFileSystem googleHadoopFileSystem,
Storage storage,
GSFileSystemOptions fileSystemOptions) {
super(Preconditions.checkNotNull(googleHadoopFileSystem));
this.fileSystemOptions = Preconditions.checkNotNull(fileSystemOptions);
this.storage = Preconditions.checkNotNull(storage);
}

@Override
public RecoverableWriter createRecoverableWriter() {
LOGGER.info("Creating recoverable writer with options {}", options);

// create the Google storage service instance
Storage storage = StorageOptions.getDefaultInstance().getService();
LOGGER.info("Creating GSRecoverableWriter with file-system options {}", fileSystemOptions);

// create the GS blob storage wrapper
GSBlobStorageImpl blobStorage = new GSBlobStorageImpl(storage);

// construct the recoverable writer with the blob storage wrapper and the options
return new GSRecoverableWriter(blobStorage, options);
return new GSRecoverableWriter(blobStorage, fileSystemOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.fs.gs.utils.ConfigUtils;
import org.apache.flink.util.Preconditions;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Optional;

/**
* Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
Expand All @@ -45,38 +50,54 @@ public class GSFileSystemFactory implements FileSystemFactory {
/** The scheme for the Google Storage file system. */
public static final String SCHEME = "gs";

private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";

private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};

private static final String[][] MIRRORED_CONFIG_KEYS = {};

private static final String FLINK_SHADING_PREFIX = "";

private final HadoopConfigLoader hadoopConfigLoader;

@Nullable private Configuration flinkConfig;
/**
* The Hadoop configuration, formed by combining the system Hadoop config with properties defined
* in the Flink config.
*/
@Nullable private org.apache.hadoop.conf.Configuration hadoopConfig;

/** The options used for GSFileSystem and RecoverableWriter. */
@Nullable private GSFileSystemOptions fileSystemOptions;

/**
* Though it isn't documented as clearly as one might expect, the methods on this object are
* threadsafe, so we can safely share a single instance among all file system instances.
*
* <p>Issue that discusses pending docs is here:
* https://github.com/googleapis/google-cloud-java/issues/1238
*
* <p>StackOverflow discussion:
* https://stackoverflow.com/questions/54516284/google-cloud-storage-java-client-pooling
*/
@Nullable private Storage storage;

/** Constructs the Google Storage file system factory. */
public GSFileSystemFactory() {
LOGGER.info("Creating GSFileSystemFactory");

this.hadoopConfigLoader =
new HadoopConfigLoader(
FLINK_CONFIG_PREFIXES,
MIRRORED_CONFIG_KEYS,
HADOOP_CONFIG_PREFIX,
Collections.emptySet(),
Collections.emptySet(),
FLINK_SHADING_PREFIX);
}

@Override
public void configure(Configuration flinkConfig) {
LOGGER.info("Configuring GSFileSystemFactory with Flink configuration {}", flinkConfig);

this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
hadoopConfigLoader.setFlinkConfig(flinkConfig);
Preconditions.checkNotNull(flinkConfig);

ConfigUtils.ConfigContext configContext = new RuntimeConfigContext();

// load Hadoop config
this.hadoopConfig = ConfigUtils.getHadoopConfiguration(flinkConfig, configContext);
LOGGER.info(
"Using Hadoop configuration {}", ConfigUtils.stringifyHadoopConfig(hadoopConfig));

// construct file-system options
this.fileSystemOptions = new GSFileSystemOptions(flinkConfig);
LOGGER.info("Using file system options {}", fileSystemOptions);

// get storage credentials and construct Storage instance
Optional<GoogleCredentials> credentials =
ConfigUtils.getStorageCredentials(hadoopConfig, configContext);
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder();
credentials.ifPresent(storageOptionsBuilder::setCredentials);
this.storage = storageOptionsBuilder.build().getService();
}

@Override
Expand All @@ -86,24 +107,47 @@ public String getScheme() {

@Override
public FileSystem create(URI fsUri) throws IOException {
LOGGER.info("Creating GS file system for uri {}", fsUri);
LOGGER.info("Creating GSFileSystem for uri {} with options {}", fsUri, fileSystemOptions);

Preconditions.checkNotNull(fsUri);

// create and configure the Google Hadoop file system
org.apache.hadoop.conf.Configuration hadoopConfig =
hadoopConfigLoader.getOrLoadHadoopConfig();
LOGGER.info(
"Creating GoogleHadoopFileSystem for uri {} with Hadoop config {}",
fsUri,
hadoopConfig);
// create the Google Hadoop file system
GoogleHadoopFileSystem googleHadoopFileSystem = new GoogleHadoopFileSystem();
googleHadoopFileSystem.initialize(fsUri, hadoopConfig);

// construct the file system options
GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig);
try {
googleHadoopFileSystem.initialize(fsUri, hadoopConfig);
} catch (IOException ex) {
throw new IOException("Failed to initialize GoogleHadoopFileSystem", ex);
}

// create the file system
return new GSFileSystem(googleHadoopFileSystem, storage, fileSystemOptions);
}

// create the file system wrapper
return new GSFileSystem(googleHadoopFileSystem, options);
/**
 * Config context implementation used at runtime.
 *
 * <p>Answers each {@link ConfigUtils.ConfigContext} lookup from the real process environment
 * and from files on disk, as opposed to a substitute context that could supply canned values.
 */
private static class RuntimeConfigContext implements ConfigUtils.ConfigContext {

    /**
     * Looks up an environment variable of the current process.
     *
     * @param name the name of the environment variable
     * @return the variable's value, or an empty {@link Optional} if {@link System#getenv(String)}
     *     returns {@code null} for that name
     */
    @Override
    public Optional<String> getenv(String name) {
        return Optional.ofNullable(System.getenv(name));
    }

    /**
     * Creates a Hadoop configuration and registers the {@code core-default.xml} and {@code
     * core-site.xml} files located under the given directory as resources.
     *
     * @param configDir the directory expected to contain the Hadoop config files
     * @return the Hadoop configuration with both resources added
     */
    @Override
    public org.apache.hadoop.conf.Configuration loadHadoopConfigFromDir(String configDir) {
        org.apache.hadoop.conf.Configuration hadoopConfig =
                new org.apache.hadoop.conf.Configuration();
        hadoopConfig.addResource(new Path(configDir, "core-default.xml"));
        hadoopConfig.addResource(new Path(configDir, "core-site.xml"));
        // NOTE(review): presumably forces the resources added above to be (re)read; confirm
        // against the Hadoop Configuration documentation.
        hadoopConfig.reloadConfiguration();
        return hadoopConfig;
    }

    /**
     * Reads Google credentials from the file at the given path. The file stream is closed before
     * this method returns, whether or not loading succeeds.
     *
     * @param credentialsPath the path of the credentials file to read
     * @return the credentials parsed from the file
     * @throws java.io.UncheckedIOException if the file cannot be opened or the credentials cannot
     *     be read from it; the original {@link IOException} is kept as the cause
     */
    @Override
    public GoogleCredentials loadStorageCredentialsFromFile(String credentialsPath) {
        try (FileInputStream credentialsStream = new FileInputStream(credentialsPath)) {
            return GoogleCredentials.fromStream(credentialsStream);
        } catch (IOException ex) {
            // This interface method declares no checked exception, so rethrow unchecked, using
            // the standard I/O wrapper and naming the offending path instead of a bare
            // RuntimeException.
            throw new java.io.UncheckedIOException(
                    "Failed to load Google storage credentials from " + credentialsPath, ex);
        }
    }
}
}
Loading