Skip to content

Commit

Permalink
Add enrich node cache (#76800)
Browse files Browse the repository at this point in the history
Introduce a LRU cache to avoid searches that occur frequently
from the enrich processor.

Relates to #48988
  • Loading branch information
martijnvg committed Sep 3, 2021
1 parent 1f15764 commit 1ae4f3c
Show file tree
Hide file tree
Showing 16 changed files with 703 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@

public final class StatsResponse {

private static ParseField EXECUTING_POLICIES_FIELD = new ParseField("executing_policies");
private static ParseField COORDINATOR_STATS_FIELD = new ParseField("coordinator_stats");
private static final ParseField EXECUTING_POLICIES_FIELD = new ParseField("executing_policies");
private static final ParseField COORDINATOR_STATS_FIELD = new ParseField("coordinator_stats");
private static final ParseField CACHE_STATS_FIELD = new ParseField("cache_stats");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<StatsResponse, Void> PARSER = new ConstructingObjectParser<>(
"stats_response",
true,
args -> new StatsResponse((List<ExecutingPolicy>) args[0], (List<CoordinatorStats>) args[1])
args -> new StatsResponse((List<ExecutingPolicy>) args[0], (List<CoordinatorStats>) args[1], (List<CacheStats>) args[2])
);

static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), ExecutingPolicy.PARSER::apply, EXECUTING_POLICIES_FIELD);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), CoordinatorStats.PARSER::apply, COORDINATOR_STATS_FIELD);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), CacheStats.PARSER::apply, CACHE_STATS_FIELD);
}

public static StatsResponse fromXContent(XContentParser parser) {
Expand All @@ -38,10 +40,12 @@ public static StatsResponse fromXContent(XContentParser parser) {

private final List<ExecutingPolicy> executingPolicies;
private final List<CoordinatorStats> coordinatorStats;
private final List<CacheStats> cacheStats;

public StatsResponse(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats) {
public StatsResponse(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats, List<CacheStats> cacheStats) {
this.executingPolicies = executingPolicies;
this.coordinatorStats = coordinatorStats;
this.cacheStats = cacheStats;
}

public List<ExecutingPolicy> getExecutingPolicies() {
Expand All @@ -52,6 +56,10 @@ public List<CoordinatorStats> getCoordinatorStats() {
return coordinatorStats;
}

public List<CacheStats> getCacheStats() {
return cacheStats;
}

public static final class CoordinatorStats {

static ParseField NODE_ID_FIELD = new ParseField("node_id");
Expand Down Expand Up @@ -177,4 +185,74 @@ public int hashCode() {
}
}

public static final class CacheStats {

static ParseField NODE_ID_FIELD = new ParseField("node_id");
static ParseField COUNT_FIELD = new ParseField("count");
static ParseField HITS_FIELD = new ParseField("hits");
static ParseField MISSES_FIELD = new ParseField("misses");
static ParseField EVICTIONS_FIELD = new ParseField("evictions");

private static final ConstructingObjectParser<CacheStats, Void> PARSER = new ConstructingObjectParser<>(
"coordinator_stats_item",
true,
args -> new CacheStats((String) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), COUNT_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), HITS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), MISSES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), EVICTIONS_FIELD);
}

private final String nodeId;
private final long count;
private final long hits;
private final long misses;
private final long evictions;

public CacheStats(String nodeId, long count, long hits, long misses, long evictions) {
this.nodeId = nodeId;
this.count = count;
this.hits = hits;
this.misses = misses;
this.evictions = evictions;
}

public String getNodeId() {
return nodeId;
}

public long getCount() {
return count;
}

public long getHits() {
return hits;
}

public long getMisses() {
return misses;
}

public long getEvictions() {
return evictions;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheStats that = (CacheStats) o;
return count == that.count && hits == that.hits && misses == that.misses &&
evictions == that.evictions && nodeId.equals(that.nodeId);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, count, hits, misses, evictions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@ protected EnrichStatsAction.Response createServerTestInstance(XContentType xCont
}
int numCoordinatingStats = randomIntBetween(0, 16);
List<EnrichStatsAction.Response.CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatingStats);
List<EnrichStatsAction.Response.CacheStats> cacheStats = new ArrayList<>(numCoordinatingStats);
for (int i = 0; i < numCoordinatingStats; i++) {
String nodeId = randomAlphaOfLength(4);
EnrichStatsAction.Response.CoordinatorStats stats = new EnrichStatsAction.Response.CoordinatorStats(
randomAlphaOfLength(4), randomIntBetween(0, 8096), randomIntBetween(0, 8096), randomNonNegativeLong(),
nodeId, randomIntBetween(0, 8096), randomIntBetween(0, 8096), randomNonNegativeLong(),
randomNonNegativeLong());
coordinatorStats.add(stats);
cacheStats.add(
new EnrichStatsAction.Response.CacheStats(nodeId, randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong())
);
}
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats, cacheStats);
}

@Override
Expand Down Expand Up @@ -68,6 +74,17 @@ protected void assertInstances(EnrichStatsAction.Response serverTestInstance, St
assertThat(actual.getRemoteRequestsTotal(), equalTo(expected.getRemoteRequestsTotal()));
assertThat(actual.getExecutedSearchesTotal(), equalTo(expected.getExecutedSearchesTotal()));
}

assertThat(clientInstance.getCacheStats().size(), equalTo(serverTestInstance.getCacheStats().size()));
for (int i = 0; i < clientInstance.getCacheStats().size(); i++) {
StatsResponse.CacheStats actual = clientInstance.getCacheStats().get(i);
EnrichStatsAction.Response.CacheStats expected = serverTestInstance.getCacheStats().get(i);
assertThat(actual.getNodeId(), equalTo(expected.getNodeId()));
assertThat(actual.getCount(), equalTo(expected.getCount()));
assertThat(actual.getHits(), equalTo(expected.getHits()));
assertThat(actual.getMisses(), equalTo(expected.getMisses()));
assertThat(actual.getEvictions(), equalTo(expected.getEvictions()));
}
}

private static TaskInfo randomTaskInfo() {
Expand Down
43 changes: 43 additions & 0 deletions docs/reference/ingest/apis/enrich/enrich-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,36 @@ that enrich processors have executed
since node startup.
--

`cache_stats`::
+
--
(Array of objects)
Objects containing information about the enrich
cache stats on each ingest node.

Returned parameters include:

`node_id`::
(String)
ID of the ingest node with a enrich cache.

`count`::
(Integer)
Number of cached entries.

`hits`::
(Integer)
The number of enrich lookups served from cache.

`missed`::
(Integer)
The number of time enrich lookups couldn't be
served from cache.

`evictions`::
(Integer)
The number cache entries evicted from the cache.
--

[[enrich-stats-api-example]]
==== {api-examples-title}
Expand Down Expand Up @@ -126,10 +156,23 @@ The API returns the following response:
"remote_requests_total": 0,
"executed_searches_total": 0
}
],
"cache_stats": [
{
"node_id": "1sFM8cmSROZYhPxVsiWew",
"count": 0,
"hits": 0,
"misses": 0,
"evictions": 0
}
]
}
----
// TESTRESPONSE[s/"executing_policies": \[[^\]]*\]/"executing_policies": $body.$_path/]
// TESTRESPONSE[s/"node_id": "1sFM8cmSROZYhPxVsiWew"/"node_id" : $body.coordinator_stats.0.node_id/]
// TESTRESPONSE[s/"remote_requests_total": 0/"remote_requests_total" : $body.coordinator_stats.0.remote_requests_total/]
// TESTRESPONSE[s/"executed_searches_total": 0/"executed_searches_total" : $body.coordinator_stats.0.executed_searches_total/]
// TESTRESPONSE[s/"node_id": "1sFM8cmSROZYhPxVsiWew"/"node_id" : $body.cache_stats.0.node_id/]
// TESTRESPONSE[s/"count": 0/"count" : $body.cache_stats.0.count/]
// TESTRESPONSE[s/"misses": 0/"misses" : $body.cache_stats.0.misses/]
// TESTRESPONSE[s/"evictions": 0/"evictions" : $body.cache_stats.0.evictions/]
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.enrich.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
Expand Down Expand Up @@ -50,16 +51,19 @@ public static class Response extends ActionResponse implements ToXContentObject

private final List<ExecutingPolicy> executingPolicies;
private final List<CoordinatorStats> coordinatorStats;
private final List<CacheStats> cacheStats;

public Response(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats) {
public Response(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats, List<CacheStats> cacheStats) {
this.executingPolicies = executingPolicies;
this.coordinatorStats = coordinatorStats;
this.cacheStats = cacheStats;
}

public Response(StreamInput in) throws IOException {
super(in);
executingPolicies = in.readList(ExecutingPolicy::new);
coordinatorStats = in.readList(CoordinatorStats::new);
cacheStats = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readList(CacheStats::new) : null;
}

public List<ExecutingPolicy> getExecutingPolicies() {
Expand All @@ -70,10 +74,17 @@ public List<CoordinatorStats> getCoordinatorStats() {
return coordinatorStats;
}

public List<CacheStats> getCacheStats() {
return cacheStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(executingPolicies);
out.writeList(coordinatorStats);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeList(cacheStats);
}
}

@Override
Expand All @@ -93,6 +104,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endObject();
}
builder.endArray();
if (cacheStats != null) {
builder.startArray("cache_stats");
for (CacheStats cacheStat : cacheStats) {
builder.startObject();
cacheStat.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
}
Expand All @@ -103,12 +123,13 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return executingPolicies.equals(response.executingPolicies) &&
coordinatorStats.equals(response.coordinatorStats);
coordinatorStats.equals(response.coordinatorStats) &&
Objects.equals(cacheStats, response.cacheStats);
}

@Override
public int hashCode() {
return Objects.hash(executingPolicies, coordinatorStats);
return Objects.hash(executingPolicies, coordinatorStats, cacheStats);
}

public static class CoordinatorStats implements Writeable, ToXContentFragment {
Expand Down Expand Up @@ -245,6 +266,80 @@ public int hashCode() {
return Objects.hash(name, taskInfo);
}
}

public static class CacheStats implements Writeable, ToXContentFragment {

private final String nodeId;
private final long count;
private final long hits;
private final long misses;
private final long evictions;

public CacheStats(String nodeId, long count, long hits, long misses, long evictions) {
this.nodeId = nodeId;
this.count = count;
this.hits = hits;
this.misses = misses;
this.evictions = evictions;
}

public CacheStats(StreamInput in) throws IOException {
this(in.readString(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}

public String getNodeId() {
return nodeId;
}

public long getCount() {
return count;
}

public long getHits() {
return hits;
}

public long getMisses() {
return misses;
}

public long getEvictions() {
return evictions;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node_id", nodeId);
builder.field("count", count);
builder.field("hits", hits);
builder.field("misses", misses);
builder.field("evictions", evictions);
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeVLong(count);
out.writeVLong(hits);
out.writeVLong(misses);
out.writeVLong(evictions);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheStats that = (CacheStats) o;
return count == that.count && hits == that.hits && misses == that.misses && evictions == that.evictions &&
nodeId.equals(that.nodeId);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, count, hits, misses, evictions);
}
}
}

}

0 comments on commit 1ae4f3c

Please sign in to comment.