Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137409.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137409
summary: Coordinator phase duration APM metric attributes
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -103,6 +104,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final int skippedCount;
private final TransportVersion mintransportVersion;
protected final SearchResponseMetrics searchResponseMetrics;
protected final Map<String, Object> searchRequestAttributes;
protected long phaseStartTimeInNanos;

// protected for tests
Expand All @@ -127,7 +129,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchResponseMetrics searchResponseMetrics
SearchResponseMetrics searchResponseMetrics,
Map<String, Object> searchRequestAttributes
) {
super(name);
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -171,6 +174,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
addReleasable(resultConsumer);
this.clusters = clusters;
this.searchResponseMetrics = searchResponseMetrics;
this.searchRequestAttributes = searchRequestAttributes;
}

protected void notifyListShards(
Expand Down Expand Up @@ -761,7 +765,7 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
* @see #onShardResult(SearchPhaseResult)
*/
private void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
searchResponseMetrics.recordSearchPhaseDuration(getName(), System.nanoTime() - phaseStartTimeInNanos);
searchResponseMetrics.recordSearchPhaseDuration(getName(), System.nanoTime() - phaseStartTimeInNanos, searchRequestAttributes);
executeNextPhase(getName(), this::getNextPhase);
}

Expand All @@ -787,6 +791,13 @@ public SearchResponseMetrics getSearchResponseMetrics() {
return searchResponseMetrics;
}

/**
 * Exposes the search request attributes that this action passes along when recording search phase durations.
 *
 * @return a read-only view of the attribute map held by this action; the view cannot be modified by the caller
 */
public Map<String, Object> getSearchRequestAttributes() {
    final Map<String, Object> readOnlyAttributes = Collections.unmodifiableMap(searchRequestAttributes);
    return readOnlyAttributes;
}

public final void execute(Runnable command) {
executor.execute(command);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
SearchResponseMetrics searchResponseMetrics
SearchResponseMetrics searchResponseMetrics,
Map<String, Object> searchRequestAttributes
) {
if (shardsIts.isEmpty()) {
return SubscribableListener.newSucceeded(List.of());
Expand All @@ -151,7 +152,11 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
listener.addListener(new ActionListener<>() {
@Override
public void onResponse(List<SearchShardIterator> shardsIts) {
searchResponseMetrics.recordSearchPhaseDuration(PHASE_NAME, System.nanoTime() - phaseStartTimeInNanos);
searchResponseMetrics.recordSearchPhaseDuration(
PHASE_NAME,
System.nanoTime() - phaseStartTimeInNanos,
searchRequestAttributes
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public void onFailure(Exception exception) {
}

private void onFinish(AggregatedDfs dfs) {
context.getSearchResponseMetrics().recordSearchPhaseDuration(getName(), System.nanoTime() - phaseStartTimeInNanos);
context.getSearchResponseMetrics()
.recordSearchPhaseDuration(getName(), System.nanoTime() - phaseStartTimeInNanos, context.getSearchRequestAttributes());
context.executeNextPhase(NAME, () -> nextPhase(dfs));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ private void moveToNextPhase(
SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
long phaseStartTimeInNanos
) {
context.getSearchResponseMetrics().recordSearchPhaseDuration(getName(), System.nanoTime() - phaseStartTimeInNanos);
context.getSearchResponseMetrics()
.recordSearchPhaseDuration(getName(), System.nanoTime() - phaseStartTimeInNanos, context.getSearchRequestAttributes());
context.executeNextPhase(NAME, () -> {
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
context.addReleasable(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
SearchTask task,
SearchResponse.Clusters clusters,
Client client,
SearchResponseMetrics searchResponseMetrics
SearchResponseMetrics searchResponseMetrics,
Map<String, Object> searchRequestAttributes
) {
super(
"dfs",
Expand All @@ -69,7 +70,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
addReleasable(queryPhaseResultConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
SearchResponse.Clusters clusters,
Client client,
boolean batchQueryPhase,
SearchResponseMetrics searchResponseMetrics
SearchResponseMetrics searchResponseMetrics,
Map<String, Object> searchRequestAttributes
) {
super(
"query",
Expand All @@ -137,7 +138,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
);
this.topDocsSize = getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public void runNewSearchPhase(
Map<String, Float> concreteIndexBoosts,
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
Map<String, Object> searchRequestAttributes
) {
// Note: remote shards are prefiltered via can match as part of search shards. They don't need additional pre-filtering and
// that is signaled to the local can match through the SearchShardIterator#prefiltered flag. Local shards do need to go
Expand All @@ -185,7 +186,8 @@ public void runNewSearchPhase(
task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
)
.addListener(
listener.delegateFailureAndWrap(
Expand All @@ -199,7 +201,8 @@ public void runNewSearchPhase(
clusterState,
aliasFilter,
concreteIndexBoosts,
clusters
clusters,
searchRequestAttributes
)
)
);
Expand All @@ -214,7 +217,8 @@ public void runNewSearchPhase(
clusterState,
aliasFilter,
concreteIndexBoosts,
clusters
clusters,
searchRequestAttributes
);
}
}
Expand All @@ -229,7 +233,8 @@ void runOpenPointInTimePhase(
ClusterState clusterState,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
Map<String, Object> searchRequestAttributes
) {
assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests()
: searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
Expand All @@ -252,7 +257,8 @@ void runOpenPointInTimePhase(
new ArraySearchPhaseResults<>(shardIterators.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters,
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
) {
@Override
protected void executePhaseOnShard(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,10 @@ private void executeSearch(
localShardIterators.size() + remoteShardIterators.size(),
defaultPreFilterShardSize
);
final Map<String, Object> searchRequestAttributes = SearchRequestAttributesExtractor.extractAttributes(
searchRequest,
concreteLocalIndices
);
searchPhaseProvider.runNewSearchPhase(
task,
searchRequest,
Expand All @@ -1629,7 +1633,8 @@ private void executeSearch(
concreteIndexBoosts,
preFilterSearchShards,
threadPool,
clusters
clusters,
searchRequestAttributes
);
}

Expand Down Expand Up @@ -1747,7 +1752,8 @@ void runNewSearchPhase(
Map<String, Float> concreteIndexBoosts,
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
Map<String, Object> searchRequestAttributes
);
}

Expand All @@ -1771,7 +1777,8 @@ public void runNewSearchPhase(
Map<String, Float> concreteIndexBoosts,
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
Map<String, Object> searchRequestAttributes
) {
if (preFilter) {
// only for aggs we need to contact shards even if there are no matches
Expand All @@ -1789,7 +1796,8 @@ public void runNewSearchPhase(
task,
requireAtLeastOneMatch,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
)
.addListener(
listener.delegateFailureAndWrap(
Expand All @@ -1805,7 +1813,8 @@ public void runNewSearchPhase(
concreteIndexBoosts,
false,
threadPool,
clusters
clusters,
searchRequestAttributes
)
)
);
Expand Down Expand Up @@ -1849,7 +1858,8 @@ public void runNewSearchPhase(
task,
clusters,
client,
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
);
} else {
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
Expand All @@ -1871,7 +1881,8 @@ public void runNewSearchPhase(
clusters,
client,
searchService.batchQueryPhase(),
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
);
}
success = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
)
);
} else {
final Map<String, Object> searchRequestAttributes = SearchRequestAttributesExtractor.extractAttributes(
searchRequest,
concreteIndexNames
);
CanMatchPreFilterSearchPhase.execute(logger, searchTransportService, (clusterAlias, node) -> {
assert Objects.equals(clusterAlias, searchShardsRequest.clusterAlias());
return transportService.getConnection(project.cluster().nodes().get(node));
Expand All @@ -185,7 +189,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
(SearchTask) task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
searchResponseMetrics
searchResponseMetrics,
searchRequestAttributes
)
.addListener(
delegate.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ public void incrementResponseCount(ResponseCountTotalStatus responseCountTotalSt
responseCountTotalCounter.incrementBy(1L, attributesWithStatus);
}

public void recordSearchPhaseDuration(String phaseName, long tookInNanos) {
public void recordSearchPhaseDuration(String phaseName, long tookInNanos, Map<String, Object> attributes) {
LongHistogram queryPhaseDurationHistogram = phaseNameToDurationHistogram.get(phaseName);
assert queryPhaseDurationHistogram != null;
queryPhaseDurationHistogram.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
queryPhaseDurationHistogram.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos), attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
Mockito.mock(SearchResponseMetrics.class)
Mockito.mock(SearchResponseMetrics.class),
Map.of()
) {
@Override
protected SearchPhase getNextPhase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public void sendCanMatch(
null,
true,
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
Map.of()
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -260,7 +261,8 @@ public void sendCanMatch(
null,
true,
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
Map.of()
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -352,7 +354,8 @@ public void sendCanMatch(
null,
true,
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
Map.of()
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -452,7 +455,8 @@ public void sendCanMatch(
null,
shardsIter.size() > shardToSkip.size(),
EMPTY_CONTEXT_PROVIDER,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
Map.of()
).addListener(ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
Expand Down Expand Up @@ -1425,7 +1429,8 @@ public void sendCanMatch(
null,
true,
contextProvider,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
Map.of()
),
requests
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -69,7 +70,8 @@ public MockSearchPhaseContext(int numShards) {
new ArraySearchPhaseResults<>(numShards),
5,
null,
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry())
new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
Map.of()
);
this.numShards = numShards;
numSuccess = new AtomicInteger(numShards);
Expand Down
Loading