Skip to content

Commit f735baf

Browse files
committed
Core: Remove ability to run optimize and upgrade async
This has been very trappy. Rather than continue to allow buggy behavior of having upgrade/optimize requests sidestep the single shard per node limits optimize is supposed to be subject to, this removes the ability to run the upgrade/optimize async. closes #9638
1 parent faae98c commit f735baf

File tree

17 files changed

+33
-135
lines changed

17 files changed

+33
-135
lines changed

docs/reference/indices/optimize.asciidoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ operations (and relates to the number of segments a Lucene index holds
77
within each shard). The optimize operation allows to reduce the number
88
of segments by merging them.
99

10+
This call will block until the optimize is complete. If the http connection
11+
is lost, the request will continue in the background, and
12+
any new requests will block until the previous optimize is complete.
13+
1014
[source,js]
1115
--------------------------------------------------
1216
$ curl -XPOST 'http://localhost:9200/twitter/_optimize'
@@ -33,10 +37,6 @@ deletes. Defaults to `false`. Note that this won't override the
3337
`flush`:: Should a flush be performed after the optimize. Defaults to
3438
`true`.
3539

36-
`wait_for_merge`:: Should the request wait for the merge to end. Defaults
37-
to `true`. Note, a merge can potentially be a very heavy operation, so
38-
it might make sense to run it set to `false`.
39-
4040
[float]
4141
[[optimize-multi-index]]
4242
=== Multi Index

docs/reference/indices/upgrade.asciidoc

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,9 @@ NOTE: Upgrading is an I/O intensive operation, and is limited to processing a
1717
single shard per node at a time. It also is not allowed to run at the same
1818
time as optimize.
1919

20-
[float]
21-
[[upgrade-parameters]]
22-
==== Request Parameters
23-
24-
The `upgrade` API accepts the following request parameters:
25-
26-
[horizontal]
27-
`wait_for_completion`:: Should the request wait for the upgrade to complete. Defaults
28-
to `false`.
20+
This call will block until the upgrade is complete. If the http connection
21+
is lost, the request will continue in the background, and
22+
any new requests will block until the previous upgrade is complete.
2923

3024
[float]
3125
=== Check upgrade status

src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@
3030
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
3131
* <tt>null</tt> for the indices.
3232
* <p/>
33-
* <p>{@link #waitForMerge(boolean)} allows to control if the call will block until the optimize completes and
34-
* defaults to <tt>true</tt>.
35-
* <p/>
3633
* <p>{@link #maxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
3734
* cause the optimize process to optimize down to half the configured number of segments.
3835
*
@@ -43,14 +40,12 @@
4340
public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest> {
4441

4542
public static final class Defaults {
46-
public static final boolean WAIT_FOR_MERGE = true;
4743
public static final int MAX_NUM_SEGMENTS = -1;
4844
public static final boolean ONLY_EXPUNGE_DELETES = false;
4945
public static final boolean FLUSH = true;
5046
public static final boolean UPGRADE = false;
5147
}
52-
53-
private boolean waitForMerge = Defaults.WAIT_FOR_MERGE;
48+
5449
private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
5550
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
5651
private boolean flush = Defaults.FLUSH;
@@ -69,21 +64,6 @@ public OptimizeRequest() {
6964

7065
}
7166

72-
/**
73-
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
74-
*/
75-
public boolean waitForMerge() {
76-
return waitForMerge;
77-
}
78-
79-
/**
80-
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
81-
*/
82-
public OptimizeRequest waitForMerge(boolean waitForMerge) {
83-
this.waitForMerge = waitForMerge;
84-
return this;
85-
}
86-
8767
/**
8868
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
8969
* process to optimize down to half the configured number of segments.
@@ -151,7 +131,6 @@ public OptimizeRequest upgrade(boolean upgrade) {
151131

152132
public void readFrom(StreamInput in) throws IOException {
153133
super.readFrom(in);
154-
waitForMerge = in.readBoolean();
155134
maxNumSegments = in.readInt();
156135
onlyExpungeDeletes = in.readBoolean();
157136
flush = in.readBoolean();
@@ -160,7 +139,6 @@ public void readFrom(StreamInput in) throws IOException {
160139

161140
public void writeTo(StreamOutput out) throws IOException {
162141
super.writeTo(out);
163-
out.writeBoolean(waitForMerge);
164142
out.writeInt(maxNumSegments);
165143
out.writeBoolean(onlyExpungeDeletes);
166144
out.writeBoolean(flush);
@@ -170,8 +148,7 @@ public void writeTo(StreamOutput out) throws IOException {
170148
@Override
171149
public String toString() {
172150
return "OptimizeRequest{" +
173-
"waitForMerge=" + waitForMerge +
174-
", maxNumSegments=" + maxNumSegments +
151+
"maxNumSegments=" + maxNumSegments +
175152
", onlyExpungeDeletes=" + onlyExpungeDeletes +
176153
", flush=" + flush +
177154
", upgrade=" + upgrade +

src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequestBuilder.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@
2727
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
2828
* <tt>null</tt> for the indices.
2929
* <p/>
30-
* <p>{@link #setWaitForMerge(boolean)} allows to control if the call will block until the optimize completes and
31-
* defaults to <tt>true</tt>.
32-
* <p/>
3330
* <p>{@link #setMaxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
3431
* cause the optimize process to optimize down to half the configured number of segments.
3532
*/
@@ -39,14 +36,6 @@ public OptimizeRequestBuilder(IndicesAdminClient indicesClient) {
3936
super(indicesClient, new OptimizeRequest());
4037
}
4138

42-
/**
43-
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
44-
*/
45-
public OptimizeRequestBuilder setWaitForMerge(boolean waitForMerge) {
46-
request.waitForMerge(waitForMerge);
47-
return this;
48-
}
49-
5039
/**
5140
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
5241
* process to optimize down to half the configured number of segments.

src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,12 @@ public Condition newCondition() {
232232
/**
233233
* Optimizes to 1 segment
234234
*/
235-
abstract void forceMerge(boolean flush, boolean waitForMerge);
235+
abstract void forceMerge(boolean flush);
236236

237237
/**
238238
* Triggers a forced merge on this engine
239239
*/
240-
public abstract void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
240+
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
241241

242242
/**
243243
* Snapshots the index and returns a handle to it. Will always try and "commit" the

src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -817,12 +817,12 @@ private void waitForMerges(boolean flushAfter, boolean upgrade) {
817817
}
818818

819819
@Override
820-
public void forceMerge(boolean flush, boolean waitForMerge) {
821-
forceMerge(flush, waitForMerge, 1, false, false);
820+
public void forceMerge(boolean flush) {
821+
forceMerge(flush, 1, false, false);
822822
}
823823

824824
@Override
825-
public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
825+
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
826826
if (optimizeMutex.compareAndSet(false, true)) {
827827
try (ReleasableLock _ = readLock.acquire()) {
828828
ensureOpen();
@@ -855,23 +855,7 @@ public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegm
855855
}
856856
}
857857

858-
// wait for the merges outside of the read lock
859-
if (waitForMerge) {
860-
waitForMerges(flush, upgrade);
861-
} else if (flush || upgrade) {
862-
// we only need to monitor merges for async calls if we are going to flush
863-
engineConfig.getThreadPool().executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
864-
@Override
865-
public void onFailure(Throwable t) {
866-
logger.error("Exception while waiting for merges asynchronously after optimize", t);
867-
}
868-
869-
@Override
870-
protected void doRun() throws Exception {
871-
waitForMerges(flush, upgrade);
872-
}
873-
});
874-
}
858+
waitForMerges(flush, upgrade);
875859
}
876860

877861

src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,8 +625,7 @@ public void optimize(OptimizeRequest optimize) throws ElasticsearchException {
625625
if (logger.isTraceEnabled()) {
626626
logger.trace("optimize with {}", optimize);
627627
}
628-
engine().forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
629-
.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
628+
engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
630629
}
631630

632631
public SnapshotIndexCommit snapshotIndex() throws EngineException {

src/main/java/org/elasticsearch/rest/action/admin/indices/optimize/RestOptimizeAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
5555
OptimizeRequest optimizeRequest = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
5656
optimizeRequest.listenerThreaded(false);
5757
optimizeRequest.indicesOptions(IndicesOptions.fromRequest(request, optimizeRequest.indicesOptions()));
58-
optimizeRequest.waitForMerge(request.paramAsBoolean("wait_for_merge", optimizeRequest.waitForMerge()));
5958
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
6059
optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes()));
6160
optimizeRequest.flush(request.paramAsBoolean("flush", optimizeRequest.flush()));

src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuild
9090

9191
void handlePost(RestRequest request, RestChannel channel, Client client) {
9292
OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
93-
optimizeReq.waitForMerge(request.paramAsBoolean("wait_for_completion", false));
9493
optimizeReq.flush(true);
9594
optimizeReq.upgrade(true);
9695
optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment

src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ public void testReusePeerRecovery() throws Exception {
367367
}
368368
logger.info("Running Cluster Health");
369369
ensureGreen();
370-
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).setMaxNumSegments(100).get(); // just wait for merges
370+
client().admin().indices().prepareOptimize("test").setMaxNumSegments(100).get(); // just wait for merges
371371
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
372372

373373
logger.info("--> disabling allocation while the cluster is shut down");

src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -411,30 +411,9 @@ public void testVerboseSegments() throws Exception {
411411
public void testSegmentsWithMergeFlag() throws Exception {
412412
final Store store = createStore();
413413
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
414-
final AtomicReference<CountDownLatch> waitTillMerge = new AtomicReference<>();
415-
final AtomicReference<CountDownLatch> waitForMerge = new AtomicReference<>();
416-
mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {
417-
@Override
418-
public void beforeMerge(OnGoingMerge merge) {
419-
try {
420-
if (waitTillMerge.get() != null) {
421-
waitTillMerge.get().countDown();
422-
}
423-
if (waitForMerge.get() != null) {
424-
waitForMerge.get().await();
425-
}
426-
} catch (InterruptedException e) {
427-
throw ExceptionsHelper.convertToRuntime(e);
428-
}
429-
}
430-
431-
@Override
432-
public void afterMerge(OnGoingMerge merge) {
433-
}
434-
});
435-
436414
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
437415
final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider);
416+
438417
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
439418
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
440419
engine.index(index);
@@ -456,24 +435,13 @@ public void afterMerge(OnGoingMerge merge) {
456435
for (Segment segment : segments) {
457436
assertThat(segment.getMergeId(), nullValue());
458437
}
459-
460-
waitTillMerge.set(new CountDownLatch(1));
461-
waitForMerge.set(new CountDownLatch(1));
462-
engine.forceMerge(false, false);
463-
waitTillMerge.get().await();
464-
465-
for (Segment segment : engine.segments(false)) {
466-
assertThat(segment.getMergeId(), notNullValue());
467-
}
468-
469-
waitForMerge.get().countDown();
470-
438+
471439
index = new Engine.Index(null, newUid("4"), doc);
472440
engine.index(index);
473441
engine.flush();
474442
final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
475443
// now, optimize and wait for merges, see that we have no merge flag
476-
engine.forceMerge(true, true);
444+
engine.forceMerge(true);
477445

478446
for (Segment segment : engine.segments(false)) {
479447
assertThat(segment.getMergeId(), nullValue());
@@ -483,25 +451,14 @@ public void afterMerge(OnGoingMerge merge) {
483451

484452
final boolean flush = randomBoolean();
485453
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
486-
engine.forceMerge(flush, false);
487-
waitTillMerge.get().await();
454+
engine.forceMerge(flush);
488455
for (Segment segment : engine.segments(false)) {
489456
assertThat(segment.getMergeId(), nullValue());
490457
}
491-
waitForMerge.get().countDown();
492458

493459
if (flush) {
494-
awaitBusy(new Predicate<Object>() {
495-
@Override
496-
public boolean apply(Object o) {
497-
try {
498-
// we should have had just 1 merge, so last generation should be exact
499-
return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
500-
} catch (IOException e) {
501-
throw ExceptionsHelper.convertToRuntime(e);
502-
}
503-
}
504-
});
460+
// we should have had just 1 merge, so last generation should be exact
461+
assertEquals(gen2 + 1, store.readLastCommittedSegmentsInfo().getLastGeneration());
505462
}
506463

507464
engine.close();

src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void testUpdateThrottleSettings() {
215215

216216
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
217217
logger.info("test: optimize");
218-
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
218+
client().admin().indices().prepareOptimize("test").get();
219219
logger.info("test: optimize done");
220220

221221
// Record current throttling so far
@@ -253,7 +253,7 @@ public void testUpdateThrottleSettings() {
253253
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
254254

255255
// Wait for merges to finish
256-
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
256+
client().admin().indices().prepareOptimize("test").get();
257257
flush();
258258

259259
logger.info("test: test done");
@@ -369,7 +369,7 @@ public void testUpdateMergeMaxThreadCount() {
369369
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
370370
)
371371
.get();
372-
372+
373373
// Make sure we log the change:
374374
assertTrue(mockAppender.sawUpdateMaxThreadCount);
375375

src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ public void throttleStats() throws Exception {
380380
// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
381381
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
382382
logger.info("test: now optimize");
383-
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
383+
client().admin().indices().prepareOptimize("test").get();
384384
flush();
385385
logger.info("test: test done");
386386
}
@@ -517,7 +517,7 @@ public void testMergeStats() {
517517
client().prepareIndex("test1", "type2", Integer.toString(i)).setSource("field", "value").execute().actionGet();
518518
client().admin().indices().prepareFlush().execute().actionGet();
519519
}
520-
client().admin().indices().prepareOptimize().setWaitForMerge(true).setMaxNumSegments(1).execute().actionGet();
520+
client().admin().indices().prepareOptimize().setMaxNumSegments(1).execute().actionGet();
521521
stats = client().admin().indices().prepareStats()
522522
.setMerge(true)
523523
.execute().actionGet();
@@ -544,7 +544,7 @@ public void testSegmentsStats() {
544544
assertThat(stats.getTotal().getSegments().getVersionMapMemoryInBytes(), greaterThan(0l));
545545

546546
client().admin().indices().prepareFlush().get();
547-
client().admin().indices().prepareOptimize().setWaitForMerge(true).setMaxNumSegments(1).execute().actionGet();
547+
client().admin().indices().prepareOptimize().setMaxNumSegments(1).execute().actionGet();
548548
stats = client().admin().indices().prepareStats().setSegments(true).get();
549549

550550
assertThat(stats.getTotal().getSegments(), notNullValue());

src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public boolean apply(Object o) {
157157
logger.info("--> Single index upgrade complete");
158158

159159
logger.info("--> Running upgrade on the rest of the indexes");
160-
runUpgrade(httpClient, null, "wait_for_completion", "true");
160+
runUpgrade(httpClient, null);
161161
logSegmentsState();
162162
logger.info("--> Full upgrade complete");
163163
assertUpgraded(httpClient, null);

src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1929,7 +1929,7 @@ public void testParentChildCaching() throws Exception {
19291929
client().prepareIndex("test", "child", "c1").setParent("p1").setSource("c_field", "blue").get();
19301930
client().prepareIndex("test", "child", "c2").setParent("p1").setSource("c_field", "red").get();
19311931
client().prepareIndex("test", "child", "c3").setParent("p2").setSource("c_field", "red").get();
1932-
client().admin().indices().prepareOptimize("test").setFlush(true).setWaitForMerge(true).get();
1932+
client().admin().indices().prepareOptimize("test").setFlush(true).get();
19331933
client().prepareIndex("test", "parent", "p3").setSource("p_field", "p_value3").get();
19341934
client().prepareIndex("test", "parent", "p4").setSource("p_field", "p_value4").get();
19351935
client().prepareIndex("test", "child", "c4").setParent("p3").setSource("c_field", "green").get();

src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1414,7 +1414,7 @@ public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedExc
14141414
}
14151415
indexRandom(true, builders);
14161416
flushAndRefresh();
1417-
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get());
1417+
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setMaxNumSegments(1).get());
14181418

14191419
CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get();
14201420
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));

0 commit comments

Comments
 (0)