Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@NoArgsConstructor
@ToString
@AllArgsConstructor
@Builder
public class CommitOffsetRequest {
Expand All @@ -38,4 +36,19 @@ public class CommitOffsetRequest {
public long filteredRows;
public long loadedRows;
public long loadBytes;
public String tableSchemas;

/**
 * Human-readable summary of this commit request for logging.
 *
 * <p>Note: {@code tableSchemas} may be a large serialized payload, so only its
 * length is reported here rather than the full content.
 */
@Override
public String toString() {
    StringBuilder sb = new StringBuilder("CommitOffsetRequest{");
    sb.append("jobId=").append(jobId)
            .append(", taskId=").append(taskId)
            .append(", offset='").append(offset).append("'")
            .append(", scannedRows=").append(scannedRows)
            .append(", filteredRows=").append(filteredRows)
            .append(", loadedRows=").append(loadedRows)
            .append(", loadBytes=").append(loadBytes)
            .append(", tableSchemasSize=")
            .append(tableSchemas == null ? 0 : tableSchemas.length())
            .append("}");
    return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@
/**
 * Base class for record-oriented job requests, extending the common job
 * configuration with per-request payload fields.
 */
@EqualsAndHashCode(callSuper = true)
public abstract class JobBaseRecordRequest extends JobBaseConfig {
// Arbitrary key/value metadata attached to the request.
protected Map<String, Object> meta;
// Serialized snapshot of the source table schemas carried between FE and the
// CDC client; presumably a JSON string — TODO(review): confirm exact format.
protected String tableSchemas;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,10 @@ public void commitOffset(CommitOffsetRequest offsetRequest) throws JobException
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
op.setHasMoreData(false);
}
if (offsetRequest.getTableSchemas() != null) {
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
op.setTableSchemas(offsetRequest.getTableSchemas());
}
persistOffsetProviderIfNeed();
log.info("Streaming multi table job {} task {} commit offset successfully, offset: {}",
getJobId(), offsetRequest.getTaskId(), offsetRequest.getOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ private WriteRecordRequest buildRequestParams() throws JobException {
String feAddr = Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort();
request.setFrontendAddress(feAddr);
request.setMaxInterval(jobProperties.getMaxIntervalSecond());
if (offsetProvider instanceof JdbcSourceOffsetProvider) {
String schemas = ((JdbcSourceOffsetProvider) offsetProvider).getTableSchemas();
if (schemas != null) {
request.setTableSchemas(schemas);
}
}
return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
@SerializedName("bop")
Map<String, String> binlogOffsetPersist;

@SerializedName("ts")
String tableSchemas;

volatile boolean hasMoreData = true;

public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, Map<String, String> sourceProperties) {
Expand Down Expand Up @@ -352,6 +355,7 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
JdbcSourceOffsetProvider.class);
this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
this.tableSchemas = replayFromPersist.getTableSchemas();
log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}",
getJobId(),
binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public class Constants {

// Debezium default properties
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;

public static final String DORIS_TARGET_DB = "doris_target_db";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cdcclient.common;

/**
 * String names of Doris column types as they appear in DDL and in
 * {@code information_schema.COLUMNS}.
 *
 * <p>Pure constant holder — not meant to be instantiated.
 */
public class DorisType {
    public static final String BOOLEAN = "BOOLEAN";
    public static final String TINYINT = "TINYINT";
    public static final String SMALLINT = "SMALLINT";
    public static final String INT = "INT";
    public static final String BIGINT = "BIGINT";
    public static final String LARGEINT = "LARGEINT";
    // largeint is bigint unsigned in information_schema.COLUMNS
    public static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
    public static final String FLOAT = "FLOAT";
    public static final String DOUBLE = "DOUBLE";
    public static final String DECIMAL = "DECIMAL";
    public static final String DATE = "DATE";
    public static final String DATETIME = "DATETIME";
    public static final String CHAR = "CHAR";
    public static final String VARCHAR = "VARCHAR";
    public static final String STRING = "STRING";
    public static final String HLL = "HLL";
    public static final String BITMAP = "BITMAP";
    public static final String ARRAY = "ARRAY";
    public static final String JSONB = "JSONB";
    public static final String JSON = "JSON";
    public static final String MAP = "MAP";
    public static final String STRUCT = "STRUCT";
    public static final String VARIANT = "VARIANT";
    public static final String IPV4 = "IPV4";
    public static final String IPV6 = "IPV6";

    // Utility class: suppress the implicit public constructor.
    private DorisType() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.doris.cdcclient.common.Env;
import org.apache.doris.cdcclient.model.response.RecordWithMeta;
import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.cdcclient.utils.SchemaChangeManager;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
Expand Down Expand Up @@ -166,11 +168,12 @@ private RecordWithMeta buildRecordResponse(
}

// Process data messages
List<String> serializedRecords =
DeserializeResult result =
sourceReader.deserialize(fetchRecord.getConfig(), element);
if (!CollectionUtils.isEmpty(serializedRecords)) {
if (result.getType() == DeserializeResult.Type.DML
&& !CollectionUtils.isEmpty(result.getRecords())) {
recordCount++;
recordResponse.getRecords().addAll(serializedRecords);
recordResponse.getRecords().addAll(result.getRecords());
hasReceivedData = true;
lastMessageIsHeartbeat = false;
}
Expand Down Expand Up @@ -236,21 +239,34 @@ public CompletableFuture<Void> writeRecordsAsync(WriteRecordRequest writeRecordR
* <p>Heartbeat events will carry the latest offset.
*/
public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception {
// Extract connection parameters up front for use throughout this method
String feAddr = writeRecordRequest.getFrontendAddress();
String targetDb = writeRecordRequest.getTargetDb();
String token = writeRecordRequest.getToken();

// Enrich the source config with the Doris target DB so the deserializer can build
// DDL referencing the correct Doris database, not the upstream source database.
Map<String, String> deserializeContext = new HashMap<>(writeRecordRequest.getConfig());
deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);

SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
int heartbeatCount = 0;
SplitReadResult readResult = null;
boolean hasExecuteDDL = false;
boolean isSnapshotSplit = false;
try {
// 1. submit split async
readResult = sourceReader.prepareAndSubmitSplit(writeRecordRequest);
batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);

boolean isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit());
isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 1000;
boolean shouldStop = false;
boolean lastMessageIsHeartbeat = false;

LOG.info(
"Start polling records for jobId={} taskId={}, isSnapshotSplit={}",
writeRecordRequest.getJobId(),
Expand Down Expand Up @@ -309,15 +325,22 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
}

// Process data messages
List<String> serializedRecords =
sourceReader.deserialize(writeRecordRequest.getConfig(), element);

if (!CollectionUtils.isEmpty(serializedRecords)) {
String database = writeRecordRequest.getTargetDb();
DeserializeResult result =
sourceReader.deserialize(deserializeContext, element);

if (result.getType() == DeserializeResult.Type.SCHEMA_CHANGE) {
// Flush pending data before DDL
batchStreamLoad.forceFlush();
SchemaChangeManager.executeDdls(feAddr, targetDb, token, result.getDdls());
hasExecuteDDL = true;
sourceReader.applySchemaChange(result.getUpdatedSchemas());
lastMessageIsHeartbeat = false;
}
if (!CollectionUtils.isEmpty(result.getRecords())) {
String table = extractTable(element);
for (String record : serializedRecords) {
for (String record : result.getRecords()) {
scannedRows++;
batchStreamLoad.writeRecord(database, table, record.getBytes());
batchStreamLoad.writeRecord(targetDb, table, record.getBytes());
}
// Mark last message as data (not heartbeat)
lastMessageIsHeartbeat = false;
Expand Down Expand Up @@ -346,8 +369,22 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
// The offset must be reset before commitOffset to prevent the next taskId from being create
// by the fe.
batchStreamLoad.resetTaskId();

// Serialize tableSchemas back to FE when:
// 1. A DDL was executed (in-memory schema was updated), OR
// 2. It's a binlog split AND FE had no schema (FE tableSchemas was null) — this covers
// incremental-only startup and the first binlog round after snapshot completes.
String tableSchemas = null;
boolean feHadNoSchema = writeRecordRequest.getTableSchemas() == null;
if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) {
tableSchemas = sourceReader.serializeTableSchemas();
}
batchStreamLoad.commitOffset(
currentTaskId, metaResponse, scannedRows, batchStreamLoad.getLoadStatistic());
currentTaskId,
metaResponse,
scannedRows,
batchStreamLoad.getLoadStatistic(),
tableSchemas);
}

public static boolean isHeartbeatEvent(SourceRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -503,7 +504,8 @@ public void commitOffset(
String taskId,
List<Map<String, String>> meta,
long scannedRows,
LoadStatistic loadStatistic) {
LoadStatistic loadStatistic,
String tableSchemas) {
try {
String url = String.format(COMMIT_URL_PATTERN, frontendAddress, targetDb);
CommitOffsetRequest commitRequest =
Expand All @@ -515,6 +517,7 @@ public void commitOffset(
.filteredRows(loadStatistic.getFilteredRows())
.loadedRows(loadStatistic.getLoadedRows())
.loadBytes(loadStatistic.getLoadBytes())
.tableSchemas(tableSchemas)
.build();
String param = OBJECT_MAPPER.writeValueAsString(commitRequest);

Expand All @@ -527,7 +530,11 @@ public void commitOffset(
.commit()
.setEntity(new StringEntity(param));

LOG.info("commit offset for jobId {} taskId {}, params {}", jobId, taskId, param);
LOG.info(
"commit offset for jobId {} taskId {}, commitRequest {}",
jobId,
taskId,
commitRequest.toString());
Throwable resEx = null;
int retry = 0;
while (retry <= RETRY) {
Expand All @@ -541,11 +548,15 @@ public void commitOffset(
: "";
LOG.info("commit result {}", responseBody);
if (statusCode == 200) {
LOG.info("commit offset for jobId {} taskId {}", jobId, taskId);
// A 200 response indicates that the request was successful, and
// information such as offset and statistics may have already been
// updated. Retrying may result in repeated updates.
return;
JsonNode root = OBJECT_MAPPER.readTree(responseBody);
JsonNode code = root.get("code");
if (code != null && code.asInt() == 0) {
LOG.info(
"commit offset for jobId {} taskId {} successfully",
jobId,
taskId);
return;
}
}
LOG.error(
"commit offset failed with {}, reason {}, to retry",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.cdcclient.sink;

import org.apache.doris.cdcclient.common.Constants;
import org.apache.doris.cdcclient.utils.HttpUtil;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.MapUtils;
Expand Down Expand Up @@ -69,7 +70,7 @@ public HttpPutBuilder enable2PC() {
}

public HttpPutBuilder addTokenAuth(String token) {
header.put(HttpHeaders.AUTHORIZATION, "Basic YWRtaW46");
header.put(HttpHeaders.AUTHORIZATION, HttpUtil.getAuthHeader());
header.put("token", token);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand All @@ -58,24 +57,28 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Time;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** SourceRecord ==> [{},{}] */
/** SourceRecord ==> DeserializeResult */
public class DebeziumJsonDeserializer
implements SourceRecordDeserializer<SourceRecord, List<String>> {
implements SourceRecordDeserializer<SourceRecord, DeserializeResult> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DebeziumJsonDeserializer.class);
private static ObjectMapper objectMapper = new ObjectMapper();
@Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
@Getter @Setter protected Map<TableId, TableChanges.TableChange> tableSchemas;

public DebeziumJsonDeserializer() {}

Expand All @@ -86,15 +89,14 @@ public void init(Map<String, String> props) {
}

@Override
public List<String> deserialize(Map<String, String> context, SourceRecord record)
public DeserializeResult deserialize(Map<String, String> context, SourceRecord record)
throws IOException {
if (RecordUtils.isDataChangeRecord(record)) {
LOG.trace("Process data change record: {}", record);
return deserializeDataChangeRecord(record);
} else if (RecordUtils.isSchemaChangeEvent(record)) {
return Collections.emptyList();
List<String> rows = deserializeDataChangeRecord(record);
return DeserializeResult.dml(rows);
} else {
return Collections.emptyList();
return DeserializeResult.empty();
}
}

Expand Down
Loading
Loading