Skip to content

Commit

Permalink
Converting the inner record-like classes in EnrichStatsAction to reco…
Browse files Browse the repository at this point in the history
…rds (#107594)
  • Loading branch information
masseyke committed Apr 18, 2024
1 parent 8adc292 commit cad5d90
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,52 +128,18 @@ public int hashCode() {
return Objects.hash(executingPolicies, coordinatorStats, cacheStats);
}

public static class CoordinatorStats implements Writeable, ToXContentFragment {

private final String nodeId;
private final int queueSize;
private final int remoteRequestsCurrent;
private final long remoteRequestsTotal;
private final long executedSearchesTotal;

public CoordinatorStats(
String nodeId,
int queueSize,
int remoteRequestsCurrent,
long remoteRequestsTotal,
long executedSearchesTotal
) {
this.nodeId = nodeId;
this.queueSize = queueSize;
this.remoteRequestsCurrent = remoteRequestsCurrent;
this.remoteRequestsTotal = remoteRequestsTotal;
this.executedSearchesTotal = executedSearchesTotal;
}
public record CoordinatorStats(
String nodeId,
int queueSize,
int remoteRequestsCurrent,
long remoteRequestsTotal,
long executedSearchesTotal
) implements Writeable, ToXContentFragment {

public CoordinatorStats(StreamInput in) throws IOException {
this(in.readString(), in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong());
}

public String getNodeId() {
return nodeId;
}

public int getQueueSize() {
return queueSize;
}

public int getRemoteRequestsCurrent() {
return remoteRequestsCurrent;
}

public long getRemoteRequestsTotal() {
return remoteRequestsTotal;
}

public long getExecutedSearchesTotal() {
return executedSearchesTotal;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
Expand All @@ -192,47 +158,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("executed_searches_total", executedSearchesTotal);
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CoordinatorStats stats = (CoordinatorStats) o;
return Objects.equals(nodeId, stats.nodeId)
&& queueSize == stats.queueSize
&& remoteRequestsCurrent == stats.remoteRequestsCurrent
&& remoteRequestsTotal == stats.remoteRequestsTotal
&& executedSearchesTotal == stats.executedSearchesTotal;
}

@Override
public int hashCode() {
return Objects.hash(nodeId, queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
}
}

public static class ExecutingPolicy implements Writeable, ToXContentFragment {

private final String name;
private final TaskInfo taskInfo;

public ExecutingPolicy(String name, TaskInfo taskInfo) {
this.name = name;
this.taskInfo = taskInfo;
}
public record ExecutingPolicy(String name, TaskInfo taskInfo) implements Writeable, ToXContentFragment {

ExecutingPolicy(StreamInput in) throws IOException {
this(in.readString(), TaskInfo.from(in));
}

public String getName() {
return name;
}

public TaskInfo getTaskInfo() {
return taskInfo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
Expand All @@ -249,61 +182,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecutingPolicy that = (ExecutingPolicy) o;
return name.equals(that.name) && taskInfo.equals(that.taskInfo);
}

@Override
public int hashCode() {
return Objects.hash(name, taskInfo);
}
}

public static class CacheStats implements Writeable, ToXContentFragment {

private final String nodeId;
private final long count;
private final long hits;
private final long misses;
private final long evictions;

public CacheStats(String nodeId, long count, long hits, long misses, long evictions) {
this.nodeId = nodeId;
this.count = count;
this.hits = hits;
this.misses = misses;
this.evictions = evictions;
}
public record CacheStats(String nodeId, long count, long hits, long misses, long evictions)
implements
Writeable,
ToXContentFragment {

public CacheStats(StreamInput in) throws IOException {
this(in.readString(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}

public String getNodeId() {
return nodeId;
}

public long getCount() {
return count;
}

public long getHits() {
return hits;
}

public long getMisses() {
return misses;
}

public long getEvictions() {
return evictions;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node_id", nodeId);
Expand All @@ -322,23 +211,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(misses);
out.writeVLong(evictions);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheStats that = (CacheStats) o;
return count == that.count
&& hits == that.hits
&& misses == that.misses
&& evictions == that.evictions
&& nodeId.equals(that.nodeId);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, count, hits, misses, evictions);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ private static void enrich(Map<String, List<String>> keys, String coordinatingNo
.actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
String nodeId = getNodeId(coordinatingNode);
CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.getNodeId().equals(nodeId)).findAny().get();
assertThat(stats.getNodeId(), equalTo(nodeId));
assertThat(stats.getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.nodeId().equals(nodeId)).findAny().get();
assertThat(stats.nodeId(), equalTo(nodeId));
assertThat(stats.remoteRequestsTotal(), greaterThanOrEqualTo(1L));
// 'numDocs' lookups are done, but not 'numDocs' searches, because searches may get cached:
// and not all enrichments may happen via the same node.
assertThat(stats.getExecutedSearchesTotal(), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo((long) numDocs)));
assertThat(stats.executedSearchesTotal(), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo((long) numDocs)));
}

private static List<String> createSourceIndex(int numDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
var statsRequest = new EnrichStatsAction.Request();
var statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
assertThat(statsResponse.getCacheStats().get(0).getCount(), equalTo(0L));
assertThat(statsResponse.getCacheStats().get(0).getMisses(), equalTo(0L));
assertThat(statsResponse.getCacheStats().get(0).getHits(), equalTo(0L));
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(0L));
assertThat(statsResponse.getCacheStats().get(0).misses(), equalTo(0L));
assertThat(statsResponse.getCacheStats().get(0).hits(), equalTo(0L));

String policyName = "device-enrich-policy";
String sourceIndexName = "devices-idx";
Expand Down Expand Up @@ -128,9 +128,9 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
// Verify that there was a cache miss and a new entry was added to enrich cache.
statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
assertThat(statsResponse.getCacheStats().get(0).getCount(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).getMisses(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).getHits(), equalTo(0L));
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).misses(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).hits(), equalTo(0L));

simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray("""
{
Expand Down Expand Up @@ -164,9 +164,9 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
// Verify that enrich lookup was served from cache:
statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
assertThat(statsResponse.getCacheStats().get(0).getCount(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).getMisses(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).getHits(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).misses(), equalTo(1L));
assertThat(statsResponse.getCacheStats().get(0).hits(), equalTo(1L));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,21 @@ protected void masterOperation(
List<CoordinatorStats> coordinatorStats = response.getNodes()
.stream()
.map(EnrichCoordinatorStatsAction.NodeResponse::getCoordinatorStats)
.sorted(Comparator.comparing(CoordinatorStats::getNodeId))
.sorted(Comparator.comparing(CoordinatorStats::nodeId))
.collect(Collectors.toList());
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks()
.values()
.stream()
.filter(t -> t.getAction().equals(EnrichPolicyExecutor.TASK_ACTION))
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))
.map(t -> new ExecutingPolicy(t.description(), t))
.sorted(Comparator.comparing(ExecutingPolicy::getName))
.sorted(Comparator.comparing(ExecutingPolicy::name))
.collect(Collectors.toList());
List<EnrichStatsAction.Response.CacheStats> cacheStats = response.getNodes()
.stream()
.map(EnrichCoordinatorStatsAction.NodeResponse::getCacheStats)
.filter(Objects::nonNull)
.sorted(Comparator.comparing(EnrichStatsAction.Response.CacheStats::getNodeId))
.sorted(Comparator.comparing(EnrichStatsAction.Response.CacheStats::nodeId))
.collect(Collectors.toList());
delegate.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats, cacheStats));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ public void testIngestDataWithMatchProcessor() {
.actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId));
assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs));
assertThat(statsResponse.getCoordinatorStats().get(0).nodeId(), equalTo(localNodeId));
assertThat(statsResponse.getCoordinatorStats().get(0).remoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(0).executedSearchesTotal(), equalTo((long) numDocs));
}

public void testIngestDataWithGeoMatchProcessor() {
Expand Down Expand Up @@ -230,9 +230,9 @@ public void testIngestDataWithGeoMatchProcessor() {
.actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId));
assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(0).nodeId(), equalTo(localNodeId));
assertThat(statsResponse.getCoordinatorStats().get(0).remoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(0).executedSearchesTotal(), equalTo(1L));
}

public void testMultiplePolicies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,27 @@ public void testCaching() {
enrichCache.put(searchRequest2, searchResponse);
enrichCache.put(searchRequest3, searchResponse);
var cacheStats = enrichCache.getStats("_id");
assertThat(cacheStats.getCount(), equalTo(3L));
assertThat(cacheStats.getHits(), equalTo(0L));
assertThat(cacheStats.getMisses(), equalTo(0L));
assertThat(cacheStats.getEvictions(), equalTo(0L));
assertThat(cacheStats.count(), equalTo(3L));
assertThat(cacheStats.hits(), equalTo(0L));
assertThat(cacheStats.misses(), equalTo(0L));
assertThat(cacheStats.evictions(), equalTo(0L));

assertThat(enrichCache.get(searchRequest1), notNullValue());
assertThat(enrichCache.get(searchRequest2), notNullValue());
assertThat(enrichCache.get(searchRequest3), notNullValue());
assertThat(enrichCache.get(searchRequest4), nullValue());
cacheStats = enrichCache.getStats("_id");
assertThat(cacheStats.getCount(), equalTo(3L));
assertThat(cacheStats.getHits(), equalTo(3L));
assertThat(cacheStats.getMisses(), equalTo(1L));
assertThat(cacheStats.getEvictions(), equalTo(0L));
assertThat(cacheStats.count(), equalTo(3L));
assertThat(cacheStats.hits(), equalTo(3L));
assertThat(cacheStats.misses(), equalTo(1L));
assertThat(cacheStats.evictions(), equalTo(0L));

enrichCache.put(searchRequest4, searchResponse);
cacheStats = enrichCache.getStats("_id");
assertThat(cacheStats.getCount(), equalTo(3L));
assertThat(cacheStats.getHits(), equalTo(3L));
assertThat(cacheStats.getMisses(), equalTo(1L));
assertThat(cacheStats.getEvictions(), equalTo(1L));
assertThat(cacheStats.count(), equalTo(3L));
assertThat(cacheStats.hits(), equalTo(3L));
assertThat(cacheStats.misses(), equalTo(1L));
assertThat(cacheStats.evictions(), equalTo(1L));

// Simulate enrich policy execution, which should make current cache entries unused.
metadata = Metadata.builder()
Expand Down Expand Up @@ -142,10 +142,10 @@ public void testCaching() {
assertThat(enrichCache.get(searchRequest3), notNullValue());
assertThat(enrichCache.get(searchRequest4), nullValue());
cacheStats = enrichCache.getStats("_id");
assertThat(cacheStats.getCount(), equalTo(3L));
assertThat(cacheStats.getHits(), equalTo(6L));
assertThat(cacheStats.getMisses(), equalTo(6L));
assertThat(cacheStats.getEvictions(), equalTo(4L));
assertThat(cacheStats.count(), equalTo(3L));
assertThat(cacheStats.hits(), equalTo(6L));
assertThat(cacheStats.misses(), equalTo(6L));
assertThat(cacheStats.evictions(), equalTo(4L));
}

public void testPutIfAbsent() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
assertThat(failure[0], nullValue());
assertThat(result[0], notNullValue());
assertThat(requestCounter[0], equalTo(1));
assertThat(enrichCache.getStats("_id").getCount(), equalTo(1L));
assertThat(enrichCache.getStats("_id").getMisses(), equalTo(1L));
assertThat(enrichCache.getStats("_id").getHits(), equalTo(0L));
assertThat(enrichCache.getStats("_id").getEvictions(), equalTo(0L));
assertThat(enrichCache.getStats("_id").count(), equalTo(1L));
assertThat(enrichCache.getStats("_id").misses(), equalTo(1L));
assertThat(enrichCache.getStats("_id").hits(), equalTo(0L));
assertThat(enrichCache.getStats("_id").evictions(), equalTo(0L));

// No search is performed, result is read from the cache:
result[0] = null;
Expand All @@ -318,10 +318,10 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
assertThat(failure[0], nullValue());
assertThat(result[0], notNullValue());
assertThat(requestCounter[0], equalTo(1));
assertThat(enrichCache.getStats("_id").getCount(), equalTo(1L));
assertThat(enrichCache.getStats("_id").getMisses(), equalTo(1L));
assertThat(enrichCache.getStats("_id").getHits(), equalTo(1L));
assertThat(enrichCache.getStats("_id").getEvictions(), equalTo(0L));
assertThat(enrichCache.getStats("_id").count(), equalTo(1L));
assertThat(enrichCache.getStats("_id").misses(), equalTo(1L));
assertThat(enrichCache.getStats("_id").hits(), equalTo(1L));
assertThat(enrichCache.getStats("_id").evictions(), equalTo(0L));
}
}

Expand Down

0 comments on commit cad5d90

Please sign in to comment.