Add HashJoinSegment, a virtual segment for joins. #9111

Merged · 10 commits · Jan 16, 2020
Changes from 3 commits
@@ -118,6 +118,27 @@ public static Double defaultDoubleValue()
return replaceWithDefault() ? ZERO_DOUBLE : null;
}

/**
* Returns the default value for an object of the provided class. Will be null in SQL-compatible null handling mode.
* May be null or some non-null default value when not in SQL-compatible null handling mode.
*/
@Nullable
@SuppressWarnings("unchecked")
public static <T> T defaultValueForClass(final Class<T> clazz)
Contributor: Missing a unit test to ensure maintainability.

Contributor Author: I'll add one.

Contributor Author: Well, a few. One per class.

{
if (clazz == Float.class) {
return (T) defaultFloatValue();
} else if (clazz == Double.class) {
return (T) defaultDoubleValue();
} else if (clazz == Long.class) {
return (T) defaultLongValue();
} else if (clazz == String.class) {
return (T) defaultStringValue();
} else {
return null;
Contributor: This else is problematic. What other classes do you expect? I prefer we're explicit in this method, returning null for the supported classes, and otherwise throwing an exception in the else case. If in the future we add a new supported type and forget to add its default-value case, we would hit the exception instead of silently returning a null and causing issues. A unit test should additionally be created based on the set of supported types. Ideally, we should have our own typing system with an interface that has methods like getDefault() so we avoid these issues. I friggin hate type checks in Java.

Contributor Author: Unfortunately all types are supported. This method is used by the PossiblyNullColumnValueSelector when it generates nulls that don't exist in the base selector. The base selector could be returning any type, even weird ones that don't exist in Druid's type system (there's a COMPLEX catch-all for those, and it's used for stuff like sketches).

}
}

public static boolean isNullOrEquivalent(@Nullable String value)
{
return replaceWithDefault() ? Strings.isNullOrEmpty(value) : value == null;
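
A minimal sketch of the kind of per-class test discussed in the review thread above. This is hypothetical, not part of the diff: it assumes JUnit 4 and that these methods live in Druid's NullHandling class (the file name is not visible in this view); the test class name is illustrative.

import org.apache.druid.common.config.NullHandling;
import org.junit.Assert;
import org.junit.Test;

// Hypothetical test sketch, not part of this diff.
public class NullHandlingDefaultValueForClassTest
{
  @Test
  public void testDefaultValueForClass()
  {
    // Each supported class should match its type-specific default.
    Assert.assertEquals(NullHandling.defaultFloatValue(), NullHandling.defaultValueForClass(Float.class));
    Assert.assertEquals(NullHandling.defaultDoubleValue(), NullHandling.defaultValueForClass(Double.class));
    Assert.assertEquals(NullHandling.defaultLongValue(), NullHandling.defaultValueForClass(Long.class));
    Assert.assertEquals(NullHandling.defaultStringValue(), NullHandling.defaultValueForClass(String.class));

    // Any other class falls through to null.
    Assert.assertNull(NullHandling.defaultValueForClass(Object.class));
  }
}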
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/Exprs.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.math.expr;

import org.apache.druid.java.util.common.Pair;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Stack;

public class Exprs
Contributor: Missing unit tests for this class.

Contributor Author: Yeah, these should have unit tests. I'll add them.

{
/**
* Decomposes any expr into a list of exprs that, if ANDed together, are equivalent to the input expr.
*
* @param expr any expr
*
* @return list of exprs that, if ANDed together, are equivalent to the input expr
*/
public static List<Expr> decomposeAnd(final Expr expr)
{
final List<Expr> retVal = new ArrayList<>();
Contributor: Might wanna specify an initial size since these are likely smallish in the usual case? Or perhaps a LinkedList is good enough?

Contributor Author: What would you suggest for an initial size?

final Stack<Expr> stack = new Stack<>();
stack.push(expr);

while (!stack.empty()) {
final Expr current = stack.pop();

if (current instanceof BinAndExpr) {
stack.push(((BinAndExpr) current).right);
stack.push(((BinAndExpr) current).left);
} else {
retVal.add(current);
}
Contributor: This potentially seems like a method that belongs on Expr rather than here.

Contributor Author: What would the method look like?

}

return retVal;
}

/**
* Decomposes an equality expr into the left- and right-hand side.
*
* @return decomposed equality, or empty if the input expr was not an equality expr
*/
public static Optional<Pair<Expr, Expr>> decomposeEquals(final Expr expr)
Contributor: Also looks like a method on Expr rather than here.

Contributor Author: What would the method look like?

{
if (expr instanceof BinEqExpr) {
return Optional.of(Pair.of(((BinEqExpr) expr).left, ((BinEqExpr) expr).right));
} else {
return Optional.empty();
}
}
}
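
A rough usage sketch for the two helpers above. This is hypothetical and not part of the diff; it assumes Druid's Parser.parse(String, ExprMacroTable) and ExprMacroTable.nil() for building the input expression, and Druid's Pair with public lhs/rhs fields.

import org.apache.druid.java.util.common.Pair;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.Exprs;
import org.apache.druid.math.expr.Parser;

import java.util.List;
import java.util.Optional;

// Hypothetical usage sketch, not part of this diff.
public class ExprsUsageSketch
{
  public static void main(String[] args)
  {
    // "&&" chains parse into nested BinAndExpr nodes; decomposeAnd flattens them.
    final Expr expr = Parser.parse("(x == y) && (a == b) && c", ExprMacroTable.nil());
    final List<Expr> conjuncts = Exprs.decomposeAnd(expr);  // [x == y, a == b, c]

    for (Expr conjunct : conjuncts) {
      // decomposeEquals yields the two sides of an equality, or empty otherwise.
      final Optional<Pair<Expr, Expr>> eq = Exprs.decomposeEquals(conjunct);
      eq.ifPresent(pair -> System.out.println(pair.lhs + " == " + pair.rhs));
    }
  }
}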
@@ -32,6 +32,7 @@
import java.util.Objects;

/**
*
*/
public class DefaultDimensionSpec implements DimensionSpec
{
@@ -136,6 +137,12 @@ public boolean preservesOrdering()
return true;
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new DefaultDimensionSpec(newDimension, this.outputName, this.outputType);
}

@Override
public String toString()
{
@@ -83,4 +83,10 @@ default boolean canVectorize()
}

boolean preservesOrdering();

/**
* Returns a copy of this DimensionSpec with the underlying dimension (the value of {@link #getDimension()})
* replaced by "newDimension".
*/
DimensionSpec withDimension(String newDimension);
}
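
A small hypothetical example of this contract, using the DefaultDimensionSpec implementation from this PR; the dimension names and the two-argument constructor are assumptions for illustration.

import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;

// Hypothetical illustration of the withDimension() contract, not part of this diff.
public class WithDimensionSketch
{
  public static void main(String[] args)
  {
    final DimensionSpec spec = new DefaultDimensionSpec("country", "countryOut");

    // Only the underlying dimension changes; the output name is preserved.
    final DimensionSpec renamed = spec.withDimension("countryIso");
    System.out.println(renamed.getDimension());   // countryIso
    System.out.println(renamed.getOutputName());  // countryOut
  }
}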
@@ -125,6 +125,12 @@ public boolean preservesOrdering()
return extractionFn.preservesOrdering();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new ExtractionDimensionSpec(newDimension, outputName, outputType, extractionFn);
}

@Override
public String toString()
{
@@ -35,6 +35,7 @@
import java.util.Set;

/**
*
*/
public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
@@ -171,6 +172,12 @@ public byte[] getCacheKey()
return filterCacheKey.array();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new ListFilteredDimensionSpec(delegate.withDimension(newDimension), values, isWhitelist);
}

@Override
public boolean equals(Object o)
{
@@ -200,6 +200,21 @@ public boolean preservesOrdering()
return getExtractionFn().preservesOrdering();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new LookupDimensionSpec(
newDimension,
outputName,
lookup,
retainMissingValue,
replaceMissingValueWith,
name,
optimize,
lookupExtractorFactoryContainerProvider
);
}

@Override
public boolean equals(Object o)
{
@@ -33,6 +33,7 @@
import java.nio.ByteBuffer;

/**
*
*/
public class PrefixFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
@@ -109,6 +110,12 @@ public byte[] getCacheKey()
.array();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new PrefixFilteredDimensionSpec(delegate.withDimension(newDimension), prefix);
}

Contributor: All these withDimension changes belong in a separate PR because they're simple to review, but create a ton of noise, which distracts from the ability to focus on the real meat of this PR.

Contributor Author: And here I thought I did a good job minimizing the scope of this PR, by making it not even wired up to the query layer :)

@Override
public boolean equals(Object o)
{
@@ -34,6 +34,7 @@
import java.util.regex.Pattern;

/**
*
*/
public class RegexFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
@@ -113,6 +114,12 @@ public byte[] getCacheKey()
.array();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new RegexFilteredDimensionSpec(delegate.withDimension(newDimension), pattern);
}

@Override
public boolean equals(Object o)
{
@@ -19,23 +19,31 @@

package org.apache.druid.query.dimension;

import org.apache.druid.segment.ColumnProcessorFactory;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

/**
* Class that encapsulates knowledge about how to create vector column processors. Used by
* {@link org.apache.druid.segment.DimensionHandlerUtils#makeVectorProcessor}.
*
* Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method. The default type is
* always implicitly STRING. It also does not have a "makeComplexProcessor" method; instead, complex-typed columns
* are fed into "makeSingleValueDimensionProcessor". This behavior may change in the future to better align
* with {@link ColumnProcessorFactory}.
*
* @see ColumnProcessorFactory the non-vectorized version
*/
public interface VectorColumnStrategizer<T>
public interface VectorColumnProcessorFactory<T>
Contributor: This refactoring belongs in another PR. See earlier comment on noise.

Contributor Author: Fair enough.

{
T makeSingleValueDimensionStrategy(SingleValueDimensionVectorSelector selector);
T makeSingleValueDimensionProcessor(SingleValueDimensionVectorSelector selector);

T makeMultiValueDimensionStrategy(MultiValueDimensionVectorSelector selector);
T makeMultiValueDimensionProcessor(MultiValueDimensionVectorSelector selector);

T makeFloatStrategy(VectorValueSelector selector);
T makeFloatProcessor(VectorValueSelector selector);

T makeDoubleStrategy(VectorValueSelector selector);
T makeDoubleProcessor(VectorValueSelector selector);

T makeLongStrategy(VectorValueSelector selector);
T makeLongProcessor(VectorValueSelector selector);
}
@@ -122,6 +122,18 @@ public byte[] getCacheKey()
}
}

@Override
public boolean canIterate()
{
return true;
}

@Override
public Iterable<Map.Entry<String, String>> iterable()
{
return map.entrySet();
}

Contributor: IMHO, these methods need unit tests to ensure maintainability and that people don't inadvertently change the logic to be incorrect.

Contributor: And then more importantly perhaps could add a unit test for equals below since you're here :)

Contributor Author: I'll add tests for the two new functions. It looks like equals and hashCode already had them.

@Override
public boolean equals(Object o)
{
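
A minimal sketch of the tests the author agreed to add above. This is hypothetical and not part of the diff: it assumes JUnit 4 and that this hunk is in MapLookupExtractor (the file name is not visible in this view); the constructor arguments and test names are illustrative.

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

// Hypothetical test sketch, not part of this diff.
public class MapLookupExtractorIterableSketchTest
{
  @Test
  public void testCanIterateAndIterable()
  {
    final Map<String, String> map = ImmutableMap.of("foo", "bar", "baz", "qux");
    final MapLookupExtractor extractor = new MapLookupExtractor(map, false);

    Assert.assertTrue(extractor.canIterate());
    // iterable() should expose exactly the backing map's entries.
    Assert.assertEquals(map.entrySet(), ImmutableSet.copyOf(extractor.iterable()));
  }
}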
@@ -19,55 +19,55 @@

package org.apache.druid.query.filter.vector;

import org.apache.druid.query.dimension.VectorColumnStrategizer;
import org.apache.druid.query.dimension.VectorColumnProcessorFactory;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

public class VectorValueMatcherColumnStrategizer implements VectorColumnStrategizer<VectorValueMatcherFactory>
public class VectorValueMatcherColumnProcessorFactory implements VectorColumnProcessorFactory<VectorValueMatcherFactory>
{
private static final VectorValueMatcherColumnStrategizer INSTANCE = new VectorValueMatcherColumnStrategizer();
private static final VectorValueMatcherColumnProcessorFactory INSTANCE = new VectorValueMatcherColumnProcessorFactory();
Contributor: I wish this were a singleton created by Guice... The other thing I really hate other than type-checking in Java is static objects, even if final. They make unit testing super hard because they force state to be reused (the state within the static object, so we would have to be diligent about making the object stateless), and they're harder to mock. Anyway, not the point of this PR...

That said, this refactoring probably belongs in a separate PR. I think you get the point, so I won't make these comments anymore.

Contributor Author: IMO objects that are both static and stateful/configurable are an abomination against all that is good and natural in the world. I would only want to go with the static final pattern for stateless objects that don't accept any runtime configuration.

I think for those kinds of objects, static final is reasonable, and preferred vs. Guice since it's simpler.


private VectorValueMatcherColumnStrategizer()
private VectorValueMatcherColumnProcessorFactory()
{
// Singleton.
}

public static VectorValueMatcherColumnStrategizer instance()
public static VectorValueMatcherColumnProcessorFactory instance()
{
return INSTANCE;
}

@Override
public VectorValueMatcherFactory makeSingleValueDimensionStrategy(
public VectorValueMatcherFactory makeSingleValueDimensionProcessor(
final SingleValueDimensionVectorSelector selector
)
{
return new SingleValueStringVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeMultiValueDimensionStrategy(
public VectorValueMatcherFactory makeMultiValueDimensionProcessor(
final MultiValueDimensionVectorSelector selector
)
{
return new MultiValueStringVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeFloatStrategy(final VectorValueSelector selector)
public VectorValueMatcherFactory makeFloatProcessor(final VectorValueSelector selector)
{
return new FloatVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeDoubleStrategy(final VectorValueSelector selector)
public VectorValueMatcherFactory makeDoubleProcessor(final VectorValueSelector selector)
{
return new DoubleVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeLongStrategy(final VectorValueSelector selector)
public VectorValueMatcherFactory makeLongProcessor(final VectorValueSelector selector)
{
return new LongVectorValueMatcher(selector);
}
@@ -42,6 +42,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
Contributor: Unneeded?

Contributor Author: It's needed now because it moved out of the org.apache.druid.query.groupby package. Anyway, we have a Travis step that verifies there aren't any unneeded imports, so there's no need to scan for them visually.

import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -53,7 +53,6 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryHelper;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
import org.apache.druid.query.groupby.epinephelinae.Grouper.BufferComparator;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
@@ -66,6 +65,8 @@
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
@@ -347,8 +348,8 @@ public static ColumnSelectorFactory createResultRowBasedColumnSelectorFactory(
final Supplier<ResultRow> supplier
)
{
final RowBasedColumnSelectorFactory.RowAdapter<ResultRow> adapter =
new RowBasedColumnSelectorFactory.RowAdapter<ResultRow>()
final RowAdapter<ResultRow> adapter =
new RowAdapter<ResultRow>()
{
@Override
public ToLongFunction<ResultRow> timestampFunction()
@@ -362,7 +363,7 @@ public ToLongFunction<ResultRow> timestampFunction()
}

@Override
public Function<ResultRow, Object> rawFunction(final String columnName)
public Function<ResultRow, Object> columnFunction(final String columnName)
{
final int columnIndex = query.getResultRowPositionLookup().getInt(columnName);
if (columnIndex < 0) {