Skip to content

Commit

Permalink
DBZ-7183 Support MySQL 8 high resolution replication timestamps from …
Browse files Browse the repository at this point in the history
…GTID events
  • Loading branch information
methodmissing committed Nov 28, 2023
1 parent a019418 commit e5cf9f2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
1 change: 1 addition & 0 deletions COPYRIGHT.txt
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ Liu Hanlin
Liu Lang Wa
Liz Chatman
Lokesh Sanapalli
Lourens Naudé
Luca Scannapieco
Luis Garcés-Erice
Lukas Krejci
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -87,6 +88,7 @@ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSour
private long initialEventsToSkip = 0L;
private boolean skipEvent = false;
private boolean ignoreDmlEventByGtidSource = false;
private final boolean isGtidModeEnabled;
private final Predicate<String> gtidDmlSourceFilter;
private final AtomicLong totalRecordCounter = new AtomicLong();
private volatile Map<String, ?> lastOffset = null;
Expand Down Expand Up @@ -190,6 +192,7 @@ public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, Abs
Configuration configuration = connectorConfig.getConfig();
boolean filterDmlEventsByGtidSource = configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;
isGtidModeEnabled = connection.isGtidModeEnabled();
}

protected void onEvent(MySqlOffsetContext offsetContext, Event event) {
Expand All @@ -212,11 +215,30 @@ protected void onEvent(MySqlOffsetContext offsetContext, Event event) {
return;
}

ts = clock.currentTimeInMillis() - eventTs;
eventTimestamp = getEventTimestamp(event, eventTs);

ts = clock.currentTimeInMillis() - eventTimestamp.toEpochMilli();
LOGGER.trace("Current milliseconds behind source: {} ms", ts);
metrics.setMilliSecondsBehindSource(ts);
}

private Instant getEventTimestamp(Event event, long eventTs) {
// Prefer higher resolution replication timestamps from MySQL 8 GTID events, if possible
if (isGtidModeEnabled) {
if (event.getHeader().getEventType() == EventType.GTID) {
GtidEventData gtidEvent = unwrapData(event);
final long gtidEventTs = gtidEvent.getOriginalCommitTimestamp();
if (gtidEventTs != 0) {
// >= MySQL 8.0.1, prefer the higher resolution replication timestamp
return Instant.EPOCH.plus(gtidEventTs, ChronoUnit.MICROS);
}
}
}

// Fallback to second resolution event timestamps
return Instant.ofEpochMilli(eventTs);
}

protected void ignoreEvent(MySqlOffsetContext offsetContext, Event event) {
LOGGER.trace("Ignoring event due to missing handler: {}", event);
}
Expand All @@ -228,10 +250,6 @@ protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetCo
}

final EventHeader eventHeader = event.getHeader();
// Update the source offset info. Note that the client returns the value in *milliseconds*, even though the binlog
// contains only *seconds* precision ...
// HEARTBEAT events have no timestamp; only set the timestamp if the event is not a HEARTBEAT
eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT) ? Instant.ofEpochMilli(eventHeader.getTimestamp()) : null;
offsetContext.setBinlogServerId(eventHeader.getServerId());

final EventType eventType = eventHeader.getEventType();
Expand Down Expand Up @@ -869,7 +887,6 @@ public void execute(ChangeEventSourceContext context, MySqlPartition partition,
client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event));
}

final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
metrics.setIsGtidModeEnabled(isGtidModeEnabled);

// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
Expand Down
1 change: 1 addition & 0 deletions jenkins-jobs/scripts/config/Aliases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,4 @@ Lars M Johansson,Lars M. Johansson
ahmedrachid,Ahmed Rachid Hazourli
sherpa003,Jiri Kulhanek
slknijnenburg,Sebastiaan Knijnenburg
methodmissing,Lourens Naudé

0 comments on commit e5cf9f2

Please sign in to comment.