Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
Expand Down Expand Up @@ -53,14 +54,18 @@ public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunctio
public static final String JOB_ID_KEY = "job.id";
public static final String TASK_ID_KEY = "task.id";
public static final String META_KEY = "meta";
// Optional TVF property. When it parses to true, getTableColumns() appends a
// non-nullable TINYINT column named Column.DELETE_SIGN after the source table's columns.
// The key is removed from the property copy built in processProps().
public static final String INCLUDE_DELETE_SIGN = "include_delete_sign";
// Parsed value of INCLUDE_DELETE_SIGN, assigned once in the constructor; false when the key is absent.
private final boolean includeDeleteSign;

/**
 * Creates the TVF from the user-supplied properties.
 *
 * <p>The properties are validated first, then the {@code include_delete_sign} flag is
 * captured, and finally the properties are handed to {@code processProps}.
 *
 * @param properties TVF properties as written in the SQL statement
 * @throws AnalysisException if validation or property processing fails
 */
public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
    validate(properties);
    // validate() has already checked this key via validateBooleanIfPresent, so parsing here
    // cannot silently turn a malformed value into false. A missing key yields null, which
    // Boolean.parseBoolean treats as false.
    this.includeDeleteSign = Boolean.parseBoolean(properties.get(INCLUDE_DELETE_SIGN));
    processProps(properties);
}

private void processProps(Map<String, String> properties) throws AnalysisException {
Map<String, String> copyProps = new HashMap<>(properties);
copyProps.remove(INCLUDE_DELETE_SIGN);
copyProps.put("format", "json");

// Standalone TVF: random jobId. TVF-in-job: job.id injected by rewriteTvfParams.
Expand Down Expand Up @@ -157,6 +162,7 @@ private void validate(Map<String, String> properties) throws AnalysisException {
validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
validateBooleanIfPresent(properties, DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL);
validateBooleanIfPresent(properties, INCLUDE_DELETE_SIGN);
// TVF entrypoint shares server_id checks with the from-to path's validateSource.
try {
DataSourceConfigValidator.validateServerIdConfig(properties);
Expand Down Expand Up @@ -214,7 +220,11 @@ public List<Column> getTableColumns() throws AnalysisException {
if (!jdbcClient.isTableExist(database, table)) {
throw new AnalysisException("Table does not exist: " + table);
}
return jdbcClient.getColumnsFromJdbc(database, table);
List<Column> columns = new ArrayList<>(jdbcClient.getColumnsFromJdbc(database, table));
if (includeDeleteSign) {
columns.add(new Column(Column.DELETE_SIGN, PrimitiveType.TINYINT, false));
}
return columns;
} finally {
jdbcClient.closeClient();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.util.StreamingJobUtils;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Unit tests for the {@code include_delete_sign} property of {@link CdcStreamTableValuedFunction}.
 *
 * <p>The JDBC layer is mocked: the source table always exists and reports a single INT
 * column named {@code id}, so no external database is required.
 */
public class CdcStreamTableValuedFunctionTest {

    private static final String SOURCE_DB = "test_db";
    private static final String SOURCE_TABLE = "test_table";

    /** Without the flag, the schema is exactly the source table's columns. */
    @Test
    public void testDeleteSignIsExcludedByDefault() throws Exception {
        List<Column> schema = getTableColumns(baseProperties());

        Assert.assertEquals(1, schema.size());
        Assert.assertEquals("id", schema.get(0).getName());
    }

    /** With the flag set to true, a non-nullable TINYINT delete-sign column is appended last. */
    @Test
    public void testDeleteSignIsIncludedWhenEnabled() throws Exception {
        Map<String, String> props = baseProperties();
        props.put(CdcStreamTableValuedFunction.INCLUDE_DELETE_SIGN, "true");

        List<Column> schema = getTableColumns(props);

        Assert.assertEquals(2, schema.size());
        Column appended = schema.get(1);
        Assert.assertEquals(Column.DELETE_SIGN, appended.getName());
        Assert.assertEquals(PrimitiveType.TINYINT, appended.getType().getPrimitiveType());
        Assert.assertFalse(appended.isAllowNull());
    }

    /** A non-boolean flag value must fail construction with a message naming the property. */
    @Test
    public void testInvalidIncludeDeleteSignIsRejected() {
        Map<String, String> props = baseProperties();
        props.put(CdcStreamTableValuedFunction.INCLUDE_DELETE_SIGN, "invalid");

        AnalysisException thrown = Assert.assertThrows(AnalysisException.class,
                () -> new CdcStreamTableValuedFunction(props));

        Assert.assertTrue(thrown.getMessage().contains("include_delete_sign"));
    }

    /**
     * Builds the TVF from {@code properties} with {@link StreamingJobUtils} statically mocked
     * and returns the schema it reports.
     */
    private List<Column> getTableColumns(Map<String, String> properties) throws Exception {
        JdbcClient mockedClient = mockSourceClient();

        try (MockedStatic<StreamingJobUtils> staticUtils = Mockito.mockStatic(StreamingJobUtils.class)) {
            staticUtils.when(() -> StreamingJobUtils.getJdbcClient(
                            Mockito.eq(DataSourceType.MYSQL), Mockito.anyMap()))
                    .thenReturn(mockedClient);
            staticUtils.when(() -> StreamingJobUtils.getRemoteDbName(
                            Mockito.eq(DataSourceType.MYSQL), Mockito.anyMap()))
                    .thenReturn(SOURCE_DB);
            return new CdcStreamTableValuedFunction(properties).getTableColumns();
        }
    }

    /** Creates a JDBC client mock whose source table exists and has one INT column {@code id}. */
    private JdbcClient mockSourceClient() {
        List<Column> reportedColumns = new ArrayList<>();
        reportedColumns.add(new Column("id", PrimitiveType.INT));

        JdbcClient client = Mockito.mock(JdbcClient.class);
        Mockito.when(client.isTableExist(SOURCE_DB, SOURCE_TABLE)).thenReturn(true);
        Mockito.when(client.getColumnsFromJdbc(SOURCE_DB, SOURCE_TABLE)).thenReturn(reportedColumns);
        return client;
    }

    /** Minimal property set for a MySQL source; each call returns a fresh mutable map. */
    private Map<String, String> baseProperties() {
        Map<String, String> props = new HashMap<>();
        props.put(DataSourceConfigKeys.TYPE, "mysql");
        props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://localhost:3306/test_db");
        props.put(DataSourceConfigKeys.DATABASE, SOURCE_DB);
        props.put(DataSourceConfigKeys.TABLE, SOURCE_TABLE);
        props.put(DataSourceConfigKeys.OFFSET, "initial");
        return props;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !snapshot_data --
A1 1
B1 2

-- !incremental_data --
A1 1 0
B1 2 0
C1 3 0
D1 4 0

-- !after_update --
A1 1 0
B1 2 0
C1 30 0
D1 4 0

-- !after_delete --
A1 1 0
B1 2 0
C1 30 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !snapshot_data --
A1 1
B1 2

-- !incremental_data --
A1 1 0
B1 2 0
C1 3 0
D1 4 0

-- !after_update --
A1 1 0
B1 2 0
C1 30 0
D1 4 0

-- !after_delete --
A1 1 0
B1 2 0
C1 30 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

/**
* Test CDC stream TVF with delete sign enabled for MySQL primary-key table.
*
* Scenario:
* 1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
* 2. Incremental INSERT (C1, D1).
* 3. Incremental UPDATE (C1 age -> 30).
* 4. Incremental DELETE (D1 removed).
*/
suite("test_streaming_job_cdc_stream_mysql_delete_sign", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
    def jobName = "test_streaming_job_cdc_stream_mysql_ds_name"
    def currentDb = (sql "select database()")[0][0]
    def dorisTable = "test_streaming_job_cdc_stream_mysql_ds_tbl"
    def mysqlDb = "test_cdc_db"
    def mysqlTable = "test_streaming_job_cdc_stream_mysql_ds_src"

    // start from a clean slate: a previous (possibly failed) run may have left the job/table behind
    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${dorisTable} force"""

    // target table: unique-key table keyed on `name`, merge-on-write enabled
    sql """
        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
            `name` varchar(200) NULL,
            `age` int NULL
        ) ENGINE=OLAP
        UNIQUE KEY(`name`)
        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "enable_unique_key_merge_on_write" = "true"
        )
    """

    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String s3_endpoint = getS3Endpoint()
        String bucket = getS3BucketName()
        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"

        // Polls `condition` every 2 seconds until it returns true or `timeoutSeconds` elapse.
        // If the wait fails for any reason, the job and task rows are logged before the
        // exception is rethrown, so every phase of the test leaves diagnostics behind.
        def awaitOrDumpJobState = { int timeoutSeconds, Closure condition ->
            try {
                Awaitility.await().atMost(timeoutSeconds, SECONDS).pollInterval(2, SECONDS).until({
                    condition.call()
                })
            } catch (Exception ex) {
                log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
                log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'"""))
                throw ex
            }
        }

        // prepare source table with primary key and pre-existing snapshot data
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}"""
            sql """CREATE TABLE ${mysqlDb}.${mysqlTable} (
                `name` varchar(200) NOT NULL,
                `age` int DEFAULT NULL,
                PRIMARY KEY (`name`)
            ) ENGINE=InnoDB"""
            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('A1', 1)"""
            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('B1', 2)"""
        }

        // create streaming job via cdc_stream TVF with delete sign enabled
        sql """
            CREATE JOB ${jobName}
            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age, __DORIS_DELETE_SIGN__)
            SELECT name, age, __DORIS_DELETE_SIGN__ FROM cdc_stream(
                "type" = "mysql",
                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
                "driver_url" = "${driver_url}",
                "driver_class" = "com.mysql.cj.jdbc.Driver",
                "user" = "root",
                "password" = "123456",
                "database" = "${mysqlDb}",
                "table" = "${mysqlTable}",
                "offset" = "initial",
                "snapshot_split_key" = "name",
                "include_delete_sign" = "true"
            )
        """

        // wait for at least one snapshot task to succeed
        awaitOrDumpJobState(300) {
            def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
            log.info("SucceedTaskCount: " + cnt)
            cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1
        }

        // verify snapshot data
        qt_snapshot_data """ SELECT name, age FROM ${currentDb}.${dorisTable} ORDER BY name """

        // insert incremental rows
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('C1', 3)"""
            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('D1', 4)"""
        }

        // wait for binlog tasks to pick up the new rows
        awaitOrDumpJobState(120) {
            def rows = sql """SELECT count(1) FROM ${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
            log.info("incremental rows: " + rows)
            (rows.get(0).get(0) as int) == 2
        }

        qt_incremental_data """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """

        // verify incremental UPDATE
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """UPDATE ${mysqlDb}.${mysqlTable} SET age = 30 WHERE name = 'C1'"""
        }
        awaitOrDumpJobState(120) {
            def rows = sql """SELECT age FROM ${currentDb}.${dorisTable} WHERE name = 'C1'"""
            rows.size() == 1 && (rows.get(0).get(0) as int) == 30
        }

        qt_after_update """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """

        // verify incremental DELETE: the row must no longer be visible in the target table
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """DELETE FROM ${mysqlDb}.${mysqlTable} WHERE name = 'D1'"""
        }
        awaitOrDumpJobState(120) {
            def rows = sql """SELECT count(1) FROM ${currentDb}.${dorisTable} WHERE name = 'D1'"""
            (rows.get(0).get(0) as int) == 0
        }

        qt_after_delete """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """

        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        sql """drop table if exists ${currentDb}.${dorisTable} force"""
    }
}
Loading
Loading