Skip to content

Commit

Permalink
add refresh option to index/create/delete opereation, REST allows for…
Browse files Browse the repository at this point in the history
… refresh parameter (defaults to false)
  • Loading branch information
kimchy committed Sep 26, 2010
1 parent 2288c5d commit ce28882
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 6 deletions.
Expand Up @@ -47,6 +47,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {

private String type;
private String id;
private boolean refresh;

/**
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
Expand Down Expand Up @@ -154,16 +155,32 @@ public DeleteRequest timeout(TimeValue timeout) {
return this;
}

/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public DeleteRequest refresh(boolean refresh) {
this.refresh = refresh;
return this;
}

public boolean refresh() {
return this.refresh;
}

@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
id = in.readUTF();
refresh = in.readBoolean();
}

@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(type);
out.writeUTF(id);
out.writeBoolean(refresh);
}

@Override public String toString() {
Expand Down
Expand Up @@ -33,6 +33,8 @@
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -102,7 +104,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct

@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
indexShard(shardRequest).delete(request.type(), request.id());
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
delete.refresh(request.refresh());
indexShard.delete(delete);
}

@Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) {
Expand Down
Expand Up @@ -113,6 +113,8 @@ public static OpType fromId(byte id) {

private OpType opType = OpType.INDEX;

private boolean refresh = false;

public IndexRequest() {
}

Expand Down Expand Up @@ -390,6 +392,20 @@ public OpType opType() {
return this.opType;
}

/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public IndexRequest refresh(boolean refresh) {
this.refresh = refresh;
return this;
}

public boolean refresh() {
return this.refresh;
}

@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
Expand All @@ -404,6 +420,7 @@ public OpType opType() {
in.readFully(source);

opType = OpType.fromId(in.readByte());
refresh = in.readBoolean();
}

@Override public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -418,6 +435,7 @@ public OpType opType() {
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeByte(opType.id());
out.writeBoolean(refresh);
}

@Override public String toString() {
Expand Down
Expand Up @@ -35,9 +35,11 @@
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -127,12 +129,17 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}

@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
final IndexRequest request = shardRequest.request;
ParsedDocument doc;
if (request.opType() == IndexRequest.OpType.INDEX) {
doc = indexShard(shardRequest).index(request.type(), request.id(), request.source());
Engine.Index index = indexShard.prepareIndex(request.type(), request.id(), request.source());
index.refresh(request.refresh());
doc = indexShard.index(index);
} else {
doc = indexShard(shardRequest).create(request.type(), request.id(), request.source());
Engine.Create create = indexShard(shardRequest).prepareCreate(request.type(), request.id(), request.source());
create.refresh(request.refresh());
doc = indexShard(shardRequest).create(create);
}
if (doc.mappersAdded()) {
updateMappingOnMaster(request);
Expand Down
Expand Up @@ -63,6 +63,16 @@ public DeleteRequestBuilder setId(String id) {
return this;
}

/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public DeleteRequestBuilder setRefresh(boolean refresh) {
request.refresh(refresh);
return this;
}

/**
* Should the listener be called on a separate thread if needed.
*/
Expand Down
Expand Up @@ -182,6 +182,16 @@ public IndexRequestBuilder setCreate(boolean create) {
return this;
}

/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public IndexRequestBuilder setRefresh(boolean refresh) {
request.refresh(refresh);
return this;
}

/**
* Set the replication type for this operation.
*/
Expand Down
Expand Up @@ -265,6 +265,7 @@ public Operation[] ops() {
static class Create implements Operation {
private final ParsedDocument doc;
private final Analyzer analyzer;
private boolean refresh;

public Create(ParsedDocument doc, Analyzer analyzer) {
this.doc = doc;
Expand Down Expand Up @@ -298,12 +299,21 @@ public Analyzer analyzer() {
public byte[] source() {
return this.doc.source();
}

public boolean refresh() {
return refresh;
}

public void refresh(boolean refresh) {
this.refresh = refresh;
}
}

static class Index implements Operation {
private final Term uid;
private final ParsedDocument doc;
private final Analyzer analyzer;
private boolean refresh;

public Index(Term uid, ParsedDocument doc, Analyzer analyzer) {
this.uid = uid;
Expand Down Expand Up @@ -342,10 +352,19 @@ public String type() {
public byte[] source() {
return this.doc.source();
}

public boolean refresh() {
return refresh;
}

public void refresh(boolean refresh) {
this.refresh = refresh;
}
}

static class Delete implements Operation {
private final Term uid;
private boolean refresh;

public Delete(Term uid) {
this.uid = uid;
Expand All @@ -358,6 +377,14 @@ public Delete(Term uid) {
public Term uid() {
return this.uid;
}

public boolean refresh() {
return refresh;
}

public void refresh(boolean refresh) {
this.refresh = refresh;
}
}

static class DeleteByQuery {
Expand Down
Expand Up @@ -244,6 +244,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.addDocument(create.doc(), create.analyzer());
translog.add(new Translog.Create(create));
dirty = true;
if (create.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) {
throw new CreateFailedEngineException(shardId, create, e);
} finally {
Expand All @@ -261,6 +264,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
translog.add(new Translog.Index(index));
dirty = true;
if (index.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) {
throw new IndexFailedEngineException(shardId, index, e);
} finally {
Expand All @@ -278,6 +284,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.deleteDocuments(delete.uid());
translog.add(new Translog.Delete(delete));
dirty = true;
if (delete.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) {
throw new DeleteFailedEngineException(shardId, delete, e);
} finally {
Expand Down
Expand Up @@ -36,7 +36,7 @@
import static org.elasticsearch.rest.RestResponse.Status.*;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class RestDeleteAction extends BaseRestHandler {

Expand All @@ -48,6 +48,7 @@ public class RestDeleteAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
deleteRequest.refresh(request.paramAsBoolean("refresh", deleteRequest.refresh()));
// we just send a response, no need to fork
deleteRequest.listenerThreaded(false);
// we don't spawn, then fork if local
Expand Down
Expand Up @@ -36,7 +36,7 @@
import static org.elasticsearch.rest.RestResponse.Status.*;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class RestIndexAction extends BaseRestHandler {

Expand All @@ -60,6 +60,7 @@ final class CreateHandler implements RestHandler {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
String sOpType = request.param("op_type");
if (sOpType != null) {
if ("index".equals(sOpType)) {
Expand Down
Expand Up @@ -111,7 +111,7 @@ protected Client getClient2() {
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));

logger.info("Indexing [type1/1]");
IndexResponse indexResponse = client1.prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")).execute().actionGet();
IndexResponse indexResponse = client1.prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")).setRefresh(true).execute().actionGet();
assertThat(indexResponse.index(), equalTo(getConcreteIndexName()));
assertThat(indexResponse.id(), equalTo("1"));
assertThat(indexResponse.type(), equalTo("type1"));
Expand Down

0 comments on commit ce28882

Please sign in to comment.