Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sorting on complex columns in MSQ #16322

Merged
merged 29 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c46cb4
init
LakshSingla Apr 23, 2024
486d29a
working, almost
LakshSingla Apr 26, 2024
260206b
stuff working
LakshSingla May 3, 2024
b93b53e
Merge branch 'master' into msq-complex-sorting
LakshSingla May 3, 2024
ac8ba6b
tests, checkstyle
LakshSingla May 6, 2024
abbf49b
tests
LakshSingla May 6, 2024
b7ea7b8
more changes
LakshSingla May 6, 2024
f3caaf6
test comments
LakshSingla May 8, 2024
c852c35
changes
LakshSingla May 8, 2024
d2dd402
tests
LakshSingla May 8, 2024
a0d29d4
tests
LakshSingla May 8, 2024
ae87dab
tests
LakshSingla May 8, 2024
4b86b31
better comment
LakshSingla May 8, 2024
1ec5796
tests fix
LakshSingla May 8, 2024
b68948e
tests fix
LakshSingla May 8, 2024
08633f0
tests fix, test framework fix, comments
LakshSingla May 9, 2024
8d49678
Trigger Build
LakshSingla May 9, 2024
76fd60e
Merge branch 'master' into msq-complex-sorting
LakshSingla May 9, 2024
cd29bec
merge fix
LakshSingla May 9, 2024
690c5cf
convert list to array
LakshSingla May 9, 2024
d1d28e1
add back old tests
LakshSingla May 9, 2024
7f27724
preserve old tests, add new tests for complexcol + byte comparable col
LakshSingla May 9, 2024
ae77984
tests
LakshSingla May 10, 2024
2af2977
add benchmarks for nested data
LakshSingla May 10, 2024
b1c61cb
final set, have separate methods
LakshSingla May 10, 2024
eaa0593
some more final changes
LakshSingla May 10, 2024
2e23831
Merge branch 'master' into msq-complex-sorting
LakshSingla May 10, 2024
7356aa2
review comments
LakshSingla May 13, 2024
806b1ec
Update FrameWriterUtils.java
LakshSingla May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.StringUtils;
Expand All @@ -47,6 +49,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.timeline.SegmentId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -82,6 +85,7 @@ public class FrameChannelMergerBenchmark
{
// One-time class initialization performed before any benchmark state is created.
static {
NullHandling.initializeForTests();
// NOTE(review): presumably needed so that the "nested" columnType can be written to and read from
// frames -- confirm against NestedDataModule.
NestedDataModule.registerHandlersAndSerde();
}

private static final String KEY = "key";
Expand All @@ -99,6 +103,9 @@ public class FrameChannelMergerBenchmark
@Param({"100"})
private int rowLength;

/**
 * Type of the key column, as a string. "string" maps to {@link ColumnType#STRING} and "nested" maps to
 * {@link ColumnType#NESTED_DATA}; any other value is rejected with an {@link IAE} by
 * {@link #createKeyColumnTypeFromTypeString(String)}.
 */
@Param({"string", "nested"})
private String columnType;

/**
* Linked to {@link KeyGenerator}.
*/
Expand All @@ -121,13 +128,20 @@ enum KeyGenerator
*/
RANDOM {
@Override
public String generateKey(int rowNumber, int keyLength)
public Comparable generateKey(int rowNumber, int keyLength, String columnType)
{
final StringBuilder builder = new StringBuilder(keyLength);
for (int i = 0; i < keyLength; i++) {
builder.append((char) ('a' + ThreadLocalRandom.current().nextInt(26)));
}
return builder.toString();
String str = builder.toString();
if ("string".equals(columnType)) {
return str;
} else if ("nested".equals(columnType)) {
return StructuredData.wrap(str);
} else {
throw new IAE("unsupported column type");
}
}
},

Expand All @@ -136,13 +150,20 @@ public String generateKey(int rowNumber, int keyLength)
*/
SEQUENTIAL {
@Override
public String generateKey(int rowNumber, int keyLength)
public Comparable generateKey(int rowNumber, int keyLength, String columnType)
{
return StringUtils.format("%0" + keyLength + "d", rowNumber);
String str = StringUtils.format("%0" + keyLength + "d", rowNumber);
if ("string".equals(columnType)) {
return str;
} else if ("nested".equals(columnType)) {
return StructuredData.wrap(str);
} else {
throw new IAE("unsupported column type");
}
}
};

public abstract String generateKey(int rowNumber, int keyLength);
public abstract Comparable generateKey(int rowNumber, int keyLength, String columnType);
}

/**
Expand Down Expand Up @@ -176,13 +197,9 @@ public int getChannelNumber(int rowNumber, int numRows, int numChannels)
public abstract int getChannelNumber(int rowNumber, int numRows, int numChannels);
}

private final RowSignature signature =
RowSignature.builder()
.add(KEY, ColumnType.STRING)
.add(VALUE, ColumnType.STRING)
.build();
private RowSignature signature;
private FrameReader frameReader;

private final FrameReader frameReader = FrameReader.create(signature);
private final List<KeyColumn> sortKey = ImmutableList.of(new KeyColumn(KEY, KeyOrder.ASCENDING));

private List<List<Frame>> channelFrames;
Expand All @@ -200,6 +217,14 @@ public int getChannelNumber(int rowNumber, int numRows, int numChannels)
@Setup(Level.Trial)
public void setupTrial()
{
signature =
RowSignature.builder()
.add(KEY, createKeyColumnTypeFromTypeString(columnType))
.add(VALUE, ColumnType.STRING)
.build();

frameReader = FrameReader.create(signature);

exec = new FrameProcessorExecutor(
MoreExecutors.listeningDecorator(
Execs.singleThreaded(StringUtils.encodeForFormat(getClass().getSimpleName()))
Expand All @@ -211,14 +236,15 @@ public void setupTrial()
ChannelDistribution.valueOf(StringUtils.toUpperCase(channelDistributionString));

// Create channelRows which holds rows for each channel.
final List<List<NonnullPair<String, String>>> channelRows = new ArrayList<>();
final List<List<NonnullPair<Comparable, String>>> channelRows = new ArrayList<>();
channelFrames = new ArrayList<>();
for (int channelNumber = 0; channelNumber < numChannels; channelNumber++) {
channelRows.add(new ArrayList<>());
channelFrames.add(new ArrayList<>());
}

// Create "valueString", a string full of spaces to pad out the row.
// Create "valueString", a string full of spaces to pad out the row. Nested columns wrap up strings with the
// corresponding keyLength, therefore the padding works out for the nested types as well.
final StringBuilder valueStringBuilder = new StringBuilder();
for (int i = 0; i < rowLength - keyLength; i++) {
valueStringBuilder.append(' ');
Expand All @@ -227,20 +253,20 @@ public void setupTrial()

// Populate "channelRows".
for (int rowNumber = 0; rowNumber < numRows; rowNumber++) {
final String keyString = keyGenerator.generateKey(rowNumber, keyLength);
final NonnullPair<String, String> row = new NonnullPair<>(keyString, valueString);
final Comparable keyObject = keyGenerator.generateKey(rowNumber, keyLength, columnType);
final NonnullPair<Comparable, String> row = new NonnullPair<>(keyObject, valueString);
channelRows.get(channelDistribution.getChannelNumber(rowNumber, numRows, numChannels)).add(row);
}

// Sort each "channelRows".
for (List<NonnullPair<String, String>> rows : channelRows) {
for (List<NonnullPair<Comparable, String>> rows : channelRows) {
rows.sort(Comparator.comparing(row -> row.lhs));
}

// Populate each "channelFrames".
for (int channelNumber = 0; channelNumber < numChannels; channelNumber++) {
final List<NonnullPair<String, String>> rows = channelRows.get(channelNumber);
final RowBasedSegment<NonnullPair<String, String>> segment =
final List<NonnullPair<Comparable, String>> rows = channelRows.get(channelNumber);
final RowBasedSegment<NonnullPair<Comparable, String>> segment =
new RowBasedSegment<>(
SegmentId.dummy("__dummy"),
Sequences.simple(rows),
Expand Down Expand Up @@ -350,4 +376,14 @@ public void mergeChannels(Blackhole blackhole)
throw new ISE("Incorrect numRows[%s], expected[%s]", FutureUtils.getUncheckedImmediately(retVal), numRows);
}
}

/**
 * Translates the benchmark's column type string into the {@link ColumnType} used for the key column.
 *
 * @param columnTypeString either "string" or "nested"
 * @return {@link ColumnType#STRING} for "string", {@link ColumnType#NESTED_DATA} for "nested"
 * @throws IAE if the string is anything else (including null)
 */
private ColumnType createKeyColumnTypeFromTypeString(final String columnTypeString)
{
  final boolean isString = "string".equals(columnTypeString);
  final boolean isNested = "nested".equals(columnTypeString);

  // Reject unknown values up front so the mapping below only deals with the two supported types.
  if (!isString && !isNested) {
    throw new IAE("Unsupported type [%s]", columnTypeString);
  }

  return isString ? ColumnType.STRING : ColumnType.NESTED_DATA;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ private static DataSourcePlan forSortMergeJoin(
final List<KeyColumn> leftPartitionKey = partitionKeys.get(0);
leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), maxWorkerCount));
leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey));

// Build up the right stage.
final StageDefinitionBuilder rightBuilder = subQueryDefBuilder.getStageBuilder(
((StageInputSpec) Iterables.getOnlyElement(rightPlan.getInputSpecs())).getStageNumber()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testInsertWithoutRollupOnNestedData(String contextName, Map<String,
"v", Collections.emptyList()
)
),
2L
1L
},
new Object[]{
0L,
Expand Down Expand Up @@ -293,7 +293,7 @@ public void testInsertWithRollupOnNestedData(String contextName, Map<String, Obj
"v", Collections.emptyList()
)
),
2L
1L
},
new Object[]{
0L,
Expand Down Expand Up @@ -408,7 +408,6 @@ public void testSortingOnNestedData(String contextName, Map<String, Object> cont
.setExpectedResultRows(ImmutableList.of(
new Object[]{"{\"a\":500,\"b\":{\"x\":\"e\",\"z\":[1,2,3,4]},\"v\":\"a\"}"},
new Object[]{"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"v\":[]}"},
new Object[]{"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"v\":[]}"},
new Object[]{"{\"a\":700,\"b\":{\"x\":\"g\",\"y\":1.1,\"z\":[9,null,9,9]},\"v\":[]}"},
new Object[]{"{\"a\":200,\"b\":{\"x\":\"b\",\"y\":1.1,\"z\":[2,4,6]},\"v\":[]}"},
new Object[]{"{\"a\":600,\"b\":{\"x\":\"f\",\"y\":1.1,\"z\":[6,7,8,9]},\"v\":\"b\"}"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.CalciteNestedDataQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* Runs {@link CalciteNestedDataQueryTest} but with MSQ engine
*/
@SqlTestFramework.SqlTestFrameWorkModule(CalciteNestedDataQueryMSQTest.NestedDataQueryMSQComponentSupplier.class)
@SqlTestFrameworkConfig.ComponentSupplier(CalciteNestedDataQueryMSQTest.NestedDataQueryMSQComponentSupplier.class)
public class CalciteNestedDataQueryMSQTest extends CalciteNestedDataQueryTest
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.druid.msq.test;

import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.QueryTestRunner;
import org.junit.Assert;
Expand Down Expand Up @@ -55,6 +57,7 @@ public QueryTestRunner.QueryRunStep make(QueryTestBuilder builder, QueryTestRunn
return new QueryTestRunner.BaseExecuteQuery(builder)
{
final List<QueryTestRunner.QueryResults> extractedResults = new ArrayList<>();
// NOTE(review): this field is final and initialized to null, so it can never hold a signature. The
// visible code does not read it -- looks like a leftover; confirm and remove.
final RowSignature resultsSignature = null;

final MSQTestOverlordServiceClient overlordClient = overlordClientSupplier.get();

Expand Down Expand Up @@ -99,7 +102,10 @@ public void run()
if (resultRows == null) {
throw new ISE("Results report not present in the task's report payload");
}
extractedResults.add(results.withResults(resultRows));
extractedResults.add(
results.withSignatureAndResults(
convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows)
);
}
}

Expand All @@ -108,6 +114,15 @@ public List<QueryTestRunner.QueryResults> results()
{
return extractedResults;
}

/**
 * Builds a {@link RowSignature} from the column name/type pairs of an MSQ results report, adding the
 * columns in list order.
 *
 * @param columnAndTypes name/type pairs to convert
 * @return a signature containing one column per entry of {@code columnAndTypes}
 */
private RowSignature convertColumnAndTypeToRowSignature(final List<MSQResultsReport.ColumnAndType> columnAndTypes)
{
  final RowSignature.Builder signatureBuilder = RowSignature.builder();
  columnAndTypes.forEach(entry -> signatureBuilder.add(entry.getName(), entry.getType()));
  return signatureBuilder.build();
}
};
}
}
Loading
Loading