From de5669a3efa9f52be2c9eca546db605979e281e3 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Thu, 27 Apr 2017 18:13:43 -0700 Subject: [PATCH 1/4] [BEAM-2005] Move getScheme from FileSystemRegistrar to FileSystem Note that I needed to update FileSystems to instantiate the FileSystem(s) upfront instead of remembering the mapping from scheme to registrar. --- .../org/apache/beam/sdk/io/FileSystem.java | 7 ++ .../beam/sdk/io/FileSystemRegistrar.java | 14 +--- .../org/apache/beam/sdk/io/FileSystems.java | 80 ++++++++----------- .../apache/beam/sdk/io/LocalFileSystem.java | 5 ++ .../beam/sdk/io/LocalFileSystemRegistrar.java | 16 ++-- .../apache/beam/sdk/io/LocalResourceId.java | 2 +- .../apache/beam/sdk/io/FileSystemsTest.java | 20 +---- .../sdk/io/LocalFileSystemRegistrarTest.java | 7 ++ .../sdk/io/gcp/storage/GcsFileSystem.java | 5 ++ .../gcp/storage/GcsFileSystemRegistrar.java | 13 +-- .../sdk/io/gcp/storage/GcsResourceId.java | 2 +- .../storage/GcsFileSystemRegistrarTest.java | 12 +-- .../beam/sdk/io/hdfs/HadoopFileSystem.java | 5 ++ .../io/hdfs/HadoopFileSystemRegistrar.java | 12 +-- .../hdfs/HadoopFileSystemRegistrarTest.java | 13 +-- 15 files changed, 99 insertions(+), 114 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 76c5dc126fcb..375264a547d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -151,4 +151,11 @@ protected abstract void rename( * such as when the specified {@code singleResourceSpec} is not a valid resource name. */ protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory); + + /** + * Get the URI scheme which defines the namespace of the {@link FileSystem}. + * + * @see RFC 2396 + */ + protected abstract String getScheme(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java index 1d81c1e36428..a61b598d4ce1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import com.google.auto.service.AutoService; +import java.util.List; import java.util.ServiceLoader; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; @@ -33,17 +34,10 @@ */ public interface FileSystemRegistrar { /** - * Create a {@link FileSystem} from the given {@link PipelineOptions}. - */ - FileSystem fromOptions(@Nullable PipelineOptions options); - - /** - * Get the URI scheme which defines the namespace of the {@link FileSystemRegistrar}. + * Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}. * - *

The scheme is required to be unique among all + *

The {@link FileSystem#getScheme() scheme} is required to be unique among all * {@link FileSystemRegistrar FileSystemRegistrars}. - * - * @see RFC 2396 */ - String getScheme(); + List fromOptions(@Nullable PipelineOptions options); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index b2904985a1c8..113480afa632 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -25,6 +25,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -67,15 +68,9 @@ public class FileSystems { private static final Pattern URI_SCHEME_PATTERN = Pattern.compile( "(?[a-zA-Z][-a-zA-Z0-9+.]*)://.*"); - private static final Map SCHEME_TO_REGISTRAR = + private static final Map SCHEME_TO_FILESYSTEM = new ConcurrentHashMap<>(); - private static PipelineOptions defaultConfig; - - static { - loadFileSystemRegistrars(); - } - /********************************** METHODS FOR CLIENT **********************************/ /** @@ -402,7 +397,7 @@ private static String parseScheme(String spec) { Matcher matcher = URI_SCHEME_PATTERN.matcher(spec); if (!matcher.matches()) { - return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME; + return "file"; } else { return matcher.group("scheme").toLowerCase(); } @@ -413,19 +408,11 @@ private static String parseScheme(String spec) { */ @VisibleForTesting static FileSystem getFileSystemInternal(String scheme) { - return getRegistrarInternal(scheme.toLowerCase()).fromOptions(defaultConfig); - } - - /** - * Internal method to get {@link FileSystemRegistrar} for {@code scheme}. - */ - @VisibleForTesting - static FileSystemRegistrar getRegistrarInternal(String scheme) { String lowerCaseScheme = scheme.toLowerCase(); - if (SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme)) { - return SCHEME_TO_REGISTRAR.get(lowerCaseScheme); - } else if (SCHEME_TO_REGISTRAR.containsKey(DEFAULT_SCHEME)) { - return SCHEME_TO_REGISTRAR.get(DEFAULT_SCHEME); + if (SCHEME_TO_FILESYSTEM.containsKey(lowerCaseScheme)) { + return SCHEME_TO_FILESYSTEM.get(lowerCaseScheme); + } else if (SCHEME_TO_FILESYSTEM.containsKey(DEFAULT_SCHEME)) { + return SCHEME_TO_FILESYSTEM.get(DEFAULT_SCHEME); } else { throw new IllegalStateException("Unable to find registrar for " + scheme); } @@ -434,56 +421,55 @@ static FileSystemRegistrar getRegistrarInternal(String scheme) { /********************************** METHODS FOR REGISTRATION **********************************/ /** - * Loads available {@link FileSystemRegistrar} services. + * Sets the default configuration in workers. + * + *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. */ - private static void loadFileSystemRegistrars() { - SCHEME_TO_REGISTRAR.clear(); + public static void setDefaultConfigInWorkers(PipelineOptions options) { + checkNotNull(options, "options"); + SCHEME_TO_FILESYSTEM.clear(); Set registrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); registrars.addAll(Lists.newArrayList( ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); - verifySchemesAreUnique(registrars); - - for (FileSystemRegistrar registrar : registrars) { - SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar); - } - } - - /** - * Sets the default configuration in workers. - * - *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. - */ - public static void setDefaultConfigInWorkers(PipelineOptions options) { - defaultConfig = checkNotNull(options, "options"); + SCHEME_TO_FILESYSTEM.putAll(verifySchemesAreUnique(options, registrars)); } @VisibleForTesting - static void verifySchemesAreUnique(Set registrars) { - Multimap registrarsBySchemes = + static Map verifySchemesAreUnique( + PipelineOptions options, Set registrars) { + Multimap fileSystemsBySchemes = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary()); for (FileSystemRegistrar registrar : registrars) { - registrarsBySchemes.put(registrar.getScheme().toLowerCase(), registrar); + for (FileSystem fileSystem : registrar.fromOptions(options)) { + fileSystemsBySchemes.put(fileSystem.getScheme(), fileSystem); + } } - for (Entry> entry - : registrarsBySchemes.asMap().entrySet()) { + for (Entry> entry + : fileSystemsBySchemes.asMap().entrySet()) { if (entry.getValue().size() > 1) { - String conflictingRegistrars = Joiner.on(", ").join( + String conflictingFileSystems = Joiner.on(", ").join( FluentIterable.from(entry.getValue()) - .transform(new Function() { + .transform(new Function() { @Override - public String apply(@Nonnull FileSystemRegistrar input) { + public String apply(@Nonnull FileSystem input) { return input.getClass().getName(); }}) .toSortedList(Ordering.natural())); throw new IllegalStateException(String.format( - "Scheme: [%s] has conflicting registrars: [%s]", + "Scheme: [%s] has conflicting filesystems: [%s]", entry.getKey(), - conflictingRegistrars)); + conflictingFileSystems)); } } + + ImmutableMap.Builder schemeToFileSystem = ImmutableMap.builder(); + for (Entry entry : fileSystemsBySchemes.entries()) { + schemeToFileSystem.put(entry.getKey(), entry.getValue()); + } + return schemeToFileSystem.build(); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index 2d80ae4bf512..235b77d4b49c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -171,6 +171,11 @@ protected LocalResourceId matchNewResource(String singleResourceSpec, boolean is return LocalResourceId.fromPath(path, isDirectory); } + @Override + protected String getScheme() { + return "file"; + } + private MatchResult matchOne(String spec) throws IOException { File file = Paths.get(spec).toFile(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java index 75a38e8c922e..0028d711fc87 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java @@ -18,24 +18,18 @@ package org.apache.beam.sdk.io; import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; /** - * {@link AutoService} registrar for the {@link FileSystem}. + * {@link AutoService} registrar for the {@link LocalFileSystem}. */ @AutoService(FileSystemRegistrar.class) public class LocalFileSystemRegistrar implements FileSystemRegistrar { - - static final String LOCAL_FILE_SCHEME = "file"; - - @Override - public FileSystem fromOptions(@Nullable PipelineOptions options) { - return new LocalFileSystem(); - } - @Override - public String getScheme() { - return LOCAL_FILE_SCHEME; + public List fromOptions(@Nullable PipelineOptions options) { + return ImmutableList.of(new LocalFileSystem()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java index 9aa765b26f4f..b67ec4610529 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java @@ -113,7 +113,7 @@ private LocalResourceId resolveLocalPathWindowsOS(String other, ResolveOptions r @Override public String getScheme() { - return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME; + return "file"; } Path getPath() { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java index 8cfa3dc6e631..a75c54dd7fd1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import com.google.common.io.Files; - import java.io.Writer; import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; @@ -35,12 +34,10 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; -import javax.annotation.Nullable; - import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.MimeTypes; import org.apache.commons.lang3.SystemUtils; import org.junit.Rule; @@ -77,21 +74,12 @@ public void testGetLocalFileSystem() throws Exception { @Test public void testVerifySchemesAreUnique() throws Exception { thrown.expect(RuntimeException.class); - thrown.expectMessage("Scheme: [file] has conflicting registrars"); + thrown.expectMessage("Scheme: [file] has conflicting filesystems"); FileSystems.verifySchemesAreUnique( + PipelineOptionsFactory.create(), Sets.newHashSet( new LocalFileSystemRegistrar(), - new FileSystemRegistrar() { - @Override - public FileSystem fromOptions(@Nullable PipelineOptions options) { - return null; - } - - @Override - public String getScheme() { - return "FILE"; - } - })); + new LocalFileSystemRegistrar())); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java index e4e8326ea8cf..963b5e6e5958 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java @@ -17,10 +17,15 @@ */ package org.apache.beam.sdk.io; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import com.google.common.collect.Lists; +import java.util.List; import java.util.ServiceLoader; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -36,6 +41,8 @@ public void testServiceLoader() { for (FileSystemRegistrar registrar : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { if (registrar instanceof LocalFileSystemRegistrar) { + List fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); + assertThat(fileSystems, contains(instanceOf(LocalFileSystem.class))); return; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java index 1b0bd9d2f22f..ff71f3c37d49 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java @@ -145,6 +145,11 @@ protected void copy(List srcResourceIds, List dest options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds)); } + @Override + protected String getScheme() { + return "gs"; + } + private List matchGlobs(List globs) { // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503. return FluentIterable.from(globs) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java index 31df0e104908..56c74ba1e7d7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java @@ -20,6 +20,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import java.util.List; import javax.annotation.Nonnull; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.FileSystem; @@ -32,18 +34,11 @@ @AutoService(FileSystemRegistrar.class) public class GcsFileSystemRegistrar implements FileSystemRegistrar { - static final String GCS_SCHEME = "gs"; - @Override - public FileSystem fromOptions(@Nonnull PipelineOptions options) { + public List fromOptions(@Nonnull PipelineOptions options) { checkNotNull( options, "Expect the runner have called FileSystems.setDefaultConfigInWorkers()."); - return new GcsFileSystem(options.as(GcsOptions.class)); - } - - @Override - public String getScheme() { - return GCS_SCHEME; + return ImmutableList.of(new GcsFileSystem(options.as(GcsOptions.class))); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java index a1ac827b1219..38dcaf5038bf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java @@ -88,7 +88,7 @@ private boolean isDirectory() { @Override public String getScheme() { - return GcsFileSystemRegistrar.GCS_SCHEME; + return "gs"; } GcsPath getGcsPath() { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java index ecac8f64cdc0..0ea750fb6d18 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java @@ -17,13 +17,15 @@ */ package org.apache.beam.sdk.io.gcp.storage; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import com.google.common.collect.Lists; +import java.util.List; import java.util.ServiceLoader; - +import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; @@ -41,8 +43,8 @@ public void testServiceLoader() { for (FileSystemRegistrar registrar : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { if (registrar instanceof GcsFileSystemRegistrar) { - assertEquals("gs", registrar.getScheme()); - assertTrue(registrar.fromOptions(PipelineOptionsFactory.create()) instanceof GcsFileSystem); + List fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); + assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class))); return; } } diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index ca56a60be149..a8bdd446a572 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -73,4 +73,9 @@ protected void delete(Collection resourceIds) throws IOExcepti protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { throw new UnsupportedOperationException(); } + + @Override + protected String getScheme() { + return "hdfs"; + } } diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java index 1471cb0dbdc2..f5e8ca519e01 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java @@ -18,10 +18,11 @@ package org.apache.beam.sdk.io.hdfs; import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import java.util.List; import javax.annotation.Nonnull; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; /** @@ -31,12 +32,7 @@ public class HadoopFileSystemRegistrar implements FileSystemRegistrar { @Override - public FileSystem fromOptions(@Nonnull PipelineOptions options) { - return new HadoopFileSystem(); - } - - @Override - public String getScheme() { - return FileSystems.DEFAULT_SCHEME; + public List fromOptions(@Nonnull PipelineOptions options) { + return ImmutableList.of(new HadoopFileSystem()); } } diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java index 22a439acdb8c..5d3d2b94dc07 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java @@ -17,14 +17,16 @@ */ package org.apache.beam.sdk.io.hdfs; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import com.google.common.collect.Lists; +import java.util.List; import java.util.ServiceLoader; +import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; import org.junit.runner.RunWith; @@ -41,9 +43,8 @@ public void testServiceLoader() { for (FileSystemRegistrar registrar : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { if (registrar instanceof HadoopFileSystemRegistrar) { - assertEquals(FileSystems.DEFAULT_SCHEME, registrar.getScheme()); - assertTrue( - registrar.fromOptions(PipelineOptionsFactory.create()) instanceof HadoopFileSystem); + List fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); + assertThat(fileSystems, contains(instanceOf(HadoopFileSystem.class))); return; } } From 7778f3572defc11b8b7e01923e3795a01a396755 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Thu, 27 Apr 2017 23:19:53 -0700 Subject: [PATCH 2/4] fixup! Address PR comments. --- .../main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java | 4 ++-- .../src/main/java/org/apache/beam/sdk/io/FileSystems.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java index a61b598d4ce1..19bc75f42c46 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java @@ -36,8 +36,8 @@ public interface FileSystemRegistrar { /** * Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}. * - *

The {@link FileSystem#getScheme() scheme} is required to be unique among all - * {@link FileSystemRegistrar FileSystemRegistrars}. + *

Each {@link FileSystem#getScheme() scheme} is required to be unique among all + * {@link FileSystem}s registered by all {@link FileSystemRegistrar}s. */ List fromOptions(@Nullable PipelineOptions options); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 113480afa632..a5dd3d90f5d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -404,7 +404,7 @@ private static String parseScheme(String spec) { } /** - * Internal method to get {@link FileSystem} for {@code spec}. + * Internal method to get {@link FileSystem} for {@code scheme}. */ @VisibleForTesting static FileSystem getFileSystemInternal(String scheme) { @@ -425,7 +425,7 @@ static FileSystem getFileSystemInternal(String scheme) { * *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. */ - public static void setDefaultConfigInWorkers(PipelineOptions options) { + public static synchronized void setDefaultConfigInWorkers(PipelineOptions options) { checkNotNull(options, "options"); SCHEME_TO_FILESYSTEM.clear(); Set registrars = From 817a0f6f84a6dc1bd04fa37e2ac135369fc12164 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Fri, 28 Apr 2017 07:33:15 -0700 Subject: [PATCH 3/4] fixup! Swap to use atomic reference of immutable map over synchronization, remove double lookup within map --- .../org/apache/beam/sdk/io/FileSystems.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index a5dd3d90f5d9..532a42fcc127 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -44,7 +44,7 @@ import java.util.Map.Entry; import java.util.ServiceLoader; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nonnull; @@ -68,8 +68,8 @@ public class FileSystems { private static final Pattern URI_SCHEME_PATTERN = Pattern.compile( "(?[a-zA-Z][-a-zA-Z0-9+.]*)://.*"); - private static final Map SCHEME_TO_FILESYSTEM = - new ConcurrentHashMap<>(); + private static final AtomicReference> SCHEME_TO_FILESYSTEM = + new AtomicReference<>(); /********************************** METHODS FOR CLIENT **********************************/ @@ -409,13 +409,16 @@ private static String parseScheme(String spec) { @VisibleForTesting static FileSystem getFileSystemInternal(String scheme) { String lowerCaseScheme = scheme.toLowerCase(); - if (SCHEME_TO_FILESYSTEM.containsKey(lowerCaseScheme)) { - return SCHEME_TO_FILESYSTEM.get(lowerCaseScheme); - } else if (SCHEME_TO_FILESYSTEM.containsKey(DEFAULT_SCHEME)) { - return SCHEME_TO_FILESYSTEM.get(DEFAULT_SCHEME); - } else { - throw new IllegalStateException("Unable to find registrar for " + scheme); + Map schemeToFileSystem = SCHEME_TO_FILESYSTEM.get(); + FileSystem rval = schemeToFileSystem.get(lowerCaseScheme); + if (rval != null) { + return rval; + } + rval = schemeToFileSystem.get(DEFAULT_SCHEME); + if (rval != null) { + return rval; } + throw new IllegalStateException("Unable to find registrar for " + scheme); } /********************************** METHODS FOR REGISTRATION **********************************/ @@ -425,15 +428,14 @@ static FileSystem getFileSystemInternal(String scheme) { * *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. */ - public static synchronized void setDefaultConfigInWorkers(PipelineOptions options) { + public static void setDefaultConfigInWorkers(PipelineOptions options) { checkNotNull(options, "options"); - SCHEME_TO_FILESYSTEM.clear(); Set registrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); registrars.addAll(Lists.newArrayList( ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); - SCHEME_TO_FILESYSTEM.putAll(verifySchemesAreUnique(options, registrars)); + SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars)); } @VisibleForTesting From 8dff96f01d299d2759b23d57965d215fd493eb78 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 28 Apr 2017 08:59:03 -0700 Subject: [PATCH 4/4] fixup! Use iterable instead of list --- .../main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java | 3 +-- .../java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java | 3 +-- .../org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java | 3 +-- .../apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java | 3 +-- .../beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java | 3 +-- .../org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java | 3 +-- .../apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java | 3 +-- 7 files changed, 7 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java index 19bc75f42c46..78a91f6b6ab7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io; import com.google.auto.service.AutoService; -import java.util.List; import java.util.ServiceLoader; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; @@ -39,5 +38,5 @@ public interface FileSystemRegistrar { *

Each {@link FileSystem#getScheme() scheme} is required to be unique among all * {@link FileSystem}s registered by all {@link FileSystemRegistrar}s. */ - List fromOptions(@Nullable PipelineOptions options); + Iterable fromOptions(@Nullable PipelineOptions options); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java index 0028d711fc87..f18236056c87 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java @@ -19,7 +19,6 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; -import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; @@ -29,7 +28,7 @@ @AutoService(FileSystemRegistrar.class) public class LocalFileSystemRegistrar implements FileSystemRegistrar { @Override - public List fromOptions(@Nullable PipelineOptions options) { + public Iterable fromOptions(@Nullable PipelineOptions options) { return ImmutableList.of(new LocalFileSystem()); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java index 963b5e6e5958..0b1729d081d4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.fail; import com.google.common.collect.Lists; -import java.util.List; import java.util.ServiceLoader; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; @@ -41,7 +40,7 @@ public void testServiceLoader() { for (FileSystemRegistrar registrar : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { if (registrar instanceof LocalFileSystemRegistrar) { - List fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); + Iterable fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); assertThat(fileSystems, contains(instanceOf(LocalFileSystem.class))); return; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java index 56c74ba1e7d7..1d4e4adf5bff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; -import java.util.List; import javax.annotation.Nonnull; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.FileSystem; @@ -35,7 +34,7 @@ public class GcsFileSystemRegistrar implements FileSystemRegistrar { @Override - public List fromOptions(@Nonnull PipelineOptions options) { + public Iterable fromOptions(@Nonnull PipelineOptions options) { checkNotNull( options, "Expect the runner have called FileSystems.setDefaultConfigInWorkers()."); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java index 0ea750fb6d18..2fc337a2973b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.fail; import com.google.common.collect.Lists; -import java.util.List; import java.util.ServiceLoader; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; @@ -43,7 +42,7 @@ public void testServiceLoader() { for (FileSystemRegistrar registrar : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { if (registrar instanceof GcsFileSystemRegistrar) { - List fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); + Iterable fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class))); return; } diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java index f5e8ca519e01..cc22f4fa28b7 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java @@ -19,7 +19,6 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; -import java.util.List; import javax.annotation.Nonnull; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; @@ -32,7 +31,7 @@ public class HadoopFileSystemRegistrar implements FileSystemRegistrar { @Override - public List fromOptions(@Nonnull PipelineOptions options) { + public Iterable fromOptions(@Nonnull PipelineOptions options) { return ImmutableList.of(new HadoopFileSystem()); } } diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java index 5d3d2b94dc07..c332af5e07e0 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.fail; import com.google.common.collect.Lists; -import java.util.List; import java.util.ServiceLoader; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; @@ -43,7 +42,7 @@ public void testServiceLoader() { for (FileSystemRegistrar registrar : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { if (registrar instanceof HadoopFileSystemRegistrar) { - List fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); + Iterable fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); assertThat(fileSystems, contains(instanceOf(HadoopFileSystem.class))); return; }