Skip to content
Permalink
Browse files
Worker level task metrics (#12446)
* * fix metric name inconsistency

* * add task slot metrics for middle managers

* * add new WorkerTaskCountStatsMonitor to report task count metrics
  from worker

* * more stuff

* * remove unused variable

* * more stuff

* * add javadocs

* * fix checkstyle

* * fix hadoop test failure

* * cleanup

* * add more code coverage in tests

* * fix test failure

* * add docs

* * increase code coverage

* * fix spelling

* * fix failing tests

* * remove dead code

* * fix spelling
  • Loading branch information
zachjsh committed Apr 26, 2022
1 parent 4868ef9 commit 564d6defd47749d55dd07e5549d7264cbc1c4019
Showing 10 changed files with 454 additions and 11 deletions.
@@ -383,6 +383,8 @@ Metric monitoring is an essential part of Druid operations. The following monit
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. Only supported by middleManager node types.|

For example, you might configure monitors on all processes for system and JVM information within `common.runtime.properties` as follows:

@@ -214,6 +214,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|
|`worker/task/failed/count`|Number of failed tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
|`worker/task/success/count`|Number of successful tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|

## Shuffle metrics (Native parallel task)

@@ -63,9 +63,15 @@
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },

"worker/task/failed/count" : { "dimensions" : ["category", "version"], "type" : "count" },
"worker/task/success/count" : { "dimensions" : ["category", "version"], "type" : "count" },
"worker/taskSlot/idle/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
"worker/taskSlot/total/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
"worker/taskSlot/used/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },

"taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/used/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" },

@@ -63,6 +63,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.server.metrics.MonitorsConfig;
import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
@@ -83,13 +84,14 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* Runs tasks in separate processes using the "internal peon" verb.
*/
public class ForkingTaskRunner
extends BaseRestorableTaskRunner<ForkingTaskRunner.ForkingTaskRunnerWorkItem>
implements TaskLogStreamer
implements TaskLogStreamer, WorkerTaskCountStatsProvider
{
private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
@@ -104,6 +106,11 @@

private volatile boolean stopping = false;

private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong();
private static final AtomicLong FAILED_TASK_COUNT = new AtomicLong();
private static final AtomicLong SUCCESSFUL_TASK_COUNT = new AtomicLong();
private static final AtomicLong LAST_REPORTED_SUCCESSFUL_TASK_COUNT = new AtomicLong();

@Inject
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
@@ -399,7 +406,11 @@ public TaskStatus call()
)
);
}

if (status.isSuccess()) {
SUCCESSFUL_TASK_COUNT.incrementAndGet();
} else {
FAILED_TASK_COUNT.incrementAndGet();
}
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
return status;
}
@@ -690,18 +701,12 @@ String getMaskedCommand(List<String> maskedProperties, List<String> command)
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size()));
}
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1));
return ImmutableMap.of(workerConfig.getCategory(), getTotalTaskSlotCountLong());
}

public long getTotalTaskSlotCountLong()
{
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return config.getPorts().size();
}
return config.getEndPort() - config.getStartPort() + 1;
return workerConfig.getCapacity();
}

@Override
@@ -733,6 +738,54 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
return ImmutableMap.of(workerConfig.getCategory(), 0L);
}

@Override
public Long getWorkerFailedTaskCount()
{
long failedTaskCount = FAILED_TASK_COUNT.get();
long lastReportedFailedTaskCount = LAST_REPORTED_FAILED_TASK_COUNT.get();
LAST_REPORTED_FAILED_TASK_COUNT.set(failedTaskCount);
return failedTaskCount - lastReportedFailedTaskCount;
}

@Override
public Long getWorkerIdleTaskSlotCount()
{
return Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0);
}

@Override
public Long getWorkerUsedTaskSlotCount()
{
return (long) portFinder.findUsedPortCount();
}

@Override
public Long getWorkerTotalTaskSlotCount()
{
return getTotalTaskSlotCountLong();
}

@Override
public String getWorkerCategory()
{
return workerConfig.getCategory();
}

@Override
public String getWorkerVersion()
{
return workerConfig.getVersion();
}

@Override
public Long getWorkerSuccessfulTaskCount()
{
long successfulTaskCount = SUCCESSFUL_TASK_COUNT.get();
long lastReportedSuccessfulTaskCount = LAST_REPORTED_SUCCESSFUL_TASK_COUNT.get();
LAST_REPORTED_SUCCESSFUL_TASK_COUNT.set(successfulTaskCount);
return successfulTaskCount - lastReportedSuccessfulTaskCount;
}

protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;
@@ -231,6 +231,12 @@ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation ta
@Override
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
{
WorkerConfig workerConfig = new WorkerConfig();
Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory());
Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion());
// Emulate task process failure
return 1;
}
@@ -242,6 +248,8 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo
"Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.",
status.getErrorMsg()
);
Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
}

@Test
@@ -294,13 +302,21 @@ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation ta
@Override
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
{
WorkerConfig workerConfig = new WorkerConfig();
Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory());
Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion());
return 0;
}
};

final TaskStatus status = forkingTaskRunner.run(task).get();
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
Assert.assertNull(status.getErrorMsg());
Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
}

@Test
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.metrics;

import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;

import java.util.Set;

public class WorkerTaskCountStatsMonitor extends AbstractMonitor
{
private final WorkerTaskCountStatsProvider statsProvider;
private final String workerCategory;
private final String workerVersion;
private final boolean isMiddleManager;

@Inject
public WorkerTaskCountStatsMonitor(
Injector injector,
@Self Set<NodeRole> nodeRoles
)
{
this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER);
if (isMiddleManager) {
this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class);
this.workerCategory = statsProvider.getWorkerCategory();
this.workerVersion = statsProvider.getWorkerVersion();
} else {
this.statsProvider = null;
this.workerCategory = null;
this.workerVersion = null;
}
}

@Override
public boolean doMonitor(ServiceEmitter emitter)
{
if (isMiddleManager) {
emit(emitter, "worker/task/failed/count", statsProvider.getWorkerFailedTaskCount());
emit(emitter, "worker/task/success/count", statsProvider.getWorkerSuccessfulTaskCount());
emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount());
emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount());
emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount());
}
return true;
}

private void emit(ServiceEmitter emitter, String metricName, Long value)
{
if (value != null) {
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
builder.setDimension("category", workerCategory);
builder.setDimension("version", workerVersion);
emitter.emit(builder.build(metricName, value));
}
}
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.metrics;

/**
* Proides task / task count status at the level of individual worker nodes. These merics
* are repoerted by workers, like middle-managers.
*/
public interface WorkerTaskCountStatsProvider
{
/**
* The number of failed tasks run on worker during emission period.
*/
Long getWorkerFailedTaskCount();

/**
* The number of successful tasks run on worker during emission period.
*/
Long getWorkerSuccessfulTaskCount();

/**
* The number of idle task slots on worker.
*/
Long getWorkerIdleTaskSlotCount();

/**
* The number of total task slots on worker.
*/
Long getWorkerTotalTaskSlotCount();

/**
* The number of used task slots on worker.
*/
Long getWorkerUsedTaskSlotCount();


/**
* The worker category.
*/
String getWorkerCategory();

/**
* The worker version.
*/
String getWorkerVersion();
}

0 comments on commit 564d6de

Please sign in to comment.