Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
0f8fa84
Create TransportReplicatedMutationAction
nik9000 Apr 22, 2016
8bdc415
Create ReplicatedMutationRequest
nik9000 Apr 22, 2016
0642083
Factor out common code from shardOperationOnReplica
nik9000 Apr 22, 2016
80119b9
Factor out common code in shardOperationOnPrimary
nik9000 Apr 22, 2016
0fc045b
Make performOnPrimary asyncS
nik9000 Apr 22, 2016
b42b8da
Don't finish early if the primary finishes early
nik9000 Apr 22, 2016
34b3789
Doc
nik9000 Apr 22, 2016
3d22b2d
Push the listener into shardOperationOnPrimary
nik9000 Apr 22, 2016
5b142dc
Cleanup
nik9000 Apr 22, 2016
52c5f7c
Add a listener to shard operations
nik9000 Apr 22, 2016
1f25cf3
Cleanup
nik9000 Apr 22, 2016
85033a8
Never reply to replica actions while you have the operation lock
nik9000 Apr 23, 2016
a2bc7f3
Return last written location from refresh
nik9000 Apr 23, 2016
213bebb
Add refresh listeners
nik9000 Apr 23, 2016
a5ffd89
WIP
nik9000 Apr 25, 2016
46c855c
Move test to own class
nik9000 Apr 25, 2016
247cb48
Basic block_until_refresh exposed to java client
nik9000 Apr 25, 2016
9919758
Oh boy that wasn't working
nik9000 Apr 25, 2016
611cbee
Move ReplicationResponse
nik9000 Apr 25, 2016
e445cb0
Javadoc
nik9000 Apr 25, 2016
2058f4a
Pass back information about whether we refreshed
nik9000 Apr 25, 2016
8d121bf
Cleanup listener implementation
nik9000 Apr 25, 2016
1f36966
Cleanup translog tests
nik9000 Apr 25, 2016
8a80cc7
Support for update
nik9000 Apr 25, 2016
bcfded1
Replace LinkedList and synchronized with LinkedTransferQueue
nik9000 Apr 25, 2016
bd53116
Don't try and set forced refresh on bulk items without a response
nik9000 Apr 25, 2016
8250343
Switch to estimated count
nik9000 Apr 25, 2016
0c9b047
REST
nik9000 Apr 26, 2016
e61b739
Trigger listeners even when there is no refresh
nik9000 Apr 26, 2016
6c43be8
Rename refresh setter and getter
nik9000 May 16, 2016
b971d6d
Docs for setForcedRefresh
nik9000 May 16, 2016
066da45
Remove RefreshListener interface
nik9000 May 16, 2016
df91cde
unused import
nik9000 May 16, 2016
d8926d5
Move refresh listeners into IndexShard
nik9000 May 17, 2016
179c27c
Move refresh listeners into their own class
nik9000 May 17, 2016
88171a8
Rename test
nik9000 May 17, 2016
3322e26
Increase default maximum number of listeners to 1000
nik9000 May 17, 2016
55596ea
Remove listener from shardOperationOnPrimary
nik9000 May 20, 2016
1ff50c2
Remove Translog#lastWriteLocation
nik9000 May 20, 2016
91149e0
Finally!
nik9000 May 20, 2016
b8cadce
Docs
nik9000 May 20, 2016
13807ad
Move async parts of replica operation outside of the lock
nik9000 May 21, 2016
87be7ea
Revert "Move async parts of replica operation outside of the lock"
nik9000 May 23, 2016
7056b96
Patch from boaz
nik9000 May 23, 2016
8eebaa8
Take boaz's changes to their logic conclusion and unbreak important s…
nik9000 May 24, 2016
dc28951
Javadocs and compromises
nik9000 May 24, 2016
15d948a
Better....
nik9000 May 24, 2016
4d8bf5d
explain
nik9000 May 24, 2016
957e9b7
/Consumer<Runnable>/Executor/
nik9000 May 24, 2016
7da36a4
More cleanup for RefreshListeners
nik9000 May 24, 2016
1ec71ee
s/LinkedTransferQueue/ArrayList/
nik9000 May 24, 2016
5d8eecd
Remove funky synchronization in AsyncReplicaAction
nik9000 May 24, 2016
da1e765
Reply with non-null
nik9000 May 24, 2016
04343a2
Javadoc
nik9000 May 24, 2016
b2704b8
Remove unused imports
nik9000 May 24, 2016
0d49d9c
Flip relationship between RefreshListeners and Engine
nik9000 May 24, 2016
87ab6e6
Shorten lock time in RefreshListeners
nik9000 May 31, 2016
2f579f8
Clean up registration of RefreshListeners
nik9000 May 31, 2016
bb27392
Remove duplication in WritePrimaryResult and WriteReplicaResult
nik9000 May 31, 2016
43ce50a
Delay translog sync and flush until after refresh
nik9000 May 31, 2016
5797d1b
Fix forced_refresh flag
nik9000 May 31, 2016
fb16d2f
Rewrite refresh docs
nik9000 May 31, 2016
8453fc4
Javadoc
nik9000 May 31, 2016
d2123b1
Make more stuff final
nik9000 May 31, 2016
058481a
Fix javadoc links
nik9000 May 31, 2016
2b771f8
Pull listener out into an inner class with javadoc and stuff
nik9000 May 31, 2016
74be148
Move funny ShardInfo hack for bulk into bulk
nik9000 May 31, 2016
6bb4e5c
Support null RefreshListeners in InternalEngine
nik9000 Jun 1, 2016
19606ec
Assert translog ordering
nik9000 Jun 1, 2016
4ffb7c0
Fire all refresh listeners in a single thread
nik9000 Jun 1, 2016
d523b57
Explain Integer.MAX_VALUE
nik9000 Jun 1, 2016
30f972b
Handle hanging documents
nik9000 Jun 1, 2016
b74cf3f
Preserve `?refresh` behavior
nik9000 Jun 1, 2016
788164b
S/ReplicatedWriteResponse/WriteResponse/
nik9000 Jun 2, 2016
00d09a9
Improve comment
nik9000 Jun 2, 2016
aeb1be3
Remove checkstyle suppression
nik9000 Jun 2, 2016
0cd67b9
Deprecate setRefresh(boolean)
nik9000 Jun 2, 2016
522ecb5
Document deprecation
nik9000 Jun 2, 2016
9e63ad6
Test for TransportWriteAction
nik9000 Jun 2, 2016
1c3e64a
Merge branch 'master' into block_until_refresh2
nik9000 Jun 2, 2016
9c9a1de
Breaking changes notes
nik9000 Jun 2, 2016
03975ac
Cleanup after merge from master
nik9000 Jun 2, 2016
c2bc365
Fix docs
nik9000 Jun 2, 2016
9b49a48
Patch from boaz
nik9000 Jun 6, 2016
777e23a
Replace static method that takes consumer with delegate class that ta…
nik9000 Jun 6, 2016
31f7861
Revert "Replace static method that takes consumer with delegate class…
nik9000 Jun 6, 2016
59a753b
Replace a method reference with implementing an interface
nik9000 Jun 6, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]Action.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ActionModule.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ActionRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ReplicationResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]cluster[/\\]health[/\\]ClusterHealthRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]cluster[/\\]health[/\\]TransportClusterHealthAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]cluster[/\\]node[/\\]hotthreads[/\\]NodesHotThreadsRequestBuilder.java" checks="LineLength" />
Expand Down Expand Up @@ -101,7 +100,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]open[/\\]TransportOpenIndexAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]recovery[/\\]TransportRecoveryAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]refresh[/\\]TransportRefreshAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]refresh[/\\]TransportShardRefreshAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]segments[/\\]IndexSegments.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]segments[/\\]IndicesSegmentResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]segments[/\\]IndicesSegmentsRequestBuilder.java" checks="LineLength" />
Expand Down
27 changes: 25 additions & 2 deletions core/src/main/java/org/elasticsearch/action/DocWriteResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
*/
package org.elasticsearch.action;

import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

Expand All @@ -30,12 +35,13 @@
/**
* A base class for the response of a write operation that involves a single doc
*/
public abstract class DocWriteResponse extends ReplicationResponse implements StatusToXContent {
public abstract class DocWriteResponse extends ReplicationResponse implements WriteResponse, StatusToXContent {

private ShardId shardId;
private String id;
private String type;
private long version;
private boolean forcedRefresh;

public DocWriteResponse(ShardId shardId, String type, String id, long version) {
this.shardId = shardId;
Expand Down Expand Up @@ -84,6 +90,20 @@ public long getVersion() {
return this.version;
}

/**
* Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
* {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
* only return true here if they run out of refresh listener slots (see {@link IndexSettings#MAX_REFRESH_LISTENERS_PER_SHARD}).
*/
public boolean forcedRefresh() {
return forcedRefresh;
}

@Override
public void setForcedRefresh(boolean forcedRefresh) {
this.forcedRefresh = forcedRefresh;
}

/** returns the rest status for this response (based on {@link ShardInfo#status()} */
public RestStatus status() {
return getShardInfo().status();
Expand All @@ -97,6 +117,7 @@ public void readFrom(StreamInput in) throws IOException {
type = in.readString();
id = in.readString();
version = in.readZLong();
forcedRefresh = in.readBoolean();
}

@Override
Expand All @@ -106,6 +127,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeString(id);
out.writeZLong(version);
out.writeBoolean(forcedRefresh);
}

static final class Fields {
Expand All @@ -121,7 +143,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields._INDEX, shardId.getIndexName())
.field(Fields._TYPE, type)
.field(Fields._ID, id)
.field(Fields._VERSION, version);
.field(Fields._VERSION, version)
.field("forced_refresh", forcedRefresh);
shardInfo.toXContent(builder, params);
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -55,18 +54,19 @@ protected ReplicationResponse newResponseInstance() {
}

@Override
protected Tuple<ReplicationResponse, ShardFlushRequest> shardOperationOnPrimary(ShardFlushRequest shardRequest) {
protected PrimaryResult shardOperationOnPrimary(ShardFlushRequest shardRequest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bye bye tupple..

IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", indexShard.shardId());
return new Tuple<>(new ReplicationResponse(), shardRequest);
return new PrimaryResult(shardRequest, new ReplicationResponse());
}

@Override
protected void shardOperationOnReplica(ShardFlushRequest request) {
protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
indexShard.flush(request.getRequest());
logger.trace("{} flush request executed on replica", indexShard.shardId());
return new ReplicaResult();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -36,19 +35,17 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

/**
*
*/
public class TransportShardRefreshAction extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {
public class TransportShardRefreshAction
extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {

public static final String NAME = RefreshAction.NAME + "[s]";

@Inject
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
}

@Override
Expand All @@ -57,19 +54,20 @@ protected ReplicationResponse newResponseInstance() {
}

@Override
protected Tuple<ReplicationResponse, BasicReplicationRequest> shardOperationOnPrimary(BasicReplicationRequest shardRequest) {
protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.refresh("api");
logger.trace("{} refresh request executed on primary", indexShard.shardId());
return new Tuple<>(new ReplicationResponse(), shardRequest);
return new PrimaryResult(shardRequest, new ReplicationResponse());
}

@Override
protected void shardOperationOnReplica(BasicReplicationRequest request) {
protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request) {
final ShardId shardId = request.shardId();
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
indexShard.refresh("api");
logger.trace("{} refresh request executed on replica", indexShard.shardId());
return new ReplicaResult();
}

@Override
Expand Down
38 changes: 20 additions & 18 deletions core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -54,16 +55,21 @@
* Note that we only support refresh on the bulk request not per item.
* @see org.elasticsearch.client.Client#bulk(BulkRequest)
*/
public class BulkRequest extends ActionRequest<BulkRequest> implements CompositeIndicesRequest {
public class BulkRequest extends ActionRequest<BulkRequest> implements CompositeIndicesRequest, WriteRequest<BulkRequest> {

private static final int REQUEST_OVERHEAD = 50;

/**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
* {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare
* the one with the least casts.
*/
final List<ActionRequest<?>> requests = new ArrayList<>();
List<Object> payloads = null;

protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private boolean refresh = false;
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;

private long sizeInBytes = 0;

Expand Down Expand Up @@ -437,18 +443,15 @@ public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}

/**
* Should a refresh be executed post this bulk operation causing the operations to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public BulkRequest refresh(boolean refresh) {
this.refresh = refresh;
@Override
public BulkRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}

public boolean refresh() {
return this.refresh;
@Override
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}

/**
Expand Down Expand Up @@ -483,7 +486,7 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt
* @return Whether this bulk request contains index request with an ingest pipeline enabled.
*/
public boolean hasIndexRequestsWithPipelines() {
for (ActionRequest actionRequest : requests) {
for (ActionRequest<?> actionRequest : requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
if (Strings.hasText(indexRequest.getPipeline())) {
Expand All @@ -503,10 +506,9 @@ public ActionRequestValidationException validate() {
}
for (ActionRequest<?> request : requests) {
// We first check if refresh has been set
if ((request instanceof DeleteRequest && ((DeleteRequest)request).refresh()) ||
(request instanceof UpdateRequest && ((UpdateRequest)request).refresh()) ||
(request instanceof IndexRequest && ((IndexRequest)request).refresh())) {
validationException = addValidationError("Refresh is not supported on an item request, set the refresh flag on the BulkRequest instead.", validationException);
if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) {
validationException = addValidationError(
"RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException);
}
ActionRequestValidationException ex = request.validate();
if (ex != null) {
Expand Down Expand Up @@ -541,7 +543,7 @@ public void readFrom(StreamInput in) throws IOException {
requests.add(request);
}
}
refresh = in.readBoolean();
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = TimeValue.readTimeValue(in);
}

Expand All @@ -560,7 +562,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
request.writeTo(out);
}
out.writeBoolean(refresh);
refreshPolicy.writeTo(out);
timeout.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
Expand All @@ -35,7 +36,8 @@
* A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes
* it in a single batch.
*/
public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkResponse, BulkRequestBuilder> {
public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkResponse, BulkRequestBuilder>
implements WriteRequestBuilder<BulkRequestBuilder> {

public BulkRequestBuilder(ElasticsearchClient client, BulkAction action) {
super(client, action, new BulkRequest());
Expand Down Expand Up @@ -116,16 +118,6 @@ public BulkRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyL
return this;
}

/**
* Should a refresh be executed post this bulk operation causing the operations to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public BulkRequestBuilder setRefresh(boolean refresh) {
request.refresh(refresh);
return this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Loading