Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static enum ConfVars implements ConfigKey {
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
Runtime.getRuntime().availableProcessors() * 2, Validators.min("1")),
2, Validators.min("1")),
SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192),
SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120),
SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20),
Expand Down
3 changes: 2 additions & 1 deletion tajo-core/src/main/java/org/apache/tajo/worker/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;

public interface Task {

void init() throws IOException;

void fetch();
void fetch(ExecutorService fetcherExecutor);

void run() throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;

/**
* The driver class for Tajo Task processing.
*/
Expand All @@ -29,10 +31,12 @@ public class TaskContainer implements Runnable {

private final TaskExecutor executor;
private final int sequenceId;
private final ExecutorService fetchExecutor;

public TaskContainer(int sequenceId, TaskExecutor executor) {
public TaskContainer(int sequenceId, TaskExecutor executor, ExecutorService fetchExecutor) {
this.sequenceId = sequenceId;
this.executor = executor;
this.fetchExecutor = fetchExecutor;
}

@Override
Expand All @@ -56,7 +60,7 @@ public void run() {
task.init();

if (task.hasFetchPhase()) {
task.fetch(); // The fetch is performed in an asynchronous way.
task.fetch(fetchExecutor); // The fetch is performed in an asynchronous way.
}

if (!taskAttemptContext.isStopped()) {
Expand Down
29 changes: 14 additions & 15 deletions tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@

package org.apache.tajo.worker;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ResourceProtos.TaskRequestProto;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
import org.apache.tajo.worker.event.NodeResourceEvent;
import org.apache.tajo.worker.event.TaskStartEvent;
import org.apache.tajo.ResourceProtos.TaskRequestProto;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -55,7 +56,7 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskSt
private final Map<TaskAttemptId, NodeResource> allocatedResourceMap;
private final BlockingQueue<Task> taskQueue;
private final AtomicInteger runningTasks;
private ExecutorService fetcherThreadPool;
private List<ExecutorService> fetcherThreadPoolList;
private ExecutorService threadPool;
private TajoConf tajoConf;
private volatile boolean isStopped;
Expand All @@ -66,6 +67,7 @@ public TaskExecutor(TajoWorker.WorkerContext workerContext) {
this.allocatedResourceMap = Maps.newConcurrentMap();
this.runningTasks = new AtomicInteger();
this.taskQueue = new LinkedBlockingQueue<Task>();
this.fetcherThreadPoolList = Lists.newArrayList();
}

@Override
Expand All @@ -82,12 +84,12 @@ protected void serviceStart() throws Exception {
new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build());

int maxFetcherThreads = tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
this.fetcherThreadPool = Executors.newFixedThreadPool(nThreads,
new ThreadFactoryBuilder().setNameFormat("Fetcher executor #%d").build());


for (int i = 0; i < nThreads; i++) {
threadPool.submit(new TaskContainer(i, this));
ExecutorService fetcherThreadPool = Executors.newFixedThreadPool(maxFetcherThreads,
new ThreadFactoryBuilder().setNameFormat("TaskContainer[" + i + "] fetcher executor #%d").build());

threadPool.submit(new TaskContainer(i, this, fetcherThreadPool));
fetcherThreadPoolList.add(fetcherThreadPool);
}

super.serviceStart();
Expand All @@ -99,7 +101,9 @@ protected void serviceStop() throws Exception {
isStopped = true;

threadPool.shutdown();
fetcherThreadPool.shutdown();
for (ExecutorService fetcherThreadPool : fetcherThreadPoolList) {
fetcherThreadPool.shutdown();
}
super.serviceStop();
}

Expand Down Expand Up @@ -139,11 +143,6 @@ protected void releaseResource(TaskAttemptId taskId) {
}
}

protected ExecutorService getFetcherExecutor() {
return fetcherThreadPool;
}


protected Task createTask(ExecutionBlockContext executionBlockContext,
TaskRequestProto taskRequest) throws IOException {
Task task = null;
Expand All @@ -153,7 +152,7 @@ protected Task createTask(ExecutionBlockContext executionBlockContext,
LOG.error(errorMessage);
executionBlockContext.fatalError(taskAttemptId, errorMessage);
} else {
task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext, getFetcherExecutor());
task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext);
executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), task);
}
return task;
Expand Down
7 changes: 2 additions & 5 deletions tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public class TaskImpl implements Task {
private final TaskRequest request;
private final Map<String, TableDesc> descs;
private final TableStats inputStats;
private final ExecutorService fetcherExecutor;
private final Path taskDir;

private final TaskAttemptContext context;
Expand All @@ -101,16 +100,14 @@ public class TaskImpl implements Task {
private TupleComparator sortComp = null;

public TaskImpl(final TaskRequest request,
final ExecutionBlockContext executionBlockContext,
final ExecutorService fetcherExecutor) throws IOException {
final ExecutionBlockContext executionBlockContext) throws IOException {

this.request = request;
this.executionBlockContext = executionBlockContext;
this.systemConf = executionBlockContext.getConf();
this.queryContext = request.getQueryContext(systemConf);
this.inputStats = new TableStats();
this.fetcherRunners = Lists.newArrayList();
this.fetcherExecutor = fetcherExecutor;
this.descs = Maps.newHashMap();

Path baseDirPath = executionBlockContext.createBaseDir();
Expand Down Expand Up @@ -254,7 +251,7 @@ public boolean hasFetchPhase() {
}

@Override
public void fetch() {
public void fetch(ExecutorService fetcherExecutor) {
for (Fetcher f : fetcherRunners) {
fetcherExecutor.submit(new FetchRunner(context, f));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public Path createBaseDir() throws IOException {
}
};

org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context, null);
org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context);
task.kill();
assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class MockTaskExecutor extends TaskExecutor {
Expand Down Expand Up @@ -70,7 +71,7 @@ public void init() throws IOException {
}

@Override
public void fetch() {
public void fetch(ExecutorService executorService) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -223,7 +224,7 @@ public void init() throws IOException {
}

@Override
public void fetch() {
public void fetch(ExecutorService fetchExecutor) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Expand Down