Support sorting on complex columns in MSQ #16322

Merged: LakshSingla merged 29 commits into master from msq-complex-sorting on May 13, 2024.

Commits (the diff below shows changes from 5 of the 29 commits):
1c46cb4  init (LakshSingla, Apr 23, 2024)
486d29a  working, almost (LakshSingla, Apr 26, 2024)
260206b  stuff working (LakshSingla, May 3, 2024)
b93b53e  Merge branch 'master' into msq-complex-sorting (LakshSingla, May 3, 2024)
ac8ba6b  tests, checkstyle (LakshSingla, May 6, 2024)
abbf49b  tests (LakshSingla, May 6, 2024)
b7ea7b8  more changes (LakshSingla, May 6, 2024)
f3caaf6  test comments (LakshSingla, May 8, 2024)
c852c35  changes (LakshSingla, May 8, 2024)
d2dd402  tests (LakshSingla, May 8, 2024)
a0d29d4  tests (LakshSingla, May 8, 2024)
ae87dab  tests (LakshSingla, May 8, 2024)
4b86b31  better comment (LakshSingla, May 8, 2024)
1ec5796  tests fix (LakshSingla, May 8, 2024)
b68948e  tests fix (LakshSingla, May 8, 2024)
08633f0  tests fix, test framework fix, comments (LakshSingla, May 9, 2024)
8d49678  Trigger Build (LakshSingla, May 9, 2024)
76fd60e  Merge branch 'master' into msq-complex-sorting (LakshSingla, May 9, 2024)
cd29bec  merge fix (LakshSingla, May 9, 2024)
690c5cf  convert list to array (LakshSingla, May 9, 2024)
d1d28e1  add back old tests (LakshSingla, May 9, 2024)
7f27724  preserve old tests, add new tests for complexcol + byte comparable col (LakshSingla, May 9, 2024)
ae77984  tests (LakshSingla, May 10, 2024)
2af2977  add benchmarks for nested data (LakshSingla, May 10, 2024)
b1c61cb  final set, have separate methods (LakshSingla, May 10, 2024)
eaa0593  some more final changes (LakshSingla, May 10, 2024)
2e23831  Merge branch 'master' into msq-complex-sorting (LakshSingla, May 10, 2024)
7356aa2  review comments (LakshSingla, May 13, 2024)
806b1ec  Update FrameWriterUtils.java (LakshSingla, May 13, 2024)
1 change: 1 addition & 0 deletions codestyle/druid-forbidden-apis.txt
@@ -54,6 +54,7 @@ org.apache.calcite.sql.type.OperandTypes#NULLABLE_LITERAL @ Create an instance o
org.apache.commons.io.FileUtils#getTempDirectory() @ Use org.junit.rules.TemporaryFolder for tests instead
org.apache.commons.io.FileUtils#deleteDirectory(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils#deleteDirectory()
org.apache.commons.io.FileUtils#forceMkdir(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils.mkdirp instead
+org.apache.datasketches.memory.Memory#wrap(byte[], int, int, java.nio.ByteOrder) @ The implementation isn't correct in datasketches-memory-2.2.0. Wrap the array in a ByteBuffer before wrapping it up in Memory
java.lang.Class#getCanonicalName() @ Class.getCanonicalName can return null for anonymous types, use Class.getName instead.
java.util.concurrent.Executors#newFixedThreadPool(int) @ Executor is non-daemon and can prevent JVM shutdown, use org.apache.druid.java.util.common.concurrent.Execs#multiThreaded(int, java.lang.String) instead.

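The new entry prescribes a workaround rather than just a ban. A minimal sketch of the allowed pattern, assuming a caller that wants a little-endian read-only Memory view over a byte array (the helper class and method names here are illustrative, not part of the PR):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.datasketches.memory.Memory;

public class MemoryWrapWorkaround
{
  // Hypothetical helper illustrating the forbidden-apis rule above.
  static Memory wrapLittleEndian(final byte[] bytes)
  {
    // Forbidden by the rule above (broken in datasketches-memory-2.2.0):
    //   Memory.wrap(bytes, 0, bytes.length, ByteOrder.LITTLE_ENDIAN)
    // Allowed: wrap the array in a ByteBuffer first, then wrap that in Memory.
    return Memory.wrap(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN));
  }
}
```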
@@ -721,7 +721,6 @@ private static DataSourcePlan forSortMergeJoin(
final List<KeyColumn> leftPartitionKey = partitionKeys.get(0);
leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), maxWorkerCount));
leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey));

// Build up the right stage.
final StageDefinitionBuilder rightBuilder = subQueryDefBuilder.getStageBuilder(
((StageInputSpec) Iterables.getOnlyElement(rightPlan.getInputSpecs())).getStageNumber()
@@ -380,6 +380,7 @@
static ClusterBy computeClusterByForResults(final GroupByQuery query)
{
if (query.getLimitSpec() instanceof DefaultLimitSpec) {
+final RowSignature resultSignature = computeResultSignature(query);
final DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) query.getLimitSpec();

if (!defaultLimitSpec.getColumns().isEmpty()) {
@@ -67,6 +67,7 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsCollector

private ClusterByStatisticsCollectorImpl(
final ClusterBy clusterBy,
+final RowSignature rowSignature,
final RowKeyReader keyReader,
final KeyCollectorFactory<?, ?> keyCollectorFactory,
final long maxRetainedBytes,
@@ -78,7 +79,7 @@ private ClusterByStatisticsCollectorImpl(
this.keyReader = keyReader;
this.keyCollectorFactory = keyCollectorFactory;
this.maxRetainedBytes = maxRetainedBytes;
-this.buckets = new TreeMap<>(clusterBy.bucketComparator());
+this.buckets = new TreeMap<>(clusterBy.bucketComparator(rowSignature));
this.maxBuckets = maxBuckets;
this.checkHasMultipleValues = checkHasMultipleValues;
this.hasMultipleValues = checkHasMultipleValues ? new boolean[clusterBy.getColumns().size()] : null;
@@ -98,10 +99,12 @@ public static ClusterByStatisticsCollector create(
)
{
final RowKeyReader keyReader = clusterBy.keyReader(signature);
-final KeyCollectorFactory<?, ?> keyCollectorFactory = KeyCollectors.makeStandardFactory(clusterBy, aggregate);
+final KeyCollectorFactory<?, ?> keyCollectorFactory =
+    KeyCollectors.makeStandardFactory(clusterBy, aggregate, signature);

return new ClusterByStatisticsCollectorImpl(
clusterBy,
+signature,
keyReader,
keyCollectorFactory,
maxRetainedBytes,
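The row signature now rides along into the bucket map, whose comparator must understand complex key parts instead of treating keys as opaque bytes. A rough sketch of that pattern using the API shown in the diff (BucketHolder is a hypothetical stand-in for the collector's private per-bucket state):

```java
import java.util.Comparator;
import java.util.TreeMap;

import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.segment.column.RowSignature;

class BucketMapSketch
{
  // Hypothetical stand-in for the collector's private per-bucket state.
  static class BucketHolder
  {
  }

  static TreeMap<RowKey, BucketHolder> makeBuckets(final ClusterBy clusterBy, final RowSignature rowSignature)
  {
    // With the signature available, key parts backed by complex columns can be
    // decoded with the right serde before being compared.
    final Comparator<RowKey> bucketComparator = clusterBy.bucketComparator(rowSignature);
    return new TreeMap<>(bucketComparator);
  }
}
```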
@@ -23,6 +23,7 @@
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.segment.column.RowSignature;

import java.util.Comparator;
import java.util.stream.Collectors;
@@ -36,9 +37,9 @@ private DistinctKeyCollectorFactory(Comparator<RowKey> comparator)
this.comparator = comparator;
}

-static DistinctKeyCollectorFactory create(final ClusterBy clusterBy)
+static DistinctKeyCollectorFactory create(final ClusterBy clusterBy, RowSignature rowSignature)
{
-return new DistinctKeyCollectorFactory(clusterBy.keyComparator());
+return new DistinctKeyCollectorFactory(clusterBy.keyComparator(rowSignature));
}

@Override
@@ -20,6 +20,7 @@
package org.apache.druid.msq.statistics;

import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.segment.column.RowSignature;

public class KeyCollectors
{
@@ -33,19 +34,20 @@ private KeyCollectors()
*/
public static KeyCollectorFactory<?, ?> makeStandardFactory(
final ClusterBy clusterBy,
-final boolean aggregate
+final boolean aggregate,
+final RowSignature rowSignature
)
{
final KeyCollectorFactory<?, ?> baseFactory;

if (aggregate) {
-baseFactory = DistinctKeyCollectorFactory.create(clusterBy);
+baseFactory = DistinctKeyCollectorFactory.create(clusterBy, rowSignature);
} else {
-baseFactory = QuantilesSketchKeyCollectorFactory.create(clusterBy);
+baseFactory = QuantilesSketchKeyCollectorFactory.create(clusterBy, rowSignature);
}

// Wrap in DelegateOrMinKeyCollectorFactory, so we are guaranteed to be able to downsample to a single key. This
// is important because it allows us to better handle large numbers of small buckets.
-return new DelegateOrMinKeyCollectorFactory<>(clusterBy.keyComparator(), baseFactory);
+return new DelegateOrMinKeyCollectorFactory<>(clusterBy.keyComparator(rowSignature), baseFactory);
}
}
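Putting the pieces together, a call site now builds the standard factory with the signature in hand. A sketch against the API shown in the diff, with the key column and types chosen purely for illustration:

```java
import com.google.common.collect.ImmutableList;

import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.msq.statistics.KeyCollectorFactory;
import org.apache.druid.msq.statistics.KeyCollectors;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

class KeyCollectorsUsageSketch
{
  static KeyCollectorFactory<?, ?> factoryForNestedColumn()
  {
    // One ascending key on a COMPLEX<json> column (illustrative names).
    final RowSignature signature =
        RowSignature.builder().add("obj", ColumnType.NESTED_DATA).build();
    final ClusterBy clusterBy =
        new ClusterBy(ImmutableList.of(new KeyColumn("obj", KeyOrder.ASCENDING)), 0);

    // aggregate = false selects the quantiles-sketch collector; either branch is
    // wrapped in DelegateOrMinKeyCollectorFactory, as the comment above explains.
    return KeyCollectors.makeStandardFactory(clusterBy, false, signature);
  }
}
```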
@@ -27,6 +27,7 @@
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.RowSignature;

import java.nio.ByteOrder;
import java.util.Arrays;
@@ -46,9 +47,9 @@ private QuantilesSketchKeyCollectorFactory(final Comparator<byte[]> comparator)
this.comparator = comparator;
}

-static QuantilesSketchKeyCollectorFactory create(final ClusterBy clusterBy)
+static QuantilesSketchKeyCollectorFactory create(final ClusterBy clusterBy, final RowSignature rowSignature)
{
-return new QuantilesSketchKeyCollectorFactory(clusterBy.byteKeyComparator());
+return new QuantilesSketchKeyCollectorFactory(clusterBy.byteKeyComparator(rowSignature));
}

@Override
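For the non-aggregating path, keys stay serialized inside the quantiles sketch, so ordering happens on byte arrays rather than decoded RowKeys. A small sketch of the assumption behind byteKeyComparator(rowSignature) (the surrounding wiring is hypothetical):

```java
import java.util.Comparator;

import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.segment.column.RowSignature;

class ByteKeyComparatorSketch
{
  static Comparator<byte[]> comparatorFor(final ClusterBy clusterBy, final RowSignature rowSignature)
  {
    // Purely byte-comparable key columns can be compared lexicographically, but
    // complex-typed key parts need the signature to locate a serde that can
    // decode them before comparison.
    return clusterBy.byteKeyComparator(rowSignature);
  }
}
```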
MSQComplexGroupByTest.java (new file)
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.exec;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collection;

public class MSQComplexGroupByTest extends MSQTestBase
{
static {
NestedDataModule.registerHandlersAndSerde();
}

private String dataFileNameJsonString;
private String dataFileSignatureJsonString;
private DataSource dataFileExternalDataSource;


public static Collection<Object[]> data()
{
Object[][] data = new Object[][]{
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
};
return Arrays.asList(data);
}

@BeforeEach
public void setup() throws IOException
{
File dataFile = newTempFile("dataFile");
final InputStream resourceStream = this.getClass().getClassLoader()
.getResourceAsStream(NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE);
final InputStream decompressing = CompressionUtils.decompress(
resourceStream,
"nested-all-types-test-data.json"
);
Files.copy(decompressing, dataFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
decompressing.close();

dataFileNameJsonString = queryFramework().queryJsonMapper().writeValueAsString(dataFile);

RowSignature dataFileSignature = RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("obj", ColumnType.NESTED_DATA)
.add("arrayObject", ColumnType.STRING_ARRAY)
.add("arrayNestedLong", ColumnType.LONG_ARRAY)
.build();
dataFileSignatureJsonString = queryFramework().queryJsonMapper().writeValueAsString(dataFileSignature);

dataFileExternalDataSource = new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(dataFile), SystemFields.none()),
new JsonInputFormat(null, null, null, null, null),
dataFileSignature
);

objectMapper.registerModules(NestedDataModule.getJacksonModulesList());
}

@Test
public void testInsertWithRollupOnNestedData()
{
testIngestQuery().setSql("INSERT INTO foo1 SELECT\n"
+ " obj,\n"
+ " COUNT(*) as cnt\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"obj\", \"type\": \"COMPLEX<json>\"}]'\n"
+ " )\n"
+ " )\n"
+ " GROUP BY 1\n"
+ " PARTITIONED BY ALL")
.setQueryContext(ImmutableMap.of())
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(RowSignature.builder()
.add("obj", ColumnType.NESTED_DATA)
.add("cnt", ColumnType.LONG)
.build())
.setExpectedResultRows(ImmutableList.of())
.verifyResults();

}

@Test
public void testSortingOnNestedData()
{
testSelectQuery().setSql("SELECT\n"
+ " obj\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"obj\", \"type\": \"COMPLEX<json>\"}]'\n"
+ " )\n"
+ " )\n"
+ " ORDER BY 1")
.setQueryContext(ImmutableMap.of())
.setExpectedRowSignature(RowSignature.builder()
.add("obj", ColumnType.NESTED_DATA)
.build())
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
}

}
@@ -108,7 +108,7 @@ public void test_clusterByX_unique()
.iterator();

final NavigableMap<RowKey, List<Integer>> sortedKeyWeights =
-computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator());
+computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator(SIGNATURE));

doTest(
clusterBy,
@@ -157,7 +157,7 @@ public void test_clusterByX_everyKeyAppearsTwice()
}

final NavigableMap<RowKey, List<Integer>> sortedKeyWeights =
-computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator());
+computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator(SIGNATURE));

doTest(
clusterBy,
@@ -208,7 +208,7 @@ public void test_clusterByX_everyKeyAppearsTwice_withAggregation()
}

final NavigableMap<RowKey, List<Integer>> sortedKeyWeights =
-computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator());
+computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator(SIGNATURE));

doTest(
clusterBy,
@@ -267,7 +267,7 @@ public void test_clusterByXYbucketByX_threeX_uniqueY()
}

final NavigableMap<RowKey, List<Integer>> sortedKeyWeights =
-computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator());
+computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator(SIGNATURE));

doTest(
clusterBy,
@@ -331,7 +331,7 @@ public void test_clusterByXYbucketByX_maxX_uniqueY()
}

final NavigableMap<RowKey, List<Integer>> sortedKeyWeights =
-computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator());
+computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator(SIGNATURE));

doTest(
clusterBy,
@@ -402,7 +402,7 @@ public void test_clusterByXYbucketByX_maxX_lowCardinalityY_withAggregation()
}

final NavigableMap<RowKey, List<Integer>> sortedKeyWeights =
-computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator());
+computeSortedKeyWeightsFromUnweightedKeys(keys, clusterBy.keyComparator(SIGNATURE));

doTest(
clusterBy,
@@ -551,7 +551,7 @@ private void doTest(
final BiConsumer<String, ClusterByStatisticsCollectorImpl> testFn
)
{
-final Comparator<RowKey> comparator = clusterBy.keyComparator();
+final Comparator<RowKey> comparator = clusterBy.keyComparator(SIGNATURE);

// Load into single collector, sorted order.
final ClusterByStatisticsCollectorImpl sortedCollector = makeCollector(clusterBy, aggregate);
@@ -649,7 +649,7 @@ private static void verifyPartitions(
testName,
partitions,
sortedKeyWeights.firstKey(),
-clusterBy.keyComparator()
+clusterBy.keyComparator(SIGNATURE)
);
verifyPartitionWeights(testName, clusterBy, partitions, sortedKeyWeights, aggregate, expectedPartitionSize);
}