Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Starting on Window Functions #13458

Merged
merged 31 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a53bdb8
Processors for Window Processing
imply-cheddar Nov 7, 2022
54e8c65
Wire up windowed processors with a query type that
imply-cheddar Nov 11, 2022
a5eead8
wip
gianm Nov 12, 2022
f3b61ef
Merge remote-tracking branch 'imply-cheddar/window-operations' into s…
gianm Nov 14, 2022
f5ccb09
WindowOperatorQuery SQL bindings
gianm Nov 14, 2022
31b4005
Processors for Window Processing
imply-cheddar Nov 7, 2022
cbcb264
Wire up windowed processors with a query type that
imply-cheddar Nov 11, 2022
32971ac
Wire up windowed processors with a query type that
imply-cheddar Nov 14, 2022
76e8645
Initial SQL parsing/planning for window functions
imply-cheddar Nov 16, 2022
2148a78
Some SQL tests for window functions
imply-cheddar Nov 22, 2022
ffde5a9
Add Javadoc to interfaces
imply-cheddar Nov 24, 2022
b72a1b2
Reformat Code and optimize imports
imply-cheddar Nov 24, 2022
b5bfad4
Add license header
imply-cheddar Nov 24, 2022
5131674
2 whitespaces in the copyright clob
imply-cheddar Nov 24, 2022
5f43d28
Test and style fixes
imply-cheddar Nov 25, 2022
a979d83
Fix dependencies
imply-cheddar Nov 28, 2022
d012b86
Forbidden APIs
imply-cheddar Nov 28, 2022
598c373
Checkstyle
imply-cheddar Nov 28, 2022
f989459
Tests for coverage
imply-cheddar Nov 28, 2022
1300de7
Support SQL-compatible mode in tests
imply-cheddar Nov 28, 2022
2a2023f
Some initial tests for RowsAndColumns directly
imply-cheddar Nov 29, 2022
70488b7
Tests and style
imply-cheddar Nov 29, 2022
f3d3270
Expand branch coverage for tests
imply-cheddar Nov 30, 2022
851acb8
Tests and style
imply-cheddar Nov 30, 2022
299d735
Make windowing off by default
imply-cheddar Dec 2, 2022
ea566d9
Merge remote-tracking branch 'apache/master' into windowinging
imply-cheddar Dec 5, 2022
935864c
Make it compile after merge.
imply-cheddar Dec 5, 2022
f38945c
Changes from code review
imply-cheddar Dec 5, 2022
8cc8235
Checkstyle and tests
imply-cheddar Dec 5, 2022
01cea90
Merge remote-tracking branch 'apache/master' into windowinging
imply-cheddar Dec 6, 2022
edb06d8
Fix comment
imply-cheddar Dec 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ public static void checkMaxSize(int available, int maxSizeBytes, TypeSignature<?
*/
public static final class LongTypeStrategy implements TypeStrategy<Long>
{
private static final Comparator<Long> COMPARATOR = Longs::compare;

@Override
public int estimateSizeBytes(Long value)
{
Expand Down Expand Up @@ -276,9 +274,9 @@ public int write(ByteBuffer buffer, Long value, int maxSizeBytes)
}

@Override
public int compare(Long o1, Long o2)
public int compare(Object o1, Object o2)
{
return COMPARATOR.compare(o1, o2);
return Longs.compare(((Number) o1).longValue(), ((Number) o2).longValue());
}
}

Expand All @@ -289,8 +287,6 @@ public int compare(Long o1, Long o2)
*/
public static final class FloatTypeStrategy implements TypeStrategy<Float>
{
private static final Comparator<Float> COMPARATOR = Floats::compare;

@Override
public int estimateSizeBytes(Float value)
{
Expand Down Expand Up @@ -329,9 +325,9 @@ public int write(ByteBuffer buffer, Float value, int maxSizeBytes)
}

@Override
public int compare(Float o1, Float o2)
public int compare(Object o1, Object o2)
{
return COMPARATOR.compare(o1, o2);
return Floats.compare(((Number) o1).floatValue(), ((Number) o2).floatValue());
}
}

Expand All @@ -342,7 +338,6 @@ public int compare(Float o1, Float o2)
*/
public static final class DoubleTypeStrategy implements TypeStrategy<Double>
{
private static final Comparator<Double> COMPARATOR = Double::compare;

@Override
public int estimateSizeBytes(Double value)
Expand Down Expand Up @@ -382,9 +377,9 @@ public int write(ByteBuffer buffer, Double value, int maxSizeBytes)
}

@Override
public int compare(Double o1, Double o2)
public int compare(Object o1, Object o2)
{
return COMPARATOR.compare(o1, o2);
return Double.compare(((Number) o1).doubleValue(), ((Number) o2).doubleValue());
}
}

Expand Down Expand Up @@ -437,7 +432,7 @@ public int write(ByteBuffer buffer, String value, int maxSizeBytes)
}

@Override
public int compare(String s, String s2)
public int compare(Object s, Object s2)
{
// copy of lexicographical string comparator in druid processing
// Avoid comparisons for equal references
Expand All @@ -447,7 +442,7 @@ public int compare(String s, String s2)
return 0;
}

return ORDERING.compare(s, s2);
return ORDERING.compare((String) s, (String) s2);
}
}

Expand Down Expand Up @@ -521,8 +516,11 @@ public int write(ByteBuffer buffer, Object[] value, int maxSizeBytes)
}

@Override
public int compare(@Nullable Object[] o1, @Nullable Object[] o2)
public int compare(@Nullable Object o1Obj, @Nullable Object o2Obj)
{
Object[] o1 = (Object[]) o1Obj;
Object[] o2 = (Object[]) o2Obj;

//noinspection ArrayEquality
if (o1 == o2) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,15 @@
* Implementations of this interface should be thread safe, but may not use {@link ByteBuffer} in a thread safe manner,
* potentially modifying positions and limits, either temporarily or permanently depending on which set of methods is
* called.
*
* This interface extends {@code Comparator<Object>} instead of {@code Comparator<T>} because trying to specialize the
* type of the comparison method can run into issues for comparators of objects that can sometimes be of a different
* java class type. For example, {@code Comparator<Long>} cannot accept Integer objects in its comparison method
* and there is no easy way for this interface definition to allow {@code TypeStrategy<Long>} to actually be a
* {@code Comparator<Number>}. So, we fall back to effectively erasing the generic type and having them all be
* {@code Comparator<Object>}.
*/
public interface TypeStrategy<T> extends Comparator<T>
public interface TypeStrategy<T> extends Comparator<Object>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get why you did it this way, but it's a bummer, since nothing really stops the inexorable Objectification of the Ts in this interface, except moving away from Jackson serde of plain Object and Object[] and List<Object>. That's the real issue. I think we'll be able to move away from that as we go to frame and frame channel RPC, so, I am okay with this for that reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

{
/**
* Estimate the size in bytes that writing this value to memory would require. This method is not required to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment.column;

import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
Expand Down Expand Up @@ -103,7 +104,7 @@ public int write(ByteBuffer buffer, String value, int maxSizeBytes)
}

@Override
public int compare(String o1, String o2)
public int compare(Object o1, Object o2)
{
return 0;
}
Expand Down Expand Up @@ -639,10 +640,13 @@ public int compareTo(NullableLongPair o)

public static class NullableLongPairTypeStrategy implements TypeStrategy<NullableLongPair>
{

private Ordering<NullableLongPair> ordering = Comparators.naturalNullsFirst();

@Override
public int compare(NullableLongPair o1, NullableLongPair o2)
public int compare(Object o1, Object o2)
{
return Comparators.<NullableLongPair>naturalNullsFirst().compare(o1, o2);
return ordering.compare((NullableLongPair) o1, (NullableLongPair) o2);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.google.common.primitives.Ints;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.ComplexFrameColumnWriter;
import org.apache.druid.frame.write.columnar.ComplexFrameMaker;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -53,10 +53,10 @@ public ColumnPlus readColumn(final Frame frame)
final Memory memory = frame.region(columnNumber);
validate(memory, frame.numRows());

final int typeNameLength = memory.getInt(ComplexFrameColumnWriter.TYPE_NAME_LENGTH_POSITION);
final int typeNameLength = memory.getInt(ComplexFrameMaker.TYPE_NAME_LENGTH_POSITION);
final byte[] typeNameBytes = new byte[typeNameLength];

memory.getByteArray(ComplexFrameColumnWriter.TYPE_NAME_POSITION, typeNameBytes, 0, typeNameLength);
memory.getByteArray(ComplexFrameMaker.TYPE_NAME_POSITION, typeNameBytes, 0, typeNameLength);

final String typeName = StringUtils.fromUtf8(typeNameBytes);
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
Expand Down Expand Up @@ -84,7 +84,7 @@ public ColumnPlus readColumn(final Frame frame)

private void validate(final Memory region, final int numRows)
{
if (region.getCapacity() < ComplexFrameColumnWriter.TYPE_NAME_POSITION) {
if (region.getCapacity() < ComplexFrameMaker.TYPE_NAME_POSITION) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -93,9 +93,9 @@ private void validate(final Memory region, final int numRows)
throw new ISE("Column does not have the correct type code");
}

final int typeNameLength = region.getInt(ComplexFrameColumnWriter.TYPE_NAME_LENGTH_POSITION);
final int typeNameLength = region.getInt(ComplexFrameMaker.TYPE_NAME_LENGTH_POSITION);
if (region.getCapacity() <
ComplexFrameColumnWriter.TYPE_NAME_POSITION + typeNameLength + (long) numRows * Integer.BYTES) {
ComplexFrameMaker.TYPE_NAME_POSITION + typeNameLength + (long) numRows * Integer.BYTES) {
throw new ISE("Column is missing offset section");
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ private Object getObjectForPhysicalRow(final int physicalRow)
startOfDataSection + memory.getInt(startOfOffsetSection + (long) Integer.BYTES * (physicalRow - 1));
}

if (memory.getByte(startOffset) == ComplexFrameColumnWriter.NULL_MARKER) {
if (memory.getByte(startOffset) == ComplexFrameMaker.NULL_MARKER) {
return null;
} else {
final int payloadLength = Ints.checkedCast(endOffset - startOffset - Byte.BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.DoubleFrameColumnWriter;
import org.apache.druid.frame.write.columnar.DoubleFrameMaker;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand Down Expand Up @@ -69,7 +69,7 @@ private void validate(final Memory region, final int numRows)
final long memorySize = region.getCapacity();

// Check if column is big enough for a header
if (memorySize < DoubleFrameColumnWriter.DATA_OFFSET) {
if (memorySize < DoubleFrameMaker.DATA_OFFSET) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -79,10 +79,10 @@ private void validate(final Memory region, final int numRows)
}

final boolean hasNulls = getHasNulls(region);
final int sz = DoubleFrameColumnWriter.valueSize(hasNulls);
final int sz = DoubleFrameMaker.valueSize(hasNulls);

// Check column length again, now that we know exactly how long it should be.
if (memorySize != DoubleFrameColumnWriter.DATA_OFFSET + (long) sz * numRows) {
if (memorySize != DoubleFrameMaker.DATA_OFFSET + (long) sz * numRows) {
throw new ISE("Column does not have the correct length");
}
}
Expand All @@ -108,9 +108,9 @@ private DoubleFrameColumn(
{
this.frame = frame;
this.hasNulls = hasNulls;
this.sz = DoubleFrameColumnWriter.valueSize(hasNulls);
this.sz = DoubleFrameMaker.valueSize(hasNulls);
this.memory = memory;
this.memoryPosition = DoubleFrameColumnWriter.DATA_OFFSET;
this.memoryPosition = DoubleFrameMaker.DATA_OFFSET;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.FloatFrameColumnWriter;
import org.apache.druid.frame.write.columnar.FloatFrameMaker;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand Down Expand Up @@ -69,7 +69,7 @@ private void validate(final Memory region, final int numRows)
final long memorySize = region.getCapacity();

// Check if column is big enough for a header
if (memorySize < FloatFrameColumnWriter.DATA_OFFSET) {
if (memorySize < FloatFrameMaker.DATA_OFFSET) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -79,10 +79,10 @@ private void validate(final Memory region, final int numRows)
}

final boolean hasNulls = getHasNulls(region);
final int sz = FloatFrameColumnWriter.valueSize(hasNulls);
final int sz = FloatFrameMaker.valueSize(hasNulls);

// Check column length again, now that we know exactly how long it should be.
if (memorySize != FloatFrameColumnWriter.DATA_OFFSET + (long) sz * numRows) {
if (memorySize != FloatFrameMaker.DATA_OFFSET + (long) sz * numRows) {
throw new ISE("Column does not have the correct length");
}
}
Expand All @@ -108,9 +108,9 @@ private FloatFrameColumn(
{
this.frame = frame;
this.hasNulls = hasNulls;
this.sz = FloatFrameColumnWriter.valueSize(hasNulls);
this.sz = FloatFrameMaker.valueSize(hasNulls);
this.memory = memory;
this.memoryPosition = FloatFrameColumnWriter.DATA_OFFSET;
this.memoryPosition = FloatFrameMaker.DATA_OFFSET;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.frame.write.columnar.LongFrameColumnWriter;
import org.apache.druid.frame.write.columnar.LongFrameMaker;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
Expand Down Expand Up @@ -67,7 +67,7 @@ public ColumnPlus readColumn(final Frame frame)
private void validate(final Memory region, final int numRows)
{
// Check if column is big enough for a header
if (region.getCapacity() < LongFrameColumnWriter.DATA_OFFSET) {
if (region.getCapacity() < LongFrameMaker.DATA_OFFSET) {
throw new ISE("Column is not big enough for a header");
}

Expand All @@ -77,10 +77,10 @@ private void validate(final Memory region, final int numRows)
}

final boolean hasNulls = getHasNulls(region);
final int sz = LongFrameColumnWriter.valueSize(hasNulls);
final int sz = LongFrameMaker.valueSize(hasNulls);

// Check column length again, now that we know exactly how long it should be.
if (region.getCapacity() != LongFrameColumnWriter.DATA_OFFSET + (long) sz * numRows) {
if (region.getCapacity() != LongFrameMaker.DATA_OFFSET + (long) sz * numRows) {
throw new ISE("Column does not have the correct length");
}
}
Expand All @@ -106,9 +106,9 @@ private LongFrameColumn(
{
this.frame = frame;
this.hasNulls = hasNulls;
this.sz = LongFrameColumnWriter.valueSize(hasNulls);
this.sz = LongFrameMaker.valueSize(hasNulls);
this.memory = memory;
this.memoryPosition = LongFrameColumnWriter.DATA_OFFSET;
this.memoryPosition = LongFrameMaker.DATA_OFFSET;
}

@Override
Expand Down