Skip to content

Commit

Permalink
fix: changed anonymizer to work on edge cases identified
Browse files Browse the repository at this point in the history
  • Loading branch information
tolgadur committed May 11, 2021
1 parent 753ccb9 commit c781ee8
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 151 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,25 @@ public void shouldAnonymizeDefineUndefineProperty() {
anon.anonymize("UNDEFINE format;"));
}

@Test
public void shouldAnonymizeSelectStatementCorrectly() {
  // A plain SELECT should keep its shape while the source name is anonymized.
  final String anonymized = anon.anonymize("SELECT * FROM S1;");
  Assert.assertEquals("SELECT * FROM source1;", anonymized);
}

@Test
public void shouldAnonymizeExplainStatementCorrectly() {
  // Fix: the scrape retained a stale, contradictory assertion expecting
  // "EXPLAIN query" (no trailing semicolon) for the same input that the
  // corrected assertion expects "EXPLAIN query;" for. Only the corrected
  // expectation is kept — the anonymizer preserves the statement terminator.
  Assert.assertEquals("EXPLAIN query;", anon.anonymize("EXPLAIN my_query;"));
  // EXPLAIN wrapping a full SELECT: the inner query is anonymized normally.
  Assert.assertEquals("EXPLAIN SELECT * FROM source1;",
      anon.anonymize("EXPLAIN SELECT * from S1;"));
}

@Test
public void shouldAnonymizeJoinStatementsCorrectly() {
  // INSERT INTO ... JOIN with a WITHIN clause; the anonymized form is
  // checked against the stored approval file rather than an inline literal.
  final String statement =
      "INSERT INTO OUTPUT SELECT col1, col2, col3"
          + " FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON col1.k=col2.k;";
  Approvals.verify(anon.anonymize(statement));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]';
Original file line number Diff line number Diff line change
@@ -1 +1 @@
CREATE TABLE table1 AS SELECT column1, column2, column3 FROM table2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
CREATE TABLE table1 AS SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]';
Original file line number Diff line number Diff line change
@@ -1 +1 @@
INSERT INTO stream1 SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
INSERT INTO stream1 SELECT column1, column2, column3 FROM source1 WHERE column1='0' AND column2='[string]' AND column3='[string]';
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO stream1 SELECT column1, column2, column3 FROM source1 INNER JOIN source2 WITHIN '0' SECOND ON anonKey1=anonKey2;
Original file line number Diff line number Diff line change
@@ -1 +1 @@
CREATE STREAM stream1 AS SELECT column1, udf1 FROM stream2;
CREATE STREAM stream1 AS SELECT column1, udf1 FROM source1;
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
/*
* Copyright 2021 Confluent Inc.
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
* http://www.confluent.io/confluent-community-license
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.test;

import io.confluent.ksql.engine.rewrite.QueryAnonymizer;
import io.confluent.ksql.test.QueryTranslationTest.QttTestFile;
import io.confluent.ksql.test.loader.JsonTestLoader;
import io.confluent.ksql.test.tools.TestCase;
Expand All @@ -16,14 +29,12 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import io.confluent.ksql.util.QueryAnonymizer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class QueryAnonymizerTest {
private static final Path QUERIES_TO_ANONYMIZE_PATH =
Paths.get("src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt");
Expand Down Expand Up @@ -65,7 +76,7 @@ public AnonQuerySetIntersectionTestClass(final String statement) {
@Before
public void setUp() {
  // Fix: the scrape retained both the pre-change addAll (without "*") and the
  // post-change addAll (with "*"), registering the type tokens twice. Only the
  // corrected registration is kept; "*" must be treated as a grammar token so
  // SELECT * survives anonymization intact.
  sqlTokens = GrammarTokenExporter.getTokens();
  sqlTokens.addAll(Arrays.asList("INT", "DOUBLE", "VARCHAR", "BOOLEAN", "BIGINT", "*"));
}

@Parameterized.Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,53 @@ CREATE STREAM stream4 (column1 INT KEY, column14 STRING) WITH (KAFKA_TOPIC='[str
CREATE STREAM stream5 (column15 STRING KEY, column16 BIGINT, column17 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream6 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM table1 (column1 STRING KEY, column23 DOUBLE, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 DOUBLE, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE stream3 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM table1 (column1 STRING KEY, column23 STRING, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table4 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 STRING, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table5 (column1 INT PRIMARY KEY, column37 STRING) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream11 (column15 STRING, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM table1 (column1 STRING KEY, column23 ARRAY<INT>, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 ARRAY<INT>, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream14 (column1 STRING KEY, column47 DECIMAL) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream4 (column15 STRING KEY, column1 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table1 (column1 BIGINT PRIMARY KEY, column2 VARCHAR, column3 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table4 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 BIGINT KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 BIGINT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column1 BIGINT KEY, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 BIGINT KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (WRAP_SINGLE_VALUE='false', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 STRUCT<INT, STRING>, column3 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE TABLE table1 (column1 VARCHAR PRIMARY KEY, column2 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column16 INT KEY, column17 INT, column71 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 STRUCT<INT, STRING>, column24 INT) WITH (KAFKA_TOPIC='[string]', FORMAT='[string]');
CREATE TABLE table8 (column1 VARCHAR PRIMARY KEY, column76 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream11 (column15 STRING KEY, column19 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 INT KEY, column82 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column83 BIGINT KEY, column9 VARCHAR) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 INT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream14 (column15 STRING KEY, column88 ARRAY<VARCHAR>) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream30 (column15 STRING KEY, column90 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream31 (column91 INT KEY, column92 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table1 (column1 BIGINT PRIMARY KEY, column2 VARCHAR, column3 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table9 (column1 BIGINT PRIMARY KEY, column9 VARCHAR, column58 BIGINT) WITH (TIMESTAMP='[string]', KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column47 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]', WINDOW_TYPE='[string]', WINDOW_SIZE='[string]');
CREATE TABLE table2 (column1 BIGINT PRIMARY KEY, column9 STRING) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 INT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 INT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream3 (column15 STRING KEY, column1 BIGINT, column9 VARCHAR, column12 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 STRING KEY, column2 BIGINT, column3 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream7 (column1 STRING KEY, column23 BIGINT, column24 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column15 INT, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column16 INT KEY, column17 INT) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
CREATE TABLE table2 (column1 INT PRIMARY KEY, column2 BIGINT) WITH (KAFKA_TOPIC='[string]', KEY_FORMAT='[string]', VALUE_FORMAT='[string]');
CREATE STREAM stream1 (column1 INT KEY, column16 BIGINT, column17 STRUCT<INT>) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
EXPLAIN SELECT * from stream1;
EXPLAIN SELECT * FROM source1;
DROP STREAM IF EXISTS stream6;
INSERT INTO OUTPUT SELECT column122, column123, column124 FROM stream11 JOIN stream2 WITHIN '0' SECOND ON stream11.k = stream2.k;
INSERT INTO stream41 SELECT column122, column123, column124 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2;
DROP STREAM stream6;
INSERT INTO TARGET SELECT column124, column126 FROM stream5;
INSERT INTO SINK SELECT * FROM stream5;
INSERT INTO OUTPUT SELECT * FROM stream49;
INSERT INTO OUTPUT SELECT stream11, stream11, stream2 FROM stream11 JOIN stream2 WITHIN 1 SECOND ON stream2.K = stream2.K;
INSERT INTO stream43 SELECT column124, column126 FROM source4;
INSERT INTO stream44 SELECT * FROM source4;
INSERT INTO stream41 SELECT * FROM source3;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2;
DROP STREAM IF EXISTS stream6;

This file was deleted.

0 comments on commit c781ee8

Please sign in to comment.