Skip to content
Permalink
Browse files
Add a metric for task duration in the pending queue (#12492)
This PR is to measure how long a task stays in the pending queue and emits the value with the metric task/pending/time. The metric is measured in RemoteTaskRunner and HttpRemoteTaskRunner.

An example of the metric:

```
2022-04-26T21:59:09,488 INFO [rtr-pending-tasks-runner-0] org.apache.druid.java.util.emitter.core.LoggingEmitter - {"feed":"metrics","timestamp":"2022-04-26T21:59:09.487Z","service":"druid/coordinator","host":"localhost:8081","version":"2022.02.0-iap-SNAPSHOT","metric":"task/pending/time","value":8,"dataSource":"wikipedia","taskId":"index_parallel_wikipedia_gecpcglg_2022-04-26T21:59:09.432Z","taskType":"index_parallel"}
```

------------------------------------------
Key changed/added classes in this PR

    Emit metric task/pending/time in classes RemoteTaskRunner and HttpRemoteTaskRunner.
    Update related factory classes and tests.
  • Loading branch information
rockc2020 committed May 3, 2022
1 parent 785a1ee commit 770ad951693f30e6b56af4db430f88f0408d10d5
Showing 8 changed files with 146 additions and 17 deletions.
@@ -198,6 +198,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`task/run/time`|Milliseconds taken to run a task.|dataSource, taskId, taskType, taskStatus.|Varies.|
|`task/pending/time`|Milliseconds taken for a task to wait for running.|dataSource, taskId, taskType.|Varies.|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.|dataSource, taskId, taskType|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.|dataSource, taskId, taskType|Varies from subsecond to a few seconds, based on action type.|
|`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.|
@@ -54,6 +54,7 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
@@ -74,6 +75,8 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -179,6 +182,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
private final ServiceEmitter emitter;
private ProvisioningService provisioningService;

public RemoteTaskRunner(
@@ -189,7 +193,8 @@ public RemoteTaskRunner(
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
ServiceEmitter emitter
)
{
this.jsonMapper = jsonMapper;
@@ -213,6 +218,7 @@ public RemoteTaskRunner(
config.getPendingTasksRunnerNumThreads(),
"rtr-pending-tasks-runner-%d"
);
this.emitter = emitter;
}

@Override
@@ -934,6 +940,13 @@ private boolean announceTask(
return false;
}

final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
emitter.emit(metricBuilder.build(
"task/pending/time",
new Duration(workItem.getQueueInsertionTime(), DateTimes.nowUtc()).getMillis())
);

RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
@@ -1516,6 +1529,12 @@ Map<String, String> getWorkersWithUnacknowledgedTask()
return workersWithUnacknowledgedTask;
}

@VisibleForTesting
ProvisioningStrategy<WorkerTaskRunner> getProvisioningStrategy()
{
return provisioningStrategy;
}

@Override
public Map<String, Long> getTotalTaskSlotCount()
{
@@ -30,6 +30,7 @@
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;

@@ -46,6 +47,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final ProvisioningStrategy provisioningStrategy;
private final ServiceEmitter emitter;

@Inject
public RemoteTaskRunnerFactory(
@@ -56,7 +58,8 @@ public RemoteTaskRunnerFactory(
@EscalatedGlobal final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ProvisioningSchedulerConfig provisioningSchedulerConfig,
final ProvisioningStrategy provisioningStrategy
final ProvisioningStrategy provisioningStrategy,
final ServiceEmitter emitter
)
{
this.curator = curator;
@@ -67,6 +70,7 @@ public RemoteTaskRunnerFactory(
this.workerConfigRef = workerConfigRef;
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.provisioningStrategy = provisioningStrategy;
this.emitter = emitter;
}

@Override
@@ -80,7 +84,8 @@ public RemoteTaskRunner build()
new PathChildrenCacheFactory.Builder().withCompressed(true),
httpClient,
workerConfigRef,
provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>()
provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>(),
emitter
);
}
}
@@ -50,6 +50,7 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
@@ -76,13 +77,16 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import org.joda.time.Period;

import javax.annotation.Nullable;
@@ -182,6 +186,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final HttpRemoteTaskRunnerConfig config;

private final TaskStorage taskStorage;
private final ServiceEmitter emitter;

// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
private static final Joiner JOINER = Joiner.on("/");
@@ -203,7 +208,8 @@ public HttpRemoteTaskRunner(
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
TaskStorage taskStorage,
@Nullable CuratorFramework cf,
IndexerZkConfig indexerZkConfig
IndexerZkConfig indexerZkConfig,
ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@@ -212,6 +218,7 @@ public HttpRemoteTaskRunner(
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.workerConfigRef = workerConfigRef;
this.emitter = emitter;

this.pendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
@@ -1548,6 +1555,14 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
taskItem.setWorker(worker);
taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());

final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask());
emitter.emit(metricBuilder.build(
"task/pending/time",
new Duration(taskItem.getCreatedTime(), DateTimes.nowUtc()).getMillis())
);

// fall through
case RUNNING:
if (worker.getHost().equals(taskItem.getWorker().getHost())) {
@@ -35,6 +35,7 @@
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;

@@ -54,6 +55,7 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemote
private final ProvisioningStrategy provisioningStrategy;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final TaskStorage taskStorage;
private final ServiceEmitter emitter;

// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
@Nullable //Null if zk is disabled
@@ -72,7 +74,8 @@ public HttpRemoteTaskRunnerFactory(
final TaskStorage taskStorage,
final Provider<CuratorFramework> cfProvider,
final IndexerZkConfig indexerZkConfig,
final ZkEnablementConfig zkEnablementConfig
final ZkEnablementConfig zkEnablementConfig,
final ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@@ -84,6 +87,7 @@ public HttpRemoteTaskRunnerFactory(
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.indexerZkConfig = indexerZkConfig;
this.emitter = emitter;

if (zkEnablementConfig.isEnabled()) {
this.cf = cfProvider.get();
@@ -104,7 +108,8 @@ public HttpRemoteTaskRunner build()
druidNodeDiscoveryProvider,
taskStorage,
cf,
indexerZkConfig
indexerZkConfig,
emitter
);
}
}
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord;

import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class RemoteTaskRunnerFactoryTest
{
@Test
public void testBuildWithAutoScale()
{
ProvisioningSchedulerConfig provisioningSchedulerConfig = Mockito.mock(ProvisioningSchedulerConfig.class);
Mockito.when(provisioningSchedulerConfig.isDoAutoscale()).thenReturn(true);

RemoteTaskRunnerFactory remoteTaskRunnerFactory = getTestRemoteTaskRunnerFactory(provisioningSchedulerConfig);

Assert.assertNull(remoteTaskRunnerFactory.build().getProvisioningStrategy());
}

@Test
public void testBuildWithoutAutoScale()
{
ProvisioningSchedulerConfig provisioningSchedulerConfig = Mockito.mock(ProvisioningSchedulerConfig.class);
Mockito.when(provisioningSchedulerConfig.isDoAutoscale()).thenReturn(false);

RemoteTaskRunnerFactory remoteTaskRunnerFactory = getTestRemoteTaskRunnerFactory(provisioningSchedulerConfig);

Assert.assertTrue(remoteTaskRunnerFactory.build().getProvisioningStrategy() instanceof NoopProvisioningStrategy);
}

private RemoteTaskRunnerFactory getTestRemoteTaskRunnerFactory(ProvisioningSchedulerConfig provisioningSchedulerConfig)
{
CuratorFramework curator = Mockito.mock(CuratorFramework.class);
Mockito.when(curator.newWatcherRemoveCuratorFramework()).thenReturn(null);
return new RemoteTaskRunnerFactory(
curator,
new RemoteTaskRunnerConfig(),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
null,
null,
null,
provisioningSchedulerConfig,
null,
null
);
}
}
@@ -47,6 +47,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.atomic.AtomicReference;
@@ -270,7 +271,8 @@ public TestableRemoteTaskRunner(
pathChildrenCacheFactory,
httpClient,
workerConfigRef,
provisioningStrategy
provisioningStrategy,
new NoopServiceEmitter()
);
}

0 comments on commit 770ad95

Please sign in to comment.