diff --git a/docs/content/Realtime synchronization from MySQL.md b/docs/content/Realtime synchronization from MySQL.md index abbf142e..04d649c5 100755 --- a/docs/content/Realtime synchronization from MySQL.md +++ b/docs/content/Realtime synchronization from MySQL.md @@ -84,7 +84,8 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d --sink-conf password= Password1 \ --sink-conf jdbc-url=jdbc:mysql://ip:9030 \ --sink-conf sink.label-prefix=superman \ - --table-conf replication_num=1 + --table-conf replication_num=1 \ + --table-conf fast_schema_evolution=true ``` ## Options @@ -103,4 +104,4 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d | --sink-conf password | Yes | NONE | The password of the StarRocks | | --sink-conf sink.label-prefix | Yes | No | stream load label | | --table-conf replication_num | Yes | 3 | table property | - +| --table-conf fast_schema_evolution | No | FALSE | Available in StarRocks versions later than 3.2. When set to true, ADD/DROP COLUMN changes from the source are applied to the StarRocks table | diff --git a/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java b/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java index 73b29978..dc89ff8d 100755 --- a/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java +++ b/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java @@ -51,6 +51,7 @@ public abstract class DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); + private static final String FAST_SCHEMA_EVOLUTION = "fast_schema_evolution"; protected Configuration config; protected String database; protected TableNameConverter converter; @@ -60,6 +61,7 @@ public abstract class DatabaseSync { protected Configuration sinkConfig; public StreamExecutionEnvironment env; private Map tableMapping = new HashMap<>(); + private Boolean isFastSchemaEvolution; public abstract Connection getConnection() throws SQLException; @@ -79,6 +81,7 @@ public void create(StreamExecutionEnvironment env, String 
database, Configuratio this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); this.sinkConfig = sinkConfig; this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig; + this.isFastSchemaEvolution = checkFastSchemaEvolution(); } public void build() throws Exception { @@ -128,6 +131,11 @@ public void build() throws Exception { } + private boolean checkFastSchemaEvolution() { + String tableProperty = tableConfig.get(FAST_SCHEMA_EVOLUTION); + return tableProperty != null && tableProperty.equalsIgnoreCase("true"); + } + private DebeziumJsonSerializer getSerializer(String table) { String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME); String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, ""); @@ -137,7 +145,8 @@ private DebeziumJsonSerializer getSerializer(String table) { starRocksBuilder.setTableIdentifier(database + "." + table) .setUsername(user) .setPassword(passwd) - .setJdbcUrl(jdbcUrl); + .setJdbcUrl(jdbcUrl) + .setFastSchemaEvolution(isFastSchemaEvolution); return DebeziumJsonSerializer.builder().setStarRocksOptions(starRocksBuilder.build()).build(); } diff --git a/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java b/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java index f2b02e39..5c7c6ead 100755 --- a/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java +++ b/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java @@ -30,16 +30,26 @@ public class StarRocksOptions implements Serializable { private StarRocksJdbcConnectionOptions opts; private String tableIdentifier; + private Boolean isFastSchemaEvolution; - public StarRocksOptions(String username, String password, String tableIdentifier, String jdbcUrl) { - this.opts = new StarRocksJdbcConnectionOptions(username, password, jdbcUrl); + public StarRocksOptions(String username, String password, String tableIdentifier, String jdbcUrl, Boolean isFastSchemaEvolution) { + 
this.opts = new StarRocksJdbcConnectionOptions(jdbcUrl, username, password); this.tableIdentifier = tableIdentifier; + this.isFastSchemaEvolution = isFastSchemaEvolution; } public String getTableIdentifier() { return tableIdentifier; } + public Boolean getFastSchemaEvolution() { + return isFastSchemaEvolution; + } + + public StarRocksJdbcConnectionOptions getOpts() { + return opts; + } + public String save() throws IllegalArgumentException { Properties copy = new Properties(); return IOUtils.propsToString(copy); @@ -54,6 +64,7 @@ public static class Builder { private String username; private String password; private String tableIdentifier; + private Boolean isFastSchemaEvolution; public Builder setTableIdentifier(String tableIdentifier) { this.tableIdentifier = tableIdentifier; @@ -75,9 +86,15 @@ public Builder setJdbcUrl(String jdbcUrl) { return this; } + /** Sets the fast-schema-evolution flag handed to StarRocksOptions by build(); returns this builder so the call can be chained like the other setters. */ + public Builder setFastSchemaEvolution(Boolean fastSchemaEvolution) { + this.isFastSchemaEvolution = fastSchemaEvolution; + return this; + } + public StarRocksOptions build() { checkNotNull(tableIdentifier, "No tableIdentifier supplied."); - return new StarRocksOptions(username, password, tableIdentifier, jdbcUrl); + return new StarRocksOptions(username, password, tableIdentifier, jdbcUrl, isFastSchemaEvolution); } } diff --git a/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java b/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java index 740cbc65..b7be4e15 100755 --- a/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java +++ b/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java @@ -25,6 +25,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.starrocks.connector.flink.catalog.StarRocksCatalog; +import com.starrocks.connector.flink.catalog.StarRocksColumn; import 
com.starrocks.connector.flink.cdc.StarRocksOptions; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.StringUtils; @@ -33,9 +35,12 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; import java.util.HashMap; import java.util.Map; -import java.util.Objects; +import java.util.ArrayList; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -59,6 +64,8 @@ public class DebeziumJsonSerializer implements Serializable { private String table; //table name of the cdc upstream, format is db.tbl private String sourceTableName; + private StarRocksCatalog starRocksCatalog; + private Boolean isFastSchemaEvolution; public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern, String sourceTableName) { this.starRocksOptions = starRocksOptions; @@ -71,6 +78,10 @@ public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); this.objectMapper.setNodeFactory(jsonNodeFactory); + this.starRocksCatalog = new StarRocksCatalog(starRocksOptions.getOpts().getDbURL(), + starRocksOptions.getOpts().getUsername().get(), starRocksOptions.getOpts().getPassword().get()); + this.isFastSchemaEvolution = starRocksOptions.getFastSchemaEvolution(); + this.starRocksCatalog.open(); } public String process(String record) throws IOException { @@ -79,8 +90,9 @@ public String process(String record) throws IOException { String op = extractJsonNode(recordRoot, "op"); if (Objects.isNull(op)) { // schema change ddl - // starrocks 存算分离版本目前不支持schemaChange, 先注释掉 - // schemaChange(recordRoot); + if (Boolean.TRUE.equals(isFastSchemaEvolution)) { /* null-safe: the Boolean flag is null when the builder option was never set, and unboxing null would throw */ + schemaChange(recordRoot); + } return INVALID_RESULT; } Map valueMap; @@ -107,22 +119,17 @@ public String process(String record) throws IOException { @VisibleForTesting 
public boolean schemaChange(JsonNode recordRoot) { - boolean status = false; + try{ - if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){ + if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) { return false; } - String ddl = extractDDL(recordRoot); - if(StringUtils.isNullOrWhitespaceOnly(ddl)){ - LOG.info("ddl can not do schema change:{}", recordRoot); - return false; - } - // TODO Exec schema change - LOG.info("schema change status:{}", status); + extractDDLAndExecute(recordRoot); + return true; /* reached only when extractDDLAndExecute did not throw */ }catch (Exception ex){ LOG.warn("schema change error :", ex); } - return status; + return false; /* the exception was logged above; report the failure instead of claiming success */ } /** @@ -174,27 +181,46 @@ private Map extractRow(JsonNode recordRow) { return recordMap != null ? recordMap : new HashMap<>(); } - public String extractDDL(JsonNode record) throws JsonProcessingException { + private void extractDDLAndExecute(JsonNode record) throws JsonProcessingException { String historyRecord = extractJsonNode(record, "historyRecord"); if (Objects.isNull(historyRecord)) { - return null; + return; } String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl"); LOG.debug("received debezium ddl :{}", ddl); if (!Objects.isNull(ddl)) { //filter add/drop operation Matcher matcher = addDropDDLPattern.matcher(ddl); - if(matcher.find()){ + if (matcher.find()) { String op = matcher.group(1); String col = matcher.group(3); + + if (op.equalsIgnoreCase("drop")) { + execDropDDL(col); + return; + } + String type = matcher.group(5); type = handleType(type); - ddl = String.format(EXECUTE_DDL, starRocksOptions.getTableIdentifier(), op, col, type); - LOG.info("parse ddl:{}", ddl); - return ddl; + execAddDDL(col, type); } } - return null; + } + + private void execAddDDL(String col, String type) { + List toAddColumns = new ArrayList<>(); + StarRocksColumn.Builder builder = new StarRocksColumn.Builder() + .setColumnName(col) + .setDataType(type); + + toAddColumns.add(builder.build()); + + 
starRocksCatalog.alterAddColumns(database, table, toAddColumns, 30); + } + + private void execDropDDL(String col) { + List cols = Arrays.asList(col); + starRocksCatalog.alterDropColumns(database, table, cols, 30); } public static DebeziumJsonSerializer.Builder builder() { diff --git a/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java b/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java index f80d7051..46056587 100755 --- a/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java +++ b/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java @@ -61,6 +61,7 @@ public static void main(String[] args) throws Exception{ Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); + tableConfig.put("fast_schema_evolution", "true"); String includingTables = "tbl1|tbl2|tbl3"; String excludingTables = "";