Support distributed load of files to Alluxio restricted to a specified set of workers
Fix #13221

Usage:

```
bin/alluxio fs distributedLoad / --hosts host1,host2,host3
```

pr-link: #13229
change-id: cid-0d1f30766d3ce9e85f100889d679dfaaf7af0b0f
maobaolong committed Apr 23, 2021
1 parent 6f18745 commit fc5a280
Showing 7 changed files with 73 additions and 25 deletions.
job/common/src/main/java/alluxio/job/plan/load/LoadConfig.java (16 changes: 15 additions & 1 deletion)
@@ -21,6 +21,9 @@

import javax.annotation.concurrent.ThreadSafe;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* The configuration of loading a file.
@@ -32,15 +35,19 @@ public class LoadConfig implements PlanConfig {
private static final long serialVersionUID = -7937106659935180792L;
private final String mFilePath;
private final int mReplication;
private final Set<String> mWorkerSet;

/**
* @param filePath the file path
* @param replication the number of workers to store each block on, defaults to 1
* @param workerSet the worker set
*/
public LoadConfig(@JsonProperty("filePath") String filePath,
@JsonProperty("replication") Integer replication) {
@JsonProperty("replication") Integer replication,
@JsonProperty("workerSet") Set<String> workerSet) {
mFilePath = Preconditions.checkNotNull(filePath, "The file path cannot be null");
mReplication = replication == null ? 1 : replication;
mWorkerSet = workerSet == null ? Collections.EMPTY_SET : new HashSet(workerSet);
}

/**
@@ -94,4 +101,11 @@ public String getName() {
public Collection<String> affectedPaths() {
return ImmutableList.of(mFilePath);
}

/**
* @return worker set
*/
public Set<String> getWorkerSet() {
return mWorkerSet;
}
}
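
For reference, a minimal sketch of how the extended constructor can be used, based only on what this diff shows; the example class and the sample hosts are illustrative, and the JSON round trip mirrors LoadConfigTest#jsonTest:

```
import alluxio.job.plan.load.LoadConfig;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class LoadConfigExample {
  public static void main(String[] args) throws Exception {
    // Restrict the load to two (illustrative) worker hosts; an empty or null set
    // means the load is not restricted and any available worker may be used.
    Set<String> workers = new HashSet<>(Arrays.asList("host1", "host2"));
    LoadConfig config = new LoadConfig("/path/to/load", 2, workers);

    // LoadConfig is a Jackson-annotated PlanConfig, so it survives a JSON round trip,
    // which is the property LoadConfigTest#jsonTest verifies.
    ObjectMapper mapper = new ObjectMapper();
    LoadConfig restored =
        mapper.readValue(mapper.writeValueAsString(config), LoadConfig.class);
    System.out.println(restored.getWorkerSet());
  }
}
```

Passing Collections.emptySet() (or null) keeps the previous behavior of loading to any available worker.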
@@ -15,13 +15,15 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

/**
* Tests {@link LoadConfig}.
*/
public final class LoadConfigTest {
@Test
public void jsonTest() throws Exception {
LoadConfig config = new LoadConfig("/path/to/load", 3);
LoadConfig config = new LoadConfig("/path/to/load", 3, Collections.EMPTY_SET);
ObjectMapper mapper = new ObjectMapper();
LoadConfig other = mapper.readValue(mapper.writeValueAsString(config), LoadConfig.class);
checkEquality(config, other);
@@ -30,7 +32,7 @@ public void jsonTest() throws Exception {
@Test
public void nullTest() {
try {
new LoadConfig(null, null);
new LoadConfig(null, null, Collections.EMPTY_SET);
Assert.fail("Cannot create config with null path");
} catch (NullPointerException exception) {
Assert.assertEquals("The file path cannot be null", exception.getMessage());
@@ -27,6 +27,7 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;

/**
@@ -41,7 +42,8 @@ public final class CompositeConfigTest {
ArrayList<JobConfig> jobs = new ArrayList<>();
jobs.add(new CompositeConfig(new ArrayList<>(), true));
jobs.add(new CompositeConfig(new ArrayList<>(), false));
jobs.add(new CompositeConfig(Lists.newArrayList(new LoadConfig("/", 1)), true));
jobs.add(new CompositeConfig(
Lists.newArrayList(new LoadConfig("/", 1, Collections.EMPTY_SET)), true));
jobs.add(new CompactConfig(pInfo, "/input", pInfo, "/output", 100, FileUtils.ONE_GB));
CONFIG = new CompositeConfig(jobs, true);
}
@@ -73,7 +73,11 @@ public Set<Pair<WorkerInfo, ArrayList<LoadTask>>> selectExecutors(LoadConfig con
List<BlockWorkerInfo> workers = new ArrayList<>();
for (BlockWorkerInfo worker : context.getFsContext().getCachedWorkers()) {
if (jobWorkersByAddress.containsKey(worker.getNetAddress().getHost())) {
workers.add(worker);
if (config.getWorkerSet() == null
|| config.getWorkerSet().isEmpty()
|| config.getWorkerSet().contains(worker.getNetAddress().getHost().toUpperCase())) {
workers.add(worker);
}
} else {
LOG.warn("Worker on host {} has no local job worker", worker.getNetAddress().getHost());
missingJobWorkerHosts.add(worker.getNetAddress().getHost());
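
The new condition above is the heart of the feature: a candidate block worker is kept when no worker set was supplied, or when its host, after case normalization, is a member of the set. A self-contained sketch of that predicate, with hypothetical class and method names, assuming hosts are upper-cased on both the CLI and the job side:

```
import java.util.Set;

final class WorkerSetFilter {
  private WorkerSetFilter() {} // static utility, not instantiated

  /**
   * @return whether the block worker on the given host may participate in this load
   */
  static boolean isEligible(Set<String> workerSet, String workerHost) {
    // A null or empty set means the user did not pass --hosts: every worker is eligible.
    if (workerSet == null || workerSet.isEmpty()) {
      return true;
    }
    // The CLI upper-cases the --hosts values, so the worker host is normalized the same
    // way here; mismatched normalization on the two sides would make the filter match nothing.
    return workerSet.contains(workerHost.toUpperCase());
  }
}
```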
@@ -47,6 +47,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

@@ -100,7 +101,7 @@ public void replicationSatisfied() throws Exception {
int numBlocks = 7;
int replication = 3;
createFileWithNoLocations(TEST_URI, numBlocks);
LoadConfig config = new LoadConfig(TEST_URI, replication);
LoadConfig config = new LoadConfig(TEST_URI, replication, Collections.EMPTY_SET);
Set<Pair<WorkerInfo, ArrayList<LoadTask>>> assignments =
new LoadDefinition().selectExecutors(config,
JOB_WORKERS, new SelectExecutorsContext(1, mJobServerContext));
@@ -119,7 +120,7 @@ public void skipJobWorkersWithoutLocalBlockWorkers() throws Exception {
Arrays.asList(new BlockWorkerInfo(new WorkerNetAddress().setHost("host0"), 0, 0));
Mockito.when(mMockFsContext.getCachedWorkers()).thenReturn(blockWorkers);
createFileWithNoLocations(TEST_URI, 10);
LoadConfig config = new LoadConfig(TEST_URI, 1);
LoadConfig config = new LoadConfig(TEST_URI, 1, Collections.EMPTY_SET);
Set<Pair<WorkerInfo, ArrayList<LoadTask>>> assignments =
new LoadDefinition().selectExecutors(config, JOB_WORKERS,
new SelectExecutorsContext(1, mJobServerContext));
@@ -130,7 +131,7 @@ public void skipJobWorkersWithoutLocalBlockWorkers() throws Exception {
@Test
public void notEnoughWorkersForReplication() throws Exception {
createFileWithNoLocations(TEST_URI, 1);
LoadConfig config = new LoadConfig(TEST_URI, 5); // set replication to 5
LoadConfig config = new LoadConfig(TEST_URI, 5, Collections.EMPTY_SET); // set replication to 5
try {
new LoadDefinition().selectExecutors(config, JOB_WORKERS,
new SelectExecutorsContext(1, mJobServerContext));
@@ -148,7 +149,7 @@ public void notEnoughJobWorkersWithLocalBlockWorkers() throws Exception {
new BlockWorkerInfo(new WorkerNetAddress().setHost("otherhost"), 0, 0));
Mockito.when(mMockFsContext.getCachedWorkers()).thenReturn(blockWorkers);
createFileWithNoLocations(TEST_URI, 1);
LoadConfig config = new LoadConfig(TEST_URI, 2); // set replication to 2
LoadConfig config = new LoadConfig(TEST_URI, 2, Collections.EMPTY_SET); // set replication to 2
try {
new LoadDefinition().selectExecutors(config,
JOB_WORKERS, new SelectExecutorsContext(1, mJobServerContext));
@@ -31,10 +31,15 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.concurrent.ThreadSafe;

@@ -77,6 +82,15 @@ public final class DistributedLoadCommand extends AbstractDistributedJobCommand
.argName("index file")
.desc("Name of the index file that lists all files to be loaded")
.build();
private static final Option HOSTS_OPTION =
Option.builder()
.longOpt("hosts")
.required(false)
.hasArg(true)
.numberOfArgs(1)
.argName("hosts")
.desc("A list of worker hosts separated by comma")
.build();

/**
* Constructs a new instance to load a file or directory in Alluxio space.
@@ -95,7 +109,8 @@ public String getCommandName() {
@Override
public Options getOptions() {
return new Options().addOption(REPLICATION_OPTION).addOption(ACTIVE_JOB_COUNT_OPTION)
.addOption(INDEX_FILE);
.addOption(INDEX_FILE)
.addOption(HOSTS_OPTION);
}

@Override
@@ -105,7 +120,8 @@ public void validateArgs(CommandLine cl) throws InvalidArgumentException {

@Override
public String getUsage() {
return "distributedLoad [--replication <num>] [--active-jobs <num>] [--index] <path>";
return "distributedLoad [--replication <num>] [--active-jobs <num>] [--index] "
+ "[--host <host1,host2,...,hostn>] <path>";
}

@Override
@@ -121,15 +137,22 @@ public int run(CommandLine cl) throws AlluxioException, IOException {

String[] args = cl.getArgs();
int replication = FileSystemShellUtils.getIntArg(cl, REPLICATION_OPTION, DEFAULT_REPLICATION);
Set<String> workerSet = Collections.EMPTY_SET;
if (cl.hasOption(HOSTS_OPTION.getLongOpt())) {
String argOption = cl.getOptionValue(HOSTS_OPTION.getLongOpt()).trim();
workerSet = Arrays.stream(StringUtils.split(argOption, ","))
.map(str -> str.trim().toUpperCase())
.collect(Collectors.toSet());
}

if (!cl.hasOption(INDEX_FILE.getLongOpt())) {
AlluxioURI path = new AlluxioURI(args[0]);
distributedLoad(path, replication);
distributedLoad(path, replication, workerSet);
} else {
try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
for (String filename; (filename = reader.readLine()) != null; ) {
AlluxioURI path = new AlluxioURI(filename);
distributedLoad(path, replication);
distributedLoad(path, replication, workerSet);
}
}
}
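
The --hosts handling above reduces to a small normalization step: split the option value on commas, trim each entry, and upper-case it before handing the set to LoadConfig. A standalone sketch of that step, with a hypothetical class and method name, using the same StringUtils and stream calls as the diff:

```
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

final class HostsArgParser {
  private HostsArgParser() {} // static utility, not instantiated

  /** Parses "host1, host2 ,host3" into the normalized set {HOST1, HOST2, HOST3}. */
  static Set<String> parse(String hostsArg) {
    return Arrays.stream(StringUtils.split(hostsArg.trim(), ","))
        .map(str -> str.trim().toUpperCase())
        .collect(Collectors.toSet());
  }
}
```

The upper-casing here has to stay in sync with the membership check in LoadDefinition; if one side changes its normalization the other must follow, or the filter will match nothing.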
@@ -147,9 +170,9 @@ public void close() throws IOException {
* @param filePath The {@link AlluxioURI} path to load into Alluxio memory
* @param replication The replication of file to load into Alluxio memory
*/
private LoadJobAttempt newJob(AlluxioURI filePath, int replication) {
private LoadJobAttempt newJob(AlluxioURI filePath, int replication, Set<String> workerSet) {
LoadJobAttempt jobAttempt = new LoadJobAttempt(mClient, new
LoadConfig(filePath.getPath(), replication), new CountingRetry(3));
LoadConfig(filePath.getPath(), replication, workerSet), new CountingRetry(3));

jobAttempt.run();

@@ -159,7 +182,7 @@ private LoadJobAttempt newJob(AlluxioURI filePath, int replication) {
/**
* Add one job.
*/
private void addJob(URIStatus status, int replication) {
private void addJob(URIStatus status, int replication, Set<String> workerSet) {
AlluxioURI filePath = new AlluxioURI(status.getPath());
if (status.getInAlluxioPercentage() == 100) {
// The file has already been fully loaded into Alluxio.
@@ -171,7 +194,7 @@ private void addJob(URIStatus status, int replication) {
waitJob();
}
System.out.println(filePath + " loading");
mSubmittedJobAttempts.add(newJob(filePath, replication));
mSubmittedJobAttempts.add(newJob(filePath, replication, workerSet));
}

/**
Expand All @@ -180,9 +203,9 @@ private void addJob(URIStatus status, int replication) {
* @param filePath The {@link AlluxioURI} path to load into Alluxio memory
* @param replication The replication of file to load into Alluxio memory
*/
private void distributedLoad(AlluxioURI filePath, int replication)
private void distributedLoad(AlluxioURI filePath, int replication, Set<String> workerSet)
throws AlluxioException, IOException {
load(filePath, replication);
load(filePath, replication, workerSet);
// Wait remaining jobs to complete.
drain();
}
@@ -194,12 +217,12 @@ private void distributedLoad(AlluxioURI filePath, int replication)
* @throws AlluxioException when Alluxio exception occurs
* @throws IOException when non-Alluxio exception occurs
*/
private void load(AlluxioURI filePath, int replication)
private void load(AlluxioURI filePath, int replication, Set<String> workerSet)
throws IOException, AlluxioException {
ListStatusPOptions options = ListStatusPOptions.newBuilder().setRecursive(true).build();
mFileSystem.iterateStatus(filePath, options, uriStatus -> {
if (!uriStatus.isFolder()) {
addJob(uriStatus, replication);
addJob(uriStatus, replication, workerSet);
}
});
}
@@ -24,6 +24,8 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

/**
* Integration tests for {@link LoadDefinition}.
*/
@@ -51,14 +53,14 @@ public void loadTest() throws Exception {
Assert.assertEquals(0, status.getInMemoryPercentage());

// run the load job
waitForJobToFinish(mJobMaster.run(new LoadConfig("/test", null)));
waitForJobToFinish(mJobMaster.run(new LoadConfig("/test", null, Collections.EMPTY_SET)));

// check the file is fully in memory
status = mFileSystem.getStatus(filePath);
Assert.assertEquals(100, status.getInMemoryPercentage());

// a second load should work too, no worker is selected
long jobId = mJobMaster.run(new LoadConfig("/test", null));
long jobId = mJobMaster.run(new LoadConfig("/test", null, Collections.EMPTY_SET));
Assert.assertTrue(mJobMaster.getStatus(jobId).getChildren().isEmpty());
}

@@ -79,14 +81,14 @@ public void loadManyBlocks() throws Exception {
Assert.assertEquals(0, status.getInMemoryPercentage());

// run the load job
waitForJobToFinish(mJobMaster.run(new LoadConfig("/test", null)));
waitForJobToFinish(mJobMaster.run(new LoadConfig("/test", null, Collections.EMPTY_SET)));

// check the file is fully in memory
status = mFileSystem.getStatus(filePath);
Assert.assertEquals(100, status.getInMemoryPercentage());

// a second load should work too, no worker is selected
long jobId = mJobMaster.run(new LoadConfig("/test", null));
long jobId = mJobMaster.run(new LoadConfig("/test", null, Collections.EMPTY_SET));
Assert.assertTrue(mJobMaster.getStatus(jobId).getChildren().isEmpty());
}
}
