Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-1174 #806

Merged
merged 3 commits into from Mar 12, 2019
Merged

DBZ-1174 #806

merged 3 commits into from Mar 12, 2019

Conversation

gunnarmorling
Copy link
Member

@gunnarmorling gunnarmorling commented Mar 11, 2019

hashhar and others added 3 commits March 11, 2019 16:43
- wal2json sends the txn commitTime using a function from PostgreSQL's C
  library. The value that Debezium recevies is in nanoseconds.
- decoderbufs sends the txn commitTime in microseconds.
- RecordsSnapshotProducer updates SourceInfo.ts_usec by converting
  System.currentTimeMillis() to microseconds.
- RecordsSnapshotProducer updates the SourceInfo's ts_usec field using
  message.getCommitTime().

This means that when using wal2json, the value of SourceInfo.ts_usec
is in microseconds since epoch during snapshot but is in nanoseconds
during streaming. To fix this, we changed
Wal2JsonReplicationMessage.getCommitTime() to return in microseconds.
@gunnarmorling
Copy link
Member Author

gunnarmorling commented Mar 11, 2019

@jpechane, can you review and merge this one? Thanks!

assertTrue(record.value() instanceof Struct);
Struct source = ((Struct) record.value()).getStruct("source");
// 1 minute difference is okay
System.out.println("TS_USEC\t" + source.getInt64("ts_usec"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooops. I forgot to remove this. Moving this into the assertTrue message would be better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will take care of it.

// check records from streaming
inst = Instant.now();
// Microseconds since epoch, may overflow
final long microsStream = TimeUnit.SECONDS.toMicros(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(inst.getNano());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with Conversions.toEpochMicros().

// check records from snapshot
Instant inst = Instant.now();
// Microseconds since epoch, may overflow
final long microsSnapshot = TimeUnit.SECONDS.toMicros(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(inst.getNano());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with Conversions.toEpochMicros().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, good spot. Added that method only later on. I'll update.

@@ -37,14 +38,17 @@ static Clock system() {
return SYSTEM;
}

default Instant currentTime() {
return Instant.ofEpochMilli(currentTimeInMillis());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not use Instant.now()? Is that to allow for mocking the Clock in tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the idea is to make Clock implementations pluggable (and their only required method is currentTimeInMillis().

@gunnarmorling
Copy link
Member Author

Thanks for reviewing, @hashhar! I'll push an update in a bit.

@@ -53,7 +55,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
LOGGER.debug("Message arrived for decoding {}", message);
final long txId = message.getLong("xid");
final String timestamp = message.getString("timestamp");
final long commitTime = dateTime.systemTimestamp(timestamp);
final Instant commitTime = Conversions.toInstant(dateTime.systemTimestamp(timestamp));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gunnarmorling Should not systemTimestamp return Instant instead of doing conversion here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally yes. But I was trying, this is like pulling Sphaghetti from the plate, never ending. So I decided to stop here, and leave that for another time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough :-)

@jpechane jpechane merged commit ee9207e into debezium:master Mar 12, 2019
@jpechane
Copy link
Contributor

@hashhar @gunnarmorling Thanks guys!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants