Skip to content

Commit

Permalink
fixed multi routings in count and delete by query requests
Browse files Browse the repository at this point in the history
removed special transport actions for those and set the routing to null if any routing contains a comma, which can’t be handled by the ES delete and count requests because they require a string instead of a Set<String>
  • Loading branch information
dobe committed Feb 23, 2015
1 parent 60ae354 commit 4e7b710
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 394 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.elasticsearch.action.bulk.SymbolBasedTransportShardUpsertActionDelegateImpl;
import org.elasticsearch.action.bulk.TransportShardUpsertActionDelegate;
import org.elasticsearch.action.bulk.TransportShardUpsertActionDelegateImpl;
import org.elasticsearch.action.count.CrateTransportCountAction;
import org.elasticsearch.action.count.TransportCountAction;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.deletebyquery.CrateTransportDeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
import org.elasticsearch.action.get.TransportGetAction;
import org.elasticsearch.action.get.TransportMultiGetAction;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -54,8 +54,8 @@ public class TransportActionProvider {
private final Provider<TransportPutIndexTemplateAction> transportPutIndexTemplateActionProvider;
private final Provider<TransportDeleteIndexTemplateAction> transportDeleteIndexTemplateActionProvider;
private final Provider<TransportClusterUpdateSettingsAction> transportClusterUpdateSettingsActionProvider;
private final Provider<CrateTransportCountAction> transportCountActionProvider;
private final Provider<CrateTransportDeleteByQueryAction> transportDeleteByQueryActionProvider;
private final Provider<TransportCountAction> transportCountActionProvider;
private final Provider<TransportDeleteByQueryAction> transportDeleteByQueryActionProvider;
private final Provider<TransportDeleteAction> transportDeleteActionProvider;

private final Provider<TransportGetAction> transportGetActionProvider;
Expand All @@ -75,8 +75,8 @@ public TransportActionProvider(Provider<TransportCollectNodeAction> transportCol
Provider<TransportPutIndexTemplateAction> transportPutIndexTemplateActionProvider,
Provider<TransportDeleteIndexTemplateAction> transportDeleteIndexTemplateActionProvider,
Provider<TransportClusterUpdateSettingsAction> transportClusterUpdateSettingsActionProvider,
Provider<CrateTransportCountAction> transportCountActionProvider,
Provider<CrateTransportDeleteByQueryAction> transportDeleteByQueryActionProvider,
Provider<TransportCountAction> transportCountActionProvider,
Provider<TransportDeleteByQueryAction> transportDeleteByQueryActionProvider,
Provider<TransportDeleteAction> transportDeleteActionProvider,
Provider<TransportGetAction> transportGetActionProvider,
Provider<TransportMultiGetAction> transportMultiGetActionProvider,
Expand Down Expand Up @@ -130,11 +130,11 @@ public TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction
return transportClusterUpdateSettingsActionProvider.get();
}

public CrateTransportCountAction transportCountAction() {
public TransportCountAction transportCountAction() {
return transportCountActionProvider.get();
}

public CrateTransportDeleteByQueryAction transportDeleteByQueryAction() {
public TransportDeleteByQueryAction transportDeleteByQueryAction() {
return transportDeleteByQueryActionProvider.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import com.google.common.util.concurrent.SettableFuture;
import io.crate.Constants;
import io.crate.exceptions.FailedShardsException;
import io.crate.executor.QueryResult;
import io.crate.executor.JobTask;
import io.crate.executor.QueryResult;
import io.crate.executor.TaskResult;
import io.crate.planner.node.dql.ESCountNode;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.count.CrateTransportCountAction;
import org.elasticsearch.action.count.TransportCountAction;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -41,14 +41,14 @@

public class ESCountTask extends JobTask {

private final CrateTransportCountAction transportCountAction;
private final TransportCountAction transportCountAction;
private final List<ListenableFuture<TaskResult>> results;
private CountRequest request;
private ActionListener<CountResponse> listener;
private final static ESQueryBuilder queryBuilder = new ESQueryBuilder();
private final static TaskResult ZERO_RESULT = new QueryResult(new Object[][] { new Object[] { 0L }});

public ESCountTask(UUID jobId, ESCountNode node, CrateTransportCountAction transportCountAction) {
public ESCountTask(UUID jobId, ESCountNode node, TransportCountAction transportCountAction) {
super(jobId);
this.transportCountAction = transportCountAction;
assert node != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@
import io.crate.executor.transport.task.AsyncChainedTask;
import io.crate.planner.node.dml.ESDeleteByQueryNode;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.deletebyquery.CrateTransportDeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;

import java.io.IOException;
import java.util.UUID;

public class ESDeleteByQueryTask extends AsyncChainedTask {

private final ESDeleteByQueryNode deleteByQueryNode;
private final CrateTransportDeleteByQueryAction transportDeleteByQueryAction;
private final TransportDeleteByQueryAction transportDeleteByQueryAction;
private final ESQueryBuilder queryBuilder;

public ESDeleteByQueryTask(UUID jobId,
ESDeleteByQueryNode deleteByQueryNode,
CrateTransportDeleteByQueryAction transportDeleteByQueryAction) {
TransportDeleteByQueryAction transportDeleteByQueryAction) {
super(jobId);
this.deleteByQueryNode = deleteByQueryNode;
this.transportDeleteByQueryAction = transportDeleteByQueryAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@

package io.crate.planner.node.dml;

import com.google.common.collect.Iterators;
import io.crate.analyze.WhereClause;
import io.crate.core.StringUtils;
import io.crate.planner.node.PlanNodeVisitor;
import io.crate.planner.symbol.StringValueSymbolVisitor;
import io.crate.planner.node.dql.ESDQLPlanNode;

public class ESDeleteByQueryNode extends DMLPlanNode {

Expand All @@ -37,14 +35,10 @@ public ESDeleteByQueryNode(String[] indices, WhereClause whereClause) {
assert whereClause != null;
this.indices = indices;
this.whereClause = whereClause;
if (whereClause.clusteredBy().isPresent()){
routing = StringUtils.ROUTING_JOINER.join(Iterators.transform(
whereClause.clusteredBy().get().iterator(), StringValueSymbolVisitor.PROCESS_FUNCTION));
} else {
this.routing = null;
}
this.routing = ESDQLPlanNode.noCommaStringRouting(whereClause.clusteredBy());
}


public String[] indices() {
return indices;
}
Expand Down
12 changes: 1 addition & 11 deletions sql/src/main/java/io/crate/planner/node/dql/ESCountNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@

package io.crate.planner.node.dql;

import com.google.common.collect.Iterators;
import io.crate.analyze.WhereClause;
import io.crate.core.StringUtils;
import io.crate.planner.node.PlanNodeVisitor;
import io.crate.planner.symbol.StringValueSymbolVisitor;
import io.crate.types.DataType;
import io.crate.types.LongType;

Expand All @@ -42,14 +39,7 @@ public class ESCountNode extends ESDQLPlanNode {
public ESCountNode(String[] indices, WhereClause whereClause) {
this.indices = indices;
this.whereClause = whereClause;
if (whereClause.clusteredBy().isPresent()){
routing = StringUtils.ROUTING_JOINER.join(Iterators.transform(
whereClause.clusteredBy().get().iterator(), StringValueSymbolVisitor.PROCESS_FUNCTION));
} else {
this.routing = null;
}


this.routing = noCommaStringRouting(whereClause.clusteredBy());
}

@Override
Expand Down
28 changes: 28 additions & 0 deletions sql/src/main/java/io/crate/planner/node/dql/ESDQLPlanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package io.crate.planner.node.dql;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.crate.analyze.relations.PlannedAnalyzedRelation;
Expand All @@ -31,14 +32,19 @@
import io.crate.planner.Plan;
import io.crate.planner.projection.Projection;
import io.crate.planner.symbol.Field;
import io.crate.planner.symbol.Literal;
import io.crate.planner.symbol.StringValueSymbolVisitor;
import io.crate.planner.symbol.Symbol;
import io.crate.types.DataType;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;

public abstract class ESDQLPlanNode implements DQLPlanNode, PlannedAnalyzedRelation {

private static final char COMMA = ',';

protected List<Symbol> outputs;
private List<DataType> inputTypes;
private List<DataType> outputTypes;
Expand All @@ -47,6 +53,28 @@ public List<Symbol> outputs() {
return outputs;
}

@Nullable
public static String noCommaStringRouting(Optional<Set<Literal>> clusteredBy) {
if (clusteredBy.isPresent()){
StringBuilder sb = new StringBuilder();
boolean first = true;
for (Symbol symbol : clusteredBy.get()) {
String s = StringValueSymbolVisitor.INSTANCE.process(symbol);
if (s.indexOf(COMMA)>-1){
return null;
}
if (!first){
sb.append(COMMA);
} else {
first = false;
}
sb.append(s);
}
return sb.toString();
}
return null;
}

@Override
public boolean hasProjections() {
return false;
Expand Down
9 changes: 0 additions & 9 deletions sql/src/main/java/io/crate/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@
import io.crate.rest.action.RestSQLAction;
import io.crate.service.SQLService;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.count.CrateCountAction;
import org.elasticsearch.action.count.CrateTransportCountAction;
import org.elasticsearch.action.deletebyquery.CrateDeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.CrateTransportDeleteByQueryAction;
import org.elasticsearch.cluster.settings.ClusterDynamicSettingsModule;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.component.LifecycleComponent;
Expand Down Expand Up @@ -184,10 +180,5 @@ public void onModule(ScriptModule scriptModule) {
public void onModule(ActionModule actionModule) {
actionModule.registerAction(SQLAction.INSTANCE, TransportSQLAction.class);
actionModule.registerAction(SQLBulkAction.INSTANCE, TransportSQLBulkAction.class);

// overridden classes fixing handling the routing parameter differently
// always interpret it as a single value, no multiple routing values allowed here
actionModule.registerAction(CrateCountAction.INSTANCE, CrateTransportCountAction.class);
actionModule.registerAction(CrateDeleteByQueryAction.INSTANCE, CrateTransportDeleteByQueryAction.class);
}
}

This file was deleted.

Loading

0 comments on commit 4e7b710

Please sign in to comment.