New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
change kafkaConnectSource to return KeyValue type record #2902
Conversation
public byte[] getValue() { | ||
return valueConverter.fromConnectData( | ||
public KeyValue<byte[], byte[]> getValue() { | ||
byte[] keyBytes = keyConverter.fromConnectData( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line 157 would also convert the original key. I think we should try to cache keyBytes, so either getKey
or getValue
is called, the key is converted to keyBytes
, any subsequent calls, will just use the cached bytes.
public KeyValue<byte[], byte[]> getValue() { | ||
byte[] keyBytes = keyConverter.fromConnectData( | ||
srcRecord.topic(), srcRecord.keySchema(), srcRecord.key()); | ||
byte[] valueBytes = valueConverter.fromConnectData( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here to cache value bytes.
String line1 = "This is the first line\n"; | ||
os.write(line1.getBytes()); | ||
os.flush(); | ||
log.info("write 2 lines."); | ||
kafkaConnectSource.getOffsetWriter().offset( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the reason of changing these lines here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @sijie , we already do the offset handling in KafkaConnecSource.
ab92694
to
d7aec0b
Compare
change kafkaConnectSource to return KeyValue type record.