Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ header:
- "docker/thirdparties/docker-compose/kerberos/sql/**"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
- "docker/thirdparties/docker-compose/postgresql/certs/**"
- "docker/thirdparties/docker-compose/mysql/certs/**"
- "conf/mysql_ssl_default_certificate/*"
- "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
- "conf/mysql_ssl_default_certificate/client_certificate/client-cert.pem"
Expand Down
19 changes: 19 additions & 0 deletions docker/thirdparties/docker-compose/mysql/certs/root.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL
BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2
MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq
V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ
w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO
fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4
tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1
bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd
BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP
D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN
WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3
dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB
zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M
JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka
cx+pSJLima+3GHhK2Rj437yx1Q==
-----END CERTIFICATE-----
18 changes: 18 additions & 0 deletions docker/thirdparties/docker-compose/mysql/certs/server.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC+zCCAeOgAwIBAgIUG/9rYO8McYBH83YOe4nzMcb4YCAwDQYJKoZIhvcNAQEL
BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMyNloXDTM2
MDIyOTA4MjMyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEAtHYYwevcMqMPbCAaQlrX7qJtRXf/j+WfGFbM4/PZ
Y6cjSsrqUgwHduMyE4yce9vWWygLJM/S9aBI3jsvhAdLVaFIXhOU4jMuyk+1RJvu
k+iUJ3wabo2Zv6605wUU7wS0FCfJJMxG/zz5FYtX8kMw7sKJWLhB4C+oQlO+mSj4
CKjg7mNZjgKz024/BW7FKhAaYYGI9GNmjIgvjSDXGOXzd2nM9XLoVNIkR8mgD69l
yHHzhGUAdXDxaTr+026Z2uBrnip7ZjDIB65J/qrxSc8eK1ZhZzYdHBpLnP67zuWR
iyKDNETpRa1SoWCk9/9+AGwygRcXC7h1GpMb46wce4/TtwIDAQABo0IwQDAdBgNV
HQ4EFgQUEeFQVqK+A/H6R2iSiNW57cSilGcwHwYDVR0jBBgwFoAUHBKhmdKPD+b1
xDjzzkQVaVETSfUwDQYJKoZIhvcNAQELBQADggEBAKpxfqPTPXL2+n/OW6F8cvwK
aod3BOquIjIKm17+Uob0rcOnxssYNQa0g9pW2zgIlAS+QUZ1K46ygJWrLNKdpIzt
mG2Hn6kUX9J7Xo+F5IldlX2bImi3b2/oI8IliLzawsofondCzL2BIfWLhE3LaISF
iN8pfzjoHCZXfLm3oUzxaeltFqEP+cApig/hAO17FkMHY6sl9QII94MV2d9gVwVl
pAi1ALOzOQKbsTCdRspoadPqmZ7AgbtS3RiVMmCZHwrtCvdIcaBuiPy5KiBFPCEX
Cdia+GWqETKBNpornHeMQ7d/J2ilbFRs+mRAUtyeWK0ilcdOxbmMOzCsjIV8kgI=
-----END CERTIFICATE-----
28 changes: 28 additions & 0 deletions docker/thirdparties/docker-compose/mysql/certs/server.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s
IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje
Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV
i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N
INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+
qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj
rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr
lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1
ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I
vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO
KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2
XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g
qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE
KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F
QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW
99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW
+mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC
k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn
I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU
Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU
C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y
/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE
vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID
bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99
ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR
HZde9n1uVLVtlBRTjjL5O84=
-----END PRIVATE KEY-----
19 changes: 17 additions & 2 deletions docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ version: "2.1"
services:
doris--mysql_57:
image: mysql:5.7.36
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
MYSQL_ROOT_PASSWORD: 123456
Expand All @@ -29,15 +28,31 @@ services:
LANG: C.UTF-8
ports:
- ${DOCKER_MYSQL_57_EXTERNAL_PORT}:3306
entrypoint:
- bash
- -c
- |
chown mysql:mysql /etc/mysql/certs/*
chmod 600 /etc/mysql/certs/server.key
chmod 644 /etc/mysql/certs/server.crt /etc/mysql/certs/root.crt
exec docker-entrypoint.sh "$@"
- --
command:
- "mysqld"
- "--default-authentication-plugin=mysql_native_password"
- "--ssl-ca=/etc/mysql/certs/root.crt"
- "--ssl-cert=/etc/mysql/certs/server.crt"
- "--ssl-key=/etc/mysql/certs/server.key"
healthcheck:
test: mysqladmin ping -h 127.0.0.1 -u root --password=$$MYSQL_ROOT_PASSWORD && mysql -h 127.0.0.1 -u root --password=$$MYSQL_ROOT_PASSWORD -e "SELECT 1 FROM doris_test.deadline;"
interval: 5s
timeout: 60s
retries: 120
volumes:
- ./data/:/var/lib/mysql
- ./data/:/var/lib/mysql
- ./init:/docker-entrypoint-initdb.d
- ./my.cnf:/etc/mysql/conf.d/my.cnf
- ./certs:/etc/mysql/certs
networks:
- doris--mysql_57

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class DataSourceConfigKeys {
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
public static final String SSL_MODE = "ssl_mode";
public static final String SSL_ROOTCERT = "ssl_rootcert";
// PG-style spelling; MySQL normalizes to underscore form.
public static final String SSL_MODE_DISABLE = "disable";
public static final String SSL_MODE_REQUIRE = "require";
public static final String SSL_MODE_VERIFY_CA = "verify-ca";

// PostgreSQL replication slot and publication config
public static final String SLOT_NAME = "slot_name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.PUBLICATION_NAME
);

private static final Set<String> ALLOW_SSL_MODES = Sets.newHashSet(
DataSourceConfigKeys.SSL_MODE_DISABLE,
DataSourceConfigKeys.SSL_MODE_REQUIRE,
DataSourceConfigKeys.SSL_MODE_VERIFY_CA
);

// Known suffixes for per-table config keys (format: "table.<tableName>.<suffix>")
private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = Sets.newHashSet(
DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
Expand Down Expand Up @@ -102,6 +108,16 @@ public static void validateSource(Map<String, String> input,
throw new IllegalArgumentException("Invalid value for key '" + key + "': " + value);
}
}

// Cross-field: verify-ca must be paired with a CA cert; otherwise the reader will
// silently fall back to the JVM default truststore and likely fail to connect.
if (DataSourceConfigKeys.SSL_MODE_VERIFY_CA.equals(input.get(DataSourceConfigKeys.SSL_MODE))
&& (input.get(DataSourceConfigKeys.SSL_ROOTCERT) == null
|| input.get(DataSourceConfigKeys.SSL_ROOTCERT).trim().isEmpty())) {
throw new IllegalArgumentException(
"ssl_mode '" + DataSourceConfigKeys.SSL_MODE_VERIFY_CA
+ "' requires ssl_rootcert to be set");
}
}

public static void validateTarget(Map<String, String> input) throws IllegalArgumentException {
Expand Down Expand Up @@ -144,6 +160,9 @@ private static boolean isValidValue(String key, String value, String dataSourceT
return value.length() <= PG_MAX_IDENTIFIER_LENGTH
&& PG_IDENTIFIER_PATTERN.matcher(value).matches();
}
if (key.equals(DataSourceConfigKeys.SSL_MODE) && !ALLOW_SSL_MODES.contains(value)) {
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,87 @@ public class DataSourceConfigValidatorTest {

private static final int PG_MAX_IDENTIFIER_LENGTH = 63;

private static Map<String, String> sslModeInput(String value) {
Map<String, String> input = new HashMap<>();
input.put(DataSourceConfigKeys.SSL_MODE, value);
return input;
}

@Test
public void testSslModeLegalValues() {
DataSourceConfigValidator.validateSource(
sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE), DataSourceType.MYSQL.name());
DataSourceConfigValidator.validateSource(
sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE), DataSourceType.MYSQL.name());
// verify-ca additionally requires ssl_rootcert; covered by testVerifyCaWithRootcertPasses.
}

@Test
public void testSslModeRejectsMysqlUnderscoreSpelling() {
assertReject(sslModeInput("verify_ca"));
}

@Test
public void testSslModeRejectsVerifyFull() {
assertReject(sslModeInput("verify-full"));
}

@Test
public void testSslModeRejectsPreferredAndAllow() {
assertReject(sslModeInput("preferred"));
assertReject(sslModeInput("prefer"));
assertReject(sslModeInput("allow"));
}

@Test
public void testSslModeRejectsUppercaseVariants() {
assertReject(sslModeInput("DISABLE"));
assertReject(sslModeInput("Verify-CA"));
}

@Test
public void testSslModeRejectsEmpty() {
assertReject(sslModeInput(""));
}

@Test
public void testSslModeOptional() {
// ssl_mode is not required; validateSource should pass when absent
Map<String, String> input = new HashMap<>();
input.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
DataSourceConfigValidator.validateSource(input, DataSourceType.MYSQL.name());
}

@Test
public void testVerifyCaRequiresRootcert() {
Map<String, String> input = sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA);
assertReject(input);
}

@Test
public void testVerifyCaWithRootcertPasses() {
Map<String, String> input = sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA);
input.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:ca.pem");
DataSourceConfigValidator.validateSource(input, DataSourceType.MYSQL.name());
}

@Test
public void testDisableWithoutRootcertPasses() {
DataSourceConfigValidator.validateSource(
sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE), DataSourceType.MYSQL.name());
DataSourceConfigValidator.validateSource(
sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE), DataSourceType.MYSQL.name());
}

private static void assertReject(Map<String, String> input) {
try {
DataSourceConfigValidator.validateSource(input, DataSourceType.MYSQL.name());
Assert.fail("expected IllegalArgumentException for input: " + input);
} catch (IllegalArgumentException ignored) {
// expected
}
}

@Test
public void testSlotNameAndPublicationNameAllowed() {
Map<String, String> props = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.cdcclient.source.reader.SplitRecords;
import org.apache.doris.cdcclient.utils.ConfigUtil;
import org.apache.doris.cdcclient.utils.SmallFileMgr;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
Expand Down Expand Up @@ -878,6 +879,28 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str
dbzProps.setProperty(
MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(),
DEBEZIUM_HEARTBEAT_INTERVAL_MS + "");

if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_MODE)) {
String normalized =
normalizeSslModeForMysql(cdcConfig.get(DataSourceConfigKeys.SSL_MODE));
dbzProps.put("database.ssl.mode", normalized);
// Flink CDC's forked MySqlConnection drops Debezium SSL props from the snapshot
// JDBC URL, so mirror to Connector/J native names.
jdbcProperteis.put("sslMode", normalized);
}
if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
String fileName = cdcConfig.get(DataSourceConfigKeys.SSL_ROOTCERT);
String truststorePath = SmallFileMgr.getPkcs12TruststorePath(fileName);
LOG.info("Using SSL truststore file path: {}", truststorePath);
dbzProps.put("database.ssl.truststore", truststorePath);
dbzProps.put("database.ssl.truststore.password", SmallFileMgr.TRUSTSTORE_PASSWORD);
jdbcProperteis.put("trustCertificateKeyStoreUrl", "file:" + truststorePath);
// Connector/J defaults keystore type to JKS; we generate PKCS12.
jdbcProperteis.put("trustCertificateKeyStoreType", "PKCS12");
jdbcProperteis.put(
"trustCertificateKeyStorePassword", SmallFileMgr.TRUSTSTORE_PASSWORD);
}

configFactory.debeziumProperties(dbzProps);
configFactory.heartbeatInterval(Duration.ofMillis(DEBEZIUM_HEARTBEAT_INTERVAL_MS));

Expand Down Expand Up @@ -1056,6 +1079,31 @@ public DeserializeResult deserialize(Map<String, String> config, SourceRecord el
return serializer.deserialize(config, element);
}

/** Map Doris ssl_mode (PG-style) to Debezium MySQL's underscore spelling. */
static String normalizeSslModeForMysql(String sslMode) {
if (sslMode == null) {
throw new IllegalArgumentException("ssl_mode must not be null");
}
switch (sslMode) {
case DataSourceConfigKeys.SSL_MODE_DISABLE:
return MySqlConnectorConfig.SecureConnectionMode.DISABLED.getValue();
case DataSourceConfigKeys.SSL_MODE_REQUIRE:
return MySqlConnectorConfig.SecureConnectionMode.REQUIRED.getValue();
case DataSourceConfigKeys.SSL_MODE_VERIFY_CA:
return MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA.getValue();
default:
throw new IllegalArgumentException(
"Unsupported ssl_mode for MySQL: '"
+ sslMode
+ "'. Allowed: "
+ String.join(
", ",
DataSourceConfigKeys.SSL_MODE_DISABLE,
DataSourceConfigKeys.SSL_MODE_REQUIRE,
DataSourceConfigKeys.SSL_MODE_VERIFY_CA));
}
}

/**
* Filtered record iterator that only returns data change records, filtering out watermark,
* heartbeat and other events. This is a private static inner class that encapsulates record
Expand Down
Loading
Loading