Skip to content

Commit

Permalink
add delete-by-query context & kill support
Browse files Browse the repository at this point in the history
also delete-by-query node supports list of where
clauses, so planner will create only one node 
instead of a list of nodes
  • Loading branch information
seut committed May 12, 2015
1 parent dade831 commit d7d7ece
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ public ImmutableList<Task> visitESDeleteByQueryNode(ESDeleteByQueryNode node, UU
return singleTask(new ESDeleteByQueryTask(
jobId,
node,
transportActionProvider.transportDeleteByQueryAction()));
transportActionProvider.transportDeleteByQueryAction(),
jobContextService));
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,100 @@

package io.crate.executor.transport.task.elasticsearch;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.crate.analyze.WhereClause;
import io.crate.executor.JobTask;
import io.crate.executor.TaskResult;
import io.crate.executor.transport.task.AsyncChainedTask;
import io.crate.jobs.ESJobContext;
import io.crate.jobs.JobContextService;
import io.crate.jobs.JobExecutionContext;
import io.crate.planner.node.dml.ESDeleteByQueryNode;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.UUID;

public class ESDeleteByQueryTask extends AsyncChainedTask {
public class ESDeleteByQueryTask extends JobTask {

private final ESDeleteByQueryNode deleteByQueryNode;
private final TransportDeleteByQueryAction transportDeleteByQueryAction;
private final ESQueryBuilder queryBuilder;
private final static ESQueryBuilder QUERY_BUILDER = new ESQueryBuilder();

private final List<ListenableFuture<TaskResult>> resultList;
private final ESJobContext context;

public ESDeleteByQueryTask(UUID jobId,
ESDeleteByQueryNode deleteByQueryNode,
TransportDeleteByQueryAction transportDeleteByQueryAction) {
ESDeleteByQueryNode node,
TransportDeleteByQueryAction transport,
JobContextService jobContextService) {
super(jobId);
this.deleteByQueryNode = deleteByQueryNode;
this.transportDeleteByQueryAction = transportDeleteByQueryAction;
this.queryBuilder = new ESQueryBuilder();
resultList = new ArrayList<>(node.whereClauses().size());
List<DeleteByQueryRequest> requests = new ArrayList<>(node.whereClauses().size());
List<ActionListener> listeners = new ArrayList<>(node.whereClauses().size());

for (int i = 0; i < node.whereClauses().size(); i++) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
SettableFuture<TaskResult> result = SettableFuture.create();
String[] indices = node.indices().get(i);
WhereClause whereClause = node.whereClauses().get(i);
String routing = node.routings().get(i);
try {
request.source(QUERY_BUILDER.convert(whereClause), false);
request.indices(indices);
if (whereClause.clusteredBy().isPresent()){
request.routing(routing);
}
} catch (IOException e) {
result.setException(e);
}
resultList.add(result);
requests.add(request);
listeners.add(new Listener(result));
}

JobExecutionContext.Builder contextBuilder = jobContextService.newBuilder(jobId());
context = new ESJobContext(requests, listeners, resultList, transport);
contextBuilder.addSubContext(node.executionNodeId(), context);
jobContextService.createContext(contextBuilder);
}

@Override
public void start() {
final DeleteByQueryRequest request = new DeleteByQueryRequest();
context.start();
}

try {
request.source(queryBuilder.convert(deleteByQueryNode), false);
request.indices(deleteByQueryNode.indices());
if (deleteByQueryNode.whereClause().clusteredBy().isPresent()){
request.routing(deleteByQueryNode.routing());
}
@Override
public List<? extends ListenableFuture<TaskResult>> result() {
return resultList;
}

transportDeleteByQueryAction.execute(request, new ActionListener<DeleteByQueryResponse>() {
@Override
public void onResponse(DeleteByQueryResponse deleteByQueryResponses) {
result.set(TaskResult.ROW_COUNT_UNKNOWN);
}
@Override
public void upstreamResult(List<? extends ListenableFuture<TaskResult>> result) {
throw new UnsupportedOperationException(
String.format(Locale.ENGLISH, "upstreamResult not supported on %s",
getClass().getSimpleName()));
}

@Override
public void onFailure(Throwable e) {
result.setException(e);
}
});
} catch (IOException e) {
static class Listener implements ActionListener<DeleteByQueryResponse> {

protected final SettableFuture<TaskResult> result;

public Listener(SettableFuture<TaskResult> result) {
this.result = result;
}

@Override
public void onResponse(DeleteByQueryResponse indexDeleteByQueryResponses) {
result.set(TaskResult.ROW_COUNT_UNKNOWN);
}

@Override
public void onFailure(Throwable e) {
result.setException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public ESDeleteTask(UUID jobId,
jobContextService.createContext(contextBuilder);
}

@Override
public void start() {
context.start();
}

@Override
public List<? extends ListenableFuture<TaskResult>> result() {
return resultList;
Expand All @@ -87,11 +92,6 @@ public void upstreamResult(List<? extends ListenableFuture<TaskResult>> result)
getClass().getSimpleName()));
}

@Override
public void start() {
context.start();
}

static class DeleteResponseListener implements ActionListener<DeleteResponse> {

private final SettableFuture<TaskResult> result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,6 @@ public BytesReference convert(WhereClause whereClause) throws IOException {
return context.builder.bytes();
}

/**
* use to create a full elasticsearch query "statement" used by deleteByQuery actions.
*/
public BytesReference convert(ESDeleteByQueryNode node) throws IOException {
assert node != null;

Context context = new Context();
context.builder = XContentFactory.jsonBuilder().startObject();
XContentBuilder builder = context.builder;

whereClause(context, node.whereClause());

builder.endObject();
return builder.bytes();
}

static class Context {
XContentBuilder builder;
final Map<String, Object> ignoredFields = new HashMap<>();
Expand Down
44 changes: 27 additions & 17 deletions sql/src/main/java/io/crate/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,22 @@ protected Plan visitUpdateStatement(UpdateAnalyzedStatement statement, Context c
protected Plan visitDeleteStatement(DeleteAnalyzedStatement analyzedStatement, Context context) {
IterablePlan plan = new IterablePlan();
TableRelation tableRelation = analyzedStatement.analyzedRelation();
List<WhereClause> whereClauses = new ArrayList<>(analyzedStatement.whereClauses().size());
List<DocKeys.DocKey> docKeys = new ArrayList<>(analyzedStatement.whereClauses().size());
boolean deleteById = false;
for (WhereClause whereClause : analyzedStatement.whereClauses()) {
if (whereClause.noMatch()) {
continue;
}
if (whereClause.docKeys().isPresent() && whereClause.docKeys().get().size() == 1) {
deleteById = true;
docKeys.add(whereClause.docKeys().get().getOnlyKey());
//createESDeleteNode(tableRelation.tableInfo(), whereClause, plan, context);
} else {
createESDeleteByQueryNode(tableRelation.tableInfo(), whereClause, plan);
} else if (!whereClause.noMatch()) {
whereClauses.add(whereClause);
}
}
if (deleteById) {
if (!docKeys.isEmpty()) {
plan.add(new ESDeleteNode(context.nextExecutionNodeId(), tableRelation.tableInfo(), docKeys));
} else if (!whereClauses.isEmpty()) {
createESDeleteByQueryNode(tableRelation.tableInfo(), whereClauses, plan, context);
}

if (plan.isEmpty()) {
Expand Down Expand Up @@ -484,20 +484,30 @@ public Plan visitKillAnalyzedStatement(KillAnalyzedStatement analysis, Context c
return KillPlan.INSTANCE;
}

private void createESDeleteByQueryNode(TableInfo tableInfo, WhereClause whereClause, IterablePlan plan) {
String[] indices = indices(tableInfo, whereClause);
if (indices.length > 0 && !whereClause.noMatch()) {
if (!whereClause.hasQuery() && tableInfo.isPartitioned()) {
for (String index : indices) {
plan.add(new ESDeleteIndexNode(index, true));
private void createESDeleteByQueryNode(TableInfo tableInfo,
List<WhereClause> whereClauses,
IterablePlan plan,
Context context) {

List<String[]> indicesList = new ArrayList<>(whereClauses.size());
for (WhereClause whereClause : whereClauses) {
String[] indices = indices(tableInfo, whereClauses.get(0));
if (indices.length > 0) {
if (!whereClause.hasQuery() && tableInfo.isPartitioned()) {
for (String index : indices) {
plan.add(new ESDeleteIndexNode(index, true));
}
} else {
indicesList.add(indices);
}
} else {
// TODO: if we allow queries like 'partitionColumn=X or column=Y' which is currently
// forbidden through analysis, we must issue deleteByQuery request in addition
// to above deleteIndex request(s)
plan.add(new ESDeleteByQueryNode(indices, whereClause));
}
}
// TODO: if we allow queries like 'partitionColumn=X or column=Y' which is currently
// forbidden through analysis, we must issue deleteByQuery request in addition
// to above deleteIndex request(s)
if (!indicesList.isEmpty()) {
plan.add(new ESDeleteByQueryNode(context.nextExecutionNodeId(), indicesList, whereClauses));
}
}

private Upsert processInsertStatement(InsertFromValuesAnalyzedStatement analysis, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,43 @@
import io.crate.planner.node.PlanNodeVisitor;
import io.crate.planner.node.dql.ESDQLPlanNode;

import java.util.ArrayList;
import java.util.List;

public class ESDeleteByQueryNode extends DMLPlanNode {

private final String[] indices;
private final WhereClause whereClause;
private final String routing;
private final int executionNodeId;
private final List<String[]> indices;
private final List<WhereClause> whereClauses;
private final List<String> routings;

public ESDeleteByQueryNode(String[] indices, WhereClause whereClause) {
assert whereClause != null;
public ESDeleteByQueryNode(int executionNodeId,
List<String[]> indices,
List<WhereClause> whereClauses) {
assert whereClauses.size() > 0;
this.executionNodeId = executionNodeId;
this.indices = indices;
this.whereClause = whereClause;
this.routing = ESDQLPlanNode.noCommaStringRouting(whereClause.clusteredBy());
this.whereClauses = whereClauses;
this.routings = new ArrayList<>(whereClauses.size());
for (WhereClause whereClause : whereClauses) {
routings.add(ESDQLPlanNode.noCommaStringRouting(whereClause.clusteredBy()));
}
}

public int executionNodeId() {
return executionNodeId;
}

public String[] indices() {
public List<String[]> indices() {
return indices;
}

public String routing() {
return routing;
public List<String> routings() {
return routings;
}

public WhereClause whereClause() {
return whereClause;
public List<WhereClause> whereClauses() {
return whereClauses;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,9 @@ public void testESDeleteByQueryTask() throws Exception {
Arrays.<Symbol>asList(idRef, Literal.newLiteral(2)));

ESDeleteByQueryNode node = new ESDeleteByQueryNode(
new String[]{"characters"},
new WhereClause(whereClause));
1,
ImmutableList.of(new String[]{"characters"}),
ImmutableList.of(new WhereClause(whereClause)));
Plan plan = new IterablePlan(node);
Job job = executor.newJob(plan);
ESDeleteByQueryTask task = (ESDeleteByQueryTask) job.tasks().get(0);
Expand Down
Loading

0 comments on commit d7d7ece

Please sign in to comment.