Skip to content

Commit

Permalink
This closes #2175
Browse files Browse the repository at this point in the history
  • Loading branch information
peihe committed Mar 30, 2017
2 parents 5363431 + d07ef53 commit 769398e
Show file tree
Hide file tree
Showing 9 changed files with 516 additions and 79 deletions.
302 changes: 281 additions & 21 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -107,19 +107,13 @@ protected void copy(
LocalResourceId src = srcResourceIds.get(i);
LocalResourceId dst = destResourceIds.get(i);
LOG.debug("Copying {} to {}", src, dst);
try {
// Copy the source file, replacing the existing destination.
// Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter.
Files.copy(
src.getPath(),
dst.getPath(),
StandardCopyOption.REPLACE_EXISTING,
StandardCopyOption.COPY_ATTRIBUTES);
} catch (NoSuchFileException e) {
LOG.debug("{} does not exist.", src);
// Suppress exception if file does not exist.
// TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile.
}
// Copy the source file, replacing the existing destination.
// Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter.
Files.copy(
src.getPath(),
dst.getPath(),
StandardCopyOption.REPLACE_EXISTING,
StandardCopyOption.COPY_ATTRIBUTES);
}
}

Expand All @@ -137,32 +131,20 @@ protected void rename(
LocalResourceId src = srcResourceIds.get(i);
LocalResourceId dst = destResourceIds.get(i);
LOG.debug("Renaming {} to {}", src, dst);
try {
// Rename the source file, replacing the existing destination.
Files.move(
src.getPath(),
dst.getPath(),
StandardCopyOption.REPLACE_EXISTING,
StandardCopyOption.COPY_ATTRIBUTES,
StandardCopyOption.ATOMIC_MOVE);
} catch (NoSuchFileException e) {
LOG.debug("{} does not exist.", src);
// Suppress exception if file does not exist.
// TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile.
}
// Rename the source file, replacing the existing destination.
Files.move(
src.getPath(),
dst.getPath(),
StandardCopyOption.REPLACE_EXISTING,
StandardCopyOption.ATOMIC_MOVE);
}
}

@Override
protected void delete(Collection<LocalResourceId> resourceIds) throws IOException {
for (LocalResourceId resourceId : resourceIds) {
LOG.debug("deleting file {}", resourceId);
// Delete the file if it exists.
// TODO: use Files.delete() once FileSystems supports ignoreMissingFile.
boolean exists = Files.deleteIfExists(resourceId.getPath());
if (!exists) {
LOG.debug("Tried to delete {}, but it did not exist", resourceId);
}
Files.delete(resourceId.getPath());
}
}

Expand Down Expand Up @@ -207,7 +189,14 @@ public boolean apply(File input) {
for (File match : matchedFiles) {
result.add(toMetadata(match));
}
return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()]));
if (result.isEmpty()) {
// TODO: consider to return Status.OK for globs.
return MatchResult.create(
Status.NOT_FOUND,
new FileNotFoundException(String.format("No files found for spec: %s.", spec)));
} else {
return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()]));
}
}

private Metadata toMetadata(File file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ Path getPath() {

@Override
public String toString() {
return String.format("LocalResourceId: [%s]", path);
return path.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.fs;

import org.apache.beam.sdk.io.FileSystems;

/**
* An object that configures {@link FileSystems#copy}, {@link FileSystems#rename},
* and {@link FileSystems#delete}.
*/
public interface MoveOptions {

/**
* Defines the standard {@link MoveOptions}.
*/
enum StandardMoveOptions implements MoveOptions {
IGNORE_MISSING_FILES,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.fs;

import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;

/**
Expand Down Expand Up @@ -82,4 +83,11 @@ public interface ResourceId {
* <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
*/
String getScheme();

/**
* Returns the string representation of this {@link ResourceId}.
*
* <p>The corresponding {@link FileSystem#match} is required to accept this string representation.
*/
String toString();
}
10 changes: 0 additions & 10 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -752,11 +752,6 @@ public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {

@Override
public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
if (errorExtractor.itemNotFound(e)) {
// Do nothing on item not found.
LOG.debug("{} does not exist, assuming this is a retry after deletion.", from);
return;
}
throw new IOException(
String.format("Error trying to copy %s to %s: %s", from, to, e));
}
Expand All @@ -774,11 +769,6 @@ public void onSuccess(Void obj, HttpHeaders responseHeaders) {

@Override
public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
if (errorExtractor.itemNotFound(e)) {
// Do nothing on item not found.
LOG.debug("{} does not exist.", file);
return;
}
throw new IOException(String.format("Error trying to delete %s: %s", file, e));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,36 @@
*/
package org.apache.beam.sdk.io;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.io.Files;

import java.io.Writer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import javax.annotation.Nullable;

import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand All @@ -36,18 +56,21 @@
@RunWith(JUnit4.class)
public class FileSystemsTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException thrown = ExpectedException.none();
private LocalFileSystem localFileSystem = new LocalFileSystem();

@Test
public void testGetLocalFileSystem() throws Exception {
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/"))
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/").getScheme())
instanceof LocalFileSystem);
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home"))
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home").getScheme())
instanceof LocalFileSystem);
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home"))
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home").getScheme())
instanceof LocalFileSystem);
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home"))
assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home").getScheme())
instanceof LocalFileSystem);
}

Expand All @@ -71,6 +94,143 @@ public String getScheme() {
}));
}

@Test
public void testDeleteThrowsNoSuchFileException() throws Exception {
Path existingPath = temporaryFolder.newFile().toPath();
Path nonExistentPath = existingPath.resolveSibling("non-existent");

createFileWithContent(existingPath, "content1");

thrown.expect(NoSuchFileException.class);
FileSystems.delete(
toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */));
}

@Test
public void testDeleteIgnoreMissingFiles() throws Exception {
Path existingPath = temporaryFolder.newFile().toPath();
Path nonExistentPath = existingPath.resolveSibling("non-existent");

createFileWithContent(existingPath, "content1");

FileSystems.delete(
toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}

@Test
public void testCopyThrowsNoSuchFileException() throws Exception {
Path existingPath = temporaryFolder.newFile().toPath();
Path nonExistentPath = existingPath.resolveSibling("non-existent");

Path destPath1 = existingPath.resolveSibling("dest1");
Path destPath2 = nonExistentPath.resolveSibling("dest2");

createFileWithContent(existingPath, "content1");

thrown.expect(NoSuchFileException.class);
FileSystems.copy(
toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
toResourceIds(ImmutableList.of(destPath1, destPath2), false /* isDirectory */));
}

@Test
public void testCopyIgnoreMissingFiles() throws Exception {
Path srcPath1 = temporaryFolder.newFile().toPath();
Path nonExistentPath = srcPath1.resolveSibling("non-existent");
Path srcPath3 = temporaryFolder.newFile().toPath();

Path destPath1 = srcPath1.resolveSibling("dest1");
Path destPath2 = nonExistentPath.resolveSibling("dest2");
Path destPath3 = srcPath1.resolveSibling("dest3");

createFileWithContent(srcPath1, "content1");
createFileWithContent(srcPath3, "content3");

FileSystems.copy(
toResourceIds(
ImmutableList.of(srcPath1, nonExistentPath, srcPath3), false /* isDirectory */),
toResourceIds(ImmutableList.of(destPath1, destPath2, destPath3), false /* isDirectory */),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);

assertTrue(srcPath1.toFile().exists());
assertTrue(srcPath3.toFile().exists());
assertThat(
Files.readLines(srcPath1.toFile(), StandardCharsets.UTF_8),
containsInAnyOrder("content1"));
assertFalse(destPath2.toFile().exists());
assertThat(
Files.readLines(srcPath3.toFile(), StandardCharsets.UTF_8),
containsInAnyOrder("content3"));
}

@Test
public void testRenameThrowsNoSuchFileException() throws Exception {
Path existingPath = temporaryFolder.newFile().toPath();
Path nonExistentPath = existingPath.resolveSibling("non-existent");

Path destPath1 = existingPath.resolveSibling("dest1");
Path destPath2 = nonExistentPath.resolveSibling("dest2");

createFileWithContent(existingPath, "content1");

thrown.expect(NoSuchFileException.class);
FileSystems.rename(
toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
toResourceIds(ImmutableList.of(destPath1, destPath2), false /* isDirectory */));
}

@Test
public void testRenameIgnoreMissingFiles() throws Exception {
Path srcPath1 = temporaryFolder.newFile().toPath();
Path nonExistentPath = srcPath1.resolveSibling("non-existent");
Path srcPath3 = temporaryFolder.newFile().toPath();

Path destPath1 = srcPath1.resolveSibling("dest1");
Path destPath2 = nonExistentPath.resolveSibling("dest2");
Path destPath3 = srcPath1.resolveSibling("dest3");

createFileWithContent(srcPath1, "content1");
createFileWithContent(srcPath3, "content3");

FileSystems.rename(
toResourceIds(
ImmutableList.of(srcPath1, nonExistentPath, srcPath3), false /* isDirectory */),
toResourceIds(ImmutableList.of(destPath1, destPath2, destPath3), false /* isDirectory */),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);

assertFalse(srcPath1.toFile().exists());
assertFalse(srcPath3.toFile().exists());
assertThat(
Files.readLines(destPath1.toFile(), StandardCharsets.UTF_8),
containsInAnyOrder("content1"));
assertFalse(destPath2.toFile().exists());
assertThat(
Files.readLines(destPath3.toFile(), StandardCharsets.UTF_8),
containsInAnyOrder("content3"));
}

private List<ResourceId> toResourceIds(List<Path> paths, final boolean isDirectory) {
return FluentIterable
.from(paths)
.transform(new Function<Path, ResourceId>() {
@Override
public ResourceId apply(Path path) {
return LocalResourceId.fromPath(path, isDirectory);
}})
.toList();
}

private void createFileWithContent(Path path, String content) throws Exception {
try (Writer writer = Channels.newWriter(
localFileSystem.create(
LocalResourceId.fromPath(path, false /* isDirectory */),
CreateOptions.StandardCreateOptions.builder().setMimeType(MimeTypes.TEXT).build()),
StandardCharsets.UTF_8.name())) {
writer.write(content);
}
}

private LocalResourceId toLocalResourceId(String str) throws Exception {
boolean isDirectory;
if (SystemUtils.IS_OS_WINDOWS) {
Expand Down
Loading

0 comments on commit 769398e

Please sign in to comment.