Skip to content

Commit

Permalink
Add new metric that quantifies how long batch ingest jobs waited for …
Browse files Browse the repository at this point in the history
…segment availability and whether or not that wait was successful (#12002)

* add a unit test that tests that new metric is emitted

* remove unused import

* clarify in doc that this is for batch tasks

* fix IndexTaskTest
  • Loading branch information
capistrant committed Dec 10, 2021
1 parent ffa4783 commit 761fe9f
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|

## Shuffle metrics (Native parallel task)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
Expand Down Expand Up @@ -674,6 +675,14 @@ protected boolean waitForSegmentAvailability(
}
finally {
segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
toolbox.getEmitter().emit(
new ServiceMetricEvent.Builder()
.setDimension("dataSource", getDataSource())
.setDimension("taskType", getType())
.setDimension("taskId", getId())
.setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted)
.build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
Expand Down Expand Up @@ -90,6 +92,7 @@
import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
Expand Down Expand Up @@ -124,6 +127,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@RunWith(Parameterized.class)
Expand Down Expand Up @@ -1086,6 +1091,7 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() throws IOExc
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();

EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once();
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once();
mockNotifier.start();
Expand Down Expand Up @@ -1150,12 +1156,75 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOExc
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
.andReturn(new NoopSegmentHandoffNotifierFactory())
.once();
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();

EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();

EasyMock.replay(mockToolbox);
EasyMock.replay(mockDataSegment1, mockDataSegment2);

Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
EasyMock.verify(mockToolbox);
EasyMock.verify(mockDataSegment1, mockDataSegment2);
}

@Test
public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException
{
final File tmpDir = temporaryFolder.newFolder();

LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter();
latchEmitter.latch = new CountDownLatch(1);

TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);

DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
DataSegment mockDataSegment2 = EasyMock.createMock(DataSegment.class);
List<DataSegment> segmentsToWaitFor = new ArrayList<>();
segmentsToWaitFor.add(mockDataSegment1);
segmentsToWaitFor.add(mockDataSegment2);

IndexTask indexTask = new IndexTask(
null,
null,
createDefaultIngestionSpec(
jsonMapper,
tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
),
null
);

EasyMock.expect(mockDataSegment1.getInterval()).andReturn(Intervals.of("1970-01-01/1971-01-01")).once();
EasyMock.expect(mockDataSegment1.getVersion()).andReturn("dummyString").once();
EasyMock.expect(mockDataSegment1.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
EasyMock.expect(mockDataSegment1.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
EasyMock.expect(mockDataSegment2.getInterval()).andReturn(Intervals.of("1971-01-01/1972-01-01")).once();
EasyMock.expect(mockDataSegment2.getVersion()).andReturn("dummyString").once();
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
EasyMock.expect(mockDataSegment2.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();

EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
.andReturn(new NoopSegmentHandoffNotifierFactory())
.once();
EasyMock.expect(mockToolbox.getEmitter())
.andReturn(latchEmitter).anyTimes();

EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();

EasyMock.replay(mockToolbox);
EasyMock.replay(mockDataSegment1, mockDataSegment2);

Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS);
EasyMock.verify(mockToolbox);
EasyMock.verify(mockDataSegment1, mockDataSegment2);
}
Expand Down Expand Up @@ -2709,6 +2778,27 @@ private static IndexIngestionSpec createIngestionSpec(
}
}

/**
* Used to test that expected metric is emitted by AbstractBatchIndexTask#waitForSegmentAvailability
*/
private static class LatchableServiceEmitter extends ServiceEmitter
{
private CountDownLatch latch;

private LatchableServiceEmitter()
{
super("", "", null);
}

@Override
public void emit(Event event)
{
if (latch != null && "task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) {
latch.countDown();
}
}
}

@Test
public void testEqualsAndHashCode()
{
Expand Down
1 change: 1 addition & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,7 @@ numMetrics
poolKind
poolName
remoteAddress
segmentAvailabilityConfirmed
serviceName
taskStatus
taskType
Expand Down

0 comments on commit 761fe9f

Please sign in to comment.