Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Doris] Fix the abnormality of deleting data in CDC scenario. #7315

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -29,6 +30,7 @@
import org.apache.seatunnel.format.text.TextSerializationSchema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand All @@ -42,39 +44,54 @@ public class SeaTunnelRowSerializer implements DorisSerializer {
private final SeaTunnelRowType seaTunnelRowType;
private final String fieldDelimiter;
private final boolean enableDelete;
private final SerializationSchema serialize;

public SeaTunnelRowSerializer(
String type,
SeaTunnelRowType seaTunnelRowType,
String fieldDelimiter,
boolean enableDelete) {
this.type = type;
this.seaTunnelRowType = seaTunnelRowType;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
}
List<Object> fieldNames = new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
List<SeaTunnelDataType<?>> fieldTypes =
new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));

if (enableDelete) {
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);
}

public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
throws IOException {
this.seaTunnelRowType =
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0]));

JsonSerializationSchema jsonSerializationSchema =
new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
ObjectMapper mapper = jsonSerializationSchema.getMapper();
mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
return jsonSerializationSchema.serialize(row);
if (JSON.equals(type)) {
JsonSerializationSchema jsonSerializationSchema =
new JsonSerializationSchema(this.seaTunnelRowType, NULL_VALUE);
ObjectMapper mapper = jsonSerializationSchema.getMapper();
mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
this.serialize = jsonSerializationSchema;
} else {
this.serialize =
TextSerializationSchema.builder()
.seaTunnelRowType(this.seaTunnelRowType)
.delimiter(fieldDelimiter)
.nullValue(NULL_VALUE)
.build();
}
}

public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
throws IOException {
public byte[] buildJsonString(SeaTunnelRow row) {

return serialize.serialize(row);
}

TextSerializationSchema build =
TextSerializationSchema.builder()
.seaTunnelRowType(seaTunnelRowType)
.delimiter(fieldDelimiter)
.nullValue(NULL_VALUE)
.build();
public byte[] buildCSVString(SeaTunnelRow row) {

return build.serialize(row);
return serialize.serialize(row);
}

public String parseDeleteSign(RowKind rowKind) {
Expand All @@ -93,29 +110,17 @@ public void open() throws IOException {}
@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {

List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
List<SeaTunnelDataType<?>> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes());

if (enableDelete) {
SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
seaTunnelRowEnableDelete.setField(
seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind()));
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);

List<Object> newFields = new ArrayList<>(Arrays.asList(seaTunnelRow.getFields()));
newFields.add(parseDeleteSign(seaTunnelRow.getRowKind()));
seaTunnelRow = new SeaTunnelRow(newFields.toArray());
}

if (JSON.equals(type)) {
return buildJsonString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
return buildJsonString(seaTunnelRow);
} else if (CSV.equals(type)) {
return buildCSVString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
return buildCSVString(seaTunnelRow);
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.seatunnel.connectors.doris.sink.writer;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
Expand All @@ -31,9 +34,9 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand Down Expand Up @@ -64,23 +67,23 @@ public class DorisStreamLoad implements Serializable {
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
private static final String JOB_EXIST_FINISHED = "FINISHED";
private final String loadUrlStr;
private final String hostPort;
@Getter private final String hostPort;
private final String abortUrlStr;
private final String user;
private final String passwd;
private final String db;
@Getter private final String db;
private final String table;
private final boolean enable2PC;
private final boolean enableDelete;
private final Properties streamLoadProp;
private final RecordStream recordStream;
private Future<CloseableHttpResponse> pendingLoadFuture;
@Getter private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private volatile boolean loadBatchFirstRecord;
private volatile boolean loading = false;
private String label;
private long recordCount = 0;
@Getter private long recordCount = 0;

public DorisStreamLoad(
String hostPort,
Expand Down Expand Up @@ -115,18 +118,6 @@ public DorisStreamLoad(
loadBatchFirstRecord = true;
}

public String getDb() {
return db;
}

public String getHostPort() {
return hostPort;
}

public Future<CloseableHttpResponse> getPendingLoadFuture() {
return pendingLoadFuture;
}

public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
long startChkID = chkID;
log.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
Expand Down Expand Up @@ -196,10 +187,6 @@ public void writeRecord(byte[] record) throws IOException {
recordCount++;
}

public long getRecordCount() {
return recordCount;
}

public String getLoadFailedMsg() {
if (!loading) {
return null;
Expand Down Expand Up @@ -300,10 +287,9 @@ public void abortTransaction(long txnID) throws Exception {
"Fail to abort transaction " + txnID + " with url " + abortUrlStr);
}

ObjectMapper mapper = new ObjectMapper();
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res =
mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {});
JsonUtils.parseObject(loadResult, new TypeReference<HashMap<String, String>>() {});
if (!LoadStatus.SUCCESS.equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,19 @@
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading
Loading