Conversation
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
Outdated
Show resolved
Hide resolved
helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
Outdated
Show resolved
Hide resolved
|
@narendly, thx for the early review. Still in early stage for this feature.Ssome major refactoring and adding more tests are pending. |
db3abac to
2521c59
Compare
31094f2 to
ffbd243
Compare
helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Show resolved
Hide resolved
junkaixue
left a comment
There was a problem hiding this comment.
Since the code is too long.. I will sync up with you before next round of review...
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
jiajunwang
left a comment
There was a problem hiding this comment.
I tried the best to review files other than the gigantic PerReplicaThrottleStage.java. May I suggest 2 ways to make reviewing this file possible?
- Could you please comment on all the modifications that you made based on the previous class?
- [My preference] try to create the new class as a child class of the IntermediateStateCalcStage and then only modifying the necessary method. You can add a TODO here so we can do the cleanup work in another PR. In the later PR we can say that there is no logic change. So even the PR will still be large, there is no issue in reviewing it.
| @@ -481,6 +482,7 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) { | |||
| rebalancePipeline.addStage(new MaintenanceRecoveryStage()); | |||
There was a problem hiding this comment.
Note the comment in the previous line, will the new logic generate different output with the IntermediateStateCalcStage result, so we entering the maintenance mode in the wrong condition?
Maybe we shall move the MaintenanceRecoveryStage after PerReplicaThrottleStage?
There was a problem hiding this comment.
Good point, let me check the details.
There was a problem hiding this comment.
moved MaintenanceRecoveryStage after Per replica stages and also changed corresponding input change to MaintenanceRecoveryStage
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Outdated
Show resolved
Hide resolved
| failedResources.add(resourceName); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Humm, where is the metrics part? At least we need a TODO here?
There was a problem hiding this comment.
Thought about it before. This worth another diff. This one is already very large.
There was a problem hiding this comment.
will add a to-do.
There was a problem hiding this comment.
to-do is added in public void process(ClusterEvent event) last line.
There was a problem hiding this comment.
TODO is totally fine, but we might need to check in this change into a branch first. Otherwise, it will break the already existing metrics in the master.
There was a problem hiding this comment.
Let me add in the implementation of metrics here too. This should be a small change.
There was a problem hiding this comment.
metrics added.
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Outdated
Show resolved
Hide resolved
This is not for debug info only. Note the selectedMessage from previous stage is of this type. Before, we don't need to access it this way. Now per replica stage is using MessageOutput as input, this why we need this method. |
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Outdated
Show resolved
Hide resolved
|
@jiajunwang , thx for the review, address-ed current main logic feedback. Will add the comments about why test changed next. |
the retraced currentstate is feed to per partition message recovery testing. This should be revisited. Note, recovery message testing should always based on current state, not retraced. 2/ haven't finisih sorting of recovery messages and load messages.
current state and best possible only. 2/added comparator to sort recovery and load messages to give them a priority order before throttling.
2/ enhance zero replica avoidance cluster expansion log; these logs help to identify n->n+1 corner case that result in 2 replica.
1.1/ cleanupTest did not set DelayedPartition which effectively disabled all test except the first one. 1.2/ cleanupTest did not disableThrottleRecording which effectively fails all test except the first one. 1.3/ filter out transition toState dropped/error which is not subject to throttling 1.4/ enhance logging 2/ PerReplicaThrottleStage 2.1/ correct error shouldThrottleForInstance with wrongly resource name in 2.2/ enhance logging 3/ enhance log4j logging with log rotation 4/IntermediateStateCalcStage has serious bug. For pending message charging RebalanceType.NONE can be charged which result in ERROR logging. Since we are going to deprecate this stage. Won't propose more elegant fix.
and online/offline resource bringing ups. Also add mock online-offline model and model factory to support the testing of step by step online-offline model
1.1/ added recovery/loaded throttled messages to the event 2/ enhance TestPerReplicaThrottle with better logging upon assertion failure
pending messages, the 3rd step may be different from what is expected fix this part 2/ enhance logging
f591583 to
cea9067
Compare
…void github test failure due to straying awaying.
helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
Outdated
Show resolved
Hide resolved
helix-core/src/test/java/org/apache/helix/mock/participant/MockOFModelFactory.java
Outdated
Show resolved
Hide resolved
helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
Outdated
Show resolved
Hide resolved
jiajunwang
left a comment
There was a problem hiding this comment.
Let me put some of my comments first. I'm still reviewing it.
My feeling is that the IntermediateStateCalcStage code has been patched too many times. There are some confusing logic and arbitrary designs. It might be easier to re-write it, or at least majorly refactor to simplify it. Otherwise, reviewing the code would cost weeks. And if the new change introduces some undesired behavior, it would be very hard to fix.
| failedResources.add(resourceName); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
TODO is totally fine, but we might need to check in this change into a branch first. Otherwise, it will break the already existing metrics in the master.
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Show resolved
Hide resolved
| StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName); | ||
|
|
||
| // sort pendingMessages based on transition priority then timeStamp for state transition message | ||
| pendingMessages.sort(new PartitionMessageComparator(stateModelDef)); |
There was a problem hiding this comment.
This could potentially have problem. The sorting algorithm should be same as last round.
There was a problem hiding this comment.
Checked. This is not a concern. Indeed both chargingPendingMessage and later classifyMessages all use the same sorting comparator.
There was a problem hiding this comment.
As I commented, the logic is complicated. Please merge them first. Otherwise, I cannot determine what is the difference. Will be very hard for reader understanding it.
There was a problem hiding this comment.
Maybe there is some confusion here. Let me first make sure we are on the same page about usage of two different comparator -- PartitionMessageComparator and MessageThrottleComparator.
MessageThrottleComparator is the counter part of PartitionPriorityComparator in IntermediateCalcStage.
1/ PartitionMessageComparator are used in two places, namely a) pendingMessages load/recovery classification and b) input message from previous stage load/recovery classification. -- note in the previous IntermediateCalcStage, the is not need for such a comparator as the load/recovery classification is done per partition (not per replica), thus there is no need to give a order.
2/ MessageThrottleComparator is basically the same as PartitionPriorityComparator in IntermediateCalcStage. The usage is to give an order when throttling is applied to load/recovery messages.
The logic is the following -- note this is basically the same as PartitionPriorityComparator in IntermediateCalcStage
- Higher priority for topState
- Higher priority for the partition with fewer active replicas
- Higher priority for the partition with fewer replicas with states matching with IdealState
There was a problem hiding this comment.
Basically PartitionMessageComparator is only used to sort message (replica) with a partition. This logic is not need in previous IntermediateCalStage.
MessageThrottleComparator is use to determine the message order across partitions and this is the same logic as PartitionPriorityComparator in IntermediateCalcStage.
What is you take?
| if (isUpward && ((currentCount < expectedCount) || (currentCount == expectedCount && toState | ||
| .equals(secondTopState) && currentTopCount < expectedTopCount))) { | ||
| recoveryMessages.add(msg); | ||
| partitionsNeedRecovery.add(partition); | ||
| // update | ||
| currentStateCounts.put(toState, currentCount + 1); | ||
| } else { | ||
| loadMessages.add(msg); | ||
| } |
There was a problem hiding this comment.
We should think about the general case instead of only single top state and secondary state case.
There was a problem hiding this comment.
A general way is we have 2 things:
- As you implemented the expected state map.
- Totally matched replica numbers. This number is counting from top priority of the state and accumulatively down to the number of replicas. But there is another rule of "R".
I would suggest you first try to understand the state model rule of how we counting different states of different replicas first.
There was a problem hiding this comment.
This is a good suggestion to make the code more generic. The accumulation logic is added in getPartitionExpectedAndCurrentStateCountMap and the testing of load/recovery logic is simplified a lot here.
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Show resolved
Hide resolved
junkaixue
left a comment
There was a problem hiding this comment.
Reviewed to getExpected Replica numbers. Still in reviewing for other parts.
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
Show resolved
Hide resolved
| if (logger.isDebugEnabled()) { | ||
| LogUtil.logDebug(logger, _eventId, String.format("output is")); | ||
| for (String resource : resourceToRebalance.keySet()) { | ||
| if (output.getResourceMessages(resource) != null) { | ||
| LogUtil.logDebug(logger, _eventId, String.format("resource: %s", resource)); | ||
| Map<Partition, List<Message>> partitionListMap = output.getResourceMessages(resource); | ||
| for (Partition partition : partitionListMap.keySet()) { | ||
| for (Message msg : partitionListMap.get(partition)) { | ||
| LogUtil.logDebug(logger, _eventId, String | ||
| .format("\tresource: %s, partition: %s, msg: %s", resource, partition, msg)); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
We dont need this. Usually we print out the message has been throttled. Next stage should have the message send out. It will be duplicated logs.
| if (isEmitThrottledMsg) { | ||
| event.addAttribute(AttributeName.PER_REPLICA_THROTTLED_RECOVERY_MESSAGES.name(), throttledRecoveryMsg); | ||
| event.addAttribute(AttributeName.PER_REPLICA_THROTTLED_LOAD_MESSAGES.name(), throttledLoadMsg); | ||
| } |
There was a problem hiding this comment.
Let's minimize code structure change. We can let the print happening in the code instead of carrying them out at top level and print all.
There was a problem hiding this comment.
This is for testing purpose. The test would assert the messages got throttled as load or recovery. It is not for printing purpose. See TestPerReplicaThrottleStage.
There was a problem hiding this comment.
I dont believe have real production code for testing is a good idea.
| List<Message> throttledRecoveryMsg = new ArrayList<>(); | ||
| List<Message> throttledLoadMsg = new ArrayList<>(); |
There was a problem hiding this comment.
No need to have these inputs and carrying them out.
There was a problem hiding this comment.
This is for testing purpose. The test would assert the messages got throttled as load or recovery. It is not for printing purpose. See TestPerReplicaThrottleStage.
| * @param retracedResourceStateMap | ||
| * @param maxPartitionPerInstance | ||
| */ | ||
| private void validateMaxPartitionsPerInstance(ResourcesStateMap retracedResourceStateMap, |
There was a problem hiding this comment.
Let's not change the order of the function. That's the reason diff can not be generated successfully.
There was a problem hiding this comment.
moved this one to immediately after compute, the same as Intermediate stage.
| Map<Partition, List<Message>> out = new HashMap<>(); | ||
| for (Partition partition : resource.getPartitions()) { | ||
| List<Message> partitionMessages = selectedResourceMessages.get(partition); | ||
| if (partitionMessages == null) { | ||
| continue; | ||
| } | ||
| List<Message> finalPartitionMessages = new ArrayList<>(); | ||
| for (Message message: partitionMessages) { | ||
| if (throttledRecoveryMessages.contains(message)) { | ||
| continue; | ||
| } | ||
| if (throttledLoadMessages.contains(message)) { | ||
| continue; | ||
| } | ||
| finalPartitionMessages.add(message); | ||
| } | ||
| out.put(partition, finalPartitionMessages); | ||
| output.addMessages(resourceName, partition, finalPartitionMessages); | ||
| } |
There was a problem hiding this comment.
We dont need to do some loops to construct. Why not we add all the message in new output. Then:
- If it is not FULL_AUTO, we directly return.
- If message has been throttled, we remove the message from output where it throttled.
So we will not see such a block to construct result.
There was a problem hiding this comment.
The removing idea theoretically also works.
Just looked at implementing this idea. Noted that MessageOutput is the class holding the message. However, this class does not have any removing message helper functions. It is debatable whether we want to add message removing apis to this class.
Considering the benefit of using alternative approach is not really essential, maybe let us just keep the existing approach?
| // new instance in best possible not in currentstate would not be in retracedStateMap yet. | ||
| String toInstance = message.getTgtName(); | ||
| Map<String, String> retracedStateMap = retracedPartitionsStateMap.get(partition); | ||
| retracedStateMap.put(toInstance, toState); |
There was a problem hiding this comment.
nit:
retracedStateMap.put(message.getTgtName(), message.getToState());
might be cleaner.
junkaixue
left a comment
There was a problem hiding this comment.
Reviewed to classifyMessage. Will continue for next round of review.
|
|
||
| private void constructRetracedPartitionStateMap(Resource resource, | ||
| Map<Partition, Map<String, String>> retracedPartitionsStateMap, | ||
| Map<Partition, List<Message>> out) { |
There was a problem hiding this comment.
Name it as "out" could be confusing as naming... When I read code, I even looked back to understand what it is.
There was a problem hiding this comment.
changed to outMessagesByPartition
| Map<String, Integer> expectedStateCountMapOut, | ||
| Map<String, Integer> currentStateCountsOut | ||
| ) { | ||
| List<String> preferenceList = preferenceLists.get(partition.getPartitionName()); |
There was a problem hiding this comment.
What if it is null? Original code has a check.
There was a problem hiding this comment.
PreferenceList is null seem to be in the case of dropping instances, let me test this a little bit more to make sure this part is addressed carefully. This can be potentially an issue. Will update a little bit later.
There was a problem hiding this comment.
Tested, this is a good point, we should have some check as original code.
| } | ||
| } | ||
|
|
||
| private void getPartitionExpectedAndCurrentStateCountMap( |
There was a problem hiding this comment.
This function only serves the get expected current state. But I expect this function does the job of counting how many expected state just once.
Because once we have it the expected state map, we dont need to compute again in "classifyMessage" function. It just need to do some derive, as the change already reflected from charged pending message.
There was a problem hiding this comment.
I think the gist is that you are concerned that we will spend unnecessary CPU cycles to re-count. Instead, we can do the trick of store the data and re-use them later, right? typical space and time trade off.
Changes done.
| getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, idealState, | ||
| cache, currentStateMap, expectedStateCountMap, currentStateCounts); |
There was a problem hiding this comment.
We should not count this again. Since when we charged the pending message, we need to use the retrace map.
Also, expected state does not require to recompute again as I mentioned above.
| // 1) toState priority (toTop is higher than toSecond) | ||
| // 2) same toState, the message classification time, the less required toState meeting minActive requirement has higher priority | ||
| // 3) Higher priority for the partition of messages with fewer replicas with states matching with bestPossible ??? do we need this one | ||
| private static class MessageThrottleComparator implements Comparator<Message> { |
There was a problem hiding this comment.
As you said, if there is no diff from this comparator and another one. Please merge them. They are very complicated. It will be hard for reader to understand the both. I will review the logic in detail once you merged.
There was a problem hiding this comment.
See my comment before. Let us chat about this one.
junkaixue
left a comment
There was a problem hiding this comment.
I went through all the logics already. Please address the comments. And let me know if you have uncertainty of the comments. The major concerns from my comments:
- Understand how we compute the number of states and make the general counting algorithm as I suggested.
- Merge the two comparators algorithms.
I will review it again after you addressing all the comments. The comparator algorithm is complicated. I will review detailed logic after you finished merging.
| expectedCount = expectedCount == null ? 0 : expectedCount; | ||
| currentCount = currentCount == null ? 0 : currentCount; | ||
|
|
||
| boolean isUpward = !isDownwardTransition(idealState, cache, msg); |
There was a problem hiding this comment.
No need to check this. I know message non-upwards are load but anyway we need to check expected state vs current state. This check is not necessary.
There was a problem hiding this comment.
Checkout upward seems to be essential. Otherwise, for downward message if it happens currentCount < expectedCount, it may classify the message as recovery.
| if (isUpward && ((currentCount < expectedCount) || (currentCount == expectedCount && toState | ||
| .equals(secondTopState) && currentTopCount < expectedTopCount))) { | ||
| recoveryMessages.add(msg); | ||
| partitionsNeedRecovery.add(partition); | ||
| // update | ||
| currentStateCounts.put(toState, currentCount + 1); | ||
| } else { | ||
| loadMessages.add(msg); | ||
| } |
There was a problem hiding this comment.
A general way is we have 2 things:
- As you implemented the expected state map.
- Totally matched replica numbers. This number is counting from top priority of the state and accumulatively down to the number of replicas. But there is another rule of "R".
I would suggest you first try to understand the state model rule of how we counting different states of different replicas first.
| for (Message recoveryMsg : recoveryMessages) { | ||
| String toState = recoveryMsg.getToState(); | ||
| String toInstance = recoveryMsg.getTgtName(); | ||
| // toInstance should be in currentStateMap | ||
| retracedStateMap.put(toInstance, toState); | ||
|
|
||
| throttleController | ||
| .chargeInstance(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, | ||
| toInstance); | ||
| throttleController | ||
| .chargeCluster(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE); | ||
| throttleController | ||
| .chargeResource(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, | ||
| resourceName); | ||
| logger.trace("throttleControllerstate->{} after pending recovery charge msg:{}", throttleController, recoveryMsg); | ||
| } | ||
| // charge load message and retrace; | ||
| // note if M->S with relay message, we don't charge relay message now. We would charge relay | ||
| // message only when it shows in pending messages in the next cycle of controller run. | ||
| for (Message loadMsg : loadMessages) { | ||
| String toState = loadMsg.getToState(); | ||
| String toInstance = loadMsg.getTgtName(); | ||
| retracedStateMap.put(toInstance, toState); | ||
|
|
||
| throttleController | ||
| .chargeInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, toInstance); | ||
| throttleController.chargeCluster(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE); | ||
| throttleController | ||
| .chargeResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, resourceName); | ||
| logger.trace("throttleControllerstate->{} after pending load charge msg:{}", throttleController, loadMsg); | ||
| } | ||
| retracedPartitionsStateMap.put(partition, retracedStateMap); | ||
| } |
There was a problem hiding this comment.
This code logic is very duplicated. We can refactor into previous message type decision piece. This would make the code much concise.
loop message:
RebalanceType type = xxxx;
throttleController
.chargeInstance(type,
toInstance);
throttleController
.chargeCluster(type);
throttleController
.chargeResource(type,
resourceName);
logger.trace("throttleControllerstate->{} after pending recovery charge msg:{}", throttleController, recoveryMsg);
There was a problem hiding this comment.
removed duplicated code.
| if (isUpward && ((currentCount < expectedCount) || (currentCount == expectedCount && toState | ||
| .equals(secondTopState) && currentTopCount < expectedTopCount))) { | ||
| recoveryMessages.add(msg); | ||
| currentStateCounts.put(toState, currentCount + 1); | ||
| } else { | ||
| loadMessages.add(msg); | ||
| } |
There was a problem hiding this comment.
Same as I suggested above, let's loop the priority state with an accumulative number for it.
There was a problem hiding this comment.
accumulation logic is added in getPartitionExpectedAndCurrentStateCountMap. thus the testing logic is simplified a lot here.
helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
Outdated
Show resolved
Hide resolved
1/ save the expectedStateCount results for later use 2/ remove unnecessary log. 3/ change out varible name to ones with more context info.
|
Close due to inactive. |
Issues
resolve #343
Description
Here are some details about my PR, including screenshots of any UI changes:
Per replica throttling replacing intermediate stage which is partition based. The finer granularity
would skip boosting unnecessary replica in a recovery partition.
Tests
The following tests are written for this issue:
TestPerReplicaThrottle
TestPerReplicaThrottleStage
The following is the result of the "mvn test" command on the appropriate module:
(Before CI test pass, please copy & paste the result of "mvn test")
Documentation (Optional)
(Link the GitHub wiki you added)
Commits
Code Quality
(helix-style-intellij.xml if IntelliJ IDE is used)