Add upgrade_only_ancient_segments option to upgrade API #10540

Closed
wants to merge 11 commits
24 changes: 22 additions & 2 deletions docs/reference/indices/upgrade.asciidoc
@@ -21,12 +21,30 @@ This call will block until the upgrade is complete. If the http connection
is lost, the request will continue in the background, and
any new requests will block until the previous upgrade is complete.

[float]
[[upgrade-parameters]]
==== Request Parameters

The `upgrade` API accepts the following request parameters:

[horizontal]
`upgrade_only_ancient_segments`:: If true, only very old segments (from a
previous Lucene major release) will be upgraded. While this will do
the minimal work to ensure the next major release of Elasticsearch can
read the segments, it's dangerous because it can leave other very old
segments in sub-optimal formats. Defaults to `false`.

Contributor: Can `upgrade` be omitted here, since it's redundant with the upgrade API name?

Contributor: Also, can we make a private boolean method `shouldUpgrade` for the hairy boolean logic? The GitHub review tool just broke, so I can't put this comment where it belongs.
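
For example, to upgrade only the ancient segments of the `twitter` index used
elsewhere on this page (a hypothetical invocation; any index name works):

[source,sh]
--------------------------------------------------
curl -XPOST 'http://localhost:9200/twitter/_upgrade?upgrade_only_ancient_segments=true'
--------------------------------------------------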

[float]
=== Check upgrade status

Use a `GET` request to monitor how much of an index is upgraded. This
can also be used prior to starting an upgrade to identify which
indices you want to upgrade at the same time.

The `ancient` byte values that are returned indicate total bytes of
segments whose version is extremely old (Lucene major version is
different from the current version), showing how much upgrading is
necessary when you run with `upgrade_only_ancient_segments=true`.

[source,sh]
--------------------------------------------------
@@ -41,6 +59,8 @@ curl 'http://localhost:9200/twitter/_upgrade?pretty&human'
"size_in_bytes": "21000000000",
"size_to_upgrade": "10gb",
"size_to_upgrade_in_bytes": "10000000000"
"size_to_upgrade_ancient": "1gb",
"size_to_upgrade_ancient_in_bytes": "1000000000"
}
}
--------------------------------------------------
6 changes: 5 additions & 1 deletion rest-api-spec/api/indices.upgrade.json
@@ -27,8 +27,12 @@
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
},
"wait_for_completion": {
"type" : "boolean",
"description" : "Specify whether the request should block until the all segments are upgraded (default: true)"
},
"upgrade_only_ancient_segments": {
"type" : "boolean",
"description" : "Specify whether the request should block until the all segments are upgraded (default: true)"
"description" : "If true, only ancient (an older Lucene major release) segments will be upgraded"
}
}
},
@@ -44,12 +44,14 @@ public static final class Defaults {
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean UPGRADE = false;
public static final boolean UPGRADE_ONLY_ANCIENT_SEGMENTS = false;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean upgrade = Defaults.UPGRADE;
private boolean upgradeOnlyAncientSegments = Defaults.UPGRADE_ONLY_ANCIENT_SEGMENTS;

/**
* Constructs an optimization request over one or more indices.
@@ -156,6 +158,11 @@ public void readFrom(StreamInput in) throws IOException {
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
upgrade = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_6_0)) {
upgradeOnlyAncientSegments = in.readBoolean();
} else {
upgradeOnlyAncientSegments = false;
}
}
}

@@ -169,16 +176,36 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBoolean(upgrade);
if (out.getVersion().onOrAfter(Version.V_1_6_0)) {
out.writeBoolean(upgradeOnlyAncientSegments);
}
}
}

/**
* Should the merge upgrade only the ancient (older major version of Lucene) segments?
* Defaults to <tt>false</tt>.
*/
public boolean upgradeOnlyAncientSegments() {
return upgradeOnlyAncientSegments;
}

/**
* See {@link #upgradeOnlyAncientSegments()}
*/
public OptimizeRequest upgradeOnlyAncientSegments(boolean upgradeOnlyAncientSegments) {
this.upgradeOnlyAncientSegments = upgradeOnlyAncientSegments;
return this;
}

@Override
public String toString() {
return "OptimizeRequest{" +
"maxNumSegments=" + maxNumSegments +
", onlyExpungeDeletes=" + onlyExpungeDeletes +
", flush=" + flush +
", upgrade=" + upgrade +
", upgradeOnlyAncientSegments=" + upgradeOnlyAncientSegments +
'}';
}
}
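
For reference, a hypothetical Java-API usage of the new option, mirroring what
RestUpgradeAction.handlePost does below (a sketch only; it assumes an
initialized `Client` named `client` and an existing `twitter` index):

    // Build an upgrade-style optimize request for ancient segments only:
    OptimizeRequest upgradeReq = new OptimizeRequest("twitter");
    upgradeReq.flush(true);
    upgradeReq.upgrade(true);
    upgradeReq.upgradeOnlyAncientSegments(true);
    // Upgrade segments in place rather than collapsing them to one segment:
    upgradeReq.maxNumSegments(Integer.MAX_VALUE);
    OptimizeResponse resp = client.admin().indices().optimize(upgradeReq).actionGet();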
@@ -37,6 +37,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
private boolean onlyExpungeDeletes = OptimizeRequest.Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = OptimizeRequest.Defaults.FLUSH;
private boolean upgrade = OptimizeRequest.Defaults.UPGRADE;
private boolean upgradeOnlyAncientSegments = OptimizeRequest.Defaults.UPGRADE_ONLY_ANCIENT_SEGMENTS;

ShardOptimizeRequest() {
}
@@ -47,6 +48,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
onlyExpungeDeletes = request.onlyExpungeDeletes();
flush = request.flush();
upgrade = request.force() || request.upgrade();
upgradeOnlyAncientSegments = request.upgradeOnlyAncientSegments();
}

int maxNumSegments() {
@@ -65,6 +67,10 @@ public boolean upgrade() {
return upgrade;
}

public boolean upgradeOnlyAncientSegments() {
return upgradeOnlyAncientSegments;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@@ -76,6 +82,11 @@ public void readFrom(StreamInput in) throws IOException {
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
upgrade = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_6_0)) {
upgradeOnlyAncientSegments = in.readBoolean();
} else {
upgradeOnlyAncientSegments = false;
}
}
}

@@ -90,6 +101,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBoolean(upgrade);
if (out.getVersion().onOrAfter(Version.V_1_6_0)) {
out.writeBoolean(upgradeOnlyAncientSegments);
}
}
}
}
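
A note on the wire format above: the new flag is version-gated on both sides so
mixed-version clusters stay compatible. A minimal sketch of the pattern,
extracted from the read/write methods above:

    // Writer: only emit the new flag to nodes that can read it.
    if (out.getVersion().onOrAfter(Version.V_1_6_0)) {
        out.writeBoolean(upgradeOnlyAncientSegments);
    }
    // Reader: mirror the same check, falling back to the default so a
    // request from an older node behaves as if the flag were unset.
    if (in.getVersion().onOrAfter(Version.V_1_6_0)) {
        upgradeOnlyAncientSegments = in.readBoolean();
    } else {
        upgradeOnlyAncientSegments = false;
    }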
@@ -108,7 +108,7 @@ protected ShardOptimizeResponse newShardResponse() {
protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) throws ElasticsearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.optimize(new OptimizeRequest().flush(request.flush()).onlyExpungeDeletes(request.onlyExpungeDeletes())
.maxNumSegments(request.maxNumSegments()).upgrade(request.upgrade()).upgradeOnlyAncientSegments(request.upgradeOnlyAncientSegments()));
return new ShardOptimizeResponse(request.shardId());
}

4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -414,13 +414,13 @@ public final boolean refreshNeeded() {
* Optimizes to 1 segment
*/
public void forceMerge(boolean flush) {
forceMerge(flush, 1, false, false, false);
}

/**
* Triggers a forced merge on this engine
*/
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;

/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the
11 changes: 6 additions & 5 deletions src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -603,7 +603,7 @@ private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing)
}
}
/*
* Unfortunately the lock order is important here. We have to acquire the readlock first otherwise
* if we are flushing at the end of the recovery while holding the write lock we can deadlock if:
* Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
* Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1
@@ -751,7 +751,8 @@ public void maybeMerge() throws EngineException {
}

@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException {
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
* that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -769,8 +770,8 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
try {
ensureOpen();
if (upgrade) {
logger.info("starting segment upgrade");
mp.setUpgradeInProgress(true);
logger.info("starting segment upgrade upgradeOnlyAncientSegments={}", upgradeOnlyAncientSegments);
mp.setUpgradeInProgress(true, upgradeOnlyAncientSegments);
}
store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
try {
@@ -798,7 +799,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
throw ex;
} finally {
try {
mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error
} finally {
optimizeLock.unlock();
}
@@ -136,7 +136,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
}

@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException {
// no-op
logger.trace("skipping FORCE-MERGE on shadow engine");
}
@@ -59,7 +59,13 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
private static ESLogger logger = Loggers.getLogger(ElasticsearchMergePolicy.class);

private final MergePolicy delegate;

// True if the next merge request should do segment upgrades:
private volatile boolean upgradeInProgress;

// True if the next merge request should only upgrade ancient (an older Lucene major version than current) segments:
private volatile boolean upgradeOnlyAncientSegments;

private static final int MAX_CONCURRENT_UPGRADE_MERGES = 5;

/** @param delegate the merge policy to wrap */
@@ -208,25 +214,37 @@ public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
for (SegmentCommitInfo info : segmentInfos) {
org.apache.lucene.util.Version old = info.info.getVersion();
org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;

assert old.major <= cur.major;

if (cur.major > old.major ||
(upgradeOnlyAncientSegments == false && cur.minor > old.minor)) {
// TODO: Use IndexUpgradeMergePolicy instead. We should be comparing codecs,
// for now we just assume every minor upgrade has a new format.
logger.debug("Adding segment " + info.info.name + " to be upgraded");
spec.add(new OneMerge(Lists.newArrayList(info)));
}

// TODO: we could check IndexWriter.getMergingSegments and avoid adding merges that IW will just reject?

if (spec.merges.size() == MAX_CONCURRENT_UPGRADE_MERGES) {
// hit our max upgrades, so return the spec. we will get a cascaded call to continue.
logger.debug("Returning " + spec.merges.size() + " merges for upgrade");
return spec;
}
}

if (spec.merges.isEmpty() == false) {
logger.debug("Return " + spec.merges.size() + " merges for end of upgrade");
logger.debug("Returning " + spec.merges.size() + " merges for end of upgrade");
return spec;
}

// Only set this once there are 0 segments needing upgrading, because when we return a
// spec, IndexWriter may (silently!) reject that merge if some of the segments we asked
// to be merged were already being (naturally) merged:
upgradeInProgress = false;

// fall through, so when we don't have any segments to upgrade, the delegate policy
// has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount)
}
@@ -251,8 +269,9 @@ public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegme
* {@link IndexWriter#forceMerge} that is handled by this {@link MergePolicy}, as well as
* cascading calls made by {@link IndexWriter}.
*/
public void setUpgradeInProgress(boolean upgrade, boolean onlyAncientSegments) {
this.upgradeInProgress = upgrade;
this.upgradeOnlyAncientSegments = onlyAncientSegments;
}

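
Regarding the review comment above about the hairy boolean logic: one possible
shape for the suggested private `shouldUpgrade` helper (a sketch only, not part
of this PR; it assumes the fields of ElasticsearchMergePolicy shown above):

    // Hypothetical helper factoring out the upgrade decision in findForcedMerges:
    private boolean shouldUpgrade(SegmentCommitInfo info) {
        org.apache.lucene.util.Version old = info.info.getVersion();
        org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;
        assert old.major <= cur.major;
        // Segments from an older Lucene major version are always upgraded;
        // minor-version differences only count when a full upgrade was requested.
        return cur.major > old.major
                || (upgradeOnlyAncientSegments == false && cur.minor > old.minor);
    }

The loop body would then shrink to `if (shouldUpgrade(info)) { spec.add(new OneMerge(Lists.newArrayList(info))); }`.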
3 changes: 2 additions & 1 deletion src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -681,7 +681,8 @@ public void optimize(OptimizeRequest optimize) throws ElasticsearchException {
if (logger.isTraceEnabled()) {
logger.trace("optimize with {}", optimize);
}
engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(),
optimize.upgrade(), optimize.upgradeOnlyAncientSegments());
}

public SnapshotIndexCommit snapshotIndex() throws EngineException {
@@ -38,6 +38,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import java.io.IOException;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -73,12 +74,10 @@ void handleGet(RestRequest request, RestChannel channel, Client client) {
public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuilder builder) throws Exception {
builder.startObject();

// TODO: getIndices().values() is what IndicesSegmentsResponse uses, but this will produce different orders with jdk8?
for (IndexSegments indexSegments : response.getIndices().values()) {
builder.startObject(indexSegments.getIndex());
buildUpgradeStatus(indexSegments, builder);
builder.endObject();
}

@@ -92,6 +91,7 @@ void handlePost(RestRequest request, RestChannel channel, Client client) {
OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
optimizeReq.flush(true);
optimizeReq.upgrade(true);
optimizeReq.upgradeOnlyAncientSegments(request.paramAsBoolean("upgrade_only_ancient_segments", false));
optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment
client.admin().indices().optimize(optimizeReq, new RestBuilderListener<OptimizeResponse>(channel) {
@Override
@@ -104,27 +104,35 @@ public RestResponse buildResponse(OptimizeResponse response, XContentBuilder bui
});
}

void buildUpgradeStatus(IndexSegments indexSegments, XContentBuilder builder) throws IOException {
long total_bytes = 0;
long to_upgrade_bytes = 0;
long to_upgrade_bytes_ancient = 0;
for (IndexShardSegments shard : indexSegments) {
for (ShardSegments segs : shard.getShards()) {
for (Segment seg : segs.getSegments()) {
total_bytes += seg.sizeInBytes;
if (seg.version.major != Version.CURRENT.luceneVersion.major) {
to_upgrade_bytes_ancient += seg.sizeInBytes;
to_upgrade_bytes += seg.sizeInBytes;
} else if (seg.version.minor != Version.CURRENT.luceneVersion.minor) {
// TODO: this comparison is bogus! it would cause us to upgrade even with the same format
// instead, we should check if the codec has changed
to_upgrade_bytes += seg.sizeInBytes;
}
}
}
}

builder.byteSizeField(SIZE_IN_BYTES, SIZE, total_bytes);
builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, to_upgrade_bytes);
builder.byteSizeField(SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, SIZE_TO_UPGRADE_ANCIENT, to_upgrade_bytes_ancient);
}

static final XContentBuilderString SIZE = new XContentBuilderString("size");
static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
static final XContentBuilderString SIZE_TO_UPGRADE = new XContentBuilderString("size_to_upgrade");
static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient");
static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes");
static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes");
}