From 5043f23a764d43e7b2ceab67001b79cda272486c Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 2 Sep 2015 12:19:25 +0900 Subject: [PATCH 1/2] TAJO-1707: Rack local count can be more than actual number of tasks. --- .../querymaster/DefaultTaskScheduler.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index f9a5767da7..0175582510 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -98,6 +98,7 @@ public void init(Configuration conf) { scheduledRequests = new ScheduledRequests(); minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); schedulerDelay= tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY); + isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock()); this.schedulingThread = new Thread() { public void run() { @@ -129,7 +130,6 @@ public void run() { public void start() { LOG.info("Start TaskScheduler"); maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size() * 2); - isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock()); if (isLeaf) { candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts())); @@ -686,9 +686,6 @@ private TaskAttemptId allocateLocalTask(String host){ //find remaining local task if (leafTasks.contains(attemptId)) { leafTasks.remove(attemptId); - //LOG.info(attemptId + " Assigned based on host match " + hostName); - hostLocalAssigned++; - totalAssigned++; return attemptId; } } @@ -755,15 +752,6 @@ public int compare(HostVolumeMapping v1, HostVolumeMapping v2) { } } - if (attemptId != null) { - rackLocalAssigned++; - totalAssigned++; - - LOG.info(String.format("Assigned Local/Rack/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%, Rack host: %s", - hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned, - ((double) hostLocalAssigned / (double) totalAssigned) * 100, host)); - - } return attemptId; } @@ -775,6 +763,9 @@ public void assignToLeafTasks(LinkedList taskRequests) { TaskRequestEvent taskRequest; while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) { + int localAssign = 0; + int rackAssign = 0; + taskRequest = taskRequests.pollFirst(); if(taskRequest == null) { // if there are only remote task requests taskRequest = remoteTaskRequests.pollFirst(); @@ -855,13 +846,11 @@ public void assignToLeafTasks(LinkedList taskRequests) { synchronized (leafTasks){ attemptId = leafTasks.iterator().next(); leafTasks.remove(attemptId); - rackLocalAssigned++; - totalAssigned++; - LOG.info(String.format("Assigned Local/Remote/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%,", - hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned, - ((double) hostLocalAssigned / (double) totalAssigned) * 100)); } } + rackAssign++; + } else { + localAssign++; } if (attemptId != null) { @@ -917,6 +906,15 @@ public void assignToLeafTasks(LinkedList taskRequests) { LOG.error(e); } scheduledObjectNum--; + totalAssigned++; + hostLocalAssigned += localAssign; + rackLocalAssigned += rackAssign; + + if(rackAssign > 0) { + LOG.info(String.format("Assigned Local/Rack/Total/Cancel: (%d/%d/%d/%d), Locality: %.2f%%, Rack host: %s", + hostLocalAssigned, rackLocalAssigned, totalAssigned, cancellation, + ((double) hostLocalAssigned / (double) totalAssigned) * 100, host)); + } } else { throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!"); From a3a63ebc26c8b7abb8b6da5e95bf3a3c4b40c1a1 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 2 Sep 2015 16:11:54 +0900 Subject: [PATCH 2/2] add totalAttempts --- .../apache/tajo/querymaster/AbstractTaskScheduler.java | 6 ++++++ .../apache/tajo/querymaster/DefaultTaskScheduler.java | 10 +++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java index 8636eaaed8..1651bb3a8a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java @@ -32,6 +32,8 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E protected int rackLocalAssigned; protected int totalAssigned; protected int cancellation; + protected int totalAttempts; + protected Set leafTaskHosts = Sets.newHashSet(); /** @@ -59,6 +61,10 @@ public int getCancellation() { return cancellation; } + public int getTotalAttempts() { + return totalAttempts; + } + public abstract void releaseTaskAttempt(TaskAttempt taskAttempt); public abstract int remainingScheduledObjectNum(); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 0175582510..d3802955db 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -883,6 +883,7 @@ public void assignToLeafTasks(LinkedList taskRequests) { AsyncRpcClient tajoWorkerRpc = null; CallFuture callFuture = new CallFuture(); + totalAttempts++; try { tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); @@ -910,9 +911,12 @@ public void assignToLeafTasks(LinkedList taskRequests) { hostLocalAssigned += localAssign; rackLocalAssigned += rackAssign; - if(rackAssign > 0) { - LOG.info(String.format("Assigned Local/Rack/Total/Cancel: (%d/%d/%d/%d), Locality: %.2f%%, Rack host: %s", - hostLocalAssigned, rackLocalAssigned, totalAssigned, cancellation, + if (rackAssign > 0) { + LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), " + + "Attempted Cancel/Assign/Total: (%d/%d/%d), " + + "Locality: %.2f%%, Rack host: %s", + hostLocalAssigned, rackLocalAssigned, totalAssigned, + cancellation, totalAssigned, totalAttempts, ((double) hostLocalAssigned / (double) totalAssigned) * 100, host)); }