Skip to content

Commit

Permalink
fix: extend query anonymizer tests with functional test queries
Browse files Browse the repository at this point in the history
  • Loading branch information
tolgadur committed May 6, 2021
1 parent d350924 commit e97a32e
Show file tree
Hide file tree
Showing 6 changed files with 828 additions and 8 deletions.
518 changes: 518 additions & 0 deletions ksqldb-functional-tests/hs_err_pid11216.log

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
CREATE TABLE L (ID INT PRIMARY KEY, V0 INT, V1 INT) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE TABLE T1 (ID INT PRIMARY KEY, V0 bigint) WITH (kafka_topic='left', key_format='KAFKA', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V0 INT) WITH (kafka_topic='left', key_format='AVRO', value_format='JSON');
CREATE STREAM S (ID BIGINT KEY, NAME STRING) WITH (kafka_topic='S', value_format='JSON');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE STREAM TEST1 (ID INT KEY, IGNORED STRING) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM SOURCE (K STRING KEY, A bigint, B varchar) WITH (kafka_topic='source', value_format='AVRO');
CREATE STREAM input2 (K STRING KEY, data VARCHAR) WITH (kafka_topic='input', value_format='DELIMITED');
CREATE TABLE T1 (ID BIGINT PRIMARY KEY, NAME varchar) WITH (kafka_topic='t1', key_format='AVRO', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 DOUBLE, l1 INT) WITH (kafka_topic='left_topic', value_format='PROTOBUF');
CREATE STREAM S1 (ID INT KEY, k int, V0 bigint) WITH (kafka_topic='left', value_format='JSON');
CREATE TABLE TEST (ID BIGINT PRIMARY KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='AVRO');
CREATE STREAM L (ID STRING KEY, l0 STRING, l1 INT) WITH (kafka_topic='left_topic', value_format='AVRO');
CREATE STREAM S1 (ID INT KEY, V0 bigint) WITH (kafka_topic='left', key_format='JSON_SR', value_format='JSON');
CREATE TABLE T (ID INT PRIMARY KEY, VAL STRING) WITH (kafka_topic='t', key_format='AVRO', value_format='JSON');
CREATE STREAM SOURCE1 (K STRING, data VARCHAR) WITH (kafka_topic='stream-source', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 ARRAY<INT>, l1 INT) WITH (kafka_topic='left_topic', format='AVRO');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='PROTOBUF');
CREATE STREAM INPUT (ID STRING KEY, v DECIMAL(33, 16)) WITH (kafka_topic='test_topic', value_format='DELIMITED');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM TEST1 (K STRING KEY, ID varchar) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE TABLE TEST (ID BIGINT PRIMARY KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM S1 (ID BIGINT KEY, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 BIGINT, l1 INT) WITH (kafka_topic='left_topic', value_format='PROTOBUF');
CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='PROTOBUF');
CREATE STREAM S1 (ID BIGINT KEY, NAME STRING) WITH (WRAP_SINGLE_VALUE=false, kafka_topic='S1', value_format='JSON');
CREATE STREAM S1 WITH (kafka_topic='s1', format='JSON_SR');
CREATE TABLE T1 (ID BIGINT PRIMARY KEY, NAME STRING) WITH (WRAP_SINGLE_VALUE=false, kafka_topic='T1', value_format='JSON');
CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 STRUCT<F1 INT, F2 STRING>, l1 INT) WITH (kafka_topic='left_topic', format='JSON');
CREATE TABLE A (id varchar primary key, regionid varchar) WITH (kafka_topic='a', value_format='json');
CREATE STREAM S1 (ID INT KEY, V0 bigint) WITH (kafka_topic='left', key_format='JSON', value_format='JSON');
CREATE STREAM SOURCE1 (K STRING KEY, data VARCHAR) WITH (kafka_topic='stream-source', value_format='DELIMITED');
CREATE STREAM L (ID INT KEY, x bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM TEST (LEFT_ID BIGINT KEY, NAME varchar) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V0 INT) WITH (kafka_topic='left', key_format='JSON_SR', value_format='JSON');
CREATE STREAM INPUT (K STRING KEY, WLAN_SA ARRAY<VARCHAR>) WITH (kafka_topic='input', value_format='JSON');
CREATE STREAM INPUT_STREAM (K STRING KEY, SF BIGINT) WITH (kafka_topic='stream_topic', value_format='JSON');
CREATE STREAM INPUT_1 (FOO INT KEY, bar INT) WITH (kafka_topic='t1', value_format='JSON');
CREATE TABLE S1 (ID BIGINT PRIMARY KEY, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='left_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');
CREATE TABLE T1 (ID BIGINT PRIMARY KEY, NAME STRING) WITH (kafka_topic='T1', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 INT, l1 INT) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, V0 bigint) WITH (kafka_topic='left', key_format='AVRO', value_format='JSON');
CREATE STREAM L (ID STRING KEY, l0 BIGINT, l1 INT) WITH (kafka_topic='left_topic', value_format='AVRO');
CREATE STREAM S1 (ID INT KEY, K INT, V0 bigint) WITH (kafka_topic='left', value_format='JSON');
CREATE STREAM S1 (A INT KEY, B INT) WITH (kafka_topic='S1', value_format='JSON');
CREATE TABLE T1 (ID INT PRIMARY KEY, V0 bigint) WITH (kafka_topic='left', key_format='AVRO', value_format='JSON');
CREATE STREAM S1 (ID INT KEY, A bigint, B STRUCT<C INT>) WITH (kafka_topic='left_topic', value_format='JSON');
EXPLAIN SELECT * from S1;
DROP STREAM IF EXISTS input2;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.ID + S2.ID as DATA, 1 as I FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.k = S2.k;
DROP STREAM input2;
INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE;
INSERT INTO SINK SELECT * FROM SOURCE;
INSERT INTO OUTPUT SELECT * FROM SOURCE2;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;
DROP STREAM IF EXISTS input2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.confluent.ksql.test;

import io.confluent.ksql.test.QueryTranslationTest.QttTestFile;
import io.confluent.ksql.test.loader.JsonTestLoader;
import io.confluent.ksql.test.tools.TestCase;
import io.confluent.ksql.util.GrammarTokenExporter;
import io.confluent.ksql.util.QueryAnonymizer;
import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.approvaltests.Approvals;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests for {@code QueryAnonymizer}: an approval test over a fixed set of
 * fixture queries, plus a structural test that no non-keyword word from a
 * source statement survives anonymization.
 */
public class QueryAnonymizerTest {
  private static final Path QUERIES_TO_ANONYMIZE_PATH =
      Paths.get("src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt");
  private static final Path QUERY_VALIDATION_TEST_DIR = Paths.get("query-validation-tests");

  private final QueryAnonymizer anon = new QueryAnonymizer();
  // NOTE: a Stream is single-use; only one test may traverse this field.
  private final Stream<TestCase> testCases = testFileLoader().load();
  // Grammar keywords plus primitive type names; these are allowed to appear
  // unchanged in anonymized output.
  private List<String> sqlTokens;

  @Before
  public void setUp() {
    sqlTokens = GrammarTokenExporter.getTokens();
    sqlTokens.addAll(Arrays.asList("INT", "DOUBLE", "VARCHAR", "BOOLEAN", "BIGINT"));
  }

  @Test
  public void queriesAreAnonymizedCorrectly() throws Exception {
    // Given: all fixture statements concatenated. Line breaks are dropped,
    // which is safe because every statement in the fixture ends with ';'.
    final StringBuilder statements = new StringBuilder();
    // Read with an explicit charset rather than the platform default.
    try (Scanner s =
        new Scanner(Files.newBufferedReader(QUERIES_TO_ANONYMIZE_PATH, StandardCharsets.UTF_8))) {
      // hasNextLine() pairs with nextLine(); the token-based hasNext() can
      // report false while whitespace-only lines remain.
      while (s.hasNextLine()) {
        statements.append(s.nextLine());
      }
    }

    // When:
    final String anonStatementSelection = anon.anonymize(statements.toString());

    // Then: compare against the approved golden file.
    Approvals.verify(anonStatementSelection);
  }

  @Test
  public void anonQuerySplitByWordsHasOnlyTokensInSetIntersectionWithQuery() {
    // Given: every statement from the functional test cases that is expected
    // to parse successfully.
    final Set<String> statements = testCases
        .filter(statement -> !statement.expectedException().isPresent())
        .map(TestCase::statements)
        .flatMap(Collection::stream)
        .collect(Collectors.toSet());

    // When: each statement is anonymized and both versions are split into
    // words on whitespace and punctuation.
    for (String statement : statements) {
      final String anonStatement = anon.anonymize(statement);
      final Set<String> statementWord =
          new HashSet<>(Arrays.asList(statement.split("[\\s\\(\\)<>,;]")));
      final Set<String> anonStatementWord =
          new HashSet<>(Arrays.asList(anonStatement.split("[\\s\\(\\)<>,;]")));
      final Set<String> intersection = new HashSet<>(statementWord);
      intersection.retainAll(anonStatementWord);

      // Then: after removing grammar tokens (and the empty string produced by
      // the splitter), nothing from the original statement may remain.
      intersection.removeAll(sqlTokens);
      intersection.remove("");
      Assert.assertEquals(
          "Non-token words leaked through anonymization of [" + statement + "]: " + intersection,
          0,
          intersection.size());
    }
  }

  // Loads the shared query-validation-tests JSON suite used by the QTT tests.
  private static JsonTestLoader<TestCase> testFileLoader() {
    return JsonTestLoader.of(QUERY_VALIDATION_TEST_DIR, QttTestFile.class);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
CREATE TABLE table1 (column1 INT PRIMARY KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream2 (column1 BIGINT KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream4 (column1 INT KEY, column14 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream5 (column15 STRING KEY, column16 BIGINT, column17 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream6 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM table1 (column1 STRING KEY, column23 DOUBLE, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE stream3 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM table1 (column1 STRING KEY, column23 STRING, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table5 (column1 INT PRIMARY KEY, column37 STRING) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream11 (column15 STRING, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM table1 (column1 STRING KEY, column23 ARRAY<INT>, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream14 (column1 STRING KEY, column47 DECIMAL) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream4 (column15 STRING KEY, column1 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table1 (column1 BIGINT PRIMARY KEY, column2 VARCHAR, column3 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 BIGINT KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 BIGINT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 BIGINT KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 STRUCT<INT, STRING>, column3 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE TABLE table1 (column1 VARCHAR PRIMARY KEY, column2 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream11 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column83 BIGINT KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream14 (column15 STRING KEY, column88 ARRAY<VARCHAR>) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream30 (column15 STRING KEY, column90 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream31 (column91 INT KEY, column92 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table1 (column1 BIGINT PRIMARY KEY, column2 VARCHAR, column3 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column47 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]', WINDOW_TYPE='[string]', WINDOW_SIZE='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column15 STRING KEY, column1 BIGINT, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 BIGINT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column16 INT KEY, column17 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column16 BIGINT, column17 STRUCT<INT>) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
EXPLAIN SELECT * from stream1;
DROP STREAM IF EXISTS stream6;
INSERT INTO OUTPUT SELECT column122, column123, column124 FROM stream11 JOIN stream2 WITHIN '0' SECOND ON stream11.k = stream2.k;
DROP STREAM stream6;
INSERT INTO TARGET SELECT column124, column126 FROM stream5;
INSERT INTO SINK SELECT * FROM stream5;
INSERT INTO OUTPUT SELECT * FROM stream49;
INSERT INTO OUTPUT SELECT stream11, stream11, stream2 FROM stream11 JOIN stream2 WITHIN 1 SECOND ON stream2.K = stream2.K;
DROP STREAM IF EXISTS stream6;
Loading

0 comments on commit e97a32e

Please sign in to comment.