[Improve](schemaChange) schema change DDL supports multi-column changes and synchronous defaults #167

Merged: 2 commits, Aug 3, 2023
Changes from 1 commit
2 changes: 1 addition & 1 deletion flink-doris-connector/pom.xml
@@ -245,7 +245,7 @@ under the License.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
- <version>2.3.0</version>
+ <version>2.4.1</version>
Member: Will 2.4.1 conflict with oracle 2.3.0?

Contributor Author: This problem occurs when running the CdcOraclelSyncDatabaseCase class alone.

<scope>provided</scope>
</dependency>
<dependency>
FieldSchema.java
@@ -19,6 +19,7 @@
public class FieldSchema {
private String name;
private String typeString;
private String defaultValue;
private String comment;

public FieldSchema() {
@@ -30,6 +31,13 @@ public FieldSchema(String name, String typeString, String comment) {
this.comment = comment;
}

public FieldSchema(String name, String typeString, String defaultValue, String comment) {
this.name = name;
this.typeString = typeString;
this.defaultValue = defaultValue;
this.comment = comment;
}

public String getName() {
return name;
}
@@ -53,4 +61,12 @@ public String getComment() {
public void setComment(String comment) {
this.comment = comment;
}

public String getDefaultValue() {
return defaultValue;
}

public void setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
}
}
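To illustrate the new field, here is a minimal, hedged usage sketch of the extended constructor; the class name and values below are hypothetical, only the FieldSchema API comes from this diff.

```java
import org.apache.doris.flink.catalog.doris.FieldSchema;

public class FieldSchemaExample {
    public static void main(String[] args) {
        // name, Doris type string, default value (already quoted), comment
        FieldSchema age = new FieldSchema("age", "INT", "'18'", "age of user");
        System.out.println(age.getName() + " default=" + age.getDefaultValue());
    }
}
```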
JsonDebeziumSchemaSerializer.java
@@ -23,11 +23,18 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.codec.binary.Base64;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
@@ -44,8 +51,11 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -70,8 +80,12 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String>
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
private boolean firstLoad;
private boolean firstSchemaChange;
private Map<String, FieldSchema> originFieldSchemaMap;
private final boolean newSchemaChange;

- public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) {
+ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName, boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -82,6 +96,9 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern,
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
this.firstSchemaChange = true;
}

@Override
@@ -91,9 +108,17 @@ public byte[] serialize(String record) throws IOException {
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
//schema change ddl
- schemaChange(recordRoot);
+ if (newSchemaChange) {
+     schemaChangeV2(recordRoot);
+ } else {
+     schemaChange(recordRoot);
+ }
return null;
}

if (newSchemaChange && firstLoad) {
initOriginFieldSchema(recordRoot);
}
Map<String, String> valueMap;
switch (op) {
case OP_READ:
@@ -113,6 +138,67 @@
return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
}

public boolean schemaChangeV2(JsonNode recordRoot) {
boolean status = false;
try {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
List<String> ddlSqlList = extractDDLList(recordRoot);
if (CollectionUtils.isEmpty(ddlSqlList)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}

List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
for (int i = 0; i < ddlSqlList.size(); i++) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(ddlSchema);
status = doSchemaChange && execSchemaChange(ddlSql);
LOG.info("schema change status:{}", status);
}
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
}
return status;
}

private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String,Object> param = buildRequestParam(ddlSchema);
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}",database,table);
}
return success;
}

private List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
Member: Need to add unit tests.

Contributor Author: Thanks for the suggestion; I have made the change.

JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record, "historyRecord"));
JsonNode tableChanges = historyRecord.get("tableChanges");
JsonNode tableChange = tableChanges.get(0);
if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
return null;
}
JsonNode columns = tableChange.get("table").get("columns");
if (firstSchemaChange) {
fillOriginSchema(columns);
}
String ddl = extractJsonNode(historyRecord, "ddl");
LOG.debug("received debezium ddl :{}", ddl);

Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(updateFiledSchema, column);
}
SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
}
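Regarding the reviewer's unit-test request above, a minimal sketch of how the DDL-diff step could be exercised, using only the SchemaChangeHelper calls visible in this diff (compareSchema, generateDDLSql); the table identifier and columns are hypothetical, and the helper is assumed to keep its diff state statically, as schemaChangeV2 does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.sink.writer.SchemaChangeHelper;

public class SchemaChangeHelperSketch {
    public static void main(String[] args) {
        // Schema as known from the first record.
        Map<String, FieldSchema> origin = new LinkedHashMap<>();
        origin.put("id", new FieldSchema("id", "INT", null, null));
        origin.put("name", new FieldSchema("name", "VARCHAR(128)", null, null));

        // Upstream schema after an ALTER: one new column with a default value.
        Map<String, FieldSchema> updated = new LinkedHashMap<>(origin);
        updated.put("age", new FieldSchema("age", "INT", "'18'", "age of user"));

        // Same calls extractDDLList makes: diff the schemas, then render the DDL.
        SchemaChangeHelper.compareSchema(updated, origin);
        List<String> ddlList = SchemaChangeHelper.generateDDLSql("test_db.test_tbl");
        ddlList.forEach(System.out::println); // expect one ALTER TABLE ... ADD COLUMN age ...
    }
}
```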

@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
@@ -168,6 +254,13 @@ private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException
return success;
}

protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
Map<String, Object> params = new HashMap<>();
params.put("isDropColumn", ddlSchema.isDropColumn());
params.put("columnName", ddlSchema.getColumnName());
return params;
}
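For reference, a small hedged sketch of the body this method produces for a dropped column (the column name is hypothetical); this is the payload checkSchemaChange serializes and sends to the check API.

```java
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class RequestParamExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> params = new HashMap<>();
        params.put("isDropColumn", true);
        params.put("columnName", "age");
        // Prints e.g. {"columnName":"age","isDropColumn":true} (key order may vary)
        System.out.println(new ObjectMapper().writeValueAsString(params));
    }
}
```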

/**
* Build param
* {
@@ -233,7 +326,8 @@ private boolean handleResponse(HttpUriRequest request) {
}

private String extractJsonNode(JsonNode record, String key) {
- return record != null && record.get(key) != null ? record.get(key).asText() : null;
+ return record != null && record.get(key) != null &&
+         !(record.get(key) instanceof NullNode) ? record.get(key).asText() : null;
}

private Map<String, String> extractBeforeRow(JsonNode record) {
@@ -277,6 +371,68 @@ private String authHeader() {
return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}

private void fillOriginSchema(JsonNode columns) {
Member: The same as above (needs unit tests).

if (Objects.nonNull(originFieldSchemaMap)) {
for (JsonNode column : columns) {
String fieldName = column.get("name").asText();
if (originFieldSchemaMap.containsKey(fieldName)) {
JsonNode length = column.get("length");
JsonNode scale = column.get("scale");
String type = MysqlType.toDorisType(column.get("typeName").asText(),
length == null ? 0 : length.asInt(),
scale == null ? 0 : scale.asInt());
String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
fieldSchema.setName(fieldName);
fieldSchema.setTypeString(type);
fieldSchema.setComment(comment);
fieldSchema.setDefaultValue(defaultValue);
}
}
} else {
originFieldSchemaMap = new LinkedHashMap<>();
columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
}
firstSchemaChange = false;
firstLoad = false;
}

private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
String fieldName = column.get("name").asText();
JsonNode length = column.get("length");
JsonNode scale = column.get("scale");
String type = MysqlType.toDorisType(column.get("typeName").asText(),
length == null ? 0 : length.asInt(), scale == null ? 0 : scale.asInt());
String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
filedSchemaMap.put(fieldName, new FieldSchema(fieldName, type, defaultValue, comment));
}
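For clarity, a hedged sketch of the shape of a Debezium column entry that buildFieldSchema reads; the field names (name, typeName, length, scale, defaultValueExpression, comment) come from the accessors above, while the values are hypothetical.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ColumnNodeExample {
    public static void main(String[] args) throws Exception {
        String json = "{\"name\":\"age\",\"typeName\":\"INT\",\"length\":null,"
                + "\"scale\":null,\"defaultValueExpression\":\"18\",\"comment\":null}";
        JsonNode column = new ObjectMapper().readTree(json);
        System.out.println(column.get("name").asText());                   // age
        System.out.println(column.get("typeName").asText());               // INT
        System.out.println(column.get("defaultValueExpression").asText()); // 18
    }
}
```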

private String handleDefaultValue(String defaultValue) {
if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
return null;
}
// Due to historical reasons, Doris needs to add quotes to the default value of the new column
if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
Member (@JNSimba, Aug 2, 2023): It is recommended to add an example in the comments.

Contributor Author: Of course, I've added one.

return defaultValue;
} else if (defaultValue.equals("1970-01-01 00:00:00")) {
// TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
return "current_timestamp";
}
return "'" + defaultValue + "'";
}
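Following the review comment above, a standalone, hedged restatement of the normalization rules in handleDefaultValue (inputs are hypothetical; isNullOrWhitespaceOnly is approximated with a plain trim check):

```java
import java.util.regex.Pattern;

public class DefaultValueExample {
    // Standalone restatement of handleDefaultValue, for illustration only.
    static String normalize(String defaultValue) {
        if (defaultValue == null || defaultValue.trim().isEmpty()) {
            return null;
        }
        if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
            return defaultValue;                     // already quoted, e.g. 'Tom'
        } else if (defaultValue.equals("1970-01-01 00:00:00")) {
            return "current_timestamp";              // CDC placeholder for current time
        }
        return "'" + defaultValue + "'";             // plain literal, e.g. 18 -> '18'
    }

    public static void main(String[] args) {
        System.out.println(normalize("'Tom'"));                // 'Tom'
        System.out.println(normalize("18"));                   // '18'
        System.out.println(normalize("1970-01-01 00:00:00"));  // current_timestamp
    }
}
```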

private void initOriginFieldSchema(JsonNode recordRoot) {
originFieldSchemaMap = new LinkedHashMap<>();
Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
if (CollectionUtils.isEmpty(columnNameSet)) {
columnNameSet = extractBeforeRow(recordRoot).keySet();
}
columnNameSet.forEach(columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
firstLoad = false;
}

public static JsonDebeziumSchemaSerializer.Builder builder() {
return new JsonDebeziumSchemaSerializer.Builder();
}
@@ -288,12 +444,18 @@ public static class Builder {
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
private String sourceTableName;
private boolean newSchemaChange;

public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}

public JsonDebeziumSchemaSerializer.Builder setNewSchemaChange(boolean newSchemaChange) {
this.newSchemaChange = newSchemaChange;
return this;
}

public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
this.addDropDDLPattern = addDropDDLPattern;
return this;
@@ -305,7 +467,7 @@ public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) {
}

public JsonDebeziumSchemaSerializer build() {
- return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName);
+ return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange);
}
}
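A hedged usage sketch of the extended builder, assuming the serializer lives in org.apache.doris.flink.sink.writer and that a configured DorisOptions instance is supplied by the caller; the source table name is hypothetical.

```java
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;

public class SerializerBuilderExample {
    // dorisOptions is assumed to be configured elsewhere (fenodes, table identifier, credentials).
    static JsonDebeziumSchemaSerializer buildSerializer(DorisOptions dorisOptions) {
        return JsonDebeziumSchemaSerializer.builder()
                .setDorisOptions(dorisOptions)
                .setNewSchemaChange(true)                // route DDL through schemaChangeV2
                .setSourceTableName("test_db.test_tbl")  // hypothetical CDC source, format db.tbl
                .build();
    }
}
```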
