From 323323c7a971d05f517b421df68655cd89948049 Mon Sep 17 00:00:00 2001 From: Behroz Sikander Date: Thu, 14 Jan 2016 17:56:15 +0100 Subject: [PATCH 1/5] Added round robin allocation strategy and its test case + Changed the strategy used in JobInProgress class + Added RoundRobin Allocation strategy + Added test case to verify it --- .../org/apache/hama/bsp/JobInProgress.java | 8 +- .../RoundRobinTaskAllocator.java | 159 ++++++++++++++++++ .../bsp/TestTaskAllocationRoundRobin.java | 118 +++++++++++++ 3 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java create mode 100644 core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java diff --git a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java index e462594c6..a66fab729 100644 --- a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java +++ b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java @@ -40,6 +40,7 @@ import org.apache.hama.bsp.sync.MasterSyncClient; import org.apache.hama.bsp.taskallocation.BSPResource; import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator; +import org.apache.hama.bsp.taskallocation.RoundRobinTaskAllocator; import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy; import org.apache.hama.util.ReflectionUtils; @@ -300,8 +301,13 @@ public synchronized void initTasks() throws IOException { tasksInited = true; + //CHANGED by Behroz - Uncomment this +// Class taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS, +// BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class); + Class taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS, - BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class); + RoundRobinTaskAllocator.class, TaskAllocationStrategy.class); + this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils .newInstance(taskAllocatorClass, new Object[0]); diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java new file mode 100644 index 000000000..efe04538d --- /dev/null +++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.taskallocation; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.bsp.GroomServerStatus; +import org.apache.hama.bsp.TaskInProgress; +import org.apache.hama.bsp.BSPJobClient.RawSplit; + +public class RoundRobinTaskAllocator implements TaskAllocationStrategy { + + Log LOG = LogFactory.getLog(RoundRobinTaskAllocator.class); + + @Override + public void initialize(Configuration conf) { + } + + /** + * Select grooms that has the block of data locally stored on the groom + * server. + * TODO: REMOVE -> Code taken from BestEffort. No change. + */ + @Override + public String[] selectGrooms(Map groomStatuses, + Map taskCountInGroomMap, + BSPResource[] resources, TaskInProgress taskInProgress) { + if (!taskInProgress.canStartTask()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot start task based on id"); + } + return new String[0]; + } + + RawSplit rawSplit = taskInProgress.getFileSplit(); + if (rawSplit != null) { + return rawSplit.getLocations(); + } + return null; + } + + @Override + public GroomServerStatus getGroomToAllocate( + Map groomStatuses, String[] selectedGrooms, + Map taskCountInGroomMap, + BSPResource[] resources, TaskInProgress taskInProgress) { + if (!taskInProgress.canStartTask()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exceeded allowed attempts."); + } + return null; + } + + String groomName = null; + if (selectedGrooms != null) { + groomName = getGroomToSchedule(groomStatuses, taskCountInGroomMap, selectedGrooms); + } + + if (groomName == null) { + groomName = findGroomWithMinimumTasks(taskCountInGroomMap); + } + + if (groomName != null) { + return groomStatuses.get(groomName); + } + + return null; + } + + /** + * This operation is not supported. + * TODO: Taken from BestEffort. No change. + */ + @Override + public Set getGroomsToAllocate( + Map groomStatuses, String[] selectedGrooms, + Map taskCountInGroomMap, + BSPResource[] resources, TaskInProgress taskInProgress) { + throw new UnsupportedOperationException( + "This API is not supported for the called API function call."); + } + + /* + * This function loops through the whole list of Grooms with their task count and returns the first Groom + * which contains the minimum number of tasks. + */ + private String findGroomWithMinimumTasks( + Map taskCountInGroomMap) { + Entry firstGroomWithMinimumTasks = null; + + for (Entry currentGroom : taskCountInGroomMap.entrySet()) { + if (firstGroomWithMinimumTasks == null || firstGroomWithMinimumTasks.getValue() > currentGroom.getValue()) { + firstGroomWithMinimumTasks = currentGroom; + } + } + + return (firstGroomWithMinimumTasks == null) ? null : firstGroomWithMinimumTasks.getKey().getGroomHostName(); + } + + /** + * From the set of grooms given, returns the groom on which a task could be + * scheduled on. + * + * @param grooms + * @param tasksInGroomMap + * @param possibleLocations + * @return a hostname of Groom Server. + */ + private String getGroomToSchedule(Map grooms, + Map tasksInGroomMap, + String[] possibleLocations) { + + for (String location : possibleLocations) { + GroomServerStatus groom = grooms.get(location); + if (groom == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Could not find groom for location " + location); + } + continue; + } + Integer taskInGroom = tasksInGroomMap.get(groom); + taskInGroom = (taskInGroom == null) ? 0 : taskInGroom; + if (LOG.isDebugEnabled()) { + LOG.debug("taskInGroom = " + taskInGroom + " max tasks = " + + groom.getMaxTasks() + " location = " + location + + " groomhostname = " + groom.getGroomHostName()); + } + if (taskInGroom < groom.getMaxTasks() + && location.equals(groom.getGroomHostName())) { + return groom.getGroomHostName(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Returning null"); + } + return null; + } + +} diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java new file mode 100644 index 000000000..fdd96c757 --- /dev/null +++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java @@ -0,0 +1,118 @@ +package org.apache.hama.bsp; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.TestCase; + +import org.junit.Test; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.BSPJobClient.RawSplit; +import org.apache.hama.bsp.taskallocation.BSPResource; +import org.apache.hama.bsp.taskallocation.RoundRobinTaskAllocator; +import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy; + +public class TestTaskAllocationRoundRobin extends TestCase { + + public static final Log LOG = LogFactory.getLog(TestTaskAllocationRoundRobin.class); + + Configuration conf = new Configuration(); + Map groomStatuses; + Map taskCountInGroomMap; + BSPResource[] resources; + TaskInProgress taskInProgress; + + @Override + protected void setUp() throws Exception { + super.setUp(); + + String[] locations = new String[] { "host6", "host4", "host3" }; + String value = "data"; + RawSplit split = new RawSplit(); + split.setLocations(locations); + split.setBytes(value.getBytes(), 0, value.getBytes().length); + split.setDataLength(value.getBytes().length); + + assertEquals(value.getBytes().length, (int) split.getDataLength()); + + taskCountInGroomMap = new LinkedHashMap(10); + resources = new BSPResource[0]; + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf); + taskInProgress = new TaskInProgress(job.getJobID(), + "job.xml", split, conf, jobProgress, 1); + + groomStatuses = new LinkedHashMap(20); + + for (int i = 0; i < 10; ++i) { + String name = "host" + i; + + GroomServerStatus status = new GroomServerStatus(name, + new ArrayList(), 0, 3,"",name); + groomStatuses.put(name, status); + taskCountInGroomMap.put(status, 0); + } + + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + /* + * This test simulates the allocation of 30 tasks in round robin fashion. Notice that in function + * getGroomToAllocate null has been passed for selectedGrooms (which contains the list of grooms according to + * data locality). Internally getGroomToAllocate uses the findGroomWithMinimumTasks function to allocate the + * tasks. + */ + @Test + public void testRoundRobinAllocation() { + TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf + .getClass("", RoundRobinTaskAllocator.class, + TaskAllocationStrategy.class), conf); + + for(int i = 0; i < 30; i++) { + GroomServerStatus groomStatus = strategy.getGroomToAllocate(groomStatuses, null, taskCountInGroomMap, resources, taskInProgress); + if(groomStatus != null) { + taskCountInGroomMap.put(groomStatus, taskCountInGroomMap.get(groomStatus) + 1); //Increment the total tasks in it + + assertEquals("","host" + (i%10),groomStatus.getGroomHostName()); // After 10 it will start over from zero + } + } + //System.out.println("Groom Selected -> " + groomStatus.getGroomHostName()); + } + + /* + * Allocation according to data locality still works the same way. No change made in that part. + */ + @Test + public void testRoundRobinDataLocality() throws Exception { + + TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf + .getClass("", RoundRobinTaskAllocator.class, + TaskAllocationStrategy.class), conf); + + String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap, + resources, taskInProgress); + + List list = new ArrayList(); + + for (int i = 0; i < hosts.length; ++i) { + list.add(hosts[i]); + } + + assertTrue(list.contains("host6")); + assertTrue(list.contains("host3")); + assertTrue(list.contains("host4")); + } + +} From 12911464fb2d6dccec5a097cc83939c1aac0c06e Mon Sep 17 00:00:00 2001 From: Behroz Sikander Date: Sat, 16 Jan 2016 15:38:51 +0100 Subject: [PATCH 2/5] Fixed bug to handle the case when no tasks is allocation on any Groom + Added comments on code + Fixed code in round robin allocator + Fixed an issue in test --- .../RoundRobinTaskAllocator.java | 65 +++++++++++----- .../bsp/TestTaskAllocationRoundRobin.java | 76 +++++++++++-------- 2 files changed, 89 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java index efe04538d..8fb17e576 100644 --- a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java +++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java @@ -28,6 +28,12 @@ import org.apache.hama.bsp.TaskInProgress; import org.apache.hama.bsp.BSPJobClient.RawSplit; +/** + * RoundRobinTaskAllocator is a round robin based task allocator that equally + * divides the tasks among all the Grooms. It balances the load of cluster. For example + * if a cluster has 10 Grooms and 20 tasks are to be scheduled then each Groom which + * get 2 tasks. + */ public class RoundRobinTaskAllocator implements TaskAllocationStrategy { Log LOG = LogFactory.getLog(RoundRobinTaskAllocator.class); @@ -39,7 +45,6 @@ public void initialize(Configuration conf) { /** * Select grooms that has the block of data locally stored on the groom * server. - * TODO: REMOVE -> Code taken from BestEffort. No change. */ @Override public String[] selectGrooms(Map groomStatuses, @@ -69,27 +74,21 @@ public GroomServerStatus getGroomToAllocate( LOG.debug("Exceeded allowed attempts."); } return null; - } + } String groomName = null; - if (selectedGrooms != null) { - groomName = getGroomToSchedule(groomStatuses, taskCountInGroomMap, selectedGrooms); - } - - if (groomName == null) { - groomName = findGroomWithMinimumTasks(taskCountInGroomMap); - } - + + groomName = findGroomWithMinimumTasks(groomStatuses, taskCountInGroomMap); + if (groomName != null) { return groomStatuses.get(groomName); } - + return null; } /** * This operation is not supported. - * TODO: Taken from BestEffort. No change. */ @Override public Set getGroomsToAllocate( @@ -99,24 +98,52 @@ public Set getGroomsToAllocate( throw new UnsupportedOperationException( "This API is not supported for the called API function call."); } - - /* - * This function loops through the whole list of Grooms with their task count and returns the first Groom - * which contains the minimum number of tasks. + + /** + * This function loops through the whole list of Grooms with their task count + * and returns the first Groom which contains the minimum number of tasks. + * @param groomStatuses The map of groom-name to + * GroomServerStatus object for all known grooms. + * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated + * soon) + * @return returns the groom name which should be allocated the next task or + * null no suitable groom was found. */ private String findGroomWithMinimumTasks( + Map groomStatuses, Map taskCountInGroomMap) { + Entry firstGroomWithMinimumTasks = null; + // At the start taskCountInGroomMap is empty so we have to put 0 tasks on grooms + if (taskCountInGroomMap.size() < groomStatuses.size()) { + for (String s : groomStatuses.keySet()) { + GroomServerStatus groom = groomStatuses.get(s); + if (groom == null) + continue; + Integer taskInGroom = taskCountInGroomMap.get(groom); + + // Find the groom that is yet to get its first tasks and assign 0 value to it. + // Having zero will make sure that it gets selected. + if (taskInGroom == null) { + taskCountInGroomMap.put(groom, 0); + break; + } + } + } + for (Entry currentGroom : taskCountInGroomMap.entrySet()) { if (firstGroomWithMinimumTasks == null || firstGroomWithMinimumTasks.getValue() > currentGroom.getValue()) { - firstGroomWithMinimumTasks = currentGroom; + if(currentGroom.getValue() < currentGroom.getKey().getMaxTasks()) { // Assign the task to groom which still has space for more tasks + firstGroomWithMinimumTasks = currentGroom; + } // If there is no space then continue and find the next best groom } } - return (firstGroomWithMinimumTasks == null) ? null : firstGroomWithMinimumTasks.getKey().getGroomHostName(); + return (firstGroomWithMinimumTasks == null) ? null + : firstGroomWithMinimumTasks.getKey().getGroomHostName(); } - + /** * From the set of grooms given, returns the groom on which a task could be * scheduled on. diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java index fdd96c757..e471fe031 100644 --- a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java +++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java @@ -1,9 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hama.bsp; -import static org.junit.Assert.*; - import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -22,14 +36,15 @@ public class TestTaskAllocationRoundRobin extends TestCase { - public static final Log LOG = LogFactory.getLog(TestTaskAllocationRoundRobin.class); - + public static final Log LOG = LogFactory + .getLog(TestTaskAllocationRoundRobin.class); + Configuration conf = new Configuration(); Map groomStatuses; Map taskCountInGroomMap; BSPResource[] resources; TaskInProgress taskInProgress; - + @Override protected void setUp() throws Exception { super.setUp(); @@ -42,68 +57,63 @@ protected void setUp() throws Exception { split.setDataLength(value.getBytes().length); assertEquals(value.getBytes().length, (int) split.getDataLength()); - + taskCountInGroomMap = new LinkedHashMap(10); resources = new BSPResource[0]; BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf); - taskInProgress = new TaskInProgress(job.getJobID(), - "job.xml", split, conf, jobProgress, 1); + taskInProgress = new TaskInProgress(job.getJobID(), "job.xml", split, conf, + jobProgress, 1); + + groomStatuses = new LinkedHashMap(10); - groomStatuses = new LinkedHashMap(20); - for (int i = 0; i < 10; ++i) { String name = "host" + i; - + GroomServerStatus status = new GroomServerStatus(name, - new ArrayList(), 0, 3,"",name); + new ArrayList(), 0, 3, "", name); groomStatuses.put(name, status); taskCountInGroomMap.put(status, 0); } - } @Override protected void tearDown() throws Exception { super.tearDown(); } - - /* - * This test simulates the allocation of 30 tasks in round robin fashion. Notice that in function - * getGroomToAllocate null has been passed for selectedGrooms (which contains the list of grooms according to - * data locality). Internally getGroomToAllocate uses the findGroomWithMinimumTasks function to allocate the - * tasks. + + /** + * This test simulates the allocation of 30 tasks in round robin fashion + * on 10 Grooms. */ @Test public void testRoundRobinAllocation() { TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf .getClass("", RoundRobinTaskAllocator.class, TaskAllocationStrategy.class), conf); - - for(int i = 0; i < 30; i++) { - GroomServerStatus groomStatus = strategy.getGroomToAllocate(groomStatuses, null, taskCountInGroomMap, resources, taskInProgress); - if(groomStatus != null) { - taskCountInGroomMap.put(groomStatus, taskCountInGroomMap.get(groomStatus) + 1); //Increment the total tasks in it + + for (int i = 0; i < 30; i++) { + GroomServerStatus groomStatus = strategy.getGroomToAllocate( + groomStatuses, null, taskCountInGroomMap, resources, taskInProgress); + if (groomStatus != null) { + taskCountInGroomMap.put(groomStatus, + taskCountInGroomMap.get(groomStatus) + 1); // Increment the total tasks in it - assertEquals("","host" + (i%10),groomStatus.getGroomHostName()); // After 10 it will start over from zero + assertEquals("", "host" + (i % 10), groomStatus.getGroomHostName()); } } - //System.out.println("Groom Selected -> " + groomStatus.getGroomHostName()); } - - /* - * Allocation according to data locality still works the same way. No change made in that part. - */ + @Test public void testRoundRobinDataLocality() throws Exception { - + TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf .getClass("", RoundRobinTaskAllocator.class, TaskAllocationStrategy.class), conf); String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap, resources, taskInProgress); - + List list = new ArrayList(); for (int i = 0; i < hosts.length; ++i) { From 2885183e49de4fd439fb99c798228b391957668d Mon Sep 17 00:00:00 2001 From: Behroz Sikander Date: Sat, 16 Jan 2016 16:12:21 +0100 Subject: [PATCH 3/5] Changing the default class to best effort --- .../src/main/java/org/apache/hama/bsp/JobInProgress.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java index a66fab729..a14fd7cf3 100644 --- a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java +++ b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java @@ -40,7 +40,6 @@ import org.apache.hama.bsp.sync.MasterSyncClient; import org.apache.hama.bsp.taskallocation.BSPResource; import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator; -import org.apache.hama.bsp.taskallocation.RoundRobinTaskAllocator; import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy; import org.apache.hama.util.ReflectionUtils; @@ -299,14 +298,10 @@ public synchronized void initTasks() throws IOException { MasterSyncClient syncClient = master.getSyncClient(); syncClient.registerJob(this.getJobID().toString()); - tasksInited = true; + tasksInited = true; - //CHANGED by Behroz - Uncomment this -// Class taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS, -// BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class); - Class taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS, - RoundRobinTaskAllocator.class, TaskAllocationStrategy.class); + BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class); this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils .newInstance(taskAllocatorClass, new Object[0]); From b9640fd2a5d4246c0cd71c1ac3da578c072e6dba Mon Sep 17 00:00:00 2001 From: Behroz Sikander Date: Sat, 16 Jan 2016 16:13:36 +0100 Subject: [PATCH 4/5] Removing spaces --- core/src/main/java/org/apache/hama/bsp/JobInProgress.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java index a14fd7cf3..e462594c6 100644 --- a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java +++ b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java @@ -298,11 +298,10 @@ public synchronized void initTasks() throws IOException { MasterSyncClient syncClient = master.getSyncClient(); syncClient.registerJob(this.getJobID().toString()); - tasksInited = true; + tasksInited = true; Class taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS, BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class); - this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils .newInstance(taskAllocatorClass, new Object[0]); From 27c6ce47eec59485caaddf8e0c9c4c3b042671df Mon Sep 17 00:00:00 2001 From: Behroz Sikander Date: Sat, 16 Jan 2016 16:16:52 +0100 Subject: [PATCH 5/5] Removed getGroomToSchedule function --- .../RoundRobinTaskAllocator.java | 130 ++++++------------ 1 file changed, 45 insertions(+), 85 deletions(-) diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java index 8fb17e576..0bf060bd0 100644 --- a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java +++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java @@ -42,6 +42,51 @@ public class RoundRobinTaskAllocator implements TaskAllocationStrategy { public void initialize(Configuration conf) { } + /** + * This function loops through the whole list of Grooms with their task count + * and returns the first Groom which contains the minimum number of tasks. + * @param groomStatuses The map of groom-name to + * GroomServerStatus object for all known grooms. + * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated + * soon) + * @return returns the groom name which should be allocated the next task or + * null no suitable groom was found. + */ + private String findGroomWithMinimumTasks( + Map groomStatuses, + Map taskCountInGroomMap) { + + Entry firstGroomWithMinimumTasks = null; + + // At the start taskCountInGroomMap is empty so we have to put 0 tasks on grooms + if (taskCountInGroomMap.size() < groomStatuses.size()) { + for (String s : groomStatuses.keySet()) { + GroomServerStatus groom = groomStatuses.get(s); + if (groom == null) + continue; + Integer taskInGroom = taskCountInGroomMap.get(groom); + + // Find the groom that is yet to get its first tasks and assign 0 value to it. + // Having zero will make sure that it gets selected. + if (taskInGroom == null) { + taskCountInGroomMap.put(groom, 0); + break; + } + } + } + + for (Entry currentGroom : taskCountInGroomMap.entrySet()) { + if (firstGroomWithMinimumTasks == null || firstGroomWithMinimumTasks.getValue() > currentGroom.getValue()) { + if(currentGroom.getValue() < currentGroom.getKey().getMaxTasks()) { // Assign the task to groom which still has space for more tasks + firstGroomWithMinimumTasks = currentGroom; + } // If there is no space then continue and find the next best groom + } + } + + return (firstGroomWithMinimumTasks == null) ? null + : firstGroomWithMinimumTasks.getKey().getGroomHostName(); + } + /** * Select grooms that has the block of data locally stored on the groom * server. @@ -98,89 +143,4 @@ public Set getGroomsToAllocate( throw new UnsupportedOperationException( "This API is not supported for the called API function call."); } - - /** - * This function loops through the whole list of Grooms with their task count - * and returns the first Groom which contains the minimum number of tasks. - * @param groomStatuses The map of groom-name to - * GroomServerStatus object for all known grooms. - * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated - * soon) - * @return returns the groom name which should be allocated the next task or - * null no suitable groom was found. - */ - private String findGroomWithMinimumTasks( - Map groomStatuses, - Map taskCountInGroomMap) { - - Entry firstGroomWithMinimumTasks = null; - - // At the start taskCountInGroomMap is empty so we have to put 0 tasks on grooms - if (taskCountInGroomMap.size() < groomStatuses.size()) { - for (String s : groomStatuses.keySet()) { - GroomServerStatus groom = groomStatuses.get(s); - if (groom == null) - continue; - Integer taskInGroom = taskCountInGroomMap.get(groom); - - // Find the groom that is yet to get its first tasks and assign 0 value to it. - // Having zero will make sure that it gets selected. - if (taskInGroom == null) { - taskCountInGroomMap.put(groom, 0); - break; - } - } - } - - for (Entry currentGroom : taskCountInGroomMap.entrySet()) { - if (firstGroomWithMinimumTasks == null || firstGroomWithMinimumTasks.getValue() > currentGroom.getValue()) { - if(currentGroom.getValue() < currentGroom.getKey().getMaxTasks()) { // Assign the task to groom which still has space for more tasks - firstGroomWithMinimumTasks = currentGroom; - } // If there is no space then continue and find the next best groom - } - } - - return (firstGroomWithMinimumTasks == null) ? null - : firstGroomWithMinimumTasks.getKey().getGroomHostName(); - } - - /** - * From the set of grooms given, returns the groom on which a task could be - * scheduled on. - * - * @param grooms - * @param tasksInGroomMap - * @param possibleLocations - * @return a hostname of Groom Server. - */ - private String getGroomToSchedule(Map grooms, - Map tasksInGroomMap, - String[] possibleLocations) { - - for (String location : possibleLocations) { - GroomServerStatus groom = grooms.get(location); - if (groom == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Could not find groom for location " + location); - } - continue; - } - Integer taskInGroom = tasksInGroomMap.get(groom); - taskInGroom = (taskInGroom == null) ? 0 : taskInGroom; - if (LOG.isDebugEnabled()) { - LOG.debug("taskInGroom = " + taskInGroom + " max tasks = " - + groom.getMaxTasks() + " location = " + location - + " groomhostname = " + groom.getGroomHostName()); - } - if (taskInGroom < groom.getMaxTasks() - && location.equals(groom.getGroomHostName())) { - return groom.getGroomHostName(); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Returning null"); - } - return null; - } - }