Task assign timeout and stop problem in ZK #10798

Closed
chenyuzhi459 wants to merge 2 commits into apache:master from chenyuzhi459:fix-remote-runner
@chenyuzhi459 chenyuzhi459 commented Jan 25, 2021

Fixes incorrect task shutdown and a task assignment timeout bug.

Description

Cannot stop a task from the Overlord

  • Problem:
    When the ZK service is overloaded, a task node can lose its ZK connection, and the Overlord can then no longer stop the tasks on that node normally.

  • Reason:
    The shutdown() method in indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java tries to stop the task
    by invoking findWorkerRunningTask(String taskId), which depends on the ZK service. As a result, the shutdown attempt turns into an endless loop.
    Here is the source code:

  public void shutdown(final String taskId, String reason)
  {
    log.info("Shutdown [%s] because: [%s]", taskId, reason);
    if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
      log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
    } else if (pendingTasks.remove(taskId) != null) {
      pendingTaskPayloads.remove(taskId);
      log.info("Removed task from pending queue: %s", taskId);
    } else if (completeTasks.containsKey(taskId)) {
      cleanup(taskId);
    } else {
      final ZkWorker zkWorker = findWorkerRunningTask(taskId);

      if (zkWorker == null) {
        log.info("Can't shutdown! No worker running task %s", taskId);
        return;
      }
      URL url = null;
      try {
        url = TaskRunnerUtils.makeWorkerURL(zkWorker.getWorker(), "/druid/worker/v1/task/%s/shutdown", taskId);
        final StatusResponseHolder response = httpClient.go(
            new Request(HttpMethod.POST, url),
            StatusResponseHandler.getInstance(),
            shutdownTimeout
        ).get();

        log.info(
            "Sent shutdown message to worker: %s, status %s, response: %s",
            zkWorker.getWorker().getHost(),
            response.getStatus(),
            response.getContent()
        );

        if (!HttpResponseStatus.OK.equals(response.getStatus())) {
          log.error("Shutdown failed for %s! Are you sure the task was running?", taskId);
        }
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RE(e, "Interrupted posting shutdown to [%s] for task [%s]", url, taskId);
      }
      catch (Exception e) {
        throw new RE(e, "Error in handling post to [%s] for task [%s]", zkWorker.getWorker().getHost(), taskId);
      }
    }
  }

  • Solution:
    When findWorkerRunningTask(String taskId) cannot locate the worker, fall back to looking the task up in runningTasks.
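
A minimal sketch of that fallback, not the actual patch: every class, field, and method name below is hypothetical, and plain maps stand in for the ZK-backed worker view and for runningTasks. It assumes the locally tracked entry still records which worker the task was assigned to.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the Overlord's bookkeeping; these are not Druid's real classes. */
class ShutdownFallbackSketch {
  // Task id -> worker host as seen through ZK (assumption: entries vanish when the worker loses its ZK session).
  final Map<String, String> zkView = new ConcurrentHashMap<>();
  // Task id -> worker host recorded locally at assignment time (stand-in for runningTasks).
  final Map<String, String> runningTasks = new ConcurrentHashMap<>();

  /** Prefer the ZK view; if it has no entry for the task, fall back to the local record. */
  Optional<String> findWorkerHost(String taskId) {
    String host = zkView.get(taskId);
    if (host == null) {
      host = runningTasks.get(taskId);
    }
    return Optional.ofNullable(host);
  }
}
```

With this shape, shutdown() would only report "no worker running task" when neither source knows the task, instead of giving up as soon as the ZK view is empty.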
Timeout when assigning a task
     // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
     // on a worker - this avoids overflowing a worker with tasks
     Stopwatch timeoutStopwatch = Stopwatch.createStarted();
     while (!isWorkerRunningTask(theZkWorker, task.getId())) {
       final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
       statusLock.wait(waitMs/3);
       long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
       if (elapsed >= waitMs) {
         log.makeAlert(
             "Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
             worker,
             task.getId(),
             elapsed,
             config.getTaskAssignmentTimeout()
         ).emit();
         taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
         break;
       }
     }
     return true;

In the original code, the status is checked with isWorkerRunningTask only once. In a poor network environment, however, it should be checked repeatedly until the assignment timeout has elapsed.
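
The re-check pattern can be sketched in isolation as follows. This is an illustration under assumptions, not the code in this PR: the class and method names are hypothetical, and the lock object and condition are passed in rather than taken from RemoteTaskRunner. It waits in slices of a third of the timeout and re-evaluates the condition after each wake-up.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper showing a sliced wait with repeated condition checks. */
class AssignmentWaitSketch {
  /**
   * Returns true once isRunning reports true, or false if timeoutMs elapses first
   * (or the waiting thread is interrupted).
   */
  static boolean awaitRunning(Object lock, BooleanSupplier isRunning, long timeoutMs) {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    final long sliceMs = Math.max(1, timeoutMs / 3);
    synchronized (lock) {
      try {
        while (!isRunning.getAsBoolean()) {
          long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
          if (remainingMs <= 0) {
            return false; // timed out without ever seeing the task running
          }
          // Never pass 0 to wait(): that would block with no timeout at all.
          lock.wait(Math.min(sliceMs, remainingMs));
        }
        return true;
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }
}
```

A caller that gets false back would then mark the task as failed, as the alert branch in the snippet above does with taskComplete(...).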


This PR has:

  • been self-reviewed.
    • [ - ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • RemoteTaskRunner

stale bot commented Apr 29, 2022

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Apr 29, 2022
github-actions bot commented Oct 6, 2023

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Oct 6, 2023