Rename object store directories in parallel
In this PR:
- Refactored DeleteBuffer into a generic OperationBuffer so that renaming large
  directories can reuse the same parallel batching (see the usage sketch after the file summary below)

pr-link: #8639
change-id: cid-702cfadfc2a542d41079ed76c983aba5b6d6746f
madanadit authored and alluxio-bot committed Apr 2, 2019
1 parent 6e819a7 commit cd332f2
Showing 4 changed files with 234 additions and 110 deletions.
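
As an orientation to the new pattern, here is a minimal, illustrative driver (not part of the commit) showing how the buffer is meant to be used: inputs are added one at a time, full batches are handed off to the executor behind the scenes, and getResult() returns only the inputs whose operations succeeded. DeleteBuffer, add(), and getResult() come from the diff below; the helper method name and keysToDelete are hypothetical, and the code assumes it lives inside an ObjectUnderFileSystem subclass.

  // Hypothetical helper inside an ObjectUnderFileSystem subclass; DeleteBuffer is the
  // inner class shown in the diff below, and keysToDelete is supplied by the caller.
  private boolean deleteAllInParallel(java.util.List<String> keysToDelete) throws java.io.IOException {
    DeleteBuffer buffer = new DeleteBuffer();
    for (String key : keysToDelete) {
      buffer.add(key);  // once a batch fills up, it is submitted to the executor asynchronously
    }
    // Blocks until every submitted batch completes; failed or interrupted batches contribute nothing.
    java.util.List<String> deleted = buffer.getResult();
    return deleted.size() == keysToDelete.size();  // false signals a partial failure
  }
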
15 changes: 9 additions & 6 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
@@ -598,7 +598,7 @@ public String toString() {
new Builder(Name.UNDERFS_OBJECT_STORE_SERVICE_THREADS)
.setDefaultValue(20)
.setDescription("The number of threads in executor pool for parallel object store "
+ "UFS operations.")
+ "UFS operations, such as directory renames and deletes.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
@@ -729,11 +729,14 @@ public String toString() {
public static final PropertyKey UNDERFS_S3_UPLOAD_THREADS_MAX =
new Builder(Name.UNDERFS_S3_UPLOAD_THREADS_MAX)
.setDefaultValue(20)
.setDescription("The maximum number of threads to use for uploading data to S3 for "
+ "multipart uploads. These operations can be fairly expensive, so multiple "
+ "threads are encouraged. However, this also splits the bandwidth between "
+ "threads, meaning the overall latency for completing an upload will be higher "
+ "for more threads.")
.setDescription("For an Alluxio worker, this is the maximum number of threads to use "
+ "for uploading data to S3 for multipart uploads. These operations can be fairly "
+ "expensive, so multiple threads are encouraged. However, this also splits the "
+ "bandwidth between threads, meaning the overall latency for completing an upload "
+ "will be higher for more threads. For the Alluxio master, this is the maximum "
+ "number of threads used for the rename (copy) operation. It is recommended that "
+ "value should be greater than or equal to "
+ Name.UNDERFS_OBJECT_STORE_SERVICE_THREADS)
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
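
Taken together, the two descriptions above tie directory-rename parallelism to two pools: alluxio.underfs.object.store.service.threads sizes the executor that fans out per-file renames, while alluxio.underfs.s3.upload.threads.max bounds the S3 copy threads those renames rely on. As a hedged illustration (this helper is not part of the commit, and it assumes an AlluxioConfiguration instance is at hand), a sanity check for the recommended relationship could look like:

  // Illustrative check only: warn when the S3 copy pool is smaller than the object store
  // service pool, per the recommendation added to the description above.
  static void checkRenameThreadPools(alluxio.conf.AlluxioConfiguration conf) {
    int serviceThreads = conf.getInt(PropertyKey.UNDERFS_OBJECT_STORE_SERVICE_THREADS);
    int uploadThreads = conf.getInt(PropertyKey.UNDERFS_S3_UPLOAD_THREADS_MAX);
    if (uploadThreads < serviceThreads) {
      System.err.printf("S3 upload threads (%d) is less than object store service threads (%d);"
          + " parallel directory renames may be throttled.%n", uploadThreads, serviceThreads);
    }
  }
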
280 changes: 178 additions & 102 deletions core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java
@@ -12,6 +12,7 @@
package alluxio.underfs;

import alluxio.AlluxioURI;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
@@ -218,6 +219,131 @@ public short getMode() {
}
}

/**
* Operations added to this buffer are performed concurrently.
*
* @param <T> the input type for an operation
*/
protected abstract class OperationBuffer<T> {
/** A list of inputs in batches to be operated on in parallel. */
private ArrayList<List<T>> mBatches;
/** A list of the successful operations for each batch. */
private ArrayList<Future<List<T>>> mBatchesResult;
/** Buffer for a batch of inputs. */
private List<T> mCurrentBatchBuffer;
/** Total number of inputs to be operated on across batches. */
protected int mEntriesAdded;

/**
* Construct a new {@link OperationBuffer} instance.
*/
protected OperationBuffer() {
mBatches = new ArrayList<>();
mBatchesResult = new ArrayList<>();
mCurrentBatchBuffer = new ArrayList<>();
mEntriesAdded = 0;
}

/**
* Get the batch size.
*
* @return a positive integer denoting the batch size
*/
protected abstract int getBatchSize();

/**
* Operate on a list of input type {@link T}.
*
* @param paths the list of input type {@link T} to operate on
* @return list of inputs for successful operations
*/
protected abstract List<T> operate(List<T> paths) throws IOException;

/**
* Add a new input to be operated on.
*
* @param input the input to operate on
* @throws IOException if a non-Alluxio error occurs
*/
public void add(T input) throws IOException {
if (mCurrentBatchBuffer.size() == getBatchSize()) {
// Batch is full
submitBatch();
}
mCurrentBatchBuffer.add(input);
mEntriesAdded++;
}

/**
* Get the combined result from all batches.
*
* @return a list of inputs for successful operations
* @throws IOException if a non-Alluxio error occurs
*/
public List<T> getResult() throws IOException {
submitBatch();
List<T> result = new ArrayList<>();
for (Future<List<T>> list : mBatchesResult) {
try {
result.addAll(list.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// If the operation was interrupted, do not add to the list of successful operations
LOG.warn(
"{}: Interrupted while waiting for the result of batch operation. UFS and Alluxio "
+ "state may be inconsistent. Error: {}",
getClass().getName(), e.getMessage());
} catch (ExecutionException e) {
// If the operation failed to execute, do not add to the list of successful operations
LOG.warn(
"{}: A batch operation failed. UFS and Alluxio state may be inconsistent. Error: {}",
getClass().getName(), e.getMessage());
}
}
return result;
}

/**
* Process the current batch asynchronously.
*/
private void submitBatch() throws IOException {
if (mCurrentBatchBuffer.size() != 0) {
int batchNumber = mBatches.size();
mBatches.add(new ArrayList<>(mCurrentBatchBuffer));
mCurrentBatchBuffer.clear();
mBatchesResult.add(batchNumber,
mExecutorService.submit(new OperationThread(mBatches.get(batchNumber))));
}
}

/**
* Thread class to operate on a batch of objects.
*/
@NotThreadSafe
protected class OperationThread implements Callable<List<T>> {
List<T> mBatch;

/**
* Operate on a batch of inputs.
*
* @param batch a list of inputs for the current batch
*/
public OperationThread(List<T> batch) {
mBatch = batch;
}

@Override
public List<T> call() {
try {
return operate(mBatch);
} catch (IOException e) {
// Do not append to success list
return Collections.emptyList();
}
}
}
}
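
OperationBuffer is a small template: a subclass supplies only the batch size and the per-batch operate() call, and inherits the batching, asynchronous submission, and result collection above. Purely as a hypothetical illustration of that extension point (this class is not in the commit, and touchObject() is a made-up helper; List, ArrayList, and IOException are assumed in scope as in ObjectUnderFileSystem):

  // Hypothetical subclass, not part of this diff: applies some per-object "touch"
  // operation in batches of 100 and reports which keys succeeded.
  protected class TouchBuffer extends OperationBuffer<String> {
    @Override
    protected int getBatchSize() {
      return 100;  // arbitrary batch size, chosen only for illustration
    }

    @Override
    protected List<String> operate(List<String> keys) throws IOException {
      List<String> succeeded = new ArrayList<>();
      for (String key : keys) {
        if (touchObject(key)) {  // touchObject() is a hypothetical boolean helper
          succeeded.add(key);
        }
      }
      return succeeded;
    }
  }
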

@Override
public void cleanup() throws IOException {
}
@@ -290,110 +416,24 @@ public boolean deleteDirectory(String path, DeleteOptions options) throws IOExce
}

/**
* Objects added to a {@link DeleteBuffer} will be deleted in batches. Multiple batches are
* processed in parallel.
* Object keys added to a {@link DeleteBuffer} will be deleted in batches.
*/
@NotThreadSafe
protected class DeleteBuffer {
/** A list of objects in batches to be deleted in parallel. */
private ArrayList<List<String>> mBatches;
/** A list of the successfully deleted objects for each batch delete. */
private ArrayList<Future<List<String>>> mBatchesResult;
/** Buffer for a batch of objects to be deleted. */
private List<String> mCurrentBatchBuffer;
/** Total number of objects to be deleted across batches. */
private int mEntriesAdded;

protected class DeleteBuffer extends OperationBuffer<String> {
/**
* Construct a new {@link DeleteBuffer} instance.
*/
public DeleteBuffer() {
mBatches = new ArrayList<>();
mBatchesResult = new ArrayList<>();
mCurrentBatchBuffer = new ArrayList<>();
mEntriesAdded = 0;
}
public DeleteBuffer() {}

/**
* Add a new object to be deleted.
*
* @param path of object
* @throws IOException if a non-Alluxio error occurs
*/
public void add(String path) throws IOException {
@Override
protected int getBatchSize() {
// The delete batch size is the same as the listing chunk length
if (mCurrentBatchBuffer.size() == getListingChunkLength(mAlluxioConf)) {
// Batch is full
submitBatch();
}
mCurrentBatchBuffer.add(path);
mEntriesAdded++;
return getListingChunkLength(mAlluxioConf);
}

/**
* Get the combined result from all batches.
*
* @return a list of successfully deleted objects
* @throws IOException if a non-Alluxio error occurs
*/
public List<String> getResult() throws IOException {
submitBatch();
List<String> result = new ArrayList<>();
for (Future<List<String>> list : mBatchesResult) {
try {
result.addAll(list.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// If operation was interrupted do not add to successfully deleted list
LOG.warn("Interrupted while waiting for the result of batch delete. UFS and Alluxio "
+ "state may be inconsistent. Error: {}", e.getMessage());
} catch (ExecutionException e) {
// If operation failed to execute do not add to successfully deleted list
LOG.warn("A batch delete failed. UFS and Alluxio state may be inconsistent. Error: {}",
e.getMessage());
}
}
return result;
}

/**
* Process the current batch asynchronously.
*/
private void submitBatch() throws IOException {
if (mCurrentBatchBuffer.size() != 0) {
int batchNumber = mBatches.size();
mBatches.add(new ArrayList<>(mCurrentBatchBuffer));
mCurrentBatchBuffer.clear();
mBatchesResult.add(batchNumber,
mExecutorService.submit(new DeleteThread(mBatches.get(batchNumber))));
}
}

/**
* Thread class to delete a batch of objects.
*/
@NotThreadSafe
protected class DeleteThread implements Callable<List<String>> {

List<String> mBatch;

/**
* Delete a batch of objects.
* @param batch a list of objects to delete
*/
public DeleteThread(List<String> batch) {
mBatch = batch;
}

@Override
public List<String> call() {
try {
return deleteObjects(mBatch);
} catch (IOException e) {
// Do not append to success list
return Collections.emptyList();
}
}
@Override
protected List<String> operate(List<String> paths) throws IOException {
return deleteObjects(paths);
}
}
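
Each batch that DeleteBuffer.operate() receives is forwarded to deleteObjects(), so the batch size is tied to the listing chunk length, which roughly matches the per-request key limits of typical object store bulk-delete APIs. A hedged sketch of what a concrete object UFS might do with a batch, assuming deleteObjects(List<String>) is an overridable hook as its use in operate() above suggests; the client field and bulkDelete() call are placeholders, not Alluxio's real S3 code:

  // Hypothetical override in a concrete object UFS; "mClient" and bulkDelete() are
  // illustrative placeholders for an object store SDK call.
  @Override
  protected List<String> deleteObjects(List<String> keys) throws IOException {
    try {
      return mClient.bulkDelete(keys);  // returns the keys that were actually removed
    } catch (RuntimeException e) {
      throw new IOException("Bulk delete failed for a batch of " + keys.size() + " keys", e);
    }
  }
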

@@ -561,25 +601,61 @@ public boolean renameDirectory(String src, String dst) throws IOException {
return false;
}
// Rename each child in the src folder to destination/child
// a. Since renames are a copy operation, files are added to a buffer and processed concurrently
// b. Pseudo-directories are metadata-only operations and are not added to the buffer
RenameBuffer buffer = new RenameBuffer();
for (UfsStatus child : children) {
String childSrcPath = PathUtils.concatPath(src, child.getName());
String childDstPath = PathUtils.concatPath(dst, child.getName());
boolean success;
if (child.isDirectory()) {
// Recursive call
success = renameDirectory(childSrcPath, childDstPath);
if (!renameDirectory(childSrcPath, childDstPath)) {
LOG.error("Failed to rename path {} to {}, aborting rename.", childSrcPath, childDstPath);
return false;
}
} else {
success = renameFile(childSrcPath, childDstPath);
}
if (!success) {
LOG.error("Failed to rename path {} to {}, aborting rename.", childSrcPath, childDstPath);
return false;
buffer.add(new Pair<>(childSrcPath, childDstPath));
}
}
// Get result of parallel file renames
int filesRenamed = buffer.getResult().size();
if (filesRenamed != buffer.mEntriesAdded) {
LOG.warn("Failed to rename directory, successfully renamed {} files out of {}.",
filesRenamed, buffer.mEntriesAdded);
return false;
}
// Delete src and everything under src
return deleteDirectory(src, DeleteOptions.defaults().setRecursive(true));
}

/**
* File paths added to a {@link RenameBuffer} will be renamed concurrently.
*/
@NotThreadSafe
protected class RenameBuffer extends OperationBuffer<Pair<String, String>> {
/**
* Construct a new {@link RenameBuffer} instance.
*/
public RenameBuffer() {}

@Override
protected int getBatchSize() {
return 1;
}

@Override
protected List<Pair<String, String>> operate(List<Pair<String, String>> paths)
throws IOException {
List<Pair<String, String>> succeeded = new ArrayList<>();
for (Pair<String, String> pathPair : paths) {
if (renameFile(pathPair.getFirst(), pathPair.getSecond())) {
succeeded.add(pathPair);
}
}
return succeeded;
}
}

@Override
public boolean renameFile(String src, String dst) throws IOException {
if (!isFile(src)) {
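
For context on why per-file renames are worth parallelizing: object stores have no atomic rename, so ObjectUnderFileSystem renames a file by copying the object to the destination key and then deleting the source key. A simplified sketch of that shape follows (not the verbatim method body, which also validates the destination and logs failures; copyObject, deleteObject, and stripPrefixIfPresent are assumed to be the class's existing helpers):

  // Simplified copy-then-delete sketch of an object store rename; the real method also
  // checks the destination before copying.
  @Override
  public boolean renameFile(String src, String dst) throws IOException {
    if (!isFile(src)) {
      return false;  // only an existing object can be renamed this way
    }
    return copyObject(stripPrefixIfPresent(src), stripPrefixIfPresent(dst))
        && deleteObject(stripPrefixIfPresent(src));
  }
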
