Skip to content

Commit

Permalink
Add a snapshot-release-only plugin to track lucene directory seeks (#…
Browse files Browse the repository at this point in the history
…94686)

This plugin will report the number of times seek is called on an IndexInput
for each file in each segment of each lucene index in an elasticsearch cluster.
The plugin is disabled by default, and can be enabled on a node by adding
seektracker.enabled=true to elasticsearch.yml.

Stats are made available via a new endpoint, /_seek_stats or
/{index}/_seek_stats
  • Loading branch information
romseygeek committed Mar 24, 2023
1 parent dc6ccbb commit f9b6086
Show file tree
Hide file tree
Showing 13 changed files with 879 additions and 0 deletions.
14 changes: 14 additions & 0 deletions test/external-modules/seek-tracking-directory/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

esplugin {
description 'A test module that tracks seeks in lucene Directories'
classname 'org.elasticsearch.test.seektracker.SeekTrackerPlugin'
}

apply plugin: 'elasticsearch.internal-cluster-test'
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;

public class SeekTrackerPluginIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(SeekTrackerPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(SeekTrackerPlugin.SEEK_TRACKING_ENABLED.getKey(), "true")
.build();
}

public void testSeekTrackerPlugin() throws InterruptedException {

assertAcked(client().admin().indices().prepareCreate("index"));
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < 100; i++) {
docs.add(client().prepareIndex("index").setSource("field", "term" + i % 5));
}
indexRandom(true, docs);

client().prepareSearch("index").setQuery(QueryBuilders.termQuery("field", "term2")).get();

SeekStatsResponse response = client().execute(SeekStatsAction.INSTANCE, new SeekStatsRequest("index")).actionGet();
List<ShardSeekStats> shardSeekStats = response.getSeekStats().get("index");
assertThat(shardSeekStats.size(), greaterThan(0));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class IndexSeekTracker {

private final Map<String, Map<String, LongAdder>> seeks = new HashMap<>();

public void track(String shard) {
seeks.computeIfAbsent(shard, k -> new ConcurrentHashMap<>()); // increment can be called by multiple threads
}

public void increment(String shard, String file) {
seeks.get(shard).computeIfAbsent(file, s -> new LongAdder()).increment();
}

public List<ShardSeekStats> getSeeks() {
List<ShardSeekStats> values = new ArrayList<>();
seeks.forEach((k, v) -> values.add(getSeeksForShard(k)));
return values;
}

private ShardSeekStats getSeeksForShard(String shard) {
Map<String, Long> seeksPerFile = new HashMap<>();
seeks.get(shard).forEach((name, adder) -> seeksPerFile.put(name, adder.longValue()));
return new ShardSeekStats(shard, seeksPerFile);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class NodeSeekStats extends BaseNodeResponse implements ToXContentFragment {

private final Map<String, List<ShardSeekStats>> seeks;

public NodeSeekStats(DiscoveryNode node, Map<String, List<ShardSeekStats>> seeks) {
super(node);
this.seeks = seeks;
}

public NodeSeekStats(StreamInput in) throws IOException {
super(in);
this.seeks = in.readMap(StreamInput::readString, (s -> s.readList(ShardSeekStats::new)));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(seeks, StreamOutput::writeString, StreamOutput::writeList);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.mapContents(seeks);
return builder;
}

public Map<String, List<ShardSeekStats>> getSeekStats() {
return seeks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.util.List;

public class RestSeekStatsAction extends BaseRestHandler {

@Override
public String getName() {
return "seek_stats_action";
}

@Override
public List<Route> routes() {
return List.of(
new RestHandler.Route(RestRequest.Method.GET, "/_seek_stats"),
new RestHandler.Route(RestRequest.Method.GET, "/{index}/_seek_stats")
);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String[] indices = request.paramAsStringArray("index", Strings.EMPTY_ARRAY);
SeekStatsRequest seekStatsRequest = new SeekStatsRequest(indices);
return channel -> client.execute(SeekStatsAction.INSTANCE, seekStatsRequest, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import org.elasticsearch.action.ActionType;

public class SeekStatsAction extends ActionType<SeekStatsResponse> {

public static final SeekStatsAction INSTANCE = new SeekStatsAction();
public static final String NAME = "cluster:monitor/seek_stats";

public SeekStatsAction() {
super(NAME, SeekStatsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class SeekStatsRequest extends BaseNodesRequest<SeekStatsRequest> {

private final String[] indices;

public SeekStatsRequest(String... indices) {
super(Strings.EMPTY_ARRAY);
this.indices = indices;
}

public SeekStatsRequest(StreamInput in) throws IOException {
super(in);
this.indices = in.readStringArray();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
}

public String[] getIndices() {
return indices;
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SeekStatsResponse extends BaseNodesResponse<NodeSeekStats> implements ToXContentObject {

public SeekStatsResponse(ClusterName clusterName, List<NodeSeekStats> seekStats, List<FailedNodeException> failures) {
super(clusterName, seekStats, failures);
}

public SeekStatsResponse(StreamInput in) throws IOException {
super(in);
}

@Override
protected List<NodeSeekStats> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeSeekStats::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<NodeSeekStats> nodes) throws IOException {
out.writeList(nodes);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
for (NodeSeekStats seekStats : getNodes()) {
builder.startObject(seekStats.getNode().getId());
seekStats.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
}

public Map<String, List<ShardSeekStats>> getSeekStats() {
Map<String, List<ShardSeekStats>> combined = new HashMap<>();
for (NodeSeekStats nodeSeekStats : getNodes()) {
nodeSeekStats.getSeekStats()
.forEach((index, shardSeekStats) -> combined.computeIfAbsent(index, k -> new ArrayList<>()).addAll(shardSeekStats));
}
return combined;
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.seektracker;

import java.util.HashMap;
import java.util.Map;

public class SeekStatsService {

private final Map<String, IndexSeekTracker> seeks = new HashMap<>();

public IndexSeekTracker registerIndex(String index) {
IndexSeekTracker tracker = new IndexSeekTracker();
seeks.put(index, tracker);
return tracker;
}

public Map<String, IndexSeekTracker> getSeekStats() {
return seeks;
}

public IndexSeekTracker getSeekStats(String index) {
return seeks.get(index);
}

}

0 comments on commit f9b6086

Please sign in to comment.