Skip to content

Commit

Permalink
Math expressions support for missing columns. (apache#3630)
Browse files Browse the repository at this point in the history
Also add SchemaEvolutionTest to help test this kind of thing.

Fixes apache#3627 and includes test for apache#3625.
  • Loading branch information
gianm authored and fundead committed Dec 7, 2016
1 parent 50799ee commit cd0072d
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 41 deletions.
59 changes: 45 additions & 14 deletions common/src/main/java/io/druid/math/expr/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Number eval(ObjectBinding bindings)
{
Number val = bindings.get(value);
if (val == null) {
throw new RuntimeException("No binding found for " + value);
return null;
} else {
return val instanceof Long ? val : val.doubleValue();
}
Expand Down Expand Up @@ -238,6 +238,11 @@ public BinaryOpExprBase(String op, Expr left, Expr right)
this.right = right;
}

protected boolean isNull(Number left, Number right)
{
return left == null || right == null;
}

protected boolean isLong(Number left, Number right)
{
return left instanceof Long && right instanceof Long;
Expand Down Expand Up @@ -271,7 +276,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() - rightVal.longValue();
} else {
return leftVal.doubleValue() - rightVal.doubleValue();
Expand All @@ -292,7 +299,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return LongMath.pow(leftVal.longValue(), rightVal.intValue());
} else {
return Math.pow(leftVal.doubleValue(), rightVal.doubleValue());
Expand All @@ -313,7 +322,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() * rightVal.longValue();
} else {
return leftVal.doubleValue() * rightVal.doubleValue();
Expand Down Expand Up @@ -355,7 +366,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() % rightVal.longValue();
} else {
return leftVal.doubleValue() % rightVal.doubleValue();
Expand All @@ -376,7 +389,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() + rightVal.longValue();
} else {
return leftVal.doubleValue() + rightVal.doubleValue();
Expand All @@ -397,7 +412,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() < rightVal.longValue() ? 1 : 0;
} else {
return leftVal.doubleValue() < rightVal.doubleValue() ? 1.0d : 0.0d;
Expand All @@ -418,7 +435,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() <= rightVal.longValue() ? 1 : 0;
} else {
return leftVal.doubleValue() <= rightVal.doubleValue() ? 1.0d : 0.0d;
Expand All @@ -439,7 +458,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() > rightVal.longValue() ? 1 : 0;
} else {
return leftVal.doubleValue() > rightVal.doubleValue() ? 1.0d : 0.0d;
Expand All @@ -460,7 +481,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() >= rightVal.longValue() ? 1 : 0;
} else {
return leftVal.doubleValue() >= rightVal.doubleValue() ? 1.0d : 0.0d;
Expand All @@ -481,7 +504,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() == rightVal.longValue() ? 1 : 0;
} else {
return leftVal.doubleValue() == rightVal.doubleValue() ? 1.0d : 0.0d;
Expand All @@ -502,7 +527,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
return leftVal.longValue() != rightVal.longValue() ? 1 : 0;
} else {
return leftVal.doubleValue() != rightVal.doubleValue() ? 1.0d : 0.0d;
Expand All @@ -523,7 +550,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
long lval = leftVal.longValue();
if (lval > 0) {
long rval = rightVal.longValue();
Expand Down Expand Up @@ -556,7 +585,9 @@ public Number eval(ObjectBinding bindings)
{
Number leftVal = left.eval(bindings);
Number rightVal = right.eval(bindings);
if (isLong(leftVal, rightVal)) {
if (isNull(leftVal, rightVal)) {
return null;
} else if (isLong(leftVal, rightVal)) {
long lval = leftVal.longValue();
if (lval > 0) {
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ public static Pair<List<AggregatorFactory>, List<PostAggregator>> condensedAggre
}

public static FloatColumnSelector getFloatColumnSelector(
ColumnSelectorFactory metricFactory,
String fieldName,
String fieldExpression
final ColumnSelectorFactory metricFactory,
final String fieldName,
final String fieldExpression,
final float nullValue
)
{
if (fieldName != null && fieldExpression == null) {
Expand All @@ -105,17 +106,19 @@ public static FloatColumnSelector getFloatColumnSelector(
@Override
public float get()
{
return numeric.get().floatValue();
final Number number = numeric.get();
return number == null ? nullValue : number.floatValue();
}
};
}
throw new IllegalArgumentException("Must have a valid, non-null fieldName or expression");
}

public static LongColumnSelector getLongColumnSelector(
ColumnSelectorFactory metricFactory,
String fieldName,
String fieldExpression
final ColumnSelectorFactory metricFactory,
final String fieldName,
final String fieldExpression,
final long nullValue
)
{
if (fieldName != null && fieldExpression == null) {
Expand All @@ -128,7 +131,8 @@ public static LongColumnSelector getLongColumnSelector(
@Override
public long get()
{
return numeric.get().longValue();
final Number number = numeric.get();
return number == null ? nullValue : number.longValue();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)

private FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getFloatColumnSelector(metricFactory, fieldName, expression);
return AggregatorUtil.getFloatColumnSelector(metricFactory, fieldName, expression, Float.MIN_VALUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)

private FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getFloatColumnSelector(metricFactory, fieldName, expression);
return AggregatorUtil.getFloatColumnSelector(metricFactory, fieldName, expression, Float.MAX_VALUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)

private FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getFloatColumnSelector(metricFactory, fieldName, expression);
return AggregatorUtil.getFloatColumnSelector(metricFactory, fieldName, expression, 0f);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)

private LongColumnSelector getLongColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getLongColumnSelector(metricFactory, fieldName, expression);
return AggregatorUtil.getLongColumnSelector(metricFactory, fieldName, expression, Long.MIN_VALUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)

private LongColumnSelector getLongColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getLongColumnSelector(metricFactory, fieldName, expression);
return AggregatorUtil.getLongColumnSelector(metricFactory, fieldName, expression, Long.MAX_VALUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)

private LongColumnSelector getLongColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getLongColumnSelector(metricFactory, fieldName, expression);
return AggregatorUtil.getLongColumnSelector(metricFactory, fieldName, expression, 0L);
}

@Override
Expand Down
53 changes: 46 additions & 7 deletions processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
Expand Down Expand Up @@ -185,7 +189,10 @@ public TableDataSource apply(@Nullable String input)
public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator(
hyperUniqueFinalizingPostAggMetric,
"+",
Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric, uniqueMetric), new ConstantPostAggregator(null, 1))
Lists.newArrayList(
new HyperUniqueFinalizingPostAggregator(uniqueMetric, uniqueMetric),
new ConstantPostAggregator(null, 1)
)
);

public static final List<AggregatorFactory> commonAggregators = Arrays.asList(
Expand Down Expand Up @@ -335,7 +342,11 @@ public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunn
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), "rtIndex"),
makeQueryRunner(factory, new IncrementalIndexSegment(noRollupRtIndex, segmentId), "noRollupRtIndex"),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), "mMappedTestIndex"),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, noRollupMMappedTestIndex), "noRollupMMappedTestIndex"),
makeQueryRunner(
factory,
new QueryableIndexSegment(segmentId, noRollupMMappedTestIndex),
"noRollupMMappedTestIndex"
),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex), "mergedRealtimeIndex")
);
}
Expand All @@ -361,9 +372,12 @@ public static Collection<?> makeUnionQueryRunners(
)
);
}

/**
* Iterate through the iterables in a synchronous manner and return each step as an Object[]
*
* @param in The iterables to step through. (effectively columns)
*
* @return An iterable of Object[] containing the "rows" of the input (effectively rows)
*/
public static Iterable<Object[]> transformToConstructionFeeder(Iterable<?>... in)
Expand Down Expand Up @@ -498,7 +512,9 @@ public String toString()

public static <T> QueryRunner<T> makeFilteringQueryRunner(
final VersionedIntervalTimeline<String, Segment> timeline,
final QueryRunnerFactory<T, Query<T>> factory) {
final QueryRunnerFactory<T, Query<T>> factory
)
{

final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FluentQueryRunnerBuilder<T>(toolChest)
Expand Down Expand Up @@ -537,11 +553,16 @@ public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)

public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest) {
return new QueryRunner<T>() {
public <T> QueryRunner<T> decorate(
final QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
Expand All @@ -560,4 +581,22 @@ public static Map<String, Object> of(Object... keyvalues)
}
return builder.build();
}

public static QueryRunnerFactoryConglomerate newConglomerate()
{
return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(TimeseriesQuery.class, newTimeseriesQueryRunnerFactory())
.build()
);
}

public static TimeseriesQueryRunnerFactory newTimeseriesQueryRunnerFactory()
{
return new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
}
}

0 comments on commit cd0072d

Please sign in to comment.