diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 0436116919..b8767376fa 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -208,7 +208,7 @@ public static enum ConfVars implements ConfigKey { SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()), SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"), SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", - Runtime.getRuntime().availableProcessors() * 2, Validators.min("1")), + 2, Validators.min("1")), SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192), SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120), SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20), diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 66216ee4b7..19d5da41d3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -22,12 +22,13 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutorService; public interface Task { void init() throws IOException; - void fetch(); + void fetch(ExecutorService fetcherExecutor); void run() throws Exception; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java index 761bf5211b..f717c07a4d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -21,6 +21,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.util.concurrent.ExecutorService; + /** * The driver class for Tajo Task processing. */ @@ -29,10 +31,12 @@ public class TaskContainer implements Runnable { private final TaskExecutor executor; private final int sequenceId; + private final ExecutorService fetchExecutor; - public TaskContainer(int sequenceId, TaskExecutor executor) { + public TaskContainer(int sequenceId, TaskExecutor executor, ExecutorService fetchExecutor) { this.sequenceId = sequenceId; this.executor = executor; + this.fetchExecutor = fetchExecutor; } @Override @@ -56,7 +60,7 @@ public void run() { task.init(); if (task.hasFetchPhase()) { - task.fetch(); // The fetch is performed in an asynchronous way. + task.fetch(fetchExecutor); // The fetch is performed in an asynchronous way. } if (!taskAttemptContext.isStopped()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java index 5e1ccc1e5a..eef4a2d967 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -18,6 +18,7 @@ package org.apache.tajo.worker; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; @@ -25,19 +26,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.ResourceProtos.TaskRequestProto; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.TaskRequestImpl; -import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.resource.NodeResource; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; import org.apache.tajo.worker.event.NodeResourceEvent; import org.apache.tajo.worker.event.TaskStartEvent; -import org.apache.tajo.ResourceProtos.TaskRequestProto; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -55,7 +56,7 @@ public class TaskExecutor extends AbstractService implements EventHandler allocatedResourceMap; private final BlockingQueue taskQueue; private final AtomicInteger runningTasks; - private ExecutorService fetcherThreadPool; + private List fetcherThreadPoolList; private ExecutorService threadPool; private TajoConf tajoConf; private volatile boolean isStopped; @@ -66,6 +67,7 @@ public TaskExecutor(TajoWorker.WorkerContext workerContext) { this.allocatedResourceMap = Maps.newConcurrentMap(); this.runningTasks = new AtomicInteger(); this.taskQueue = new LinkedBlockingQueue(); + this.fetcherThreadPoolList = Lists.newArrayList(); } @Override @@ -82,12 +84,12 @@ protected void serviceStart() throws Exception { new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build()); int maxFetcherThreads = tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM); - this.fetcherThreadPool = Executors.newFixedThreadPool(nThreads, - new ThreadFactoryBuilder().setNameFormat("Fetcher executor #%d").build()); - - for (int i = 0; i < nThreads; i++) { - threadPool.submit(new TaskContainer(i, this)); + ExecutorService fetcherThreadPool = Executors.newFixedThreadPool(maxFetcherThreads, + new ThreadFactoryBuilder().setNameFormat("TaskContainer[" + i + "] fetcher executor #%d").build()); + + threadPool.submit(new TaskContainer(i, this, fetcherThreadPool)); + fetcherThreadPoolList.add(fetcherThreadPool); } super.serviceStart(); @@ -99,7 +101,9 @@ protected void serviceStop() throws Exception { isStopped = true; threadPool.shutdown(); - fetcherThreadPool.shutdown(); + for (ExecutorService fetcherThreadPool : fetcherThreadPoolList) { + fetcherThreadPool.shutdown(); + } super.serviceStop(); } @@ -139,11 +143,6 @@ protected void releaseResource(TaskAttemptId taskId) { } } - protected ExecutorService getFetcherExecutor() { - return fetcherThreadPool; - } - - protected Task createTask(ExecutionBlockContext executionBlockContext, TaskRequestProto taskRequest) throws IOException { Task task = null; @@ -153,7 +152,7 @@ protected Task createTask(ExecutionBlockContext executionBlockContext, LOG.error(errorMessage); executionBlockContext.fatalError(taskAttemptId, errorMessage); } else { - task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext, getFetcherExecutor()); + task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext); executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), task); } return task; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index d77c583b0e..82ea479f9d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -79,7 +79,6 @@ public class TaskImpl implements Task { private final TaskRequest request; private final Map descs; private final TableStats inputStats; - private final ExecutorService fetcherExecutor; private final Path taskDir; private final TaskAttemptContext context; @@ -101,8 +100,7 @@ public class TaskImpl implements Task { private TupleComparator sortComp = null; public TaskImpl(final TaskRequest request, - final ExecutionBlockContext executionBlockContext, - final ExecutorService fetcherExecutor) throws IOException { + final ExecutionBlockContext executionBlockContext) throws IOException { this.request = request; this.executionBlockContext = executionBlockContext; @@ -110,7 +108,6 @@ public TaskImpl(final TaskRequest request, this.queryContext = request.getQueryContext(systemConf); this.inputStats = new TableStats(); this.fetcherRunners = Lists.newArrayList(); - this.fetcherExecutor = fetcherExecutor; this.descs = Maps.newHashMap(); Path baseDirPath = executionBlockContext.createBaseDir(); @@ -254,7 +251,7 @@ public boolean hasFetchPhase() { } @Override - public void fetch() { + public void fetch(ExecutorService fetcherExecutor) { for (Fetcher f : fetcherRunners) { fetcherExecutor.submit(new FetchRunner(context, f)); } diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 3e8e230278..8738dbaef8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -280,7 +280,7 @@ public Path createBaseDir() throws IOException { } }; - org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context, null); + org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context); task.kill(); assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState()); try { diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java index 23405510f8..071d26ac2d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; public class MockTaskExecutor extends TaskExecutor { @@ -70,7 +71,7 @@ public void init() throws IOException { } @Override - public void fetch() { + public void fetch(ExecutorService executorService) { } diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java index 9b6af682c1..c0f5bb2353 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -223,7 +224,7 @@ public void init() throws IOException { } @Override - public void fetch() { + public void fetch(ExecutorService fetchExecutor) { try { Thread.sleep(50); } catch (InterruptedException e) {