From 76b5167e856984d627a73f020d4ca1c7e9ae8750 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Mon, 23 Jan 2017 13:16:49 -0800 Subject: [PATCH 1/9] added new line at then end --- NOTICE | 1 + 1 file changed, 1 insertion(+) diff --git a/NOTICE b/NOTICE index 3352dda56e..2ee5fdc1db 100644 --- a/NOTICE +++ b/NOTICE @@ -3,3 +3,4 @@ Copyright 2014 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). + From 5c8aa207f6e681e07d4e21c3c9fd9f1152bc055c Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 9 Feb 2017 17:31:07 -0800 Subject: [PATCH 2/9] JobModel Generation using SimpleGroupByContainerCount --- .../task/BalancingTaskNameGrouper.java | 9 +- .../task/SimpleGroupByContainerCount.java | 88 +++++++++++++++++++ .../SimpleGroupByContainerCountFactory.java | 33 +++++++ .../grouper/task/TaskNameGrouper.java | 5 ++ .../samza/coordinator/JobModelManager.scala | 18 ++-- 5 files changed, 145 insertions(+), 8 deletions(-) create mode 100644 samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java create mode 100644 samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java index f8295c878f..1a98fb6fef 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java @@ -18,6 +18,7 @@ */ package org.apache.samza.container.grouper.task; +import java.util.Collections; import java.util.Set; import org.apache.samza.container.LocalityManager; import org.apache.samza.job.model.ContainerModel; @@ -54,5 +55,11 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper { * @param localityManager provides a persisted task to container map to use as a baseline * @return the grouped tasks in the form of ContainerModels */ - Set balance(Set tasks, LocalityManager localityManager); + default Set balance(Set tasks, LocalityManager localityManager) { + return Collections.emptySet(); + } + default Set balance(Set containerIds, Set tasks, LocalityManager localityManager) { + return Collections.emptySet(); + } + } diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java new file mode 100644 index 0000000000..e461fa1232 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.grouper.task; + +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +/** + * Simple grouper. + * It exposes two group methods - one that assumes sequential container numbers and one that gets a set of container + * IDs as an argument. Please note - this first implementation ignores locality information. + */ +public class SimpleGroupByContainerCount implements TaskNameGrouper { + private final int startContainerCount; + public SimpleGroupByContainerCount() { + this.startContainerCount = 1; + } + + public SimpleGroupByContainerCount(int containerCount) { + if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container"); + this.startContainerCount = containerCount; + } + + @Override + public Set group(Set tasks) { + List containerIds = new ArrayList<>(startContainerCount); + for (int i = 0; i < startContainerCount; i++) { + containerIds.add(i); + } + return group(tasks, containerIds); + } + + public Set group(Set tasks, List containersIds) { + if (containersIds == null) + return this.group(tasks); + + int containerCount = containersIds.size(); + + // Sort tasks by taskName. + List sortedTasks = new ArrayList<>(tasks); + Collections.sort(sortedTasks); + + // Map every task to a container in round-robin fashion. + Map[] taskGroups = new Map[containerCount]; + for (int i = 0; i < containerCount; i++) { + taskGroups[i] = new HashMap<>(); + } + for (int i = 0; i < sortedTasks.size(); i++) { + TaskModel tm = sortedTasks.get(i); + taskGroups[i % containerCount].put(tm.getTaskName(), tm); + } + + // Convert to a Set of ContainerModel + Set containerModels = new HashSet<>(); + for (int i = 0; i < containerCount; i++) { + containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i])); + } + + return Collections.unmodifiableSet(containerModels); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java new file mode 100644 index 0000000000..c85c825e0a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task; + + +import org.apache.samza.config.Config; + + +/** + * Factory to build the GroupByContainerCount class. + */ +public class SimpleGroupByContainerCountFactory implements TaskNameGrouperFactory { + @Override + public TaskNameGrouper build(Config config) { + return new SimpleGroupByContainerCount(); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java index 59a3237c69..848beea481 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java @@ -18,6 +18,7 @@ */ package org.apache.samza.container.grouper.task; +import java.util.List; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; @@ -50,4 +51,8 @@ public interface TaskNameGrouper { * @return Set of containers, which contain the tasks that were passed in. */ Set group(Set tasks); + + default Set group(Set tasks, List containersIds) { + return null; + } } diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 85f4df0bff..7dbf9ac18c 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -106,7 +106,7 @@ object JobModelManager extends Logging { } } - val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor) + val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor, null) createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions) jobCoordinator @@ -121,8 +121,9 @@ object JobModelManager extends Logging { changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache, - streamPartitionCountMonitor: StreamPartitionCountMonitor) = { - val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache) + streamPartitionCountMonitor: StreamPartitionCountMonitor, + containerIds: java.util.List[Integer]) = { + val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache, containerIds) jobModelRef.set(jobModel) val server = new HttpServer @@ -188,7 +189,8 @@ object JobModelManager extends Logging { private def initializeJobModel(config: Config, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, - streamMetadataCache: StreamMetadataCache): JobModel = { + streamMetadataCache: StreamMetadataCache, + containerIds: java.util.List[Integer]): JobModel = { // Do grouping to fetch TaskName to SSP mapping val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache) val grouper = getSystemStreamPartitionGrouper(config) @@ -217,7 +219,8 @@ object JobModelManager extends Logging { allSystemStreamPartitions, groups, previousChangelogMapping, - localityManager) + localityManager, + containerIds) val jobModel = jobModelGenerator() @@ -250,7 +253,8 @@ object JobModelManager extends Logging { allSystemStreamPartitions: util.Set[SystemStreamPartition], groups: util.Map[TaskName, util.Set[SystemStreamPartition]], previousChangelogMapping: util.Map[TaskName, Integer], - localityManager: LocalityManager): JobModel = { + localityManager: LocalityManager, + containerIds: java.util.List[Integer]): JobModel = { // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change // mapping. @@ -282,7 +286,7 @@ object JobModelManager extends Logging { if (containerGrouper.isInstanceOf[BalancingTaskNameGrouper]) containerGrouper.asInstanceOf[BalancingTaskNameGrouper].balance(taskModels, localityManager) else - containerGrouper.group(taskModels) + containerGrouper.group(taskModels, containerIds) } val containerMap = asScalaSet(containerModels).map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap From 017fe796d8654bcb0229774d458402b0f76f1e01 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Fri, 10 Feb 2017 12:09:13 -0800 Subject: [PATCH 3/9] added tests --- .../task/SimpleGroupByContainerCount.java | 14 ++ .../standalone/StandaloneJobCoordinator.java | 2 +- .../task/TestSimpleGroupByContainerCount.java | 165 ++++++++++++++++++ 3 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java index e461fa1232..3ec2f14f36 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java @@ -19,6 +19,7 @@ package org.apache.samza.container.grouper.task; +import java.util.Arrays; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; @@ -50,6 +51,12 @@ public SimpleGroupByContainerCount(int containerCount) { @Override public Set group(Set tasks) { + if(tasks.isEmpty()) + throw new IllegalArgumentException("cannot group an empty set"); + + if(startContainerCount > tasks.size()) + throw new IllegalArgumentException("number of containers=" + startContainerCount + " is bigger than number of tasks=" + tasks.size()); + List containerIds = new ArrayList<>(startContainerCount); for (int i = 0; i < startContainerCount; i++) { containerIds.add(i); @@ -58,6 +65,13 @@ public Set group(Set tasks) { } public Set group(Set tasks, List containersIds) { + if(tasks.isEmpty()) + throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays + .toString(containersIds.toArray())); + + if(containersIds.size() > tasks.size()) + throw new IllegalArgumentException("number of containers " + containersIds.size() + " is bigger than number of tasks " + tasks.size()); + if (containersIds == null) return this.group(tasks); diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java index 140172520d..46dbf30eb3 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java @@ -105,7 +105,7 @@ public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerCo * TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) */ - this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null, null); } @Override diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java new file mode 100644 index 0000000000..fa79473b39 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java @@ -0,0 +1,165 @@ +package org.apache.samza.container.grouper.task; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.samza.container.LocalityManager; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels; +import static org.apache.samza.container.mock.ContainerMocks.getTaskModel; +import static org.apache.samza.container.mock.ContainerMocks.getTaskName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestSimpleGroupByContainerCount { + private TaskAssignmentManager taskAssignmentManager; + private LocalityManager localityManager; + + @Before + public void setup() { + taskAssignmentManager = mock(TaskAssignmentManager.class); + localityManager = mock(LocalityManager.class); + when(localityManager.getTaskAssignmentManager()).thenReturn(taskAssignmentManager); + } + + @Test(expected = IllegalArgumentException.class) + public void testGroupEmptyTasks() { + new SimpleGroupByContainerCount(1).group(new HashSet()); + } + + @Test(expected = IllegalArgumentException.class) + public void testGroupFewerTasksThanContainers() { + Set taskModels = new HashSet<>(); + taskModels.add(getTaskModel(1)); + new SimpleGroupByContainerCount(2).group(taskModels); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGrouperResultImmutable() { + Set taskModels = generateTaskModels(3); + Set containers = new SimpleGroupByContainerCount(3).group(taskModels); + containers.remove(containers.iterator().next()); + } + + @Test + public void testGroupHappyPath() { + Set taskModels = generateTaskModels(5); + + Set containers = new SimpleGroupByContainerCount(2).group(taskModels); + + Map containersMap = new HashMap<>(); + for (ContainerModel container : containers) { + containersMap.put(container.getContainerId(), container); + } + + assertEquals(2, containers.size()); + ContainerModel container0 = containersMap.get(0); + ContainerModel container1 = containersMap.get(1); + assertNotNull(container0); + assertNotNull(container1); + assertEquals(0, container0.getContainerId()); + assertEquals(1, container1.getContainerId()); + assertEquals(3, container0.getTasks().size()); + assertEquals(2, container1.getTasks().size()); + assertTrue(container0.getTasks().containsKey(getTaskName(0))); + assertTrue(container0.getTasks().containsKey(getTaskName(2))); + assertTrue(container0.getTasks().containsKey(getTaskName(4))); + assertTrue(container1.getTasks().containsKey(getTaskName(1))); + assertTrue(container1.getTasks().containsKey(getTaskName(3))); + } + + @Test + public void testGroupHappyPathWithListOfContainers() { + Set taskModels = generateTaskModels(5); + + List containerIds = new ArrayList() {{ + add(4); add(2); + }}; + + Set containers = new SimpleGroupByContainerCount().group(taskModels, containerIds); + + Map containersMap = new HashMap<>(); + for (ContainerModel container : containers) { + containersMap.put(container.getContainerId(), container); + } + + assertEquals(2, containers.size()); + ContainerModel container0 = containersMap.get(4); + ContainerModel container1 = containersMap.get(2); + assertNotNull(container0); + assertNotNull(container1); + assertEquals(4, container0.getContainerId()); + assertEquals(2, container1.getContainerId()); + assertEquals(3, container0.getTasks().size()); + assertEquals(2, container1.getTasks().size()); + assertTrue(container0.getTasks().containsKey(getTaskName(0))); + assertTrue(container0.getTasks().containsKey(getTaskName(2))); + assertTrue(container0.getTasks().containsKey(getTaskName(4))); + assertTrue(container1.getTasks().containsKey(getTaskName(1))); + assertTrue(container1.getTasks().containsKey(getTaskName(3))); + } + + + @Test + public void testGroupManyTasks() { + Set taskModels = generateTaskModels(21); + + List containerIds = new ArrayList() {{ + add(4); add(2); + }}; + + + Set containers = new SimpleGroupByContainerCount().group(taskModels, containerIds); + + Map containersMap = new HashMap<>(); + for (ContainerModel container : containers) { + containersMap.put(container.getContainerId(), container); + } + + assertEquals(2, containers.size()); + ContainerModel container0 = containersMap.get(4); + ContainerModel container1 = containersMap.get(2); + assertNotNull(container0); + assertNotNull(container1); + assertEquals(4, container0.getContainerId()); + assertEquals(2, container1.getContainerId()); + assertEquals(11, container0.getTasks().size()); + assertEquals(10, container1.getTasks().size()); + + // NOTE: tasks are sorted lexicographically, so the container assignment + // can seem odd, but the consistency is the key focus + assertTrue(container0.getTasks().containsKey(getTaskName(0))); + assertTrue(container0.getTasks().containsKey(getTaskName(10))); + assertTrue(container0.getTasks().containsKey(getTaskName(12))); + assertTrue(container0.getTasks().containsKey(getTaskName(14))); + assertTrue(container0.getTasks().containsKey(getTaskName(16))); + assertTrue(container0.getTasks().containsKey(getTaskName(18))); + assertTrue(container0.getTasks().containsKey(getTaskName(2))); + assertTrue(container0.getTasks().containsKey(getTaskName(3))); + assertTrue(container0.getTasks().containsKey(getTaskName(5))); + assertTrue(container0.getTasks().containsKey(getTaskName(7))); + assertTrue(container0.getTasks().containsKey(getTaskName(9))); + + assertTrue(container1.getTasks().containsKey(getTaskName(1))); + assertTrue(container1.getTasks().containsKey(getTaskName(11))); + assertTrue(container1.getTasks().containsKey(getTaskName(13))); + assertTrue(container1.getTasks().containsKey(getTaskName(15))); + assertTrue(container1.getTasks().containsKey(getTaskName(17))); + assertTrue(container1.getTasks().containsKey(getTaskName(19))); + assertTrue(container1.getTasks().containsKey(getTaskName(20))); + assertTrue(container1.getTasks().containsKey(getTaskName(4))); + assertTrue(container1.getTasks().containsKey(getTaskName(6))); + assertTrue(container1.getTasks().containsKey(getTaskName(8))); + } +} From 34d5a88fa118d37c8f945368fb669a0694327259 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 15 Feb 2017 10:02:51 -0800 Subject: [PATCH 4/9] changed implementation of default group() method --- .../apache/samza/container/grouper/task/TaskNameGrouper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java index 848beea481..d06bf62a64 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java @@ -53,6 +53,6 @@ public interface TaskNameGrouper { Set group(Set tasks); default Set group(Set tasks, List containersIds) { - return null; + return group(tasks); } } From 99e3815898e660ee5a9e04523ae1b4a91de00e3e Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 15 Feb 2017 13:23:37 -0800 Subject: [PATCH 5/9] removed not yet used default api balance(containerIds..) from BalancingTaskNameGrouper --- .../container/grouper/task/BalancingTaskNameGrouper.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java index 1a98fb6fef..fc96bef75a 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java @@ -58,8 +58,4 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper { default Set balance(Set tasks, LocalityManager localityManager) { return Collections.emptySet(); } - default Set balance(Set containerIds, Set tasks, LocalityManager localityManager) { - return Collections.emptySet(); - } - } From b02e8f4b80a63b2cf4142a3fa5364f19cb20203c Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 15 Feb 2017 15:16:07 -0800 Subject: [PATCH 6/9] style errors --- .../task/SimpleGroupByContainerCount.java | 8 ++-- .../task/TestSimpleGroupByContainerCount.java | 37 ++++++++++++++++--- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java index 3ec2f14f36..0593738355 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java @@ -51,10 +51,10 @@ public SimpleGroupByContainerCount(int containerCount) { @Override public Set group(Set tasks) { - if(tasks.isEmpty()) + if (tasks.isEmpty()) throw new IllegalArgumentException("cannot group an empty set"); - if(startContainerCount > tasks.size()) + if (startContainerCount > tasks.size()) throw new IllegalArgumentException("number of containers=" + startContainerCount + " is bigger than number of tasks=" + tasks.size()); List containerIds = new ArrayList<>(startContainerCount); @@ -65,11 +65,11 @@ public Set group(Set tasks) { } public Set group(Set tasks, List containersIds) { - if(tasks.isEmpty()) + if (tasks.isEmpty()) throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays .toString(containersIds.toArray())); - if(containersIds.size() > tasks.size()) + if (containersIds.size() > tasks.size()) throw new IllegalArgumentException("number of containers " + containersIds.size() + " is bigger than number of tasks " + tasks.size()); if (containersIds == null) diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java index fa79473b39..4de7408226 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.container.grouper.task; import java.util.ArrayList; @@ -83,9 +102,12 @@ public void testGroupHappyPath() { public void testGroupHappyPathWithListOfContainers() { Set taskModels = generateTaskModels(5); - List containerIds = new ArrayList() {{ - add(4); add(2); - }}; + List containerIds = new ArrayList() { + { + add(4); + add(2); + } + }; Set containers = new SimpleGroupByContainerCount().group(taskModels, containerIds); @@ -115,9 +137,12 @@ public void testGroupHappyPathWithListOfContainers() { public void testGroupManyTasks() { Set taskModels = generateTaskModels(21); - List containerIds = new ArrayList() {{ - add(4); add(2); - }}; + List containerIds = new ArrayList() { + { + add(4); + add(2); + } + }; Set containers = new SimpleGroupByContainerCount().group(taskModels, containerIds); From 22275e5fc0bf82393a9fad3d6c3995ce81dc5991 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Wed, 15 Feb 2017 16:26:40 -0800 Subject: [PATCH 7/9] check style errors --- .../container/grouper/task/BalancingTaskNameGrouper.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java index fc96bef75a..f8295c878f 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java @@ -18,7 +18,6 @@ */ package org.apache.samza.container.grouper.task; -import java.util.Collections; import java.util.Set; import org.apache.samza.container.LocalityManager; import org.apache.samza.job.model.ContainerModel; @@ -55,7 +54,5 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper { * @param localityManager provides a persisted task to container map to use as a baseline * @return the grouped tasks in the form of ContainerModels */ - default Set balance(Set tasks, LocalityManager localityManager) { - return Collections.emptySet(); - } + Set balance(Set tasks, LocalityManager localityManager); } From 7a98bce3007b9786c03dbbac0b421d90d6e94bbe Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 16 Feb 2017 11:07:33 -0800 Subject: [PATCH 8/9] removed the default constructor from the SimpleGroupByContainerCount class --- .../task/SimpleGroupByContainerCount.java | 9 ++---- .../SimpleGroupByContainerCountFactory.java | 3 +- .../task/TestSimpleGroupByContainerCount.java | 29 +++++++++++++++---- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java index 0593738355..e0d3a5de02 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java @@ -40,13 +40,8 @@ */ public class SimpleGroupByContainerCount implements TaskNameGrouper { private final int startContainerCount; - public SimpleGroupByContainerCount() { - this.startContainerCount = 1; - } - - public SimpleGroupByContainerCount(int containerCount) { - if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container"); - this.startContainerCount = containerCount; + public SimpleGroupByContainerCount(int count) { + this.startContainerCount = count; } @Override diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java index c85c825e0a..23e3e0716a 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java @@ -20,6 +20,7 @@ import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; /** @@ -28,6 +29,6 @@ public class SimpleGroupByContainerCountFactory implements TaskNameGrouperFactory { @Override public TaskNameGrouper build(Config config) { - return new SimpleGroupByContainerCount(); + return new SimpleGroupByContainerCount(new JobConfig(config).getContainerCount()); } } diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java index 4de7408226..adee526eaa 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; import org.apache.samza.container.LocalityManager; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; @@ -50,24 +52,39 @@ public void setup() { taskAssignmentManager = mock(TaskAssignmentManager.class); localityManager = mock(LocalityManager.class); when(localityManager.getTaskAssignmentManager()).thenReturn(taskAssignmentManager); + + + } + + private Config buildConfigForContainerCount(int count) { + Map map = new HashMap<>(); + map.put("job.container.count", String.valueOf(count)); + return new MapConfig(map); + } + + private TaskNameGrouper buildSimpleGrouper() { + return buildSimpleGrouper(1); + } + private TaskNameGrouper buildSimpleGrouper(int containerCount) { + return new SimpleGroupByContainerCountFactory().build(buildConfigForContainerCount(containerCount)); } @Test(expected = IllegalArgumentException.class) public void testGroupEmptyTasks() { - new SimpleGroupByContainerCount(1).group(new HashSet()); + buildSimpleGrouper(1).group(new HashSet()); } @Test(expected = IllegalArgumentException.class) public void testGroupFewerTasksThanContainers() { Set taskModels = new HashSet<>(); taskModels.add(getTaskModel(1)); - new SimpleGroupByContainerCount(2).group(taskModels); + buildSimpleGrouper(2).group(taskModels); } @Test(expected = UnsupportedOperationException.class) public void testGrouperResultImmutable() { Set taskModels = generateTaskModels(3); - Set containers = new SimpleGroupByContainerCount(3).group(taskModels); + Set containers = buildSimpleGrouper(2).group(taskModels); containers.remove(containers.iterator().next()); } @@ -75,7 +92,7 @@ public void testGrouperResultImmutable() { public void testGroupHappyPath() { Set taskModels = generateTaskModels(5); - Set containers = new SimpleGroupByContainerCount(2).group(taskModels); + Set containers = buildSimpleGrouper(2).group(taskModels); Map containersMap = new HashMap<>(); for (ContainerModel container : containers) { @@ -109,7 +126,7 @@ public void testGroupHappyPathWithListOfContainers() { } }; - Set containers = new SimpleGroupByContainerCount().group(taskModels, containerIds); + Set containers = buildSimpleGrouper().group(taskModels, containerIds); Map containersMap = new HashMap<>(); for (ContainerModel container : containers) { @@ -145,7 +162,7 @@ public void testGroupManyTasks() { }; - Set containers = new SimpleGroupByContainerCount().group(taskModels, containerIds); + Set containers = buildSimpleGrouper().group(taskModels, containerIds); Map containersMap = new HashMap<>(); for (ContainerModel container : containers) { From aa0f32620bafbb0b793986ae284fae1e19cf2cd0 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Thu, 16 Feb 2017 11:24:20 -0800 Subject: [PATCH 9/9] renamed to GroupByContainerIds --- ...pleGroupByContainerCount.java => GroupByContainerIds.java} | 4 ++-- ...ainerCountFactory.java => GroupByContainerIdsFactory.java} | 4 ++-- ...roupByContainerCount.java => TestGroupByContainerIds.java} | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) rename samza-core/src/main/java/org/apache/samza/container/grouper/task/{SimpleGroupByContainerCount.java => GroupByContainerIds.java} (96%) rename samza-core/src/main/java/org/apache/samza/container/grouper/task/{SimpleGroupByContainerCountFactory.java => GroupByContainerIdsFactory.java} (86%) rename samza-core/src/test/java/org/apache/samza/container/grouper/task/{TestSimpleGroupByContainerCount.java => TestGroupByContainerIds.java} (98%) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java similarity index 96% rename from samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java rename to samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java index e0d3a5de02..6d3f673061 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java @@ -38,9 +38,9 @@ * It exposes two group methods - one that assumes sequential container numbers and one that gets a set of container * IDs as an argument. Please note - this first implementation ignores locality information. */ -public class SimpleGroupByContainerCount implements TaskNameGrouper { +public class GroupByContainerIds implements TaskNameGrouper { private final int startContainerCount; - public SimpleGroupByContainerCount(int count) { + public GroupByContainerIds(int count) { this.startContainerCount = count; } diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIdsFactory.java similarity index 86% rename from samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java rename to samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIdsFactory.java index 23e3e0716a..0383d00dc4 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIdsFactory.java @@ -26,9 +26,9 @@ /** * Factory to build the GroupByContainerCount class. */ -public class SimpleGroupByContainerCountFactory implements TaskNameGrouperFactory { +public class GroupByContainerIdsFactory implements TaskNameGrouperFactory { @Override public TaskNameGrouper build(Config config) { - return new SimpleGroupByContainerCount(new JobConfig(config).getContainerCount()); + return new GroupByContainerIds(new JobConfig(config).getContainerCount()); } } diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java similarity index 98% rename from samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java rename to samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java index adee526eaa..82f2b7ab59 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestSimpleGroupByContainerCount.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java @@ -43,7 +43,7 @@ import static org.mockito.Mockito.when; -public class TestSimpleGroupByContainerCount { +public class TestGroupByContainerIds { private TaskAssignmentManager taskAssignmentManager; private LocalityManager localityManager; @@ -66,7 +66,7 @@ private TaskNameGrouper buildSimpleGrouper() { return buildSimpleGrouper(1); } private TaskNameGrouper buildSimpleGrouper(int containerCount) { - return new SimpleGroupByContainerCountFactory().build(buildConfigForContainerCount(containerCount)); + return new GroupByContainerIdsFactory().build(buildConfigForContainerCount(containerCount)); } @Test(expected = IllegalArgumentException.class)