
KAFKA-4816: Message format changes for idempotent/transactional producer #2614

Closed

Conversation

@hachikuji

No description provided.

@hachikuji force-pushed the exactly-once-message-format branch from a4bd61d to c2a14f8 on March 1, 2017 00:12
@hachikuji (Author)

cc @ijuma @junrao

A couple initial points of discussion:

  1. Message class hierarchy. The LogEntry interface now represents the record set and LogRecord represents the record. I have left Records with its current name which is a bit weird when you need to write things like records.entries.records. I have also been thinking of changing LogEntry to LogRecordSet, which would make the Records name a bit more intuitive. Previously I considered changing Records to simply Log, though that's a little annoying because of the server class with the same name.
  2. The new magic version is implemented in EosLogEntry and EosLogRecord. This name should be changed of course. Any suggestions? The old magic version is implemented in OldLogEntry, so we could call it NewLogEntry?

@asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1902/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1899/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1900/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1904/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1905/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1907/
Test FAILed (JDK 8 and Scala 2.11).

@hachikuji (Author)

FYI: System tests run here: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2017-03-01--001.1488358155--confluentinc--exactly-once-message-format--c2a14f8/report.html. The one failure looks like a known problem, but I haven't investigated.

@ijuma (Contributor) commented Mar 3, 2017

@hachikuji, good questions!

  1. I don't like the Set suffix because order is relevant, so if we wanted to go that way, I think it should be LogRecordsList. That is basically just LogRecords, which is confusing given that it's part of Records. Is a record set just a record batch?
  2. Instead of NewLogEntry, should it be DefaultLogEntry? And OldLogEntry could be LegacyLogEntry or something like that.

@hachikuji force-pushed the exactly-once-message-format branch from c2a14f8 to 76e4913 on March 3, 2017 17:39
@hachikuji (Author)

@ijuma Thanks for the suggestions.

  1. It is a batch. The producer already uses RecordBatch, but maybe we could commandeer that name? I'm not too fond of LogRecordsList. It's a bit tough to come up with something reasonable given the name of Records. I'd rename that personally to something which conveyed the fact that it was a sequence of bytes of the log (but of course I already tried that and failed).

  2. DefaultLogEntry sounds good to me. No strong preference between OldLogEntry and LegacyLogEntry. But perhaps "legacy" is a bit more suggestive of its use.

The other thing I wanted to mention is that I've left around the old Record class more or less as it currently is. This is why I needed to introduce the LogRecord interface. This seemed fine given use of the "log" prefix in LogEntry, but I've considered several times moving the current Record into OldLogEntry, and then using Record in place of LogRecord. What do you think?

@ijuma (Contributor) commented Mar 3, 2017

I'm not fond of LogRecordsList either, in case it was not clear from my message. :) I was thinking that maybe we could rename the existing RecordBatch to ProducerBatch or something.

Yes, I prefer Legacy a bit for the reason you state.

I think it makes sense to rename LogRecord to Record. The current Record could then either be moved to LegacyLogEntry (already using this name ;)) or it could just be LegacyRecord.

@asfbot commented Mar 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1981/
Test PASSed (JDK 8 and Scala 2.11).

@hachikuji (Author)

So if we follow those suggestions, then we have the following hierarchy:

Records -> [RecordBatch]
RecordBatch -> [Record]

That seems reasonable to me. The RecordBatch -> ProducerBatch renaming will probably cause some confusion, but people will get over it.
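For illustration, a minimal sketch of that hierarchy (method names are only indicative; the real interfaces carry many more methods):

import java.nio.ByteBuffer;

// Sketch only: the shape of the naming discussed here, not the final interfaces.
interface Records {
    Iterable<? extends RecordBatch> batches(); // a log is a sequence of batches
    int sizeInBytes();
}

interface RecordBatch extends Iterable<Record> {
    byte magic();      // message format version of this batch
    long baseOffset(); // absolute offset of the first record in the batch
}

interface Record {
    long offset();
    long timestamp();
    ByteBuffer key();   // null if the record has no key
    ByteBuffer value(); // null for tombstones
}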

@ijuma (Contributor) commented Mar 3, 2017

@junrao, what do you think?

@asfbot commented Mar 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1979/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot commented Mar 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1978/
Test FAILed (JDK 7 and Scala 2.10).

* @param value The value to write
* @param out The output to write to
*/
public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException {
Contributor:

I think we should introduce a ByteUtils class and move all the relevant methods there. Maybe we could do that in its own PR, which could be merged quickly independently of the message format changes?

Author:

Works for me.
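For reference, a self-contained sketch of the kind of helper that could move into such a ByteUtils class (the class name and the size helper are illustrative). Base-128 varints write 7 data bits per byte and set the high bit on every byte except the last:

import java.io.DataOutput;
import java.io.IOException;

final class ByteUtils {
    // Write an unsigned value using base-128 varint encoding.
    static void writeUnsignedVarLong(long value, DataOutput out) throws IOException {
        while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
            out.writeByte(((int) value & 0x7F) | 0x80); // more bytes follow
            value >>>= 7;
        }
        out.writeByte((int) value & 0x7F); // final byte, high bit clear
    }

    // Number of bytes the encoding above produces; handy for size estimation.
    static int sizeOfUnsignedVarLong(long value) {
        int bytes = 1;
        while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }
}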

@junrao (Contributor) left a comment:

@hachikuji : Thanks for the patch. Looks good overall. Made a pass of non-test files and added some comments.

*
* -----------------------------------------------------------------------------------
* | Unused (5-16) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -----------------------------------------------------------------------------------
Contributor:

We may add new compression codecs in the future. Using the bits from 15 downwards would make adding new compression codecs a bit easier later. Also, unused should be 5-15.

Contributor:

It makes sense to leave space for the compression type, but I'd hope we won't need more than 3 bits for it.

Author:

My guess is we'll have another message format bump before we run out of room for compression codecs, but I don't mind adding another bit if you think it's useful.

Author:

Oh, I may have misunderstood. Are you suggesting moving the transactional flag and timestamp type to the end of the second byte?

Contributor:

Since we already have 3 bits for compression, leaving this as it is will be fine.
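To make the layout concrete, a rough sketch of how the attribute bits above could be read (the class and constant names are hypothetical):

final class BatchAttributes {
    private static final int COMPRESSION_CODEC_MASK = 0x07;   // bits 0-2
    private static final int TIMESTAMP_TYPE_MASK = 0x08;      // bit 3
    private static final int TRANSACTIONAL_FLAG_MASK = 0x10;  // bit 4

    static int compressionCodecId(short attributes) {
        return attributes & COMPRESSION_CODEC_MASK;
    }

    static boolean isLogAppendTime(short attributes) {
        return (attributes & TIMESTAMP_TYPE_MASK) != 0;
    }

    static boolean isTransactional(short attributes) {
        return (attributes & TRANSACTIONAL_FLAG_MASK) != 0;
    }
}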

static final int BASE_TIMESTAMP_LENGTH = 8;
static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
static final int MAX_TIMESTAMP_LENGTH = 8;
static final int LAST_OFFSET_DELTA_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
Contributor:

In the comment above the class and in the KIP wiki, LastOffsetDelta is after Attributes.


@Override
public int lastSequence() {
// FIXME: cast to int
Contributor:

The comment is no longer valid?

while (iterator.hasNext()) {
LogRecord record = iterator.next();
int offsetDelta = (int) (record.offset() - baseOffset);
size += EosLogRecord.sizeInBytes(offsetDelta, record.timestamp(), record.key(), record.value());
Contributor:

Hmm, should we be passing in delta timestamp?
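If the delta were used, the sizing loop might look roughly like this (a sketch assuming sizeInBytes were changed to take a timestamp delta, and that the first record's timestamp is available as baseTimestamp):

// Hypothetical variant of the loop above, sizing records with a timestamp delta.
while (iterator.hasNext()) {
    LogRecord record = iterator.next();
    int offsetDelta = (int) (record.offset() - baseOffset);
    long timestampDelta = record.timestamp() - baseTimestamp;
    size += EosLogRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value());
}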

* ValueLen => VarInt
* Value => data
*
* The record attributes indicate whether the key and value fields are present. The current attributes
Contributor:

The comment seems outdated. Attributes are no longer used to indicate the presence of key and value.

Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
struct.set(ACKS_KEY_NAME, acks);
struct.set(TIMEOUT_KEY_NAME, timeout);

// TODO: Include transactional id once transactions are supported
Contributor:

Is the TODO still valid?

Author:

I guess the TODO should really go to the other constructor.

val records = {
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
Seq(new KafkaRecord(timestamp, key, value)).asJava))
val builder = MemoryRecords.builder(buffer, magicValue, compressionType, TimestampType.CREATE_TIME, 0L)
Contributor:

Should timestampType be hardcoded to createTime or should it use the topic level config? Ditto for line 256 below.

Contributor:

CREATE_TIME is correct, see 7565dcd

@@ -579,11 +592,15 @@ class GroupMetadataManager(val brokerId: Int,
case Some((magicValue, timestampType, timestamp)) =>
val partitionOpt = replicaManager.getPartition(appendPartition)
partitionOpt.foreach { partition =>
val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) =>
var offset = 0L
val builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magicValue, compressionType,
Contributor:

Is 1024 guaranteed to be always large enough?
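One alternative to the fixed 1024-byte buffer would be to size it from the tombstones themselves, using the estimateSizeInBytes helper that appears elsewhere in this PR. A rough sketch (written in Java even though the caller is Scala; the method name is hypothetical and KafkaRecord is the name later renamed to SimpleRecord):

// Sketch: derive the buffer size from the records being written instead of hard-coding 1024.
static MemoryRecordsBuilder tombstoneBuilder(byte magicValue, CompressionType compressionType,
                                             TimestampType timestampType, List<KafkaRecord> tombstones) {
    int estimatedSize = AbstractRecords.estimateSizeInBytes(magicValue, compressionType, tombstones);
    ByteBuffer buffer = ByteBuffer.allocate(estimatedSize);
    return MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L);
}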

print("offset: " + record.offset + " position: " + validBytes +
" " + entry.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
" keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + entry.magic +
" compresscodec: " + entry.compressionType + " crc: " + record.checksum)
Contributor:

Should we print out additional information related to the EOS format (e.g. pid, epoch, sequence)? Should we support printing out the control message?

@@ -25,6 +27,8 @@
files="Fetcher.java"/>
<suppress checks="ParameterNumber"
files="ConfigDef.java"/>
<suppress checks="ParameterNumber"
files="EosLogEntry.java"/>
Contributor:

Now that we have increased the param limit in checkstyle, do we still need this?

Author:

We'd need to increase a bit more to capture this one too. Increasing to 12 dropped two others.

@junrao (Contributor) commented Mar 6, 2017

A few other high level comments.

  1. In the EOS message format, it's probably worth considering including the message count in the message format. This has the benefit that the consumer can allocate the right size of the array to store those messages and is also consistent with how we represent an array in other places.

  2. About EosLogEntry and OldLogEntry. Perhaps we can name them based on the magic. So, it would be V2LogEntry, V1LogEntry and V0LogEntry.

@ijuma (Contributor) commented Mar 6, 2017

@junrao, regarding including the message format version as a prefix or suffix: I considered that, but the issue is that more than one version is supported by each class. Unless we want to move away from that, it seems a bit awkward.

@hachikuji force-pushed the exactly-once-message-format branch from 76e4913 to 5ac42cc on March 9, 2017 01:37
@asfbot commented Mar 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2099/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Mar 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2102/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot commented Mar 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2100/
Test PASSed (JDK 8 and Scala 2.12).

@ijuma (Contributor) left a comment:

I left a few comments. I didn't review the message format changes themselves since you are about to do a bunch of renames and a trunk merge (I assume). Probably best to review after that.

/**
* Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
* The pattern is akin to the use of {@link Metadata} for topic metadata.
*/
Contributor:

We should add a note that this is an internal class since the package doesn't make that obvious.

usableMagic = LogEntry.MAGIC_VALUE_V2;
}
if (usableMagic < maxUsableMagic)
maxUsableMagic = usableMagic;
Contributor:

Nit: maxUsableMagic = Math.min(usableMagic, maxUsableMagic)?


default:
usableMagic = LogEntry.MAGIC_VALUE_V2;
}
Contributor:

I think this switch should be a separate method as it's the interesting logic that may need to be updated when we add more produce versions. Maybe it should live in ProduceRequest? And we should have a test that breaks when we add a new produce version to the protocol (if we don't already) so that we don't forget to update it (since the client would still work fine, it could go unnoticed).

Author:

Yeah, that's fair. Let me see if we can move it to ProduceRequest.
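A rough sketch of what that could look like as a static helper in ProduceRequest (the method name is hypothetical; the version-to-magic mapping mirrors the switch under discussion):

// Sketch: map the produce request version to the message format (magic) it carries.
public static byte requiredMagicForVersion(short produceRequestVersion) {
    switch (produceRequestVersion) {
        case 0:
        case 1:
            return LogEntry.MAGIC_VALUE_V0;
        case 2:
            return LogEntry.MAGIC_VALUE_V1;
        default:
            // v3 and above carry the new (v2) message format.
            return LogEntry.MAGIC_VALUE_V2;
    }
}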

version = versionInfo.usableVersion(clientRequest.apiKey());
} else {
versionInfo.ensureUsable(clientRequest.apiKey(), builder.desiredVersion());
version = builder.desiredVersion();
Contributor:

Hmm, would it be better to have a NodeApiVersions.usableVersion that takes a desiredVersion as well? We could then collapse these two branches and there would be no need to call ensureUsable.

Author:

Yes, that sounds good.
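A minimal sketch of that overload on NodeApiVersions (signature assumed), which would let the caller collapse the two branches above:

// Sketch: fall back to the latest usable version when the caller has no preference,
// otherwise validate the desired version and return it.
public short usableVersion(ApiKeys apiKey, Short desiredVersion) {
    if (desiredVersion == null)
        return usableVersion(apiKey);
    ensureUsable(apiKey, desiredVersion);
    return desiredVersion;
}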

private static final short NODE_TOO_OLD = (short) -1;
private static final short NODE_TOO_NEW = (short) -2;
private final Collection<ApiVersion> nodeApiVersions;
// An array of the usable versions of each API, indexed by the ApiKeys ID.
Contributor:

It's an existing issue, but I realised that this comment is out of date.

}
for (UsableVersion usableVersion : usableVersions.values()) {
if (usableVersion.apiVersion.apiKey == apiKey.id)
return usableVersion.apiVersion;
Contributor:

Can we simply call get on the map? Also, maybe we should return null if there's no element (like Map.get).

} else {
usableVersions.put(nodeApiKey, v);
usableVersions.put(nodeApiKey, new UsableVersion(nodeApiVersion, v));
Contributor:

Maybe we could move the logic that creates the appropriate UsableVersion into a static factory method (or constructor) in UsableVersion.

private static final short NODE_TOO_NEW = (short) -2;

private final ApiVersion apiVersion;
private final Short defaultVersion;
Contributor:

I found the term default a bit confusing here. This is usableVersion, right? I guess you were trying to avoid repeating it since the class name is UsableVersion. Could we just call it value?

this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions);
boolean discoverBrokerVersions,
ApiVersions nodeApiVersions) {
Contributor:

I think the field name should just be apiVersion, no?

"0.10.2" -> KAFKA_0_10_2_IV0
"0.10.2" -> KAFKA_0_10_2_IV0,
// KIP-98, aka EOS, changes.
"0.10.3-IV0" -> KAFKA_0_10_3_IV0,
Contributor:

This should be "0.11.0-IV0" and the rest should be updated accordingly.

* High-level representation of a kafka record. This is useful when building record sets to
* avoid depending on a specific magic version.
*/
public class KafkaRecord {
Contributor:

Would BasicRecord or SimpleRecord be a better name? I think the key feature is that it just captures the essence of what a record is: timestamp, key, value (and potentially headers in the future?).

Author:

SimpleRecord works for me. I may have actually had this in the code at one time or another.
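For reference, roughly what such a SimpleRecord might capture (a sketch, not the actual class; headers are mentioned above only as a possible future field):

import java.nio.ByteBuffer;

// Sketch: a magic-agnostic record is just a timestamp plus optional key and value.
public class SimpleRecord {
    private final long timestamp;
    private final ByteBuffer key;   // null if the record has no key
    private final ByteBuffer value; // null for tombstones

    public SimpleRecord(long timestamp, ByteBuffer key, ByteBuffer value) {
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    public long timestamp() { return timestamp; }
    public ByteBuffer key() { return key; }
    public ByteBuffer value() { return value; }
}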

@ijuma (Contributor) left a comment:

I have finally gone through the whole diff. :) LGTM to merge if the system tests pass.

I have made a few notes of follow-up changes (most are not time sensitive although a couple are) and will file JIRAs in the next few days.

While going through the changes, I put together some minor improvements and some additional tests:

https://github.com/confluentinc/kafka/pull/151/files

We can merge it to this PR or to trunk subsequently depending on your preference.

}

@Test
public void testIterator() {
MemoryRecordsBuilder builder1 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
MemoryRecordsBuilder builder2 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
Contributor:

Do you know why we had 2 builders?

Author:

I think we were verifying the auto-incrementing behavior. I will verify that we have both cases covered.

for (Header header : headers) {
String headerKey = header.key();
if (headerKey == null) {
size += NULL_VARINT_SIZE_BYTES;
Contributor:

Did we decide on this one?

buffer.position(RECORDS_OFFSET);
final int totalRecords = count();

return new Iterator<Record>() {
Contributor:

Nice that we avoid the overhead of the streams here.

int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
// TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size?
Contributor:

Yes, I agree that it's worth adding a brief comment explaining this until we fix the issue.

* @param record the record to add
*/
public void append(Record record) {
appendWithOffset(record.offset(), record.isControlRecord(), record.timestamp(), record.key(), record.value(),
Contributor:

Is it OK that we don't check the magic here, but we do for the append that takes a legacy record?

@hachikuji (Author), Mar 22, 2017:

Yeah, that's a good question. I'm not sure there's a good reason for the check in the LegacyRecord case. I think initially we were doing something like copying the bytes directly, so it may have made more sense before.

Author:

Oh, I guess we still do the memory copy in this case. The question is whether we should?

Contributor:

This method is only used by ByteBufferMessageSet (i.e. Scala clients) so it seems OK to favour consistency over optimisation in this case.

@junrao (Contributor) left a comment:

@hachikuji : Thanks for the patch. LGTM. Just a few minor comments.

record.buffer(),
wrapperRecordTimestamp,
wrapperRecord.timestampType()
);
Contributor:

Move this to the previous line?

*/
public class MemoryRecords extends AbstractRecords {

public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));

private final ByteBuffer buffer;

private final Iterable<ByteBufferLogEntry> shallowEntries = new Iterable<ByteBufferLogEntry>() {
private final Iterable<MutableRecordBatch> logEntries = new Iterable<MutableRecordBatch>() {
Contributor:

logEntries => recordBatch?

@@ -54,7 +54,7 @@ class ConsumerFetcherThread(name: String,
replicaId(Request.OrdinaryConsumerId).
maxWait(config.fetchWaitMaxMs).
minBytes(config.fetchMinBytes).
requestVersion(kafka.api.FetchRequest.CurrentVersion)
requestVersion(3) // for now, the old consumer is pinned to the old message format through the the fetch request
Contributor:

the the => the

@asfbot commented Mar 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2330/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Mar 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2330/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot commented Mar 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2334/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2350/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2354/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2350/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2358/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2354/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2354/
Test PASSed (JDK 8 and Scala 2.12).

@ijuma (Contributor) left a comment:

Thanks, fix looks good. Some minor comments.

byte magicByte) {
int i = 0;
for (RecordBatch batch : convertedRecords.batches()) {
assertEquals("magic byte should be " + magicByte, magicByte, batch.magic());
assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
Contributor:

Why doesn't assertEquals work here?

Author:

We only convert messages when necessary. So if we down-convert a batch to version 1, and we have messages which are magic 0, we won't up-convert them to 1.

byte magicByte) {
int i = 0;
for (RecordBatch batch : convertedRecords.batches()) {
assertEquals("magic byte should be " + magicByte, magicByte, batch.magic());
assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
Contributor:

Would it be worth adding some timestamp assertions as well?

Author:

Ack. May as well.


// down conversion for compressed messages
// down conversion for non-compressed messages
Contributor:

non-compressed should be removed, right?

Records convertedRecords = fileRecords.downConvert(RecordBatch.MAGIC_VALUE_V0);
verifyConvertedMessageSet(records, offsets, convertedRecords, RecordBatch.MAGIC_VALUE_V0);
Records convertedRecords = fileRecords.downConvert(toMagic);
verifyConvertedMessageSet(records, offsets, convertedRecords, compressionType, toMagic);
}
}

private void verifyConvertedMessageSet(List<SimpleRecord> initialRecords,
Contributor:

verifyConvertedBatches

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2356/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2360/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2356/
Test PASSed (JDK 8 and Scala 2.12).

* Note that when compression is enabled (see attributes below), the compressed record data is serialized
* directly following the count of the number of records.
*
* The CRC covers the data from the attributes to the end of the batch. Note that the location is
Contributor:

Actually, if CRC only covers from Attributes to the end, do we really need to switch PartitionLeaderEpoch and CRC?

Contributor:

It's true that the change is not strictly needed. The reasoning for the switch is that this way the CRC includes everything that follows it (same as for V0 and V1 formats). In V0 and V1, the offset and length are before the CRC and are not included in the computation. In V2, the offset, length, partitionLeaderEpoch and magic are before the CRC and not included in the computation.

Given that, do you think the benefit is worth the switch?

Contributor:

Earlier, Jason's proposal was to put the CRC at the very end, after the records, covering everything from magic. If we did that, the switch would not be ideal, but it would make sense. If the CRC stays at the front, then leaving it in its current location seems to cause less confusion.

Contributor:

If we leave the CRC in the same position as in V0 and V1, would you still change the computation to be from attributes to the end? A couple of alternatives (each with pros/cons):

  1. Include magic in the CRC computation, but not partitionLeaderEpoch (con: we'd have to call Checksum.update twice instead of once).
  2. Compute the CRC from magic to the end (as it was before), but with PartitionLeaderEpoch always assumed to be -1. This makes the computation simple on the producer side, but it's a bit tricky after the PartitionLeaderEpoch has been set. This is a bit similar to how TCP includes the checksum field in the checksum computation, but assumes it to be always 0 (to avoid the chicken and egg problem).

Author:

I think we can swap the CRC and leader epoch fields in the current version, but keep the scope of the CRC unchanged. In other words:

 * RecordBatch =>
 *  BaseOffset => Int64
 *  Length => Int32
 *  CRC => Uint32 (covers everything beginning with Attributes)
 *  Magic => Int8
 *  PartitionLeaderEpoch => Int32
 *  Attributes => Int16
 *  LastOffsetDelta => Int32
 *  BaseTimestamp => Int64
 *  MaxTimestamp => Int64
 *  ProducerId => Int64
 *  ProducerEpoch => Int16
 *  BaseSequence => Int32
 *  Records => [Record]

It's a little odd that we change the scope of the CRC in this format, but perhaps it's less odd than changing the location of the field entirely.
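To make the CRC scope concrete, a sketch of computing it over everything from Attributes to the end of the batch. The 21-byte offset is the sum of the fields before Attributes in the layout above (BaseOffset 8 + Length 4 + CRC 4 + Magic 1 + PartitionLeaderEpoch 4, the same total in either field order discussed here); CRC32 is used only to illustrate the covered range, not necessarily the actual polynomial:

import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

final class BatchCrc {
    // Bytes preceding the Attributes field in the batch header (illustrative constant).
    static final int ATTRIBUTES_OFFSET = 21;

    static long computeCrc(ByteBuffer batch) {
        Checksum crc = new CRC32();
        for (int i = batch.position() + ATTRIBUTES_OFFSET; i < batch.limit(); i++)
            crc.update(batch.get(i)); // absolute get: does not move the buffer's position
        return crc.getValue();
    }
}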

Author:

This is a tough one. It's nice being consistent with the old format to some extent, but clients would still need to look at the magic byte first to figure out how to validate the CRC, so maybe the consistency win is marginal. The main advantage of the format in the current patch is that it's clear at a glance what is covered by the CRC, so it seems easier to explain.

Contributor:

Discussed offline and Jun said he was OK with leaving as is.

@hachikuji (Author) commented Mar 24, 2017

Ran core and client system tests here: http://testing.confluent.io/confluent-kafka-branch-builder-system-test-results/?prefix=2017-03-23--001.1490326564--confluentinc--exactly-once-message-format--2e7cf82/.

The failing zookeeper upgrade test seems to be transient. Here's a passing run: http://testing.confluent.io/confluent-kafka-branch-builder-system-test-results/?prefix=2017-03-24--001.1490323181--confluentinc--exactly-once-message-format--2e7cf82/. I investigated the failure in one case and it seems to be unrelated (apparently caused by the overridden min ISR setting of 2 and the fact that the test does not actually ensure that 2 brokers are always up).

@ijuma (Contributor) left a comment:

Thanks for running the system tests. Latest updates LGTM too. Will merge to trunk shortly.

This is an impressive PR, well done. :) Also, kudos to Jun for his detailed review.

I took notes of follow-up work and will file a few JIRAs in the next few days.

@hachikuji, please update the KIP page to reflect the changes we made during the PR review.

@apurvam (Contributor) commented Mar 24, 2017

LGTM!

@asfgit closed this in 5bd06f1 on Mar 24, 2017
@@ -114,8 +143,28 @@ public FetchResponse(Struct struct) {
int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
long lastStableOffset = INVALID_LSO;
if (partitionResponse.hasField(LAST_STABLE_OFFSET_KEY_NAME))
@lindong28 (Member), Mar 25, 2017:

Bug: we should use partitionResponseHeader instead of partitionResponse to read this field. I guess this is not discovered by any test because the current patch always has last_stable_offset = -1L?
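A sketch of the suggested fix, reading the field from the same struct as the other fields above:

// Sketch of the fix: read the last stable offset from partitionResponseHeader, not partitionResponse.
long lastStableOffset = INVALID_LSO;
if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
    lastStableOffset = partitionResponseHeader.getLong(LAST_STABLE_OFFSET_KEY_NAME);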

Contributor:

Thanks for catching this @lindong28. This field won't be used until the transactional code lands. I submitted a PR with tests:

#2737

@ijuma deleted the exactly-once-message-format branch on January 25, 2020 16:56