Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.SqlSplittableAggFunction;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlLeadLagAggFunction;
import org.apache.calcite.sql.fun.SqlMonotonicBinaryOperator;
import org.apache.calcite.sql.fun.SqlNtileAggFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
Expand Down Expand Up @@ -214,10 +213,8 @@ public static PinotOperatorTable instance(boolean nullHandlingEnabled) {
// WINDOW Functions (non-aggregate)
SqlStdOperatorTable.LAST_VALUE,
SqlStdOperatorTable.FIRST_VALUE,
// TODO: Replace these with SqlStdOperatorTable.LEAD and SqlStdOperatorTable.LAG when the function implementations
// are updated to support the IGNORE NULLS option.
PinotLeadWindowFunction.INSTANCE,
PinotLagWindowFunction.INSTANCE,
SqlStdOperatorTable.LEAD,
SqlStdOperatorTable.LAG,

// SPECIAL OPERATORS
SqlStdOperatorTable.IGNORE_NULLS,
Expand Down Expand Up @@ -448,32 +445,6 @@ public List<SqlOperator> getOperatorList() {
return _operatorList;
}

private static class PinotLeadWindowFunction extends SqlLeadLagAggFunction {
static final SqlOperator INSTANCE = new PinotLeadWindowFunction();

public PinotLeadWindowFunction() {
super(SqlKind.LEAD);
}

@Override
public boolean allowsNullTreatment() {
return false;
}
}

private static class PinotLagWindowFunction extends SqlLeadLagAggFunction {
static final SqlOperator INSTANCE = new PinotLagWindowFunction();

public PinotLagWindowFunction() {
super(SqlKind.LAG);
}

@Override
public boolean allowsNullTreatment() {
return false;
}
}

private static final class PinotNtileWindowFunction extends SqlNtileAggFunction {
static final SqlOperator INSTANCE = new PinotNtileWindowFunction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime.operator.window;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
Expand Down Expand Up @@ -63,6 +64,7 @@ public WindowFunction(RexExpression.FunctionCall aggCall, DataSchema inputSchema
*/
public abstract List<Object> processRows(List<Object[]> rows);

@Nullable
protected Object extractValueFromRow(Object[] row) {
return _inputRef == -1 ? _literal : (row == null ? null : row[_inputRef]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime.operator.window.value;

import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.rel.RelFieldCollation;
Expand All @@ -28,9 +29,10 @@
import org.apache.pinot.query.runtime.operator.window.WindowFrame;


/**
* The LAG window function doesn't allow custom window frames (and this is enforced by Calcite).
*/
/// Window function that returns the value of a column from a preceding row within the partition.
/// Supports an optional offset (default 1), an optional default value for when no row exists at
/// that offset, and IGNORE NULLS mode which skips null values when scanning backward.
/// Custom window frames are not allowed (enforced by Calcite).
public class LagValueWindowFunction extends ValueWindowFunction {
private final int _offset;
private final Object _defaultValue;
Expand Down Expand Up @@ -75,6 +77,9 @@ public LagValueWindowFunction(RexExpression.FunctionCall aggCall, DataSchema inp

@Override
public List<Object> processRows(List<Object[]> rows) {
if (_ignoreNulls) {
return processRowsIgnoreNulls(rows);
}
int numRows = rows.size();
Object[] result = new Object[numRows];
if (_defaultValue != null) {
Expand All @@ -88,4 +93,28 @@ public List<Object> processRows(List<Object[]> rows) {
}
return Arrays.asList(result);
}

/**
* LAG with IGNORE NULLS: for each row, find the offset-th non-null value scanning backward.
* Uses a bounded deque of size {@code _offset} for O(N) time and O(offset) memory.
* Scans left-to-right, maintaining a sliding window of preceding non-null values. The oldest
* element in the deque (peekFirst) is always the offset-th non-null value behind the current
* row.
*/
private List<Object> processRowsIgnoreNulls(List<Object[]> rows) {
int numRows = rows.size();
Object[] result = new Object[numRows];
ArrayDeque<Object> window = new ArrayDeque<>(_offset);
for (int i = 0; i < numRows; i++) {
result[i] = (window.size() == _offset) ? window.peekFirst() : _defaultValue;
Object val = extractValueFromRow(rows.get(i));
if (val != null) {
window.addLast(val);
if (window.size() > _offset) {
window.pollFirst();
}
}
}
return Arrays.asList(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime.operator.window.value;

import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.rel.RelFieldCollation;
Expand All @@ -28,9 +29,10 @@
import org.apache.pinot.query.runtime.operator.window.WindowFrame;


/**
* The LAG window function doesn't allow custom window frames (and this is enforced by Calcite).
*/
/// Window function that returns the value of a column from a subsequent row within the partition.
/// Supports an optional offset (default 1), an optional default value for when no row exists at
/// that offset, and IGNORE NULLS mode which skips null values when scanning forward.
/// Custom window frames are not allowed (enforced by Calcite).
public class LeadValueWindowFunction extends ValueWindowFunction {

private final int _offset;
Expand Down Expand Up @@ -75,6 +77,9 @@ public LeadValueWindowFunction(RexExpression.FunctionCall aggCall, DataSchema in

@Override
public List<Object> processRows(List<Object[]> rows) {
if (_ignoreNulls) {
return processRowsIgnoreNulls(rows);
}
int numRows = rows.size();
Object[] result = new Object[numRows];
for (int i = 0; i < numRows - _offset; i++) {
Expand All @@ -88,4 +93,28 @@ public List<Object> processRows(List<Object[]> rows) {
}
return Arrays.asList(result);
}

/**
* LEAD with IGNORE NULLS: for each row, find the offset-th non-null value scanning forward.
* Uses a bounded deque of size {@code _offset} for O(N) time and O(offset) memory.
* Scans right-to-left, maintaining a sliding window of upcoming non-null values. The oldest
* element in the deque (peekFirst) is always the offset-th non-null value ahead of the current
* row.
*/
private List<Object> processRowsIgnoreNulls(List<Object[]> rows) {
int numRows = rows.size();
Object[] result = new Object[numRows];
ArrayDeque<Object> window = new ArrayDeque<>(_offset);
for (int i = numRows - 1; i >= 0; i--) {
result[i] = (window.size() == _offset) ? window.peekFirst() : _defaultValue;
Object val = extractValueFromRow(rows.get(i));
if (val != null) {
window.addLast(val);
if (window.size() > _offset) {
window.pollFirst();
}
}
}
return Arrays.asList(result);
}
}
Loading
Loading