Skip to content
Permalink
Browse files
Reduce helix controller log and minor code improve (#2102)
Turn down a few log level to DEBUG and reduce duplicated logs
  • Loading branch information
qqu0127 committed May 27, 2022
1 parent 0589fa9 commit 56f983ad725112432ac22c8895b4bb55c2478211
Showing 5 changed files with 17 additions and 21 deletions.
@@ -138,7 +138,7 @@ private void updatePendingMessages(LiveInstance instance, BaseControllerDataProv
String resourceName = message.getResourceName();
Resource resource = resourceMap.get(resourceName);
if (resource == null) {
LogUtil.logInfo(LOG, _eventId, String.format(
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending relay message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
continue;
@@ -156,7 +156,7 @@ private void updatePendingMessages(LiveInstance instance, BaseControllerDataProv
cache.addStaleMessage(instanceName, message);
}
} else {
LogUtil.logInfo(LOG, _eventId, String
LogUtil.logDebug(LOG, _eventId, String
.format("Ignore a pending message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
}
@@ -168,7 +168,7 @@ private void updatePendingMessages(LiveInstance instance, BaseControllerDataProv
if (partition != null) {
setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
} else {
LogUtil.logInfo(LOG, _eventId, String.format(
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
}
@@ -193,7 +193,7 @@ private void updatePendingMessages(LiveInstance instance, BaseControllerDataProv
String resourceName = message.getResourceName();
Resource resource = resourceMap.get(resourceName);
if (resource == null) {
LogUtil.logInfo(LOG, _eventId, String.format(
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending relay message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
continue;
@@ -205,7 +205,7 @@ private void updatePendingMessages(LiveInstance instance, BaseControllerDataProv
if (partition != null) {
currentStateOutput.setPendingRelayMessage(resourceName, partition, instanceName, message);
} else {
LogUtil.logInfo(LOG, _eventId, String.format(
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending relay message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
}
@@ -325,13 +325,13 @@ private Message generateCancellationMessageForPendingMessage(final String desire
.getResourceName() + "." + partition.getPartitionName() + " from " + currentState
+ " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
} else if (currentState.equalsIgnoreCase(pendingState)) {
LogUtil.logInfo(logger, _eventId,
LogUtil.logDebug(logger, _eventId,
"Message hasn't been removed for " + instanceName + " to transit " + resource
.getResourceName() + "." + partition.getPartitionName() + " to " + pendingState
+ ", desiredState: " + desiredState + ", isRelay: " + pendingMessage
.isRelayMessage());
} else {
LogUtil.logInfo(logger, _eventId,
LogUtil.logDebug(logger, _eventId,
"IdealState changed before state transition completes for " + resource.getResourceName()
+ "." + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
+ pendingState + ", currentState: " + currentState + ", nextState: " + nextState
@@ -419,7 +419,6 @@ public void updateAssignableInstances(ClusterConfig clusterConfig,
}
LOG.info(
"AssignableInstanceManager updated AssignableInstances due to LiveInstance/InstanceConfig change.");

computeGlobalThreadBasedCapacity();
}

@@ -21,7 +21,6 @@

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -75,7 +74,7 @@ public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
// Clean up if workflow marked for deletion
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.info("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
cleanupWorkflow(workflow);
return;
@@ -119,7 +118,7 @@ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {

// Step 4: Handle finished workflows
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
LOG.info("Workflow {} is finished.", workflow);
LOG.debug("Workflow {} is finished.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
long expiryTime = workflowCfg.getExpiry();
// Check if this workflow has been finished past its expiry.
@@ -150,9 +149,9 @@ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
// For workflows that have already reached final states, STOP should not take into effect.
if (!TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())
&& TargetState.STOP.equals(targetState)) {
LOG.info("Workflow {} is marked as stopped. Workflow state is {}", workflow,
workflowCtx.getWorkflowState());
if (isWorkflowStopped(workflowCtx, workflowCfg)) {
if (isWorkflowStopped(workflowCtx, workflowCfg) && workflowCtx.getWorkflowState() != TaskState.STOPPED) {
LOG.debug("Workflow {} is marked as stopped. Workflow state is {}", workflow,
workflowCtx.getWorkflowState());
workflowCtx.setWorkflowState(TaskState.STOPPED);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.PriorityQueue;

import java.util.Set;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.TaskConfig;
@@ -74,19 +75,16 @@ public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> as
public Map<String, TaskAssignResult> assignTasks(
AssignableInstanceManager assignableInstanceManager, Collection<String> instances,
Iterable<TaskConfig> tasks, String quotaType) {
Iterable<AssignableInstance> assignableInstances = new HashSet<>();
Set<AssignableInstance> assignableInstances = new HashSet<>();
// Only add the AssignableInstances that are also in instances
for (String instance : instances) {
((HashSet<AssignableInstance>) assignableInstances)
.add(assignableInstanceManager.getAssignableInstance(instance));
assignableInstances.add(assignableInstanceManager.getAssignableInstance(instance));
}

if (tasks == null || !tasks.iterator().hasNext()) {
logger.warn("No task to assign!");
return Collections.emptyMap();
}
if (assignableInstances == null || !assignableInstances.iterator().hasNext()) {
logger.warn("No instance to assign!");
if (assignableInstances.isEmpty()) {
return buildNoInstanceAssignment(tasks, quotaType);
}
if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) {
@@ -148,7 +146,7 @@ private Map<String, TaskAssignResult> buildNoInstanceAssignment(Iterable<TaskCon
return result;
}

private class AssignableInstanceComparator implements Comparator<AssignableInstance> {
private static class AssignableInstanceComparator implements Comparator<AssignableInstance> {

/**
* Resource type this comparator needs to compare

0 comments on commit 56f983a

Please sign in to comment.