diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java index ed19e7d23ccd..80de4962b47f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java @@ -322,36 +322,43 @@ public Void visitComparisonExpression( isWindowed)); return null; } + return visitColumnComparisonExpression(column, node); + } + //Check the referred column of the expression + private Void visitColumnComparisonExpression( + final UnqualifiedColumnReferenceExp column, + final ComparisonExpression node + ) { final ColumnName columnName = column.getColumnName(); if (columnName.equals(SystemColumns.WINDOWSTART_NAME) - || columnName.equals(SystemColumns.WINDOWEND_NAME)) { + || columnName.equals(SystemColumns.WINDOWEND_NAME)) { final Type type = node.getType(); if (!VALID_WINDOW_BOUND_COMPARISONS.contains(type)) { throw invalidWhereClauseException( - "Unsupported " + columnName + " bounds: " + type, true); + "Unsupported " + columnName + " bounds: " + type, true); } if (!isWindowed) { throw invalidWhereClauseException( - "Cannot use WINDOWSTART/WINDOWEND on non-windowed source", - false); + "Cannot use WINDOWSTART/WINDOWEND on non-windowed source", + false); } return null; } else { final Column col = schema.findColumn(columnName) - .orElseThrow(() -> invalidWhereClauseException( - "Bound on non-existent column " + columnName, isWindowed)); + .orElseThrow(() -> invalidWhereClauseException( + "Bound on non-existent column " + columnName, isWindowed)); if (col.namespace() == Namespace.KEY) { if (!isKeyQuery(node)) { setTableScanOrElseThrow(() -> invalidWhereClauseException("Bound on key columns '" - + getSource().getSchema().key() + "' must currently be '='", isWindowed)); + + getSource().getSchema().key() + "' must currently be '='", isWindowed)); } - if (seenKeys.get(col.index())) { - setTableScanOrElseThrow(() -> invalidWhereClauseException( - "A comparison condition on the key column cannot be combined with other" - + " comparisons such as an IN predicate", isWindowed)); + if (seenKeys.get(col.index()) && !queryPlannerOptions.getTableScansEnabled()) { + throw invalidWhereClauseException( + "A comparison condition on the key column cannot be combined with other" + + " comparisons such as an IN predicate", isWindowed); } seenKeys.set(col.index()); isKeyedQuery = true; @@ -484,9 +491,7 @@ public Void visitComparisonExpression( final Optional col = schema.findColumn(columnName); if (col.isPresent() && col.get().namespace() == Namespace.KEY) { final Object key = resolveKey(other, col.get(), metaStore, ksqlConfig, node); - keyContents[col.get().index()] = key; - seenKeys.set(col.get().index()); - operators.put(col.get().index(), new ImmutablePair<>(node.getType(), col.get().type())); + setMostSelectiveConstraint(col.get(), node, key); } return null; } @@ -577,6 +582,35 @@ private Object resolveKey( FormatOptions.noEscape()))) .orElse(null); } + + //Pick the most selective constraint from the predicates + private void setMostSelectiveConstraint( + final Column col, + final ComparisonExpression node, + final Object key) { + final int index = col.index(); + if (operators.containsKey(index) && operators.get(index).getLeft() == Type.EQUAL) { + return; + } + if (node.getType() == Type.EQUAL) { + setConstraint(index, col, node, key); + return; + } + if (!operators.containsKey(index)) { + setConstraint(index, col, node, key); + } + } + + private void setConstraint( + final int index, + final Column col, + final ComparisonExpression node, + final Object key + ) { + keyContents[index] = key; + seenKeys.set(index); + operators.put(index, new ImmutablePair<>(node.getType(), col.type())); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java index 24ff6197d962..e8e05750ea5a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java @@ -1198,8 +1198,20 @@ public void shouldExtractConstraintWithMultipleKeyExpressions_tableScan() { expression2 ); + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions + ); + // Then: - expectTableScan(expression, false); + final List keys = filterNode.getLookupConstraints(); + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), instanceOf(KeyConstraint.class)); } @Test @@ -1676,4 +1688,283 @@ public void shouldThrowNotKeyColumnForBetween() { // Then: assertThat(e.getMessage(), containsString("A comparison must directly reference a key column")); } + + @Test + public void shouldReturnNonKeyConstraintIntGreater() { + // Given: + final Expression expression = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K")), + new IntegerLiteral(1) + ); + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions + ); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), instanceOf(NonKeyConstraint.class)); + } + + @Test + public void shouldReturnKeyConstraintInt() { + // Given: + when(plannerOptions.getTableScansEnabled()).thenReturn(true); + final Expression keyExp1 = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K")), + new IntegerLiteral(1) + ); + final Expression keyExp2 = new ComparisonExpression( + Type.EQUAL, + new UnqualifiedColumnReferenceExp(ColumnName.of("K")), + new IntegerLiteral(3) + ); + final Expression expression = new LogicalBinaryExpression( + LogicalBinaryExpression.Type.AND, + keyExp1, + keyExp2 + ); + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), instanceOf(KeyConstraint.class)); + final KeyConstraint keyConstraint = (KeyConstraint) keys.get(0); + assertThat(keyConstraint.getKey(), is(GenericKey.genericKey(3))); + assertThat(keyConstraint.getOperator(), is(KeyConstraint.ConstraintOperator.EQUAL)); + } + + @Test + public void shouldExtractMultiColKeySchema() { + // Given: + final LogicalSchema multiSchema = LogicalSchema.builder() + .keyColumn(ColumnName.of("K1"), SqlTypes.INTEGER) + .keyColumn(ColumnName.of("K2"), SqlTypes.INTEGER) + .valueColumn(ColumnName.of("C1"), SqlTypes.INTEGER) + .build(); + final Expression keyExp1 = new ComparisonExpression( + Type.EQUAL, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new IntegerLiteral(1) + ); + final Expression keyExp2 = new ComparisonExpression( + Type.EQUAL, + new UnqualifiedColumnReferenceExp(ColumnName.of("K2")), + new IntegerLiteral(3) + ); + final Expression expression = new LogicalBinaryExpression( + LogicalBinaryExpression.Type.AND, + keyExp1, + keyExp2 + ); + + when(source.getSchema()).thenReturn(multiSchema); + + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), instanceOf(KeyConstraint.class)); + final KeyConstraint keyConstraint = (KeyConstraint) keys.get(0); + assertThat(keyConstraint.getKey(), is(GenericKey.genericKey(1, 3))); + assertThat(keyConstraint.getOperator(), is(KeyConstraint.ConstraintOperator.EQUAL)); + } + + @Test + public void shouldNotExtractMultiColKeySchema() { + // Given: + final LogicalSchema multiSchema = LogicalSchema.builder() + .keyColumn(ColumnName.of("K1"), SqlTypes.INTEGER) + .keyColumn(ColumnName.of("K2"), SqlTypes.INTEGER) + .valueColumn(ColumnName.of("C1"), SqlTypes.INTEGER) + .build(); + final Expression keyExp1 = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new IntegerLiteral(1) + ); + final Expression keyExp2 = new ComparisonExpression( + Type.EQUAL, + new UnqualifiedColumnReferenceExp(ColumnName.of("K2")), + new IntegerLiteral(3) + ); + final Expression expression = new LogicalBinaryExpression( + LogicalBinaryExpression.Type.AND, + keyExp1, + keyExp2 + ); + + when(source.getSchema()).thenReturn(multiSchema); + + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), instanceOf(NonKeyConstraint.class)); + } + + @Test + public void shouldReturnKeyConstraintStringGreater() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(ColumnName.of("K1"), SqlTypes.STRING) + .valueColumn(ColumnName.of("C1"), SqlTypes.INTEGER) + .build(); + when(source.getSchema()).thenReturn(schema); + final Expression expression = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new StringLiteral("v1") + ); + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions + ); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), instanceOf(KeyConstraint.class)); + final KeyConstraint keyConstraint = (KeyConstraint) keys.get(0); + assertThat(keyConstraint.getKey(), is(GenericKey.genericKey("v1"))); + assertThat(keyConstraint.getOperator(), is(KeyConstraint.ConstraintOperator.GREATER_THAN)); + } + + @Test + public void shouldReturnKeyConstraintString() { + // Given: + when(plannerOptions.getTableScansEnabled()).thenReturn(true); + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(ColumnName.of("K1"), SqlTypes.STRING) + .valueColumn(ColumnName.of("C1"), SqlTypes.INTEGER) + .build(); + when(source.getSchema()).thenReturn(schema); + final Expression keyExp1 = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new StringLiteral("v1") + ); + final Expression keyExp2 = new ComparisonExpression( + Type.EQUAL, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new StringLiteral("v2") + ); + final Expression expression = new LogicalBinaryExpression( + LogicalBinaryExpression.Type.AND, + keyExp1, + keyExp2 + ); + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), instanceOf(KeyConstraint.class)); + final KeyConstraint keyConstraint = (KeyConstraint) keys.get(0); + assertThat(keyConstraint.getKey(), is(GenericKey.genericKey("v2"))); + assertThat(keyConstraint.getOperator(), is(KeyConstraint.ConstraintOperator.EQUAL)); + } + + @Test + public void shouldReturnKeyConstraintStringForOr() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(ColumnName.of("K1"), SqlTypes.STRING) + .valueColumn(ColumnName.of("C1"), SqlTypes.INTEGER) + .build(); + when(source.getSchema()).thenReturn(schema); + final Expression keyExp1 = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new StringLiteral("v1") + ); + final Expression keyExp2 = new ComparisonExpression( + Type.EQUAL, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new StringLiteral("v2") + ); + final Expression expression = new LogicalBinaryExpression( + LogicalBinaryExpression.Type.OR, + keyExp1, + keyExp2 + ); + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(keys.size(), is(2)); + assertThat(keys.get(0), instanceOf(KeyConstraint.class)); + final KeyConstraint keyConstraint1 = (KeyConstraint) keys.get(0); + assertThat(keyConstraint1.getKey(), is(GenericKey.genericKey("v1"))); + assertThat(keyConstraint1.getOperator(), is(KeyConstraint.ConstraintOperator.GREATER_THAN)); + assertThat(keys.get(1), instanceOf(KeyConstraint.class)); + final KeyConstraint keyConstraint2 = (KeyConstraint) keys.get(1); + assertThat(keyConstraint2.getKey(), is(GenericKey.genericKey("v2"))); + assertThat(keyConstraint2.getOperator(), is(KeyConstraint.ConstraintOperator.EQUAL)); + } } \ No newline at end of file