Skip to content

Commit

Permalink
fix: better error message when users enter old style syntax for query (
Browse files Browse the repository at this point in the history
…#3397)

* fix: better error message when users enter old style syntax for query

Say a user executes `SELECT * FROM X JOIN Y ON X.ID = Y.ID;` or similar. In older versions of KSQL this would be a streaming join and would work. With the introduction of static queries this example is now a static query, which don't support joins. Hence the user will get the error: `Static queries do not support joins`... which isn't very helpful if you don't know the query syntax has changed. With this PR the output is now:

```
Static queries do not support joins.
Did you mean to execute a continuous query? If so, add an 'EMIT CHANGES' clause.
Query syntax in KSQL has changed. There are now two broad categories of queries:
- Static queries: query the current state of the system, return a result and terminate.
- Streaming queries: query the state of the system in motion and will continue to
output results until they meet any LIMIT clause criteria or are terminated by the user.

'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes.
To turn a static query into a streaming query, as was the default in older versions
of KSQL, add `EMIT CHANGES` to the end of the statement before any limit clause.

For example, the following are static queries:
    'SELECT * FROM X WHERE ROWKEY=Y;' (non-windowed table)
    'SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>=Z;' (windowed table)
and, the following is a streaming query:
    'SELECT * FROM X EMIT CHANGES;'

Note: Persistent queries, e.g. `CREATE TABLE AS ...`, currently have an implicit
`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements
going forward, as a this will be required in a future release.
```
  • Loading branch information
big-andy-coates committed Sep 24, 2019
1 parent 078e114 commit f948ec0
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 154 deletions.
25 changes: 17 additions & 8 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.planner.plan.JoinNode;
Expand All @@ -40,14 +41,14 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;

public class Analysis {

private final ResultMaterialization resultMaterialization;
private Optional<Into> into = Optional.empty();
private final List<AliasedDataSource> fromDataSources = new ArrayList<>();
private Optional<JoinInfo> joinInfo = Optional.empty();
Expand All @@ -61,6 +62,14 @@ public class Analysis {
private OptionalInt limitClause = OptionalInt.empty();
private CreateSourceAsProperties withProperties = CreateSourceAsProperties.none();

public Analysis(final ResultMaterialization resultMaterialization) {
this.resultMaterialization = requireNonNull(resultMaterialization, "resultMaterialization");
}

ResultMaterialization getResultMaterialization() {
return resultMaterialization;
}

void addSelectItem(final Expression expression, final String alias) {
selectExpressions.add(SelectExpression.of(alias, expression));
}
Expand Down Expand Up @@ -177,7 +186,7 @@ public Set<SerdeOption> getSerdeOptions() {
}

void setProperties(final CreateSourceAsProperties properties) {
withProperties = Objects.requireNonNull(properties, "properties");
withProperties = requireNonNull(properties, "properties");
}

public CreateSourceAsProperties getProperties() {
Expand Down Expand Up @@ -232,8 +241,8 @@ public static final class AliasedDataSource {
final String alias,
final DataSource<?> dataSource
) {
this.alias = Objects.requireNonNull(alias, "alias");
this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
this.alias = requireNonNull(alias, "alias");
this.dataSource = requireNonNull(dataSource, "dataSource");

if (alias.trim().isEmpty()) {
throw new IllegalArgumentException("Alias or name can not be empty: '" + alias + "'");
Expand Down Expand Up @@ -264,10 +273,10 @@ public static final class JoinInfo {
final Optional<WithinExpression> withinExpression

) {
this.leftJoinField = Objects.requireNonNull(leftJoinField, "leftJoinField");
this.rightJoinField = Objects.requireNonNull(rightJoinField, "rightJoinField");
this.type = Objects.requireNonNull(type, "type");
this.withinExpression = Objects.requireNonNull(withinExpression, "withinExpression");
this.leftJoinField = requireNonNull(leftJoinField, "leftJoinField");
this.rightJoinField = requireNonNull(rightJoinField, "rightJoinField");
this.type = requireNonNull(type, "type");
this.withinExpression = requireNonNull(withinExpression, "withinExpression");

if (leftJoinField.trim().isEmpty()) {
throw new IllegalArgumentException("left join field name can not be empty");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
final Visitor visitor = new Visitor(query.isStatic());
final Visitor visitor = new Visitor(query);
visitor.process(query, null);

sink.ifPresent(visitor::analyzeNonStdOutSink);
Expand All @@ -145,13 +145,14 @@ Analysis analyze(
private final class Visitor extends DefaultTraversalVisitor<AstNode, Void> {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final Analysis analysis = new Analysis();
private final Analysis analysis;
private final boolean staticQuery;
private boolean isJoin = false;
private boolean isGroupBy = false;

Visitor(final boolean staticQuery) {
this.staticQuery = staticQuery;
Visitor(final Query query) {
this.staticQuery = query.isStatic();
this.analysis = new Analysis(query.getResultMaterialization());
}

private void analyzeNonStdOutSink(final Sink sink) {
Expand Down Expand Up @@ -286,7 +287,6 @@ protected AstNode visitQuery(
final Query node,
final Void context
) {

process(node.getFrom(), context);

process(node.getSelect(), context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,14 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;

public class ContinuousQueryValidator implements QueryValidator {

@Override
public void preValidate(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
throw new IllegalArgumentException("static");
public void validate(final Analysis analysis) {
if (analysis.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new IllegalArgumentException("Continuous queries don't support `EMIT FINAL`.");
}

if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new KsqlException("Continuous queries do not yet support `EMIT FINAL`. "
+ "Consider changing to `EMIT CHANGES`."
+ QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
);
}
}

@Override
public void postValidate(final Analysis analysis) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,6 @@

public class QueryAnalyzer {

static final String NEW_QUERY_SYNTAX_HELP = System.lineSeparator()
+ "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+ System.lineSeparator()
+ "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+ "static queries, i.e. they query the current state of the system and return a final "
+ "result."
+ System.lineSeparator()
+ "To turn a static query into a streaming query, as was the default in older versions "
+ "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+ System.lineSeparator()
+ "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+ "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+ "as a this will be required in a future release.";

private final Analyzer analyzer;
private final MetaStore metaStore;
private final QueryValidator continuousValidator;
Expand Down Expand Up @@ -90,18 +76,12 @@ public Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
staticValidator.preValidate(query, sink);
} else {
continuousValidator.preValidate(query, sink);
}

final Analysis analysis = analyzer.analyze(query, sink);

if (query.isStatic()) {
staticValidator.postValidate(analysis);
staticValidator.validate(analysis);
} else {
continuousValidator.postValidate(analysis);
continuousValidator.validate(analysis);
}

return analysis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,10 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import java.util.Optional;

/**
* Validator used by {@link QueryAnalyzer}.
*/
interface QueryValidator {

void preValidate(Query query, Optional<Sink> sink);

void postValidate(Analysis analysis);
void validate(Analysis analysis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,110 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

public class StaticQueryValidator implements QueryValidator {

@Override
public void preValidate(
final Query query,
final Optional<Sink> sink
) {
if (!query.isStatic()) {
throw new IllegalArgumentException("not static");
}

if (query.getResultMaterialization() != ResultMaterialization.FINAL) {
throw new KsqlException("Static queries do not yet support `EMIT CHANGES`. "
+ "Consider removing 'EMIT CHANGES' to any bare query."
+ QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
);
}
private static final String NEW_QUERY_SYNTAX_HELP = " "
+ "Did you mean to execute a continuous query? Add an 'EMIT CHANGES' clause to do so."
+ System.lineSeparator()
+ System.lineSeparator()
+ "Query syntax in KSQL has changed. There are now two broad categories of queries:"
+ System.lineSeparator()
+ "- Static queries: query the current state of the system, return a result, and terminate "
+ "the query."
+ System.lineSeparator()
+ "- Streaming queries: query the state of the system in motion and continue to output "
+ "results until they meet a LIMIT clause condition or the user terminates the query."
+ System.lineSeparator()
+ System.lineSeparator()
+ "Use 'EMIT CHANGES' to indicate that a query is continuous and outputs all changes. "
+ "To convert a static query into a streaming query, which was the default behavior in older "
+ "versions of KSQL, add `EMIT CHANGES` to the end of the statement before any LIMIT clause."
+ System.lineSeparator()
+ System.lineSeparator()
+ "For example, the following are static queries:"
+ System.lineSeparator()
+ "\t'SELECT * FROM X WHERE ROWKEY=Y;' (non-windowed table)"
+ System.lineSeparator()
+ "\t'SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>=Z;' (windowed table)"
+ System.lineSeparator()
+ System.lineSeparator()
+ "The following is a streaming query:"
+ System.lineSeparator()
+ "\t'SELECT * FROM X EMIT CHANGES;'"
+ System.lineSeparator()
+ System.lineSeparator()
+ "Note: Persistent queries, like `CREATE TABLE AS ...`, have an implicit "
+ "`EMIT CHANGES`, but we recommend adding `EMIT CHANGES` to these statements.";

if (sink.isPresent()) {
throw new IllegalArgumentException("static queries should not have a sink");
}
}
private static final List<Rule> RULES = ImmutableList.of(
Rule.of(
analysis -> analysis.getResultMaterialization() == ResultMaterialization.FINAL,
"Static queries don't support `EMIT CHANGES`."
),
Rule.of(
analysis -> !analysis.getInto().isPresent(),
"Static queries don't support output to sinks."
),
Rule.of(
analysis -> !analysis.isJoin(),
"Static queries don't support JOIN clauses."
),
Rule.of(
analysis -> !analysis.getWindowExpression().isPresent(),
"Static queries don't support WINDOW clauses."
),
Rule.of(
analysis -> analysis.getGroupByExpressions().isEmpty(),
"Static queries don't support GROUP BY clauses."
),
Rule.of(
analysis -> !analysis.getPartitionBy().isPresent(),
"Static queries don't support PARTITION BY clauses."
),
Rule.of(
analysis -> !analysis.getHavingExpression().isPresent(),
"Static queries don't support HAVING clauses."
),
Rule.of(
analysis -> !analysis.getLimitClause().isPresent(),
"Static queries don't support LIMIT clauses."
)
);

@Override
public void postValidate(final Analysis analysis) {
if (analysis.getInto().isPresent()) {
throw new KsqlException("Static queries do not support outputting to sinks.");
public void validate(final Analysis analysis) {
try {
RULES.forEach(rule -> rule.check(analysis));
} catch (final KsqlException e) {
throw new KsqlException(e.getMessage() + NEW_QUERY_SYNTAX_HELP, e);
}
}

if (analysis.isJoin()) {
throw new KsqlException("Static queries do not support joins.");
}
private static final class Rule {

if (analysis.getWindowExpression().isPresent()) {
throw new KsqlException("Static queries do not support WINDOW clauses.");
}

if (!analysis.getGroupByExpressions().isEmpty()) {
throw new KsqlException("Static queries do not support GROUP BY clauses.");
}
private final Predicate<Analysis> condition;
private final String failureMsg;

if (analysis.getPartitionBy().isPresent()) {
throw new KsqlException("Static queries do not support PARTITION BY clauses.");
private static Rule of(final Predicate<Analysis> condition, final String failureMsg) {
return new Rule(condition, failureMsg);
}

if (analysis.getHavingExpression().isPresent()) {
throw new KsqlException("Static queries do not support HAVING clauses.");
private Rule(final Predicate<Analysis> condition, final String failureMsg) {
this.condition = Objects.requireNonNull(condition, "condition");
this.failureMsg = Objects.requireNonNull(failureMsg, "failureMsg");
}

if (analysis.getLimitClause().isPresent()) {
throw new KsqlException("Static queries do not support LIMIT clauses.");
public void check(final Analysis analysis) {
if (!condition.test(analysis)) {
throw new KsqlException(failureMsg);
}
}
}
}
Loading

0 comments on commit f948ec0

Please sign in to comment.