Skip to content

Commit

Permalink
fixup! fixup! ensure sorting on a dynamic column in a partitioned tab…
Browse files Browse the repository at this point in the history
…le works in all cases
  • Loading branch information
msbt committed Mar 24, 2015
1 parent 7f99127 commit e5ba03c
Show file tree
Hide file tree
Showing 4 changed files with 403 additions and 291 deletions.
292 changes: 1 addition & 291 deletions sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,20 @@

package io.crate.action.sql.query;

import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.crate.Constants;
import io.crate.core.StringUtils;
import io.crate.executor.transport.task.elasticsearch.SortOrder;
import io.crate.lucene.LuceneQueryBuilder;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.Functions;
import io.crate.metadata.ReferenceInfo;
import io.crate.metadata.doc.DocSysColumns;
import io.crate.operation.Input;
import io.crate.operation.collect.CollectInputSymbolVisitor;
import io.crate.operation.collect.EngineSearcher;
import io.crate.operation.reference.doc.lucene.CollectorContext;
import io.crate.operation.reference.doc.lucene.LuceneCollectorExpression;
import io.crate.operation.reference.doc.lucene.LuceneDocLevelReferenceResolver;
import io.crate.planner.symbol.*;
import io.crate.types.DataType;
import io.crate.types.DataTypes;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
Expand All @@ -54,8 +46,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
Expand All @@ -65,7 +55,6 @@
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.InternalSearchService;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsPhase;
import org.elasticsearch.search.fetch.FetchPhase;
Expand All @@ -74,14 +63,11 @@
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.sort.SortParseElement;
import org.elasticsearch.threadpool.ThreadPool;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CrateSearchService extends InternalSearchService {
Expand Down Expand Up @@ -322,20 +308,6 @@ protected Void visitSymbol(Symbol symbol, OutputContext context) {
}
}

/**
 * Lookup table from a value's {@link DataType} to the Lucene {@link SortField.Type}
 * that is reported as the reduced type when sorting on that value.
 * Types without an entry here resolve to {@code null} on lookup.
 */
private static final Map<DataType, SortField.Type> luceneTypeMap =
        new ImmutableMap.Builder<DataType, SortField.Type>()
                // booleans are sorted through the STRING type
                .put(DataTypes.BOOLEAN, SortField.Type.STRING)
                // every integral type shares the LONG sort type
                .put(DataTypes.BYTE, SortField.Type.LONG)
                .put(DataTypes.SHORT, SortField.Type.LONG)
                .put(DataTypes.LONG, SortField.Type.LONG)
                .put(DataTypes.INTEGER, SortField.Type.LONG)
                // floating point types keep their own precision
                .put(DataTypes.FLOAT, SortField.Type.FLOAT)
                .put(DataTypes.DOUBLE, SortField.Type.DOUBLE)
                // timestamps and IPs are also mapped to LONG
                .put(DataTypes.TIMESTAMP, SortField.Type.LONG)
                .put(DataTypes.IP, SortField.Type.LONG)
                .put(DataTypes.STRING, SortField.Type.STRING)
                .build();


@Nullable
private Sort generateLuceneSort(SearchContext context,
List<Symbol> symbols,
Expand All @@ -347,270 +319,8 @@ private Sort generateLuceneSort(SearchContext context,
CollectInputSymbolVisitor<LuceneCollectorExpression<?>> inputSymbolVisitor =
new CollectInputSymbolVisitor<>(functions, new LuceneDocLevelReferenceResolver(context.mapperService()));
SortSymbolVisitor sortSymbolVisitor = new SortSymbolVisitor(inputSymbolVisitor);
SortField[] sortFields = new SortField[symbols.size()];
for (int i = 0, symbolsSize = symbols.size(); i < symbolsSize; i++) {
sortFields[i] = sortSymbolVisitor.generateSortField(
symbols.get(i), new SortSymbolContext(context, reverseFlags[i], nullsFirst[i]));
}
SortField[] sortFields = sortSymbolVisitor.generateSortFields(symbols, context, reverseFlags, nullsFirst);
return new Sort(sortFields);
}

/**
 * Per-symbol state handed to the sort symbol visitor while it builds a single
 * {@link SortField}: the requested direction, the null ordering and a
 * {@link CollectorContext} bound to the current search.
 */
private static class SortSymbolContext {

    /** Passed on as the {@code reverse} argument of the generated {@link SortField}. */
    private final boolean reverseFlag;

    /** Collector context carrying the {@link SearchContext} of the current request. */
    private final CollectorContext context;

    /**
     * Null ordering, forwarded to {@code SortOrder.missing(...)}.
     * NOTE(review): boxed, so presumably {@code null} means "not specified" - confirm.
     */
    private final Boolean nullFirst;

    /**
     * @param searchContext search context that gets attached to a fresh {@link CollectorContext}
     * @param reverseFlag   sort direction flag for the symbol
     * @param nullFirst     null ordering for the symbol
     */
    public SortSymbolContext(SearchContext searchContext, boolean reverseFlag, Boolean nullFirst) {
        CollectorContext collectorContext = new CollectorContext();
        collectorContext.searchContext(searchContext);

        this.context = collectorContext;
        this.reverseFlag = reverseFlag;
        this.nullFirst = nullFirst;
    }
}

/**
 * Visitor that converts a sort symbol into a Lucene {@link SortField}.
 * References and functions are supported; every other symbol type is rejected
 * with an {@link UnsupportedOperationException}.
 */
private static class SortSymbolVisitor extends SymbolVisitor<SortSymbolContext, SortField> {

    /** Resolves a function symbol into its input and the doc-level expressions feeding it. */
    private final CollectInputSymbolVisitor<LuceneCollectorExpression<?>> inputSymbolVisitor;

    public SortSymbolVisitor(CollectInputSymbolVisitor<LuceneCollectorExpression<?>> inputSymbolVisitor) {
        super();
        this.inputSymbolVisitor = inputSymbolVisitor;
    }

    /**
     * Entry point: dispatches {@code symbol} to the matching {@code visit*} method.
     *
     * @param symbol            the symbol to sort on
     * @param sortSymbolContext direction / null ordering / collector context for this symbol
     * @return the sort field describing how to order by {@code symbol}
     */
    public SortField generateSortField(Symbol symbol, SortSymbolContext sortSymbolContext) {
        return process(symbol, sortSymbolContext);
    }

    /**
     * Builds a SortField for a column reference.
     *
     * The approach mirrors {@link org.elasticsearch.search.sort.SortParseElement}.
     * The plain {@code SortField(fieldName, type)} constructor is not usable here:
     * values are stored as docValues, i.e. indexed as binary in lucene rather than
     * with the reference's value type, so a comparator source with the same logic
     * as ES is used instead.
     */
    @Override
    public SortField visitReference(final Reference symbol, final SortSymbolContext context) {
        final ColumnIdent column = symbol.info().ident().columnIdent();
        final boolean reverse = context.reverseFlag;

        // sorting on the score pseudo column uses the predefined ES sort fields
        if (column.isColumn() && SortParseElement.SCORE_FIELD_NAME.equals(column.name())) {
            // NOTE(review): the constants are deliberately picked inverted to the flag,
            // presumably because a plain score sort is already descending - confirm.
            if (reverse) {
                return SortParseElement.SORT_SCORE;
            }
            return SortParseElement.SORT_SCORE_REVERSE;
        }

        final SearchContext searchContext = context.context.searchContext();
        final FieldMapper mapper = searchContext.smartNameFieldMapper(column.fqn());

        if (mapper == null) {
            // no mapper for this column in the current context: fall back to a
            // comparator source that only ever yields the missing value
            final String unmappedName = column.fqn();
            final IndexFieldData.XFieldComparatorSource nullSource = new NullFieldComparatorSource(
                    luceneTypeMap.get(symbol.valueType()), reverse, context.nullFirst);
            return new SortField(unmappedName, nullSource, reverse);
        }

        final MultiValueMode sortMode = reverse ? MultiValueMode.MAX : MultiValueMode.MIN;
        final String indexName = mapper.names().indexName();
        final IndexFieldData.XFieldComparatorSource mappedSource = searchContext.fieldData()
                .getForField(mapper)
                .comparatorSource(SortOrder.missing(reverse, context.nullFirst), sortMode, null);
        return new SortField(indexName, mappedSource, reverse);
    }

    /**
     * Builds a SortField for a function by evaluating the function per document
     * through an {@link InputFieldComparator}.
     */
    @Override
    public SortField visitFunction(final Function function, final SortSymbolContext context) {
        final CollectInputSymbolVisitor.Context collectContext = inputSymbolVisitor.process(function);
        final ArrayList<Input<?>> topLevel = collectContext.topLevelInputs();
        assert topLevel.size() == 1;
        final Input functionInput = topLevel.get(0);
        @SuppressWarnings("unchecked")
        final List<LuceneCollectorExpression> docExpressions = collectContext.docLevelExpressions();

        // mappedType stays null for value types that have no lucene sort type;
        // such functions report DOC as their reduced type and get no missing value
        final SortField.Type mappedType = luceneTypeMap.get(function.valueType());
        final SortField.Type reportedType = MoreObjects.firstNonNull(mappedType, SortField.Type.DOC);

        final String sortName = function.toString();
        final IndexFieldData.XFieldComparatorSource functionSource = new IndexFieldData.XFieldComparatorSource() {
            @Override
            public SortField.Type reducedType() {
                return reportedType;
            }

            @Override
            public FieldComparator<?> newComparator(String fieldName, int numHits, int sortPos, boolean reversed) throws IOException {
                final DataType valueType = function.valueType();
                Object missing = null;
                if (mappedType != null) {
                    missing = missingObject(SortOrder.missing(context.reverseFlag, context.nullFirst), reversed);
                }
                return new InputFieldComparator(
                        numHits, context.context, docExpressions, functionInput, valueType, missing);
            }
        };
        return new SortField(sortName, functionSource, context.reverseFlag);
    }

    /** Any symbol that is neither a reference nor a function cannot be sorted on. */
    @Override
    protected SortField visitSymbol(Symbol symbol, SortSymbolContext context) {
        throw new UnsupportedOperationException(
                SymbolFormatter.format("sorting on %s is not supported", symbol));
    }
}

/**
 * Comparator source for a column that has no field mapper in the current search
 * context. Its comparators treat all documents as equal and report the configured
 * missing value as the sort value of every slot.
 */
static class NullFieldComparatorSource extends IndexFieldData.XFieldComparatorSource {

    private final SortField.Type sortFieldType;
    private final Object missingValue;

    /**
     * @param sortFieldType lucene sort type reported via {@link #reducedType()}
     * @param reversed      sort direction flag, used to resolve the missing value
     * @param nullsFirst    null ordering, used to resolve the missing value
     */
    NullFieldComparatorSource(SortField.Type sortFieldType, boolean reversed, Boolean nullsFirst) {
        // keep this assignment first.
        // NOTE(review): missingObject(...) presumably consults reducedType() - confirm.
        this.sortFieldType = sortFieldType;
        this.missingValue = missingObject(SortOrder.missing(reversed, nullsFirst), reversed);
    }

    @Override
    public SortField.Type reducedType() {
        return sortFieldType;
    }

    @Override
    public FieldComparator<?> newComparator(String fieldname, int numHits, int sortPos, boolean reversed) throws IOException {
        return new ConstantValueComparator(missingValue);
    }

    /**
     * Comparator that keeps no per-slot state: every comparison is a tie and
     * every slot value is the same constant.
     */
    private static final class ConstantValueComparator extends FieldComparator<Object> {

        private final Object constant;

        ConstantValueComparator(Object constant) {
            this.constant = constant;
        }

        @Override
        public Object value(int slot) {
            return constant;
        }

        @Override
        public int compare(int slot1, int slot2) {
            return 0;
        }

        @Override
        public int compareBottom(int doc) throws IOException {
            return 0;
        }

        @Override
        public int compareTop(int doc) throws IOException {
            return 0;
        }

        @Override
        public void setBottom(int slot) {
            // nothing to remember, all documents compare equal
        }

        @Override
        public void setTopValue(Object value) {
            // nothing to remember, all documents compare equal
        }

        @Override
        public void copy(int slot, int doc) throws IOException {
            // no per-slot values are stored
        }

        @Override
        public FieldComparator<Object> setNextReader(AtomicReaderContext context) throws IOException {
            return this;
        }
    }
}

/**
 * Lucene comparator that orders documents by the value of an {@link Input}
 * (e.g. a function) which is evaluated per document through a set of
 * {@link LuceneCollectorExpression}s.
 *
 * A {@code null} input value is replaced by {@code missingValue}. The missing
 * value itself may be {@code null}; in that case {@code null} is handed to
 * {@link DataType#compareValueTo} in all comparison methods alike.
 */
static class InputFieldComparator extends FieldComparator {

    /** Sort values of the competitive hits, indexed by slot. */
    private final Object[] values;
    private final Input input;
    private final List<LuceneCollectorExpression> collectorExpressions;
    /** Substitute for {@code null} input values; may itself be {@code null}. */
    private final Object missingValue;
    private final DataType valueType;
    private Object bottom;
    private Object top;

    /**
     * @param numHits              number of slots to allocate
     * @param context              collector context every expression is started with
     * @param collectorExpressions doc-level expressions that feed {@code input}
     * @param input                produces the sort value of the current document
     * @param valueType            type used to compare two sort values
     * @param missingValue         replacement for {@code null} values, may be {@code null}
     */
    public InputFieldComparator(int numHits,
                                CollectorContext context,
                                List<LuceneCollectorExpression> collectorExpressions,
                                Input input,
                                DataType valueType,
                                Object missingValue) {
        this.collectorExpressions = collectorExpressions;
        this.missingValue = missingValue;
        for (int i = 0, size = collectorExpressions.size(); i < size; i++) {
            collectorExpressions.get(i).startCollect(context);
        }
        this.valueType = valueType;
        this.values = new Object[numHits];
        this.input = input;
    }

    @Override
    @SuppressWarnings("unchecked")
    public int compare(int slot1, int slot2) {
        return valueType.compareValueTo(values[slot1], values[slot2]);
    }

    @Override
    public void setBottom(int slot) {
        bottom = values[slot];
    }

    @Override
    public void setTopValue(Object value) {
        top = value;
    }

    @SuppressWarnings("unchecked")
    @Override
    public int compareBottom(int doc) throws IOException {
        return valueType.compareValueTo(bottom, valueOrMissing(doc));
    }

    @SuppressWarnings("unchecked")
    @Override
    public int compareTop(int doc) throws IOException {
        return valueType.compareValueTo(top, valueOrMissing(doc));
    }

    @Override
    public void copy(int slot, int doc) throws IOException {
        values[slot] = valueOrMissing(doc);
    }

    @Override
    public FieldComparator setNextReader(AtomicReaderContext context) throws IOException {
        for (int i = 0, size = collectorExpressions.size(); i < size; i++) {
            collectorExpressions.get(i).setNextReader(context);
        }
        return this;
    }

    @Override
    public Object value(int slot) {
        return values[slot];
    }

    /**
     * Evaluates the input for {@code doc}, substituting {@code missingValue}
     * for a {@code null} result.
     *
     * Deliberately not {@code MoreObjects.firstNonNull}: that throws a
     * NullPointerException when both arguments are null, which happens for
     * value types that have no missing value configured.
     */
    private Object valueOrMissing(int doc) {
        positionOn(doc);
        Object value = input.value();
        return value != null ? value : missingValue;
    }

    /** Points every collector expression at {@code doc} so the input reflects it. */
    private void positionOn(int doc) {
        // indexed loop: this runs once per visited document
        for (int i = 0, size = collectorExpressions.size(); i < size; i++) {
            collectorExpressions.get(i).setNextDocId(doc);
        }
    }
}
}
Loading

0 comments on commit e5ba03c

Please sign in to comment.