Skip to content

Commit

Permalink
DBZ-5170 Mysql Commit Timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyyue committed Jun 8, 2022
1 parent bbb625f commit 4c74451
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 22 deletions.
Expand Up @@ -527,13 +527,14 @@ protected void handleRowsQuery(MySqlOffsetContext offsetContext, Event event) {
* @throws InterruptedException if this thread is interrupted while recording the DDL statements
*/
protected void handleQueryEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) throws InterruptedException {
Instant eventTime = Conversions.toInstantFromMillis(eventTimestamp.toEpochMilli());
QueryEventData command = unwrapData(event);
LOGGER.debug("Received query command: {}", event);
String sql = command.getSql().trim();
if (sql.equalsIgnoreCase("BEGIN")) {
// We are starting a new transaction ...
offsetContext.startNextTransaction();
eventDispatcher.dispatchTransactionStartedEvent(partition, offsetContext.getTransactionId(), offsetContext);
eventDispatcher.dispatchTransactionStartedEvent(partition, offsetContext.getTransactionId(), offsetContext, eventTime);
offsetContext.setBinlogThread(command.getThreadId());
if (initialEventsToSkip != 0) {
LOGGER.debug("Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
Expand Down Expand Up @@ -568,9 +569,8 @@ protected void handleQueryEvent(MySqlPartition partition, MySqlOffsetContext off
MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
}

Instant schemaTimestamp = Conversions.toInstantFromMillis(eventTimestamp.toEpochMilli());
final List<SchemaChangeEvent> schemaChangeEvents = taskContext.getSchema().parseStreamingDdl(partition, sql,
command.getDatabase(), offsetContext, schemaTimestamp);
command.getDatabase(), offsetContext, eventTime);
try {
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
Expand All @@ -595,7 +595,8 @@ protected void handleQueryEvent(MySqlPartition partition, MySqlOffsetContext off

private void handleTransactionCompletion(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) throws InterruptedException {
// We are completing the transaction ...
eventDispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
eventDispatcher.dispatchTransactionCommittedEvent(partition, offsetContext,
Conversions.toInstantFromMillis(eventTimestamp.toEpochMilli()));
offsetContext.commitTransaction();
offsetContext.setBinlogThread(-1L);
skipEvent = false;
Expand Down
Expand Up @@ -434,7 +434,7 @@ public void accept(LogMinerEvent event) throws InterruptedException {

lastCommittedScn = Scn.valueOf(commitScn.longValue());
if (getTransactionEventCount(transaction) > 0 && !skipExcludedUserName) {
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, transaction.getChangeTime());
}
else {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
Expand Down
Expand Up @@ -152,7 +152,7 @@ private void dispatchDataChangeEvent(RowLCR lcr, Map<String, Object> chunkValues
LOGGER.debug("Processing DML event {}", lcr);

if (RowLCR.COMMIT.equals(lcr.getCommandType())) {
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, Instant.now());
return;
}

Expand Down
Expand Up @@ -227,11 +227,11 @@ private void processMessages(ChangeEventSourceContext context, PostgresPartition
taskContext.getSlotXmin(connection),
null);
if (message.getOperation() == Operation.BEGIN) {
dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext);
dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime());
}
else if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime());
}
maybeWarnAboutGrowingWalBacklog(true);
}
Expand Down
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.pipeline;

import java.time.Instant;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Objects;
Expand Down Expand Up @@ -265,15 +266,15 @@ public void dispatchFilteredEvent(P partition, OffsetContext offset) throws Inte
}
}

public void dispatchTransactionCommittedEvent(P partition, OffsetContext offset) throws InterruptedException {
transactionMonitor.transactionComittedEvent(partition, offset);
public void dispatchTransactionCommittedEvent(P partition, OffsetContext offset, Instant timestamp) throws InterruptedException {
transactionMonitor.transactionComittedEvent(partition, offset, timestamp);
if (incrementalSnapshotChangeEventSource != null) {
incrementalSnapshotChangeEventSource.processTransactionCommittedEvent(partition, offset);
}
}

public void dispatchTransactionStartedEvent(P partition, String transactionId, OffsetContext offset) throws InterruptedException {
transactionMonitor.transactionStartedEvent(partition, transactionId, offset);
public void dispatchTransactionStartedEvent(P partition, String transactionId, OffsetContext offset, Instant timestamp) throws InterruptedException {
transactionMonitor.transactionStartedEvent(partition, transactionId, offset, timestamp);
if (incrementalSnapshotChangeEventSource != null) {
incrementalSnapshotChangeEventSource.processTransactionStartedEvent(partition, offset);
}
Expand Down
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.pipeline.txmetadata;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class TransactionMonitor {
public static final String DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY = "event_count";
public static final String DEBEZIUM_TRANSACTION_COLLECTION_KEY = "data_collection";
public static final String DEBEZIUM_TRANSACTION_DATA_COLLECTIONS_KEY = "data_collections";
public static final String DEBEZIUM_TRANSACTION_TS_MS = "ts_ms";

public static final Schema TRANSACTION_BLOCK_SCHEMA = SchemaBuilder.struct().optional()
.field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
Expand Down Expand Up @@ -92,6 +94,7 @@ public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataPr
.field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
.field(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(DEBEZIUM_TRANSACTION_DATA_COLLECTIONS_KEY, SchemaBuilder.array(EVENT_COUNT_PER_DATA_COLLECTION_SCHEMA).optional().build())
.field(DEBEZIUM_TRANSACTION_TS_MS, Schema.INT64_SCHEMA)
.build();

this.topicName = connectorConfig.getTransactionTopic();
Expand All @@ -115,40 +118,40 @@ public void dataEvent(Partition partition, DataCollectionId source, OffsetContex
// commit transaction
if (transactionContext.isTransactionInProgress()) {
LOGGER.trace("Transaction was in progress, executing implicit transaction commit");
endTransaction(partition, offset);
endTransaction(partition, offset, eventMetadataProvider.getEventTimestamp(source, offset, key, value));
}
return;
}

if (!transactionContext.isTransactionInProgress()) {
transactionContext.beginTransaction(txId);
beginTransaction(partition, offset);
beginTransaction(partition, offset, eventMetadataProvider.getEventTimestamp(source, offset, key, value));
}
else if (!transactionContext.getTransactionId().equals(txId)) {
endTransaction(partition, offset);
endTransaction(partition, offset, eventMetadataProvider.getEventTimestamp(source, offset, key, value));
transactionContext.endTransaction();
transactionContext.beginTransaction(txId);
beginTransaction(partition, offset);
beginTransaction(partition, offset, eventMetadataProvider.getEventTimestamp(source, offset, key, value));
}
transactionEvent(offset, source, value);
}

public void transactionComittedEvent(Partition partition, OffsetContext offset) throws InterruptedException {
public void transactionComittedEvent(Partition partition, OffsetContext offset, Instant timestamp) throws InterruptedException {
if (!connectorConfig.shouldProvideTransactionMetadata()) {
return;
}
if (offset.getTransactionContext().isTransactionInProgress()) {
endTransaction(partition, offset);
endTransaction(partition, offset, timestamp);
}
offset.getTransactionContext().endTransaction();
}

public void transactionStartedEvent(Partition partition, String transactionId, OffsetContext offset) throws InterruptedException {
public void transactionStartedEvent(Partition partition, String transactionId, OffsetContext offset, Instant timestamp) throws InterruptedException {
if (!connectorConfig.shouldProvideTransactionMetadata()) {
return;
}
offset.getTransactionContext().beginTransaction(transactionId);
beginTransaction(partition, offset);
beginTransaction(partition, offset, timestamp);
}

private void transactionEvent(OffsetContext offsetContext, DataCollectionId source, Struct value) {
Expand All @@ -164,24 +167,26 @@ private void transactionEvent(OffsetContext offsetContext, DataCollectionId sour
value.put(Envelope.FieldName.TRANSACTION, txStruct);
}

private void beginTransaction(Partition partition, OffsetContext offsetContext) throws InterruptedException {
private void beginTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
final Struct key = new Struct(transactionKeySchema);
key.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
final Struct value = new Struct(transactionValueSchema);
value.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.BEGIN.name());
value.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
value.put(DEBEZIUM_TRANSACTION_TS_MS, timestamp.toEpochMilli());

sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(),
topicName, null, key.schema(), key, value.schema(), value));
}

private void endTransaction(Partition partition, OffsetContext offsetContext) throws InterruptedException {
private void endTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
final Struct key = new Struct(transactionKeySchema);
key.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
final Struct value = new Struct(transactionValueSchema);
value.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.END.name());
value.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
value.put(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, offsetContext.getTransactionContext().getTotalEventCount());
value.put(DEBEZIUM_TRANSACTION_TS_MS, timestamp.toEpochMilli());

final Set<Entry<String, Long>> perTableEventCount = offsetContext.getTransactionContext().getPerTableEventCount().entrySet();
final List<Struct> valuePerTableCount = new ArrayList<>(perTableEventCount.size());
Expand Down

0 comments on commit 4c74451

Please sign in to comment.