KAFKA-4390: Replace MessageSet usage with client-side alternatives #2140

Closed
wants to merge 22 commits

Conversation

hachikuji

No description provided.

@hachikuji
Author

ping @junrao @guozhangwang @ijuma @apurvam

This patch modifies the server implementation to use the client-side Record objects for all internal processing. As you can see, this was a hefty bit of work, but fortunately most of the transformations are straightforward. The main thing to focus on is the implementation of LogValidator, which contains the offset assignment and record validation logic that was previously contained in ByteBufferMessageSet. I've been pretty careful to preserve the optimizations that were present previously (e.g. in-place assignment where possible), but don't take my word for it.
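For illustration, the in-place path amounts to something like the following. This is a minimal sketch of in-place offset assignment over the old message-set layout, not the actual LogValidator code; the class and method names and the layout constants are assumptions, and it only applies to uncompressed entries whose offsets can be rewritten without touching the record bytes.

import java.nio.ByteBuffer;

public class OffsetAssignmentSketch {
    // Old message-set layout per shallow entry: [8-byte offset][4-byte size][record bytes]
    private static final int OFFSET_LENGTH = 8;
    private static final int SIZE_LENGTH = 4;
    private static final int LOG_OVERHEAD = OFFSET_LENGTH + SIZE_LENGTH;

    // Overwrite the offset field of each shallow entry starting from baseOffset,
    // using absolute puts so no record data is copied or moved.
    public static long assignOffsetsInPlace(ByteBuffer buffer, long baseOffset) {
        long offset = baseOffset;
        int pos = buffer.position();
        while (pos + LOG_OVERHEAD <= buffer.limit()) {
            int recordSize = buffer.getInt(pos + OFFSET_LENGTH);
            buffer.putLong(pos, offset++);
            pos += LOG_OVERHEAD + recordSize;
        }
        return offset - 1; // last offset assigned
    }
}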

One quick note on naming. I've renamed the Records object and subclasses to LogBuffer. So MemoryRecords is now MemoryLogBuffer. The reason for this change was that it felt unintuitive for an instance of Records to be an Iterable<LogEntry>, with the LogEntry instances being the actual container for the records. A LogBuffer instead represents a range of the log and provides access to the log entries contained in it. That seemed more intuitive to me, but let me know if you agree or if you have other suggestions.
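For readers following the naming question, the shape being debated is roughly the following. This is a simplified, hypothetical sketch (the *Sketch suffix marks that these are not the real classes): a buffer over a range of the log yields entries, each entry pairs an offset with a record, and the record itself carries no offset.

import java.nio.ByteBuffer;

// What the PR calls MemoryLogBuffer (formerly MemoryRecords): a view over a
// range of the log whose iteration yields log entries rather than records.
interface LogBufferSketch extends Iterable<LogEntrySketch> { }

// Each log entry pairs an offset with the record stored at that offset.
final class LogEntrySketch {
    private final long offset;
    private final RecordSketch record;

    LogEntrySketch(long offset, RecordSketch record) {
        this.offset = offset;
        this.record = record;
    }

    long offset() { return offset; }
    RecordSketch record() { return record; }
}

// The record itself has no offset; in reality it also carries attributes,
// a timestamp, and so on.
final class RecordSketch {
    final ByteBuffer key;
    final ByteBuffer value;

    RecordSketch(ByteBuffer key, ByteBuffer value) {
        this.key = key;
        this.value = value;
    }
}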

@onurkaraman
Contributor

Slightly related, slightly tangential: is there a specific reason why we put the new broker-specific java classes under clients/ ?

I'm talking about stuff like:
FileRecords
LeaderAndIsrRequest / LeaderAndIsrResponse
StopReplicaRequest / StopReplicaResponse
UpdateMetadataRequest / UpdateMetadataResponse

@hachikuji
Author

hachikuji commented Nov 16, 2016

@onurkaraman Yeah, I've wondered a bit about that also. I'd be OK moving FileRecords to the server if people prefer that. I was thinking one potential benefit is that it opens the door to adding persistence to the client, which some users have requested (we have an ancient JIRA for this, but the use case might not be too compelling). In the end, I decided it wasn't that much code, so having it in clients didn't hurt too much and it kept all record-related stuff close together, which may make it easier to share common pieces.

@ijuma
Contributor

ijuma commented Nov 16, 2016

@hachikuji, thanks for tackling this.

About the naming question, I also found it a bit confusing how we sometimes have an offset and sometimes don't when talking about records. That is, MemoryRecords includes the offset (and record size in the underlying buffer) for each record while Record does not. It all becomes clearer when one realises that MemoryRecords (renamed to MemoryLogBuffer in the PR) actually contains LogEntry instances, each being a pair of offset and Record.

One thing to think about is whether this fits with the other Record classes we have and whether that matters (maybe it doesn't). For example, ConsumerRecord contains the offset while ProducerRecord does not. Also, it would have been a bit easier to review if the rename had been done in a separate PR, but probably too late for that. :)

About having the classes in clients, I think that's OK as they are in an internal common package.

Contributor

@ijuma ijuma left a comment

Just noticed a couple of things when I scanned the PR. Will do a proper review later.

@@ -427,49 +427,49 @@ public ByteBuffer validate(Object item) {
}
};

public static final Type RECORDS = new Type() {
public static final Type LOGBUFFER = new Type() {
Contributor

Is it intentional that this is LOGBUFFER instead of LOG_BUFFER? Same for the toString implementation.

@@ -123,7 +123,7 @@ public int timeout() {
return timeout;
}

public Map<TopicPartition, MemoryRecords> partitionRecords() {
public Map<TopicPartition, MemoryLogBuffer> partitionRecords() {
Contributor

Should this method be renamed as well?

Author

Ack. There are probably a few of these. I'll do another pass and try to find others.

@hachikuji
Author

@becketqin Would be nice to get your feedback also. Put on the coffee and lose yourself in code review!

@becketqin
Contributor

Wow, a 5000 line change... I'll take a look this weekend...

@becketqin
Contributor

@hachikuji I ran into some other stuff today and didn't finish reading the entire patch. Just some thoughts from reading the code so far. I think "Message" (and "Record" in the new clients) is a well established concept in Kafka. It is indeed a little weird that Records is an Iterable<LogEntry>, but changing all the Records to LogBuffer seems to introduce a new concept (BTW, FileLogBuffer sounds a little weird given that it does not actually have a buffer). I would like to see if we can resolve the confusion without adding a new concept.

Was there any thought given to changing LogEntry to something like LogRecord, which indicates that it is something residing in the log? Then Records would contain a number of LogRecord instances, each containing (Offset + Record). We would also have pretty symmetric naming, with ProducerRecord, ConsumerRecord and LogRecord clearly indicating where each is used. We could also consider renaming Records to LogRecords to make that clear. I am not sure whether Records counts as a public interface, though. I know we have a page stating which packages are public and which are not, but I doubt people follow that given we have an explicit internals package...

I'll save my other comments until I've gone through a full pass of the code, in case some of them turn out not to be valid (I've already found some that aren't...)

@ijuma
Contributor

ijuma commented Nov 21, 2016

@becketqin, that's a fair point about the rename and introducing a new concept. I have similar concerns and was wondering how we could make things clearer without that. Your suggestion looks promising.

@onurkaraman
Contributor

Does it make sense to separate the renaming from the actual task of this patch?

@hachikuji
Author

@becketqin Thanks for taking a look. I'm not sure I follow why you consider the renaming a conceptual change. The object works the same as before, but I felt the name fit closer to what the object actually represents, which is a range of bytes from the log. The name Records to me just suggests a container for Record objects.

The suggestion about LogRecord makes sense to me. I have actually done something similar in work building off of this patch. At the same time, I would like to preserve a concept of LogEntry as a container for records which sits between LogBuffer (or Records) and LogRecord (or Record). The basic idea is to treat the shallow messages as log entries, and the deep messages as log records (an uncompressed message is treated as a log entry containing just a single log record).

To give a bit more background, we're trying to generalize the concept of a message set so that 1) it uses a separate schema from individual messages, and 2) it's extended to uncompressed data. This allows us to amortize the cost of additional metadata which is invariant for the messages contained in a message set. I'm happy to provide some additional detail if you're interested (there will be a KIP on the way some time in the next few weeks).

@onurkaraman Yeah, we can do that if it makes this patch easier to get in. Let's see what others think. Sigh, any suggestion that reduces the line count is likely to be popular with everyone except me.

@hachikuji
Author

I ran system tests on the latest patch and everything looks good: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2016-11-22--001.1479824556--hachikuji--KAFKA4390--24dc7ed/report.html. I will probably continue to add some additional test cases, but I'll leave the rest as is pending further review comments.

@junrao
Contributor

junrao commented Nov 23, 2016

@hachikuji : Will also take a look at the patch. Just a quick note: could you do some performance testing to make sure there is no regression?

@hachikuji
Author

hachikuji commented Nov 23, 2016

@junrao Thanks for taking a look. Performance testing is next on my plate after I fill in some of the gaps in test coverage.

@hachikuji force-pushed the KAFKA4390 branch 2 times, most recently from f002ac6 to b646ef5 on November 28, 2016
@hachikuji
Author

Update: I've begun performance testing. I'm seeing a substantial performance degradation on the consumer side. I'll update this PR when I know more.

@hachikuji
Author

I found the cause of the performance regression. When handling a fetch, we must read through the log to find the starting position of a given offset (starting from the position given by the index). To do so, we only need to read the offset and size, but one of my recent commits accidentally changed this behavior to unnecessarily read the full record. I've fixed this in the last commit and now it looks like performance is within 5% of trunk for the producer and consumer. Perhaps still on the slower side though, so I'll continue investigating.
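For context, the lookup in question works roughly like this. The sketch below is not the actual FileRecords code; the method name and return convention are assumptions. The point is that only the 12-byte entry header (offset plus size) is read per entry, never the record payload.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FetchPositionSketch {
    private static final int LOG_OVERHEAD = 12; // 8-byte offset + 4-byte size

    // Scan forward from startPosition, reading only each entry's offset and size,
    // until we reach the entry holding targetOffset. Returns that entry's file
    // position, or -1 if the offset is beyond the end of the file.
    public static long searchFor(FileChannel channel, long startPosition, long targetOffset)
            throws IOException {
        ByteBuffer header = ByteBuffer.allocate(LOG_OVERHEAD);
        long position = startPosition;
        while (position + LOG_OVERHEAD <= channel.size()) {
            header.clear();
            channel.read(header, position);
            header.flip();
            long offset = header.getLong();
            int size = header.getInt();
            if (offset >= targetOffset)
                return position;
            position += LOG_OVERHEAD + size; // skip the payload entirely
        }
        return -1;
    }
}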

Contributor

@junrao junrao left a comment

@hachikuji : Thanks for the patch. Made a pass on this. Some of the issues I pointed out seem to be pre-existing. We can decide whether to address them here or in followup jiras.


public void setCreateTime(long timestamp) {
Record record = record();
if (record.magic() > 0) {
Contributor

Should we assert record.magic() > 0?

throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size, newSize));

long position = start + offset;
long count = Math.min(length, this.size.get());
Contributor

To be consistent, perhaps this.size and this.channel should just be size and channel?

@Override
public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException {
ByteBuffer dup = buffer.duplicate();
dup.position(new Long(position).intValue());
Contributor

It seems that some of the changes were lost during rebase? For example, there was code in MemoryRecords for setting the buffer limit according to length, and for casting position to int instead of creating a Long object.
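The restored version being asked for would look roughly like this; a sketch of the intent only, assuming the buffer field and signature from the quoted snippet, not necessarily the exact pre-rebase code.

@Override
public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException {
    ByteBuffer dup = buffer.duplicate();
    int pos = (int) position;                        // plain cast instead of new Long(position).intValue()
    dup.position(pos);
    dup.limit(Math.min(pos + length, dup.limit()));  // bound the write by the requested length
    return channel.write(dup);
}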


for (LogEntry deepEntry : shallowEntry) {
Record deepRecord = deepEntry.record();
messagesRead += 1;
Contributor

This seems to be an existing issue. For uncompressed messages, do we double count messagesRead since we already increased the count in line 98?

Author

Ack. I'll fix this and the one below and update the test cases.

if (writeOriginalEntry) {
// There are no messages compacted out and no message format conversion, write the original message set back
shallowEntry.writeTo(buffer);
messagesRetained += 1;
Contributor

This and line 144 don't seem to be correct. It seems that we should add the number of entries in retainedEntries?

@@ -152,7 +152,7 @@ class DelayedFetch(delayMs: Long,
)

val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)
tp -> FetchPartitionData(result.errorCode, result.hw, result.info.logBuffer)
Contributor

unused import kafka.api.FetchResponsePartitionData

messageTimestampType = TimestampType.LOG_APPEND_TIME,
messageTimestampDiffMaxMs = 1000L)
val validatedLogBuffer = validatedResults.validatedEntries
assertEquals("message set size should not change", logBuffer.deepEntries.asScala.size, validatedLogBuffer.deepEntries.asScala.size)
Contributor

"message set size" is bit ambiguous. Perhaps we should say "number of messages"?

}
}

/* check that offsets are assigned based on byte offset from the given base offset */
Contributor

Not sure what "byte offset" means here.

@@ -70,7 +70,7 @@ class MessageCompressionTest extends JUnitSuite {
testCompressSize(GZIPCompressionCodec, messages, 396)

if(isSnappyAvailable)
testCompressSize(SnappyCompressionCodec, messages, 502)
testCompressSize(SnappyCompressionCodec, messages, 1063)
Contributor

Hmm, why do we have to change the expected size?

Author

It puzzled me for a while why the size was coming out different only for snappy, but it turns out that we override the block size in the client code instead of using the default as the server code does.

Contributor

Good catch. We probably don't want to change the buffer size in the server to match the client. We may consider changing the client to match the server. See KAFKA-3704 for details.

convertAndVerify(v, Message.MagicValue_V0, Message.MagicValue_V1)
} else if (v.magicValue == Message.MagicValue_V1) {
convertAndVerify(v, Message.MagicValue_V1, Message.MagicValue_V0)
if (v.codec == NoCompressionCodec) {
Contributor

Is there a reason that we only test non-compressed message conversion now?

Author

@hachikuji hachikuji Dec 1, 2016

Hmm... I think the test was broken or at least incomplete since Message.toFormatVersion only did shallow conversion. When I implemented this in the client code, I forbade shallow-only conversion because it results in bugs like the one we found in LogCleaner. We'll probably end up dropping this code after we remove Message.toFormatVersion as suggested above.

@hachikuji
Author

@junrao I really appreciate the thorough review. I've addressed the easier items and left a few replies. I'll get to the rest tomorrow.

By the way, in the near future, I'd like to squash commits to make rebasing a bit easier. It hasn't been too much of a problem yet, but it will get harder with more iterations.

@guozhangwang
Contributor

guozhangwang commented Dec 5, 2016

About the renaming of Records to LogBuffer, I share the same concern as @becketqin and @ijuma. My proposal would be to rename LogEntry to RecordEntry or simply RecordAndOffset (which feels more Scala-ish), and to me it is OK to have Records.iterator() return Iterator<RecordEntry>. As for ConsumerRecord and ProducerRecord, it would be ideal if they both contained a Record field, with ConsumerRecord holding a separate offset field, but since those are public APIs we have to leave them as is, which is not too bad to me.

Also, I'd like to suggest we separate the renaming out of this PR for ease of review, if it is still possible to revert it.

@hachikuji
Author

@guozhangwang RecordEntry would work for me. Keep in mind that if KIP-98 is approved, it will do more than just track the offset, so I'd rather not use something specific like RecordAndOffset. Since my LogBuffer suggestion is not too popular, I'll go ahead and revert that change. Then the hierarchy will be Records -> RecordEntry -> Record. Does that sound reasonable?

@hachikuji force-pushed the KAFKA4390 branch 2 times, most recently from 42b51b2 to fb79917 on December 5, 2016
@hachikuji
Author

I've gone ahead and squashed commits. You can still find the old commit history here: https://github.com/hachikuji/kafka/tree/KAFKA-4390-UNSQUASHED.

@asfbot

asfbot commented Dec 12, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/81/
Test PASSed (JDK 8 and Scala 2.11).

@hachikuji
Copy link
Author

hachikuji commented Dec 12, 2016

@junrao Latest round of comments addressed. Please take a look.

@ijuma On the question of Iterator vs Iterable, I'm pretty open. I used the former for consistency with the old code, but I agree it would be nice to be able to use the "foreach" syntax.

@asfbot

asfbot commented Dec 12, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/86/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 12, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/85/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 12, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/87/
Test FAILed (JDK 8 and Scala 2.11).

Contributor

@junrao junrao left a comment

@hachikuji : Thanks for the latest patch. Just a few minor comments. Also, could you post the latest performance results? Assuming there is no degradation, the patch LGTM.

* A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
* for the in-memory representation.
* Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
* Each log entry consists of a 4 byte size, an 8 byte offset, and a "shallow" {@link Record record}. If
Contributor

a 4 byte size, an 8 byte offset => an 8 byte offset, a 4 byte size of the record

}

/**
* Close this batch for no more appends
* Filter this log buffer into the provided ByteBuffer.
Contributor

Is the reference to "log buffer" still valid?

}

/**
* Get the records from this log buffer (note this requires "deep" iteration into the
Contributor

Is the reference to "log buffer" still valid?

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/92/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/91/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/90/
Test PASSed (JDK 7 and Scala 2.10).

@hachikuji
Author

@junrao Thanks for the reviews. I did some testing this evening. I thought I was initially seeing some performance difference in the producer, but it seems to be within the variance of the test runs. If I were guessing from the results, I'd say the non-compressed path is a tad slower while the compressed path might be a tad faster, but don't put much weight behind either conclusion. In any case, the results seem close enough that I'd recommend merging now. Note that I did add one commit to address a couple of minor cleanups and tighten up the iteration code a little.

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/96/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/97/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/98/
Test FAILed (JDK 8 and Scala 2.11).

public int read() {
if (!buffer.hasRemaining()) {
return -1;
private static class UnderlyingInputStream extends InputStream {
Contributor

It's a bit annoying that we create so much indirection (DataLogInputStream -> ByteBufferInputStream -> UnderlyingInputStream -> ByteBuffer -> byte[]). In an ideal world, we would not bother with InputStream at all and would just operate at the ByteBuffer level. However, the gzip case is hard to do that way.

Author

@ijuma Haha, yeah. One of the layers is sort of fake (DataInputStream should be a mixin), but the point is still valid.
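To make the layering concrete, here is a self-contained sketch of the kind of stream stacking being discussed; the real Kafka classes differ in detail, and the names below are illustrative.

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;

public class DecompressionChainSketch {
    // Minimal adapter from a ByteBuffer to an InputStream: one layer of the chain.
    static class ByteBufferBackedInputStream extends InputStream {
        private final ByteBuffer buffer;

        ByteBufferBackedInputStream(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public int read() {
            return buffer.hasRemaining() ? buffer.get() & 0xff : -1;
        }
    }

    // The gzip case forces the InputStream detour:
    // ByteBuffer -> InputStream adapter -> GZIPInputStream -> DataInputStream.
    static DataInputStream gzipRecordStream(ByteBuffer compressed) throws IOException {
        return new DataInputStream(new GZIPInputStream(new ByteBufferBackedInputStream(compressed)));
    }
}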

Contributor

@junrao junrao left a comment

@hachikuji : Thanks for the patch. LGTM. Just a couple of minor comments.

About the performance: we used to optimize the recompression path on the broker side by implementing a chained ByteBuffer list to avoid copying during buffer overflow for writes. With the patch, we lose that optimization and simply recopy the data to a bigger buffer on overflow. This will affect recompression performance when the estimated after-compression size is too low. Recompression can happen when (1) the producer is old, or (2) the broker compression codec differs from the producer's; both should be uncommon. So we can probably commit the patch as is. If recompression performance turns out to be a problem, we can always optimize the buffer-expansion code in ByteBufferOutputStream later.
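For reference, the copy-on-overflow behavior being described amounts to something like the sketch below; the class is illustrative, not the actual ByteBufferOutputStream, and the growth policy is an assumption.

import java.io.OutputStream;
import java.nio.ByteBuffer;

public class GrowableBufferOutputStreamSketch extends OutputStream {
    private ByteBuffer buffer;

    public GrowableBufferOutputStreamSketch(int initialCapacity) {
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);
    }

    // On overflow, allocate a bigger buffer and copy everything written so far.
    // The pre-patch broker path avoided this copy with a chained list of buffers.
    private void ensureRemaining(int needed) {
        if (buffer.remaining() < needed) {
            int newCapacity = Math.max(buffer.capacity() * 2, buffer.position() + needed);
            ByteBuffer expanded = ByteBuffer.allocate(newCapacity);
            buffer.flip();
            expanded.put(buffer);
            buffer = expanded;
        }
    }

    public ByteBuffer buffer() {
        return buffer;
    }
}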

this.buffer = buffer;
buffer.position(LOG_OVERHEAD);
this.record = new Record(buffer.slice());
buffer.position(OFFSET_OFFSET);
Contributor

Do we need to change the position of buffer? Perhaps we could instead just change the position in the slice passed to Record().

Author

Currently Record expects the position of the ByteBuffer to be at 0. I was tempted to change this assumption, but decided to leave it for now (it's a bit annoying to change all the accessors to assume relative positioning). We could accomplish the same result using mark() and reset() if that seems any better.
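A small sketch of the two options being weighed; the layout constants are illustrative and the helper names are made up.

import java.nio.ByteBuffer;

public class RecordSliceSketch {
    private static final int OFFSET_OFFSET = 0;  // illustrative layout constants
    private static final int LOG_OVERHEAD = 12;  // 8-byte offset + 4-byte size

    // Current approach: move the position so slice() starts at the record bytes
    // (position 0 of the slice, as Record expects), then restore the position.
    static ByteBuffer sliceWithPositionRestore(ByteBuffer entry) {
        entry.position(LOG_OVERHEAD);
        ByteBuffer recordView = entry.slice();
        entry.position(OFFSET_OFFSET);
        return recordView;
    }

    // Alternative mentioned above: remember the current position with mark()
    // and reset() back to it instead of hard-coding where to return.
    static ByteBuffer sliceWithMarkReset(ByteBuffer entry) {
        entry.mark();
        entry.position(LOG_OVERHEAD);
        ByteBuffer recordView = entry.slice();
        entry.reset();
        return recordView;
    }
}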

* A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
* for the in-memory representation.
* Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
* Each log entry consists of a 4 byte size, an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
Contributor

"a 4 byte size," needs to be removed.

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/106/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/104/
Test PASSed (JDK 7 and Scala 2.10).

@asfgit asfgit closed this in 67f1e5b Dec 13, 2016
@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/105/
Test PASSed (JDK 8 and Scala 2.12).

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>, Jun Rao <junrao@gmail.com>

Closes apache#2140 from hachikuji/KAFKA4390
long position = start + offset;
long count = Math.min(length, this.size - offset);
long count = Math.min(length, size.get());
Member

Should this be Math.min(length, size.get() - offset)?

Contributor

@dengziming : Thanks for the comment. This does seem like a bug. Would you be interested in submitting a separate PR to have this fixed?
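For clarity, the suggested fix is shown below in context; variable and field names follow the quoted snippet, and this is a sketch of the bound only, not a full patch.

// position is start + offset, so only (size - offset) bytes remain in this
// view of the file; bounding by size alone could write past the end.
long position = start + offset;
long count = Math.min(length, size.get() - offset);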
