diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
index 342044827b4..e410ad227ab 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
@@ -52,6 +52,24 @@ limitations under the License.
             <artifactId>flink-cdc-composer</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>jdbc</artifactId>
+            <version>1.18.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java
new file mode 100644
index 00000000000..d029943e362
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.shade.org.apache.commons.compress.utils;
+
+import java.util.ArrayList;
+
+/**
+ * Dummy class of shaded apache-commons, since connector 1.2.9 depends on it but does not package
+ * it. This package should be removed after upgrading to 1.2.10, which no longer uses
+ * commons-compress.
+ */
+public class Lists {
+    public static <T> ArrayList<T> newArrayList() {
+        return new ArrayList<>();
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
new file mode 100644
index 00000000000..c294dd42338
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.starrocks.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; +import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator; +import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME; + +/** IT tests for {@link StarRocksMetadataApplier}. 
*/ +public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase { + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @Before + public void initializeDatabase() { + executeSql( + String.format( + "CREATE DATABASE IF NOT EXISTS `%s`;", + StarRocksContainer.STARROCKS_DATABASE_NAME)); + LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME); + } + + @After + public void destroyDatabase() { + executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME)); + LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME); + } + + private List generateAddColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("extra_date", DataTypes.DATE(), null)))), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_bool", DataTypes.BOOLEAN(), null)))), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_decimal", + DataTypes.DECIMAL(17, 0), + null))))); + } + + private List generateDropColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new DropColumnEvent(tableId, Collections.singletonList("number"))); + } + + private List generateRenameColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")), + new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae"))); + } + + private List generateAlterColumnTypeEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); + } + + private List generateNarrowingAlterColumnTypeEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new 
PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + // Double -> Float is a narrowing cast, should fail + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("number", DataTypes.FLOAT()))); + } + + @Test + public void testStarRocksDataType() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID")) + // StarRocks sink doesn't support BINARY and BYTES type yet. + // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary")) + // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var + // Binary")) + // .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes")) + .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean")) + .column(new PhysicalColumn("int", DataTypes.INT(), "Int")) + .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int")) + .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int")) + .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float")) + .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double")) + .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char")) + .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char")) + .column(new PhysicalColumn("string", DataTypes.STRING(), "String")) + .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal")) + .column(new PhysicalColumn("date", DataTypes.DATE(), "Date")) + // StarRocks sink doesn't support TIME type yet. + // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time")) + // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With + // Precision")) + .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp")) + .column( + new PhysicalColumn( + "timestamp_3", + DataTypes.TIMESTAMP(3), + "Timestamp With Precision")) + // StarRocks sink doesn't support TIMESTAMP with non-local TZ yet. 
+ // .column(new PhysicalColumn("timestamptz", DataTypes.TIMESTAMP_TZ(), + // "TimestampTZ")) + // .column(new PhysicalColumn("timestamptz_3", DataTypes.TIMESTAMP_TZ(3), + // "TimestampTZ With Precision")) + .column( + new PhysicalColumn( + "timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ")) + .column( + new PhysicalColumn( + "timestampltz_3", + DataTypes.TIMESTAMP_LTZ(3), + "TimestampLTZ With Precision")) + .primaryKey("id") + .build(); + + runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema))); + + List actual = inspectTableSchema(tableId); + List expected = + Arrays.asList( + "id | int | NO | true | null", + "boolean | boolean | YES | false | null", + "int | int | YES | false | null", + "tinyint | tinyint | YES | false | null", + "smallint | smallint | YES | false | null", + "float | float | YES | false | null", + "double | double | YES | false | null", + "char | char(51) | YES | false | null", + "varchar | varchar(51) | YES | false | null", + "string | varchar(1048576) | YES | false | null", + "decimal | decimal(17,7) | YES | false | null", + "date | date | YES | false | null", + "timestamp | datetime | YES | false | null", + "timestamp_3 | datetime | YES | false | null", + "timestampltz | datetime | YES | false | null", + "timestampltz_3 | datetime | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testStarRocksAddColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateAddColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "number | double | YES | false | null", + "name | varchar(51) | YES | false | null", + "extra_date | date | YES | false | null", + "extra_bool | boolean | YES | false | null", + "extra_decimal | decimal(17,0) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testStarRocksDropColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateDropColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", "name | varchar(51) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + @Ignore("Rename column is not supported currently.") + public void testStarRocksRenameColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateRenameColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "kazu | double | YES | false | null", + "namae | varchar(51) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + @Ignore("Alter column type is not supported currently.") + public void testStarRocksAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateAlterColumnTypeEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "number | double | YES | false | null", + "name | 
varchar(57) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test(expected = JobExecutionException.class) + @Ignore("Alter column type is not supported currently.") + public void testStarRocksNarrowingAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); + } + + private void runJobWithEvents(List events) throws Exception { + DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl()) + .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl()) + .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME) + .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD); + + DataSink starRocksSink = createStarRocksDataSink(config); + + SchemaOperatorTranslator schemaOperatorTranslator = + new SchemaOperatorTranslator( + SchemaChangeBehavior.EVOLVE, + "$$_schema_operator_$$", + DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + + OperatorIDGenerator schemaOperatorIDGenerator = + new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); + + stream = + schemaOperatorTranslator.translate( + stream, + DEFAULT_PARALLELISM, + starRocksSink.getMetadataApplier(), + new ArrayList<>()); + + DataSinkTranslator sinkTranslator = new DataSinkTranslator(); + sinkTranslator.translate( + new SinkDef("starrocks", "Dummy StarRocks Sink", config), + stream, + starRocksSink, + schemaOperatorIDGenerator.generate()); + + env.execute("StarRocks Schema Evolution Test"); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java new file mode 100644 index 00000000000..43c1faaacdd --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.starrocks.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME; + +/** IT tests for {@link StarRocksDataSink}. 
*/ +public class StarRocksPipelineITCase extends StarRocksSinkTestBase { + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @Before + public void initializeDatabaseAndTable() { + executeSql( + String.format( + "CREATE DATABASE IF NOT EXISTS `%s`;", + StarRocksContainer.STARROCKS_DATABASE_NAME)); + + LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME); + + List schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"); + + executeSql( + String.format( + "CREATE TABLE `%s`.`%s` (%s) PRIMARY KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");", + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME, + String.join(", ", schema), + "id", + "id")); + + LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME); + } + + @After + public void destroyDatabaseAndTable() { + + executeSql( + String.format( + "DROP TABLE %s.%s;", + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME)); + + LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME); + + executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME)); + + LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME); + } + + private List generateEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17))); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 19, 2.718, BinaryStringData.fromString("Que Sera Sera") + })), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 21, 1.732, BinaryStringData.fromString("Disenchanted") + })), + DataChangeEvent.deleteEvent( + tableId, + generator.generate( + new Object[] { + 19, 2.718, BinaryStringData.fromString("Que Sera Sera") + })), + DataChangeEvent.updateEvent( + tableId, + generator.generate( + new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}), + generator.generate( + new Object[] { + 17, 6.28, BinaryStringData.fromString("StarRocks") + }))); + } + + @Test + public void testValuesToStarRocks() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + DataStream stream = + env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl()) + .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl()) + .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME) + .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD); + + Sink starRocksSink = + ((FlinkSinkProvider) 
createStarRocksDataSink(config).getEventSinkProvider())
+                        .getSink();
+        stream.sinkTo(starRocksSink);
+
+        env.execute("Values to StarRocks Sink");
+
+        List<String> actual = fetchTableContent(tableId, 3);
+        List<String> expected =
+                Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
+
+        assertEqualsInAnyOrder(expected, actual);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java
new file mode 100644
index 00000000000..8bba7053e2f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink.utils;
+
+import org.junit.ClassRule;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Docker container for StarRocks. */
+public class StarRocksContainer extends JdbcDatabaseContainer<StarRocksContainer> {
+
+    private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6";
+
+    // exposed ports
+    public static final int FE_HTTP_SERVICE_PORT = 8080;
+    public static final int FE_QUERY_PORT = 9030;
+
+    public static final String STARROCKS_DATABASE_NAME = "starrocks_database";
+    public static final String STARROCKS_TABLE_NAME = "fallen_angel";
+    public static final String STARROCKS_USERNAME = "root";
+    public static final String STARROCKS_PASSWORD = "";
+
+    @ClassRule public static final Network NETWORK = Network.newNetwork();
+
+    public StarRocksContainer() {
+        super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+        setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
+        setNetwork(NETWORK);
+    }
+
+    public StarRocksContainer(Network network) {
+        super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+        setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
+        setNetwork(network);
+    }
+
+    public List<String> getLoadUrl() {
+        return Collections.singletonList(
+                String.format("%s:%d", getHost(), getMappedPort(FE_HTTP_SERVICE_PORT)));
+    }
+
+    public void waitForLog(String regex, int count, int timeoutSeconds) {
+        new LogMessageWaitStrategy()
+                .withRegEx(regex)
+                .withTimes(count)
+                .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
+                .waitUntilReady(this);
+    }
+
+    @Override
+    public String getDriverClassName() {
+        // Prefer the modern MySQL driver if it is on the classpath; otherwise fall back to the
+        // legacy driver class name.
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            return "com.mysql.cj.jdbc.Driver";
+        } catch (ClassNotFoundException e) {
+            return "com.mysql.jdbc.Driver";
+        }
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl("");
+    }
+
+    public String getJdbcUrl(String databaseName) {
+        String additionalUrlParams = constructUrlParameters("?", "&");
+        return "jdbc:mysql://"
+                + getHost()
+                + ":"
+                + getMappedPort(FE_QUERY_PORT)
+                + "/"
+                + databaseName
+                + additionalUrlParams;
+    }
+
+    @Override
+    public String getUsername() {
+        return STARROCKS_USERNAME;
+    }
+
+    @Override
+    public String getPassword() {
+        return STARROCKS_PASSWORD;
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
new file mode 100644
index 00000000000..4980c603b7c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.starrocks.sink.utils; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSink; +import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.lifecycle.Startables; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Basic class for testing {@link StarRocksDataSink}. */ +public class StarRocksSinkTestBase extends TestLogger { + protected static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTestBase.class); + + protected static final int DEFAULT_PARALLELISM = 1; + + protected static final StarRocksContainer STARROCKS_CONTAINER = createStarRocksContainer(); + + public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + + private static StarRocksContainer createStarRocksContainer() { + return new StarRocksContainer(); + } + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(STARROCKS_CONTAINER)).join(); + LOG.info("Waiting for StarRocks to launch"); + + long startWaitingTimestamp = System.currentTimeMillis(); + + new LogMessageWaitStrategy() + .withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s") + .withTimes(1) + .withStartupTimeout( + Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS)) + .waitUntilReady(STARROCKS_CONTAINER); + + while (!checkBackendAvailability()) { + try { + if (System.currentTimeMillis() - startWaitingTimestamp + > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("StarRocks backend startup timed out."); + } + LOG.info("Waiting for backends to be available"); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // ignore and check next round + } + } + LOG.info("Containers are started."); + } + + @AfterClass + public static void stopContainers() { + LOG.info("Stopping containers..."); + STARROCKS_CONTAINER.stop(); + LOG.info("Containers 
are stopped."); + } + + static class MockContext implements Factory.Context { + + Configuration factoryConfiguration; + + public MockContext(Configuration factoryConfiguration) { + this.factoryConfiguration = factoryConfiguration; + } + + @Override + public Configuration getFactoryConfiguration() { + return factoryConfiguration; + } + + @Override + public Configuration getPipelineConfiguration() { + return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC")); + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + } + + public static DataSink createStarRocksDataSink(Configuration factoryConfiguration) { + StarRocksDataSinkFactory factory = new StarRocksDataSinkFactory(); + return factory.createDataSink(new MockContext(factoryConfiguration)); + } + + public static void executeSql(String sql) { + try { + Container.ExecResult rs = + STARROCKS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + "-e " + sql); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to execute SQL." + rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to execute SQL.", e); + } + } + + public static boolean checkBackendAvailability() { + try { + Container.ExecResult rs = + STARROCKS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + "-e SHOW BACKENDS\\G"); + + if (rs.getExitCode() != 0) { + return false; + } + return rs.getStdout() + .contains("*************************** 1. row ***************************"); + } catch (Exception e) { + LOG.info("Failed to check backend status.", e); + return false; + } + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + + public List inspectTableSchema(TableId tableId) throws SQLException { + List results = new ArrayList<>(); + ResultSet rs = + STARROCKS_CONTAINER + .createConnection("") + .createStatement() + .executeQuery( + String.format( + "DESCRIBE `%s`.`%s`", + tableId.getSchemaName(), tableId.getTableName())); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= 5; i++) { + columns.add(rs.getString(i)); + } + results.add(String.join(" | ", columns)); + } + return results; + } + + public List fetchTableContent(TableId tableId, int columnCount) throws SQLException { + List results = new ArrayList<>(); + ResultSet rs = + STARROCKS_CONTAINER + .createConnection("") + .createStatement() + .executeQuery( + String.format( + "SELECT * FROM %s.%s", + tableId.getSchemaName(), tableId.getTableName())); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + columns.add(rs.getString(i)); + } + results.add(String.join(" | ", columns)); + } + return results; + } + + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } + + public static void assertMapEquals(Map expected, Map actual) { + assertTrue(expected != null && actual != 
null);
+        assertEquals(expected.size(), actual.size());
+        for (String key : expected.keySet()) {
+            assertEquals(expected.get(key), actual.get(key));
+        }
+    }
+}