diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java index 596fa77646f3..8c5dbe758532 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java @@ -130,7 +130,7 @@ private static StringFrameColumnWriter makeStringWriter( return new StringFrameColumnWriterImpl( selector, allocator, - capabilities == null || capabilities.hasMultipleValues().isMaybeTrue() + capabilities == null ? ColumnCapabilities.Capable.UNKNOWN : capabilities.hasMultipleValues() ); } diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java index 536918ec59ca..ec812d9654ca 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -45,7 +46,8 @@ public abstract class StringFrameColumnWriter imp private final T selector; private final byte typeCode; - protected final boolean multiValue; + protected final ColumnCapabilities.Capable multiValue; + protected boolean encounteredMultiValueRow; /** * Row lengths: one int per row with the number of values contained by that row and all previous rows. @@ -73,14 +75,14 @@ public abstract class StringFrameColumnWriter imp final T selector, final MemoryAllocator allocator, final byte typeCode, - final boolean multiValue + final ColumnCapabilities.Capable multiValue ) { this.selector = selector; this.typeCode = typeCode; this.multiValue = multiValue; - if (multiValue) { + if (multiValue.isMaybeTrue()) { this.cumulativeRowLengths = AppendableMemory.create(allocator, INITIAL_ALLOCATION_SIZE); } else { this.cumulativeRowLengths = null; @@ -107,7 +109,7 @@ public boolean addSelection() return false; } - if (multiValue && !cumulativeRowLengths.reserveAdditional(Integer.BYTES)) { + if (multiValue.isMaybeTrue() && !cumulativeRowLengths.reserveAdditional(Integer.BYTES)) { return false; } @@ -119,8 +121,16 @@ public boolean addSelection() return false; } + if (utf8Count != 1) { + encounteredMultiValueRow = true; + + if (multiValue.isFalse()) { + throw new ISE("Encountered unexpected multi-value row, size[%d]", utf8Count); + } + } + // Enough space has been reserved to write what we need to write; let's start. - if (multiValue) { + if (multiValue.isMaybeTrue()) { final MemoryRange rowLengthsCursor = cumulativeRowLengths.cursor(); if (utf8Data == null && typeCode == FrameColumnWriters.TYPE_STRING_ARRAY) { @@ -189,7 +199,7 @@ public void undo() throw new ISE("Cannot undo"); } - if (multiValue) { + if (multiValue.isMaybeTrue()) { cumulativeRowLengths.rewindCursor(Integer.BYTES); cumulativeStringLengths.rewindCursor(Integer.BYTES * lastRowLength); lastCumulativeRowLength -= lastRowLength; @@ -207,7 +217,7 @@ public void undo() public long size() { return DATA_OFFSET - + (multiValue ? cumulativeRowLengths.size() : 0) + + (isWriteMultiValue() ? cumulativeRowLengths.size() : 0) + cumulativeStringLengths.size() + stringData.size(); } @@ -215,13 +225,14 @@ public long size() @Override public long writeTo(final WritableMemory memory, final long startPosition) { + final boolean writeMultiValue = isWriteMultiValue(); long currentPosition = startPosition; memory.putByte(currentPosition, typeCode); - memory.putByte(currentPosition + 1, multiValue ? (byte) 1 : (byte) 0); + memory.putByte(currentPosition + 1, writeMultiValue ? (byte) 1 : (byte) 0); currentPosition += 2; - if (multiValue) { + if (writeMultiValue) { currentPosition += cumulativeRowLengths.writeTo(memory, currentPosition); } @@ -234,7 +245,7 @@ public long writeTo(final WritableMemory memory, final long startPosition) @Override public void close() { - if (multiValue) { + if (multiValue.isMaybeTrue()) { cumulativeRowLengths.close(); } @@ -249,6 +260,15 @@ public void close() @Nullable public abstract List getUtf8ByteBuffersFromSelector(T selector); + /** + * Whether, given the current state of the writer, a call to {@link #writeTo(WritableMemory, long)} at this point + * would generate a multi-value column. + */ + private boolean isWriteMultiValue() + { + return multiValue.isTrue() || encounteredMultiValueRow; + } + /** * Returns the sum of remaining bytes in the provided list of byte buffers. */ @@ -277,7 +297,7 @@ class StringFrameColumnWriterImpl extends StringFrameColumnWriter getUtf8ByteBuffersFromSelector(final DimensionSelector selector) { - return FrameWriterUtils.getUtf8ByteBuffersFromStringSelector(selector, multiValue); + return FrameWriterUtils.getUtf8ByteBuffersFromStringSelector(selector, multiValue.isMaybeTrue()); } } @@ -300,7 +320,7 @@ class StringArrayFrameColumnWriterImpl extends StringFrameColumnWriter capabilitiesAdjustFn; + public FrameWriterTest( @Nullable final FrameType inputFrameType, final FrameType outputFrameType, @@ -130,14 +143,89 @@ public static void setUpClass() } @Test - public void test_string() + public void test_string_multiValueTrue() { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.TRUE); testWithDataset(FrameWriterTestData.TEST_STRINGS_SINGLE_VALUE); } @Test - public void test_multiValueString() + public void test_string_multiValueFalse() + { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.FALSE); + testWithDataset(FrameWriterTestData.TEST_STRINGS_SINGLE_VALUE); + } + + @Test + public void test_string_multiValueUnknown() + { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.UNKNOWN); + testWithDataset(FrameWriterTestData.TEST_STRINGS_SINGLE_VALUE); + } + + @Test + public void test_singleValueWithEmpty_multiValueTrue() + { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.TRUE); + testWithDataset(FrameWriterTestData.TEST_STRINGS_MULTI_VALUE); + } + + @Test + public void test_singleValueWithEmpty_multiValueFalse() + { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.FALSE); + + // When columnar frames are in multiValue = false mode, and when they see a dataset that is all single strings and + // empty arrays, they write a single-valued column, replacing the empty arrays with nulls. + final FrameWriterTestData.Dataset expectedReadDataset = + outputFrameType == FrameType.COLUMNAR + ? FrameWriterTestData.TEST_STRINGS_SINGLE_VALUE + : FrameWriterTestData.TEST_STRINGS_SINGLE_VALUE_WITH_EMPTY; + + testWithDataset( + FrameWriterTestData.TEST_STRINGS_SINGLE_VALUE_WITH_EMPTY, + expectedReadDataset + ); + } + + @Test + public void test_singleValueWithEmpty_multiValueUnknown() + { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.UNKNOWN); + testWithDataset(FrameWriterTestData.TEST_STRINGS_SINGLE_VALUE_WITH_EMPTY); + } + + @Test + public void test_multiValueString_multiValueTrue() + { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.TRUE); + testWithDataset(FrameWriterTestData.TEST_STRINGS_MULTI_VALUE); + } + + @Test + public void test_multiValueString_multiValueFalse() + { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.FALSE); + + if (outputFrameType == FrameType.COLUMNAR) { + final IllegalStateException e = Assert.assertThrows( + IllegalStateException.class, + () -> testWithDataset(FrameWriterTestData.TEST_STRINGS_MULTI_VALUE) + ); + + MatcherAssert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Encountered unexpected multi-value row")) + ); + } else { + testWithDataset(FrameWriterTestData.TEST_STRINGS_MULTI_VALUE); + } + } + + @Test + public void test_multiValueString_multiValueUnknown() { + capabilitiesAdjustFn = capabilities -> capabilities.setHasMultipleValues(ColumnCapabilities.Capable.UNKNOWN); testWithDataset(FrameWriterTestData.TEST_STRINGS_MULTI_VALUE); } @@ -418,6 +506,7 @@ private Pair writeFrame( inputFrameType, outputFrameType, allocator, + capabilitiesAdjustFn, rows, signature, computeSortColumns(sortColumns) @@ -453,6 +542,20 @@ private void testWithDataset(final FrameWriterTestData.Dataset dataset) verifyFrame(rows(dataset.getData(sortedness)), writeResult.lhs, signature); } + private void testWithDataset( + final FrameWriterTestData.Dataset writeDataset, + final FrameWriterTestData.Dataset readDataset + ) + { + final List data = writeDataset.getData(KeyOrder.NONE); + final RowSignature signature = RowSignature.builder().add("x", writeDataset.getType()).build(); + final Sequence> rowSequence = rows(data); + final Pair writeResult = writeFrame(rowSequence, signature, signature.getColumnNames()); + + Assert.assertEquals(data.size(), (int) writeResult.rhs); + verifyFrame(rows(readDataset.getData(sortedness)), writeResult.lhs, signature); + } + /** * Writes as many rows to a single frame as possible. Returns the number of rows written. */ @@ -460,6 +563,7 @@ private static Pair writeFrame( @Nullable final FrameType inputFrameType, final FrameType outputFrameType, final MemoryAllocator allocator, + @Nullable final Consumer capabilitiesAdjustFn, final Sequence> rows, final RowSignature signature, final List keyColumns @@ -483,6 +587,7 @@ private static Pair writeFrame( null, inputFrameType, HeapMemoryAllocator.unlimited(), + null, rows, signature, Collections.emptyList() @@ -504,8 +609,17 @@ private static Pair writeFrame( keyColumns ); + ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); + + if (capabilitiesAdjustFn != null) { + columnSelectorFactory = new OverrideCapabilitiesColumnSelectorFactory( + columnSelectorFactory, + capabilitiesAdjustFn + ); + } + try (final FrameWriter frameWriter = - frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) { + frameWriterFactory.newFrameWriter(columnSelectorFactory)) { while (!cursor.isDone() && frameWriter.addSelection()) { numRows++; cursor.advance(); @@ -593,4 +707,52 @@ private static Sequence> rows(final List vals) return Sequences.simple(retVal); } + + private static class OverrideCapabilitiesColumnSelectorFactory implements ColumnSelectorFactory + { + private final ColumnSelectorFactory delegate; + private final Consumer fn; + + public OverrideCapabilitiesColumnSelectorFactory( + final ColumnSelectorFactory delegate, + final Consumer fn + ) + { + this.delegate = delegate; + this.fn = fn; + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return delegate.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + return delegate.makeColumnValueSelector(columnName); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + final ColumnCapabilities capabilities = delegate.getColumnCapabilities(column); + if (capabilities == null) { + return null; + } else { + final ColumnCapabilitiesImpl retVal = ColumnCapabilitiesImpl.copyOf(capabilities); + fn.accept(retVal); + return retVal; + } + } + + @Nullable + @Override + public RowIdSupplier getRowIdSupplier() + { + return delegate.getRowIdSupplier(); + } + } } diff --git a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTestData.java b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTestData.java index a52c4d5efdd8..72990e02086e 100644 --- a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTestData.java +++ b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTestData.java @@ -48,23 +48,47 @@ public class FrameWriterTestData { public static final Dataset TEST_STRINGS_SINGLE_VALUE = new Dataset<>( ColumnType.STRING, - Stream.of( + Arrays.asList( null, NullHandling.emptyToNullIfNeeded(""), // Empty string in SQL-compatible mode, null otherwise + "brown", + "dog", + "fox", + "jumps", + "lazy", + "over", + "quick", + "the", // Repeated string + "the", + "thee", // To ensure "the" is before "thee" "\uD83D\uDE42", - "\uD83E\uDEE5", "\uD83E\uDD20", - "thee", // To ensure "the" is before "thee" - "the", - "quick", + "\uD83E\uDEE5" + ) + ); + + /** + * Single-value strings, mostly, but with an empty list thrown in. + */ + public static final Dataset TEST_STRINGS_SINGLE_VALUE_WITH_EMPTY = new Dataset<>( + ColumnType.STRING, + Arrays.asList( + Collections.emptyList(), + NullHandling.emptyToNullIfNeeded(""), // Empty string in SQL-compatible mode, null otherwise "brown", + "dog", "fox", "jumps", + "lazy", "over", + "quick", "the", // Repeated string - "lazy", - "dog" - ).sorted(Comparators.naturalNullsFirst()).collect(Collectors.toList()) + "the", + "thee", // To ensure "the" is before "thee" + "\uD83D\uDE42", + "\uD83E\uDD20", + "\uD83E\uDEE5" + ) ); /**