Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable rewriting certain inner joins as filters. #11068

Merged
merged 9 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public void setup()
enableFilterPushdown,
enableFilterRewrite,
enableFilterRewriteValueFilters,
QueryContexts.DEFAULT_ENABLE_REWRITE_JOIN_TO_FILTER,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
),
clauses,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public void setup() throws IOException
false,
false,
false,
false,
0
),
joinableClausesLookupStringKey,
Expand Down Expand Up @@ -185,6 +186,7 @@ public void setup() throws IOException
false,
false,
false,
false,
0
),
joinableClausesLookupLongKey,
Expand Down Expand Up @@ -220,6 +222,7 @@ public void setup() throws IOException
false,
false,
false,
false,
0
),
joinableClausesLookupLongKey,
Expand Down Expand Up @@ -255,6 +258,7 @@ public void setup() throws IOException
false,
false,
false,
false,
0
),
joinableClausesIndexedTableLongKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -111,12 +112,24 @@ public boolean canIterate()
return false;
}

@Override
public boolean canGetKeySet()
{
return false;
}

@Override
public Iterable<Map.Entry<String, String>> iterable()
{
throw new UnsupportedOperationException("Cannot iterate");
}

@Override
public Set<String> keySet()
{
throw new UnsupportedOperationException("Cannot get key set");
}

@Override
public byte[] getCacheKey()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -173,12 +174,24 @@ public boolean canIterate()
return false;
}

@Override
public boolean canGetKeySet()
{
return false;
}

@Override
public Iterable<Map.Entry<String, String>> iterable()
{
throw new UnsupportedOperationException("Cannot iterate");
}

@Override
public Set<String> keySet()
{
throw new UnsupportedOperationException("Cannot get key set");
}

@Override
public byte[] getCacheKey()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -40,6 +42,9 @@ public class LoadingLookupTest extends InitializedNullHandlingTest
LoadingCache reverseLookupCache = EasyMock.createStrictMock(LoadingCache.class);
LoadingLookup loadingLookup = new LoadingLookup(dataFetcher, lookupCache, reverseLookupCache);

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testApplyEmptyOrNull() throws ExecutionException
{
Expand Down Expand Up @@ -123,4 +128,17 @@ public void testGetCacheKey()
{
Assert.assertFalse(Arrays.equals(loadingLookup.getCacheKey(), loadingLookup.getCacheKey()));
}

@Test
public void testCanGetKeySet()
{
Assert.assertFalse(loadingLookup.canGetKeySet());
}

@Test
public void testKeySet()
{
expectedException.expect(UnsupportedOperationException.class);
loadingLookup.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

Expand Down Expand Up @@ -62,6 +64,9 @@ public class PollingLookupTest extends InitializedNullHandlingTest

private static final long POLL_PERIOD = 1000L;

@Rule
public ExpectedException expectedException = ExpectedException.none();

@JsonTypeName("mock")
private static class MockDataFetcher implements DataFetcher
{
Expand Down Expand Up @@ -204,6 +209,19 @@ public void testGetCacheKey()
Assert.assertFalse(Arrays.equals(pollingLookup2.getCacheKey(), pollingLookup.getCacheKey()));
}

@Test
public void testCanGetKeySet()
{
Assert.assertFalse(pollingLookup.canGetKeySet());
}

@Test
public void testKeySet()
{
expectedException.expect(UnsupportedOperationException.class);
pollingLookup.keySet();
}

private void assertMapLookup(Map<String, String> map, LookupExtractor lookup)
{
for (Map.Entry<String, String> entry : map.entrySet()) {
Expand Down
74 changes: 74 additions & 0 deletions processing/src/main/java/org/apache/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -219,4 +224,73 @@ public static <T> Query<T> withBaseDataSource(final Query<T> query, final DataSo

return retVal;
}

/**
* Helper for implementations of {@link Query#getRequiredColumns()}. Returns the list of columns that will be read
* out of a datasource by a query that uses the provided objects in the usual way.
*
* The returned set always contains {@code __time}, no matter what.
*
* If the virtual columns, filter, dimensions, aggregators, or additional columns refer to a virtual column, then the
* inputs of the virtual column will be returned instead of the name of the virtual column itself. Therefore, the
* returned list will never contain the names of any virtual columns.
*
* @param virtualColumns virtual columns whose inputs should be included.
* @param filter optional filter whose inputs should be included.
* @param dimensions dimension specs whose inputs should be included.
* @param aggregators aggregators whose inputs should be included.
* @param additionalColumns additional columns to include. Each of these will be added to the returned set, unless it
* refers to a virtual column, in which case the virtual column inputs will be added instead.
*/
public static Set<String> computeRequiredColumns(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

final VirtualColumns virtualColumns,
@Nullable final DimFilter filter,
final List<DimensionSpec> dimensions,
final List<AggregatorFactory> aggregators,
final List<String> additionalColumns
)
{
final Set<String> requiredColumns = new HashSet<>();

// Everyone needs __time (it's used by intervals filters).
requiredColumns.add(ColumnHolder.TIME_COLUMN_NAME);

for (VirtualColumn virtualColumn : virtualColumns.getVirtualColumns()) {
for (String column : virtualColumn.requiredColumns()) {
if (!virtualColumns.exists(column)) {
requiredColumns.addAll(virtualColumn.requiredColumns());
}
}
}

if (filter != null) {
for (String column : filter.getRequiredColumns()) {
if (!virtualColumns.exists(column)) {
requiredColumns.add(column);
}
}
}

for (DimensionSpec dimensionSpec : dimensions) {
if (!virtualColumns.exists(dimensionSpec.getDimension())) {
requiredColumns.add(dimensionSpec.getDimension());
}
}

for (AggregatorFactory aggregator : aggregators) {
for (String column : aggregator.requiredFields()) {
if (!virtualColumns.exists(column)) {
requiredColumns.add(column);
}
}
}

for (String column : additionalColumns) {
if (!virtualColumns.exists(column)) {
requiredColumns.add(column);
}
}

return requiredColumns;
}
}
17 changes: 17 additions & 0 deletions processing/src/main/java/org/apache/druid/query/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -193,4 +194,20 @@ default VirtualColumns getVirtualColumns()
{
return VirtualColumns.EMPTY;
}

/**
* Returns the set of columns that this query will need to access out of its datasource.
*
* This method does not "look into" what the datasource itself is doing. For example, if a query is built on a
* {@link QueryDataSource}, this method will not return the columns used by that subquery. As another example, if a
* query is built on a {@link JoinDataSource}, this method will not return the columns from the underlying datasources
* that are used by the join condition, unless those columns are also used by this query in other ways.
*
* Returns null if the set of required columns cannot be known ahead of time.
*/
@Nullable
default Set<String> getRequiredColumns()
{
return null;
}
}
12 changes: 12 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class QueryContexts
public static final String JOIN_FILTER_PUSH_DOWN_KEY = "enableJoinFilterPushDown";
public static final String JOIN_FILTER_REWRITE_ENABLE_KEY = "enableJoinFilterRewrite";
public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
public static final String REWRITE_JOIN_TO_FILTER_ENABLE_KEY = "enableRewriteJoinToFilter";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";
// This flag control whether a sql join query with left scan should be attempted to be run as direct table access
// instead of being wrapped inside a query. With direct table access enabled, druid can push down the join operation to
Expand All @@ -80,6 +81,7 @@ public class QueryContexts
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final boolean DEFAULT_ENABLE_REWRITE_JOIN_TO_FILTER = false;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000;
public static final boolean DEFAULT_ENABLE_SQL_JOIN_LEFT_SCAN_DIRECT = false;
public static final boolean DEFAULT_USE_FILTER_CNF = false;
Expand Down Expand Up @@ -274,6 +276,7 @@ public static <T> int getParallelMergeParallelism(Query<T> query, int defaultVal
{
return parseInt(query, BROKER_PARALLELISM, defaultValue);
}

public static <T> boolean getEnableJoinFilterRewriteValueColumnFilters(Query<T> query)
{
return parseBoolean(
Expand All @@ -283,6 +286,15 @@ public static <T> boolean getEnableJoinFilterRewriteValueColumnFilters(Query<T>
);
}

public static <T> boolean getEnableRewriteJoinToFilter(Query<T> query)
{
return parseBoolean(
query,
REWRITE_JOIN_TO_FILTER_ENABLE_KEY,
DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS
DEFAULT_ENABLE_REWRITE_JOIN_TO_FILTER

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😱

);
}

public static <T> long getJoinFilterRewriteMaxSize(Query<T> query)
{
return parseLong(query, JOIN_FILTER_REWRITE_MAX_SIZE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@JsonTypeName("map")
Expand Down Expand Up @@ -128,12 +129,24 @@ public boolean canIterate()
return true;
}

@Override
public boolean canGetKeySet()
{
return true;
}

@Override
public Iterable<Map.Entry<String, String>> iterable()
{
return map.entrySet();
}

@Override
public Set<String> keySet()
{
return Collections.unmodifiableSet(map.keySet());
}

@Override
public boolean equals(Object o)
{
Expand Down
Loading