From c781ee818dfb051b95c37289a96057ecd03ff87a Mon Sep 17 00:00:00 2001 From: Tolga Dur Date: Mon, 10 May 2021 18:26:19 +0100 Subject: [PATCH] fix: changed anonymizer to work on edge cases identified --- .../ksql/engine/rewrite/QueryAnonymizer.java | 244 +++++++++++++----- .../engine/rewrite/QueryAnonymizerTest.java | 18 +- ...eCreateStreamAsQueryCorrectly.approved.txt | 2 +- ...zeCreateTableAsQueryCorrectly.approved.txt | 2 +- ...dAnonymizeInsertIntoCorrectly.approved.txt | 2 +- ...nymizeJoinStatementsCorrectly.approved.txt | 1 + ...dAnonymizeUDFQueriesCorrectly.approved.txt | 2 +- .../ksql/test/QueryAnonymizerTest.java | 17 +- ...queriesAreAnonymizedCorrectly.approved.txt | 38 +-- ...queriesAreAnonymizedCorrectly.received.txt | 59 ----- 10 files changed, 234 insertions(+), 151 deletions(-) create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeJoinStatementsCorrectly.approved.txt delete mode 100644 ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.received.txt diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java index b6db341abe86..2ac8bbef309e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java @@ -12,12 +12,15 @@ package io.confluent.ksql.engine.rewrite; +import com.google.common.collect.ImmutableList; import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.metastore.TypeRegistry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.parser.AstBuilder; import io.confluent.ksql.parser.DefaultKsqlParser; import io.confluent.ksql.parser.SqlBaseBaseVisitor; +import io.confluent.ksql.parser.SqlBaseParser; +import io.confluent.ksql.parser.SqlBaseParser.AliasedRelationContext; import io.confluent.ksql.parser.SqlBaseParser.AlterOptionContext; import io.confluent.ksql.parser.SqlBaseParser.AlterSourceContext; import io.confluent.ksql.parser.SqlBaseParser.BooleanDefaultContext; @@ -41,6 +44,11 @@ import io.confluent.ksql.parser.SqlBaseParser.InsertIntoContext; import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext; import io.confluent.ksql.parser.SqlBaseParser.IntegerLiteralContext; +import io.confluent.ksql.parser.SqlBaseParser.JoinRelationContext; +import io.confluent.ksql.parser.SqlBaseParser.JoinWindowSizeContext; +import io.confluent.ksql.parser.SqlBaseParser.JoinWindowWithBeforeAndAfterContext; +import io.confluent.ksql.parser.SqlBaseParser.JoinedSourceContext; +import io.confluent.ksql.parser.SqlBaseParser.LeftJoinContext; import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext; import io.confluent.ksql.parser.SqlBaseParser.ListFunctionsContext; import io.confluent.ksql.parser.SqlBaseParser.ListPropertiesContext; @@ -51,20 +59,24 @@ import io.confluent.ksql.parser.SqlBaseParser.ListVariablesContext; import io.confluent.ksql.parser.SqlBaseParser.LogicalBinaryContext; import io.confluent.ksql.parser.SqlBaseParser.NumericLiteralContext; +import io.confluent.ksql.parser.SqlBaseParser.OuterJoinContext; import io.confluent.ksql.parser.SqlBaseParser.PartitionByContext; import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext; import io.confluent.ksql.parser.SqlBaseParser.QueryContext; import io.confluent.ksql.parser.SqlBaseParser.RegisterTypeContext; +import io.confluent.ksql.parser.SqlBaseParser.RelationDefaultContext; import io.confluent.ksql.parser.SqlBaseParser.SelectItemContext; import io.confluent.ksql.parser.SqlBaseParser.SelectSingleContext; import io.confluent.ksql.parser.SqlBaseParser.SetPropertyContext; import io.confluent.ksql.parser.SqlBaseParser.ShowColumnsContext; import io.confluent.ksql.parser.SqlBaseParser.SingleExpressionContext; +import io.confluent.ksql.parser.SqlBaseParser.SingleJoinWindowContext; import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext; import io.confluent.ksql.parser.SqlBaseParser.StatementsContext; import io.confluent.ksql.parser.SqlBaseParser.StringLiteralContext; import io.confluent.ksql.parser.SqlBaseParser.TableElementContext; import io.confluent.ksql.parser.SqlBaseParser.TableElementsContext; +import io.confluent.ksql.parser.SqlBaseParser.TableNameContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext; import io.confluent.ksql.parser.SqlBaseParser.TerminateQueryContext; @@ -73,6 +85,7 @@ import io.confluent.ksql.parser.SqlBaseParser.UnquotedIdentifierContext; import io.confluent.ksql.parser.SqlBaseParser.UnsetPropertyContext; import io.confluent.ksql.parser.SqlBaseParser.ValueExpressionContext; +import io.confluent.ksql.parser.SqlBaseParser.WithinExpressionContext; import io.confluent.ksql.util.ParserUtil; import java.util.ArrayList; import java.util.Hashtable; @@ -102,6 +115,7 @@ private static final class Visitor extends SqlBaseBaseVisitor { private int columnCount = 1; private int tableCount = 1; private int udfCount = 1; + private int sourceCount = 1; private final Hashtable anonTable = new Hashtable<>(); @Override @@ -116,7 +130,7 @@ public String visitStatements(final StatementsContext context) { @Override public String visitSingleStatement(final SingleStatementContext context) { - return visit(context.statement()); + return String.format("%s;", visit(context.statement())); } @Override @@ -161,7 +175,7 @@ public String visitAlterSource(final AlterSourceContext context) { } stringBuilder.append(String.format(" (%s)", StringUtils.join(alterOptions, ", "))); - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -184,7 +198,7 @@ public String visitRegisterType(final RegisterTypeContext context) { // anonymize type stringBuilder.append(String.format(" type AS %s", visit(context.type()))); - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -211,7 +225,7 @@ public String visitCreateConnector(final CreateConnectorContext context) { stringBuilder.append(visit(context.tableProperties())); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -229,10 +243,10 @@ public String visitInsertInto(final InsertIntoContext context) { // anonymize with query if (context.query() != null) { - stringBuilder.append(String.format(" SELECT %s", getQuery(context.query(), true))); + stringBuilder.append(String.format(" %s", visit(context.query()))); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -262,7 +276,7 @@ public String visitInsertValues(final InsertValuesContext context) { } stringBuilder.append(String.format(" VALUES (%s)", StringUtils.join(values, " ,"))); - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -278,7 +292,7 @@ public String visitListConnectors(final ListConnectorsContext context) { stringBuilder.append(" CONNECTORS"); - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -290,31 +304,31 @@ public String visitListStreams(final ListStreamsContext context) { stringBuilder.append(" EXTENDED"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override public String visitListFunctions(final ListFunctionsContext context) { final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); - return String.format("%s FUNCTIONS;", listOrVisit.toString()); + return String.format("%s FUNCTIONS", listOrVisit.toString()); } @Override public String visitListProperties(final ListPropertiesContext context) { final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); - return String.format("%s PROPERTIES;", listOrVisit.toString()); + return String.format("%s PROPERTIES", listOrVisit.toString()); } @Override public String visitListTypes(final ListTypesContext context) { final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); - return String.format("%s TYPES;", listOrVisit.toString()); + return String.format("%s TYPES", listOrVisit.toString()); } @Override public String visitListVariables(final ListVariablesContext context) { final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); - return String.format("%s VARIABLES;", listOrVisit.toString()); + return String.format("%s VARIABLES", listOrVisit.toString()); } @Override @@ -326,7 +340,7 @@ public String visitListQueries(final ListQueriesContext context) { stringBuilder.append(" EXTENDED"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -344,17 +358,17 @@ public String visitListTopics(final ListTopicsContext context) { stringBuilder.append(" EXTENDED"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override public String visitDescribeFunction(final DescribeFunctionContext context) { - return "DESCRIBE FUNCTION function;"; + return "DESCRIBE FUNCTION function"; } @Override public String visitDescribeConnector(final DescribeConnectorContext context) { - return "DESCRIBE CONNECTOR connector;"; + return "DESCRIBE CONNECTOR connector"; } @Override @@ -373,15 +387,15 @@ public String visitPrintTopic(final PrintTopicContext context) { stringBuilder.append(" LIMIT '0'"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override public String visitTerminateQuery(final TerminateQueryContext context) { if (context.ALL() != null) { - return "TERMINATE ALL;"; + return "TERMINATE ALL"; } - return "TERMINATE query;"; + return "TERMINATE query"; } @@ -393,7 +407,7 @@ public String visitDescribeStreams(final DescribeStreamsContext context) { stringBuilder.append("EXTENDED"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -412,34 +426,39 @@ public String visitShowColumns(final ShowColumnsContext context) { stringBuilder.append(" EXTENDED"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override public String visitSetProperty(final SetPropertyContext context) { final String propertyName = context.STRING(0).getText(); - return String.format("SET %s='[string]';", propertyName); + return String.format("SET %s='[string]'", propertyName); } @Override public String visitUnsetProperty(final UnsetPropertyContext context) { final String propertyName = context.STRING().getText(); - return String.format("UNSET %s;", propertyName); + return String.format("UNSET %s", propertyName); } @Override public String visitDefineVariable(final DefineVariableContext context) { - return "DEFINE variable='[string]';"; + return "DEFINE variable='[string]'"; } @Override public String visitUndefineVariable(final UndefineVariableContext context) { - return "UNDEFINE variable;"; + return "UNDEFINE variable"; } @Override public String visitExplain(final ExplainContext context) { - return "EXPLAIN query"; + if (context.identifier() != null) { + return "EXPLAIN query"; + } + + final String subQuery = visit(context.statement()); + return String.format("EXPLAIN %s", subQuery); } @Override @@ -542,10 +561,10 @@ public String visitCreateStreamAs(final CreateStreamAsContext context) { // rest of query if (context.query() != null) { - stringBuilder.append(String.format(" AS SELECT %s", getQuery(context.query(), true))); + stringBuilder.append(String.format(" AS %s", visit(context.query()))); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -578,7 +597,7 @@ public String visitCreateStream(final CreateStreamContext context) { stringBuilder.append(visit(context.tableProperties())); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -608,10 +627,10 @@ public String visitCreateTableAs(final CreateTableAsContext context) { // rest of query if (context.query() != null) { - stringBuilder.append(String.format(" AS SELECT %s", getQuery(context.query(), false))); + stringBuilder.append(String.format(" AS %s", visit(context.query()))); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -644,7 +663,7 @@ public String visitCreateTable(final CreateTableContext context) { stringBuilder.append(visit(context.tableProperties())); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -706,7 +725,7 @@ public String visitDropTable(final DropTableContext context) { stringBuilder.append(" DELETE TOPIC"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -725,7 +744,7 @@ public String visitDropStream(final DropStreamContext context) { stringBuilder.append(" DELETE TOPIC"); } - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -738,7 +757,7 @@ public String visitDropConnector(final DropConnectorContext context) { stringBuilder.append("connector"); - return String.format("%s;", stringBuilder); + return stringBuilder.toString(); } @Override @@ -751,42 +770,26 @@ public String visitDropType(final DropTypeContext context) { stringBuilder.append("type"); - return String.format("%s;", stringBuilder); - } - - private String getTypeName(final TypeContext context) { - if (context.DECIMAL() != null) { - return "DECIMAL"; - } - switch (context.getText().toUpperCase()) { - case ("BOOLEAN"): - case ("INTEGER"): - case ("INT"): - case ("BIGINT"): - case ("DOUBLE"): - case ("STRING"): - case ("VARCHAR"): - return context.getText().toUpperCase(); - default: - return "CUSTOM_TYPE"; - } + return stringBuilder.toString(); } - private String getQuery(final QueryContext context, final boolean isStream) { - final StringBuilder stringBuilder = new StringBuilder(); + @Override + public String visitQuery(final QueryContext context) { + final StringBuilder stringBuilder = new StringBuilder("SELECT "); // visit as select items final List selectItemList = new ArrayList<>(); for (SelectItemContext selectItem : context.selectItem()) { - selectItemList.add(visit(selectItem)); + if (selectItem.getText().equals("*")) { + selectItemList.add("*"); + } else { + selectItemList.add(visit(selectItem)); + } } stringBuilder.append(StringUtils.join(selectItemList, ", ")); // visit from statement - final String streamTableName = context.from.getText(); - final String anonStreamTableName = - !isStream ? getAnonTableName(streamTableName) : getAnonStreamName(streamTableName); - stringBuilder.append(String.format(" FROM %s", anonStreamTableName)); + stringBuilder.append(String.format(" FROM %s", visit(context.from))); // visit where statement if (context.where != null) { @@ -811,6 +814,117 @@ private String getQuery(final QueryContext context, final boolean isStream) { return stringBuilder.toString(); } + @Override + public String visitAliasedRelation(final AliasedRelationContext context) { + return visit(context.relationPrimary()); + } + + @Override + public String visitRelationDefault(final RelationDefaultContext context) { + return getAnonSourceName(context.getText()); + } + + @Override + public String visitTableName(final TableNameContext context) { + return getAnonSourceName(context.getText()); + } + + @Override + public String visitJoinRelation(final JoinRelationContext context) { + final String left = visit(context.left); + final ImmutableList rights = context + .joinedSource() + .stream() + .map(this::visitJoinedSource) + .collect(ImmutableList.toImmutableList()); + + return String.format("%s %s", left, String.join(" ", rights)); + } + + @Override + public String visitJoinedSource(final JoinedSourceContext context) { + final StringBuilder stringBuilder = new StringBuilder(); + + // get join type + final SqlBaseParser.JoinTypeContext joinTypeContext = context.joinType(); + if (joinTypeContext instanceof LeftJoinContext) { + stringBuilder.append("LEFT OUTER "); + } else if (joinTypeContext instanceof OuterJoinContext) { + stringBuilder.append("FULL OUTER "); + } else { + stringBuilder.append("INNER "); + } + + // right side of join + final String right = visit(context.aliasedRelation()); + stringBuilder.append(String.format("JOIN %s", right)); + + // visit within expression + if (context.joinWindow() != null) { + stringBuilder.append(visitWithinExpression(context.joinWindow().withinExpression())); + } + + // visit join on + stringBuilder.append("ON anonKey1=anonKey2"); + + return stringBuilder.toString(); + } + + private static String visitWithinExpression(final WithinExpressionContext context) { + final StringBuilder stringBuilder = new StringBuilder(" WITHIN "); + + if (context instanceof SingleJoinWindowContext) { + final SqlBaseParser.SingleJoinWindowContext singleWithin + = (SqlBaseParser.SingleJoinWindowContext) context; + + stringBuilder.append(getSizeAndUnitFromJoinWindowSize(singleWithin.joinWindowSize())); + } else if (context instanceof JoinWindowWithBeforeAndAfterContext) { + final SqlBaseParser.JoinWindowWithBeforeAndAfterContext beforeAndAfterJoinWindow + = (SqlBaseParser.JoinWindowWithBeforeAndAfterContext) context; + + final String before = + getSizeAndUnitFromJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(0)); + final String after = + getSizeAndUnitFromJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(1)); + stringBuilder.append(String.format("(%s, %s)", before, after)); + } else { + throw new RuntimeException("Expecting either a single join window, ie \"WITHIN 10 " + + "seconds\", or a join window with before and after specified, " + + "ie. \"WITHIN (10 seconds, 20 seconds)"); + } + + return stringBuilder.toString(); + } + + private static String getSizeAndUnitFromJoinWindowSize( + final JoinWindowSizeContext joinWindowSize + ) { + return String.format("%s %s ", + "'0'", joinWindowSize.windowUnit().getText().toUpperCase()); + } + + private String getTypeName(final TypeContext context) { + if (context.DECIMAL() != null) { + return "DECIMAL"; + } + switch (context.getText().toUpperCase()) { + case ("BOOLEAN"): + case ("INTEGER"): + case ("INT"): + case ("BIGINT"): + case ("DOUBLE"): + case ("STRING"): + case ("VARCHAR"): + return context.getText().toUpperCase(); + default: + return "CUSTOM_TYPE"; + } + } + + private String getAnonSourceName(final String originName) { + return getAnonName(originName, "source", sourceCount++); + } + private String getAnonUdfName(final String originName) { return getAnonName(originName, "udf", udfCount++); } @@ -828,12 +942,12 @@ private String getAnonTableName(final String originName) { } private String getAnonName(final String originName, final String genericName, final int count) { - if (anonTable.containsKey(originName)) { - return anonTable.get(originName); + if (anonTable.containsKey(originName + genericName)) { + return anonTable.get(originName + genericName); } final String newName = String.format("%s%d", genericName, count); - anonTable.put(originName, newName); + anonTable.put(originName + genericName, newName); return newName; } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java index 3cdc505bd69a..eb9529d5582f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java @@ -112,9 +112,25 @@ public void shouldAnonymizeDefineUndefineProperty() { anon.anonymize("UNDEFINE format;")); } + @Test + public void shouldAnonymizeSelectStatementCorrectly() { + Assert.assertEquals("SELECT * FROM source1;", + anon.anonymize("SELECT * FROM S1;")); + } + @Test public void shouldAnonymizeExplainStatementCorrectly() { - Assert.assertEquals("EXPLAIN query", anon.anonymize("EXPLAIN my_query;")); + Assert.assertEquals("EXPLAIN query;", anon.anonymize("EXPLAIN my_query;")); + Assert.assertEquals("EXPLAIN SELECT * FROM source1;", + anon.anonymize("EXPLAIN SELECT * from S1;")); + } + + @Test + public void shouldAnonymizeJoinStatementsCorrectly() { + final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3" + + " FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON col1.k=col2.k;"); + + Approvals.verify(output); } @Test diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt index 42009bb2f915..07e8ad103915 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt @@ -1 +1 @@ -CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file +CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt index 47876fdb3443..d00423858ffe 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt @@ -1 +1 @@ -CREATE TABLE table1 AS SELECT column1, column2, column3 FROM table2 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file +CREATE TABLE table1 AS SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt index 36256533307e..93afc2600db1 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt @@ -1 +1 @@ -INSERT INTO stream1 SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file +INSERT INTO stream1 SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeJoinStatementsCorrectly.approved.txt b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeJoinStatementsCorrectly.approved.txt new file mode 100644 index 000000000000..82f407437c0c --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeJoinStatementsCorrectly.approved.txt @@ -0,0 +1 @@ +INSERT INTO stream1 SELECT column1, column2, column3 FROM source1 INNER JOIN source2 WITHIN '0' SECOND ON anonKey1=anonKey2; \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt index cab14be4ceeb..6cae4d2b35c9 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt @@ -1 +1 @@ -CREATE STREAM stream1 AS SELECT column1, udf1 FROM stream2; \ No newline at end of file +CREATE STREAM stream1 AS SELECT column1, udf1 FROM source1; \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.java index 9a3522528125..17c8a4a3bc93 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.java @@ -1,5 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * http://www.confluent.io/confluent-community-license + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.test; +import io.confluent.ksql.engine.rewrite.QueryAnonymizer; import io.confluent.ksql.test.QueryTranslationTest.QttTestFile; import io.confluent.ksql.test.loader.JsonTestLoader; import io.confluent.ksql.test.tools.TestCase; @@ -16,14 +29,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Stream; -import io.confluent.ksql.util.QueryAnonymizer; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -@RunWith(Parameterized.class) public class QueryAnonymizerTest { private static final Path QUERIES_TO_ANONYMIZE_PATH = Paths.get("src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt"); @@ -65,7 +76,7 @@ public AnonQuerySetIntersectionTestClass(final String statement) { @Before public void setUp() { sqlTokens = GrammarTokenExporter.getTokens(); - sqlTokens.addAll(Arrays.asList("INT", "DOUBLE", "VARCHAR", "BOOLEAN", "BIGINT")); + sqlTokens.addAll(Arrays.asList("INT", "DOUBLE", "VARCHAR", "BOOLEAN", "BIGINT", "*")); } @Parameterized.Parameters diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt index 7dfae1fe3d8a..3d0a1ba23b02 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt @@ -7,53 +7,53 @@ CREATE STREAM stream4 (column1 INT KEY, column14 STRING) WITH (KAFKA_TOPIC='[str CREATE STREAM stream5 (column15 STRING KEY, column16 BIGINT, column17 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream6 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 DOUBLE, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column1 STRING KEY, column23 DOUBLE, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE stream3 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 STRING, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE TABLE table4 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column1 STRING KEY, column23 STRING, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); CREATE TABLE table5 (column1 INT PRIMARY KEY, column37 STRING) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream11 (column15 STRING, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 ARRAY, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); +CREATE STREAM stream7 (column1 STRING KEY, column23 ARRAY, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream14 (column1 STRING KEY, column47 DECIMAL) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream4 (column15 STRING KEY, column1 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE table1 (column1 BIGINT PRIMARY KEY, column2 VARCHAR, column3 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE TABLE table4 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 BIGINT KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 STRING KEY, column2 BIGINT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 BIGINT KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 STRING KEY, column2 STRUCT, column3 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); -CREATE TABLE table1 (column1 VARCHAR PRIMARY KEY, column2 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column16 INT KEY, column17 INT, column71 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column1 STRING KEY, column23 STRUCT, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); +CREATE TABLE table8 (column1 VARCHAR PRIMARY KEY, column76 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream11 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column1 INT KEY, column82 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream3 (column83 BIGINT KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream14 (column15 STRING KEY, column88 ARRAY) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream30 (column15 STRING KEY, column90 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream31 (column91 INT KEY, column92 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE table1 (column1 BIGINT PRIMARY KEY, column2 VARCHAR, column3 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE TABLE table9 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column47 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]', WINDOW_TYPE='[string]', WINDOW_SIZE='[string]'); CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 STRING KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column1 STRING KEY, column23 INT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream3 (column15 STRING KEY, column1 BIGINT, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 STRING KEY, column2 BIGINT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); +CREATE STREAM stream7 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column16 INT KEY, column17 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); CREATE STREAM stream1 (column1 INT KEY, column16 BIGINT, column17 STRUCT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -EXPLAIN SELECT * from stream1; +EXPLAIN SELECT * FROM source1; DROP STREAM IF EXISTS stream6; -INSERT INTO OUTPUT SELECT column122, column123, column124 FROM stream11 JOIN stream2 WITHIN '0' SECOND ON stream11.k = stream2.k; +INSERT INTO stream41 SELECT column122, column123, column124 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2; DROP STREAM stream6; -INSERT INTO TARGET SELECT column124, column126 FROM stream5; -INSERT INTO SINK SELECT * FROM stream5; -INSERT INTO OUTPUT SELECT * FROM stream49; -INSERT INTO OUTPUT SELECT stream11, stream11, stream2 FROM stream11 JOIN stream2 WITHIN 1 SECOND ON stream2.K = stream2.K; +INSERT INTO stream43 SELECT column124, column126 FROM source4; +INSERT INTO stream44 SELECT * FROM source4; +INSERT INTO stream41 SELECT * FROM source3; +INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2; DROP STREAM IF EXISTS stream6; \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.received.txt b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.received.txt deleted file mode 100644 index 8d071cea12ef..000000000000 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.received.txt +++ /dev/null @@ -1,59 +0,0 @@ -CREATE TABLE table1 (column1 INT PRIMARY KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream2 (column1 BIGINT KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream4 (column1 INT KEY, column14 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream5 (column15 STRING KEY, column16 BIGINT, column17 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream6 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 DOUBLE, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE stream3 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 STRING, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE table5 (column1 INT PRIMARY KEY, column37 STRING) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream11 (column15 STRING, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 ARRAY, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); -CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream14 (column1 STRING KEY, column47 DECIMAL) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream4 (column15 STRING KEY, column1 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE stream3 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 BIGINT KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 BIGINT KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); -CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column16 INT KEY, column17 INT, column71 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 STRUCT, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]'); -CREATE TABLE column16 (column1 VARCHAR PRIMARY KEY, column76 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream11 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 INT KEY, column82 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream3 (column83 BIGINT KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream14 (column15 STRING KEY, column88 ARRAY) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream30 (column15 STRING KEY, column90 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream31 (column91 INT KEY, column92 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE stream1 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column47 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]', WINDOW_TYPE='[string]', WINDOW_SIZE='[string]'); -CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 INT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream3 (column15 STRING KEY, column1 BIGINT, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM table1 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column16 INT KEY, column17 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]'); -CREATE STREAM stream1 (column1 INT KEY, column16 BIGINT, column17 STRUCT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); -EXPLAIN query -DROP STREAM IF EXISTS stream6; -INSERT INTO stream41 SELECT column122, column123, column124 FROM stream42; -DROP STREAM stream6; -INSERT INTO stream44 SELECT column124, column126 FROM stream5; -INSERT INTO stream46 SELECT FROM stream5; -INSERT INTO stream41 SELECT FROM stream49; -INSERT INTO stream41 SELECT column122, column128, column129 FROM stream51; -DROP STREAM IF EXISTS stream6; \ No newline at end of file