diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index d086ec62a7a79..0255f2b6d58d2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; @@ -53,6 +52,8 @@ public class FileSystems { private static final Map SCHEME_TO_REGISTRAR = new ConcurrentHashMap<>(); + private static PipelineOptions defaultConfig; + private static final Map SCHEME_TO_DEFAULT_CONFIG = new ConcurrentHashMap<>(); @@ -78,27 +79,12 @@ private static void loadFileSystemRegistrars() { } /** - * Sets the default configuration to be used with a {@link FileSystemRegistrar} for the provided - * {@code scheme}. + * Sets the default configuration in workers. * - *

Syntax:

scheme = alpha *( alpha | digit | "+" | "-" | "." )
- * Upper case letters are treated as the same as lower case letters. + *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. */ - public static void setDefaultConfig(String scheme, PipelineOptions options) { - String lowerCaseScheme = checkNotNull(scheme, "scheme").toLowerCase(); - checkArgument( - URI_SCHEME_PATTERN.matcher(lowerCaseScheme).matches(), - String.format("Scheme: [%s] doesn't match URI syntax: %s", - lowerCaseScheme, URI_SCHEME_PATTERN.pattern())); - checkArgument( - SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme), - String.format("No FileSystemRegistrar found for scheme: [%s].", lowerCaseScheme)); - SCHEME_TO_DEFAULT_CONFIG.put(lowerCaseScheme, checkNotNull(options, "options")); - } - - @VisibleForTesting - static PipelineOptions getDefaultConfig(String scheme) { - return SCHEME_TO_DEFAULT_CONFIG.get(scheme.toLowerCase()); + public static void setDefaultConfigInWorkers(PipelineOptions options) { + defaultConfig = checkNotNull(options, "options"); } /** @@ -106,9 +92,12 @@ static PipelineOptions getDefaultConfig(String scheme) { */ @VisibleForTesting static FileSystem getFileSystemInternal(URI uri) { + checkNotNull( + defaultConfig, + "Expect the runner have called setDefaultConfigInWorkers()."); String lowerCaseScheme = (uri.getScheme() != null ? uri.getScheme().toLowerCase() : LocalFileSystemRegistrar.LOCAL_FILE_SCHEME); - return getRegistrarInternal(lowerCaseScheme).fromOptions(getDefaultConfig(lowerCaseScheme)); + return getRegistrarInternal(lowerCaseScheme).fromOptions(defaultConfig); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java index 9b41b9831407c..113a5629ca7d8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import com.google.common.collect.Sets; @@ -26,6 +24,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -41,33 +40,9 @@ public class FileSystemsTest { @Rule public ExpectedException thrown = ExpectedException.none(); - @Test - public void testSetDefaultConfig() throws Exception { - PipelineOptions first = PipelineOptionsFactory.create(); - PipelineOptions second = PipelineOptionsFactory.create(); - FileSystems.setDefaultConfig("file", first); - assertEquals(first, FileSystems.getDefaultConfig("file")); - assertEquals(first, FileSystems.getDefaultConfig("FILE")); - - FileSystems.setDefaultConfig("FILE", second); - assertNotEquals(first, FileSystems.getDefaultConfig("file")); - assertNotEquals(first, FileSystems.getDefaultConfig("FILE")); - assertEquals(second, FileSystems.getDefaultConfig("file")); - assertEquals(second, FileSystems.getDefaultConfig("FILE")); - } - - @Test - public void testSetDefaultConfigNotFound() throws Exception { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("No FileSystemRegistrar found for scheme: [gs-s3]."); - FileSystems.setDefaultConfig("gs-s3", PipelineOptionsFactory.create()); - } - - @Test - public void testSetDefaultConfigInvalidScheme() throws Exception { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Scheme: [gs:] doesn't match URI syntax"); - FileSystems.setDefaultConfig("gs:", PipelineOptionsFactory.create()); + @Before + public void setup() { + FileSystems.setDefaultConfigInWorkers(PipelineOptionsFactory.create()); } @Test