Improved journal resiliency (#2669)
Make sure the message sets we write to the journal fit into a segment.

Do not try to write a message set to the journal that is bigger than the maximum
segment size. This avoids a MessageSetSizeTooLargeException.

Fixes #2659
bernd authored and joschi committed Sep 13, 2016
1 parent 2ba3fb2 commit 4346865
Showing 2 changed files with 142 additions and 9 deletions.
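The change batches journal writes so that no single Kafka message set exceeds the configured segment size, and drops any individual message that could never fit. The following is a rough, self-contained sketch of that idea, not the actual KafkaJournal code: the class name, the ENTRY_OVERHEAD constant, and the splitIntoSegmentSizedBatches helper are hypothetical stand-ins, and the real per-entry size comes from Kafka's MessageSet.entrySize().

import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
    // Hypothetical per-entry overhead; the real value comes from MessageSet.entrySize().
    private static final int ENTRY_OVERHEAD = 26;

    /** Splits messages into batches that each stay under maxSegmentSize. */
    static List<List<byte[]>> splitIntoSegmentSizedBatches(List<byte[]> messages, long maxSegmentSize) {
        final List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentSize = 0L;

        for (byte[] message : messages) {
            final long entrySize = message.length + ENTRY_OVERHEAD;

            // A message that can never fit into a segment is discarded instead of failing the whole write.
            if (entrySize > maxSegmentSize) {
                continue;
            }
            // Flush the current batch before adding this message would overflow the segment.
            if (currentSize + entrySize > maxSegmentSize && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0L;
            }
            current.add(message);
            currentSize += entrySize;
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush the remainder
        }
        return batches;
    }

    public static void main(String[] args) {
        final List<byte[]> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            messages.add(new byte[100]);
        }
        messages.add(new byte[4096]); // too large for a 1 KiB segment, will be skipped
        System.out.println(splitIntoSegmentSizedBatches(messages, 1024).size() + " batches");
    }
}

In the actual commit below, the same loop additionally counts discarded messages on the new writeDiscardedMessages meter and flushes each batch to the Kafka log via kafkaLog.append() in the new flushMessages() helper.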
@@ -136,11 +136,14 @@ public void sleep(long ms) {
     private final KafkaScheduler kafkaScheduler;
     private final Meter writtenMessages;
     private final Meter readMessages;
+    private final Meter writeDiscardedMessages;

     private final OffsetFileFlusher offsetFlusher;
     private final DirtyLogFlusher dirtyLogFlusher;
     private final RecoveryCheckpointFlusher recoveryCheckpointFlusher;
     private final LogRetentionCleaner logRetentionCleaner;
+    private final long maxSegmentSize;
+    private final int maxMessageSize;

     private long nextReadOffset = 0L;
     private ScheduledFuture<?> checkpointFlusherFuture;
@@ -168,9 +171,13 @@ public KafkaJournal(@Named("message_journal_dir") File journalDirectory,
         this.scheduler = scheduler;
         this.throttleThresholdPercentage = intRange(throttleThresholdPercentage, 0, 100);
         this.serverStatus = serverStatus;
+        this.maxSegmentSize = segmentSize.toBytes();
+        // Max message size should not be bigger than max segment size.
+        this.maxMessageSize = Ints.saturatedCast(maxSegmentSize);

         this.writtenMessages = metricRegistry.meter(name(this.getClass(), "writtenMessages"));
         this.readMessages = metricRegistry.meter(name(this.getClass(), "readMessages"));
+        this.writeDiscardedMessages = metricRegistry.meter(name(this.getClass(), "writeDiscardedMessages"));

         registerUncommittedGauge(metricRegistry, name(this.getClass(), "uncommittedMessages"));

@@ -193,8 +200,8 @@ public KafkaJournal(@Named("message_journal_dir") File journalDirectory,
                 .put(LogConfig.RetentionBytesProp(), retentionSize.toBytes())
                 // retentionMs: The approximate maximum age of the last segment that is retained
                 .put(LogConfig.RetentionMsProp(), retentionAge.getMillis())
-                // maxMessageSize: The maximum size of a message in the log
-                .put(LogConfig.MaxMessageBytesProp(), Integer.MAX_VALUE)
+                // maxMessageSize: The maximum size of a message in the log (ensure that it's not larger than the max segment size)
+                .put(LogConfig.MaxMessageBytesProp(), maxMessageSize)
                 // maxIndexSize: The maximum size of an index file
                 .put(LogConfig.SegmentIndexBytesProp(), Ints.saturatedCast(Size.megabytes(1L).toBytes()))
                 // indexInterval: The approximate number of bytes between index entries
@@ -376,31 +383,82 @@ public Entry createEntry(byte[] idBytes, byte[] messageBytes) {
     public long write(List<Entry> entries) {
         try (Timer.Context ignored = writeTime.time()) {
             long payloadSize = 0L;
+            long messageSetSize = 0L;
+            long lastWriteOffset = 0L;

             final List<Message> messages = new ArrayList<>(entries.size());
             for (final Entry entry : entries) {
                 final byte[] messageBytes = entry.getMessageBytes();
                 final byte[] idBytes = entry.getIdBytes();

                 payloadSize += messageBytes.length;
-                messages.add(new Message(messageBytes, idBytes));
+
+                final Message newMessage = new Message(messageBytes, idBytes);
+                // Calculate the size of the new message in the message set by including the overhead for the log entry.
+                final int newMessageSize = MessageSet.entrySize(newMessage);
+
+                if (newMessageSize > maxMessageSize) {
+                    writeDiscardedMessages.mark();
+                    LOG.warn("Message with ID <{}> is too large to store in journal, skipping! (size: {} bytes / max: {} bytes)",
+                            new String(idBytes, StandardCharsets.UTF_8), newMessageSize, maxMessageSize);
+                    payloadSize = 0;
+                    continue;
+                }
+
+                // If adding the new message to the message set would overflow the max segment size, flush the current
+                // list of messages to avoid a MessageSetSizeTooLargeException.
+                if ((messageSetSize + newMessageSize) > maxSegmentSize) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Flushing {} bytes message set with {} messages to avoid overflowing segment with max size of {} bytes",
+                                messageSetSize, messages.size(), maxSegmentSize);
+                    }
+                    lastWriteOffset = flushMessages(messages, payloadSize);
+                    // Reset the messages list and size counters to start a new batch.
+                    messages.clear();
+                    messageSetSize = 0;
+                    payloadSize = 0;
+                }
+
+                messages.add(newMessage);
+                messageSetSize += newMessageSize;

                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Message {} contains bytes {}", bytesToHex(idBytes), bytesToHex(messageBytes));
                 }
             }

-            final ByteBufferMessageSet messageSet = new ByteBufferMessageSet(JavaConversions.asScalaBuffer(messages).toSeq());
+            // Flush the rest of the messages.
+            if (messages.size() > 0) {
+                lastWriteOffset = flushMessages(messages, payloadSize);
+            }

-            final LogAppendInfo appendInfo = kafkaLog.append(messageSet, true);
-            long lastWriteOffset = appendInfo.lastOffset();
-            LOG.debug("Wrote {} messages to journal: {} bytes, log position {} to {}",
-                    entries.size(), payloadSize, appendInfo.firstOffset(), lastWriteOffset);
-            writtenMessages.mark(entries.size());
             return lastWriteOffset;
         }
     }

+    private long flushMessages(List<Message> messages, long payloadSize) {
+        if (messages.isEmpty()) {
+            LOG.debug("No messages to flush, not trying to write an empty message set.");
+            return -1L;
+        }
+
+        final ByteBufferMessageSet messageSet = new ByteBufferMessageSet(JavaConversions.asScalaBuffer(messages).toSeq());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Trying to write ByteBufferMessageSet with size of {} bytes to journal", messageSet.sizeInBytes());
+        }
+
+        final LogAppendInfo appendInfo = kafkaLog.append(messageSet, true);
+        long lastWriteOffset = appendInfo.lastOffset();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Wrote {} messages to journal: {} bytes (payload {} bytes), log position {} to {}",
+                    messages.size(), messageSet.sizeInBytes(), payloadSize, appendInfo.firstOffset(), lastWriteOffset);
+        }
+        writtenMessages.mark(messages.size());
+
+        return lastWriteOffset;
+    }
+
     /**
      * Writes a single message to the journal and returns the new write position
      *
@@ -25,6 +25,8 @@
 import com.google.common.primitives.Ints;
 import kafka.common.KafkaException;
 import kafka.log.LogSegment;
+import kafka.message.Message;
+import kafka.message.MessageSet;
 import kafka.utils.FileLock;
 import org.graylog2.Configuration;
 import org.graylog2.audit.NullAuditEventSender;
@@ -59,6 +61,7 @@
 import static org.apache.commons.io.filefilter.FileFilterUtils.fileFileFilter;
 import static org.apache.commons.io.filefilter.FileFilterUtils.nameFileFilter;
 import static org.apache.commons.io.filefilter.FileFilterUtils.suffixFileFilter;
+import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -179,6 +182,78 @@ private int countSegmentsInDir(File messageJournalFile) {
         return messageJournalFile.list(and(fileFileFilter(), suffixFileFilter(".log"))).length;
     }

+    @Test
+    public void maxSegmentSize() throws Exception {
+        final Size segmentSize = Size.kilobytes(1L);
+        final KafkaJournal journal = new KafkaJournal(journalDirectory,
+                scheduler,
+                segmentSize,
+                Duration.standardHours(1),
+                Size.kilobytes(10L),
+                Duration.standardDays(1),
+                1_000_000,
+                Duration.standardMinutes(1),
+                100,
+                new MetricRegistry(),
+                serverStatus);
+
+        long size = 0L;
+        long maxSize = segmentSize.toBytes();
+        final List<Journal.Entry> list = Lists.newArrayList();
+
+        while (size <= maxSize) {
+            final byte[] idBytes = ("the1-id").getBytes(UTF_8);
+            final byte[] messageBytes = ("the1-message").getBytes(UTF_8);
+
+            size += idBytes.length + messageBytes.length;
+
+            list.add(journal.createEntry(idBytes, messageBytes));
+        }
+
+        // Make sure all messages have been written
+        assertThat(journal.write(list)).isEqualTo(list.size() - 1);
+    }
+
+    @Test
+    public void maxMessageSize() throws Exception {
+        final Size segmentSize = Size.kilobytes(1L);
+        final KafkaJournal journal = new KafkaJournal(journalDirectory,
+                scheduler,
+                segmentSize,
+                Duration.standardHours(1),
+                Size.kilobytes(10L),
+                Duration.standardDays(1),
+                1_000_000,
+                Duration.standardMinutes(1),
+                100,
+                new MetricRegistry(),
+                serverStatus);
+
+        long size = 0L;
+        long maxSize = segmentSize.toBytes();
+        final List<Journal.Entry> list = Lists.newArrayList();
+
+        final String largeMessage1 = randomAlphanumeric(Ints.saturatedCast(segmentSize.toBytes() * 2));
+        list.add(journal.createEntry(randomAlphanumeric(6).getBytes(UTF_8), largeMessage1.getBytes(UTF_8)));
+
+        final byte[] idBytes0 = randomAlphanumeric(6).getBytes(UTF_8);
+        // Build a message that has exactly the max segment size
+        final String largeMessage2 = randomAlphanumeric(Ints.saturatedCast(segmentSize.toBytes() - MessageSet.LogOverhead() - Message.MessageOverhead() - idBytes0.length));
+        list.add(journal.createEntry(idBytes0, largeMessage2.getBytes(UTF_8)));
+
+        while (size <= maxSize) {
+            final byte[] idBytes = randomAlphanumeric(6).getBytes(UTF_8);
+            final byte[] messageBytes = "the-message".getBytes(UTF_8);
+
+            size += idBytes.length + messageBytes.length;
+
+            list.add(journal.createEntry(idBytes, messageBytes));
+        }
+
+        // Make sure all messages but the large one have been written
+        assertThat(journal.write(list)).isEqualTo(list.size() - 2);
+    }
+
     @Test
     public void segmentRotation() throws Exception {
         final Size segmentSize = Size.kilobytes(1L);
