Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Jan 12, 2015
1 parent 9a5d23d commit 85e40ab
Show file tree
Hide file tree
Showing 23 changed files with 407 additions and 879 deletions.
19 changes: 13 additions & 6 deletions core/src/main/java/io/crate/types/DataTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.ImmutableSet;
import io.crate.TimestampFormat;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class DataTypes {
LONG
);

public static final ImmutableMap<Integer, DataTypeFactory> typeRegistry = ImmutableMap.<Integer, DataTypeFactory>builder()
public static final Map<Integer, DataTypeFactory> TYPE_REGISTRY = new MapBuilder<Integer, DataTypeFactory>()
.put(UndefinedType.ID, UNDEFINED)
.put(NotSupportedType.ID, NOT_SUPPORTED)
.put(ByteType.ID, BYTE)
Expand Down Expand Up @@ -122,7 +123,7 @@ public DataType<?> create() {
public DataType<?> create(DataType innerType) {
return new SetType(innerType);
}
}).build();
}).map();

private static final Set<DataType> NUMBER_CONVERSIONS = ImmutableSet.<DataType>builder()
.addAll(NUMERIC_PRIMITIVE_TYPES)
Expand Down Expand Up @@ -156,11 +157,11 @@ public static boolean isCollectionType(DataType type) {
public static DataType fromStream(StreamInput in) throws IOException {
int i = in.readVInt();
try {
DataType type = typeRegistry.get(i).create();
DataType type = TYPE_REGISTRY.get(i).create();
type.readFrom(in);
return type;
} catch (NullPointerException e) {
logger.error(String.format(Locale.ENGLISH, "%d is missing in typeRegistry", i), e);
logger.error(String.format(Locale.ENGLISH, "%d is missing in TYPE_REGISTRY", i), e);
throw e;
}
}
Expand Down Expand Up @@ -263,9 +264,15 @@ public static DataType ofJsonObject(Object type) {
if (type instanceof List) {
int idCollectionType = (Integer) ((List) type).get(0);
int idInnerType = (Integer) ((List) type).get(1);
return ((CollectionTypeFactory) typeRegistry.get(idCollectionType)).create(ofJsonObject(idInnerType));
return ((CollectionTypeFactory) TYPE_REGISTRY.get(idCollectionType)).create(ofJsonObject(idInnerType));
}
assert type instanceof Integer;
return typeRegistry.get(type).create();
return TYPE_REGISTRY.get(type).create();
}

public static void register(int id, DataTypeFactory dataTypeFactory) {
if (TYPE_REGISTRY.put(id, dataTypeFactory) != null) {
throw new IllegalArgumentException("Already got a dataType with id " + id);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,38 @@
import io.crate.operation.Input;
import io.crate.planner.symbol.Function;
import io.crate.planner.symbol.Symbol;
import io.crate.types.DataType;
import org.elasticsearch.common.breaker.CircuitBreakingException;

public abstract class AggregationFunction<T extends AggregationState> implements FunctionImplementation<Function> {
public abstract class AggregationFunction<TPartial, TFinal> implements FunctionImplementation<Function> {

/**
* Apply the columnValue to the argument AggState using the logic in this AggFunction
*
* @param ramAccountingContext RamAccountingContext to account for additional memory usage if the state grows in size
* @param state the aggregation state for the iteration
* @param args the arguments according to FunctionInfo.argumentTypes
* @return false if we do not need any further iteration for this state
* @return changed state
*/
public abstract boolean iterate(T state, Input... args) throws CircuitBreakingException;

public abstract TPartial iterate(RamAccountingContext ramAccountingContext, TPartial state, Input... args)
throws CircuitBreakingException;

/**
* Creates a new state for this aggregation
*
* @return a new state instance
*/
public abstract T newState(RamAccountingContext ramAccountingContext);
public abstract TPartial newState(RamAccountingContext ramAccountingContext);


@Override
public Symbol normalizeSymbol(Function symbol) {
return symbol;
}

public abstract DataType partialType();

public abstract TPartial reduce(TPartial state1, TPartial state2);

public abstract TFinal terminatePartial(TPartial state);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@

import io.crate.breaker.RamAccountingContext;
import io.crate.operation.Input;
import io.crate.operation.collect.RowCollector;
import io.crate.planner.symbol.Aggregation;

import java.util.Locale;

public class AggregationCollector implements RowCollector {
/**
* A wrapper around an AggregationFunction that is aware of the aggregation steps (iter, partial, final)
* and will call the collect functions on the aggregationFunction depending on these steps.
*/
public class Aggregator {

private final Input[] inputs;
private final AggregationFunction aggregationFunction;
private final FromImpl fromImpl;
private final ToImpl toImpl;

private AggregationState aggregationState;

public AggregationCollector(Aggregation a, AggregationFunction aggregationFunction, Input... inputs) {
public Aggregator(Aggregation a, AggregationFunction aggregationFunction, Input... inputs) {
if (a.fromStep() == Aggregation.Step.PARTIAL && inputs.length > 1) {
throw new UnsupportedOperationException("Aggregation from PARTIAL is only allowed with one input.");
}
Expand Down Expand Up @@ -73,72 +74,63 @@ public AggregationCollector(Aggregation a, AggregationFunction aggregationFuncti
}


public boolean startCollect(RamAccountingContext ramAccountingContext) {
aggregationState = fromImpl.startCollect(ramAccountingContext);
return true;
public Object prepareState(RamAccountingContext ramAccountingContext) {
return fromImpl.prepareState(ramAccountingContext);
}

public boolean processRow() {
return fromImpl.processRow();
}


public Object finishCollect() {
return toImpl.finishCollect();
public Object processRow(RamAccountingContext ramAccountingContext, Object value) {
return fromImpl.processRow(ramAccountingContext, value);
}

public AggregationState state() {
return aggregationState;
}

public void state(AggregationState state) {
aggregationState = state;
public Object finishCollect(Object state) {
return toImpl.finishCollect(state);
}

abstract class FromImpl {

public AggregationState startCollect(RamAccountingContext ramAccountingContext) {
public Object prepareState(RamAccountingContext ramAccountingContext) {
return aggregationFunction.newState(ramAccountingContext);
}

public abstract boolean processRow();
public abstract Object processRow(RamAccountingContext ramAccountingContext, Object value);
}

class FromIter extends FromImpl {

@Override
@SuppressWarnings("unchecked")
public boolean processRow() {
return aggregationFunction.iterate(aggregationState, inputs);
public Object processRow(RamAccountingContext ramAccountingContext, Object value) {
return aggregationFunction.iterate(ramAccountingContext, value, inputs);
}
}

class FromPartial extends FromImpl {

@Override
@SuppressWarnings("unchecked")
public boolean processRow() {
aggregationState.reduce((AggregationState)inputs[0].value());
return true;
public Object processRow(RamAccountingContext ramAccountingContext, Object value) {
return aggregationFunction.reduce(value, inputs[0].value());
}
}

static abstract class ToImpl {
public abstract Object finishCollect();
public abstract Object finishCollect(Object state);
}

class ToPartial extends ToImpl {

@Override
public Object finishCollect() {
return aggregationState;
public Object finishCollect(Object state) {
return state;
}
}

class ToFinal extends ToImpl {

@Override
public Object finishCollect() {
aggregationState.terminatePartial();
return aggregationState.value();
public Object finishCollect(Object state) {
//noinspection unchecked
return aggregationFunction.terminatePartial(state);
}
}
}
Loading

0 comments on commit 85e40ab

Please sign in to comment.