Skip to content

Commit

Permalink
Adding best_compression (elastic#49974)
Browse files Browse the repository at this point in the history
This commit adds a `codec` parameter to the ILM `forcemerge` action. When setting the codec to `best_compression` ILM will close the index, then update the codec setting, re-open the index, and finally perform a force merge.
  • Loading branch information
SivagurunathanV authored and dakrone committed Feb 3, 2020
1 parent 8aaca45 commit 2ce17f1
Show file tree
Hide file tree
Showing 11 changed files with 926 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;

/**
* Invokes a close step on a single index.
*/

public class CloseIndexStep extends AsyncActionStep {
public static final String NAME = "close-index";

CloseIndexStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
}

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
CloseIndexRequest request = new CloseIndexRequest(indexMetaData.getIndex().getName());
getClient().admin().indices()
.close(request, ActionListener.wrap(closeIndexResponse -> {
if (closeIndexResponse.isAcknowledged() == false) {
throw new ElasticsearchException("close index request failed to be acknowledged");
}
listener.onResponse(true);
}, listener::onFailure));
}
else {
listener.onResponse(true);
}
}

@Override
public boolean isRetryable() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -15,10 +18,12 @@
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -28,42 +33,62 @@
public class ForceMergeAction implements LifecycleAction {
public static final String NAME = "forcemerge";
public static final ParseField MAX_NUM_SEGMENTS_FIELD = new ParseField("max_num_segments");
public static final ParseField CODEC = new ParseField("index_codec");

private static final ConstructingObjectParser<ForceMergeAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
false, a -> {
int maxNumSegments = (int) a[0];
return new ForceMergeAction(maxNumSegments);
String codec = a[1] != null ? (String) a[1] : null;
return new ForceMergeAction(maxNumSegments, codec);
});

static {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_NUM_SEGMENTS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), CODEC);
}

private final int maxNumSegments;
private final String codec;

public static ForceMergeAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

public ForceMergeAction(int maxNumSegments) {
public ForceMergeAction(int maxNumSegments, @Nullable String codec) {
if (maxNumSegments <= 0) {
throw new IllegalArgumentException("[" + MAX_NUM_SEGMENTS_FIELD.getPreferredName()
+ "] must be a positive integer");
}
this.maxNumSegments = maxNumSegments;
if (codec != null && CodecService.BEST_COMPRESSION_CODEC.equals(codec) == false) {
throw new IllegalArgumentException("unknown index codec: [" + codec + "]");
}
this.codec = codec;
}

public ForceMergeAction(StreamInput in) throws IOException {
this.maxNumSegments = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.codec = in.readOptionalString();
} else {
this.codec = null;
}
}

public int getMaxNumSegments() {
return maxNumSegments;
}

public String getCodec() {
return this.codec;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxNumSegments);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalString(codec);
}
}

@Override
Expand All @@ -80,27 +105,58 @@ public boolean isSafeAction() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MAX_NUM_SEGMENTS_FIELD.getPreferredName(), maxNumSegments);
if (codec != null) {
builder.field(CODEC.getPreferredName(), codec);
}
builder.endObject();
return builder;
}

@Override
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
Settings readOnlySettings = Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build();
Settings bestCompressionSettings = Settings.builder()
.put(EngineConfig.INDEX_CODEC_SETTING.getKey(), CodecService.BEST_COMPRESSION_CODEC).build();

StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyAction.NAME);
StepKey forceMergeKey = new StepKey(phase, NAME, ForceMergeStep.NAME);
StepKey countKey = new StepKey(phase, NAME, SegmentCountStep.NAME);

StepKey closeKey = new StepKey(phase, NAME, CloseIndexStep.NAME);
StepKey openKey = new StepKey(phase, NAME, OpenIndexStep.NAME);
StepKey waitForGreenIndexKey = new StepKey(phase, NAME, WaitForIndexColorStep.NAME);
StepKey updateCompressionKey = new StepKey(phase, NAME, UpdateSettingsStep.NAME);

UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, forceMergeKey, client, readOnlySettings);
ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeKey, countKey, client, maxNumSegments);
CloseIndexStep closeIndexStep = new CloseIndexStep(closeKey, updateCompressionKey, client);
ForceMergeStep forceMergeStepForBestCompression = new ForceMergeStep(forceMergeKey, closeKey, client, maxNumSegments);
UpdateSettingsStep updateBestCompressionSettings = new UpdateSettingsStep(updateCompressionKey,
openKey, client, bestCompressionSettings);
OpenIndexStep openIndexStep = new OpenIndexStep(openKey, waitForGreenIndexKey, client);
WaitForIndexColorStep waitForIndexGreenStep = new WaitForIndexColorStep(waitForGreenIndexKey,
forceMergeKey, ClusterHealthStatus.GREEN);
SegmentCountStep segmentCountStep = new SegmentCountStep(countKey, nextStepKey, client, maxNumSegments);
return Arrays.asList(readOnlyStep, forceMergeStep, segmentCountStep);

List<Step> mergeSteps = new ArrayList<>();
mergeSteps.add(readOnlyStep);

if (codec != null && codec.equals(CodecService.BEST_COMPRESSION_CODEC)) {
mergeSteps.add(forceMergeStepForBestCompression);
mergeSteps.add(closeIndexStep);
mergeSteps.add(updateBestCompressionSettings);
mergeSteps.add(openIndexStep);
mergeSteps.add(waitForIndexGreenStep);
}

mergeSteps.add(forceMergeStep);
mergeSteps.add(segmentCountStep);
return mergeSteps;
}

@Override
public int hashCode() {
return Objects.hash(maxNumSegments);
return Objects.hash(maxNumSegments, codec);
}

@Override
Expand All @@ -112,7 +168,8 @@ public boolean equals(Object obj) {
return false;
}
ForceMergeAction other = (ForceMergeAction) obj;
return Objects.equals(maxNumSegments, other.maxNumSegments);
return Objects.equals(this.maxNumSegments, other.maxNumSegments)
&& Objects.equals(this.codec, other.codec);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;

/**
* Invokes a open step on a single index.
*/

final class OpenIndexStep extends AsyncActionStep {

static final String NAME = "open-index";

OpenIndexStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
}

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName());
getClient().admin().indices()
.open(request,
ActionListener.wrap(openIndexResponse -> {
if (openIndexResponse.isAcknowledged() == false) {
throw new ElasticsearchException("open index request failed to be acknowledged");
}
listener.onResponse(true);
}, listener::onFailure));

} else {
listener.onResponse(true);
}
}

@Override
public boolean isRetryable() {
return true;
}
}
Loading

0 comments on commit 2ce17f1

Please sign in to comment.