Skip to content

Commit

Permalink
Flush API: remove refresh flag
Browse files Browse the repository at this point in the history
Refresh flag in flush is problematic, since the shards refresh is allowed to execute on is different compared to the flush shards. In order to do flush and then refresh, they should be executed as separate APIs when needed.
closes elastic#3689
  • Loading branch information
kimchy committed Sep 13, 2013
1 parent 62c0a9b commit 07f65dd
Show file tree
Hide file tree
Showing 13 changed files with 387 additions and 386 deletions.
2 changes: 0 additions & 2 deletions docs/reference/indices/flush.asciidoc
Expand Up @@ -21,8 +21,6 @@ The flush API accepts the following request parameters:
[cols="<,<",options="header",]
|=======================================================================
|Name |Description
|`refresh` |Should a refresh be performed after the flush. Defaults to
`false`.
|=======================================================================

[float]
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -38,10 +39,7 @@
*/
public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {

private boolean refresh = false;

private boolean force = false;

private boolean full = false;

FlushRequest() {
Expand All @@ -56,21 +54,6 @@ public FlushRequest(String... indices) {
super(indices);
}

/**
* Should a refresh be performed once the flush is done. Defaults to <tt>false</tt>.
*/
public boolean refresh() {
return this.refresh;
}

/**
* Should a refresh be performed once the flush is done. Defaults to <tt>false</tt>.
*/
public FlushRequest refresh(boolean refresh) {
this.refresh = refresh;
return this;
}

/**
* Should a "full" flush be performed.
*/
Expand Down Expand Up @@ -104,15 +87,19 @@ public FlushRequest force(boolean force) {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(refresh);
if (out.getVersion().onOrBefore(Version.V_0_90_3)) {
out.writeBoolean(false); // refresh flag
}
out.writeBoolean(full);
out.writeBoolean(force);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refresh = in.readBoolean();
if (in.getVersion().onOrBefore(Version.V_0_90_3)) {
in.readBoolean(); // refresh flag
}
full = in.readBoolean();
force = in.readBoolean();
}
Expand Down
Expand Up @@ -33,11 +33,6 @@ public FlushRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new FlushRequest());
}

public FlushRequestBuilder setRefresh(boolean refresh) {
request.refresh(refresh);
return this;
}

public FlushRequestBuilder setFull(boolean full) {
request.full(full);
return this;
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -30,7 +31,6 @@
*/
class ShardFlushRequest extends BroadcastShardOperationRequest {

private boolean refresh;
private boolean full;
private boolean force;

Expand All @@ -39,15 +39,10 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {

public ShardFlushRequest(String index, int shardId, FlushRequest request) {
super(index, shardId, request);
this.refresh = request.refresh();
this.full = request.full();
this.force = request.force();
}

public boolean refresh() {
return this.refresh;
}

public boolean full() {
return this.full;
}
Expand All @@ -59,15 +54,19 @@ public boolean force() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refresh = in.readBoolean();
if (in.getVersion().onOrBefore(Version.V_0_90_3)) {
in.readBoolean(); // refresh flag
}
full = in.readBoolean();
force = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(refresh);
if (out.getVersion().onOrBefore(Version.V_0_90_3)) {
out.writeBoolean(false); // refresh flag
}
out.writeBoolean(full);
out.writeBoolean(force);
}
Expand Down
Expand Up @@ -111,7 +111,7 @@ protected ShardFlushResponse newShardResponse() {
@Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.flush(new Engine.Flush().refresh(request.refresh()).type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
indexShard.flush(new Engine.Flush().type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
return new ShardFlushResponse(request.index(), request.shardId());
}

Expand Down
18 changes: 1 addition & 17 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -224,28 +224,12 @@ public static enum Type {
}

private Type type = Type.COMMIT_TRANSLOG;
private boolean refresh = false;
private boolean force = false;
/**
* Should the flush operation wait if there is an ongoing flush operation.
*/
private boolean waitIfOngoing = false;

/**
* Should a refresh be performed after flushing. Defaults to <tt>false</tt>.
*/
public boolean refresh() {
return this.refresh;
}

/**
* Should a refresh be performed after flushing. Defaults to <tt>false</tt>.
*/
public Flush refresh(boolean refresh) {
this.refresh = refresh;
return this;
}

public Type type() {
return this.type;
}
Expand Down Expand Up @@ -278,7 +262,7 @@ public Flush waitIfOngoing(boolean waitIfOngoing) {

@Override
public String toString() {
return "type[" + type + "], refresh[" + refresh + "], force[" + force + "]";
return "type[" + type + "], force[" + force + "]";
}
}

Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.engine;

import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

/**
*
Expand All @@ -29,4 +30,9 @@ public class FlushNotAllowedEngineException extends EngineException {
public FlushNotAllowedEngineException(ShardId shardId, String msg) {
super(shardId, msg);
}

@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}
Expand Up @@ -67,7 +67,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel)
operationThreading = BroadcastOperationThreading.THREAD_PER_SHARD;
}
flushRequest.operationThreading(operationThreading);
flushRequest.refresh(request.paramAsBoolean("refresh", flushRequest.refresh()));
flushRequest.full(request.paramAsBoolean("full", flushRequest.full()));
flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
client.admin().indices().flush(flushRequest, new ActionListener<FlushResponse>() {
Expand Down
25 changes: 21 additions & 4 deletions src/test/java/org/elasticsearch/AbstractSharedClusterTest.java
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.rest.RestStatus;
import org.junit.*;

import java.io.IOException;
Expand Down Expand Up @@ -318,10 +320,25 @@ protected RefreshResponse refresh() {
return actionGet;
}

protected void flushAndRefresh() {
flush(true);
refresh();
}

protected FlushResponse flush() {
return flush(true);
}

protected FlushResponse flush(boolean ignoreNotAllowed) {
waitForRelocation();
FlushResponse actionGet = client().admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
assertNoFailures(actionGet);
FlushResponse actionGet = client().admin().indices().prepareFlush().execute().actionGet();
if (ignoreNotAllowed) {
for (ShardOperationFailedException failure : actionGet.getShardFailures()) {
assertThat("unexpected flush failure " + failure.reason(), failure.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
} else {
assertNoFailures(actionGet);
}
return actionGet;
}

Expand Down Expand Up @@ -386,15 +403,15 @@ public void indexRandom(String index, boolean forceRefresh, IndexRequestBuilder.
} else if (rarely()) {
client().admin().indices().prepareFlush(index).execute().get();
} else if (rarely()) {
client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute().get();
client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute().get();
}
}
}
if (forceRefresh) {
assertNoFailures(client().admin().indices().prepareRefresh(index).execute().get());
}
}

public void clearScroll(String... scrollIds) {
ClearScrollResponse clearResponse = client().prepareClearScroll()
.setScrollIds(Arrays.asList(scrollIds)).get();
Expand Down
44 changes: 25 additions & 19 deletions src/test/java/org/elasticsearch/nested/SimpleNestedTests.java
Expand Up @@ -58,17 +58,17 @@ public class SimpleNestedTests extends AbstractSharedClusterTest {
public void simpleNested() throws Exception {
XContentBuilder builder = jsonBuilder().
startObject().
field("type1").
startObject().
field("properties").
startObject().
field("nested1").
startObject().
field("type").
value("nested").
endObject().
endObject().
endObject().
field("type1").
startObject().
field("properties").
startObject().
field("nested1").
startObject().
field("type").
value("nested").
endObject().
endObject().
endObject().
endObject();
ElasticsearchAssertions.assertAcked(prepareCreate("test").addMapping("type1", builder));
ensureGreen();
Expand Down Expand Up @@ -226,7 +226,8 @@ private void simpleNestedDeleteByQuery(int total, int docToDelete) throws Except
assertThat(statusResponse.getIndex("test").getDocs().getNumDocs(), equalTo(total * 3l));

client().prepareDeleteByQuery("test").setQuery(QueryBuilders.idsQuery("type1").ids(Integer.toString(docToDelete))).execute().actionGet();
client().admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
flush();
refresh();
statusResponse = client().admin().indices().prepareStatus().execute().actionGet();
assertThat(statusResponse.getIndex("test").getDocs().getNumDocs(), equalTo((total * 3l) - 3));

Expand Down Expand Up @@ -272,12 +273,15 @@ private void noChildrenNestedDeleteByQuery(long total, int docToDelete) throws E
}


client().admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
flush();
refresh();

IndicesStatusResponse statusResponse = client().admin().indices().prepareStatus().execute().actionGet();
assertThat(statusResponse.getIndex("test").getDocs().getNumDocs(), equalTo(total));

client().prepareDeleteByQuery("test").setQuery(QueryBuilders.idsQuery("type1").ids(Integer.toString(docToDelete))).execute().actionGet();
client().admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
flush();
refresh();
statusResponse = client().admin().indices().prepareStatus().execute().actionGet();
assertThat(statusResponse.getIndex("test").getDocs().getNumDocs(), equalTo((total) - 1));

Expand Down Expand Up @@ -407,8 +411,8 @@ private void testFacets(int numberOfShards) throws Exception {
.addFacet(FacetBuilders.termsStatsFacet("facet1").keyField("nested1.nested2.field2_1").valueField("nested1.nested2.field2_2").nested("nested1.nested2"))
.addFacet(FacetBuilders.statisticalFacet("facet2").field("field2_2").nested("nested1.nested2"))
.addFacet(FacetBuilders.statisticalFacet("facet2_blue").field("field2_2").nested("nested1.nested2")
.facetFilter(boolFilter().must(termFilter("field2_1", "blue"))))
.execute().actionGet();
.facetFilter(boolFilter().must(termFilter("field2_1", "blue"))))
.execute().actionGet();

assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(2l));
Expand Down Expand Up @@ -538,12 +542,14 @@ public void testDeleteNestedDocsWithAlias() throws Exception {
.endArray()
.endObject()).execute().actionGet();

client().admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
flush();
refresh();
IndicesStatusResponse statusResponse = client().admin().indices().prepareStatus().execute().actionGet();
assertThat(statusResponse.getIndex("test").getDocs().getNumDocs(), equalTo(6l));

client().prepareDeleteByQuery("alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
client().admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
flush();
refresh();
statusResponse = client().admin().indices().prepareStatus().execute().actionGet();

// This must be 3, otherwise child docs aren't deleted.
Expand Down Expand Up @@ -604,7 +610,7 @@ public void testSimpleNestedSorting() throws Exception {
.setSettings(settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.referesh_interval", -1)
.put("index.refresh_interval", -1)
.build()
)
.addMapping("type1", jsonBuilder().startObject().startObject("type1").startObject("properties")
Expand Down

0 comments on commit 07f65dd

Please sign in to comment.