Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set a specific shard id to force merge and clean commit logs. #76400

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/reference/indices/forcemerge.asciidoc
Expand Up @@ -118,6 +118,18 @@ Defaults to checking if a merge needs to execute.
If so, executes it.
--

`shard_id`::
+
--
(Optional, integer)
The Id of shard to merge to.
To fully merge a specific shard of indices,
set it to `Shard Id`.

Defaults to force merge all shard of indices.
If so, executes it.
--

`only_expunge_deletes`::
+
--
Expand Down
Expand Up @@ -38,11 +38,13 @@ public static final class Defaults {
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final int SHARD_ID = -1;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private int shardId = Defaults.SHARD_ID;

private static final Version FORCE_MERGE_UUID_SIMPLE_VERSION = Version.V_8_0_0;

Expand All @@ -67,6 +69,7 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
shardId = in.readInt();
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
forceMergeUUID = in.readString();
} else {
Expand All @@ -92,6 +95,23 @@ public ForceMergeRequest maxNumSegments(int maxNumSegments) {
return this;
}

/**
* Will merge the specific shard of index down to <= maxNumSegments. By default, will cause the merge
* process to merge down to half the configured number of segments.
*/
public int shardId() {
return shardId;
}

/**
* Will merge the specific shard of index down to <= maxNumSegments. By default, will cause the merge
* process to merge down to half the configured number of segments.
*/
public ForceMergeRequest shardId(int shardId) {
this.shardId = shardId;
return this;
}

/**
* Should the merge only expunge deletes from the index, without full merging.
* Defaults to full merging ({@code false}).
Expand Down Expand Up @@ -135,6 +155,7 @@ public ForceMergeRequest flush(boolean flush) {
public String getDescription() {
return "Force-merge indices " + Arrays.toString(indices()) +
", maxSegments[" + maxNumSegments +
"], shardId[" + shardId +
"], onlyExpungeDeletes[" + onlyExpungeDeletes +
"], flush[" + flush + "]";
}
Expand All @@ -145,6 +166,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
out.writeInt(shardId);
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
out.writeString(forceMergeUUID);
} else {
Expand All @@ -166,6 +188,7 @@ public ActionRequestValidationException validate() {
public String toString() {
return "ForceMergeRequest{" +
"maxNumSegments=" + maxNumSegments +
", shardId=" + shardId +
", onlyExpungeDeletes=" + onlyExpungeDeletes +
", flush=" + flush +
'}';
Expand Down
Expand Up @@ -35,6 +35,16 @@ public ForceMergeRequestBuilder setMaxNumSegments(int maxNumSegments) {
return this;
}

/**
* Will force merge the specific shard of index down to <= maxNumSegments. By default, will
* cause the merge process to merge down to half the configured number of
* segments.
*/
public ForceMergeRequestBuilder setShardId(int shardId) {
request.shardId(shardId);
return this;
}

/**
* Should the merge only expunge deletes from the index, without full merging.
* Defaults to full merging ({@code false}).
Expand Down
Expand Up @@ -73,9 +73,11 @@ protected void shardOperation(ForceMergeRequest request, ShardRouting shardRouti
assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it
threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(listener,
() -> {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex())
.getShard(shardRouting.shardId().id());
indexShard.forceMerge(request);
if ( request.shardId() == -1 || request.shardId() == shardRouting.shardId().id() ) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex())
.getShard(shardRouting.shardId().id());
indexShard.forceMerge(request);
}
return EmptyResult.INSTANCE;
}));
}
Expand Down
Expand Up @@ -42,6 +42,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
mergeRequest.shardId(request.paramAsInt("shard_id", mergeRequest.shardId()));
return channel -> client.admin().indices().forceMerge(mergeRequest, new RestToXContentListener<>(channel));
}

Expand Down
Expand Up @@ -21,15 +21,18 @@ public void testValidate() {
final boolean flush = randomBoolean();
final boolean onlyExpungeDeletes = randomBoolean();
final int maxNumSegments = randomIntBetween(ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS, 100);
final int shardId = randomIntBetween(ForceMergeRequest.Defaults.SHARD_ID, 100);

final ForceMergeRequest request = new ForceMergeRequest();
request.flush(flush);
request.onlyExpungeDeletes(onlyExpungeDeletes);
request.maxNumSegments(maxNumSegments);
request.shardId(shardId);

assertThat(request.flush(), equalTo(flush));
assertThat(request.onlyExpungeDeletes(), equalTo(onlyExpungeDeletes));
assertThat(request.maxNumSegments(), equalTo(maxNumSegments));
assertThat(request.shardId(), equalTo(shardId));

ActionRequestValidationException validation = request.validate();
if (onlyExpungeDeletes && maxNumSegments != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
Expand All @@ -43,15 +46,16 @@ public void testValidate() {

public void testDescription() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals("Force-merge indices [], maxSegments[-1], shardId[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());

request = new ForceMergeRequest("shop", "blog");
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], shardId[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.shardId(12);
request.onlyExpungeDeletes(true);
request.flush(false);
assertEquals("Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
assertEquals("Force-merge indices [], maxSegments[12], shardId[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
}
}