fix: extended query anonymizer tests with functional tests queries #7480

Merged: 2 commits, May 13, 2021
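For context, the behavior these tests pin down, as a minimal sketch drawn from the assertions in the diff below (QueryAnonymizer and its no-arg constructor appear in the PR; the expected strings are the ones asserted in the new tests):

    import io.confluent.ksql.engine.rewrite.QueryAnonymizer;

    final QueryAnonymizer anon = new QueryAnonymizer();
    anon.anonymize("SELECT * FROM S1;");   // returns "SELECT * FROM source1;"
    anon.anonymize("EXPLAIN my_query;");   // returns "EXPLAIN query;"

Source names, column names, and string literals are replaced with generic placeholders (source1, column1, '[string]'), so queries can be shared for debugging without leaking identifiers.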
@@ -112,9 +112,25 @@ public void shouldAnonymizeDefineUndefineProperty() {
         anon.anonymize("UNDEFINE format;"));
   }
 
+  @Test
+  public void shouldAnonymizeSelectStatementCorrectly() {
+    Assert.assertEquals("SELECT * FROM source1;",
+        anon.anonymize("SELECT * FROM S1;"));
+  }
+
   @Test
   public void shouldAnonymizeExplainStatementCorrectly() {
-    Assert.assertEquals("EXPLAIN query", anon.anonymize("EXPLAIN my_query;"));
+    Assert.assertEquals("EXPLAIN query;", anon.anonymize("EXPLAIN my_query;"));
+    Assert.assertEquals("EXPLAIN SELECT * FROM source1;",
+        anon.anonymize("EXPLAIN SELECT * from S1;"));
   }
 
+  @Test
+  public void shouldAnonymizeJoinStatementsCorrectly() {
+    final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3"
+        + " FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON col1.k=col2.k;");
+
+    Approvals.verify(output);
+  }
+
   @Test
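Note on the join test above: Approvals.verify(output) comes from the ApprovalTests library, which compares the value against a committed approved file instead of an inline expected string. A minimal sketch of the workflow, assuming the library's default namer (the file names are illustrative):

    // First run: the test fails and writes
    //   QueryAnonymizerTest.shouldAnonymizeJoinStatementsCorrectly.received.txt
    // Committing that content as
    //   QueryAnonymizerTest.shouldAnonymizeJoinStatementsCorrectly.approved.txt
    // locks the output in; later runs fail only if the output changes.
    Approvals.verify(output);

The approved-file diffs that follow are exactly these locked-in outputs being updated.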
@@ -1 +1 @@
-CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
+CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]';
@@ -1 +1 @@
-CREATE TABLE table1 AS SELECT column1, column2, column3 FROM table2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
+CREATE TABLE table1 AS SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]';
@@ -1 +1 @@
-INSERT INTO stream1 SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
+INSERT INTO stream1 SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]';
@@ -0,0 +1 @@
+INSERT INTO stream1 SELECT column1, column2, column3 FROM source1 INNER JOIN source2 WITHIN '0' SECOND ON anonKey1=anonKey2;
@@ -1 +1 @@
-CREATE STREAM stream1 AS SELECT column1, udf1 FROM stream2;
+CREATE STREAM stream1 AS SELECT column1, udf1 FROM source1;
ksqldb-functional-tests/hs_err_pid11216.log: 518 additions, 0 deletions (large diffs are not rendered by default)

@@ -0,0 +1,59 @@
CREATE TABLE L (ID INT PRIMARY KEY, V0 INT, V1 INT) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE TABLE T1 (ID INT PRIMARY KEY, V0 bigint) WITH (kafka_topic='left', key_format='KAFKA', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V0 INT) WITH (kafka_topic='left', key_format='AVRO', value_format='JSON');
CREATE STREAM S (ID BIGINT KEY, NAME STRING) WITH (kafka_topic='S', value_format='JSON');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE STREAM TEST1 (ID INT KEY, IGNORED STRING) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM SOURCE (K STRING KEY, A bigint, B varchar) WITH (kafka_topic='source', value_format='AVRO');
CREATE STREAM input2 (K STRING KEY, data VARCHAR) WITH (kafka_topic='input', value_format='DELIMITED');
CREATE TABLE T1 (ID BIGINT PRIMARY KEY, NAME varchar) WITH (kafka_topic='t1', key_format='AVRO', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 DOUBLE, l1 INT) WITH (kafka_topic='left_topic', value_format='PROTOBUF');
CREATE STREAM S1 (ID INT KEY, k int, V0 bigint) WITH (kafka_topic='left', value_format='JSON');
CREATE TABLE TEST (ID BIGINT PRIMARY KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='AVRO');
CREATE STREAM L (ID STRING KEY, l0 STRING, l1 INT) WITH (kafka_topic='left_topic', value_format='AVRO');
CREATE STREAM S1 (ID INT KEY, V0 bigint) WITH (kafka_topic='left', key_format='JSON_SR', value_format='JSON');
CREATE TABLE T (ID INT PRIMARY KEY, VAL STRING) WITH (kafka_topic='t', key_format='AVRO', value_format='JSON');
CREATE STREAM SOURCE1 (K STRING, data VARCHAR) WITH (kafka_topic='stream-source', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 ARRAY<INT>, l1 INT) WITH (kafka_topic='left_topic', format='AVRO');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='PROTOBUF');
CREATE STREAM INPUT (ID STRING KEY, v DECIMAL(33, 16)) WITH (kafka_topic='test_topic', value_format='DELIMITED');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM TEST1 (K STRING KEY, ID varchar) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE TABLE TEST (ID BIGINT PRIMARY KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM S1 (ID BIGINT KEY, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 BIGINT, l1 INT) WITH (kafka_topic='left_topic', value_format='PROTOBUF');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='PROTOBUF');
CREATE STREAM S1 (ID BIGINT KEY, NAME STRING) WITH (WRAP_SINGLE_VALUE=false, kafka_topic='S1', value_format='JSON');
CREATE STREAM S1 WITH (kafka_topic='s1', format='JSON_SR');
CREATE TABLE T1 (ID BIGINT PRIMARY KEY, NAME STRING) WITH (WRAP_SINGLE_VALUE=false, kafka_topic='T1', value_format='JSON');
CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 STRUCT<F1 INT, F2 STRING>, l1 INT) WITH (kafka_topic='left_topic', format='JSON');
CREATE TABLE A (id varchar primary key, regionid varchar) WITH (kafka_topic='a', value_format='json');
CREATE STREAM S1 (ID INT KEY, V0 bigint) WITH (kafka_topic='left', key_format='JSON', value_format='JSON');
CREATE STREAM SOURCE1 (K STRING KEY, data VARCHAR) WITH (kafka_topic='stream-source', value_format='DELIMITED');
CREATE STREAM L (ID INT KEY, x bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM TEST (LEFT_ID BIGINT KEY, NAME varchar) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V0 INT) WITH (kafka_topic='left', key_format='JSON_SR', value_format='JSON');
CREATE STREAM INPUT (K STRING KEY, WLAN_SA ARRAY<VARCHAR>) WITH (kafka_topic='input', value_format='JSON');
CREATE STREAM INPUT_STREAM (K STRING KEY, SF BIGINT) WITH (kafka_topic='stream_topic', value_format='JSON');
CREATE STREAM INPUT_1 (FOO INT KEY, bar INT) WITH (kafka_topic='t1', value_format='JSON');
CREATE TABLE S1 (ID BIGINT PRIMARY KEY, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='left_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');
CREATE TABLE T1 (ID BIGINT PRIMARY KEY, NAME STRING) WITH (kafka_topic='T1', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 INT, l1 INT) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V0 bigint) WITH (kafka_topic='left', key_format='AVRO', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 BIGINT, l1 INT) WITH (kafka_topic='left_topic', value_format='AVRO');
CREATE STREAM S1 (ID INT KEY, K INT, V0 bigint) WITH (kafka_topic='left', value_format='JSON');
CREATE STREAM S1 (A INT KEY, B INT) WITH (kafka_topic='S1', value_format='JSON');
CREATE TABLE T1 (ID INT PRIMARY KEY, V0 bigint) WITH (kafka_topic='left', key_format='AVRO', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, A bigint, B STRUCT<C INT>) WITH (kafka_topic='left_topic', value_format='JSON');
EXPLAIN SELECT * from S1;
DROP STREAM IF EXISTS input2;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.ID + S2.ID as DATA, 1 as I FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.k = S2.k;
DROP STREAM input2;
INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE;
INSERT INTO SINK SELECT * FROM SOURCE;
INSERT INTO OUTPUT SELECT * FROM SOURCE2;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;
DROP STREAM IF EXISTS input2;
@@ -0,0 +1,112 @@
/*
 * Copyright 2021 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.test;

import static java.nio.charset.StandardCharsets.UTF_8;

import io.confluent.ksql.engine.rewrite.QueryAnonymizer;
import io.confluent.ksql.test.QueryTranslationTest.QttTestFile;
import io.confluent.ksql.test.loader.JsonTestLoader;
import io.confluent.ksql.test.tools.TestCase;
import io.confluent.ksql.util.GrammarTokenExporter;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.approvaltests.Approvals;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

public class QueryAnonymizerTest {
  private static final Path QUERIES_TO_ANONYMIZE_PATH =
      Paths.get("src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt");
  private static final Path QUERY_VALIDATION_TEST_DIR = Paths.get("query-validation-tests");
  private final QueryAnonymizer anon = new QueryAnonymizer();

  @Test
  public void queriesAreAnonymizedCorrectly() throws Exception {
    // Given:
    StringBuilder statements = new StringBuilder();

    String line;
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new FileInputStream(QUERIES_TO_ANONYMIZE_PATH.toString()), UTF_8))) {
      while ((line = reader.readLine()) != null) {
        statements.append(line);
      }
    }

    // When:
    final String anonStatementSelection = anon.anonymize(statements.toString());

    // Assert:
    Approvals.verify(anonStatementSelection);
  }

  private static JsonTestLoader<TestCase> testFileLoader() {
    return JsonTestLoader.of(QUERY_VALIDATION_TEST_DIR, QttTestFile.class);
  }

  @RunWith(Parameterized.class)
  public static class AnonQuerySetIntersectionTestClass {
    private static final Stream<TestCase> testCases = testFileLoader().load();
    private List<String> sqlTokens;
    private final QueryAnonymizer anon = new QueryAnonymizer();
    private final String statement;

    public AnonQuerySetIntersectionTestClass(final String statement) {
      this.statement = statement;
    }

    @Before
    public void setUp() {
      sqlTokens = GrammarTokenExporter.getTokens();
      sqlTokens.addAll(Arrays.asList("INT", "DOUBLE", "VARCHAR", "BOOLEAN", "BIGINT", "*"));
    }

    @Parameterized.Parameters
    public static Collection<String> input() {
      return testCases
          .filter(statement -> !statement.expectedException().isPresent())
          .map(TestCase::statements)
          .flatMap(Collection::stream)
          .collect(Collectors.toSet());
    }

    @Test
    public void anonQuerySplitByWordsHasOnlyTokensInSetIntersectionWithQuery() {
      final String anonStatement = anon.anonymize(statement);
      final Set<String> statementWord =
          new HashSet<>(Arrays.asList(statement.split("[\\s\\(\\)<>,;]")));
      final Set<String> anonStatementWord =
          new HashSet<>(Arrays.asList(anonStatement.split("[\\s\\(\\)<>,;]")));
      final Set<String> intersection = new HashSet<>(statementWord);
      intersection.retainAll(anonStatementWord);

      // Assert:
      intersection.removeAll(sqlTokens);
      intersection.remove("");
      Assert.assertEquals(0, intersection.size());
    }
  }
}
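To make the set-intersection check concrete, a minimal standalone sketch (the statement, its anonymized form, and the token list here are hypothetical stand-ins; the real test draws statements from the query-validation-tests files and tokens from GrammarTokenExporter):

    // Words shared by the original and anonymized statements must all be
    // grammar tokens (keywords and types); any other shared word would be
    // an identifier leaking through the anonymizer.
    final String original =
        "CREATE STREAM S1 (ID INT KEY) WITH (kafka_topic='left');";
    final String anonymized =
        "CREATE STREAM stream1 (column1 INT KEY) WITH (KAFKA_TOPIC='[string]');";
    final Set<String> shared =
        new HashSet<>(Arrays.asList(original.split("[\\s\\(\\)<>,;]")));
    shared.retainAll(new HashSet<>(Arrays.asList(anonymized.split("[\\s\\(\\)<>,;]"))));
    shared.removeAll(Arrays.asList("CREATE", "STREAM", "INT", "KEY", "WITH"));
    shared.remove("");              // empty strings are split artifacts
    assert shared.isEmpty();        // nothing identifying survived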
@@ -0,0 +1,59 @@
CREATE TABLE table1 (column1 INT PRIMARY KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream2 (column1 BIGINT KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream4 (column1 INT KEY, column14 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream5 (column15 STRING KEY, column16 BIGINT, column17 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream6 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 DOUBLE, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table4 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 STRING, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table5 (column1 INT PRIMARY KEY, column37 STRING) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream11 (column15 STRING, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 ARRAY<INT>, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream14 (column1 STRING KEY, column47 DECIMAL) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream4 (column15 STRING KEY, column1 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table4 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 BIGINT KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 BIGINT KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column16 INT KEY, column17 INT, column71 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 STRUCT<INT, STRING>, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE TABLE table8 (column1 VARCHAR PRIMARY KEY, column76 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream11 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 INT KEY, column82 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column83 BIGINT KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream14 (column15 STRING KEY, column88 ARRAY<VARCHAR>) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream30 (column15 STRING KEY, column90 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream31 (column91 INT KEY, column92 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table9 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column47 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]', WINDOW_TYPE='[string]', WINDOW_SIZE='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 INT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column15 STRING KEY, column1 BIGINT, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column16 INT KEY, column17 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column16 BIGINT, column17 STRUCT<INT>) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
EXPLAIN SELECT * FROM source1;
DROP STREAM IF EXISTS stream6;
INSERT INTO stream41 SELECT column122, column123, column124 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2;
DROP STREAM stream6;
INSERT INTO stream43 SELECT column124, column126 FROM source4;
INSERT INTO stream44 SELECT * FROM source4;
INSERT INTO stream41 SELECT * FROM source3;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2;
DROP STREAM IF EXISTS stream6;