Skip to content

Commit

Permalink
HADOOP-15262. AliyunOSS: move files under a directory in parallel whe…
Browse files Browse the repository at this point in the history
…n rename a directory. Contributed by Jinhu Wu.
  • Loading branch information
Sammi Chen committed Mar 19, 2018
1 parent 86816da commit d67a5e2
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.aliyun.oss;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Used by {@link AliyunOSSFileSystem} and {@link AliyunOSSCopyFileTask}
* as copy context. It contains some variables used in copy process.
*/
public class AliyunOSSCopyFileContext {
private final ReentrantLock lock = new ReentrantLock();

private Condition readyCondition = lock.newCondition();

private boolean copyFailure;
private int copiesFinish;

public AliyunOSSCopyFileContext() {
copyFailure = false;
copiesFinish = 0;
}

public void lock() {
lock.lock();
}

public void unlock() {
lock.unlock();
}

public void awaitAllFinish(int copiesToFinish) throws InterruptedException {
while (this.copiesFinish != copiesToFinish) {
readyCondition.await();
}
}

public void signalAll() {
readyCondition.signalAll();
}

public boolean isCopyFailure() {
return copyFailure;
}

public void setCopyFailure(boolean copyFailure) {
this.copyFailure = copyFailure;
}

public void incCopiesFinish() {
++copiesFinish;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.aliyun.oss;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used by {@link AliyunOSSFileSystem} as an task that submitted
* to the thread pool to accelerate the copy progress.
* Each AliyunOSSCopyFileTask copies one file from src path to dst path
*/
public class AliyunOSSCopyFileTask implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSCopyFileTask.class);

private AliyunOSSFileSystemStore store;
private String srcKey;
private String dstKey;
private AliyunOSSCopyFileContext copyFileContext;

public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) {
this.store = store;
this.srcKey = srcKey;
this.dstKey = dstKey;
this.copyFileContext = copyFileContext;
}

@Override
public void run() {
boolean fail = false;
try {
store.copyFile(srcKey, dstKey);
} catch (Exception e) {
LOG.warn("Exception thrown when copy from "
+ srcKey + " to " + dstKey + ", exception: " + e);
fail = true;
} finally {
copyFileContext.lock();
if (fail) {
copyFileContext.setCopyFailure(fail);
}
copyFileContext.incCopiesFinish();
copyFileContext.signalAll();
copyFileContext.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -70,7 +72,9 @@ public class AliyunOSSFileSystem extends FileSystem {
private AliyunOSSFileSystemStore store;
private int maxKeys;
private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool;

private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
Expand All @@ -90,6 +94,7 @@ public void close() throws IOException {
try {
store.close();
boundedThreadPool.shutdown();
boundedCopyThreadPool.shutdown();
} finally {
super.close();
}
Expand Down Expand Up @@ -331,6 +336,23 @@ public void initialize(URI name, Configuration conf) throws IOException {

this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");

maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY,
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT);

int maxCopyThreads = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_COPY_THREADS_NUM_KEY,
Constants.MAX_COPY_THREADS_DEFAULT);

int maxCopyTasks = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_COPY_TASKS_KEY,
Constants.MAX_COPY_TASKS_DEFAULT);

this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxCopyThreads, maxCopyTasks, 60L,
TimeUnit.SECONDS, "oss-copy-unbounded");

setConf(conf);
}

Expand Down Expand Up @@ -653,14 +675,30 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
}

store.storeEmptyFile(dstKey);
AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
ExecutorService executorService = MoreExecutors.listeningDecorator(
new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
maxConcurrentCopyTasksPerDir, true));
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
statistics.incrementReadOps(1);
// Copy files from src folder to dst
int copiesToFinish = 0;
while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String newKey =
dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
store.copyFile(objectSummary.getKey(), newKey);

//copy operation just copies metadata, oss will support shallow copy
executorService.execute(new AliyunOSSCopyFileTask(
store, objectSummary.getKey(), newKey, copyFileContext));
copiesToFinish++;
// No need to call lock() here.
// It's ok to copy one more file if the rename operation failed
// Reduce the call of lock() can also improve our performance
if (copyFileContext.isCopyFailure()) {
//some error occurs, break
break;
}
}
if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker();
Expand All @@ -670,7 +708,16 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
break;
}
}
return true;
//wait operations in progress to finish
copyFileContext.lock();
try {
copyFileContext.awaitAllFinish(copiesToFinish);
} catch (InterruptedException e) {
LOG.warn("interrupted when wait copies to finish");
} finally {
copyFileContext.unlock();
}
return !copyFileContext.isCopyFailure();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ private Constants() {
"fs.oss.multipart.download.ahead.part.max.number";
public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;

// The maximum queue number for copies
// New copies will be blocked when queue is full
public static final String MAX_COPY_TASKS_KEY = "fs.oss.max.copy.tasks";
public static final int MAX_COPY_TASKS_DEFAULT = 1024 * 10240;

// The maximum number of threads allowed in the pool for copies
public static final String MAX_COPY_THREADS_NUM_KEY =
"fs.oss.max.copy.threads";
public static final int MAX_COPY_THREADS_DEFAULT = 25;

// The maximum number of concurrent tasks allowed to copy one directory.
// So we will not block other copies
public static final String MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY =
"fs.oss.max.copy.tasks.per.dir";
public static final int MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT = 5;

// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";

Expand Down

0 comments on commit d67a5e2

Please sign in to comment.