From 8b67daf018a14f5e1a83bdd636177f26953ab975 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 5 Jun 2024 12:04:48 +0800 Subject: [PATCH 1/5] [FLINK-35415][base] Fix compatibility with Flink 1.19 --- .../flink/translator/DataSinkTranslator.java | 45 ++++++++++++++++--- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java index a6188d2df7..ec0bcb593c 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java @@ -30,6 +30,7 @@ import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator; import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; @@ -39,9 +40,11 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; -import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; + +import java.lang.reflect.InvocationTargetException; /** Translator used to build {@link DataSink} for given {@link DataStream}. */ @Internal @@ -117,10 +120,8 @@ private void addCommittingTopology( DataStream inputStream, String sinkName, OperatorID schemaOperatorID) { - TwoPhaseCommittingSink committingSink = - (TwoPhaseCommittingSink) sink; TypeInformation> typeInformation = - CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer); + CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink)); DataStream> written = inputStream.transform( SINK_WRITER_PREFIX + sinkName, @@ -140,8 +141,7 @@ private void addCommittingTopology( preCommitted.transform( SINK_COMMITTER_PREFIX + sinkName, typeInformation, - new CommitterOperatorFactory<>( - committingSink, isBatchMode, isCheckpointingEnabled)); + getCommitterOperatorFactory(sink, isBatchMode, isCheckpointingEnabled)); if (sink instanceof WithPostCommitTopology) { ((WithPostCommitTopology) sink).addPostCommitTopology(committed); @@ -152,4 +152,37 @@ private String generateSinkName(SinkDef sinkDef) { return sinkDef.getName() .orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType())); } + + private static SimpleVersionedSerializer getCommittableSerializer(Object sink) { + // FIX ME: TwoPhaseCommittingSink has been deprecated, and its signature has changed + // during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported. 
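+        // Look the method up reflectively on the concrete sink class instead of casting to
+        // TwoPhaseCommittingSink, so this class compiles against both the 1.18 and 1.19 APIs.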
+ try { + return (SimpleVersionedSerializer) + sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Failed to get CommittableSerializer", e); + } + } + + private static + OneInputStreamOperatorFactory, CommittableMessage> + getCommitterOperatorFactory( + Sink sink, boolean isBatchMode, boolean isCheckpointingEnabled) { + // FIX ME: OneInputStreamOperatorFactory is an @Internal class, and its signature has + // changed during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported. + try { + return (OneInputStreamOperatorFactory< + CommittableMessage, CommittableMessage>) + Class.forName( + "org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory") + .getDeclaredConstructors()[0] + .newInstance(sink, isBatchMode, isCheckpointingEnabled); + + } catch (ClassNotFoundException + | InstantiationException + | IllegalAccessException + | InvocationTargetException e) { + throw new RuntimeException("Failed to create CommitterOperatorFactory", e); + } + } } From 367d18e3d8a13efef957bd71c9ba3d7da9075ae6 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 5 Jun 2024 12:08:07 +0800 Subject: [PATCH 2/5] [FLINK-35415][base] Bump Flink patch version to 1.18.1 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0e322627c9..3ac889259d 100644 --- a/pom.xml +++ b/pom.xml @@ -73,7 +73,7 @@ limitations under the License. true - 1.18.0 + 1.18.1 1.18 17.0 1.9.8.Final From 6a21d202a8fab477e50ef868bcf8c62086c9c95b Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 5 Jun 2024 12:09:08 +0800 Subject: [PATCH 3/5] [FLINK-35316][base] Run E2e test cases with Flink 1.19 and latest patch versions --- .../flink-cdc-pipeline-e2e-tests/pom.xml | 7 ++- .../cdc/pipeline/tests/MysqlE2eITCase.java | 24 ++++---- .../pipeline/tests/TransformE2eITCase.java | 10 ++-- .../tests/utils/PipelineTestEnvironment.java | 55 +++++++++++++----- .../flink-cdc-source-e2e-tests/pom.xml | 38 +++++-------- .../utils/FlinkContainerTestEnvironment.java | 56 ++++++++++++++----- 6 files changed, 120 insertions(+), 70 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index e785ba0a5a..2326240b6d 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -28,8 +28,9 @@ limitations under the License. flink-cdc-pipeline-e2e-tests - 1.17.1 - 1.18.0 + 1.17.2 + 1.18.1 + 1.19.0 8.0.27 1.2.9_flink-${flink.major.version} @@ -88,12 +89,14 @@ limitations under the License. 
org.apache.flink flink-cdc-pipeline-connector-doris ${project.version} + test-jar test org.apache.flink flink-cdc-pipeline-connector-starrocks ${project.version} + test-jar test diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index 3307175864..28db063ef8 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -114,12 +114,12 @@ public void testSyncWholeDatabase() throws Exception { submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); waitUntilJobRunning(Duration.ofSeconds(30)); LOG.info("Pipeline job is running"); - waitUtilSpecificEvent( + waitUntilSpecificEvent( String.format( "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", mysqlInventoryDatabase.getDatabaseName()), 60000L); - waitUtilSpecificEvent( + waitUntilSpecificEvent( String.format( "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", mysqlInventoryDatabase.getDatabaseName()), @@ -186,6 +186,14 @@ public void testSyncWholeDatabase() throws Exception { Statement stat = conn.createStatement()) { stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // Perform DDL changes after the binlog is generated + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + 20000L); + // modify table schema stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); stat.execute( @@ -201,7 +209,7 @@ public void testSyncWholeDatabase() throws Exception { throw e; } - waitUtilSpecificEvent( + waitUntilSpecificEvent( String.format( "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", mysqlInventoryDatabase.getDatabaseName()), @@ -236,17 +244,13 @@ public void testSyncWholeDatabase() throws Exception { validateResult(expectedEvents); } - private void validateResult(List expectedEvents) { - String stdout = taskManagerConsumer.toUtf8String(); + private void validateResult(List expectedEvents) throws Exception { for (String event : expectedEvents) { - if (!stdout.contains(event)) { - throw new RuntimeException( - "failed to get specific event: " + event + " from stdout: " + stdout); - } + waitUntilSpecificEvent(event, 6000L); } } - private void waitUtilSpecificEvent(String event, long timeout) throws Exception { + private void waitUntilSpecificEvent(String event, long timeout) throws Exception { boolean result = false; long endTimeout = System.currentTimeMillis() + timeout; while (System.currentTimeMillis() < endTimeout) { diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index cd8fc0b45f..2f896d3344 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -132,13 +132,13 @@ public void testHeteroSchemaTransform() throws Exception { String.format( "DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}", transformRenameDatabase.getDatabaseName()), - 6000L); + 60000L); waitUntilSpecificEvent( String.format( "DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}", transformRenameDatabase.getDatabaseName()), - 6000L); + 60000L); List expectedEvents = Arrays.asList( @@ -188,19 +188,19 @@ public void testHeteroSchemaTransform() throws Exception { String.format( "DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}", transformRenameDatabase.getDatabaseName()), - 6000L); + 20000L); waitUntilSpecificEvent( String.format( "DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}", transformRenameDatabase.getDatabaseName()), - 6000L); + 20000L); waitUntilSpecificEvent( String.format( "DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}", transformRenameDatabase.getDatabaseName()), - 6000L); + 20000L); String stdout = taskManagerConsumer.toUtf8String(); System.out.println(stdout); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index a448bf554c..65c0a202e5 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.TestLogger; +import com.fasterxml.jackson.core.Version; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -54,6 +55,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkState; @@ -71,17 +73,6 @@ public abstract class PipelineTestEnvironment extends TestLogger { public static final int JOB_MANAGER_REST_PORT = 8081; public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; - public static final String FLINK_PROPERTIES = - String.join( - "\n", - Arrays.asList( - "jobmanager.rpc.address: jobmanager", - "taskmanager.numberOfTaskSlots: 10", - "parallelism.default: 4", - "execution.checkpointing.interval: 300", - // this is needed for oracle-cdc tests. 
- // see https://stackoverflow.com/a/47062742/4915129 - "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false")); @ClassRule public static final Network NETWORK = Network.newNetwork(); @@ -97,13 +88,16 @@ public abstract class PipelineTestEnvironment extends TestLogger { @Parameterized.Parameters(name = "flinkVersion: {0}") public static List getFlinkVersion() { - return Arrays.asList("1.17.1", "1.18.0"); + return Arrays.asList("1.17.2", "1.18.1", "1.19.0"); } @Before public void before() throws Exception { LOG.info("Starting containers..."); jobManagerConsumer = new ToStringConsumer(); + + String flinkProperties = getFlinkProperties(flinkVersion); + jobManager = new GenericContainer<>(getFlinkDockerImageTag()) .withCommand("jobmanager") @@ -111,7 +105,7 @@ public void before() throws Exception { .withExtraHost("host.docker.internal", "host-gateway") .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) .withExposedPorts(JOB_MANAGER_REST_PORT) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withEnv("FLINK_PROPERTIES", flinkProperties) .withLogConsumer(jobManagerConsumer); taskManagerConsumer = new ToStringConsumer(); taskManager = @@ -120,7 +114,7 @@ public void before() throws Exception { .withExtraHost("host.docker.internal", "host-gateway") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withEnv("FLINK_PROPERTIES", flinkProperties) .dependsOn(jobManager) .withLogConsumer(taskManagerConsumer); @@ -246,4 +240,37 @@ public void waitUntilJobRunning(Duration timeout) { protected String getFlinkDockerImageTag() { return String.format("flink:%s-scala_2.12", flinkVersion); } + + private static Version parseVersion(String version) { + List versionParts = + Arrays.stream(version.split("\\.")) + .map(Integer::valueOf) + .limit(3) + .collect(Collectors.toList()); + return new Version( + versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null); + } + + private static String getFlinkProperties(String flinkVersion) { + // this is needed for oracle-cdc tests. + // see https://stackoverflow.com/a/47062742/4915129 + String javaOptsConfig; + Version version = parseVersion(flinkVersion); + if (version.compareTo(parseVersion("1.17.0")) >= 0) { + // Flink 1.17 renames `env.java.opts` to `env.java.opts.all` + javaOptsConfig = "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false"; + } else { + // Legacy Flink version, might drop their support in near future + javaOptsConfig = "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"; + } + + return String.join( + "\n", + Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "taskmanager.numberOfTaskSlots: 10", + "parallelism.default: 4", + "execution.checkpointing.interval: 300", + javaOptsConfig)); + } } diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml index adb3e0f823..6f604f3461 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml @@ -28,13 +28,13 @@ limitations under the License. flink-cdc-source-e2e-tests - 1.14.6 - 1.15.4 - 1.16.2 - 1.17.1 - 1.18.0 + 1.16.3 + 1.17.2 + 1.18.1 + 1.19.0 3.1.1-1.17 - 3.1.1-1.17 + 3.1.2-1.18 + 3.1.2-1.18 8.0.27 42.5.1 @@ -237,21 +237,11 @@ limitations under the License. 
- - org.apache.flink - flink-connector-jdbc_2.11 - ${flink-1.14} - jdbc-connector_${flink-1.14}.jar - jar - ${project.build.directory}/dependencies - - - org.apache.flink flink-connector-jdbc - ${flink-1.15} - jdbc-connector_${flink-1.15}.jar + ${flink-1.16} + jdbc-connector_${flink-1.16}.jar jar ${project.build.directory}/dependencies @@ -260,8 +250,8 @@ limitations under the License. org.apache.flink flink-connector-jdbc - ${flink-1.16} - jdbc-connector_${flink-1.16}.jar + ${jdbc.version-1.17} + jdbc-connector_${flink-1.17}.jar jar ${project.build.directory}/dependencies @@ -270,8 +260,8 @@ limitations under the License. org.apache.flink flink-connector-jdbc - ${jdbc.version-1.17} - jdbc-connector_${flink-1.17}.jar + ${jdbc.version-1.18} + jdbc-connector_${flink-1.18}.jar jar ${project.build.directory}/dependencies @@ -280,8 +270,8 @@ limitations under the License. org.apache.flink flink-connector-jdbc - ${jdbc.version-1.18} - jdbc-connector_${flink-1.18}.jar + ${jdbc.version-1.19} + jdbc-connector_${flink-1.19}.jar jar ${project.build.directory}/dependencies diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java index 650e205b8b..6175aec2c2 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java @@ -31,6 +31,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.TestLogger; +import com.fasterxml.jackson.core.Version; import com.github.dockerjava.api.DockerClient; import org.junit.After; import org.junit.AfterClass; @@ -62,6 +63,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkState; @@ -80,17 +82,6 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { private static final String FLINK_BIN = "bin"; private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; - private static final String FLINK_PROPERTIES = - String.join( - "\n", - Arrays.asList( - "jobmanager.rpc.address: jobmanager", - "taskmanager.numberOfTaskSlots: 10", - "parallelism.default: 4", - "execution.checkpointing.interval: 10000", - // this is needed for oracle-cdc tests. 
- // see https://stackoverflow.com/a/47062742/4915129 - "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false")); // ------------------------------------------------------------------------------------------ // MySQL Variables (we always use MySQL as the sink for easier verifying) @@ -129,17 +120,19 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { @Parameterized.Parameters(name = "flinkVersion: {0}") public static List getFlinkVersion() { - return Arrays.asList("1.14.6", "1.15.4", "1.16.2", "1.17.1", "1.18.0"); + return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0"); } private static final List FLINK_VERSION_WITH_SCALA_212 = - Arrays.asList("1.15.4", "1.16.2", "1.17.1", "1.18.0"); + Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0"); @Before public void before() { mysqlInventoryDatabase.createAndInitialize(); jdbcJar = TestUtils.getResource(getJdbcConnectorResourceName()); + String flinkProperties = getFlinkProperties(flinkVersion); + LOG.info("Starting containers..."); jobManager = new GenericContainer<>(getFlinkDockerImageTag()) @@ -148,7 +141,7 @@ public void before() { .withExtraHost("host.docker.internal", "host-gateway") .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) .withExposedPorts(JOB_MANAGER_REST_PORT) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withEnv("FLINK_PROPERTIES", flinkProperties) .withLogConsumer(new Slf4jLogConsumer(LOG)); taskManager = new GenericContainer<>(getFlinkDockerImageTag()) @@ -156,7 +149,7 @@ public void before() { .withExtraHost("host.docker.internal", "host-gateway") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withEnv("FLINK_PROPERTIES", flinkProperties) .dependsOn(jobManager) .withLogConsumer(new Slf4jLogConsumer(LOG)); @@ -325,4 +318,37 @@ private String getFlinkDockerImageTag() { protected String getJdbcConnectorResourceName() { return String.format("jdbc-connector_%s.jar", flinkVersion); } + + private static Version parseVersion(String version) { + List versionParts = + Arrays.stream(version.split("\\.")) + .map(Integer::valueOf) + .limit(3) + .collect(Collectors.toList()); + return new Version( + versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null); + } + + private static String getFlinkProperties(String flinkVersion) { + // this is needed for oracle-cdc tests. 
+ // see https://stackoverflow.com/a/47062742/4915129 + String javaOptsConfig; + Version version = parseVersion(flinkVersion); + if (version.compareTo(parseVersion("1.17.0")) >= 0) { + // Flink 1.17 renames `env.java.opts` to `env.java.opts.all` + javaOptsConfig = "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false"; + } else { + // Legacy Flink version, might drop their support in near future + javaOptsConfig = "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"; + } + + return String.join( + "\n", + Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "taskmanager.numberOfTaskSlots: 10", + "parallelism.default: 4", + "execution.checkpointing.interval: 300", + javaOptsConfig)); + } } From 0913de89f707fd3567427699c122a867d956cf41 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 5 Jun 2024 12:09:40 +0800 Subject: [PATCH 4/5] [FLINK-35120][doris] Add Doris integration test cases --- .../pom.xml | 43 ++ .../sink/DorisMetadataApplierITCase.java | 441 ++++++++++++++++++ .../doris/sink/DorisPipelineITCase.java | 210 +++++++++ .../doris/sink/utils/DorisContainer.java | 146 ++++++ .../doris/sink/utils/DorisSinkTestBase.java | 319 +++++++++++++ .../src/test/resources/log4j2-test.properties | 25 + .../pipeline/tests/MySqlToDorisE2eITCase.java | 384 +++++++++++++++ 7 files changed, 1568 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml index 54ae076fc3..ed166767cd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml @@ -46,6 +46,49 @@ limitations under the License. 
flink-doris-connector-${flink.major.version} 1.6.0 + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + junit + junit + + + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + org.slf4j + slf4j-api + ${slf4j.version} + test + + + org.testcontainers + jdbc + 1.18.3 + test + + + mysql + mysql-connector-java + 8.0.26 + test + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java new file mode 100644 index 0000000000..0d3c0d99ce --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.doris.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; +import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator; +import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; + +/** IT tests for {@link DorisMetadataApplier}. 
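+ * Covers table creation with assorted data types, plus add / drop / rename column and column type
+ * change events applied to a Doris database.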
*/ +@RunWith(Parameterized.class) +public class DorisMetadataApplierITCase extends DorisSinkTestBase { + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5; + + private final boolean batchMode; + + public DorisMetadataApplierITCase(boolean batchMode) { + this.batchMode = batchMode; + } + + @Parameters(name = "batchMode: {0}") + public static Iterable data() { + return Arrays.asList(true, false); + } + + @BeforeClass + public static void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @Before + public void initializeDatabase() { + createDatabase(DorisContainer.DORIS_DATABASE_NAME); + + // waiting for table to be created + DORIS_CONTAINER.waitForLog( + String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME), + 1, + DATABASE_OPERATION_TIMEOUT_SECONDS); + + LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME); + } + + @After + public void destroyDatabase() { + dropDatabase(DorisContainer.DORIS_DATABASE_NAME); + // waiting for database to be created + DORIS_CONTAINER.waitForLog( + String.format( + ".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME), + 1, + DATABASE_OPERATION_TIMEOUT_SECONDS); + + LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME); + } + + private List generateAddColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("extra_date", DataTypes.DATE(), null)))), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_bool", DataTypes.BOOLEAN(), null)))), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_decimal", + DataTypes.DECIMAL(17, 0), + null))))); + } + + private List generateDropColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new DropColumnEvent(tableId, Collections.singletonList("number"))); + } + + private List generateRenameColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")), + new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae"))); + } + + private List generateAlterColumnTypeEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() 
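+                        // reuse the id / number / name schema shared by the other event generators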
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); + } + + private List generateNarrowingAlterColumnTypeEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + // Double -> Float is a narrowing cast, should fail + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("number", DataTypes.FLOAT()))); + } + + @Test + public void testDorisDataTypes() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID")) + // Doris sink doesn't support BINARY type yet. + // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary")) + // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var + // Binary")) + .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes")) + .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean")) + .column(new PhysicalColumn("int", DataTypes.INT(), "Int")) + .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int")) + .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int")) + .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float")) + .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double")) + .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char")) + .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char")) + .column(new PhysicalColumn("string", DataTypes.STRING(), "String")) + .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal")) + .column(new PhysicalColumn("date", DataTypes.DATE(), "Date")) + // Doris sink doesn't support TIME type yet. 
+ // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time")) + // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With + // Precision")) + .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp")) + .column( + new PhysicalColumn( + "timestamp_3", + DataTypes.TIMESTAMP(3), + "Timestamp With Precision")) + .column( + new PhysicalColumn( + "timestamptz", DataTypes.TIMESTAMP_TZ(), "TimestampTZ")) + .column( + new PhysicalColumn( + "timestamptz_3", + DataTypes.TIMESTAMP_TZ(3), + "TimestampTZ With Precision")) + .column( + new PhysicalColumn( + "timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ")) + .column( + new PhysicalColumn( + "timestampltz_3", + DataTypes.TIMESTAMP_LTZ(3), + "TimestampLTZ With Precision")) + .column( + new PhysicalColumn( + "arrayofint", + DataTypes.ARRAY(DataTypes.INT()), + "Array of Int")) + .column( + new PhysicalColumn( + "arrayofstr", + DataTypes.ARRAY(DataTypes.STRING()), + "Array of String")) + .column( + new PhysicalColumn( + "mapint2str", + DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), + "Map Int to String")) + .primaryKey("id") + .build(); + + runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema))); + + List actual = inspectTableSchema(tableId); + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "bytes | TEXT | Yes | false | null", + "boolean | BOOLEAN | Yes | false | null", + "int | INT | Yes | false | null", + "tinyint | TINYINT | Yes | false | null", + "smallint | SMALLINT | Yes | false | null", + "float | FLOAT | Yes | false | null", + "double | DOUBLE | Yes | false | null", + "char | CHAR(51) | Yes | false | null", + "varchar | VARCHAR(51) | Yes | false | null", + "string | TEXT | Yes | false | null", + "decimal | DECIMAL(17, 7) | Yes | false | null", + "date | DATE | Yes | false | null", + "timestamp | DATETIME(6) | Yes | false | null", + "timestamp_3 | DATETIME(3) | Yes | false | null", + "timestamptz | DATETIME(6) | Yes | false | null", + "timestamptz_3 | DATETIME(3) | Yes | false | null", + "timestampltz | DATETIME(6) | Yes | false | null", + "timestampltz_3 | DATETIME(3) | Yes | false | null", + "arrayofint | TEXT | Yes | false | null", + "arrayofstr | TEXT | Yes | false | null", + "mapint2str | TEXT | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testDorisAddColumn() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateAddColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "number | DOUBLE | Yes | false | null", + "name | VARCHAR(51) | Yes | false | null", + "extra_date | DATE | Yes | false | null", + "extra_bool | BOOLEAN | Yes | false | null", + "extra_decimal | DECIMAL(17, 0) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testDorisDropColumn() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateDropColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", "name | VARCHAR(51) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testDorisRenameColumn() throws Exception { + TableId tableId = + TableId.tableId( + 
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateRenameColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "kazu | DOUBLE | Yes | false | null", + "namae | VARCHAR(51) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + @Ignore("AlterColumnType is yet to be supported until we close FLINK-35072.") + public void testDorisAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateAlterColumnTypeEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "number | DOUBLE | Yes | false | null", + "name | VARCHAR(57) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test(expected = JobExecutionException.class) + public void testDorisNarrowingAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); + } + + private void runJobWithEvents(List events) throws Exception { + DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(FENODES, DORIS_CONTAINER.getFeNodes()) + .set(BENODES, DORIS_CONTAINER.getBeNodes()) + .set(USERNAME, DorisContainer.DORIS_USERNAME) + .set(PASSWORD, DorisContainer.DORIS_PASSWORD) + .set(SINK_ENABLE_BATCH_MODE, batchMode) + .set(SINK_ENABLE_DELETE, true); + + config.addAll( + Configuration.fromMap( + Collections.singletonMap("table.create.properties.replication_num", "1"))); + + DataSink dorisSink = createDorisDataSink(config); + + SchemaOperatorTranslator schemaOperatorTranslator = + new SchemaOperatorTranslator( + SchemaChangeBehavior.EVOLVE, + "$$_schema_operator_$$", + DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + + OperatorIDGenerator schemaOperatorIDGenerator = + new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); + + stream = + schemaOperatorTranslator.translate( + stream, + DEFAULT_PARALLELISM, + dorisSink.getMetadataApplier(), + new ArrayList<>()); + + DataSinkTranslator sinkTranslator = new DataSinkTranslator(); + sinkTranslator.translate( + new SinkDef("doris", "Dummy Doris Sink", config), + stream, + dorisSink, + schemaOperatorIDGenerator.generate()); + + env.execute("Doris Schema Evolution Test"); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java new file mode 100644 index 0000000000..bb209a7935 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; + +/** IT tests for {@link DorisDataSink}. 
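+ * Writes a fixed stream of insert, update and delete events into Doris and checks the resulting
+ * table content in both streaming and batch sink modes.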
*/ +public class DorisPipelineITCase extends DorisSinkTestBase { + + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5; + + @BeforeClass + public static void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @Before + public void initializeDatabaseAndTable() { + createDatabase(DorisContainer.DORIS_DATABASE_NAME); + + // waiting for table to be created + DORIS_CONTAINER.waitForLog( + String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME), + 1, + DATABASE_OPERATION_TIMEOUT_SECONDS); + + LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME); + + createTable( + DorisContainer.DORIS_DATABASE_NAME, + DorisContainer.DORIS_TABLE_NAME, + "id", + Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)")); + + // waiting for table to be created + DORIS_CONTAINER.waitForLog( + String.format( + ".*successfully create table\\[%s;.*\\s", DorisContainer.DORIS_TABLE_NAME), + 1, + DATABASE_OPERATION_TIMEOUT_SECONDS); + + LOG.info("Table {} created.", DorisContainer.DORIS_TABLE_NAME); + } + + @After + public void destroyDatabaseAndTable() { + dropTable(DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + // waiting for table to be dropped + DORIS_CONTAINER.waitForLog( + String.format( + ".*finished dropping table: %s.*\\s", DorisContainer.DORIS_TABLE_NAME), + 1, + DATABASE_OPERATION_TIMEOUT_SECONDS); + + LOG.info("Table {} destroyed.", DorisContainer.DORIS_TABLE_NAME); + + dropDatabase(DorisContainer.DORIS_DATABASE_NAME); + // waiting for database to be created + DORIS_CONTAINER.waitForLog( + String.format( + ".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME), + 1, + DATABASE_OPERATION_TIMEOUT_SECONDS); + + LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME); + } + + @Test + public void testDorisSinkStreamJob() throws Exception { + runValuesToDorisJob(false); + } + + @Test + public void testDorisSinkBatchJob() throws Exception { + runValuesToDorisJob(true); + } + + private List generateEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17))); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")})), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 19, 2.718, BinaryStringData.fromString("Que Sera Sera") + })), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 21, 1.732, BinaryStringData.fromString("Disenchanted") + })), + DataChangeEvent.updateEvent( + tableId, + generator.generate( + new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")}), + generator.generate( + new Object[] {17, 6.28, BinaryStringData.fromString("Doris Day")})), + DataChangeEvent.deleteEvent( + tableId, + generator.generate( + new Object[] { + 19, 2.718, BinaryStringData.fromString("Que Sera Sera") + 
}))); + } + + private void runValuesToDorisJob(boolean batchMode) throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + DataStream stream = + env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(FENODES, DORIS_CONTAINER.getFeNodes()) + .set(BENODES, DORIS_CONTAINER.getBeNodes()) + .set(USERNAME, DorisContainer.DORIS_USERNAME) + .set(PASSWORD, DorisContainer.DORIS_PASSWORD) + .set(SINK_ENABLE_BATCH_MODE, batchMode) + .set(SINK_ENABLE_DELETE, true); + + config.addAll( + Configuration.fromMap( + Collections.singletonMap("table.create.properties.replication_num", "1"))); + + Sink dorisSink = + ((FlinkSinkProvider) createDorisDataSink(config).getEventSinkProvider()).getSink(); + + stream.sinkTo(dorisSink); + + env.execute("Values to Doris Sink"); + + List actual = fetchTableContent(tableId, 3); + + List expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted"); + + assertEqualsInAnyOrder(expected, actual); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java new file mode 100644 index 0000000000..aa63c336b5 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink.utils; + +import org.junit.ClassRule; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; + +/** Docker container for Doris. 
*/ +public class DorisContainer extends JdbcDatabaseContainer { + + private static final String DOCKER_IMAGE_NAME = "apache/doris:doris-all-in-one-2.1.0"; + + public static final int FE_INNER_PORT = 8030; + public static final int BE_INNER_PORT = 8040; + public static final int DB_INNER_PORT = 9030; + + @ClassRule public static final Network NETWORK = Network.newNetwork(); + + public String getFeNodes() { + return String.format("%s:%d", getHost(), getMappedPort(FE_INNER_PORT)); + } + + public String getBeNodes() { + return String.format("%s:%d", getHost(), getMappedPort(BE_INNER_PORT)); + } + + public String getTableIdentifier() { + return String.format("%s.%s", DORIS_DATABASE_NAME, DORIS_TABLE_NAME); + } + + public static final String DORIS_DATABASE_NAME = "doris_database"; + public static final String DORIS_TABLE_NAME = "fallen_angel"; + public static final String DORIS_USERNAME = "root"; + public static final String DORIS_PASSWORD = ""; + + public DorisContainer() { + super(DockerImageName.parse(DOCKER_IMAGE_NAME)); + setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT)); + setNetwork(NETWORK); + } + + public DorisContainer(Network network) { + super(DockerImageName.parse(DOCKER_IMAGE_NAME)); + setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT)); + setNetwork(network); + } + + public void waitForLog(String regex, int count, int timeoutSeconds) { + new LogMessageWaitStrategy() + .withRegEx(regex) + .withTimes(count) + .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) + .waitUntilReady(this); + } + + @Override + public String getDriverClassName() { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + return "com.mysql.cj.jdbc.Driver"; + } catch (ClassNotFoundException e) { + return "com.mysql.jdbc.Driver"; + } + } + + @Override + public String getJdbcUrl() { + return getJdbcUrl(""); + } + + public String getJdbcUrl(String databaseName) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + getHost() + + ":" + + getMappedPort(DB_INNER_PORT) + + "/" + + databaseName + + additionalUrlParams; + } + + public String getJdbcUrl(String databaseName, String username) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + username + + "@" + + getHost() + + ":" + + getMappedPort(DB_INNER_PORT) + + "/" + + databaseName + + additionalUrlParams; + } + + public String getJdbcUrl(String databaseName, String username, String password) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + username + + ":" + + password + + "@" + + getHost() + + ":" + + getMappedPort(DB_INNER_PORT) + + "/" + + databaseName + + additionalUrlParams; + } + + @Override + public String getUsername() { + return DORIS_USERNAME; + } + + @Override + public String getPassword() { + return DORIS_PASSWORD; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java new file mode 100644 index 0000000000..bad46a8df5 --- /dev/null +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink.utils; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.doris.factory.DorisDataSinkFactory; +import org.apache.flink.cdc.connectors.doris.sink.DorisDataSink; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.lifecycle.Startables; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Basic class for testing {@link DorisDataSink}. 
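+ * Starts a shared {@link DorisContainer}, waits until a backend reports in, and offers helpers for
+ * creating and dropping databases and tables and for reading schemas and rows back in assertions.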
*/ +public class DorisSinkTestBase extends TestLogger { + + protected static final Logger LOG = LoggerFactory.getLogger(DorisSinkTestBase.class); + + protected static final int DEFAULT_PARALLELISM = 1; + protected static final DorisContainer DORIS_CONTAINER = createDorisContainer(); + + public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + + private static DorisContainer createDorisContainer() { + return new DorisContainer(); + } + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(DORIS_CONTAINER)).join(); + LOG.info("Waiting for backends to be available"); + long startWaitingTimestamp = System.currentTimeMillis(); + + new LogMessageWaitStrategy() + .withRegEx(".*get heartbeat from FE.*\\s") + .withTimes(1) + .withStartupTimeout( + Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS)) + .waitUntilReady(DORIS_CONTAINER); + + while (!checkBackendAvailability()) { + try { + if (System.currentTimeMillis() - startWaitingTimestamp + > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("Doris backend startup timed out."); + } + LOG.info("Waiting for backends to be available"); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // ignore and check next round + } + } + + LOG.info("Containers are started."); + } + + @AfterClass + public static void stopContainers() { + LOG.info("Stopping containers..."); + DORIS_CONTAINER.stop(); + LOG.info("Containers are stopped."); + } + + static class MockContext implements Factory.Context { + + Configuration factoryConfiguration; + + public MockContext(Configuration factoryConfiguration) { + this.factoryConfiguration = factoryConfiguration; + } + + @Override + public Configuration getFactoryConfiguration() { + return factoryConfiguration; + } + + @Override + public Configuration getPipelineConfiguration() { + return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC")); + } + + @Override + public ClassLoader getClassLoader() { + return this.getClassLoader(); + } + } + + public static DataSink createDorisDataSink(Configuration factoryConfiguration) { + DorisDataSinkFactory factory = new DorisDataSinkFactory(); + return factory.createDataSink(new MockContext(factoryConfiguration)); + } + + public static boolean checkBackendAvailability() { + try { + Container.ExecResult rs = + DORIS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + "-e SHOW BACKENDS\\G"); + + if (rs.getExitCode() != 0) { + return false; + } + String output = rs.getStdout(); + LOG.info("Doris backend status:\n{}", output); + return output.contains("*************************** 1. 
row ***************************") + && !output.contains("AvailCapacity: 1.000 B"); + } catch (Exception e) { + LOG.info("Failed to check backend status.", e); + return false; + } + } + + public static void createDatabase(String databaseName) { + try { + Container.ExecResult rs = + DORIS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to create database." + rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create database.", e); + } + } + + public static void createTable( + String databaseName, String tableName, String primaryKey, List schema) { + try { + Container.ExecResult rs = + DORIS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format( + "-e CREATE TABLE `%s`.`%s` (%s) UNIQUE KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");", + databaseName, + tableName, + String.join(", ", schema), + primaryKey, + primaryKey)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to create table." + rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create table.", e); + } + } + + public static void dropDatabase(String databaseName) { + try { + Container.ExecResult rs = + DORIS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format("-e DROP DATABASE IF EXISTS %s;", databaseName)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to drop database." + rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to drop database.", e); + } + } + + public static void dropTable(String databaseName, String tableName) { + try { + Container.ExecResult rs = + DORIS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format( + "-e DROP TABLE IF EXISTS %s.%s;", databaseName, tableName)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to drop table." 
+ rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to drop table.", e); + } + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + + public List inspectTableSchema(TableId tableId) throws SQLException { + List results = new ArrayList<>(); + ResultSet rs = + DORIS_CONTAINER + .createConnection("") + .createStatement() + .executeQuery( + String.format( + "DESCRIBE `%s`.`%s`", + tableId.getSchemaName(), tableId.getTableName())); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= 5; i++) { + columns.add(rs.getString(i)); + } + results.add(String.join(" | ", columns)); + } + return results; + } + + public List fetchTableContent(TableId tableId, int columnCount) throws SQLException { + List results = new ArrayList<>(); + ResultSet rs = + DORIS_CONTAINER + .createConnection("") + .createStatement() + .executeQuery( + String.format( + "SELECT * FROM %s.%s", + tableId.getSchemaName(), tableId.getTableName())); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + columns.add(rs.getString(i)); + } + results.add(String.join(" | ", columns)); + } + return results; + } + + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } + + public static void assertMapEquals(Map expected, Map actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + for (String key : expected.keySet()) { + assertEquals(expected.get(key), actual.get(key)); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..f0d32fb590 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java new file mode 100644 index 0000000000..39997cdb69 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.lifecycle.Startables; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** End-to-end tests for mysql cdc to Doris pipeline job. 
*/ +@RunWith(Parameterized.class) +public class MySqlToDorisE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + public static final int TESTCASE_TIMEOUT_SECONDS = 60; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases("mysql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @ClassRule + public static final DorisContainer DORIS = + new DorisContainer(NETWORK) + .withNetworkAliases("doris") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @BeforeClass + public static void initializeContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL)).join(); + Startables.deepStart(Stream.of(DORIS)).join(); + LOG.info("Waiting for backends to be available"); + long startWaitingTimestamp = System.currentTimeMillis(); + + new LogMessageWaitStrategy() + .withRegEx(".*get heartbeat from FE.*") + .withTimes(1) + .withStartupTimeout( + Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS)) + .waitUntilReady(DORIS); + + while (!checkBackendAvailability()) { + try { + if (System.currentTimeMillis() - startWaitingTimestamp + > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("Doris backend startup timed out."); + } + LOG.info("Waiting for backends to be available"); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // ignore and check next round + } + } + LOG.info("Containers are started."); + } + + @Before + public void before() throws Exception { + super.before(); + mysqlInventoryDatabase.createAndInitialize(); + createDorisDatabase(mysqlInventoryDatabase.getDatabaseName()); + } + + private static boolean checkBackendAvailability() { + try { + Container.ExecResult rs = + DORIS.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + "-e SHOW BACKENDS\\G"); + + if (rs.getExitCode() != 0) { + return false; + } + String output = rs.getStdout(); + LOG.info("Doris backend status:\n{}", output); + return output.contains("*************************** 1. 
row ***************************") + && !output.contains("AvailCapacity: 1.000 B"); + } catch (Exception e) { + LOG.info("Failed to check backend status.", e); + return false; + } + } + + @After + public void after() { + super.after(); + mysqlInventoryDatabase.dropDatabase(); + dropDorisDatabase(mysqlInventoryDatabase.getDatabaseName()); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: doris\n" + + " fenodes: doris:8030\n" + + " benodes: doris:8040\n" + + " username: %s\n" + + " password: \"%s\"\n" + + " table.create.properties.replication_num: 1\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + DORIS.getUsername(), + DORIS.getPassword()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null")); + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "customers", + 4, + Arrays.asList( + "101 | user_1 | Shanghai | 123567891234", + "102 | user_2 | Shanghai | 123567891234", + "103 | user_3 | Shanghai | 123567891234", + "104 | user_4 | Shanghai | 123567891234")); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | 
{\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null")); + + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // modify table schema + stat.execute("ALTER TABLE products DROP COLUMN point_c;"); + stat.execute("DELETE FROM products WHERE id=101;"); + + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111 + stat.execute( + "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112 + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null", + "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.1 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", + "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null", + "112 | finally | null | 2.14 | null | null | null")); + } + + public static void createDorisDatabase(String databaseName) { + try { + Container.ExecResult rs = + DORIS.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to create database." + rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create database.", e); + } + } + + public static void dropDorisDatabase(String databaseName) { + try { + Container.ExecResult rs = + DORIS.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format("-e DROP DATABASE IF EXISTS %s;", databaseName)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to drop database." 
+ rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to drop database.", e); + } + } + + private void validateSinkResult( + String databaseName, String tableName, int columnCount, List expected) + throws Exception { + long startWaitingTimestamp = System.currentTimeMillis(); + while (true) { + if (System.currentTimeMillis() - startWaitingTimestamp + > TESTCASE_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("Doris backend startup timed out."); + } + List results = new ArrayList<>(); + try (Connection conn = + DriverManager.getConnection( + DORIS.getJdbcUrl(databaseName, DORIS.getUsername())); + Statement stat = conn.createStatement()) { + ResultSet rs = + stat.executeQuery( + String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName)); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + try { + columns.add(rs.getString(i)); + } catch (SQLException ignored) { + // Column count could change after schema evolution + columns.add(null); + } + } + results.add(String.join(" | ", columns)); + } + + if (expected.size() == results.size()) { + assertEqualsInAnyOrder(expected, results); + break; + } else { + Thread.sleep(1000); + } + } catch (SQLException e) { + LOG.info("Validate sink result failure, waiting for next turn...", e); + Thread.sleep(1000); + } + } + } + + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } +} From ce68041436ded972af413394e1536fc1b8b7077d Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 5 Jun 2024 12:09:46 +0800 Subject: [PATCH 5/5] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases --- .../pom.xml | 18 + .../apache/commons/compress/utils/Lists.java | 31 ++ .../sink/StarRocksMetadataApplierITCase.java | 388 ++++++++++++++++++ .../sink/StarRocksPipelineITCase.java | 178 ++++++++ .../sink/utils/StarRocksContainer.java | 113 +++++ .../sink/utils/StarRocksSinkTestBase.java | 252 ++++++++++++ 6 files changed, 980 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml index 342044827b..e410ad227a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml @@ -52,6 +52,24 @@ limitations under the License. flink-cdc-composer ${project.version} + + org.testcontainers + jdbc + 1.18.3 + test + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + org.apache.flink + flink-test-utils + ${flink.version} + test + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java new file mode 100644 index 0000000000..d029943e36 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.shade.org.apache.commons.compress.utils; + +import java.util.ArrayList; + +/** + * Dummy class of shaded apache-commons since connector 1.2.9 depends on this, but not package it. + * This package should be removed after upgrading to 1.2.10 which will not use commons-compress + * anymore. + */ +public class Lists { + public static ArrayList newArrayList() { + return new ArrayList<>(); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java new file mode 100644 index 0000000000..c294dd4233 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.starrocks.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; +import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator; +import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME; + +/** IT tests for {@link StarRocksMetadataApplier}. 
*/ +public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase { + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @Before + public void initializeDatabase() { + executeSql( + String.format( + "CREATE DATABASE IF NOT EXISTS `%s`;", + StarRocksContainer.STARROCKS_DATABASE_NAME)); + LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME); + } + + @After + public void destroyDatabase() { + executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME)); + LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME); + } + + private List generateAddColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("extra_date", DataTypes.DATE(), null)))), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_bool", DataTypes.BOOLEAN(), null)))), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_decimal", + DataTypes.DECIMAL(17, 0), + null))))); + } + + private List generateDropColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new DropColumnEvent(tableId, Collections.singletonList("number"))); + } + + private List generateRenameColumnEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")), + new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae"))); + } + + private List generateAlterColumnTypeEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); + } + + private List generateNarrowingAlterColumnTypeEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new 
PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + // Double -> Float is a narrowing cast, should fail + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("number", DataTypes.FLOAT()))); + } + + @Test + public void testStarRocksDataType() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID")) + // StarRocks sink doesn't support BINARY and BYTES type yet. + // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary")) + // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var + // Binary")) + // .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes")) + .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean")) + .column(new PhysicalColumn("int", DataTypes.INT(), "Int")) + .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int")) + .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int")) + .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float")) + .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double")) + .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char")) + .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char")) + .column(new PhysicalColumn("string", DataTypes.STRING(), "String")) + .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal")) + .column(new PhysicalColumn("date", DataTypes.DATE(), "Date")) + // StarRocks sink doesn't support TIME type yet. + // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time")) + // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With + // Precision")) + .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp")) + .column( + new PhysicalColumn( + "timestamp_3", + DataTypes.TIMESTAMP(3), + "Timestamp With Precision")) + // StarRocks sink doesn't support TIMESTAMP with non-local TZ yet. 
+ // .column(new PhysicalColumn("timestamptz", DataTypes.TIMESTAMP_TZ(), + // "TimestampTZ")) + // .column(new PhysicalColumn("timestamptz_3", DataTypes.TIMESTAMP_TZ(3), + // "TimestampTZ With Precision")) + .column( + new PhysicalColumn( + "timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ")) + .column( + new PhysicalColumn( + "timestampltz_3", + DataTypes.TIMESTAMP_LTZ(3), + "TimestampLTZ With Precision")) + .primaryKey("id") + .build(); + + runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema))); + + List actual = inspectTableSchema(tableId); + List expected = + Arrays.asList( + "id | int | NO | true | null", + "boolean | boolean | YES | false | null", + "int | int | YES | false | null", + "tinyint | tinyint | YES | false | null", + "smallint | smallint | YES | false | null", + "float | float | YES | false | null", + "double | double | YES | false | null", + "char | char(51) | YES | false | null", + "varchar | varchar(51) | YES | false | null", + "string | varchar(1048576) | YES | false | null", + "decimal | decimal(17,7) | YES | false | null", + "date | date | YES | false | null", + "timestamp | datetime | YES | false | null", + "timestamp_3 | datetime | YES | false | null", + "timestampltz | datetime | YES | false | null", + "timestampltz_3 | datetime | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testStarRocksAddColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateAddColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "number | double | YES | false | null", + "name | varchar(51) | YES | false | null", + "extra_date | date | YES | false | null", + "extra_bool | boolean | YES | false | null", + "extra_decimal | decimal(17,0) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testStarRocksDropColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateDropColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", "name | varchar(51) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + @Ignore("Rename column is not supported currently.") + public void testStarRocksRenameColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateRenameColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "kazu | double | YES | false | null", + "namae | varchar(51) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + @Ignore("Alter column type is not supported currently.") + public void testStarRocksAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateAlterColumnTypeEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "number | double | YES | false | null", + "name | 
varchar(57) | YES | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test(expected = JobExecutionException.class) + @Ignore("Alter column type is not supported currently.") + public void testStarRocksNarrowingAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); + } + + private void runJobWithEvents(List events) throws Exception { + DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl()) + .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl()) + .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME) + .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD); + + DataSink starRocksSink = createStarRocksDataSink(config); + + SchemaOperatorTranslator schemaOperatorTranslator = + new SchemaOperatorTranslator( + SchemaChangeBehavior.EVOLVE, + "$$_schema_operator_$$", + DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + + OperatorIDGenerator schemaOperatorIDGenerator = + new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); + + stream = + schemaOperatorTranslator.translate( + stream, + DEFAULT_PARALLELISM, + starRocksSink.getMetadataApplier(), + new ArrayList<>()); + + DataSinkTranslator sinkTranslator = new DataSinkTranslator(); + sinkTranslator.translate( + new SinkDef("starrocks", "Dummy StarRocks Sink", config), + stream, + starRocksSink, + schemaOperatorIDGenerator.generate()); + + env.execute("StarRocks Schema Evolution Test"); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java new file mode 100644 index 0000000000..43c1faaacd --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.starrocks.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME; + +/** IT tests for {@link StarRocksDataSink}. 
*/ +public class StarRocksPipelineITCase extends StarRocksSinkTestBase { + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @Before + public void initializeDatabaseAndTable() { + executeSql( + String.format( + "CREATE DATABASE IF NOT EXISTS `%s`;", + StarRocksContainer.STARROCKS_DATABASE_NAME)); + + LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME); + + List schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"); + + executeSql( + String.format( + "CREATE TABLE `%s`.`%s` (%s) PRIMARY KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");", + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME, + String.join(", ", schema), + "id", + "id")); + + LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME); + } + + @After + public void destroyDatabaseAndTable() { + + executeSql( + String.format( + "DROP TABLE %s.%s;", + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME)); + + LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME); + + executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME)); + + LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME); + } + + private List generateEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17))); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 19, 2.718, BinaryStringData.fromString("Que Sera Sera") + })), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 21, 1.732, BinaryStringData.fromString("Disenchanted") + })), + DataChangeEvent.deleteEvent( + tableId, + generator.generate( + new Object[] { + 19, 2.718, BinaryStringData.fromString("Que Sera Sera") + })), + DataChangeEvent.updateEvent( + tableId, + generator.generate( + new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}), + generator.generate( + new Object[] { + 17, 6.28, BinaryStringData.fromString("StarRocks") + }))); + } + + @Test + public void testValuesToStarRocks() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + DataStream stream = + env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl()) + .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl()) + .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME) + .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD); + + Sink starRocksSink = + ((FlinkSinkProvider) 
createStarRocksDataSink(config).getEventSinkProvider()) + .getSink(); + stream.sinkTo(starRocksSink); + + env.execute("Values to StarRocks Sink"); + + List actual = fetchTableContent(tableId, 3); + List expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted"); + + assertEqualsInAnyOrder(expected, actual); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java new file mode 100644 index 0000000000..8bba7053e2 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.starrocks.sink.utils; + +import org.junit.ClassRule; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Docker container for StarRocks. 
*/ +public class StarRocksContainer extends JdbcDatabaseContainer { + + private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6"; + + // exposed ports + public static final int FE_HTTP_SERVICE_PORT = 8080; + public static final int FE_QUERY_PORT = 9030; + + public static final String STARROCKS_DATABASE_NAME = "starrocks_database"; + public static final String STARROCKS_TABLE_NAME = "fallen_angel"; + public static final String STARROCKS_USERNAME = "root"; + public static final String STARROCKS_PASSWORD = ""; + + @ClassRule public static final Network NETWORK = Network.newNetwork(); + + public StarRocksContainer() { + super(DockerImageName.parse(DOCKER_IMAGE_NAME)); + setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT)); + setNetwork(NETWORK); + } + + public StarRocksContainer(Network network) { + super(DockerImageName.parse(DOCKER_IMAGE_NAME)); + setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT)); + setNetwork(network); + } + + public List getLoadUrl() { + return Collections.singletonList( + String.format("%s:%d", getHost(), getMappedPort(FE_HTTP_SERVICE_PORT))); + } + + public void waitForLog(String regex, int count, int timeoutSeconds) { + new LogMessageWaitStrategy() + .withRegEx(regex) + .withTimes(count) + .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) + .waitUntilReady(this); + } + + @Override + public String getDriverClassName() { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + return "com.mysql.cj.jdbc.Driver"; + } catch (ClassNotFoundException e) { + return "com.mysql.jdbc.Driver"; + } + } + + @Override + public String getJdbcUrl() { + return getJdbcUrl(""); + } + + public String getJdbcUrl(String databaseName) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + getHost() + + ":" + + getMappedPort(FE_QUERY_PORT) + + "/" + + databaseName + + additionalUrlParams; + } + + @Override + public String getUsername() { + return STARROCKS_USERNAME; + } + + @Override + public String getPassword() { + return STARROCKS_PASSWORD; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java new file mode 100644 index 0000000000..4980c603b7 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.starrocks.sink.utils; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSink; +import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.lifecycle.Startables; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Basic class for testing {@link StarRocksDataSink}. */ +public class StarRocksSinkTestBase extends TestLogger { + protected static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTestBase.class); + + protected static final int DEFAULT_PARALLELISM = 1; + + protected static final StarRocksContainer STARROCKS_CONTAINER = createStarRocksContainer(); + + public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + + private static StarRocksContainer createStarRocksContainer() { + return new StarRocksContainer(); + } + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(STARROCKS_CONTAINER)).join(); + LOG.info("Waiting for StarRocks to launch"); + + long startWaitingTimestamp = System.currentTimeMillis(); + + new LogMessageWaitStrategy() + .withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s") + .withTimes(1) + .withStartupTimeout( + Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS)) + .waitUntilReady(STARROCKS_CONTAINER); + + while (!checkBackendAvailability()) { + try { + if (System.currentTimeMillis() - startWaitingTimestamp + > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("StarRocks backend startup timed out."); + } + LOG.info("Waiting for backends to be available"); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // ignore and check next round + } + } + LOG.info("Containers are started."); + } + + @AfterClass + public static void stopContainers() { + LOG.info("Stopping containers..."); + STARROCKS_CONTAINER.stop(); + LOG.info("Containers 
are stopped."); + } + + static class MockContext implements Factory.Context { + + Configuration factoryConfiguration; + + public MockContext(Configuration factoryConfiguration) { + this.factoryConfiguration = factoryConfiguration; + } + + @Override + public Configuration getFactoryConfiguration() { + return factoryConfiguration; + } + + @Override + public Configuration getPipelineConfiguration() { + return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC")); + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + } + + public static DataSink createStarRocksDataSink(Configuration factoryConfiguration) { + StarRocksDataSinkFactory factory = new StarRocksDataSinkFactory(); + return factory.createDataSink(new MockContext(factoryConfiguration)); + } + + public static void executeSql(String sql) { + try { + Container.ExecResult rs = + STARROCKS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + "-e " + sql); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to execute SQL." + rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to execute SQL.", e); + } + } + + public static boolean checkBackendAvailability() { + try { + Container.ExecResult rs = + STARROCKS_CONTAINER.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + "-e SHOW BACKENDS\\G"); + + if (rs.getExitCode() != 0) { + return false; + } + return rs.getStdout() + .contains("*************************** 1. row ***************************"); + } catch (Exception e) { + LOG.info("Failed to check backend status.", e); + return false; + } + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + + public List inspectTableSchema(TableId tableId) throws SQLException { + List results = new ArrayList<>(); + ResultSet rs = + STARROCKS_CONTAINER + .createConnection("") + .createStatement() + .executeQuery( + String.format( + "DESCRIBE `%s`.`%s`", + tableId.getSchemaName(), tableId.getTableName())); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= 5; i++) { + columns.add(rs.getString(i)); + } + results.add(String.join(" | ", columns)); + } + return results; + } + + public List fetchTableContent(TableId tableId, int columnCount) throws SQLException { + List results = new ArrayList<>(); + ResultSet rs = + STARROCKS_CONTAINER + .createConnection("") + .createStatement() + .executeQuery( + String.format( + "SELECT * FROM %s.%s", + tableId.getSchemaName(), tableId.getTableName())); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + columns.add(rs.getString(i)); + } + results.add(String.join(" | ", columns)); + } + return results; + } + + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } + + public static void assertMapEquals(Map expected, Map actual) { + assertTrue(expected != null && actual != 
null); + assertEquals(expected.size(), actual.size()); + for (String key : expected.keySet()) { + assertEquals(expected.get(key), actual.get(key)); + } + } +}