Skip to content

Commit

Permalink
reduce ram acocunting overhead in min() / max() aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Jan 8, 2015
1 parent da952bd commit 0911404
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 143 deletions.
17 changes: 15 additions & 2 deletions sql/src/main/java/io/crate/breaker/ConstSizeEstimator.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public ConstSizeEstimator(long size) {
this.size = size;
}

public long size() {
return size;
}

@Override
public long estimateSize(@Nullable Object value) {
if (value == null) {
Expand All @@ -40,7 +44,16 @@ public long estimateSize(@Nullable Object value) {
}

@Override
public long estimateSize(@Nullable Object oldValue, @Nullable Object newValue) {
return estimateSize(newValue);
public long estimateSizeDelta(@Nullable Object oldValue, @Nullable Object newValue) {
if (oldValue == null) {
if (newValue == null) {
return 0;
}
return size - 8;
}
if (newValue == null) {
return 8 - size;
}
return 0;
}
}
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/breaker/SizeEstimator.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class SizeEstimator<T> {

public abstract long estimateSize(@Nullable T value);

public long estimateSize(@Nullable T oldValue, @Nullable T newValue) {
public long estimateSizeDelta(@Nullable T oldValue, @Nullable T newValue) {
return estimateSize(newValue) - estimateSize(oldValue);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.crate.operation.Input;
import io.crate.operation.aggregation.AggregationFunction;
import io.crate.operation.aggregation.AggregationState;
import io.crate.operation.aggregation.VariableSizeAggregationState;
import io.crate.types.DataType;
import io.crate.types.DataTypes;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -54,9 +53,8 @@ public static void register(AggregationImplModule mod) {
) {
@Override
public AggregationState newState(RamAccountingContext ramAccountingContext) {
return new ArbitraryAggState(ramAccountingContext,
t.streamer(),
SizeEstimatorFactory.create(t));
SizeEstimator<Object> sizeEstimator = SizeEstimatorFactory.create(t);
return new ArbitraryAggState(ramAccountingContext, t.streamer());
}
}
);
Expand All @@ -79,15 +77,13 @@ public boolean iterate(ArbitraryAggState<T> state, Input... args) {
}


public static class ArbitraryAggState<T extends Comparable<T>> extends VariableSizeAggregationState<ArbitraryAggState<T>> {
static class ArbitraryAggState<T extends Comparable<T>> extends AggregationState<ArbitraryAggState<T>> {

Streamer<T> streamer;
private T value = null;
Streamer streamer;
private Object value = null;

public ArbitraryAggState(RamAccountingContext ramAccountingContext,
Streamer<T> streamer,
SizeEstimator sizeEstimator) {
super(ramAccountingContext, sizeEstimator);
public ArbitraryAggState(RamAccountingContext ramAccountingContext, Streamer streamer) {
super(ramAccountingContext);
this.streamer = streamer;
}

Expand All @@ -99,8 +95,7 @@ public Object value() {
@Override
public void reduce(ArbitraryAggState<T> other) {
if (this.value == null){
addEstimatedSize(sizeEstimator.estimateSize(value, other.value));
this.value = other.value;
setValue(other.value);
}
}

Expand All @@ -114,16 +109,13 @@ public int compareTo(ArbitraryAggState<T> o) {
}

public void add(T otherValue) {
addEstimatedSize(sizeEstimator.estimateSize(value, otherValue));
value = otherValue;
setValue(otherValue);
}

public void setValue(T value) {
addEstimatedSize(sizeEstimator.estimateSize(this.value, value));
public void setValue(Object value) {
this.value = value;
}


@Override
public String toString() {
return "<AnyAggState \"" + (value) + "\">";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.crate.metadata.FunctionInfo;
import io.crate.operation.Input;
import io.crate.operation.aggregation.AggregationFunction;
import io.crate.operation.aggregation.VariableSizeAggregationState;
import io.crate.operation.aggregation.AggregationState;
import io.crate.types.DataType;
import io.crate.types.DataTypes;
import io.crate.types.SetType;
Expand Down Expand Up @@ -98,13 +98,15 @@ public boolean iterate(CollectSetAggState state, Input... args) throws CircuitBr
return true;
}

public static abstract class CollectSetAggState extends VariableSizeAggregationState<CollectSetAggState> {
public static abstract class CollectSetAggState extends AggregationState<CollectSetAggState> {

private final SizeEstimator<Object> sizeEstimator;
private Set<Object> value = new HashSet<>();
private long valueSize = 0;

public CollectSetAggState(RamAccountingContext ramAccountingContext, SizeEstimator sizeEstimator) {
super(ramAccountingContext, sizeEstimator);
public CollectSetAggState(RamAccountingContext ramAccountingContext, SizeEstimator<Object> sizeEstimator) {
super(ramAccountingContext);
this.sizeEstimator = sizeEstimator;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
package io.crate.operation.aggregation.impl;

import com.google.common.collect.ImmutableList;
import io.crate.Streamer;
import io.crate.breaker.ConstSizeEstimator;
import io.crate.breaker.RamAccountingContext;
import io.crate.breaker.SizeEstimator;
import io.crate.breaker.SizeEstimatorFactory;
import io.crate.metadata.FunctionIdent;
import io.crate.metadata.FunctionInfo;
import io.crate.operation.Input;
import io.crate.operation.aggregation.AggregationFunction;
import io.crate.operation.aggregation.VariableSizeAggregationState;
import io.crate.operation.aggregation.AggregationState;
import io.crate.types.DataType;
import io.crate.types.DataTypes;
import org.elasticsearch.common.breaker.CircuitBreakingException;
Expand All @@ -48,31 +50,19 @@ public static void register(AggregationImplModule mod) {
for (final DataType dataType : DataTypes.PRIMITIVE_TYPES) {
mod.register(
new MaximumAggregation(
new FunctionInfo(new FunctionIdent(NAME,
ImmutableList.of(dataType)),
dataType, FunctionInfo.Type.AGGREGATE
)
new FunctionInfo(new FunctionIdent(NAME, ImmutableList.of(dataType)), dataType,
FunctionInfo.Type.AGGREGATE)
) {
@Override
public MaximumAggState newState(RamAccountingContext ramAccountingContext) {
return new MaximumAggState(ramAccountingContext, SizeEstimatorFactory.create(dataType)) {

@Override
public void readFrom(StreamInput in) throws IOException {
if (!in.readBoolean()) {
setValue((Comparable) dataType.streamer().readValueFrom(in));
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Object value = value();
out.writeBoolean(value == null);
if (value != null) {
dataType.streamer().writeValueTo(out, value);
}
}
};
SizeEstimator<Object> sizeEstimator = SizeEstimatorFactory.create(dataType);
if (sizeEstimator instanceof ConstSizeEstimator) {
return new MaximumAggState(
dataType.streamer(), ramAccountingContext, ((ConstSizeEstimator) sizeEstimator).size());
} else {
return new VariableSizeMaximumAggState(
dataType.streamer(), ramAccountingContext, sizeEstimator);
}
}
}
);
Expand All @@ -96,12 +86,37 @@ public boolean iterate(MaximumAggState state, Input... args) throws CircuitBreak
return true;
}

public static abstract class MaximumAggState extends VariableSizeAggregationState<MaximumAggState> {
public static class VariableSizeMaximumAggState extends MaximumAggState {

private final SizeEstimator<Object> sizeEstimator;

VariableSizeMaximumAggState(Streamer streamer,
RamAccountingContext ramAccountingContext,
SizeEstimator<Object> sizeEstimator) {
super(streamer, ramAccountingContext);
this.sizeEstimator = sizeEstimator;
}

@Override
public void setValue(Comparable newValue) throws CircuitBreakingException {
ramAccountingContext.addBytes(sizeEstimator.estimateSizeDelta(value(), newValue));
super.setValue(newValue);
}
}

public static class MaximumAggState extends AggregationState<MaximumAggState> {

private final Streamer streamer;
private Comparable value = null;

public MaximumAggState(RamAccountingContext ramAccountingContext, SizeEstimator sizeEstimator) {
super(ramAccountingContext, sizeEstimator);
private MaximumAggState(Streamer streamer, RamAccountingContext ramAccountingContext) {
super(ramAccountingContext);
this.streamer = streamer;
}

MaximumAggState(Streamer streamer, RamAccountingContext ramAccountingContext, long constStateSize) {
this(streamer, ramAccountingContext);
ramAccountingContext.addBytes(constStateSize);
}

@Override
Expand All @@ -115,8 +130,7 @@ public void reduce(MaximumAggState other) throws CircuitBreakingException {
return;
}
if (value == null || compareTo(other) < 0) {
addEstimatedSize(sizeEstimator.estimateSize(value, other.value));
value = other.value;
setValue(other.value);
}
}

Expand All @@ -125,14 +139,12 @@ void add(Comparable otherValue) throws CircuitBreakingException {
return;
}
if (value == null || compareValue(otherValue) < 0) {
addEstimatedSize(sizeEstimator.estimateSize(value, otherValue));
value = otherValue;
setValue(otherValue);
}
}

public void setValue(Comparable value) throws CircuitBreakingException {
addEstimatedSize(sizeEstimator.estimateSize(this.value, value));
this.value = value;
public void setValue(Comparable newValue) throws CircuitBreakingException {
this.value = newValue;
}

@Override
Expand All @@ -141,18 +153,33 @@ public int compareTo(MaximumAggState o) {
return compareValue(o.value);
}

public int compareValue(Object otherValue) {
private int compareValue(Comparable otherValue) {
if (value == null) return (otherValue == null ? 0 : -1);
if (otherValue == null) return 1;

//noinspection unchecked
return value.compareTo(otherValue);
}

@Override
public String toString() {
return "<MaximumAggState \"" + value + "\"";
}
}

@Override
public void readFrom(StreamInput in) throws IOException {
if (!in.readBoolean()) {
setValue((Comparable) streamer.readValueFrom(in));
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Object value = value();
out.writeBoolean(value == null);
if (value != null) {
streamer.writeValueTo(out, value);
}
}
}
}
Loading

0 comments on commit 0911404

Please sign in to comment.