Skip to content

Commit

Permalink
Bulk API: Allow to control if its compressed or not using `action.bul…
Browse files Browse the repository at this point in the history
…k.compress` (defaults to true which is current behavior), closes elastic#1850.
  • Loading branch information
kimchy committed Apr 5, 2012
1 parent 65ddfe8 commit 48e7e4e
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 14 deletions.
3 changes: 2 additions & 1 deletion src/main/java/org/elasticsearch/action/GenericAction.java
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions;

/**
Expand Down Expand Up @@ -50,7 +51,7 @@ public String name() {
/**
* Optional request options for the action.
*/
public TransportRequestOptions options() {
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

Expand Down
Expand Up @@ -22,27 +22,34 @@
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.action.support.PlainActionFuture.newFuture;

/**
* A generic proxy that will execute the given action against a specific node.
*/
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> {
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {

protected final TransportService transportService;

private final GenericAction<Request, Response> action;

private final TransportRequestOptions transportOptions;

@Inject
public TransportActionNodeProxy(GenericAction<Request, Response> action, TransportService transportService) {
public TransportActionNodeProxy(Settings settings, GenericAction<Request, Response> action, TransportService transportService) {
super(settings);
this.action = action;
this.transportService = transportService;
this.transportOptions = action.transportOptions(settings);
}

public ActionFuture<Response> execute(DiscoveryNode node, Request request) throws ElasticSearchException {
Expand All @@ -53,7 +60,7 @@ public ActionFuture<Response> execute(DiscoveryNode node, Request request) throw
}

public void execute(DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
transportService.sendRequest(node, action.name(), request, action.options(), new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, action.name(), request, transportOptions, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return action.newResponse();
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/org/elasticsearch/action/bulk/BulkAction.java
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.Action;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions;

/**
Expand All @@ -45,7 +46,10 @@ public BulkRequestBuilder newRequestBuilder(Client client) {
}

@Override
public TransportRequestOptions options() {
return TransportRequestOptions.options().withLowType().withCompress(true);
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.options()
.withType(TransportRequestOptions.Type.fromString(settings.get("action.bulk.transport.type", TransportRequestOptions.Type.LOW.toString())))
.withCompress(settings.getAsBoolean("action.bulk.compress", true)
);
}
}
Expand Up @@ -85,8 +85,7 @@ protected boolean checkWriteConsistency() {

@Override
protected TransportRequestOptions transportOptions() {
// low type since we don't want the large bulk requests to cause high latency on typical requests
return TransportRequestOptions.options().withCompress(true).withLowType();
return BulkAction.INSTANCE.transportOptions(settings);
}

@Override
Expand Down
Expand Up @@ -73,6 +73,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S

protected final WriteConsistencyLevel defaultWriteConsistencyLevel;

protected final TransportRequestOptions transportOptions;

final String transportAction;
final String transportReplicaAction;
final String executor;
Expand All @@ -95,6 +97,8 @@ protected TransportShardReplicationOperationAction(Settings settings, TransportS
transportService.registerHandler(transportAction, new OperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());

this.transportOptions = transportOptions();

this.defaultReplicationType = ReplicationType.fromString(settings.get("action.replication_type", "sync"));
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
}
Expand Down Expand Up @@ -431,7 +435,7 @@ public void run() {
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, transportAction, request, transportOptions, new BaseTransportResponseHandler<Response>() {

@Override
public Response newInstance() {
Expand Down Expand Up @@ -625,7 +629,7 @@ void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response,
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions(), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(VoidStreamable vResponse) {
finishIfPossible();
Expand Down
Expand Up @@ -59,7 +59,7 @@ public InternalTransportClient(Settings settings, ThreadPool threadPool, Transpo
MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<Action, TransportActionNodeProxy>();
for (GenericAction action : actions.values()) {
if (action instanceof Action) {
actionsBuilder.put((Action) action, new TransportActionNodeProxy(action, transportService));
actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.actions = actionsBuilder.immutableMap();
Expand Down
Expand Up @@ -54,7 +54,7 @@ public InternalTransportClusterAdminClient(Settings settings, TransportClientNod
MapBuilder<ClusterAction, TransportActionNodeProxy> actionsBuilder = new MapBuilder<ClusterAction, TransportActionNodeProxy>();
for (GenericAction action : actions.values()) {
if (action instanceof ClusterAction) {
actionsBuilder.put((ClusterAction) action, new TransportActionNodeProxy(action, transportService));
actionsBuilder.put((ClusterAction) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.actions = actionsBuilder.immutableMap();
Expand Down
Expand Up @@ -54,7 +54,7 @@ public InternalTransportIndicesAdminClient(Settings settings, TransportClientNod
MapBuilder<IndicesAction, TransportActionNodeProxy> actionsBuilder = new MapBuilder<IndicesAction, TransportActionNodeProxy>();
for (GenericAction action : actions.values()) {
if (action instanceof IndicesAction) {
actionsBuilder.put((IndicesAction) action, new TransportActionNodeProxy(action, transportService));
actionsBuilder.put((IndicesAction) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.actions = actionsBuilder.immutableMap();
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.unit.TimeValue;

/**
Expand All @@ -35,7 +36,19 @@ public static TransportRequestOptions options() {
public static enum Type {
LOW,
MED,
HIGH
HIGH;

public static Type fromString(String type) {
if ("low".equalsIgnoreCase(type)) {
return LOW;
} else if ("med".equalsIgnoreCase(type)) {
return MED;
} else if ("high".equalsIgnoreCase(type)) {
return HIGH;
} else {
throw new ElasticSearchIllegalArgumentException("failed to match transport type for [" + type + "]");
}
}
}

private TimeValue timeout;
Expand Down

0 comments on commit 48e7e4e

Please sign in to comment.