Skip to content

Commit

Permalink
Starting on Window Functions (#13458)
Browse files Browse the repository at this point in the history
* Processors for Window Processing

This is an initial take on how to use Processors
for Window Processing.  A Processor is an interface
that transforms RowsAndColumns objects.
RowsAndColumns objects are essentially combinations
of rows and columns.

The intention is that these Processors are the start
of a set of operators that more closely resemble what
DB engineers would be accustomed to seeing.

* Wire up windowed processors with a query type that
can run them end-to-end.  This code can be used to
actually run a query, so yay!

* Wire up windowed processors with a query type that
can run them end-to-end.  This code can be used to
actually run a query, so yay!

* Some SQL tests for window functions. Added Wikipedia
data to the indexes available to the SQL queries, along
with tests validating the windowing functionality as it
exists now.

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
  • Loading branch information
imply-cheddar and gianm committed Dec 6, 2022
1 parent cf47216 commit 83261f9
Show file tree
Hide file tree
Showing 127 changed files with 10,654 additions and 682 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ public static void checkMaxSize(int available, int maxSizeBytes, TypeSignature<?
*/
public static final class LongTypeStrategy implements TypeStrategy<Long>
{
private static final Comparator<Long> COMPARATOR = Longs::compare;

@Override
public int estimateSizeBytes(Long value)
{
Expand Down Expand Up @@ -276,9 +274,9 @@ public int write(ByteBuffer buffer, Long value, int maxSizeBytes)
}

@Override
public int compare(Long o1, Long o2)
public int compare(Object o1, Object o2)
{
return COMPARATOR.compare(o1, o2);
return Longs.compare(((Number) o1).longValue(), ((Number) o2).longValue());
}
}

Expand All @@ -289,8 +287,6 @@ public int compare(Long o1, Long o2)
*/
public static final class FloatTypeStrategy implements TypeStrategy<Float>
{
private static final Comparator<Float> COMPARATOR = Floats::compare;

@Override
public int estimateSizeBytes(Float value)
{
Expand Down Expand Up @@ -329,9 +325,9 @@ public int write(ByteBuffer buffer, Float value, int maxSizeBytes)
}

@Override
public int compare(Float o1, Float o2)
public int compare(Object o1, Object o2)
{
return COMPARATOR.compare(o1, o2);
return Floats.compare(((Number) o1).floatValue(), ((Number) o2).floatValue());
}
}

Expand All @@ -342,7 +338,6 @@ public int compare(Float o1, Float o2)
*/
public static final class DoubleTypeStrategy implements TypeStrategy<Double>
{
private static final Comparator<Double> COMPARATOR = Double::compare;

@Override
public int estimateSizeBytes(Double value)
Expand Down Expand Up @@ -382,9 +377,9 @@ public int write(ByteBuffer buffer, Double value, int maxSizeBytes)
}

@Override
public int compare(Double o1, Double o2)
public int compare(Object o1, Object o2)
{
return COMPARATOR.compare(o1, o2);
return Double.compare(((Number) o1).doubleValue(), ((Number) o2).doubleValue());
}
}

Expand Down Expand Up @@ -437,7 +432,7 @@ public int write(ByteBuffer buffer, String value, int maxSizeBytes)
}

@Override
public int compare(String s, String s2)
public int compare(Object s, Object s2)
{
// copy of lexicographical string comparator in druid processing
// Avoid comparisons for equal references
Expand All @@ -447,7 +442,7 @@ public int compare(String s, String s2)
return 0;
}

return ORDERING.compare(s, s2);
return ORDERING.compare((String) s, (String) s2);
}
}

Expand Down Expand Up @@ -521,8 +516,11 @@ public int write(ByteBuffer buffer, Object[] value, int maxSizeBytes)
}

@Override
public int compare(@Nullable Object[] o1, @Nullable Object[] o2)
public int compare(@Nullable Object o1Obj, @Nullable Object o2Obj)
{
Object[] o1 = (Object[]) o1Obj;
Object[] o2 = (Object[]) o2Obj;

//noinspection ArrayEquality
if (o1 == o2) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,15 @@
* Implementations of this interface should be thread safe, but may not use {@link ByteBuffer} in a thread safe manner,
* potentially modifying positions and limits, either temporarily or permanently depending on which set of methods is
* called.
*
* This interface extends {@code Comparator<Object>} instead of {@code Comparator<T>} because specializing the type
* of the comparison method causes problems when the objects being compared can sometimes be of a different Java
* class than {@code T}. For example, a {@code Comparator<Long>} cannot accept Integer objects in its comparison
* method, and there is no easy way for this interface definition to allow {@code TypeStrategy<Long>} to actually be
* a {@code Comparator<Number>}. So, we fall back to effectively erasing the generic type and having them all be
* {@code Comparator<Object>}.
*/
public interface TypeStrategy<T> extends Comparator<T>
public interface TypeStrategy<T> extends Comparator<Object>
{
/**
* Estimate the size in bytes that writing this value to memory would require. This method is not required to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment.column;

import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
Expand Down Expand Up @@ -103,7 +104,7 @@ public int write(ByteBuffer buffer, String value, int maxSizeBytes)
}

@Override
public int compare(String o1, String o2)
public int compare(Object o1, Object o2)
{
return 0;
}
Expand Down Expand Up @@ -639,10 +640,13 @@ public int compareTo(NullableLongPair o)

public static class NullableLongPairTypeStrategy implements TypeStrategy<NullableLongPair>
{

private Ordering<NullableLongPair> ordering = Comparators.naturalNullsFirst();

@Override
public int compare(NullableLongPair o1, NullableLongPair o2)
public int compare(Object o1, Object o2)
{
return Comparators.<NullableLongPair>naturalNullsFirst().compare(o1, o2);
return ordering.compare((NullableLongPair) o1, (NullableLongPair) o2);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.google.common.primitives.Ints;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.ComplexFrameColumnWriter;
import org.apache.druid.frame.write.columnar.ComplexFrameMaker;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -53,10 +53,10 @@ public ColumnPlus readColumn(final Frame frame)
final Memory memory = frame.region(columnNumber);
validate(memory, frame.numRows());

final int typeNameLength = memory.getInt(ComplexFrameColumnWriter.TYPE_NAME_LENGTH_POSITION);
final int typeNameLength = memory.getInt(ComplexFrameMaker.TYPE_NAME_LENGTH_POSITION);
final byte[] typeNameBytes = new byte[typeNameLength];

memory.getByteArray(ComplexFrameColumnWriter.TYPE_NAME_POSITION, typeNameBytes, 0, typeNameLength);
memory.getByteArray(ComplexFrameMaker.TYPE_NAME_POSITION, typeNameBytes, 0, typeNameLength);

final String typeName = StringUtils.fromUtf8(typeNameBytes);
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
Expand Down Expand Up @@ -84,7 +84,7 @@ public ColumnPlus readColumn(final Frame frame)

private void validate(final Memory region, final int numRows)
{
if (region.getCapacity() < ComplexFrameColumnWriter.TYPE_NAME_POSITION) {
if (region.getCapacity() < ComplexFrameMaker.TYPE_NAME_POSITION) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -93,9 +93,9 @@ private void validate(final Memory region, final int numRows)
throw new ISE("Column does not have the correct type code");
}

final int typeNameLength = region.getInt(ComplexFrameColumnWriter.TYPE_NAME_LENGTH_POSITION);
final int typeNameLength = region.getInt(ComplexFrameMaker.TYPE_NAME_LENGTH_POSITION);
if (region.getCapacity() <
ComplexFrameColumnWriter.TYPE_NAME_POSITION + typeNameLength + (long) numRows * Integer.BYTES) {
ComplexFrameMaker.TYPE_NAME_POSITION + typeNameLength + (long) numRows * Integer.BYTES) {
throw new ISE("Column is missing offset section");
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ private Object getObjectForPhysicalRow(final int physicalRow)
startOfDataSection + memory.getInt(startOfOffsetSection + (long) Integer.BYTES * (physicalRow - 1));
}

if (memory.getByte(startOffset) == ComplexFrameColumnWriter.NULL_MARKER) {
if (memory.getByte(startOffset) == ComplexFrameMaker.NULL_MARKER) {
return null;
} else {
final int payloadLength = Ints.checkedCast(endOffset - startOffset - Byte.BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.DoubleFrameColumnWriter;
import org.apache.druid.frame.write.columnar.DoubleFrameMaker;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand Down Expand Up @@ -69,7 +69,7 @@ private void validate(final Memory region, final int numRows)
final long memorySize = region.getCapacity();

// Check if column is big enough for a header
if (memorySize < DoubleFrameColumnWriter.DATA_OFFSET) {
if (memorySize < DoubleFrameMaker.DATA_OFFSET) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -79,10 +79,10 @@ private void validate(final Memory region, final int numRows)
}

final boolean hasNulls = getHasNulls(region);
final int sz = DoubleFrameColumnWriter.valueSize(hasNulls);
final int sz = DoubleFrameMaker.valueSize(hasNulls);

// Check column length again, now that we know exactly how long it should be.
if (memorySize != DoubleFrameColumnWriter.DATA_OFFSET + (long) sz * numRows) {
if (memorySize != DoubleFrameMaker.DATA_OFFSET + (long) sz * numRows) {
throw new ISE("Column does not have the correct length");
}
}
Expand All @@ -108,9 +108,9 @@ private DoubleFrameColumn(
{
this.frame = frame;
this.hasNulls = hasNulls;
this.sz = DoubleFrameColumnWriter.valueSize(hasNulls);
this.sz = DoubleFrameMaker.valueSize(hasNulls);
this.memory = memory;
this.memoryPosition = DoubleFrameColumnWriter.DATA_OFFSET;
this.memoryPosition = DoubleFrameMaker.DATA_OFFSET;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.FloatFrameColumnWriter;
import org.apache.druid.frame.write.columnar.FloatFrameMaker;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand Down Expand Up @@ -69,7 +69,7 @@ private void validate(final Memory region, final int numRows)
final long memorySize = region.getCapacity();

// Check if column is big enough for a header
if (memorySize < FloatFrameColumnWriter.DATA_OFFSET) {
if (memorySize < FloatFrameMaker.DATA_OFFSET) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -79,10 +79,10 @@ private void validate(final Memory region, final int numRows)
}

final boolean hasNulls = getHasNulls(region);
final int sz = FloatFrameColumnWriter.valueSize(hasNulls);
final int sz = FloatFrameMaker.valueSize(hasNulls);

// Check column length again, now that we know exactly how long it should be.
if (memorySize != FloatFrameColumnWriter.DATA_OFFSET + (long) sz * numRows) {
if (memorySize != FloatFrameMaker.DATA_OFFSET + (long) sz * numRows) {
throw new ISE("Column does not have the correct length");
}
}
Expand All @@ -108,9 +108,9 @@ private FloatFrameColumn(
{
this.frame = frame;
this.hasNulls = hasNulls;
this.sz = FloatFrameColumnWriter.valueSize(hasNulls);
this.sz = FloatFrameMaker.valueSize(hasNulls);
this.memory = memory;
this.memoryPosition = FloatFrameColumnWriter.DATA_OFFSET;
this.memoryPosition = FloatFrameMaker.DATA_OFFSET;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.frame.write.columnar.LongFrameColumnWriter;
import org.apache.druid.frame.write.columnar.LongFrameMaker;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
Expand Down Expand Up @@ -67,7 +67,7 @@ public ColumnPlus readColumn(final Frame frame)
private void validate(final Memory region, final int numRows)
{
// Check if column is big enough for a header
if (region.getCapacity() < LongFrameColumnWriter.DATA_OFFSET) {
if (region.getCapacity() < LongFrameMaker.DATA_OFFSET) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -77,10 +77,10 @@ private void validate(final Memory region, final int numRows)
}

final boolean hasNulls = getHasNulls(region);
final int sz = LongFrameColumnWriter.valueSize(hasNulls);
final int sz = LongFrameMaker.valueSize(hasNulls);

// Check column length again, now that we know exactly how long it should be.
if (region.getCapacity() != LongFrameColumnWriter.DATA_OFFSET + (long) sz * numRows) {
if (region.getCapacity() != LongFrameMaker.DATA_OFFSET + (long) sz * numRows) {
throw new ISE("Column does not have the correct length");
}
}
Expand All @@ -106,9 +106,9 @@ private LongFrameColumn(
{
this.frame = frame;
this.hasNulls = hasNulls;
this.sz = LongFrameColumnWriter.valueSize(hasNulls);
this.sz = LongFrameMaker.valueSize(hasNulls);
this.memory = memory;
this.memoryPosition = LongFrameColumnWriter.DATA_OFFSET;
this.memoryPosition = LongFrameMaker.DATA_OFFSET;
}

@Override
Expand Down

0 comments on commit 83261f9

Please sign in to comment.