Commit 08de67a
[Enhancement] Synchronize ddl schema change to Starrocks by parsing json (#341)

Signed-off-by: chenhaifengkeda <chenhaifeng_yewu@cmss.chinamobile.com>
chenhaifengkeda committed Mar 21, 2024
1 parent b3edcf0 commit 08de67a
Showing 5 changed files with 78 additions and 26 deletions.
5 changes: 3 additions & 2 deletions docs/content/Realtime synchronization from MySQL.md
@@ -84,7 +84,8 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d
--sink-conf password= Password1 \
--sink-conf jdbc-url=jdbc:mysql://ip:9030 \
--sink-conf sink.label-prefix=superman \
--table-conf replication_num=1
--table-conf replication_num=1 \
--table-conf fast_schema_evolution=true
```

## Options
@@ -103,4 +104,4 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d
| --sink-conf password | Yes | NONE | The password of the StarRocks |
| --sink-conf sink.label-prefix | Yes | No | stream load label |
| --table-conf replication_num | Yes | 3 | table property |

| --table-conf fast_schema_evolution | No | FALSE | Versions later than 3.2 support add/drop column |
DatabaseSync.java

@@ -51,6 +51,7 @@

public abstract class DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
private static final String FAST_SCHEMA_EVOLUTION = "fast_schema_evolution";
protected Configuration config;
protected String database;
protected TableNameConverter converter;
@@ -60,6 +61,7 @@ public abstract class DatabaseSync {
protected Configuration sinkConfig;
public StreamExecutionEnvironment env;
private Map<String, String> tableMapping = new HashMap<>();
private Boolean isFastSchemaEvolution;

public abstract Connection getConnection() throws SQLException;

@@ -79,6 +81,7 @@ public void create(StreamExecutionEnvironment env, String database, Configuratio
this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
this.sinkConfig = sinkConfig;
this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
this.isFastSchemaEvolution = checkFastSchemaEvolution();
}

public void build() throws Exception {
@@ -128,6 +131,11 @@ public void build() throws Exception {

}

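/**
 * Reads the "fast_schema_evolution" entry from the table config: only a
 * case-insensitive "true" enables it; any other value, or a missing key,
 * leaves it disabled.
 */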
private boolean checkFastSchemaEvolution() {
String tableProperty = tableConfig.get(FAST_SCHEMA_EVOLUTION);
return tableProperty != null && tableProperty.equalsIgnoreCase("true");
}

private DebeziumJsonSerializer getSerializer(String table) {
String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME);
String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, "");
@@ -137,7 +145,8 @@ private DebeziumJsonSerializer getSerializer(String table) {
starRocksBuilder.setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd)
.setJdbcUrl(jdbcUrl);
.setJdbcUrl(jdbcUrl)
.setFastSchemaEvolution(isFastSchemaEvolution);

return DebeziumJsonSerializer.builder().setStarRocksOptions(starRocksBuilder.build()).build();
}
StarRocksOptions.java

@@ -30,16 +30,26 @@
public class StarRocksOptions implements Serializable {
private StarRocksJdbcConnectionOptions opts;
private String tableIdentifier;
private Boolean isFastSchemaEvolution;

public StarRocksOptions(String username, String password, String tableIdentifier, String jdbcUrl) {
this.opts = new StarRocksJdbcConnectionOptions(username, password, jdbcUrl);
public StarRocksOptions(String username, String password, String tableIdentifier, String jdbcUrl, Boolean isFastSchemaEvolution) {
this.opts = new StarRocksJdbcConnectionOptions(jdbcUrl, username, password);
this.tableIdentifier = tableIdentifier;
this.isFastSchemaEvolution = isFastSchemaEvolution;
}

public String getTableIdentifier() {
return tableIdentifier;
}

public Boolean getFastSchemaEvolution() {
return isFastSchemaEvolution;
}

public StarRocksJdbcConnectionOptions getOpts() {
return opts;
}

public String save() throws IllegalArgumentException {
Properties copy = new Properties();
return IOUtils.propsToString(copy);
@@ -54,6 +64,7 @@ public static class Builder {
private String username;
private String password;
private String tableIdentifier;
private Boolean isFastSchemaEvolution;

public Builder setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
@@ -75,9 +86,13 @@ public Builder setJdbcUrl(String jdbcUrl) {
return this;
}

public void setFastSchemaEvolution(Boolean fastSchemaEvolution) {
isFastSchemaEvolution = fastSchemaEvolution;
}

public StarRocksOptions build() {
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
return new StarRocksOptions(username, password, tableIdentifier, jdbcUrl);
return new StarRocksOptions(username, password, tableIdentifier, jdbcUrl, isFastSchemaEvolution);
}
}

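A minimal usage sketch of the extended builder (illustrative only; the table identifier, credentials, and URL below are placeholders, not values from this commit):

```java
// Sketch, not part of this commit: wiring the new flag through the builder.
StarRocksOptions.Builder builder = new StarRocksOptions.Builder()
        .setTableIdentifier("demo.tbl1")
        .setUsername("root")
        .setPassword("")
        .setJdbcUrl("jdbc:mysql://ip:9030");
builder.setFastSchemaEvolution(true); // returns void, so it cannot sit mid-chain
StarRocksOptions options = builder.build();
```

Note that, unlike the other setters, `setFastSchemaEvolution` returns `void`; the chained call in `getSerializer` above works only because it is the last call in the chain.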
DebeziumJsonSerializer.java

@@ -25,6 +25,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.starrocks.connector.flink.catalog.StarRocksCatalog;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.starrocks.connector.flink.cdc.StarRocksOptions;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
@@ -33,9 +35,12 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -59,6 +64,8 @@ public class DebeziumJsonSerializer implements Serializable {
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
private StarRocksCatalog starRocksCatalog;
private Boolean isFastSchemaEvolution;

public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern, String sourceTableName) {
this.starRocksOptions = starRocksOptions;
@@ -71,6 +78,10 @@ public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.starRocksCatalog = new StarRocksCatalog(starRocksOptions.getOpts().getDbURL(),
starRocksOptions.getOpts().getUsername().get(), starRocksOptions.getOpts().getPassword().get());
this.isFastSchemaEvolution = starRocksOptions.getFastSchemaEvolution();
this.starRocksCatalog.open();
}

public String process(String record) throws IOException {
@@ -79,8 +90,9 @@ public String process(String record) throws IOException {
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
// The StarRocks shared-data (separated storage and compute) version does not support schemaChange yet, so this is commented out for now
// schemaChange(recordRoot);
if (isFastSchemaEvolution) {
schemaChange(recordRoot);
}
return INVALID_RESULT;
}
Map<String, String> valueMap;
@@ -107,22 +119,17 @@

@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;

try{
if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
String ddl = extractDDL(recordRoot);
if(StringUtils.isNullOrWhitespaceOnly(ddl)){
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
// TODO Exec schema change
LOG.info("schema change status:{}", status);

extractDDLAndExecute(recordRoot);
}catch (Exception ex){
LOG.warn("schema change error :", ex);
}
return status;
return true;
}

/**
@@ -174,27 +181,46 @@ private Map<String, String> extractRow(JsonNode recordRow) {
return recordMap != null ? recordMap : new HashMap<>();
}

public String extractDDL(JsonNode record) throws JsonProcessingException {
private void extractDDLAndExecute(JsonNode record) throws JsonProcessingException {
String historyRecord = extractJsonNode(record, "historyRecord");
if (Objects.isNull(historyRecord)) {
return null;
return;
}
String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
Matcher matcher = addDropDDLPattern.matcher(ddl);
if(matcher.find()){
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);

if (op.equalsIgnoreCase("drop")) {
execDropDDL(col);
return;
}

String type = matcher.group(5);
type = handleType(type);
ddl = String.format(EXECUTE_DDL, starRocksOptions.getTableIdentifier(), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
execAddDDL(col, type);
}
}
return null;
}

private void execAddDDL(String col, String type) {
List<StarRocksColumn> toAddColumns = new ArrayList<>();
StarRocksColumn.Builder builder = new StarRocksColumn.Builder()
.setColumnName(col)
.setDataType(type);

toAddColumns.add(builder.build());

starRocksCatalog.alterAddColumns(database, table, toAddColumns, 30);
}

private void execDropDDL(String col) {
List<String> cols = Arrays.asList(col);
starRocksCatalog.alterDropColumns(database, table, cols, 30);
}

public static DebeziumJsonSerializer.Builder builder() {
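For context, a sketch of the kind of Debezium schema-change record that `process` reacts to when fast schema evolution is enabled. Only the `op`, `historyRecord`, and nested `ddl` fields are taken from this diff; the rest of the envelope is an assumption for illustration:

```java
// Hypothetical schema-change event: no "op" field, so process() routes it to schemaChange().
// "historyRecord" is a JSON string that carries the original DDL executed on MySQL.
String record = "{"
        + "\"source\":{\"db\":\"demo\",\"table\":\"tbl1\"},"
        + "\"historyRecord\":\"{\\\"ddl\\\":\\\"alter table tbl1 add column age int\\\"}\""
        + "}";
// With fast_schema_evolution=true, the ddl is matched against addDropDDLPattern and the add
// is applied through starRocksCatalog.alterAddColumns(database, table, toAddColumns, 30).
```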
@@ -61,6 +61,7 @@ public static void main(String[] args) throws Exception{

Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
tableConfig.put("fast_schema_evolution", "true");

String includingTables = "tbl1|tbl2|tbl3";
String excludingTables = "";