Skip to content

Commit

Permalink
[improve] Schema change DDL supports multi-column change…
Browse files Browse the repository at this point in the history
…s, synchronous defaults (#167)
  • Loading branch information
DongLiang-0 committed Aug 3, 2023
1 parent 857bae3 commit 14be15b
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 11 deletions.
2 changes: 1 addition & 1 deletion flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ under the License.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
<version>2.4.1</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class FieldSchema {
private String name;
private String typeString;
private String defaultValue;
private String comment;

public FieldSchema() {
Expand All @@ -30,6 +31,13 @@ public FieldSchema(String name, String typeString, String comment) {
this.comment = comment;
}

/**
 * Creates a field schema carrying every column attribute, including the
 * column's default value expression.
 *
 * @param name column name
 * @param typeString Doris column type string
 * @param defaultValue normalized default value expression, may be null
 * @param comment column comment, may be null
 */
public FieldSchema(String name, String typeString, String defaultValue, String comment) {
    this.comment = comment;
    this.defaultValue = defaultValue;
    this.typeString = typeString;
    this.name = name;
}

/**
 * Returns the column name.
 *
 * @return the column name
 */
public String getName() {
    return this.name;
}
Expand All @@ -53,4 +61,12 @@ public String getComment() {
/**
 * Sets the column comment.
 *
 * @param comment the column comment, may be null
 */
public void setComment(String comment) {
    this.comment = comment;
}

/**
 * Returns the column's default value expression.
 *
 * @return the default value expression, may be null
 */
public String getDefaultValue() {
    return this.defaultValue;
}

/**
 * Sets the column's default value expression.
 *
 * @param defaultValue the default value expression, may be null
 */
public void setDefaultValue(String defaultValue) {
    this.defaultValue = defaultValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.codec.binary.Base64;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
Expand All @@ -44,8 +51,11 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -70,8 +80,12 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
private boolean firstLoad;
private boolean firstSchemaChange;
private Map<String, FieldSchema> originFieldSchemaMap;
private final boolean newSchemaChange;

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) {
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName, boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Expand All @@ -82,6 +96,9 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern,
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
this.firstSchemaChange = true;
}

@Override
Expand All @@ -91,9 +108,17 @@ public byte[] serialize(String record) throws IOException {
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
//schema change ddl
schemaChange(recordRoot);
if (newSchemaChange) {
schemaChangeV2(recordRoot);
} else {
schemaChange(recordRoot);
}
return null;
}

if (newSchemaChange && firstLoad) {
initOriginFieldSchema(recordRoot);
}
Map<String, String> valueMap;
switch (op) {
case OP_READ:
Expand All @@ -113,6 +138,70 @@ public byte[] serialize(String record) throws IOException {
return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
}

/**
 * Handles a Debezium schema-change event: extracts the ALTER TABLE DDL list from the
 * record, pre-checks each column change against the Doris FE, and executes it.
 *
 * <p>NOTE(review): this assumes {@code SchemaChangeHelper.getDdlSchemas()} is index-aligned
 * with the list returned by {@link #extractDDLList} — confirm against SchemaChangeHelper.
 *
 * @param recordRoot the Debezium change-event JSON root
 * @return true only if every generated DDL passed the pre-check and executed successfully;
 *         false when the table does not match, no DDL was generated, or any step failed
 */
public boolean schemaChangeV2(JsonNode recordRoot) {
    boolean status = false;
    try {
        if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
            return false;
        }
        List<String> ddlSqlList = extractDDLList(recordRoot);
        if (CollectionUtils.isEmpty(ddlSqlList)) {
            LOG.info("ddl can not do schema change:{}", recordRoot);
            return false;
        }

        List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
        // Aggregate the outcome of every DDL. Previously `status` was overwritten on each
        // iteration, so a failed intermediate DDL was masked by a later successful one.
        boolean allSucceeded = true;
        for (int i = 0; i < ddlSqlList.size(); i++) {
            DDLSchema ddlSchema = ddlSchemas.get(i);
            String ddlSql = ddlSqlList.get(i);
            boolean doSchemaChange = checkSchemaChange(ddlSchema);
            boolean executed = doSchemaChange && execSchemaChange(ddlSql);
            allSucceeded &= executed;
            LOG.info("schema change status:{}", executed);
        }
        status = allSucceeded;
    } catch (Exception ex) {
        LOG.warn("schema change error :", ex);
    }
    return status;
}

/**
 * Asks a random Doris FE whether a light schema change is allowed for the given
 * column DDL on {@code database.table}.
 *
 * @param ddlSchema the column change (name + add/drop flag) to pre-check
 * @return true when the FE accepts the schema change
 * @throws IOException on request serialization failure
 * @throws IllegalArgumentException when no valid FE endpoint is available
 */
private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
    String endpoint = RestService.randomEndpoint(dorisOptions.getFenodes(), LOG);
    String checkUrl = String.format(CHECK_SCHEMA_CHANGE_API, endpoint, database, table);
    Map<String, Object> requestBody = buildRequestParam(ddlSchema);
    HttpGetWithEntity request = new HttpGetWithEntity(checkUrl);
    request.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
    request.setEntity(new StringEntity(objectMapper.writeValueAsString(requestBody)));
    boolean allowed = handleResponse(request);
    if (!allowed) {
        LOG.warn("schema change can not do table {}.{}", database, table);
    }
    return allowed;
}

/**
 * Extracts the list of Doris ALTER TABLE statements from a Debezium schema-change
 * record's {@code historyRecord} payload.
 *
 * <p>Fix: guard against absent {@code historyRecord}, {@code ddl} or {@code tableChanges}
 * fields — previously {@code matcher(null)} and {@code tableChanges.get(0)} could throw
 * NPE on malformed or non-DDL records; such records now return null ("cannot do schema
 * change"), matching the existing unsupported-DDL path.
 *
 * @param record the Debezium change-event JSON root
 * @return generated DDL statements, or null when the record is not a supported ALTER
 * @throws JsonProcessingException when the historyRecord payload is not valid JSON
 */
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
    String historyRecordStr = extractJsonNode(record, "historyRecord");
    if (Objects.isNull(historyRecordStr)) {
        return null;
    }
    JsonNode historyRecord = objectMapper.readTree(historyRecordStr);
    JsonNode tableChanges = historyRecord.get("tableChanges");
    String ddl = extractJsonNode(historyRecord, "ddl");
    if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
        return null;
    }
    JsonNode tableChange = tableChanges.get(0);
    LOG.debug("received debezium ddl :{}", ddl);

    Matcher matcher = addDropDDLPattern.matcher(ddl);
    if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
        return null;
    }

    JsonNode columns = tableChange.get("table").get("columns");
    if (firstSchemaChange) {
        // Seed the origin schema from the first DDL's full column list.
        fillOriginSchema(columns);
    }
    Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
    for (JsonNode column : columns) {
        buildFieldSchema(updateFiledSchema, column);
    }
    SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
    return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
}

@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
Expand Down Expand Up @@ -168,6 +257,13 @@ private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumen
return success;
}

/**
 * Builds the request body for the FE light-schema-change pre-check of one column DDL.
 *
 * @param ddlSchema the column change being checked
 * @return mutable map with the column name and whether it is a drop-column change
 */
protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
    Map<String, Object> requestBody = new HashMap<>(2);
    requestBody.put("columnName", ddlSchema.getColumnName());
    requestBody.put("isDropColumn", ddlSchema.isDropColumn());
    return requestBody;
}

/**
* Build param
* {
Expand Down Expand Up @@ -233,7 +329,8 @@ private boolean handleResponse(HttpUriRequest request) {
}

/**
 * Returns the text value of {@code record.get(key)}, or null when the record is null,
 * the key is absent, or the field is an explicit JSON null.
 */
private String extractJsonNode(JsonNode record, String key) {
    if (record == null) {
        return null;
    }
    JsonNode value = record.get(key);
    if (value == null || value instanceof NullNode) {
        return null;
    }
    return value.asText();
}

private Map<String, String> extractBeforeRow(JsonNode record) {
Expand Down Expand Up @@ -277,6 +374,76 @@ private String authHeader() {
return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}

/**
 * Populates {@code originFieldSchemaMap} from the Debezium column metadata of the
 * first schema-change event. When the map was already seeded with column names (by
 * {@code initOriginFieldSchema} from a data record), only those known columns are
 * refreshed; otherwise the map is built from scratch in column order.
 * Clears both first-time flags afterwards.
 */
@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
    if (Objects.isNull(originFieldSchemaMap)) {
        originFieldSchemaMap = new LinkedHashMap<>();
        for (JsonNode column : columns) {
            buildFieldSchema(originFieldSchemaMap, column);
        }
    } else {
        for (JsonNode column : columns) {
            String fieldName = column.get("name").asText();
            if (!originFieldSchemaMap.containsKey(fieldName)) {
                // Column was not seen when the map was seeded; original behavior
                // ignores unknown columns here.
                continue;
            }
            JsonNode length = column.get("length");
            JsonNode scale = column.get("scale");
            FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
            fieldSchema.setName(fieldName);
            fieldSchema.setTypeString(MysqlType.toDorisType(
                    column.get("typeName").asText(),
                    length == null ? 0 : length.asInt(),
                    scale == null ? 0 : scale.asInt()));
            fieldSchema.setComment(extractJsonNode(column, "comment"));
            fieldSchema.setDefaultValue(handleDefaultValue(extractJsonNode(column, "defaultValueExpression")));
        }
    }
    firstSchemaChange = false;
    firstLoad = false;
}

/**
 * Converts one Debezium column node into a {@link FieldSchema} (mapping the MySQL
 * type to its Doris equivalent) and stores it in the given map keyed by column name.
 */
private void buildFieldSchema(Map<String, FieldSchema> fieldSchemaMap, JsonNode column) {
    String fieldName = column.get("name").asText();
    JsonNode lengthNode = column.get("length");
    JsonNode scaleNode = column.get("scale");
    int length = lengthNode == null ? 0 : lengthNode.asInt();
    int scale = scaleNode == null ? 0 : scaleNode.asInt();
    String dorisType = MysqlType.toDorisType(column.get("typeName").asText(), length, scale);
    String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
    String comment = extractJsonNode(column, "comment");
    fieldSchemaMap.put(fieldName, new FieldSchema(fieldName, dorisType, defaultValue, comment));
}

/**
 * Normalizes a CDC default-value expression for use in Doris DDL.
 *
 * @param defaultValue raw default value from Debezium, may be null/blank
 * @return null for blank input; the value unchanged when already quoted;
 *         {@code current_timestamp} for the CDC epoch placeholder; otherwise
 *         the value wrapped in single quotes
 */
private String handleDefaultValue(String defaultValue) {
    if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
        return null;
    }
    // Already quoted: pass through unchanged.
    if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
        return defaultValue;
    }
    // TODO: CDC reports a CURRENT_TIMESTAMP default as the epoch string 1970-01-01 00:00:00.
    if ("1970-01-01 00:00:00".equals(defaultValue)) {
        return "current_timestamp";
    }
    // Due to historical reasons, Doris needs quotes around new-column defaults:
    // MySQL `add column c1 int default 100` -> Doris `add column c1 int default '100'`.
    return "'" + defaultValue + "'";
}

/**
 * Seeds {@code originFieldSchemaMap} with the column names observed in the first
 * data record, preferring the after-image and falling back to the before-image.
 * Values are empty {@link FieldSchema}s to be filled by a later schema-change event.
 */
private void initOriginFieldSchema(JsonNode recordRoot) {
    originFieldSchemaMap = new LinkedHashMap<>();
    Set<String> columnNames = extractAfterRow(recordRoot).keySet();
    if (CollectionUtils.isEmpty(columnNames)) {
        columnNames = extractBeforeRow(recordRoot).keySet();
    }
    for (String columnName : columnNames) {
        originFieldSchemaMap.put(columnName, new FieldSchema());
    }
    firstLoad = false;
}

/**
 * Exposes the cached origin field schema map for tests.
 *
 * @return the current map, may be null before the first record is processed
 */
@VisibleForTesting
public Map<String, FieldSchema> getOriginFieldSchemaMap() {
    return this.originFieldSchemaMap;
}

/**
 * Creates a new {@link Builder} for this serializer.
 *
 * @return a fresh builder instance
 */
public static JsonDebeziumSchemaSerializer.Builder builder() {
    return new Builder();
}
Expand All @@ -288,12 +455,18 @@ public static class Builder {
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
private String sourceTableName;
private boolean newSchemaChange;

/**
 * Sets the Doris connection options.
 *
 * @return this builder for chaining
 */
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
    this.dorisOptions = dorisOptions;
    return this;
}

/**
 * Enables or disables the new (multi-column, default-aware) schema-change path.
 *
 * @return this builder for chaining
 */
public JsonDebeziumSchemaSerializer.Builder setNewSchemaChange(boolean newSchemaChange) {
    this.newSchemaChange = newSchemaChange;
    return this;
}

public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
this.addDropDDLPattern = addDropDDLPattern;
return this;
Expand All @@ -305,7 +478,7 @@ public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTabl
}

/**
 * Builds the serializer from the configured options.
 *
 * @return a new {@link JsonDebeziumSchemaSerializer}
 */
public JsonDebeziumSchemaSerializer build() {
    return new JsonDebeziumSchemaSerializer(
            dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange);
}
}

Expand Down

0 comments on commit 14be15b

Please sign in to comment.