Skip to content

Commit

Permalink
Merge d08879f into cb6e0a8
Browse files Browse the repository at this point in the history
  • Loading branch information
peihe committed Jan 24, 2017
2 parents cb6e0a8 + d08879f commit 59e8556
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -53,6 +52,8 @@ public class FileSystems {
private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
new ConcurrentHashMap<>();

private static PipelineOptions defaultConfig;

private static final Map<String, PipelineOptions> SCHEME_TO_DEFAULT_CONFIG =
new ConcurrentHashMap<>();

Expand All @@ -78,27 +79,12 @@ private static void loadFileSystemRegistrars() {
}

/**
* Sets the default configuration to be used with a {@link FileSystemRegistrar} for the provided
* {@code scheme}.
* Sets the default configuration in workers.
*
* <p>Syntax: <pre>scheme = alpha *( alpha | digit | "+" | "-" | "." )</pre>
* Upper case letters are treated as the same as lower case letters.
* <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
*/
public static void setDefaultConfig(String scheme, PipelineOptions options) {
String lowerCaseScheme = checkNotNull(scheme, "scheme").toLowerCase();
checkArgument(
URI_SCHEME_PATTERN.matcher(lowerCaseScheme).matches(),
String.format("Scheme: [%s] doesn't match URI syntax: %s",
lowerCaseScheme, URI_SCHEME_PATTERN.pattern()));
checkArgument(
SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme),
String.format("No FileSystemRegistrar found for scheme: [%s].", lowerCaseScheme));
SCHEME_TO_DEFAULT_CONFIG.put(lowerCaseScheme, checkNotNull(options, "options"));
}

@VisibleForTesting
static PipelineOptions getDefaultConfig(String scheme) {
return SCHEME_TO_DEFAULT_CONFIG.get(scheme.toLowerCase());
public static void setDefaultConfigInWorkers(PipelineOptions options) {
defaultConfig = checkNotNull(options, "options");
}

/**
Expand All @@ -108,7 +94,8 @@ static PipelineOptions getDefaultConfig(String scheme) {
static FileSystem getFileSystemInternal(URI uri) {
String lowerCaseScheme = (uri.getScheme() != null
? uri.getScheme().toLowerCase() : LocalFileSystemRegistrar.LOCAL_FILE_SCHEME);
return getRegistrarInternal(lowerCaseScheme).fromOptions(getDefaultConfig(lowerCaseScheme));
return getRegistrarInternal(lowerCaseScheme)
.fromOptions(checkNotNull(defaultConfig, "defaultConfig"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Sets;
import java.net.URI;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -41,35 +38,6 @@ public class FileSystemsTest {
@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void testSetDefaultConfig() throws Exception {
PipelineOptions first = PipelineOptionsFactory.create();
PipelineOptions second = PipelineOptionsFactory.create();
FileSystems.setDefaultConfig("file", first);
assertEquals(first, FileSystems.getDefaultConfig("file"));
assertEquals(first, FileSystems.getDefaultConfig("FILE"));

FileSystems.setDefaultConfig("FILE", second);
assertNotEquals(first, FileSystems.getDefaultConfig("file"));
assertNotEquals(first, FileSystems.getDefaultConfig("FILE"));
assertEquals(second, FileSystems.getDefaultConfig("file"));
assertEquals(second, FileSystems.getDefaultConfig("FILE"));
}

@Test
public void testSetDefaultConfigNotFound() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("No FileSystemRegistrar found for scheme: [gs-s3].");
FileSystems.setDefaultConfig("gs-s3", PipelineOptionsFactory.create());
}

@Test
public void testSetDefaultConfigInvalidScheme() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Scheme: [gs:] doesn't match URI syntax");
FileSystems.setDefaultConfig("gs:", PipelineOptionsFactory.create());
}

@Test
public void testGetLocalFileSystem() throws Exception {
assertTrue(
Expand Down

0 comments on commit 59e8556

Please sign in to comment.