Skip to content

Commit

Permalink
set a specific shard id to force merge and clean commit logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
henry-force committed Aug 12, 2021
1 parent e2b5eaf commit 9e1cdfa
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 6 deletions.
12 changes: 12 additions & 0 deletions docs/reference/indices/forcemerge.asciidoc
Expand Up @@ -118,6 +118,18 @@ Defaults to checking if a merge needs to execute.
If so, executes it.
--

`shard_id`::
+
--
(Optional, integer)
The Id of shard to merge to.
To fully merge a specific shard of indices,
set it to `Shard Id`.

Defaults to force merge all shard of indices.
If so, executes it.
--

`only_expunge_deletes`::
+
--
Expand Down
Expand Up @@ -38,11 +38,13 @@ public static final class Defaults {
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final int SHARD_ID = -1;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private int shardId = Defaults.SHARD_ID;

private static final Version FORCE_MERGE_UUID_SIMPLE_VERSION = Version.V_8_0_0;

Expand All @@ -67,6 +69,7 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
shardId = in.readInt();
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
forceMergeUUID = in.readString();
} else {
Expand All @@ -92,6 +95,23 @@ public ForceMergeRequest maxNumSegments(int maxNumSegments) {
return this;
}

/**
* Will merge the specific shard of index down to <= maxNumSegments. By default, will cause the merge
* process to merge down to half the configured number of segments.
*/
public int shardId() {
return shardId;
}

/**
* Will merge the specific shard of index down to <= maxNumSegments. By default, will cause the merge
* process to merge down to half the configured number of segments.
*/
public ForceMergeRequest shardId(int shardId) {
this.shardId = shardId;
return this;
}

/**
* Should the merge only expunge deletes from the index, without full merging.
* Defaults to full merging ({@code false}).
Expand Down Expand Up @@ -135,6 +155,7 @@ public ForceMergeRequest flush(boolean flush) {
public String getDescription() {
return "Force-merge indices " + Arrays.toString(indices()) +
", maxSegments[" + maxNumSegments +
"], shardId[" + shardId +
"], onlyExpungeDeletes[" + onlyExpungeDeletes +
"], flush[" + flush + "]";
}
Expand All @@ -145,6 +166,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
out.writeInt(shardId);
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
out.writeString(forceMergeUUID);
} else {
Expand All @@ -166,6 +188,7 @@ public ActionRequestValidationException validate() {
public String toString() {
return "ForceMergeRequest{" +
"maxNumSegments=" + maxNumSegments +
", shardId=" + shardId +
", onlyExpungeDeletes=" + onlyExpungeDeletes +
", flush=" + flush +
'}';
Expand Down
Expand Up @@ -35,6 +35,16 @@ public ForceMergeRequestBuilder setMaxNumSegments(int maxNumSegments) {
return this;
}

/**
* Will force merge the specific shard of index down to <= maxNumSegments. By default, will
* cause the merge process to merge down to half the configured number of
* segments.
*/
public ForceMergeRequestBuilder setShardId(int shardId) {
request.shardId(shardId);
return this;
}

/**
* Should the merge only expunge deletes from the index, without full merging.
* Defaults to full merging ({@code false}).
Expand Down
Expand Up @@ -73,9 +73,11 @@ protected void shardOperation(ForceMergeRequest request, ShardRouting shardRouti
assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it
threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(listener,
() -> {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex())
.getShard(shardRouting.shardId().id());
indexShard.forceMerge(request);
if ( request.shardId() == -1 || request.shardId() == shardRouting.shardId().id() ) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex())
.getShard(shardRouting.shardId().id());
indexShard.forceMerge(request);
}
return EmptyResult.INSTANCE;
}));
}
Expand Down
Expand Up @@ -42,6 +42,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
mergeRequest.shardId(request.paramAsInt("shard_id", mergeRequest.shardId()));
return channel -> client.admin().indices().forceMerge(mergeRequest, new RestToXContentListener<>(channel));
}

Expand Down
Expand Up @@ -21,15 +21,18 @@ public void testValidate() {
final boolean flush = randomBoolean();
final boolean onlyExpungeDeletes = randomBoolean();
final int maxNumSegments = randomIntBetween(ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS, 100);
final int shardId = randomIntBetween(ForceMergeRequest.Defaults.SHARD_ID, 100);

final ForceMergeRequest request = new ForceMergeRequest();
request.flush(flush);
request.onlyExpungeDeletes(onlyExpungeDeletes);
request.maxNumSegments(maxNumSegments);
request.shardId(shardId);

assertThat(request.flush(), equalTo(flush));
assertThat(request.onlyExpungeDeletes(), equalTo(onlyExpungeDeletes));
assertThat(request.maxNumSegments(), equalTo(maxNumSegments));
assertThat(request.shardId(), equalTo(shardId));

ActionRequestValidationException validation = request.validate();
if (onlyExpungeDeletes && maxNumSegments != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
Expand All @@ -43,15 +46,16 @@ public void testValidate() {

public void testDescription() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals("Force-merge indices [], maxSegments[-1], shardId[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());

request = new ForceMergeRequest("shop", "blog");
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], shardId[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.shardId(12);
request.onlyExpungeDeletes(true);
request.flush(false);
assertEquals("Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
assertEquals("Force-merge indices [], maxSegments[12], shardId[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
}
}

0 comments on commit 9e1cdfa

Please sign in to comment.