Skip to content

Commit

Permalink
fix: the QTTs now run through SqlFormatter & other formatting fixes (#…
Browse files Browse the repository at this point in the history
…3222)

Co-authored-by: Sergio Peña <sergio@confluent.io>
  • Loading branch information
agavra and spena committed Aug 15, 2019
1 parent b9138e6 commit 79da68c
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;

public class SqlFormatInjector implements Injector {
Expand All @@ -45,7 +46,8 @@ public <T extends Statement> ConfiguredStatement<T> inject(

return statement.withStatement(sql, (T) prepare.getStatement());
} catch (final Exception e) {
return statement;
throw new KsqlException("Unable to format statement! This is bad because "
+ "it means we cannot persist it onto the command topic: " + statement, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public String visitNullLiteral(final NullLiteral node, final Context context) {
@Override
public String visitQualifiedNameReference(final QualifiedNameReference node,
final Context context) {
return formatQualifiedName(node.getName());
return formatQualifiedName(node.getName(), context);
}

@Override
Expand All @@ -153,15 +153,16 @@ public String visitDereferenceExpression(
final Context context) {
final String baseString = process(node.getBase(), context);
if (node.getBase() instanceof QualifiedNameReference) {
return baseString + KsqlConstants.DOT + formatIdentifier(node.getFieldName());
return baseString + KsqlConstants.DOT + formatIdentifier(node.getFieldName(), context);
}
return baseString + KsqlConstants.STRUCT_FIELD_REF + formatIdentifier(node.getFieldName());
return baseString + KsqlConstants.STRUCT_FIELD_REF
+ formatIdentifier(node.getFieldName(), context);
}

private static String formatQualifiedName(final QualifiedName name) {
private static String formatQualifiedName(final QualifiedName name, final Context context) {
final List<String> parts = new ArrayList<>();
for (final String part : name.getParts()) {
parts.add(formatIdentifier(part));
parts.add(formatIdentifier(part, context));
}
return Joiner.on(KsqlConstants.DOT).join(parts);
}
Expand All @@ -175,7 +176,7 @@ public String visitFunctionCall(final FunctionCall node, final Context context)
arguments = "*";
}

builder.append(formatQualifiedName(node.getName()))
builder.append(formatQualifiedName(node.getName(), context))
.append('(').append(arguments).append(')');

return builder.toString();
Expand Down Expand Up @@ -337,9 +338,8 @@ private String joinExpressions(
.iterator());
}

private static String formatIdentifier(final String s) {
// TODO: handle escaping properly
return s;
private static String formatIdentifier(final String s, final Context context) {
return context.isReserved.test(s) ? "`" + s + "`" : s;
}

private static String formatStringLiteral(final String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
import io.confluent.ksql.engine.FakeInsertValuesExecutor;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.SqlFormatInjector;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
Expand All @@ -45,6 +46,7 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -57,7 +59,9 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
final class TestExecutorUtil {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private TestExecutorUtil() {
}
Expand Down Expand Up @@ -257,7 +261,20 @@ private static ExecuteResultAndSortedSources execute(
schemaInjector
.map(injector -> injector.inject(configured))
.orElse((ConfiguredStatement) configured);
final ExecuteResult executeResult = executionContext.execute(withSchema);
final ConfiguredStatement<?> reformatted =
new SqlFormatInjector(executionContext).inject(withSchema);

final ExecuteResult executeResult;
try {
executeResult = executionContext.execute(reformatted);
} catch (final KsqlStatementException statementException) {
// use the original statement text in the exception so that tests
// can easily check that the failed statement is the input statement
throw new KsqlStatementException(
statementException.getMessage(),
withSchema.getStatementText(),
statementException.getCause());
}
if (prepared.getStatement() instanceof CreateAsSelect) {
return new ExecuteResultAndSortedSources(
executeResult,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"comments": [
"Query Translation tests should make sure that the command that is being executed can be ",
"persisted properly onto the command topic (i.e. serialized using SqlFormatter). Otherwise, ",
"it is possible that we may not execute the same statement after enqueuing as we did before. ",
"This test will fail if we refactor QueryTranslationTest in a way that makes it no longer go ",
"through the SqlFormatInjector, which serializes and then deserializes the statements to make ",
"sure that these inconsistencies are caught.",
"",
"NOTE: This test is a little hacky at the moment because it relies on us not formatting SET ",
"expressions. There is nothing preventing us in the future from doing this, but I don't have ",
"a better way as of now to ensure failure."
],
"tests": [
{
"name": "ensure QTTs go through SqlFormatter",
"statements": [
"CREATE STREAM IGNORED (ID bigint) WITH (kafka_topic='topic', value_format='DELIMITED');",
"SET 'auto.offset.reset'='earliest';"
],
"inputs": [ ],
"outputs": [ ],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlException",
"message": "Unable to format statement!"
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;

import com.google.common.base.Strings;
import io.confluent.ksql.execution.expression.formatter.ExpressionFormatter;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.AllColumns;
Expand All @@ -39,6 +40,8 @@
import io.confluent.ksql.parser.tree.JoinCriteria;
import io.confluent.ksql.parser.tree.JoinOn;
import io.confluent.ksql.parser.tree.ListFunctions;
import io.confluent.ksql.parser.tree.ListStreams;
import io.confluent.ksql.parser.tree.ListTables;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.Select;
Expand All @@ -48,6 +51,7 @@
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.util.ParserUtil;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -322,6 +326,35 @@ protected Void visitDropTable(final DropTable node, final Integer context) {
return null;
}

@Override
protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) {
builder.append("TERMINATE ");
builder.append(node.getQueryId().getId());
return null;
}

@Override
protected Void visitListStreams(final ListStreams node, final Integer context) {
builder.append("SHOW STREAMS");
if (node.getShowExtended()) {
visitExtended();
}
return null;
}

@Override
protected Void visitListTables(final ListTables node, final Integer context) {
builder.append("SHOW TABLES");
if (node.getShowExtended()) {
visitExtended();
}
return null;
}

private void visitExtended() {
builder.append(" EXTENDED");
}

private void visitDrop(final DropStatement node, final String sourceType) {
builder.append("DROP ");
builder.append(sourceType);
Expand Down Expand Up @@ -416,7 +449,8 @@ private void formatCreateAs(final CreateAsSelect node, final Integer indent) {
private static String formatTableElement(final TableElement e) {
return ParserUtil.escapeIfReservedIdentifier(e.getName())
+ " "
+ e.getType()
+ ExpressionFormatter.formatExpression(
e.getType(), true, ParserUtil::isReservedIdentifier)
+ (e.getNamespace() == Namespace.KEY ? " KEY" : "");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,15 @@ protected R visitSimpleGroupBy(final SimpleGroupBy node, final C context) {
return visitGroupingElement(node, context);
}

protected R visitTerminateQuery(final TerminateQuery node, final C context) {
return visitStatement(node, context);
}

protected R visitListStreams(final ListStreams node, final C context) {
return visitStatement(node, context);
}

protected R visitListTables(final ListTables node, final C context) {
return visitStatement(node, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public boolean getShowExtended() {
return showExtended;
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitListStreams(this, context);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public boolean getShowExtended() {
return showExtended;
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitListTables(this, context);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public QueryId getQueryId() {
return queryId;
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitTerminateQuery(this, context);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit 79da68c

Please sign in to comment.