Skip to content

Commit

Permalink
fix: Pull Query Key Extraction Optimizations (#8346)
Browse files Browse the repository at this point in the history
  • Loading branch information
hli21 authored Nov 17, 2021
1 parent 1a2f7b2 commit 6ad333e
Show file tree
Hide file tree
Showing 2 changed files with 340 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,36 +322,43 @@ public Void visitComparisonExpression(
isWindowed));
return null;
}
return visitColumnComparisonExpression(column, node);
}

//Check the referred column of the expression
private Void visitColumnComparisonExpression(
final UnqualifiedColumnReferenceExp column,
final ComparisonExpression node
) {
final ColumnName columnName = column.getColumnName();
if (columnName.equals(SystemColumns.WINDOWSTART_NAME)
|| columnName.equals(SystemColumns.WINDOWEND_NAME)) {
|| columnName.equals(SystemColumns.WINDOWEND_NAME)) {
final Type type = node.getType();
if (!VALID_WINDOW_BOUND_COMPARISONS.contains(type)) {
throw invalidWhereClauseException(
"Unsupported " + columnName + " bounds: " + type, true);
"Unsupported " + columnName + " bounds: " + type, true);
}
if (!isWindowed) {
throw invalidWhereClauseException(
"Cannot use WINDOWSTART/WINDOWEND on non-windowed source",
false);
"Cannot use WINDOWSTART/WINDOWEND on non-windowed source",
false);
}
return null;
} else {
final Column col = schema.findColumn(columnName)
.orElseThrow(() -> invalidWhereClauseException(
"Bound on non-existent column " + columnName, isWindowed));
.orElseThrow(() -> invalidWhereClauseException(
"Bound on non-existent column " + columnName, isWindowed));

if (col.namespace() == Namespace.KEY) {
if (!isKeyQuery(node)) {
setTableScanOrElseThrow(() ->
invalidWhereClauseException("Bound on key columns '"
+ getSource().getSchema().key() + "' must currently be '='", isWindowed));
+ getSource().getSchema().key() + "' must currently be '='", isWindowed));
}
if (seenKeys.get(col.index())) {
setTableScanOrElseThrow(() -> invalidWhereClauseException(
"A comparison condition on the key column cannot be combined with other"
+ " comparisons such as an IN predicate", isWindowed));
if (seenKeys.get(col.index()) && !queryPlannerOptions.getTableScansEnabled()) {
throw invalidWhereClauseException(
"A comparison condition on the key column cannot be combined with other"
+ " comparisons such as an IN predicate", isWindowed);
}
seenKeys.set(col.index());
isKeyedQuery = true;
Expand Down Expand Up @@ -484,9 +491,7 @@ public Void visitComparisonExpression(
final Optional<Column> col = schema.findColumn(columnName);
if (col.isPresent() && col.get().namespace() == Namespace.KEY) {
final Object key = resolveKey(other, col.get(), metaStore, ksqlConfig, node);
keyContents[col.get().index()] = key;
seenKeys.set(col.get().index());
operators.put(col.get().index(), new ImmutablePair<>(node.getType(), col.get().type()));
setMostSelectiveConstraint(col.get(), node, key);
}
return null;
}
Expand Down Expand Up @@ -577,6 +582,35 @@ private Object resolveKey(
FormatOptions.noEscape())))
.orElse(null);
}

//Pick the most selective constraint from the predicates
private void setMostSelectiveConstraint(
final Column col,
final ComparisonExpression node,
final Object key) {
final int index = col.index();
if (operators.containsKey(index) && operators.get(index).getLeft() == Type.EQUAL) {
return;
}
if (node.getType() == Type.EQUAL) {
setConstraint(index, col, node, key);
return;
}
if (!operators.containsKey(index)) {
setConstraint(index, col, node, key);
}
}

private void setConstraint(
final int index,
final Column col,
final ComparisonExpression node,
final Object key
) {
keyContents[index] = key;
seenKeys.set(index);
operators.put(index, new ImmutablePair<>(node.getType(), col.type()));
}
}


Expand Down
Loading

0 comments on commit 6ad333e

Please sign in to comment.