diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index b39e517a5bac8..96306dc531d46 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -17,32 +17,46 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.collect.TreeMultimap; + import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.annotation.Nonnull; + import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.KV; /** * Clients 
facing {@link FileSystem} utility. @@ -50,6 +64,8 @@ public class FileSystems { public static final String DEFAULT_SCHEME = "default"; + private static final Pattern URI_SCHEME_PATTERN = Pattern.compile( + "(?[a-zA-Z][-a-zA-Z0-9+.]*)://.*"); private static final Map SCHEME_TO_REGISTRAR = new ConcurrentHashMap<>(); @@ -62,6 +78,60 @@ public class FileSystems { /********************************** METHODS FOR CLIENT **********************************/ + /** + * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}. + * Callers should use {@link #match} to resolve users specs ambiguities before + * calling other methods. + * + *

Implementation handles the following ambiguities of a user-provided spec: + *

    + *
  1. {@code spec} could be a glob or a URI. {@link #match} should be able to tell them apart and + * choose an efficient implementation. + *
  2. The user-provided {@code spec} might refer to files or directories. It is common that + * users who wish to indicate a directory will omit the trailing path delimiter, such as + * {@code "/tmp/dir"} in Linux. The {@link FileSystem} should be able to recognize a directory + * with the trailing path delimiter omitted, but should always return a correct {@link ResourceId} + * (e.g., {@code "/tmp/dir/"}) inside the returned {@link MatchResult}. + *
+ * + *

All {@link FileSystem} implementations should support glob in the final hierarchical path + * component of {@link ResourceId}. This allows SDK libraries to construct file system agnostic + * spec. {@link FileSystem FileSystems} can support additional patterns for user-provided specs. + * + * @return {@code List} in the same order of the input specs. + * + * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes. + * @throws IOException if all specs failed to match due to issues like: + * network connection, authorization. + * Exception for individual spec is deferred until callers retrieve + * metadata with {@link MatchResult#metadata()}. + */ + public static List match(List specs) throws IOException { + return getFileSystemInternal(getOnlyScheme(specs)).match(specs); + } + + /** + * Returns {@link MatchResult MatchResults} for the given {@link ResourceId resourceIds}. + * + * @param resourceIds {@link ResourceId resourceIds} that might be derived from {@link #match}, + * {@link ResourceId#resolve}, or {@link ResourceId#getCurrentDirectory()}. + * + * @throws IOException if all {@code resourceIds} failed to match due to issues like: + * network connection, authorization. + * Exception for individual {@link ResourceId} need to be deferred until callers retrieve + * metadata with {@link MatchResult#metadata()}. + */ + public static List matchResources(List resourceIds) throws IOException { + return match(FluentIterable + .from(resourceIds) + .transform(new Function() { + @Override + public String apply(@Nonnull ResourceId resourceId) { + return resourceId.toString(); + }}) + .toList()); + } + /** * Returns a write channel for the given {@link ResourceId}. 
* @@ -85,7 +155,7 @@ public static WritableByteChannel create(ResourceId resourceId, String mimeType) */ public static WritableByteChannel create(ResourceId resourceId, CreateOptions createOptions) throws IOException { - return getFileSystemInternal(resourceId).create(resourceId, createOptions); + return getFileSystemInternal(resourceId.getScheme()).create(resourceId, createOptions); } /** @@ -99,44 +169,206 @@ public static WritableByteChannel create(ResourceId resourceId, CreateOptions cr * @param resourceId the reference of the file-like resource to open */ public static ReadableByteChannel open(ResourceId resourceId) throws IOException { - return getFileSystemInternal(resourceId).open(resourceId); + return getFileSystemInternal(resourceId.getScheme()).open(resourceId); } - /********************************** METHODS FOR REGISTRATION **********************************/ - /** - * Loads available {@link FileSystemRegistrar} services. + * Copies a {@link List} of file-like resources from one location to another. + * + *

The number of source resources must equal the number of destination resources. + * Destination resources will be created recursively. + * + *

{@code srcResourceIds} and {@code destResourceIds} must have the same scheme. + * + *

It doesn't support copying globs. + * + * @param srcResourceIds the references of the source resources + * @param destResourceIds the references of the destination resources */ - private static void loadFileSystemRegistrars() { - SCHEME_TO_REGISTRAR.clear(); - Set registrars = - Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); - registrars.addAll(Lists.newArrayList( - ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); + public static void copy( + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) throws IOException { + validateOnlyScheme(srcResourceIds, destResourceIds); - verifySchemesAreUnique(registrars); + List srcToCopy; + List destToCopy; + if (Sets.newHashSet(moveOptions).contains( + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) { + KV, List> existings = + filterMissingFiles(srcResourceIds, destResourceIds); + srcToCopy = existings.getKey(); + destToCopy = existings.getValue(); + } else { + srcToCopy = srcResourceIds; + destToCopy = destResourceIds; + } + if (srcToCopy.isEmpty()) { + return; + } + getFileSystemInternal(srcToCopy.iterator().next().getScheme()) + .copy(srcToCopy, destToCopy); + } - for (FileSystemRegistrar registrar : registrars) { - SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar); + /** + * Renames a {@link List} of file-like resources from one location to another. + * + *

The number of source resources must equal the number of destination resources. + * Destination resources will be created recursively. + * + *

{@code srcResourceIds} and {@code destResourceIds} must have the same scheme. + * + *

It doesn't support renaming globs. + * + * @param srcResourceIds the references of the source resources + * @param destResourceIds the references of the destination resources + */ + public static void rename( + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) throws IOException { + validateOnlyScheme(srcResourceIds, destResourceIds); + List srcToRename; + List destToRename; + + if (Sets.newHashSet(moveOptions).contains( + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) { + KV, List> existings = + filterMissingFiles(srcResourceIds, destResourceIds); + srcToRename = existings.getKey(); + destToRename = existings.getValue(); + } else { + srcToRename = srcResourceIds; + destToRename = destResourceIds; + } + if (srcToRename.isEmpty()) { + return; } + getFileSystemInternal(srcToRename.iterator().next().getScheme()) + .rename(srcToRename, destToRename); } /** - * Sets the default configuration in workers. + * Deletes a collection of resources. * - *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. + *

It is allowed but not recommended to delete directories recursively. + * Callers depend on {@link FileSystems} and use {@code DeleteOptions}. + *

{@code resourceIds} must have the same scheme. + * + * @param resourceIds the references of the resources to delete. */ - public static void setDefaultConfigInWorkers(PipelineOptions options) { - defaultConfig = checkNotNull(options, "options"); + public static void delete( + Collection resourceIds, MoveOptions... moveOptions) throws IOException { + Collection resourceIdsToDelete; + if (Sets.newHashSet(moveOptions).contains( + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) { + resourceIdsToDelete = FluentIterable + .from(matchResources(Lists.newArrayList(resourceIds))) + .filter(new Predicate() { + @Override + public boolean apply(@Nonnull MatchResult matchResult) { + return !matchResult.status().equals(MatchResult.Status.NOT_FOUND); + }}) + .transformAndConcat(new Function>() { + @Nonnull + @Override + public Iterable apply(@Nonnull MatchResult input) { + try { + return Lists.newArrayList(input.metadata()); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to get metadata from MatchResult: %s.", input), + e); + } + }}) + .transform(new Function() { + @Nonnull + @Override + public ResourceId apply(@Nonnull Metadata input) { + return input.resourceId(); + }}) + .toList(); + } else { + resourceIdsToDelete = resourceIds; + } + if (resourceIdsToDelete.isEmpty()) { + return; + } + getFileSystemInternal(resourceIdsToDelete.iterator().next().getScheme()) + .delete(resourceIdsToDelete); + } + + private static KV, List> filterMissingFiles( + List srcResourceIds, List destResourceIds) throws IOException { + List srcToHandle = new ArrayList<>(); + List destToHandle = new ArrayList<>(); + + List matchResults = matchResources(srcResourceIds); + for (int i = 0; i < matchResults.size(); ++i) { + if (!matchResults.get(i).status().equals(Status.NOT_FOUND)) { + srcToHandle.add(srcResourceIds.get(i)); + destToHandle.add(destResourceIds.get(i)); + } + } + return KV.of(srcToHandle, destToHandle); + } + + private static void validateOnlyScheme( + 
List srcResourceIds, List destResourceIds) { + checkArgument( + srcResourceIds.size() == destResourceIds.size(), + "Number of source resource ids %s must equal number of destination resource ids %s", + srcResourceIds.size(), + destResourceIds.size()); + Set schemes = FluentIterable.from(srcResourceIds) + .append(destResourceIds) + .transform(new Function() { + @Override + public String apply(@Nonnull ResourceId resourceId) { + return resourceId.getScheme(); + }}) + .toSet(); + checkArgument( + schemes.size() == 1, + String.format( + "Expect srcResourceIds and destResourceIds have the same scheme, but received %s.", + Joiner.on(", ").join(schemes))); + } + + private static String getOnlyScheme(List specs) { + checkArgument(!specs.isEmpty(), "Expect specs are not empty."); + Set schemes = FluentIterable.from(specs) + .transform(new Function() { + @Override + public String apply(String spec) { + return parseScheme(spec); + }}) + .toSet(); + return Iterables.getOnlyElement(schemes); + } + + private static String parseScheme(String spec) { + // The spec is almost, but not quite, a URI. In particular, + // the reserved characters '[', ']', and '?' have meanings that differ + // from their use in the URI spec. ('*' is not reserved). + // Here, we just need the scheme, which is so circumscribed as to be + // very easy to extract with a regex. + Matcher matcher = URI_SCHEME_PATTERN.matcher(spec); + + if (!matcher.matches()) { + return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME; + } else { + return matcher.group("scheme").toLowerCase(); + } } /** * Internal method to get {@link FileSystem} for {@code spec}. 
*/ @VisibleForTesting - static FileSystem getFileSystemInternal(ResourceId resourceId) { - String lowerCaseScheme = resourceId.getScheme().toLowerCase(); - return getRegistrarInternal(lowerCaseScheme).fromOptions(defaultConfig); + static FileSystem getFileSystemInternal(String scheme) { + return getRegistrarInternal(scheme.toLowerCase()).fromOptions(defaultConfig); } /** @@ -154,6 +386,34 @@ static FileSystemRegistrar getRegistrarInternal(String scheme) { } } + /********************************** METHODS FOR REGISTRATION **********************************/ + + /** + * Loads available {@link FileSystemRegistrar} services. + */ + private static void loadFileSystemRegistrars() { + SCHEME_TO_REGISTRAR.clear(); + Set registrars = + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); + registrars.addAll(Lists.newArrayList( + ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); + + verifySchemesAreUnique(registrars); + + for (FileSystemRegistrar registrar : registrars) { + SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar); + } + } + + /** + * Sets the default configuration in workers. + * + *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. + */ + public static void setDefaultConfigInWorkers(PipelineOptions options) { + defaultConfig = checkNotNull(options, "options"); + } + @VisibleForTesting static void verifySchemesAreUnique(Set registrars) { Multimap registrarsBySchemes = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index fe6b643357685..1cad4b3d77a04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -27,13 +27,13 @@ import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; @@ -107,19 +107,13 @@ protected void copy( LocalResourceId src = srcResourceIds.get(i); LocalResourceId dst = destResourceIds.get(i); LOG.debug("Copying {} to {}", src, dst); - try { - // Copy the source file, replacing the existing destination. - // Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter. - Files.copy( - src.getPath(), - dst.getPath(), - StandardCopyOption.REPLACE_EXISTING, - StandardCopyOption.COPY_ATTRIBUTES); - } catch (NoSuchFileException e) { - LOG.debug("{} does not exist.", src); - // Suppress exception if file does not exist. - // TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile. - } + // Copy the source file, replacing the existing destination. 
+ // Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter. + Files.copy( + src.getPath(), + dst.getPath(), + StandardCopyOption.REPLACE_EXISTING, + StandardCopyOption.COPY_ATTRIBUTES); } } @@ -137,19 +131,12 @@ protected void rename( LocalResourceId src = srcResourceIds.get(i); LocalResourceId dst = destResourceIds.get(i); LOG.debug("Renaming {} to {}", src, dst); - try { - // Rename the source file, replacing the existing destination. - Files.move( - src.getPath(), - dst.getPath(), - StandardCopyOption.REPLACE_EXISTING, - StandardCopyOption.COPY_ATTRIBUTES, - StandardCopyOption.ATOMIC_MOVE); - } catch (NoSuchFileException e) { - LOG.debug("{} does not exist.", src); - // Suppress exception if file does not exist. - // TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile. - } + // Rename the source file, replacing the existing destination. + Files.move( + src.getPath(), + dst.getPath(), + StandardCopyOption.REPLACE_EXISTING, + StandardCopyOption.ATOMIC_MOVE); } } @@ -157,12 +144,7 @@ protected void rename( protected void delete(Collection resourceIds) throws IOException { for (LocalResourceId resourceId : resourceIds) { LOG.debug("deleting file {}", resourceId); - // Delete the file if it exists. - // TODO: use Files.delete() once FileSystems supports ignoreMissingFile. - boolean exists = Files.deleteIfExists(resourceId.getPath()); - if (!exists) { - LOG.debug("Tried to delete {}, but it did not exist", resourceId); - } + Files.delete(resourceId.getPath()); } } @@ -207,7 +189,14 @@ public boolean apply(File input) { for (File match : matchedFiles) { result.add(toMetadata(match)); } - return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()])); + if (result.isEmpty()) { + // TODO: consider to return Status.OK for globs. 
+ return MatchResult.create( + Status.NOT_FOUND, + new FileNotFoundException(String.format("No files found for spec: %s.", spec))); + } else { + return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()])); + } } private Metadata toMetadata(File file) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java index 3f4dab27f3e07..2272a06fa74ff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java @@ -116,7 +116,7 @@ Path getPath() { @Override public String toString() { - return String.format("LocalResourceId: [%s]", path); + return path.toString(); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java new file mode 100644 index 0000000000000..c5bd27ec47877 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.fs; + +import org.apache.beam.sdk.io.FileSystems; + +/** + * An object that configures {@link FileSystems#copy}, {@link FileSystems#rename}, + * and {@link FileSystems#delete}. + */ +public interface MoveOptions { + + /** + * Defines the standard {@link MoveOptions}. + */ + enum StandardMoveOptions implements MoveOptions { + IGNORE_MISSING_FILES, + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index 2bdd6604e06be..938e24a0e92ae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.fs; +import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; /** @@ -82,4 +83,11 @@ public interface ResourceId { * RFC 2396 */ String getScheme(); + + /** + * Returns the string representation of this {@link ResourceId}. + * + *

The corresponding {@link FileSystem#match} is required to accept this string representation. + */ + String toString(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 434baf5338f61..14781c4a71600 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -752,11 +752,6 @@ public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { @Override public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - if (errorExtractor.itemNotFound(e)) { - // Do nothing on item not found. - LOG.debug("{} does not exist, assuming this is a retry after deletion.", from); - return; - } throw new IOException( String.format("Error trying to copy %s to %s: %s", from, to, e)); } @@ -774,11 +769,6 @@ public void onSuccess(Void obj, HttpHeaders responseHeaders) { @Override public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - if (errorExtractor.itemNotFound(e)) { - // Do nothing on item not found. 
- LOG.debug("{} does not exist.", file); - return; - } throw new IOException(String.format("Error trying to delete %s: %s", file, e)); } }); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java index cfa2b85735482..8cfa3dc6e631a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java @@ -17,16 +17,36 @@ */ package org.apache.beam.sdk.io; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import com.google.common.io.Files; + +import java.io.Writer; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; import javax.annotation.Nullable; + +import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.MimeTypes; import org.apache.commons.lang3.SystemUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -36,18 +56,21 @@ @RunWith(JUnit4.class) public class FileSystemsTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); + private LocalFileSystem localFileSystem = new LocalFileSystem(); @Test public 
void testGetLocalFileSystem() throws Exception { - assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/")) + assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/").getScheme()) instanceof LocalFileSystem); - assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home")) + assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home").getScheme()) instanceof LocalFileSystem); - assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home")) + assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home").getScheme()) instanceof LocalFileSystem); - assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home")) + assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home").getScheme()) instanceof LocalFileSystem); } @@ -71,6 +94,143 @@ public String getScheme() { })); } + @Test + public void testDeleteThrowsNoSuchFileException() throws Exception { + Path existingPath = temporaryFolder.newFile().toPath(); + Path nonExistentPath = existingPath.resolveSibling("non-existent"); + + createFileWithContent(existingPath, "content1"); + + thrown.expect(NoSuchFileException.class); + FileSystems.delete( + toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */)); + } + + @Test + public void testDeleteIgnoreMissingFiles() throws Exception { + Path existingPath = temporaryFolder.newFile().toPath(); + Path nonExistentPath = existingPath.resolveSibling("non-existent"); + + createFileWithContent(existingPath, "content1"); + + FileSystems.delete( + toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + } + + @Test + public void testCopyThrowsNoSuchFileException() throws Exception { + Path existingPath = temporaryFolder.newFile().toPath(); + Path nonExistentPath = existingPath.resolveSibling("non-existent"); + + Path destPath1 = 
existingPath.resolveSibling("dest1"); + Path destPath2 = nonExistentPath.resolveSibling("dest2"); + + createFileWithContent(existingPath, "content1"); + + thrown.expect(NoSuchFileException.class); + FileSystems.copy( + toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */), + toResourceIds(ImmutableList.of(destPath1, destPath2), false /* isDirectory */)); + } + + @Test + public void testCopyIgnoreMissingFiles() throws Exception { + Path srcPath1 = temporaryFolder.newFile().toPath(); + Path nonExistentPath = srcPath1.resolveSibling("non-existent"); + Path srcPath3 = temporaryFolder.newFile().toPath(); + + Path destPath1 = srcPath1.resolveSibling("dest1"); + Path destPath2 = nonExistentPath.resolveSibling("dest2"); + Path destPath3 = srcPath1.resolveSibling("dest3"); + + createFileWithContent(srcPath1, "content1"); + createFileWithContent(srcPath3, "content3"); + + FileSystems.copy( + toResourceIds( + ImmutableList.of(srcPath1, nonExistentPath, srcPath3), false /* isDirectory */), + toResourceIds(ImmutableList.of(destPath1, destPath2, destPath3), false /* isDirectory */), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + + assertTrue(srcPath1.toFile().exists()); + assertTrue(srcPath3.toFile().exists()); + assertThat( + Files.readLines(srcPath1.toFile(), StandardCharsets.UTF_8), + containsInAnyOrder("content1")); + assertFalse(destPath2.toFile().exists()); + assertThat( + Files.readLines(srcPath3.toFile(), StandardCharsets.UTF_8), + containsInAnyOrder("content3")); + } + + @Test + public void testRenameThrowsNoSuchFileException() throws Exception { + Path existingPath = temporaryFolder.newFile().toPath(); + Path nonExistentPath = existingPath.resolveSibling("non-existent"); + + Path destPath1 = existingPath.resolveSibling("dest1"); + Path destPath2 = nonExistentPath.resolveSibling("dest2"); + + createFileWithContent(existingPath, "content1"); + + thrown.expect(NoSuchFileException.class); + FileSystems.rename( + 
toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */), + toResourceIds(ImmutableList.of(destPath1, destPath2), false /* isDirectory */)); + } + + @Test + public void testRenameIgnoreMissingFiles() throws Exception { + Path srcPath1 = temporaryFolder.newFile().toPath(); + Path nonExistentPath = srcPath1.resolveSibling("non-existent"); + Path srcPath3 = temporaryFolder.newFile().toPath(); + + Path destPath1 = srcPath1.resolveSibling("dest1"); + Path destPath2 = nonExistentPath.resolveSibling("dest2"); + Path destPath3 = srcPath1.resolveSibling("dest3"); + + createFileWithContent(srcPath1, "content1"); + createFileWithContent(srcPath3, "content3"); + + FileSystems.rename( + toResourceIds( + ImmutableList.of(srcPath1, nonExistentPath, srcPath3), false /* isDirectory */), + toResourceIds(ImmutableList.of(destPath1, destPath2, destPath3), false /* isDirectory */), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + + assertFalse(srcPath1.toFile().exists()); + assertFalse(srcPath3.toFile().exists()); + assertThat( + Files.readLines(destPath1.toFile(), StandardCharsets.UTF_8), + containsInAnyOrder("content1")); + assertFalse(destPath2.toFile().exists()); + assertThat( + Files.readLines(destPath3.toFile(), StandardCharsets.UTF_8), + containsInAnyOrder("content3")); + } + + private List toResourceIds(List paths, final boolean isDirectory) { + return FluentIterable + .from(paths) + .transform(new Function() { + @Override + public ResourceId apply(Path path) { + return LocalResourceId.fromPath(path, isDirectory); + }}) + .toList(); + } + + private void createFileWithContent(Path path, String content) throws Exception { + try (Writer writer = Channels.newWriter( + localFileSystem.create( + LocalResourceId.fromPath(path, false /* isDirectory */), + CreateOptions.StandardCreateOptions.builder().setMimeType(MimeTypes.TEXT).build()), + StandardCharsets.UTF_8.name())) { + writer.write(content); + } + } + private LocalResourceId 
toLocalResourceId(String str) throws Exception { boolean isDirectory; if (SystemUtils.IS_OS_WINDOWS) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java index 74f8b72a4ad4f..bb5928ee88bfa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java @@ -168,28 +168,24 @@ public void testMatchExact() throws Exception { @Test public void testMatchPatternNone() throws Exception { - List expected = ImmutableList.of(); temporaryFolder.newFile("a"); temporaryFolder.newFile("aa"); temporaryFolder.newFile("ab"); List matchResults = matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("b"), "*"); - assertThat( - toFilenames(matchResults), - containsInAnyOrder(expected.toArray(new String[expected.size()]))); + assertEquals(1, matchResults.size()); + assertEquals(MatchResult.Status.NOT_FOUND, matchResults.get(0).status()); } @Test public void testMatchForNonExistentFile() throws Exception { - List expected = ImmutableList.of(); temporaryFolder.newFile("aa"); List matchResults = localFileSystem.match( ImmutableList.of(temporaryFolder.getRoot().toPath().resolve("a").toString())); - assertThat( - toFilenames(matchResults), - containsInAnyOrder(expected.toArray(new String[expected.size()]))); + assertEquals(1, matchResults.size()); + assertEquals(MatchResult.Status.NOT_FOUND, matchResults.get(0).status()); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java index aecc5c9e7a9f4..a1ac827b12190 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java @@ -97,7 +97,7 @@ GcsPath getGcsPath() { @Override public String toString() { - return String.format("GcsResourceId: [%s]", gcsPath); + return gcsPath.toString(); } @Override