diff --git a/.gitleaks.toml b/.gitleaks.toml index 61743697eeee03..8498a6e5184af9 100644 --- a/.gitleaks.toml +++ b/.gitleaks.toml @@ -26,3 +26,10 @@ condition = "AND" regexTarget = "line" paths = ['''^fe/fe-filesystem/fe-filesystem-s3/src/test/java/org/apache/doris/filesystem/s3/S3ObjStorageTest\.java$'''] regexes = ['''(access_key|secret_key).*"(ak-|sk-|canonical|sess-tok)'''] + +[[rules]] +id = "private-key" + +[[rules.allowlists]] +description = "Ignore test-only TLS private key used by MySQL docker-compose fixtures" +paths = ['''^docker/thirdparties/docker-compose/mysql/certs/server\.key$'''] diff --git a/.licenserc.yaml b/.licenserc.yaml index 99e76f28a86bdf..90d2f778686701 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -97,6 +97,7 @@ header: - "docker/thirdparties/docker-compose/kerberos/sql/**" - "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl" - "docker/thirdparties/docker-compose/postgresql/certs/**" + - "docker/thirdparties/docker-compose/mysql/certs/**" - "conf/mysql_ssl_default_certificate/*" - "conf/mysql_ssl_default_certificate/client_certificate/ca.pem" - "conf/mysql_ssl_default_certificate/client_certificate/client-cert.pem" diff --git a/docker/thirdparties/docker-compose/mysql/certs/root.crt b/docker/thirdparties/docker-compose/mysql/certs/root.crt new file mode 100644 index 00000000000000..94095b762c6d21 --- /dev/null +++ b/docker/thirdparties/docker-compose/mysql/certs/root.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL +BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2 +MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG +9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq +V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ +w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO +fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4 +tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1 +bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd +BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP +D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC +AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN +WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3 +dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB +zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M +JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka +cx+pSJLima+3GHhK2Rj437yx1Q== +-----END CERTIFICATE----- diff --git a/docker/thirdparties/docker-compose/mysql/certs/server.crt b/docker/thirdparties/docker-compose/mysql/certs/server.crt new file mode 100644 index 00000000000000..e8aaecb71ec1a0 --- /dev/null +++ b/docker/thirdparties/docker-compose/mysql/certs/server.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC+zCCAeOgAwIBAgIUG/9rYO8McYBH83YOe4nzMcb4YCAwDQYJKoZIhvcNAQEL +BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMyNloXDTM2 +MDIyOTA4MjMyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEAtHYYwevcMqMPbCAaQlrX7qJtRXf/j+WfGFbM4/PZ +Y6cjSsrqUgwHduMyE4yce9vWWygLJM/S9aBI3jsvhAdLVaFIXhOU4jMuyk+1RJvu +k+iUJ3wabo2Zv6605wUU7wS0FCfJJMxG/zz5FYtX8kMw7sKJWLhB4C+oQlO+mSj4 +CKjg7mNZjgKz024/BW7FKhAaYYGI9GNmjIgvjSDXGOXzd2nM9XLoVNIkR8mgD69l +yHHzhGUAdXDxaTr+026Z2uBrnip7ZjDIB65J/qrxSc8eK1ZhZzYdHBpLnP67zuWR +iyKDNETpRa1SoWCk9/9+AGwygRcXC7h1GpMb46wce4/TtwIDAQABo0IwQDAdBgNV +HQ4EFgQUEeFQVqK+A/H6R2iSiNW57cSilGcwHwYDVR0jBBgwFoAUHBKhmdKPD+b1 +xDjzzkQVaVETSfUwDQYJKoZIhvcNAQELBQADggEBAKpxfqPTPXL2+n/OW6F8cvwK +aod3BOquIjIKm17+Uob0rcOnxssYNQa0g9pW2zgIlAS+QUZ1K46ygJWrLNKdpIzt +mG2Hn6kUX9J7Xo+F5IldlX2bImi3b2/oI8IliLzawsofondCzL2BIfWLhE3LaISF +iN8pfzjoHCZXfLm3oUzxaeltFqEP+cApig/hAO17FkMHY6sl9QII94MV2d9gVwVl +pAi1ALOzOQKbsTCdRspoadPqmZ7AgbtS3RiVMmCZHwrtCvdIcaBuiPy5KiBFPCEX +Cdia+GWqETKBNpornHeMQ7d/J2ilbFRs+mRAUtyeWK0ilcdOxbmMOzCsjIV8kgI= +-----END CERTIFICATE----- diff --git a/docker/thirdparties/docker-compose/mysql/certs/server.key b/docker/thirdparties/docker-compose/mysql/certs/server.key new file mode 100644 index 00000000000000..d9eb2a8f4ad658 --- /dev/null +++ b/docker/thirdparties/docker-compose/mysql/certs/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s +IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje +Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV +i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N +INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+ +qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj +rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr +lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1 +ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I +vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO +KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2 +XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g +qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE +KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F +QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW +99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW ++mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC +k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn +I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU +Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU +C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y +/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE +vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID +bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99 +ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR +HZde9n1uVLVtlBRTjjL5O84= +-----END PRIVATE KEY----- diff --git a/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl b/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl index 3ceeaa313e1f78..9fe7b1a38eff7e 100644 --- a/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl +++ b/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl @@ -20,7 +20,6 @@ version: "2.1" services: doris--mysql_57: image: mysql:5.7.36 - command: --default-authentication-plugin=mysql_native_password restart: always environment: MYSQL_ROOT_PASSWORD: 123456 @@ -29,15 +28,31 @@ services: LANG: C.UTF-8 ports: - ${DOCKER_MYSQL_57_EXTERNAL_PORT}:3306 + entrypoint: + - bash + - -c + - | + chown mysql:mysql /etc/mysql/certs/* + chmod 600 /etc/mysql/certs/server.key + chmod 644 /etc/mysql/certs/server.crt /etc/mysql/certs/root.crt + exec docker-entrypoint.sh "$@" + - -- + command: + - "mysqld" + - "--default-authentication-plugin=mysql_native_password" + - "--ssl-ca=/etc/mysql/certs/root.crt" + - "--ssl-cert=/etc/mysql/certs/server.crt" + - "--ssl-key=/etc/mysql/certs/server.key" healthcheck: test: mysqladmin ping -h 127.0.0.1 -u root --password=$$MYSQL_ROOT_PASSWORD && mysql -h 127.0.0.1 -u root --password=$$MYSQL_ROOT_PASSWORD -e "SELECT 1 FROM doris_test.deadline;" interval: 5s timeout: 60s retries: 120 volumes: - - ./data/:/var/lib/mysql + - ./data/:/var/lib/mysql - ./init:/docker-entrypoint-initdb.d - ./my.cnf:/etc/mysql/conf.d/my.cnf + - ./certs:/etc/mysql/certs networks: - doris--mysql_57 diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java index d31794766ad7f6..72322da26682da 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java @@ -40,6 +40,10 @@ public class DataSourceConfigKeys { public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1"; public static final String SSL_MODE = "ssl_mode"; public static final String SSL_ROOTCERT = "ssl_rootcert"; + // PG-style spelling; MySQL normalizes to underscore form. + public static final String SSL_MODE_DISABLE = "disable"; + public static final String SSL_MODE_REQUIRE = "require"; + public static final String SSL_MODE_VERIFY_CA = "verify-ca"; // PostgreSQL replication slot and publication config public static final String SLOT_NAME = "slot_name"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java index c4aad00d33a175..f37cbbff5dd95d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java @@ -56,6 +56,12 @@ public class DataSourceConfigValidator { DataSourceConfigKeys.PUBLICATION_NAME ); + private static final Set ALLOW_SSL_MODES = Sets.newHashSet( + DataSourceConfigKeys.SSL_MODE_DISABLE, + DataSourceConfigKeys.SSL_MODE_REQUIRE, + DataSourceConfigKeys.SSL_MODE_VERIFY_CA + ); + // Known suffixes for per-table config keys (format: "table..") private static final Set ALLOW_TABLE_LEVEL_SUFFIXES = Sets.newHashSet( DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX, @@ -102,6 +108,16 @@ public static void validateSource(Map input, throw new IllegalArgumentException("Invalid value for key '" + key + "': " + value); } } + + // Cross-field: verify-ca must be paired with a CA cert; otherwise the reader will + // silently fall back to the JVM default truststore and likely fail to connect. + if (DataSourceConfigKeys.SSL_MODE_VERIFY_CA.equals(input.get(DataSourceConfigKeys.SSL_MODE)) + && (input.get(DataSourceConfigKeys.SSL_ROOTCERT) == null + || input.get(DataSourceConfigKeys.SSL_ROOTCERT).trim().isEmpty())) { + throw new IllegalArgumentException( + "ssl_mode '" + DataSourceConfigKeys.SSL_MODE_VERIFY_CA + + "' requires ssl_rootcert to be set"); + } } public static void validateTarget(Map input) throws IllegalArgumentException { @@ -144,6 +160,9 @@ private static boolean isValidValue(String key, String value, String dataSourceT return value.length() <= PG_MAX_IDENTIFIER_LENGTH && PG_IDENTIFIER_PATTERN.matcher(value).matches(); } + if (key.equals(DataSourceConfigKeys.SSL_MODE) && !ALLOW_SSL_MODES.contains(value)) { + return false; + } return true; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java index 2f71b7664a832d..ce570f440a87e5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java @@ -30,6 +30,87 @@ public class DataSourceConfigValidatorTest { private static final int PG_MAX_IDENTIFIER_LENGTH = 63; + private static Map sslModeInput(String value) { + Map input = new HashMap<>(); + input.put(DataSourceConfigKeys.SSL_MODE, value); + return input; + } + + @Test + public void testSslModeLegalValues() { + DataSourceConfigValidator.validateSource( + sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE), DataSourceType.MYSQL.name()); + DataSourceConfigValidator.validateSource( + sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE), DataSourceType.MYSQL.name()); + // verify-ca additionally requires ssl_rootcert; covered by testVerifyCaWithRootcertPasses. + } + + @Test + public void testSslModeRejectsMysqlUnderscoreSpelling() { + assertReject(sslModeInput("verify_ca")); + } + + @Test + public void testSslModeRejectsVerifyFull() { + assertReject(sslModeInput("verify-full")); + } + + @Test + public void testSslModeRejectsPreferredAndAllow() { + assertReject(sslModeInput("preferred")); + assertReject(sslModeInput("prefer")); + assertReject(sslModeInput("allow")); + } + + @Test + public void testSslModeRejectsUppercaseVariants() { + assertReject(sslModeInput("DISABLE")); + assertReject(sslModeInput("Verify-CA")); + } + + @Test + public void testSslModeRejectsEmpty() { + assertReject(sslModeInput("")); + } + + @Test + public void testSslModeOptional() { + // ssl_mode is not required; validateSource should pass when absent + Map input = new HashMap<>(); + input.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db"); + DataSourceConfigValidator.validateSource(input, DataSourceType.MYSQL.name()); + } + + @Test + public void testVerifyCaRequiresRootcert() { + Map input = sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA); + assertReject(input); + } + + @Test + public void testVerifyCaWithRootcertPasses() { + Map input = sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA); + input.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:ca.pem"); + DataSourceConfigValidator.validateSource(input, DataSourceType.MYSQL.name()); + } + + @Test + public void testDisableWithoutRootcertPasses() { + DataSourceConfigValidator.validateSource( + sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE), DataSourceType.MYSQL.name()); + DataSourceConfigValidator.validateSource( + sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE), DataSourceType.MYSQL.name()); + } + + private static void assertReject(Map input) { + try { + DataSourceConfigValidator.validateSource(input, DataSourceType.MYSQL.name()); + Assert.fail("expected IllegalArgumentException for input: " + input); + } catch (IllegalArgumentException ignored) { + // expected + } + } + @Test public void testSlotNameAndPublicationNameAllowed() { Map props = new HashMap<>(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 9aa268ef09bd81..bf8ac56312bbeb 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -25,6 +25,7 @@ import org.apache.doris.cdcclient.source.reader.SplitReadResult; import org.apache.doris.cdcclient.source.reader.SplitRecords; import org.apache.doris.cdcclient.utils.ConfigUtil; +import org.apache.doris.cdcclient.utils.SmallFileMgr; import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.CompareOffsetRequest; import org.apache.doris.job.cdc.request.FetchTableSplitsRequest; @@ -878,6 +879,28 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str dbzProps.setProperty( MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(), DEBEZIUM_HEARTBEAT_INTERVAL_MS + ""); + + if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_MODE)) { + String normalized = + normalizeSslModeForMysql(cdcConfig.get(DataSourceConfigKeys.SSL_MODE)); + dbzProps.put("database.ssl.mode", normalized); + // Flink CDC's forked MySqlConnection drops Debezium SSL props from the snapshot + // JDBC URL, so mirror to Connector/J native names. + jdbcProperteis.put("sslMode", normalized); + } + if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) { + String fileName = cdcConfig.get(DataSourceConfigKeys.SSL_ROOTCERT); + String truststorePath = SmallFileMgr.getPkcs12TruststorePath(fileName); + LOG.info("Using SSL truststore file path: {}", truststorePath); + dbzProps.put("database.ssl.truststore", truststorePath); + dbzProps.put("database.ssl.truststore.password", SmallFileMgr.TRUSTSTORE_PASSWORD); + jdbcProperteis.put("trustCertificateKeyStoreUrl", "file:" + truststorePath); + // Connector/J defaults keystore type to JKS; we generate PKCS12. + jdbcProperteis.put("trustCertificateKeyStoreType", "PKCS12"); + jdbcProperteis.put( + "trustCertificateKeyStorePassword", SmallFileMgr.TRUSTSTORE_PASSWORD); + } + configFactory.debeziumProperties(dbzProps); configFactory.heartbeatInterval(Duration.ofMillis(DEBEZIUM_HEARTBEAT_INTERVAL_MS)); @@ -1056,6 +1079,31 @@ public DeserializeResult deserialize(Map config, SourceRecord el return serializer.deserialize(config, element); } + /** Map Doris ssl_mode (PG-style) to Debezium MySQL's underscore spelling. */ + static String normalizeSslModeForMysql(String sslMode) { + if (sslMode == null) { + throw new IllegalArgumentException("ssl_mode must not be null"); + } + switch (sslMode) { + case DataSourceConfigKeys.SSL_MODE_DISABLE: + return MySqlConnectorConfig.SecureConnectionMode.DISABLED.getValue(); + case DataSourceConfigKeys.SSL_MODE_REQUIRE: + return MySqlConnectorConfig.SecureConnectionMode.REQUIRED.getValue(); + case DataSourceConfigKeys.SSL_MODE_VERIFY_CA: + return MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA.getValue(); + default: + throw new IllegalArgumentException( + "Unsupported ssl_mode for MySQL: '" + + sslMode + + "'. Allowed: " + + String.join( + ", ", + DataSourceConfigKeys.SSL_MODE_DISABLE, + DataSourceConfigKeys.SSL_MODE_REQUIRE, + DataSourceConfigKeys.SSL_MODE_VERIFY_CA)); + } + } + /** * Filtered record iterator that only returns data change records, filtering out watermark, * heartbeat and other events. This is a private static inner class that encapsulates record diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java index 1e854097883fff..6f9616cadb767c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java @@ -29,7 +29,15 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -47,9 +55,17 @@ public class SmallFileMgr { private static final String FILE_PREFIX = "FILE:"; + private static final String PKCS12_SUFFIX = ".p12"; + + /** JCA-required placeholder; a public-CA-only truststore has no secret to protect. */ + public static final String TRUSTSTORE_PASSWORD = "changeit"; + /** In-memory cache: "file_id:md5" -> absolute local file path */ private static final Map MEM_CACHE = new ConcurrentHashMap<>(); + /** In-memory cache for PKCS12 truststores derived from PEM CA certs. */ + private static final Map PKCS12_CACHE = new ConcurrentHashMap<>(); + /** * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file * corruption when multiple threads race on the same file_id:md5 key. @@ -216,9 +232,86 @@ static String getFilePath( } } + /** + * Resolve a FILE: reference to a PKCS12 truststore path, converting the PEM on first access. + * For connectors (e.g. Debezium MySQL) that require JKS/PKCS12 rather than raw PEM. + * + * @param filePath FILE reference, format: FILE:{file_id}:{md5} + * @return absolute local path to the PKCS12 truststore + */ + public static String getPkcs12TruststorePath(String filePath) { + return pkcs12TruststorePath(getFilePath(filePath)); + } + + /** Package-private overload that accepts a custom local directory, used for testing. */ + static String getPkcs12TruststorePath( + String feMasterAddress, String filePath, String clusterToken, String localDir) { + return pkcs12TruststorePath(getFilePath(feMasterAddress, filePath, clusterToken, localDir)); + } + + private static String pkcs12TruststorePath(String pemPath) { + String cached = PKCS12_CACHE.get(pemPath); + if (cached != null && new File(cached).exists()) { + return cached; + } + Object lock = DOWNLOAD_LOCKS.computeIfAbsent(pemPath + PKCS12_SUFFIX, k -> new Object()); + synchronized (lock) { + String doubleChecked = PKCS12_CACHE.get(pemPath); + if (doubleChecked != null && new File(doubleChecked).exists()) { + return doubleChecked; + } + String p12Path = pemPath + PKCS12_SUFFIX; + if (!new File(p12Path).exists()) { + convertPemToPkcs12(pemPath, p12Path); + } + PKCS12_CACHE.put(pemPath, p12Path); + return p12Path; + } + } + + private static void convertPemToPkcs12(String pemPath, String p12Path) { + Path tmpFile; + try { + Path p12 = Paths.get(p12Path); + tmpFile = Files.createTempFile(p12.getParent(), "p12-", ".tmp"); + } catch (IOException e) { + throw new RuntimeException("Failed to create tmp file for PKCS12 truststore", e); + } + try { + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + keyStore.load(null); + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + try (InputStream in = new FileInputStream(pemPath)) { + // A CA PEM may contain a chain (intermediate + root); import each with a + // distinct alias, otherwise later entries overwrite earlier ones. + int i = 0; + for (Certificate cert : cf.generateCertificates(in)) { + keyStore.setCertificateEntry("ca" + (i++), cert); + } + } + try (OutputStream os = Files.newOutputStream(tmpFile)) { + keyStore.store(os, TRUSTSTORE_PASSWORD.toCharArray()); + } + Files.move( + tmpFile, + Paths.get(p12Path), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + LOG.info("Generated PKCS12 truststore: {}", p12Path); + } catch (Exception e) { + try { + Files.deleteIfExists(tmpFile); + } catch (IOException ignored) { + // best effort + } + throw new RuntimeException("Failed to convert PEM to PKCS12: " + pemPath, e); + } + } + /** Clears the in-memory cache. Exposed for testing. */ static void clearCache() { MEM_CACHE.clear(); + PKCS12_CACHE.clear(); DOWNLOAD_LOCKS.clear(); } } diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java new file mode 100644 index 00000000000000..1192df291d3c98 --- /dev/null +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.source.reader.mysql; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +public class MySqlSourceReaderTest { + + @Test + void testNormalizeSslModeMapsAllLegalValues() { + assertEquals("disabled", MySqlSourceReader.normalizeSslModeForMysql("disable")); + assertEquals("required", MySqlSourceReader.normalizeSslModeForMysql("require")); + assertEquals("verify_ca", MySqlSourceReader.normalizeSslModeForMysql("verify-ca")); + } + + @Test + void testNormalizeSslModeRejectsNull() { + assertThrows( + IllegalArgumentException.class, + () -> MySqlSourceReader.normalizeSslModeForMysql(null)); + } + + @Test + void testNormalizeSslModeRejectsMysqlUnderscoreSpelling() { + // FE validator rejects this, but guard reader-side too. + assertThrows( + IllegalArgumentException.class, + () -> MySqlSourceReader.normalizeSslModeForMysql("verify_ca")); + } + + @Test + void testNormalizeSslModeRejectsVerifyFull() { + assertThrows( + IllegalArgumentException.class, + () -> MySqlSourceReader.normalizeSslModeForMysql("verify-full")); + } + + @Test + void testNormalizeSslModeRejectsUppercase() { + assertThrows( + IllegalArgumentException.class, + () -> MySqlSourceReader.normalizeSslModeForMysql("DISABLE")); + } +} diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java index ae99d4db9868ee..24bb4f5e615d9d 100644 --- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java @@ -25,9 +25,13 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyStore; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -37,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -375,4 +380,118 @@ void testConcurrentDownloadSameFileSingleFetch() throws Exception { server.stop(0); } } + + // ------------------------------------------------------------------------- + // PKCS12 truststore conversion + // ------------------------------------------------------------------------- + + private static final String CA_PEM = + "-----BEGIN CERTIFICATE-----\n" + + "MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL\n" + + "BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2\n" + + "MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG\n" + + "9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq\n" + + "V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ\n" + + "w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO\n" + + "fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4\n" + + "tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1\n" + + "bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd\n" + + "BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP\n" + + "D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC\n" + + "AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN\n" + + "WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3\n" + + "dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB\n" + + "zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M\n" + + "JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka\n" + + "cx+pSJLima+3GHhK2Rj437yx1Q==\n" + + "-----END CERTIFICATE-----\n"; + + private String preloadPem(String fileId, byte[] pemBytes) throws IOException { + String md5 = DigestUtils.md5Hex(pemBytes); + File cachedFile = tempDir.resolve(fileId + "." + md5).toFile(); + Files.write(cachedFile.toPath(), pemBytes); + return "FILE:" + fileId + ":" + md5; + } + + private KeyStore loadPkcs12(String p12Path) throws Exception { + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + try (InputStream in = Files.newInputStream(Paths.get(p12Path))) { + keyStore.load(in, SmallFileMgr.TRUSTSTORE_PASSWORD.toCharArray()); + } + return keyStore; + } + + @Test + void testPkcs12SingleCertConversion() throws Exception { + String filePath = preloadPem("40001", CA_PEM.getBytes()); + String p12Path = SmallFileMgr.getPkcs12TruststorePath( + "host:8030", filePath, "token", tempDir.toString()); + + assertTrue(p12Path.endsWith(".p12")); + assertTrue(new File(p12Path).exists()); + + KeyStore keyStore = loadPkcs12(p12Path); + assertEquals(1, keyStore.size()); + assertTrue(keyStore.containsAlias("ca0")); + } + + /** + * PEM with a chain (intermediate + root) must produce one keystore entry per certificate. + * Using the same cert twice here is sufficient to prove alias uniqueness - without distinct + * aliases the second entry would silently overwrite the first. + */ + @Test + void testPkcs12MultipleCertsPreserveAllEntries() throws Exception { + String chainPem = CA_PEM + CA_PEM; + String filePath = preloadPem("40002", chainPem.getBytes()); + String p12Path = SmallFileMgr.getPkcs12TruststorePath( + "host:8030", filePath, "token", tempDir.toString()); + + KeyStore keyStore = loadPkcs12(p12Path); + assertEquals(2, keyStore.size(), "chain with 2 certs must produce 2 entries"); + assertTrue(keyStore.containsAlias("ca0")); + assertTrue(keyStore.containsAlias("ca1")); + } + + @Test + void testPkcs12SecondCallUsesMemoryCacheWhenFilePresent() throws Exception { + String filePath = preloadPem("40003", CA_PEM.getBytes()); + String first = SmallFileMgr.getPkcs12TruststorePath( + "host:8030", filePath, "token", tempDir.toString()); + + long firstMtime = new File(first).lastModified(); + String second = SmallFileMgr.getPkcs12TruststorePath( + "host:8030", filePath, "token", tempDir.toString()); + assertEquals(first, second); + assertEquals(firstMtime, new File(second).lastModified(), + "second call should hit memory cache and not regenerate .p12"); + } + + @Test + void testPkcs12RegeneratesWhenCachedFileMissing() throws Exception { + String filePath = preloadPem("40005", CA_PEM.getBytes()); + String first = SmallFileMgr.getPkcs12TruststorePath( + "host:8030", filePath, "token", tempDir.toString()); + + // Simulate external deletion after the cache entry was stored. + assertTrue(new File(first).delete()); + + String second = SmallFileMgr.getPkcs12TruststorePath( + "host:8030", filePath, "token", tempDir.toString()); + assertEquals(first, second); + assertTrue(new File(second).exists(), + "cached path whose file disappeared should be regenerated on next call"); + } + + @Test + void testPkcs12InvalidPemThrows() throws Exception { + byte[] invalid = ("-----BEGIN CERTIFICATE-----\n" + + "this-is-not-valid-base64!!!\n" + + "-----END CERTIFICATE-----\n").getBytes(); + String filePath = preloadPem("40004", invalid); + + assertThrows(RuntimeException.class, + () -> SmallFileMgr.getPkcs12TruststorePath( + "host:8030", filePath, "token", tempDir.toString())); + } } diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out new file mode 100644 index 00000000000000..c7ca957930971d --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_snapshot -- +A1 1 +B1 2 + +-- !select_incremental -- +B1 20 +C1 3 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out new file mode 100644 index 00000000000000..93141f923585ce --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_snapshot_table1 -- +A1 1 +B1 2 + +-- !select_binlog_table1 -- +B1 10 +Doris 18 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out new file mode 100644 index 00000000000000..8d922a718f1c51 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_snapshot -- +1 Alice +2 Bob + +-- !select_incremental -- +2 Bob_v2 +3 Carol + +-- !select_merge_snapshot -- +100 Src1_A +200 Src2_A + +-- !select_merge_incremental -- +100 Src1_A +101 Src1_B +200 Src2_A +201 Src2_B + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy new file mode 100644 index 00000000000000..f89776cd2317ae --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_col_filter", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_col_filter" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_mysql_col_filter" + def mysqlDb = "test_cdc_db_col_filter" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """DROP JOB IF EXISTS where jobname = '${jobName}_err1'""" + sql """DROP JOB IF EXISTS where jobname = '${jobName}_err2'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // Create MySQL table with an extra "secret" column to be excluded + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `name` varchar(200) NOT NULL, + `age` int, + `secret` varchar(200), + PRIMARY KEY (`name`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('A1', 1, 'secret_A1')""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('B1', 2, 'secret_B1')""" + } + + // ── Validation: exclude a non-existent column should fail ────────────── + try { + sql """CREATE JOB ${jobName}_err1 + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial", + "table.${table1}.exclude_columns" = "nonexistent_col" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + assert false : "Should have thrown exception for non-existent excluded column" + } catch (Exception e) { + log.info("Expected error for non-existent column: " + e.message) + assert e.message.contains("does not exist") : "Unexpected error message: " + e.message + } + + // ── Validation: exclude a PK column should fail ──────────────────────── + try { + sql """CREATE JOB ${jobName}_err2 + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial", + "table.${table1}.exclude_columns" = "name" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + assert false : "Should have thrown exception for excluding PK column" + } catch (Exception e) { + log.info("Expected error for PK column: " + e.message) + assert e.message.contains("primary key") : "Unexpected error message: " + e.message + } + + // ── Main job: exclude "secret" column ────────────────────────────────── + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial", + "table.${table1}.exclude_columns" = "secret" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + + // Verify Doris table was created WITHOUT the excluded column + def colNames = (sql """desc ${currentDb}.${table1}""").collect { it[0] } + assert !colNames.contains("secret") : "Excluded column 'secret' must not appear in Doris table" + assert colNames.contains("name") + assert colNames.contains("age") + + // Wait for snapshot to complete + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'""" + cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2 + }) + } catch (Exception ex) { + def showJob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showTask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showJob) + log.info("show task: " + showTask) + throw ex + } + + // Snapshot: only name and age, secret absent + qt_select_snapshot """ SELECT * FROM ${table1} ORDER BY name ASC """ + + // ── Incremental DML: secret values must not appear in Doris ─────────── + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('C1', 3, 'secret_C1')""" + sql """UPDATE ${mysqlDb}.${table1} SET age = 20, secret = 'updated_secret' WHERE name = 'B1'""" + sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'A1'""" + } + // Wait until C1 appears and A1 is gone + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def names = (sql """ SELECT name FROM ${table1} ORDER BY name ASC """).collect { it[0] } + names.contains('C1') && !names.contains('A1') + }) + } catch (Exception ex) { + def showJob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showTask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showJob) + log.info("show task: " + showTask) + throw ex + } + + qt_select_incremental """ SELECT * FROM ${table1} ORDER BY name ASC """ + + // Doris table still has no secret column after DML events on excluded column + def colNamesAfterDml = (sql """desc ${currentDb}.${table1}""").collect { it[0] } + assert !colNamesAfterDml.contains("secret") : "secret column must not appear in Doris after DML on excluded column" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name = '${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy new file mode 100644 index 00000000000000..51359cb598349c --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_ssl", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_name_ssl" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_mysql_normal1_ssl" + def mysqlDb = "test_cdc_db_ssl" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // create test + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `name` varchar(200) NOT NULL, + `age` int DEFAULT NULL, + PRIMARY KEY (`name`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1);""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2);""" + } + + try { + sql """DROP FILE "mysql_ca.pem" FROM ${currentDb} PROPERTIES ("catalog" = "streaming_job")""" + } catch (Exception ignored) { + // ignore + } + + sql """CREATE FILE "mysql_ca.pem" + IN ${currentDb} + PROPERTIES + ( + "url" = "https://qa-build.oss-cn-beijing.aliyuncs.com/jianxu/root.crt", + "catalog" = "streaming_job" + ) + """ + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial", + "ssl_mode" = "verify-ca", + "ssl_rootcert" = "FILE:mysql_ca.pem" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + def showAllTables = sql """ show tables from ${currentDb}""" + log.info("showAllTables: " + showAllTables) + // check table created + def showTables = sql """ show tables from ${currentDb} like '${table1}'; """ + assert showTables.size() == 1 + + // check job running + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + // check job status and succeed task count larger than 2 + jobSuccendCount.size() == 1 && jobSuccendCount.get(0).get(0).toString().toLong() >= 2L + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + // check snapshot data + qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name asc """ + + // mock incremental into + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} (name,age) VALUES ('Doris',18);""" + sql """UPDATE ${mysqlDb}.${table1} SET age = 10 WHERE name = 'B1';""" + sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'A1';""" + } + + // wait for cdc incremental data + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def names = (sql """ SELECT name FROM ${table1} ORDER BY name ASC """).collect { it[0] } + names.contains('Doris') && !names.contains('A1') + }) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + // check incremental data + qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc """ + + def jobInfo = sql """ + select status from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfo: " + jobInfo) + assert jobInfo.get(0).get(0) == "RUNNING" + + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + + try { + sql """DROP FILE "mysql_ca.pem" FROM ${currentDb} PROPERTIES ("catalog" = "streaming_job")""" + } catch (Exception ignored) { + // ignore + } + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy new file mode 100644 index 00000000000000..305290391e62d2 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_table_mapping", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_table_mapping" + def jobNameMerge = "test_streaming_mysql_table_mapping_merge" + def currentDb = (sql "select database()")[0][0] + def mysqlSrcTable = "mysql_src_table" // upstream MySQL table name + def dorisDstTable = "doris_dst_table_mysql" // downstream Doris table name (mapped) + def mysqlSrcTable2 = "mysql_src_table2" // second upstream table (multi-table merge) + def dorisMergeTable = "doris_merge_table_mysql" + def mysqlDb = "test_cdc_db_table_mapping" + + // Cleanup + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'""" + sql """drop table if exists ${currentDb}.${dorisDstTable} force""" + sql """drop table if exists ${currentDb}.${dorisMergeTable} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // ── Case 1: basic table name mapping ───────────────────────────────── + // MySQL table: mysql_src_table → Doris table: doris_dst_table_mysql + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable}""" + sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable} ( + `id` int NOT NULL, + `name` varchar(200), + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (1, 'Alice')""" + sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (2, 'Bob')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${mysqlSrcTable}", + "offset" = "initial", + "table.${mysqlSrcTable}.target_table" = "${dorisDstTable}" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + + // Verify the Doris table was created with the mapped name, not the source name + def tables = (sql """show tables from ${currentDb}""").collect { it[0] } + assert tables.contains(dorisDstTable) : "Doris target table '${dorisDstTable}' should exist" + assert !tables.contains(mysqlSrcTable) : "Source table name '${mysqlSrcTable}' must NOT exist in Doris" + + // Wait for snapshot + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'""" + cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2 + }) + } catch (Exception ex) { + log.info("show job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("show task: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC """ + + // Incremental: INSERT / UPDATE / DELETE must all land in doris_dst_table_mysql + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (3, 'Carol')""" + sql """UPDATE ${mysqlDb}.${mysqlSrcTable} SET name = 'Bob_v2' WHERE id = 2""" + sql """DELETE FROM ${mysqlDb}.${mysqlSrcTable} WHERE id = 1""" + } + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id ASC """).collect { it[0].toInteger() } + ids.contains(3) && !ids.contains(1) + }) + } catch (Exception ex) { + log.info("show job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("show task: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC """ + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + // ── Case 2: multi-table merge (two MySQL tables → one Doris table) ── + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable2}""" + sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable} ( + `id` int NOT NULL, + `name` varchar(200), + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable2} ( + `id` int NOT NULL, + `name` varchar(200), + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (100, 'Src1_A')""" + sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable2} VALUES (200, 'Src2_A')""" + } + + sql """CREATE JOB ${jobNameMerge} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${mysqlSrcTable},${mysqlSrcTable2}", + "offset" = "initial", + "table.${mysqlSrcTable}.target_table" = "${dorisMergeTable}", + "table.${mysqlSrcTable2}.target_table" = "${dorisMergeTable}" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + + // Wait for snapshot rows from both source tables + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def ids = (sql """ SELECT id FROM ${dorisMergeTable} """).collect { it[0].toInteger() } + ids.contains(100) && ids.contains(200) + }) + } catch (Exception ex) { + log.info("show job: " + (sql """select * from jobs("type"="insert") where Name='${jobNameMerge}'""")) + log.info("show task: " + (sql """select * from tasks("type"="insert") where JobName='${jobNameMerge}'""")) + throw ex + } + + qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY id ASC """ + + // Incremental from both source tables + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (101, 'Src1_B')""" + sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable2} VALUES (201, 'Src2_B')""" + } + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def ids = (sql """ SELECT id FROM ${dorisMergeTable} """).collect { it[0].toInteger() } + ids.contains(101) && ids.contains(201) + }) + } catch (Exception ex) { + log.info("show job: " + (sql """select * from jobs("type"="insert") where Name='${jobNameMerge}'""")) + log.info("show task: " + (sql """select * from tasks("type"="insert") where JobName='${jobNameMerge}'""")) + throw ex + } + + qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER BY id ASC """ + + sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'""" + def mergeJobCnt = sql """select count(1) from jobs("type"="insert") where Name = '${jobNameMerge}'""" + assert mergeJobCnt.get(0).get(0) == 0 + } +}