From 53b6f9f9e4f6419ae1e9481ec53ac703dee53df8 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 25 Feb 2016 14:22:57 +0900 Subject: [PATCH 1/2] TAJO-2081: Incorrect task locality on single node. --- .../querymaster/DefaultTaskScheduler.java | 41 ++++++++----------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index e2901844d0..4aa2513976 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -417,7 +417,7 @@ public synchronized TaskAttemptId getLocalTask() { TaskAttemptId taskAttemptId = null; if (unassignedTaskForEachVolume.size() > 0) { - int retry = unassignedTaskForEachVolume.size(); + int retry = diskVolumeLoads.size(); do { //clean and get a remaining local task taskAttemptId = getAndRemove(volumeId); @@ -473,6 +473,7 @@ private synchronized TaskAttemptId getAndRemove(int volumeId){ Iterator iterator = list.iterator(); taskAttempt = iterator.next(); iterator.remove(); + remainTasksNum.decrementAndGet(); } taskAttemptId = taskAttempt.getId(); @@ -484,6 +485,8 @@ private synchronized TaskAttemptId getAndRemove(int volumeId){ } increaseConcurrency(volumeId); + } else { + unassignedTaskForEachVolume.remove(volumeId); } return taskAttemptId; @@ -526,7 +529,7 @@ private synchronized int increaseConcurrency(int volumeId) { } else if (volumeId == REMOTE) { // this case has processed all block on host and it will be assigned to remote LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() - + ", Remote Concurrency : " + concurrency); + + ", Remote Concurrency : " + concurrency + ", Unassigned volumes: " + unassignedTaskForEachVolume.size()); } diskVolumeLoads.put(volumeId, concurrency); return concurrency; @@ -537,13 +540,9 @@ private synchronized int increaseConcurrency(int volumeId) { */ private synchronized void decreaseConcurrency(int volumeId){ if(diskVolumeLoads.containsKey(volumeId)){ - Integer concurrency = diskVolumeLoads.get(volumeId); + int concurrency = diskVolumeLoads.get(volumeId); if(concurrency > 0){ diskVolumeLoads.put(volumeId, concurrency - 1); - } else { - if (volumeId > REMOTE && !unassignedTaskForEachVolume.containsKey(volumeId)) { - diskVolumeLoads.remove(volumeId); - } } } } @@ -559,7 +558,7 @@ public int getLowestVolumeId(){ for (Map.Entry entry : diskVolumeLoads.entrySet()) { if(volumeEntry == null) volumeEntry = entry; - if (volumeEntry.getValue() >= entry.getValue()) { + if (entry.getKey() != REMOTE && volumeEntry.getValue() >= entry.getValue()) { volumeEntry = entry; } } @@ -596,19 +595,16 @@ public String getRack() { public void cancel(TaskAttempt taskAttempt) { + TaskAttemptToSchedulerEvent schedulerEvent = new TaskAttemptToSchedulerEvent( + EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(), + null, taskAttempt); + if(taskAttempt.isLeafTask()) { releaseTaskAttempt(taskAttempt); - List locations = taskAttempt.getTask().getDataLocations(); - - for (DataLocation location : locations) { - HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); - volumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); - } - - scheduledRequests.leafTasks.add(taskAttempt.getId()); + scheduledRequests.addLeafTask(schedulerEvent); } else { - scheduledRequests.nonLeafTasks.add(taskAttempt.getId()); + scheduledRequests.addNonLeafTask(schedulerEvent); } context.getMasterContext().getEventHandler().handle( @@ -826,7 +822,8 @@ public void assignToLeafTasks(LinkedList taskRequests) { tailLimit = Math.max(remainingScheduledObjectNum() / nodes, 1); } - if (hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { //remote task throttling per node + //remote task throttling per node + if (nodes > 1 && hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { continue; } else { // assign to remote volume @@ -904,9 +901,7 @@ public void assignToLeafTasks(LinkedList taskRequests) { cancellation++; } - if(LOG.isDebugEnabled()) { - LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); - } + LOG.info("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; } } catch (Exception e) { @@ -1022,9 +1017,7 @@ public void assignToNonLeafTasks(LinkedList taskRequests) { cancellation++; } - if(LOG.isDebugEnabled()) { - LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); - } + LOG.info("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; } From 3167f446a695760a9596f175d270ae1edd79e2f7 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 2 Mar 2016 10:32:40 +0900 Subject: [PATCH 2/2] add ebId to logger --- .../java/org/apache/tajo/conf/TajoConf.java | 2 ++ .../querymaster/DefaultTaskScheduler.java | 27 ++++++++++--------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index a535ece61a..c36f43bcae 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -203,6 +203,8 @@ public static enum ConfVars implements ConfigKey { QUERYMASTER_TASK_SCHEDULER_DELAY("tajo.qm.task-scheduler.delay", 50), // 50 ms + QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM("tajo.qm.task-scheduler.request.max-num", 50), + // Catalog CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005", Validators.networkAddr()), diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 4aa2513976..a0e76cc7a8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -64,8 +64,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); - private static final String REQUEST_MAX_NUM = "tajo.qm.task-scheduler.request.max-num"; - private final TaskSchedulerContext context; private Stage stage; private TajoConf tajoConf; @@ -123,7 +121,7 @@ public void run() { break; } } - LOG.info("TaskScheduler schedulingThread stopped"); + info(LOG, "TaskScheduler schedulingThread stopped"); } }; super.init(conf); @@ -131,8 +129,9 @@ public void run() { @Override public void start() { - LOG.info("Start TaskScheduler"); - maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size()); + info(LOG, "Start TaskScheduler"); + maximumRequestContainer = Math.min(tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM) + , stage.getContext().getWorkerMap().size()); if (isLeaf) { candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts())); @@ -160,10 +159,14 @@ public void stop() { } candidateWorkers.clear(); scheduledRequests.clear(); - LOG.info("Task Scheduler stopped"); + info(LOG, "Task Scheduler stopped"); super.stop(); } + protected void info(Log log, String message) { + log.info(String.format("[%s] %s", stage.getId(), message)); + } + private Fragment[] fragmentsForNonLeafTask; private Fragment[] broadcastFragmentsForNonLeafTask; @@ -522,13 +525,13 @@ private synchronized int increaseConcurrency(int volumeId) { } if (volumeId > -1) { - LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency); + info(LOG, "Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency); } else if (volumeId == -1) { // this case is disabled namenode block meta or compressed text file or amazon s3 - LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency); + info(LOG, "Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency); } else if (volumeId == REMOTE) { // this case has processed all block on host and it will be assigned to remote - LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() + info(LOG, "Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() + ", Remote Concurrency : " + concurrency + ", Unassigned volumes: " + unassignedTaskForEachVolume.size()); } diskVolumeLoads.put(volumeId, concurrency); @@ -901,7 +904,7 @@ public void assignToLeafTasks(LinkedList taskRequests) { cancellation++; } - LOG.info("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); + info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; } } catch (Exception e) { @@ -913,7 +916,7 @@ public void assignToLeafTasks(LinkedList taskRequests) { rackLocalAssigned += rackAssign; if (rackAssign > 0) { - LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), " + + info(LOG, String.format("Assigned Local/Rack/Total: (%d/%d/%d), " + "Attempted Cancel/Assign/Total: (%d/%d/%d), " + "Locality: %.2f%%, Rack host: %s", hostLocalAssigned, rackLocalAssigned, totalAssigned, @@ -1017,7 +1020,7 @@ public void assignToNonLeafTasks(LinkedList taskRequests) { cancellation++; } - LOG.info("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); + info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; }