Skip to content

Commit

Permalink
Revert "[BEAM-2005] Move getScheme from FileSystemRegistrar to FileSy…
Browse files Browse the repository at this point in the history
…stem"

This reverts commit ce88c88.
  • Loading branch information
tgroh committed Apr 28, 2017
1 parent a8a04a2 commit 25f9b4a
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,4 @@ protected abstract void rename(
* such as when the specified {@code singleResourceSpec} is not a valid resource name.
*/
protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);

/**
* Get the URI scheme which defines the namespace of the {@link FileSystem}.
*
* @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
*/
protected abstract String getScheme();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@
*/
public interface FileSystemRegistrar {
/**
* Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}.
* Create a {@link FileSystem} from the given {@link PipelineOptions}.
*/
FileSystem fromOptions(@Nullable PipelineOptions options);

/**
* Get the URI scheme which defines the namespace of the {@link FileSystemRegistrar}.
*
* <p>The scheme is required to be unique among all
* {@link FileSystemRegistrar FileSystemRegistrars}.
*
* <p>Each {@link FileSystem#getScheme() scheme} is required to be unique among all
* {@link FileSystem}s registered by all {@link FileSystemRegistrar}s.
* @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
*/
Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options);
String getScheme();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
Expand All @@ -44,7 +43,7 @@
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
Expand All @@ -68,8 +67,14 @@ public class FileSystems {
private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
"(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");

private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
new AtomicReference<>();
private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
new ConcurrentHashMap<>();

private static PipelineOptions defaultConfig;

static {
loadFileSystemRegistrars();
}

/********************************** METHODS FOR CLIENT **********************************/

Expand Down Expand Up @@ -397,81 +402,88 @@ private static String parseScheme(String spec) {
Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);

if (!matcher.matches()) {
return "file";
return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
} else {
return matcher.group("scheme").toLowerCase();
}
}

/**
* Internal method to get {@link FileSystem} for {@code scheme}.
* Internal method to get {@link FileSystem} for {@code spec}.
*/
@VisibleForTesting
static FileSystem getFileSystemInternal(String scheme) {
return getRegistrarInternal(scheme.toLowerCase()).fromOptions(defaultConfig);
}

/**
* Internal method to get {@link FileSystemRegistrar} for {@code scheme}.
*/
@VisibleForTesting
static FileSystemRegistrar getRegistrarInternal(String scheme) {
String lowerCaseScheme = scheme.toLowerCase();
Map<String, FileSystem> schemeToFileSystem = SCHEME_TO_FILESYSTEM.get();
FileSystem rval = schemeToFileSystem.get(lowerCaseScheme);
if (rval != null) {
return rval;
}
rval = schemeToFileSystem.get(DEFAULT_SCHEME);
if (rval != null) {
return rval;
if (SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme)) {
return SCHEME_TO_REGISTRAR.get(lowerCaseScheme);
} else if (SCHEME_TO_REGISTRAR.containsKey(DEFAULT_SCHEME)) {
return SCHEME_TO_REGISTRAR.get(DEFAULT_SCHEME);
} else {
throw new IllegalStateException("Unable to find registrar for " + scheme);
}
throw new IllegalStateException("Unable to find registrar for " + scheme);
}

/********************************** METHODS FOR REGISTRATION **********************************/

/**
* Sets the default configuration in workers.
*
* <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
* Loads available {@link FileSystemRegistrar} services.
*/
public static void setDefaultConfigInWorkers(PipelineOptions options) {
checkNotNull(options, "options");
private static void loadFileSystemRegistrars() {
SCHEME_TO_REGISTRAR.clear();
Set<FileSystemRegistrar> registrars =
Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
registrars.addAll(Lists.newArrayList(
ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));

SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars));
verifySchemesAreUnique(registrars);

for (FileSystemRegistrar registrar : registrars) {
SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar);
}
}

/**
* Sets the default configuration in workers.
*
* <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
*/
public static void setDefaultConfigInWorkers(PipelineOptions options) {
defaultConfig = checkNotNull(options, "options");
}

@VisibleForTesting
static Map<String, FileSystem> verifySchemesAreUnique(
PipelineOptions options, Set<FileSystemRegistrar> registrars) {
Multimap<String, FileSystem> fileSystemsBySchemes =
static void verifySchemesAreUnique(Set<FileSystemRegistrar> registrars) {
Multimap<String, FileSystemRegistrar> registrarsBySchemes =
TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());

for (FileSystemRegistrar registrar : registrars) {
for (FileSystem fileSystem : registrar.fromOptions(options)) {
fileSystemsBySchemes.put(fileSystem.getScheme(), fileSystem);
}
registrarsBySchemes.put(registrar.getScheme().toLowerCase(), registrar);
}
for (Entry<String, Collection<FileSystem>> entry
: fileSystemsBySchemes.asMap().entrySet()) {
for (Entry<String, Collection<FileSystemRegistrar>> entry
: registrarsBySchemes.asMap().entrySet()) {
if (entry.getValue().size() > 1) {
String conflictingFileSystems = Joiner.on(", ").join(
String conflictingRegistrars = Joiner.on(", ").join(
FluentIterable.from(entry.getValue())
.transform(new Function<FileSystem, String>() {
.transform(new Function<FileSystemRegistrar, String>() {
@Override
public String apply(@Nonnull FileSystem input) {
public String apply(@Nonnull FileSystemRegistrar input) {
return input.getClass().getName();
}})
.toSortedList(Ordering.<String>natural()));
throw new IllegalStateException(String.format(
"Scheme: [%s] has conflicting filesystems: [%s]",
"Scheme: [%s] has conflicting registrars: [%s]",
entry.getKey(),
conflictingFileSystems));
conflictingRegistrars));
}
}

ImmutableMap.Builder<String, FileSystem> schemeToFileSystem = ImmutableMap.builder();
for (Entry<String, FileSystem> entry : fileSystemsBySchemes.entries()) {
schemeToFileSystem.put(entry.getKey(), entry.getValue());
}
return schemeToFileSystem.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ protected LocalResourceId matchNewResource(String singleResourceSpec, boolean is
return LocalResourceId.fromPath(path, isDirectory);
}

@Override
protected String getScheme() {
return "file";
}

private MatchResult matchOne(String spec) throws IOException {
File file = Paths.get(spec).toFile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
package org.apache.beam.sdk.io;

import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;

/**
* {@link AutoService} registrar for the {@link LocalFileSystem}.
* {@link AutoService} registrar for the {@link FileSystem}.
*/
@AutoService(FileSystemRegistrar.class)
public class LocalFileSystemRegistrar implements FileSystemRegistrar {

static final String LOCAL_FILE_SCHEME = "file";

@Override
public FileSystem fromOptions(@Nullable PipelineOptions options) {
return new LocalFileSystem();
}

@Override
public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
return ImmutableList.<FileSystem>of(new LocalFileSystem());
public String getScheme() {
return LOCAL_FILE_SCHEME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private LocalResourceId resolveLocalPathWindowsOS(String other, ResolveOptions r

@Override
public String getScheme() {
return "file";
return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
}

Path getPath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.io.Files;

import java.io.Writer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import javax.annotation.Nullable;

import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Rule;
Expand Down Expand Up @@ -74,12 +77,21 @@ public void testGetLocalFileSystem() throws Exception {
@Test
public void testVerifySchemesAreUnique() throws Exception {
thrown.expect(RuntimeException.class);
thrown.expectMessage("Scheme: [file] has conflicting filesystems");
thrown.expectMessage("Scheme: [file] has conflicting registrars");
FileSystems.verifySchemesAreUnique(
PipelineOptionsFactory.create(),
Sets.<FileSystemRegistrar>newHashSet(
new LocalFileSystemRegistrar(),
new LocalFileSystemRegistrar()));
new FileSystemRegistrar() {
@Override
public FileSystem fromOptions(@Nullable PipelineOptions options) {
return null;
}

@Override
public String getScheme() {
return "FILE";
}
}));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
*/
package org.apache.beam.sdk.io;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import com.google.common.collect.Lists;
import java.util.ServiceLoader;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -40,8 +36,6 @@ public void testServiceLoader() {
for (FileSystemRegistrar registrar
: Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
if (registrar instanceof LocalFileSystemRegistrar) {
Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
assertThat(fileSystems, contains(instanceOf(LocalFileSystem.class)));
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> dest
options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
}

@Override
protected String getScheme() {
return "gs";
}

private List<MatchResult> matchGlobs(List<GcsPath> globs) {
// TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
return FluentIterable.from(globs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystem;
Expand All @@ -33,11 +32,18 @@
@AutoService(FileSystemRegistrar.class)
public class GcsFileSystemRegistrar implements FileSystemRegistrar {

static final String GCS_SCHEME = "gs";

@Override
public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
public FileSystem fromOptions(@Nonnull PipelineOptions options) {
checkNotNull(
options,
"Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
return new GcsFileSystem(options.as(GcsOptions.class));
}

@Override
public String getScheme() {
return GCS_SCHEME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private boolean isDirectory() {

@Override
public String getScheme() {
return "gs";
return GcsFileSystemRegistrar.GCS_SCHEME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
*/
package org.apache.beam.sdk.io.gcp.storage;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.collect.Lists;
import java.util.ServiceLoader;
import org.apache.beam.sdk.io.FileSystem;

import org.apache.beam.sdk.io.FileSystemRegistrar;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;
Expand All @@ -42,8 +41,8 @@ public void testServiceLoader() {
for (FileSystemRegistrar registrar
: Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
if (registrar instanceof GcsFileSystemRegistrar) {
Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
assertEquals("gs", registrar.getScheme());
assertTrue(registrar.fromOptions(PipelineOptionsFactory.create()) instanceof GcsFileSystem);
return;
}
}
Expand Down

0 comments on commit 25f9b4a

Please sign in to comment.