Skip to content

Commit

Permalink
[CALCITE-4171] Support named parameters for table window functions
Browse files Browse the repository at this point in the history
[CALCITE-4171] Support named parameters for table window functions

* Changes SqlArgumentAssignmentOperator to allow non-scala query as
  operand
* In SqlCallBinding, matches the permuted operand by name with name
  matcher
* Refactor SqlWindowTableFunction and its sub-class to reuse same
  logic
* Do not patch up the SqlWindowTableFunction with DEFAULTs when sql
  validation
  • Loading branch information
danny0405 committed Aug 18, 2020
1 parent aadb605 commit 8ab35a7
Show file tree
Hide file tree
Showing 15 changed files with 646 additions and 140 deletions.
Expand Up @@ -804,7 +804,6 @@ static Expression tumblingWindowSelector(
expressions.add(expression);
}
final Expression wmColExprToLong = EnumUtils.convert(wmColExpr, long.class);
final Expression shiftExpr = Expressions.constant(1, long.class);

// Find the fixed window for a timestamp given a window size and an offset, and return the
// window start.
Expand Down
Expand Up @@ -220,6 +220,10 @@ ExInst<SqlValidatorException> columnNotFoundInTableDidYouMean(String a0,
@BaseMessage("Column ''{0}'' is ambiguous")
ExInst<SqlValidatorException> columnAmbiguous(String a0);

@BaseMessage("Param ''{0}'' not found in function ''{1}''; did you mean ''{2}''?")
ExInst<SqlValidatorException> paramNotFoundInFunctionDidYouMean(String a0,
String a1, String a2);

@BaseMessage("Operand {0} must be a query")
ExInst<SqlValidatorException> needQueryOp(String a0);

Expand Down
45 changes: 36 additions & 9 deletions core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java
Expand Up @@ -27,13 +27,15 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SelectScope;
import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.sql.validate.SqlNameMatcher;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.ImmutableNullableList;
import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Pair;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -161,17 +163,42 @@ private boolean hasAssignment() {
* formal parameters of the function. */
private List<SqlNode> permutedOperands(final SqlCall call) {
final SqlFunction operator = (SqlFunction) call.getOperator();
return Lists.transform(operator.getParamNames(), paramName -> {
for (SqlNode operand2 : call.getOperandList()) {
final SqlCall call2 = (SqlCall) operand2;
assert operand2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT;
final SqlIdentifier id = call2.operand(1);
if (id.getSimple().equals(paramName)) {
return call2.operand(0);
final List<String> paramNames = operator.getParamNames();
final List<SqlNode> permuted = new ArrayList<>();
final SqlNameMatcher nameMatcher = validator.getCatalogReader().nameMatcher();
for (final String paramName : paramNames) {
Pair<String, SqlIdentifier> args = null;
for (int j = 0; j < call.getOperandList().size(); j++) {
final SqlCall call2 = call.operand(j);
assert call2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT;
final SqlIdentifier operandID = call2.operand(1);
final String operandName = operandID.getSimple();
if (nameMatcher.matches(operandName, paramName)) {
permuted.add(call2.operand(0));
break;
} else if (args == null
&& nameMatcher.isCaseSensitive()
&& operandName.equalsIgnoreCase(paramName)) {
args = Pair.of(paramName, operandID);
}
// the last operand, there is still no match.
if (j == call.getOperandList().size() - 1) {
if (args != null) {
throw SqlUtil.newContextException(args.right.getParserPosition(),
RESOURCE.paramNotFoundInFunctionDidYouMean(args.right.getSimple(),
call.getOperator().getName(), args.left));
}
if (!(operator instanceof SqlWindowTableFunction)) {
// Not like user defined functions, we do not patch up the operands
// with DEFAULT and then convert to nulls during sql-to-rel conversion.
// Thus, there is no need to show the optional operands in the plan and
// decide if the optional operand is null when code generation.
permuted.add(DEFAULT_CALL);
}
}
}
return DEFAULT_CALL;
});
}
return permuted;
}

/**
Expand Down
88 changes: 48 additions & 40 deletions core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
Expand Up @@ -16,60 +16,68 @@
*/
package org.apache.calcite.sql;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;

import com.google.common.collect.ImmutableList;

import java.util.List;

/**
* SqlHopTableFunction implements an operator for hopping. It allows four parameters:
* 1. a table;
* 2. a descriptor to provide a watermarked column name from the input table;
* 3. an interval parameter to specify the length of window shifting;
* 4. an interval parameter to specify the length of window size.
* SqlHopTableFunction implements an operator for hopping.
*
* <p>It allows four parameters:
*
* <ol>
* <li>a table</li>
* <li>a descriptor to provide a watermarked column name from the input table</li>
* <li>an interval parameter to specify the length of window shifting</li>
* <li>an interval parameter to specify the length of window size</li>
* </ol>
*/
public class SqlHopTableFunction extends SqlWindowTableFunction {
public SqlHopTableFunction() {
super(SqlKind.HOP.name());
super(SqlKind.HOP.name(), OperandTypeCheckerImpl.INSTANCE);
}

@Override public SqlOperandCountRange getOperandCountRange() {
return SqlOperandCountRanges.between(4, 5);
@Override public List<String> getParamNames() {
return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SLIDE, PARAM_SIZE, PARAM_OFFSET);
}

@Override public boolean checkOperandTypes(SqlCallBinding callBinding,
boolean throwOnFailure) {
final SqlNode operand0 = callBinding.operand(0);
final SqlValidator validator = callBinding.getValidator();
final RelDataType type = validator.getValidatedNodeType(operand0);
if (type.getSqlTypeName() != SqlTypeName.ROW) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
final SqlNode operand1 = callBinding.operand(1);
if (operand1.getKind() != SqlKind.DESCRIPTOR) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------

/** Operand type checker for HOP. */
private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();

@Override public boolean checkOperandTypes(
SqlCallBinding callBinding, boolean throwOnFailure) {
if (!validateTableWithFollowingDescriptors(callBinding, 1)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
if (!validateTailingIntervals(callBinding, 2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
return true;
}
validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
if (!SqlTypeUtil.isInterval(type2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);

@Override public SqlOperandCountRange getOperandCountRange() {
return SqlOperandCountRanges.between(4, 5);
}
final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
if (!SqlTypeUtil.isInterval(type3)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);

@Override public String getAllowedSignatures(SqlOperator op, String opName) {
return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
+ "datetime interval, datetime interval[, datetime interval])";
}
if (callBinding.getOperandCount() > 4) {
final RelDataType type4 = validator.getValidatedNodeType(callBinding.operand(4));
if (!SqlTypeUtil.isInterval(type4)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}

@Override public Consistency getConsistency() {
return Consistency.NONE;
}
return true;
}

@Override public String getAllowedSignatures(String opNameToUse) {
return getName() + "(TABLE table_name, DESCRIPTOR(col), "
+ "datetime interval, datetime interval[, datetime interval])";
@Override public boolean isOptional(int i) {
return i == 4;
}
}
}
Expand Up @@ -18,54 +18,70 @@

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.SqlValidator;

import com.google.common.collect.ImmutableList;

import java.util.List;

/**
* SqlSessionTableFunction implements an operator for per-key sessionization. It allows
* four parameters:
* 1. a table.
* 2. a descriptor to provide a watermarked column name from the input table.
* 3. a descriptor to provide a column as key, on which sessionization will be applied.
* 4. an interval parameter to specify a inactive activity gap to break sessions.
*
* <ol>
* <li>table as data source</li>
* <li>a descriptor to provide a watermarked column name from the input table</li>
* <li>a descriptor to provide a column as key, on which sessionization will be applied</li>
* <li>an interval parameter to specify a inactive activity gap to break sessions</li>
* </ol>
*/
public class SqlSessionTableFunction extends SqlWindowTableFunction {
public SqlSessionTableFunction() {
super(SqlKind.SESSION.name());
super(SqlKind.SESSION.name(), OperandTypeCheckerImpl.INSTANCE);
}

@Override public SqlOperandCountRange getOperandCountRange() {
return SqlOperandCountRanges.of(4);
@Override public List<String> getParamNames() {
return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_KEY, PARAM_SIZE);
}

@Override public boolean checkOperandTypes(SqlCallBinding callBinding,
boolean throwOnFailure) {
final SqlNode operand0 = callBinding.operand(0);
final SqlValidator validator = callBinding.getValidator();
final RelDataType type = validator.getValidatedNodeType(operand0);
if (type.getSqlTypeName() != SqlTypeName.ROW) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------

/** Operand type checker for SESSION. */
private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();

@Override public boolean checkOperandTypes(
SqlCallBinding callBinding, boolean throwOnFailure) {
final SqlValidator validator = callBinding.getValidator();
if (!validateTableWithFollowingDescriptors(callBinding, 2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
if (!SqlTypeUtil.isInterval(type3)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
return true;
}
final SqlNode operand1 = callBinding.operand(1);
if (operand1.getKind() != SqlKind.DESCRIPTOR) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);

@Override public SqlOperandCountRange getOperandCountRange() {
return SqlOperandCountRanges.of(4);
}
validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
final SqlNode operand2 = callBinding.operand(2);
if (operand2.getKind() != SqlKind.DESCRIPTOR) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);

@Override public String getAllowedSignatures(SqlOperator op, String opName) {
return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
+ "DESCRIPTOR(key), datetime interval)";
}
validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand2).getOperandList());
final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
if (!SqlTypeUtil.isInterval(type3)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);

@Override public Consistency getConsistency() {
return Consistency.NONE;
}
return true;
}

@Override public String getAllowedSignatures(String opNameToUse) {
return getName() + "(TABLE table_name, DESCRIPTOR(col), "
+ "DESCRIPTOR(col), datetime interval)";
@Override public boolean isOptional(int i) {
return false;
}
}
}

0 comments on commit 8ab35a7

Please sign in to comment.