Fix flaky test TestTaskRebalancerParallel.testWhenAllowOverlapJobAssignment #1283

@kaisun2000

Description

LOG:

2020-08-16T04:10:36.5238670Z [ERROR] TestTaskRebalancerParallel.testWhenAllowOverlapJobAssignment:125 expected:< true > but was:< false >

2020-08-16T04:10:36.1700878Z [ERROR] testWhenAllowOverlapJobAssignment(org.apache.helix.integration.task.TestTaskRebalancerParallel) Time elapsed: 6.21 s <<< FAILURE!
2020-08-16T04:10:36.1701274Z java.lang.AssertionError: expected:<true> but was:<false>
2020-08-16T04:10:36.1702924Z at org.apache.helix.integration.task.TestTaskRebalancerParallel.testWhenAllowOverlapJobAssignment(TestTaskRebalancerParallel.java:125)

Code:

  // Polls until the workflow finishes, then verifies that:
  // 1. Different jobs in the same workflow were RUNNING at the same time.
  // 2. When overlap assignment is disallowed, no two jobs in the same workflow
  //    were RUNNING on the same instance at the same time.
  // Use this method with caution: it assumes the workflow does not finish too
  // quickly, so that the number of jobs running in parallel can be counted.
  public static boolean pollForWorkflowParallelState(TaskDriver driver, String workflowName)
      throws InterruptedException {

    WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName);
    Assert.assertNotNull(workflowConfig);

    WorkflowContext workflowContext = null;
    while (workflowContext == null) {
      workflowContext = driver.getWorkflowContext(workflowName);
      Thread.sleep(100);
    }

    int maxRunningCount = 0;
    boolean finished = false;

    while (!finished) {
      finished = true;
      int runningCount = 0;

      workflowContext = driver.getWorkflowContext(workflowName);
      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
        TaskState jobState = workflowContext.getJobState(jobName);
        if (jobState == TaskState.IN_PROGRESS) {
          ++runningCount;
          finished = false;
        }
      }

      if (runningCount > maxRunningCount) {
        maxRunningCount = runningCount;
      }

      Thread.sleep(100);
    }

    List<JobContext> jobContextList = new ArrayList<>();
    for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
      JobContext jobContext = driver.getJobContext(jobName);
      if (jobContext != null) {
        // Reuse the context already fetched instead of issuing a second
        // getJobContext call for the same job.
        jobContextList.add(jobContext);
      }
    }
    Map<String, List<long[]>> rangeMap = new HashMap<>();

    if (!workflowConfig.isAllowOverlapJobAssignment()) {
      for (JobContext jobContext : jobContextList) {
        for (int partition : jobContext.getPartitionSet()) {
          String instance = jobContext.getAssignedParticipant(partition);
          if (!rangeMap.containsKey(instance)) {
            rangeMap.put(instance, new ArrayList<long[]>());
          }
          rangeMap.get(instance).add(new long[] { jobContext.getPartitionStartTime(partition),
              jobContext.getPartitionFinishTime(partition)
          });
        }
      }
    }

    for (List<long[]> timeRange : rangeMap.values()) {
      // Sort by start time. Long.compare avoids overflow from casting a
      // long difference down to int.
      Collections.sort(timeRange, new Comparator<long[]>() {
        @Override
        public int compare(long[] o1, long[] o2) {
          return Long.compare(o1[0], o2[0]);
        }
      });

      for (int i = 0; i < timeRange.size() - 1; i++) {
        if (timeRange.get(i)[1] > timeRange.get(i + 1)[0]) {
          return false;
        }
      }
    }
    return maxRunningCount > 1 && (!workflowConfig.isJobQueue()
        || maxRunningCount <= workflowConfig.getParallelJobs());
  }
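The per-instance overlap check at the end of the method can be exercised in isolation. Below is a minimal sketch (class `OverlapCheck` and method `hasNoOverlap` are illustrative names, not Helix APIs). Sorting with `Long.compare` keeps the ranges correctly ordered even in cases where casting a `long` difference to `int`, as in `(int) (o1[0] - o2[0])`, would overflow. Once sorted by start time, checking only adjacent pairs is sufficient: if any two ranges overlap, so do some adjacent pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverlapCheck {
  // Returns true if no two [start, finish] ranges in the list overlap.
  static boolean hasNoOverlap(List<long[]> ranges) {
    // Sort by start time without the int-cast subtraction hazard.
    ranges.sort((o1, o2) -> Long.compare(o1[0], o2[0]));
    for (int i = 0; i < ranges.size() - 1; i++) {
      if (ranges.get(i)[1] > ranges.get(i + 1)[0]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<long[]> disjoint = new ArrayList<>(Arrays.asList(
        new long[] {0L, 10L}, new long[] {10L, 20L}));
    List<long[]> overlapping = new ArrayList<>(Arrays.asList(
        new long[] {0L, 15L}, new long[] {10L, 20L}));
    System.out.println(hasNoOverlap(disjoint));     // true
    System.out.println(hasNoOverlap(overlapping));  // false
  }
}
```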

Not sure where it goes wrong, since this function just returned false. We should add more logging when it returns false, so the failing check can be identified.
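One option is a small helper that renders the per-instance `rangeMap` into the failure message, so a log line can show which ranges overlapped (or what `maxRunningCount` was) just before the method returns false. A rough sketch, assuming a hypothetical helper named `dumpRanges` and an illustrative instance name:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeDump {
  // Hypothetical helper: formats each instance's [start, finish] task time
  // ranges into one string suitable for a log message.
  static String dumpRanges(Map<String, List<long[]>> rangeMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, List<long[]>> e : rangeMap.entrySet()) {
      sb.append(e.getKey()).append(": ");
      for (long[] r : e.getValue()) {
        sb.append('[').append(r[0]).append(", ").append(r[1]).append("] ");
      }
      sb.append(System.lineSeparator());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, List<long[]>> rangeMap = new LinkedHashMap<>();
    // "localhost_12918" is an illustrative instance name; these two ranges
    // overlap, which would explain a false return when overlap assignment
    // is disallowed.
    rangeMap.put("localhost_12918",
        Arrays.asList(new long[] {100L, 250L}, new long[] {200L, 300L}));
    System.out.print(dumpRanges(rangeMap));
  }
}
```

Logging this output at each `return false` site (and logging `maxRunningCount` before the final return) would make the next flaky run diagnosable from the CI log alone.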
