Skip to content

Commit

Permalink
support functions in QueryThenFetchTask
Browse files Browse the repository at this point in the history
removes LocalMerge with TopN projection from qtf-plans
  • Loading branch information
mfussenegger committed Nov 17, 2014
1 parent 988811a commit 908dc31
Show file tree
Hide file tree
Showing 19 changed files with 341 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private OutputContext(SearchContext searchContext, List<ReferenceInfo> partition

private static class OutputSymbolVisitor extends SymbolVisitor<OutputContext, Void> {

public void process(List<Symbol> outputs, OutputContext context) {
public void process(List<? extends Symbol> outputs, OutputContext context) {
for (Symbol output : outputs) {
process(output, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class QueryShardRequest extends ActionRequest<QueryShardRequest> {

private String index;
private Integer shard;
private List<Symbol> outputs;
private List<? extends Symbol> outputs;
private List<Symbol> orderBy;
private boolean[] reverseFlags;
private Boolean[] nullsFirst;
Expand All @@ -51,7 +51,7 @@ public QueryShardRequest() {}

public QueryShardRequest(String index,
int shard,
List<Symbol> outputs,
List<? extends Symbol> outputs,
List<Symbol> orderBy,
boolean[] reverseFlags,
Boolean[] nullsFirst,
Expand Down Expand Up @@ -84,10 +84,11 @@ public void readFrom(StreamInput in) throws IOException {
shard = in.readVInt();

int numOutputs = in.readVInt();
outputs = new ArrayList<>(numOutputs);
List<Symbol> outputs = new ArrayList<>(numOutputs);
for (int i = 0; i < numOutputs; i++) {
outputs.add(Symbol.fromStream(in));
}
this.outputs = outputs;

int numOrderBy = in.readVInt();
orderBy = new ArrayList<>(numOrderBy);
Expand Down Expand Up @@ -166,7 +167,7 @@ public int shardId() {
return shard;
}

public List<Symbol> outputs() {
public List<? extends Symbol> outputs() {
return outputs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ public Void visitMergeNode(MergeNode node, Job context) {
}

@Override
public Void visitESSearchNode(QueryThenFetchNode node, Job context) {
public Void visitQueryThenFetchNode(QueryThenFetchNode node, Job context) {
context.addTask(new QueryThenFetchTask(
functions,
node,
clusterService,
transportActionProvider.transportQueryShardAction(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.List;
import java.util.Map;

public abstract class ESFieldExtractor {
public abstract class ESFieldExtractor implements FieldExtractor<SearchHit> {

private static final Object NOT_FOUND = new Object();

Expand Down Expand Up @@ -105,13 +105,11 @@ Object toValue(@Nullable Map<String, Object> source) {
public static class PartitionedByColumnExtractor extends ESFieldExtractor {

private final Reference reference;
private final List<ReferenceInfo> partitionedByInfos;
private final int valueIdx;
private final Map<String, List<BytesRef>> cache;

public PartitionedByColumnExtractor(Reference reference, List<ReferenceInfo> partitionedByInfos) {
this.reference = reference;
this.partitionedByInfos = partitionedByInfos;
this.valueIdx = partitionedByInfos.indexOf(reference.info());
this.cache = new HashMap<>();
}
Expand All @@ -121,8 +119,7 @@ public Object extract(SearchHit hit) {
try {
List<BytesRef> values = cache.get(hit.index());
if (values == null) {
values = PartitionName
.fromStringSafe(hit.index()).values();
values = PartitionName.fromStringSafe(hit.index()).values();
}
BytesRef value = values.get(valueIdx);
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.crate.metadata.Functions;
import io.crate.metadata.ReferenceInfo;
import io.crate.metadata.Scalar;
import io.crate.operation.Input;
import io.crate.operation.ProjectorUpstream;
import io.crate.operation.projectors.FlatProjectorChain;
import io.crate.operation.projectors.ProjectionToProjectorVisitor;
Expand Down Expand Up @@ -323,30 +322,30 @@ public void upstreamResult(List<ListenableFuture<TaskResult>> result) {
getClass().getSimpleName()));
}

static FieldExtractor buildExtractor(final String field, final Context context) {
static FieldExtractor<GetResponse> buildExtractor(final String field, final Context context) {
if (field.equals("_version")) {
return new FieldExtractor() {
return new FieldExtractor<GetResponse>() {
@Override
public Object extract(GetResponse response) {
return response.getVersion();
}
};
} else if (field.equals("_id")) {
return new FieldExtractor() {
return new FieldExtractor<GetResponse>() {
@Override
public Object extract(GetResponse response) {
return response.getId();
}
};
} else if (context.partitionValues.containsKey(field)) {
return new FieldExtractor() {
return new FieldExtractor<GetResponse>() {
@Override
public Object extract(GetResponse response) {
return context.partitionValues.get(field);
}
};
} else {
return new FieldExtractor() {
return new FieldExtractor<GetResponse>() {
@Override
public Object extract(GetResponse response) {
assert response.getSourceAsMap() != null;
Expand Down Expand Up @@ -379,83 +378,38 @@ public String[] fields() {
}
}

static class Visitor extends SymbolVisitor<Context, FieldExtractor> {
static class Visitor extends SymbolVisitor<Context, FieldExtractor<GetResponse>> {

@Override
public FieldExtractor visitReference(Reference symbol, Context context) {
public FieldExtractor<GetResponse> visitReference(Reference symbol, Context context) {
String fieldName = symbol.info().ident().columnIdent().fqn();
context.addField(fieldName);
return buildExtractor(fieldName, context);
}

@Override
public FieldExtractor visitDynamicReference(DynamicReference symbol, Context context) {
public FieldExtractor<GetResponse> visitDynamicReference(DynamicReference symbol, Context context) {
return visitReference(symbol, context);
}

@Override
public FieldExtractor visitFunction(Function symbol, Context context) {
List<FieldExtractor> subExtractors = new ArrayList<>(symbol.arguments().size());
public FieldExtractor<GetResponse> visitFunction(Function symbol, Context context) {
List<FieldExtractor<GetResponse>> subExtractors = new ArrayList<>(symbol.arguments().size());
for (Symbol argument : symbol.arguments()) {
subExtractors.add(process(argument, context));
}
return new FunctionExtractor((Scalar) context.functions.getSafe(symbol.info().ident()), subExtractors);
return new FunctionExtractor<>((Scalar) context.functions.getSafe(symbol.info().ident()), subExtractors);
}

@Override
public FieldExtractor visitLiteral(Literal symbol, Context context) {
return new LiteralExtractor(symbol.value());
public FieldExtractor<GetResponse> visitLiteral(Literal symbol, Context context) {
return new LiteralExtractor<>(symbol.value());
}

@Override
protected FieldExtractor visitSymbol(Symbol symbol, Context context) {
protected FieldExtractor<GetResponse> visitSymbol(Symbol symbol, Context context) {
throw new UnsupportedOperationException(
SymbolFormatter.format("Get operation not supported with symbol %s in the result column list", symbol));
}
}

private interface FieldExtractor {
Object extract(GetResponse response);
}

private static class LiteralExtractor implements FieldExtractor {
private final Object literal;

private LiteralExtractor(Object literal) {
this.literal = literal;
}

@Override
public Object extract(GetResponse response) {
return literal;
}
}

private static class FunctionExtractor implements FieldExtractor {

private final Scalar scalar;
private final List<FieldExtractor> subExtractors;

public FunctionExtractor(Scalar scalar, List<FieldExtractor> subExtractors) {
this.scalar = scalar;
this.subExtractors = subExtractors;
}

@Override
public Object extract(final GetResponse response) {
Input[] inputs = new Input[subExtractors.size()];
int idx = 0;
for (final FieldExtractor subExtractor : subExtractors) {
inputs[idx] = new Input() {
@Override
public Object value() {
return subExtractor.extract(response);
}
};
idx++;
}
//noinspection unchecked
return scalar.evaluate(inputs);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport.task.elasticsearch;

public interface FieldExtractor<T> {

public Object extract(T value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport.task.elasticsearch;

import io.crate.metadata.Scalar;
import io.crate.operation.Input;

import java.util.List;

public class FunctionExtractor<T> implements FieldExtractor<T> {

private final Scalar scalar;
private final List<FieldExtractor<T>> subExtractors;

public FunctionExtractor(Scalar scalar, List<FieldExtractor<T>> subExtractors) {
this.scalar = scalar;
this.subExtractors = subExtractors;
}

@Override
public Object extract(final T response) {
Input[] inputs = new Input[subExtractors.size()];
int idx = 0;
for (final FieldExtractor<T> subExtractor : subExtractors) {
inputs[idx] = new Input() {
@Override
public Object value() {
return subExtractor.extract(response);
}
};
idx++;
}
//noinspection unchecked
return scalar.evaluate(inputs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport.task.elasticsearch;

public class LiteralExtractor<T> implements FieldExtractor<T> {

private final Object literal;

public LiteralExtractor(Object literal) {
this.literal = literal;
}

@Override
public Object extract(T response) {
return literal;
}
}

0 comments on commit 908dc31

Please sign in to comment.