Skip to content

Commit

Permalink
Add HashJoinSegment, a virtual segment for joins.
Browse files Browse the repository at this point in the history
An initial step towards apache#8728. This patch adds enough functionality to implement a joining
cursor on top of a normal datasource. It does not include enough to actually do a query. For
that, future patches will need to wire this low-level functionality into the query language.
  • Loading branch information
gianm committed Dec 30, 2019
1 parent dec619e commit 27a1a7b
Show file tree
Hide file tree
Showing 69 changed files with 6,120 additions and 125 deletions.
Expand Up @@ -118,6 +118,27 @@ public static Double defaultDoubleValue()
return replaceWithDefault() ? ZERO_DOUBLE : null;
}

/**
* Returns the default value for an object of the provided class. Will be null in SQL-compatible null handling mode.
* May be null or some non-null default value when not in SQL-compatible null handling mode.
*/
@Nullable
@SuppressWarnings("unchecked")
public static <T> T defaultValueForClass(final Class<T> clazz)
{
if (clazz == Float.class) {
return (T) defaultFloatValue();
} else if (clazz == Double.class) {
return (T) defaultDoubleValue();
} else if (clazz == Long.class) {
return (T) defaultLongValue();
} else if (clazz == String.class) {
return (T) defaultStringValue();
} else {
return null;
}
}

public static boolean isNullOrEquivalent(@Nullable String value)
{
return replaceWithDefault() ? Strings.isNullOrEmpty(value) : value == null;
Expand Down
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/Exprs.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.math.expr;

import org.apache.druid.java.util.common.Pair;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Stack;

public class Exprs
{
/**
* Decomposes any expr into a list of exprs that, if ANDed together, are equivalent to the input expr.
*
* @param expr any expr
*
* @return list of exprs that, if ANDed together, are equivalent to the input expr
*/
public static List<Expr> decomposeAnd(final Expr expr)
{
final List<Expr> retVal = new ArrayList<>();
final Stack<Expr> stack = new Stack<>();
stack.push(expr);

while (!stack.empty()) {
final Expr current = stack.pop();

if (current instanceof BinAndExpr) {
stack.push(((BinAndExpr) current).right);
stack.push(((BinAndExpr) current).left);
} else {
retVal.add(current);
}
}

return retVal;
}

/**
* Decomposes an equality expr into the left- and right-hand side.
*
* @return decomposed equality, or empty if the input expr was not an equality expr
*/
public static Optional<Pair<Expr, Expr>> decomposeEquals(final Expr expr)
{
if (expr instanceof BinEqExpr) {
return Optional.of(Pair.of(((BinEqExpr) expr).left, ((BinEqExpr) expr).right));
} else {
return Optional.empty();
}
}
}
Expand Up @@ -32,6 +32,7 @@
import java.util.Objects;

/**
*
*/
public class DefaultDimensionSpec implements DimensionSpec
{
Expand Down Expand Up @@ -136,6 +137,12 @@ public boolean preservesOrdering()
return true;
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new DefaultDimensionSpec(newDimension, this.outputName, this.outputType);
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -83,4 +83,10 @@ default boolean canVectorize()
}

boolean preservesOrdering();

/**
* Returns a copy of this DimensionSpec with the underlying dimension (the value of {@link #getDimension()})
* replaced by "newDimension".
*/
DimensionSpec withDimension(String newDimension);
}
Expand Up @@ -125,6 +125,12 @@ public boolean preservesOrdering()
return extractionFn.preservesOrdering();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new ExtractionDimensionSpec(newDimension, outputName, outputType, extractionFn);
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;

/**
*
*/
public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
Expand Down Expand Up @@ -171,6 +172,12 @@ public byte[] getCacheKey()
return filterCacheKey.array();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new ListFilteredDimensionSpec(delegate.withDimension(newDimension), values, isWhitelist);
}

@Override
public boolean equals(Object o)
{
Expand Down
Expand Up @@ -200,6 +200,21 @@ public boolean preservesOrdering()
return getExtractionFn().preservesOrdering();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new LookupDimensionSpec(
newDimension,
outputName,
lookup,
retainMissingValue,
replaceMissingValueWith,
name,
optimize,
lookupExtractorFactoryContainerProvider
);
}

@Override
public boolean equals(Object o)
{
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.nio.ByteBuffer;

/**
*
*/
public class PrefixFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
Expand Down Expand Up @@ -109,6 +110,12 @@ public byte[] getCacheKey()
.array();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new PrefixFilteredDimensionSpec(delegate.withDimension(newDimension), prefix);
}

@Override
public boolean equals(Object o)
{
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.util.regex.Pattern;

/**
*
*/
public class RegexFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
Expand Down Expand Up @@ -113,6 +114,12 @@ public byte[] getCacheKey()
.array();
}

@Override
public DimensionSpec withDimension(String newDimension)
{
return new RegexFilteredDimensionSpec(delegate.withDimension(newDimension), pattern);
}

@Override
public boolean equals(Object o)
{
Expand Down
Expand Up @@ -19,23 +19,31 @@

package org.apache.druid.query.dimension;

import org.apache.druid.segment.ColumnProcessorFactory;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

/**
* Class that encapsulates knowledge about how to create vector column processors. Used by
* {@link org.apache.druid.segment.DimensionHandlerUtils#makeVectorProcessor}.
*
* Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method. The default type is
* always implicitly STRING. It also does not have a "makeComplexProcessor" method; instead, complex-typed columns
* are fed into "makeSingleValueDimensionProcessor". This behavior may change in the future to better align
* with {@link ColumnProcessorFactory}.
*
* @see ColumnProcessorFactory the non-vectorized version
*/
public interface VectorColumnStrategizer<T>
public interface VectorColumnProcessorFactory<T>
{
T makeSingleValueDimensionStrategy(SingleValueDimensionVectorSelector selector);
T makeSingleValueDimensionProcessor(SingleValueDimensionVectorSelector selector);

T makeMultiValueDimensionStrategy(MultiValueDimensionVectorSelector selector);
T makeMultiValueDimensionProcessor(MultiValueDimensionVectorSelector selector);

T makeFloatStrategy(VectorValueSelector selector);
T makeFloatProcessor(VectorValueSelector selector);

T makeDoubleStrategy(VectorValueSelector selector);
T makeDoubleProcessor(VectorValueSelector selector);

T makeLongStrategy(VectorValueSelector selector);
T makeLongProcessor(VectorValueSelector selector);
}
Expand Up @@ -122,6 +122,18 @@ public byte[] getCacheKey()
}
}

@Override
public boolean canIterate()
{
return true;
}

@Override
public Iterable<Map.Entry<String, String>> iterable()
{
return map.entrySet();
}

@Override
public boolean equals(Object o)
{
Expand Down
Expand Up @@ -19,55 +19,55 @@

package org.apache.druid.query.filter.vector;

import org.apache.druid.query.dimension.VectorColumnStrategizer;
import org.apache.druid.query.dimension.VectorColumnProcessorFactory;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

public class VectorValueMatcherColumnStrategizer implements VectorColumnStrategizer<VectorValueMatcherFactory>
public class VectorValueMatcherColumnProcessorFactory implements VectorColumnProcessorFactory<VectorValueMatcherFactory>
{
private static final VectorValueMatcherColumnStrategizer INSTANCE = new VectorValueMatcherColumnStrategizer();
private static final VectorValueMatcherColumnProcessorFactory INSTANCE = new VectorValueMatcherColumnProcessorFactory();

private VectorValueMatcherColumnStrategizer()
private VectorValueMatcherColumnProcessorFactory()
{
// Singleton.
}

public static VectorValueMatcherColumnStrategizer instance()
public static VectorValueMatcherColumnProcessorFactory instance()
{
return INSTANCE;
}

@Override
public VectorValueMatcherFactory makeSingleValueDimensionStrategy(
public VectorValueMatcherFactory makeSingleValueDimensionProcessor(
final SingleValueDimensionVectorSelector selector
)
{
return new SingleValueStringVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeMultiValueDimensionStrategy(
public VectorValueMatcherFactory makeMultiValueDimensionProcessor(
final MultiValueDimensionVectorSelector selector
)
{
return new MultiValueStringVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeFloatStrategy(final VectorValueSelector selector)
public VectorValueMatcherFactory makeFloatProcessor(final VectorValueSelector selector)
{
return new FloatVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeDoubleStrategy(final VectorValueSelector selector)
public VectorValueMatcherFactory makeDoubleProcessor(final VectorValueSelector selector)
{
return new DoubleVectorValueMatcher(selector);
}

@Override
public VectorValueMatcherFactory makeLongStrategy(final VectorValueSelector selector)
public VectorValueMatcherFactory makeLongProcessor(final VectorValueSelector selector)
{
return new LongVectorValueMatcher(selector);
}
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
Expand Down
Expand Up @@ -53,7 +53,6 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryHelper;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
import org.apache.druid.query.groupby.epinephelinae.Grouper.BufferComparator;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
Expand All @@ -66,6 +65,8 @@
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
Expand Down Expand Up @@ -347,8 +348,8 @@ public static ColumnSelectorFactory createResultRowBasedColumnSelectorFactory(
final Supplier<ResultRow> supplier
)
{
final RowBasedColumnSelectorFactory.RowAdapter<ResultRow> adapter =
new RowBasedColumnSelectorFactory.RowAdapter<ResultRow>()
final RowAdapter<ResultRow> adapter =
new RowAdapter<ResultRow>()
{
@Override
public ToLongFunction<ResultRow> timestampFunction()
Expand All @@ -362,7 +363,7 @@ public ToLongFunction<ResultRow> timestampFunction()
}

@Override
public Function<ResultRow, Object> rawFunction(final String columnName)
public Function<ResultRow, Object> columnFunction(final String columnName)
{
final int columnIndex = query.getResultRowPositionLookup().getInt(columnName);
if (columnIndex < 0) {
Expand Down

0 comments on commit 27a1a7b

Please sign in to comment.