Skip to content

Commit

Permalink
Add metric for total downsampling latency (#106747)
Browse files Browse the repository at this point in the history
* Add DownsampleMetrics

* replace singleton with injection

* add comment

* add comment

* fix test

* Metric for total downsampling latency

* small fixes

* make startTime a local variable
  • Loading branch information
kkrik-es committed Mar 27, 2024
1 parent 3e406e2 commit a2af99c
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class DownsampleMetrics extends AbstractLifecycleComponent {

public static final String LATENCY_SHARD = "es.tsdb.downsample.latency.shard.histogram";
public static final String LATENCY_TOTAL = "es.tsdb.downsample.latency.total.histogram";

private final MeterRegistry meterRegistry;

Expand All @@ -41,6 +42,7 @@ public DownsampleMetrics(MeterRegistry meterRegistry) {
protected void doStart() {
// Register all metrics to track.
meterRegistry.registerLongHistogram(LATENCY_SHARD, "Downsampling action latency per shard", "ms");
meterRegistry.registerLongHistogram(LATENCY_TOTAL, "Downsampling latency end-to-end", "ms");
}

@Override
Expand All @@ -49,17 +51,17 @@ protected void doStop() {}
@Override
protected void doClose() throws IOException {}

enum ShardActionStatus {
enum ActionStatus {

SUCCESS("success"),
MISSING_DOCS("missing_docs"),
FAILED("failed");

public static final String NAME = "status";
static final String NAME = "status";

private final String message;

ShardActionStatus(String message) {
ActionStatus(String message) {
this.message = message;
}

Expand All @@ -68,7 +70,11 @@ String getMessage() {
}
}

void recordLatencyShard(long durationInMilliSeconds, ShardActionStatus status) {
meterRegistry.getLongHistogram(LATENCY_SHARD).record(durationInMilliSeconds, Map.of(ShardActionStatus.NAME, status.getMessage()));
void recordLatencyShard(long durationInMilliSeconds, ActionStatus status) {
meterRegistry.getLongHistogram(LATENCY_SHARD).record(durationInMilliSeconds, Map.of(ActionStatus.NAME, status.getMessage()));
}

void recordLatencyTotal(long durationInMilliSeconds, ActionStatus status) {
meterRegistry.getLongHistogram(LATENCY_TOTAL).record(durationInMilliSeconds, Map.of(ActionStatus.NAME, status.getMessage()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
+ task.getNumSent()
+ "]";
logger.info(error);
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.MISSING_DOCS);
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.MISSING_DOCS);
throw new DownsampleShardIndexerException(error, false);
}

Expand All @@ -204,7 +204,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
+ task.getNumFailed()
+ "]";
logger.info(error);
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.FAILED);
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.FAILED);
throw new DownsampleShardIndexerException(error, false);
}

Expand All @@ -214,7 +214,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
ActionListener.noop()
);
logger.info("Downsampling task [" + task.getPersistentTaskId() + " on shard " + indexShard.shardId() + " completed");
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.SUCCESS);
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.SUCCESS);
return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand All @@ -115,6 +116,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
private final IndexScopedSettings indexScopedSettings;
private final ThreadContext threadContext;
private final PersistentTasksService persistentTasksService;
private final DownsampleMetrics downsampleMetrics;

private static final Set<String> FORBIDDEN_SETTINGS = Set.of(
IndexSettings.DEFAULT_PIPELINE.getKey(),
Expand Down Expand Up @@ -153,7 +155,8 @@ public TransportDownsampleAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings,
PersistentTasksService persistentTasksService
PersistentTasksService persistentTasksService,
DownsampleMetrics downsampleMetrics
) {
super(
DownsampleAction.NAME,
Expand All @@ -173,6 +176,21 @@ public TransportDownsampleAction(
this.threadContext = threadPool.getThreadContext();
this.taskQueue = clusterService.createTaskQueue("downsample", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR);
this.persistentTasksService = persistentTasksService;
this.downsampleMetrics = downsampleMetrics;
}

private void recordLatencyOnSuccess(long startTime) {
downsampleMetrics.recordLatencyTotal(
TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime).getMillis(),
DownsampleMetrics.ActionStatus.SUCCESS
);
}

private void recordLatencyOnFailure(long startTime) {
downsampleMetrics.recordLatencyTotal(
TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime).getMillis(),
DownsampleMetrics.ActionStatus.FAILED
);
}

@Override
Expand All @@ -182,6 +200,7 @@ protected void masterOperation(
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) {
long startTime = client.threadPool().relativeTimeInMillis();
String sourceIndexName = request.getSourceIndex();

final IndicesAccessControl indicesAccessControl = threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY);
Expand Down Expand Up @@ -236,7 +255,7 @@ protected void masterOperation(
final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId());
// Short circuit if target index has been downsampled:
final String downsampleIndexName = request.getTargetIndex();
if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), state.metadata(), listener)) {
if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), startTime, state.metadata(), listener)) {
logger.info("Skipping downsampling, because a previous execution already completed downsampling");
return;
}
Expand Down Expand Up @@ -325,6 +344,7 @@ protected void masterOperation(
sourceIndexMetadata,
downsampleIndexName,
parentTask,
startTime,
metricFields,
labelFields,
dimensionFields
Expand All @@ -335,7 +355,14 @@ protected void masterOperation(
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
var metadata = clusterService.state().metadata();
if (canShortCircuit(request.getTargetIndex(), parentTask, request.getWaitTimeout(), metadata, listener)) {
if (canShortCircuit(
request.getTargetIndex(),
parentTask,
request.getWaitTimeout(),
startTime,
metadata,
listener
)) {
logger.info("Downsample tasks are not created, because a previous execution already completed downsampling");
return;
}
Expand All @@ -345,6 +372,7 @@ protected void masterOperation(
sourceIndexMetadata,
downsampleIndexName,
parentTask,
startTime,
metricFields,
labelFields,
dimensionFields
Expand All @@ -364,6 +392,7 @@ private boolean canShortCircuit(
String targetIndexName,
TaskId parentTask,
TimeValue waitTimeout,
long startTime,
Metadata metadata,
ActionListener<AcknowledgedResponse> listener
) {
Expand Down Expand Up @@ -391,7 +420,13 @@ private boolean canShortCircuit(
.indices()
.refresh(
refreshRequest,
new RefreshDownsampleIndexActionListener(listener, parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout)
new RefreshDownsampleIndexActionListener(
listener,
parentTask,
targetIndexMetadata.getIndex().getName(),
waitTimeout,
startTime
)
);
return true;
}
Expand All @@ -405,6 +440,7 @@ private void performShardDownsampling(
IndexMetadata sourceIndexMetadata,
String downsampleIndexName,
TaskId parentTask,
long startTime,
List<String> metricFields,
List<String> labelFields,
List<String> dimensionFields
Expand All @@ -414,6 +450,7 @@ private void performShardDownsampling(
// NOTE: before we set the number of replicas to 0, as a result here we are
// only dealing with primary shards.
final AtomicInteger countDown = new AtomicInteger(numberOfShards);
final AtomicBoolean errorReported = new AtomicBoolean(false);
for (int shardNum = 0; shardNum < numberOfShards; shardNum++) {
final ShardId shardId = new ShardId(sourceIndex, shardNum);
final String persistentTaskId = createPersistentTaskId(
Expand Down Expand Up @@ -458,13 +495,16 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTa
logger.info("Downsampling task [" + persistentTaskId + " completed for shard " + params.shardId());
if (countDown.decrementAndGet() == 0) {
logger.info("All downsampling tasks completed [" + numberOfShards + "]");
updateTargetIndexSettingStep(request, listener, sourceIndexMetadata, downsampleIndexName, parentTask);
updateTargetIndexSettingStep(request, listener, sourceIndexMetadata, downsampleIndexName, parentTask, startTime);
}
}

@Override
public void onFailure(Exception e) {
logger.error("error while waiting for downsampling persistent task", e);
if (errorReported.getAndSet(true) == false) {
recordLatencyOnFailure(startTime);
}
listener.onFailure(e);
}
};
Expand Down Expand Up @@ -504,7 +544,8 @@ private void updateTargetIndexSettingStep(
final ActionListener<AcknowledgedResponse> listener,
final IndexMetadata sourceIndexMetadata,
final String downsampleIndexName,
final TaskId parentTask
final TaskId parentTask,
final long startTime
) {
// 4. Make downsample index read-only and set the correct number of replicas
final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true);
Expand All @@ -527,7 +568,13 @@ private void updateTargetIndexSettingStep(
.indices()
.updateSettings(
updateSettingsReq,
new UpdateDownsampleIndexSettingsActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout())
new UpdateDownsampleIndexSettingsActionListener(
listener,
parentTask,
downsampleIndexName,
request.getWaitTimeout(),
startTime
)
);
}

Expand Down Expand Up @@ -871,17 +918,20 @@ class UpdateDownsampleIndexSettingsActionListener implements ActionListener<Ackn
final TaskId parentTask;
final String downsampleIndexName;
final TimeValue timeout;
final long startTime;

UpdateDownsampleIndexSettingsActionListener(
final ActionListener<AcknowledgedResponse> listener,
final TaskId parentTask,
final String downsampleIndexName,
final TimeValue timeout
final TimeValue timeout,
final long startTime
) {
this.listener = listener;
this.parentTask = parentTask;
this.downsampleIndexName = downsampleIndexName;
this.timeout = timeout;
this.startTime = startTime;
}

@Override
Expand All @@ -890,11 +940,12 @@ public void onResponse(final AcknowledgedResponse response) {
request.setParentTask(parentTask);
client.admin()
.indices()
.refresh(request, new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, timeout));
.refresh(request, new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, timeout, startTime));
}

@Override
public void onFailure(Exception e) {
recordLatencyOnSuccess(startTime); // Downsampling has already completed in all shards.
listener.onFailure(e);
}

Expand All @@ -909,17 +960,20 @@ class RefreshDownsampleIndexActionListener implements ActionListener<BroadcastRe
private final TaskId parentTask;
private final String downsampleIndexName;
private final TimeValue timeout;
private final long startTime;

RefreshDownsampleIndexActionListener(
final ActionListener<AcknowledgedResponse> actionListener,
TaskId parentTask,
final String downsampleIndexName,
final TimeValue timeout
final TimeValue timeout,
final long startTime
) {
this.actionListener = actionListener;
this.parentTask = parentTask;
this.downsampleIndexName = downsampleIndexName;
this.timeout = timeout;
this.startTime = startTime;
}

@Override
Expand All @@ -930,7 +984,9 @@ public void onResponse(final BroadcastResponse response) {
// Mark downsample index as "completed successfully" ("index.downsample.status": "success")
taskQueue.submitTask(
"update-downsample-metadata [" + downsampleIndexName + "]",
new DownsampleClusterStateUpdateTask(new ForceMergeActionListener(parentTask, downsampleIndexName, actionListener)) {
new DownsampleClusterStateUpdateTask(
new ForceMergeActionListener(parentTask, downsampleIndexName, startTime, actionListener)
) {

@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -957,6 +1013,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(Exception e) {
recordLatencyOnSuccess(startTime); // Downsampling has already completed in all shards.
actionListener.onFailure(e);
}

Expand All @@ -970,42 +1027,43 @@ class ForceMergeActionListener implements ActionListener<AcknowledgedResponse> {
final ActionListener<AcknowledgedResponse> actionListener;
private final TaskId parentTask;
private final String downsampleIndexName;
private final long startTime;

ForceMergeActionListener(
final TaskId parentTask,
final String downsampleIndexName,
final long startTime,
final ActionListener<AcknowledgedResponse> onFailure
) {
this.parentTask = parentTask;
this.downsampleIndexName = downsampleIndexName;
this.startTime = startTime;
this.actionListener = onFailure;
}

@Override
public void onResponse(final AcknowledgedResponse response) {
/*
* At this point downsample index has been created
* successfully even force merge fails.
* So, we should not fail the downsmaple operation
*/
ForceMergeRequest request = new ForceMergeRequest(downsampleIndexName);
request.maxNumSegments(1);
request.setParentTask(parentTask);
client.admin()
.indices()
.forceMerge(request, ActionListener.wrap(mergeIndexResp -> actionListener.onResponse(AcknowledgedResponse.TRUE), t -> {
/*
* At this point downsample index has been created
* successfully even if force merge failed.
* So, we should not fail the downsample operation.
*/
logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t);
actionListener.onResponse(AcknowledgedResponse.TRUE);
}));
client.admin().indices().forceMerge(request, ActionListener.wrap(mergeIndexResp -> {
actionListener.onResponse(AcknowledgedResponse.TRUE);
recordLatencyOnSuccess(startTime);
}, t -> {
/*
* At this point downsample index has been created
* successfully even if force merge failed.
* So, we should not fail the downsample operation.
*/
logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t);
actionListener.onResponse(AcknowledgedResponse.TRUE);
recordLatencyOnSuccess(startTime);
}));
}

@Override
public void onFailure(Exception e) {
recordLatencyOnSuccess(startTime);
this.actionListener.onFailure(e);
}

Expand Down

0 comments on commit a2af99c

Please sign in to comment.