Skip to content

Commit

Permalink
[SMALLFIX] Updates for copyFromLocal (#8309)
Browse files Browse the repository at this point in the history
* Update tests because copyFromLocal fails fast now.

* Always shutdown threadpool.

* Update description of cp -t.

* Make sure threads are stopped under interruption.

* Fix checkstyle.

* Fix parsing of options in copyFromLocal.

* Update description of copyFromLocal.

* Log interrupted exceptions.

* Revert "Update tests because copyFromLocal fails fast now."

This reverts commit 7c82ec4.

* Improve exception handling and error messages.

When exceptions are thrown during copying a file or traversing directory, copyFromLocal will print an error message to stderr, but will continue copying other files in the best effort.
After all copy tasks succeed or fail, a summary of all errors including stacktraces will be printed out at the bottom of the command's output.

Example output:

Failed to create directory /test/d
Copied file:///tmp/test/f2 to /test/f2.
Copied file:///tmp/test/d2/d/f to /test/d2/d/f.
Copied file:///tmp/test/f1 to /test/f1.
ERRORS:
[java.io.IOException: Failed to create directory /test/d
        at alluxio.cli.fs.command.CpCommand.asyncCopyLocalPath(CpCommand.java:556)
        at alluxio.cli.fs.command.CpCommand.run(CpCommand.java:306)
        at alluxio.cli.fs.command.CopyFromLocalCommand.run(CopyFromLocalCommand.java:63)
        at alluxio.cli.AbstractShell.run(AbstractShell.java:105)
        at alluxio.cli.fs.FileSystemShell.main(FileSystemShell.java:66)
Caused by: alluxio.exception.FileAlreadyExistsException: /test/d already exists
        at alluxio.client.file.BaseFileSystem.createDirectory(BaseFileSystem.java:107)
        at alluxio.client.file.BaseFileSystem.createDirectory(BaseFileSystem.java:95)
        at alluxio.cli.fs.command.CpCommand.asyncCopyLocalPath(CpCommand.java:554)
        ... 4 more
]

* Fix checkstyle.

* Refactor sendMessage into succeed and fail.

* Fix checkstyle.

* Log full stacktrace but just print short description of errors.

* Do not show Exception class name in command line output.

* Test parsing -t in copyFromLocal.
  • Loading branch information
Cheng Chang authored and madanadit committed Jan 29, 2019
1 parent 96a2031 commit d9866a7
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 59 deletions.
Expand Up @@ -17,6 +17,7 @@
import alluxio.exception.status.InvalidArgumentException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
Expand Down Expand Up @@ -44,6 +45,11 @@ public String getCommandName() {
return "copyFromLocal";
}

@Override
public Options getOptions() {
return new Options().addOption(CpCommand.THREAD_OPTION);
}

@Override
public void validateArgs(CommandLine cl) throws InvalidArgumentException {
CommandUtils.checkNumOfArgsEquals(this, cl, 2);
Expand All @@ -60,11 +66,12 @@ public int run(CommandLine cl) throws AlluxioException, IOException {

@Override
public String getUsage() {
return "copyFromLocal <src> <remoteDst>";
return "copyFromLocal [-t <number of threads for copying>] <src> <remoteDst>";
}

@Override
public String getDescription() {
return "Copies a file or a directory from local filesystem to Alluxio filesystem.";
return "Copies a file or a directory from local filesystem to Alluxio filesystem "
+ "in parallel at file level.";
}
}
202 changes: 145 additions & 57 deletions shell/src/main/java/alluxio/cli/fs/command/CpCommand.java
Expand Up @@ -17,6 +17,7 @@
import alluxio.cli.fs.FileSystemShellUtils;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.conf.PropertyKey;
Expand All @@ -38,7 +39,11 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.ThreadSafe;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
Expand All @@ -51,32 +56,34 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.ThreadSafe;

/**
* Copies a file or a directory in the Alluxio filesystem.
*/
@ThreadSafe
public final class CpCommand extends AbstractFileSystemCommand {
private static final Logger LOG = LoggerFactory.getLogger(CpCommand.class);
private static final String COPY_SUCCEED_MESSAGE = "Copied %s to %s";
private static final String COPY_FAIL_MESSAGE = "Failed to copy %s to %s";

private static final Option RECURSIVE_OPTION =
Option.builder("R")
.required(false)
.hasArg(false)
.desc("copy files in subdirectories recursively")
.build();
private static final Option THREAD_OPTION =
public static final Option THREAD_OPTION =
Option.builder("t")
.required(false)
.hasArg(true)
.numberOfArgs(1)
.argName("threads")
.type(Integer.class)
.desc("Number of threads used to copy files in parallel")
.type(Number.class)
.desc("Number of threads used to copy files in parallel, default value is CPU cores * 2")
.build();

/**
Expand All @@ -85,14 +92,30 @@ public final class CpCommand extends AbstractFileSystemCommand {
* Copy tasks can send messages to an output stream in a thread safe way.
*/
@ThreadSafe
private final class CopyThreadPoolExecutor {
private static final String MESSAGE_DONE = "#";
private static final class CopyThreadPoolExecutor {
private static final class CopyException extends Exception {
public CopyException(AlluxioURI src, AlluxioURI dst, Exception cause) {
super(String.format(COPY_FAIL_MESSAGE, src, dst), cause);
}
}

private ThreadPoolExecutor mPool;
private BlockingQueue<String> mMessages;
private PrintStream mOutput;
private Thread mPrinter;
private AlluxioURI mPath;
private static final Object MESSAGE_DONE = new Object();

private final ThreadPoolExecutor mPool;
/**
* Message queue used by mPrinter.
* Only supports objects of type String and Exception.
* String messages will be printed to stdout;
* Exception messages will be printed to stderr and collected into mExceptions.
* Other types of messages will be ignored.
*/
private final BlockingQueue<Object> mMessages;
private final ConcurrentLinkedQueue<Exception> mExceptions;
private final PrintStream mStdout;
private final PrintStream mStderr;
private final Thread mPrinter;
private final FileSystem mFileSystem;
private final AlluxioURI mPath;

/**
* Creates a new thread pool with the specified number of threads,
Expand All @@ -102,29 +125,43 @@ private final class CopyThreadPoolExecutor {
* NOTE: needs to call {@link #shutdown()} to release resources.
*
* @param threads number of threads
* @param out the output stream for tasks to send messages to
* @param stdout the stdout stream for tasks to send messages to
* @param stderr the stderr stream for tasks to send error messages to
* @param fileSystem the Alluxio filesystem used to delete path
* @param path the path to delete on shutdown when it's empty, otherwise can be {@code null}
*/
public CopyThreadPoolExecutor(int threads, PrintStream out, AlluxioURI path) {
public CopyThreadPoolExecutor(int threads, PrintStream stdout, PrintStream stderr,
FileSystem fileSystem, AlluxioURI path) {
mPool = new ThreadPoolExecutor(threads, threads,
1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(threads * 2),
new ThreadPoolExecutor.CallerRunsPolicy());
mMessages = new LinkedBlockingQueue<>();
mOutput = out;
mExceptions = new ConcurrentLinkedQueue<>();
mStdout = stdout;
mStderr = stderr;
mPrinter = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String message = mMessages.take();
if (message.equals(MESSAGE_DONE)) {
Object message = mMessages.take();
if (message == MESSAGE_DONE) {
break;
}
mOutput.println(message);
if (message instanceof String) {
mStdout.println(message);
} else if (message instanceof CopyException) {
CopyException e = (CopyException) message;
mStderr.println(messageAndCause(e));
} else {
LOG.error("Unsupported message type " + message.getClass()
+ " in message queue of copy thread pool");
}
} catch (InterruptedException e) {
break;
}
}
});
mPrinter.start();
mFileSystem = fileSystem;
mPath = path;
}

Expand All @@ -138,37 +175,88 @@ public <T> void submit(Callable<T> task) {
}

/**
* Prints out the message into the specified output stream in a new line, waiting if necessary
* for the internal message queue to have available space.
* Sends a message to the pool to indicate that src is copied to dst,
* the message will be displayed in the stdout stream.
*
* @param src the source path
* @param dst the destination path
* @throws InterruptedException if interrupted while waiting to send the message
*/
public void succeed(AlluxioURI src, AlluxioURI dst) throws InterruptedException {
mMessages.put(String.format(COPY_SUCCEED_MESSAGE, src, dst));
}

/**
* Sends the exception to the pool to indicate that src fails to be copied to dst,
* the exception will be displayed in the stderr stream.
*
* @param message the message, must not be "#"
* @throws InterruptedException if interrupted while waiting
* @param src the source path
* @param dst the destination path
* @param cause the cause of the failure
* @throws InterruptedException if interrupted while waiting to send the exception
*/
public void println(String message) throws InterruptedException {
mMessages.put(message);
public void fail(AlluxioURI src, AlluxioURI dst, Exception cause) throws InterruptedException {
CopyException exception = new CopyException(src, dst, cause);
mExceptions.add(exception);
mMessages.put(exception);
}

/**
* Waits until all asynchronous copy tasks succeed or fail, then shuts down the thread pool,
* joins the printer thread, and deletes the copy destination in case of error.
*
* @throws IOException when threads are interrupted or the path fails to be deleted
* @throws IOException summarizing all exceptions thrown in the submitted tasks and in shutdown
*/
public void shutdown() throws IOException {
mPool.shutdown();
try {
mPool.shutdown();
mPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Copy thread pool is interrupted in shutdown.", e);
Thread.currentThread().interrupt();
mPool.shutdownNow();
}

try {
mMessages.put(MESSAGE_DONE);
mPrinter.join();
} catch (InterruptedException e) {
LOG.warn("Message queue or printer in copy thread pool is interrupted in shutdown.", e);
Thread.currentThread().interrupt();
mPrinter.interrupt();
}

try {
if (mPath != null
&& mFileSystem.exists(mPath)
&& mFileSystem.getStatus(mPath).isFolder()
&& mFileSystem.listStatus(mPath).isEmpty()) {
mFileSystem.delete(mPath);
}
} catch (Exception e) {
throw new IOException(e);
mExceptions.add(new IOException("Failed to delete path " + mPath.toString(), e));
}

if (!mExceptions.isEmpty()) {
List<String> errors = new ArrayList<>();
for (Exception e : mExceptions) {
LOG.error(stacktrace(e));
errors.add(messageAndCause(e));
}
throw new IOException("ERRORS:\n" + Joiner.on("\n").join(errors));
}
}

private String messageAndCause(Exception e) {
return e.getMessage() + ": " + e.getCause().getMessage();
}

private String stacktrace(Exception e) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(os, true);
e.printStackTrace(ps);
ps.close();
return os.toString();
}
}

Expand Down Expand Up @@ -232,19 +320,29 @@ && isFile(srcPath.getScheme())) {
int numThreads;
if (cl.hasOption("t")) {
try {
numThreads = (int) cl.getParsedOptionValue("t");
numThreads = ((Number) cl.getParsedOptionValue("t")).intValue();
} catch (ParseException e) {
throw new IOException("Failed to parse option -t into an integer", e);
}
} else {
numThreads = Runtime.getRuntime().availableProcessors() * 2;
}
CopyThreadPoolExecutor pool = new CopyThreadPoolExecutor(numThreads, System.out,
mFileSystem.exists(dstPath) ? null : dstPath);
copyFromLocalFileList(pool, srcPaths, dstPath);
pool.shutdown();
CopyThreadPoolExecutor pool = new CopyThreadPoolExecutor(numThreads, System.out, System.err,
mFileSystem, mFileSystem.exists(dstPath) ? null : dstPath);
try {
createDstDir(dstPath);
for (AlluxioURI src : srcPaths) {
AlluxioURI dst = new AlluxioURI(dstPath, new AlluxioURI(src.getName()));
asyncCopyLocalPath(pool, src, dst);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
pool.shutdown();
}
}
System.out.println("Copied " + srcPath + " to " + dstPath);
System.out.println(String.format(COPY_SUCCEED_MESSAGE, srcPath, dstPath));
} else if ((srcPath.getScheme() == null || isAlluxio(srcPath.getScheme()))
&& isFile(dstPath.getScheme())) {
List<AlluxioURI> srcPaths = FileSystemShellUtils.getAlluxioURIs(mFileSystem, srcPath);
Expand Down Expand Up @@ -398,25 +496,7 @@ private void copyFile(AlluxioURI srcPath, AlluxioURI dstPath)
os.cancel();
throw e;
}
System.out.println("Copied " + srcPath + " to " + dstPath);
}
}

/**
* Copies a list of files or directories specified by srcPaths from the local filesystem to
* dstPath in the Alluxio filesystem space. This method is used when the input path contains
* wildcards.
*
* @param pool the thread pool for copying
* @param srcPaths a list of files or directories in the local filesystem
* @param dstPath the {@link AlluxioURI} of the destination
*/
private void copyFromLocalFileList(CopyThreadPoolExecutor pool,
List<AlluxioURI> srcPaths, AlluxioURI dstPath) throws AlluxioException, IOException {
createDstDir(dstPath);
for (AlluxioURI srcPath : srcPaths) {
AlluxioURI newURI = new AlluxioURI(dstPath, new AlluxioURI(srcPath.getName()));
asyncCopyLocalPath(pool, srcPath, newURI);
System.out.println(String.format(COPY_SUCCEED_MESSAGE, srcPath, dstPath));
}
}

Expand Down Expand Up @@ -484,25 +564,33 @@ private void copyFromLocalFile(AlluxioURI srcPath, AlluxioURI dstPath)
*
* @param srcPath the {@link AlluxioURI} of the source file in the local filesystem
* @param dstPath the {@link AlluxioURI} of the destination
* @throws InterruptedException when failed to send messages to the pool
*/
private void asyncCopyLocalPath(CopyThreadPoolExecutor pool, AlluxioURI srcPath,
AlluxioURI dstPath) throws AlluxioException, IOException {
AlluxioURI dstPath) throws InterruptedException {
File src = new File(srcPath.getPath());
if (!src.isDirectory()) {
pool.submit(() -> {
try {
copyFromLocalFile(srcPath, dstPath);
pool.println(String.format("Copied %s to %s.", srcPath, dstPath));
pool.succeed(srcPath, dstPath);
} catch (Exception e) {
pool.println(e.getMessage());
pool.fail(srcPath, dstPath, e);
}
return null;
});
} else {
mFileSystem.createDirectory(dstPath);
try {
mFileSystem.createDirectory(dstPath);
} catch (Exception e) {
pool.fail(srcPath, dstPath, e);
return;
}
File[] fileList = src.listFiles();
if (fileList == null) {
throw new IOException(String.format("Failed to list files for directory %s", src));
pool.fail(srcPath, dstPath,
new IOException(String.format("Failed to list directory %s.", src)));
return;
}
for (File srcFile : fileList) {
AlluxioURI newURI = new AlluxioURI(dstPath, new AlluxioURI(srcFile.getName()));
Expand Down
Expand Up @@ -354,4 +354,11 @@ public void copyFromLargeLocalDirectory() throws Exception {
Assert.assertTrue(fileExists(dst.join(String.format(filePathFormat, i))));
}
}

@Test
public void parseThreadOption() throws Exception {
String testDir = FileSystemShellUtilsTest.resetLocalFileHierarchy(mLocalAlluxioCluster);
int ret = mFsShell.run("copyFromLocal", "-t", "1", testDir + "/*/foo*", "/testDir");
Assert.assertEquals(0, ret);
}
}

0 comments on commit d9866a7

Please sign in to comment.