From 9ddbefcacff6b8e229e6413299d53d89f1cbcd43 Mon Sep 17 00:00:00 2001 From: Hang Qi Date: Sun, 17 May 2015 23:06:20 -0700 Subject: [PATCH] [HELIX-596] fix throttled messages still take constraints' quota --- .../stages/MessageThrottleStage.java | 7 +- .../stages/TestMessageThrottleStage.java | 126 ++++++++++++++++++ .../apache/helix/testutil/HelixTestUtil.java | 8 ++ 3 files changed, 140 insertions(+), 1 deletion(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java index 39bb2288d3..635ac17736 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java @@ -173,13 +173,14 @@ private List throttle(Map throttleMap, ClusterConstrai matches = selectConstraints(matches, msgAttr); boolean msgThrottled = false; + Map perMessageThrottleQuotaMap = new HashMap(); for (ConstraintItem item : matches) { String key = item.filter(msgAttr).toString(); if (!throttleMap.containsKey(key)) { throttleMap.put(key, valueOf(item.getConstraintValue())); } int value = throttleMap.get(key); - throttleMap.put(key, --value); + perMessageThrottleQuotaMap.put(key, --value); if (needThrottle && value < 0) { msgThrottled = true; @@ -193,6 +194,10 @@ private List throttle(Map throttleMap, ClusterConstrai if (!msgThrottled) { throttleOutputMsgs.add(message); + // copy back perMessageThrottleQuotaMap to throttleMap + for (Map.Entry entry: perMessageThrottleQuotaMap.entrySet()) { + throttleMap.put(entry.getKey(), entry.getValue()); + } } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java index 29228e422b..40823a80e6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java @@ -329,6 +329,132 @@ public void testMsgThrottleConstraints() throws Exception { } + @Test() + public void testMsgThrottleConstraintsQuota() throws Exception { + String clusterName = "CLUSTER_" + _className + "_constraints_quota"; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + TestHelper.setupEmptyCluster(_zkclient, clusterName); + + HelixManager manager = new DummyClusterManager(clusterName, accessor); + + // ideal state: node0 is MASTER, node1 is SLAVE + // replica=2 means 1 master and 1 slave + List idealStates = + HelixTestUtil.setupIdealState(_baseAccessor, clusterName, new int[] { + 0, 1 + }, new String[] { + "TestDB" + }, 2, 2); + HelixTestUtil.setupLiveInstances(_baseAccessor, clusterName, new int[] { + 0, 1 + }); + HelixTestUtil.setupStateModel(_baseAccessor, clusterName); + + // setup constraints + ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString()); + + // constraint 0 & 1, per instance constraint + record.setMapField("constraint0", new TreeMap()); + record.getMapField("constraint0").put("MESSAGE_TYPE", "STATE_TRANSITION"); + record.getMapField("constraint0").put("INSTANCE", "localhost_0"); + record.getMapField("constraint0").put("CONSTRAINT_VALUE", "1"); + ConstraintItem constraint0 = new ConstraintItem(record.getMapField("constraint0")); + + record.setMapField("constraint1", new TreeMap()); + record.getMapField("constraint1").put("MESSAGE_TYPE", "STATE_TRANSITION"); + record.getMapField("constraint1").put("INSTANCE", "localhost_1"); + record.getMapField("constraint1").put("CONSTRAINT_VALUE", "1"); + ConstraintItem constraint1 = new ConstraintItem(record.getMapField("constraint1")); + + // constraint 2 & 3, per partition constraint + record.setMapField("constraint2", new TreeMap()); + record.getMapField("constraint2").put("MESSAGE_TYPE", "STATE_TRANSITION"); + record.getMapField("constraint2").put("PARTITION", "TestDB_0"); + record.getMapField("constraint2").put("CONSTRAINT_VALUE", "1"); + ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2")); + + record.setMapField("constraint3", new TreeMap()); + record.getMapField("constraint3").put("MESSAGE_TYPE", "STATE_TRANSITION"); + record.getMapField("constraint3").put("PARTITION", "TestDB_1"); + record.getMapField("constraint3").put("CONSTRAINT_VALUE", "1"); + ConstraintItem constraint3 = new ConstraintItem(record.getMapField("constraint1")); + + Builder keyBuilder = accessor.keyBuilder(); + accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()), + new ClusterConstraints(record)); + + // ClusterConstraints constraint = + // accessor.getProperty(ClusterConstraints.class, + // PropertyType.CONFIGS, + // ConfigScopeProperty.CONSTRAINT.toString(), + // ConstraintType.MESSAGE_CONSTRAINT.toString()); + ClusterConstraints constraint = + accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString())); + + MessageThrottleStage throttleStage = new MessageThrottleStage(); + + // test messageThrottleStage + ClusterEvent event = new ClusterEvent("testEvent"); + event.addAttribute("helixmanager", manager); + + // get an empty best possible output for the partitions + BestPossibleStateOutput bestPossOutput = getEmptyBestPossibleStateOutput(idealStates); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossOutput); + + Pipeline dataRefresh = new Pipeline(); + dataRefresh.addStage(new ReadClusterDataStage()); + HelixTestUtil.runPipeline(event, dataRefresh); + HelixTestUtil.runStage(event, new ResourceComputationStage()); + MessageOutput msgSelectOutput = new MessageOutput(); + + Message msg1 = + HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-001"), + "OFFLINE", "SLAVE", "TestDB", "localhost_0", "TestDB_0"); + + Message msg2 = + HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-002"), + "OFFLINE", "SLAVE", "TestDB", "localhost_0", "TestDB_1"); + + Message msg3 = + HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-003"), + "OFFLINE", "SLAVE", "TestDB", "localhost_1", "TestDB_0"); + + Message msg4 = + HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-004"), + "OFFLINE", "SLAVE", "TestDB", "localhost_1", "TestDB_1"); + + List selectMessages0 = new ArrayList(); + selectMessages0.add(msg1); + selectMessages0.add(msg2); + List selectMessages1 = new ArrayList(); + selectMessages1.add(msg3); + selectMessages1.add(msg4); + + msgSelectOutput.setMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0"), + selectMessages0); + msgSelectOutput.setMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_1"), + selectMessages1); + event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput); + + HelixTestUtil.runStage(event, throttleStage); + + MessageOutput msgThrottleOutput = + event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString()); + List throttleMessages = + msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0")); + Assert.assertEquals(throttleMessages.size(), 1); + Assert.assertTrue(throttleMessages.contains(msg1)); + + throttleMessages = msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_1")); + Assert.assertEquals(throttleMessages.size(), 1); + Assert.assertTrue(throttleMessages.contains(msg4)); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + + } + private boolean containsConstraint(Set constraints, ConstraintItem constraint) { for (ConstraintItem item : constraints) { if (item.toString().equals(constraint.toString())) { diff --git a/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java b/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java index 32106e12ce..edab4c4b2f 100644 --- a/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java @@ -30,6 +30,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.api.State; import org.apache.helix.api.id.MessageId; +import org.apache.helix.api.id.PartitionId; import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.Stage; @@ -237,4 +238,11 @@ public static Message newMessage(MessageType type, MessageId msgId, String fromS msg.setTgtName(tgtName); return msg; } + + public static Message newMessage(MessageType type, MessageId msgId, String fromState, + String toState, String resourceName, String tgtName, String partitionId) { + Message msg = newMessage(type, msgId, fromState, toState, resourceName, tgtName); + msg.setPartitionId(PartitionId.from(partitionId)); + return msg; + } }