Skip to content

Commit

Permalink
Add leader epoch for KIP-101 and no longer use attributes for null ke…
Browse files Browse the repository at this point in the history
…y/value (apache#136)
  • Loading branch information
hachikuji committed Mar 10, 2017
1 parent 6429859 commit 691d838
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 93 deletions.
Expand Up @@ -36,24 +36,25 @@
* LogEntry implementation for magic 2 and above. The schema is given below:
*
* LogEntry =>
* FirstOffset => int64
* Length => int32
* CRC => int32
* Magic => int8
* Attributes => int16
* LastOffsetDelta => int32
* FirstTimestamp => int64
* MaxTimestamp => int64
* PID => int64
* Epoch => int16
* FirstSequence => int32
* BaseOffset => Int64
* Length => Int32
* CRC => Int32
* Magic => Int8
* Attributes => Int16
* LastOffsetDelta => Int32
* BaseTimestamp => Int64
* MaxTimestamp => Int64
* PID => Int64
* Epoch => Int16
* BaseSequence => Int32
* PartitionLeaderEpoch => Int32
* Records => Record1, Record2, … , RecordN
*
* The current attributes are given below:
*
* -----------------------------------------------------------------------------------------------
* | Unused (5-16) | Transactional (bit 4) | Timestamp Type (bit 3) | Compression Type (bits 0-2) |
* -----------------------------------------------------------------------------------------------
* -----------------------------------------------------------------------------------
* | Unused (5-16) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -----------------------------------------------------------------------------------
*/
public class EosLogEntry extends AbstractLogEntry implements LogEntry.MutableLogEntry {
static final int BASE_OFFSET_OFFSET = 0;
Expand All @@ -78,7 +79,9 @@ public class EosLogEntry extends AbstractLogEntry implements LogEntry.MutableLog
static final int EPOCH_LENGTH = 2;
static final int BASE_SEQUENCE_OFFSET = EPOCH_OFFSET + EPOCH_LENGTH;
static final int BASE_SEQUENCE_LENGTH = 4;
static final int RECORDS_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
static final int PARTITION_LEADER_EPOCH_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
static final int RECORDS_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
public static final int LOG_ENTRY_OVERHEAD = RECORDS_OFFSET;

private static final byte COMPRESSION_CODEC_MASK = 0x07;
Expand Down Expand Up @@ -170,6 +173,11 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}

@Override
public int partitionLeaderEpoch() {
return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
}

private Iterator<LogRecord> compressedIterator() {
ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
Expand Down Expand Up @@ -256,6 +264,11 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc);
}

@Override
public void setPartitionLeaderEpoch(int epoch) {
buffer.putInt(PARTITION_LEADER_EPOCH_LENGTH, epoch);
}

@Override
public long checksum() {
return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
Expand Down Expand Up @@ -307,8 +320,9 @@ static void writeHeader(ByteBuffer buffer,
long pid,
short epoch,
int sequence,
boolean isTransactional) {
if (magic < 2)
boolean isTransactional,
int partitionLeaderEpoch) {
if (magic < LogEntry.CURRENT_MAGIC_VALUE)
throw new IllegalArgumentException("Invalid magic value " + magic);
if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
Expand All @@ -326,6 +340,7 @@ static void writeHeader(ByteBuffer buffer,
buffer.putLong(position + PID_OFFSET, pid);
buffer.putShort(position + EPOCH_OFFSET, epoch);
buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
long crc = Utils.computeChecksum(buffer, position + MAGIC_OFFSET, size - MAGIC_OFFSET);
buffer.putInt(position + CRC_OFFSET, (int) (crc & 0xffffffffL));
}
Expand Down
116 changes: 66 additions & 50 deletions clients/src/main/java/org/apache/kafka/common/record/EosLogRecord.java
Expand Up @@ -32,26 +32,31 @@
* This class implements the inner record format for magic 2 and above. The schema is as follows:
*
* Record =>
* Length => varint
* Attributes => int8
* TimestampDelta => varlong
* OffsetDelta => varint
* KeyLen => varint [OPTIONAL]
* Key => data [OPTIONAL]
* Value => data [OPTIONAL]
* Length => Varint
* Attributes => Int8
* TimestampDelta => Varlong
* OffsetDelta => Varint
* KeyLen => Varint
* Key => data
* ValueLen => Varint
* Value => data
*
* The record attributes indicate whether the key and value fields are present. The first bit
* is used to indicate a null key; if set, the key length and key data will be left out of the
* message. Similarly, if the second bit is set, the value field will be left out.
* The record attributes indicate whether the key and value fields are present. The current attributes
* are depicted below:
*
* -----------------------------------
* | Unused (1-7) | Control Flag (0) |
* -----------------------------------
*
* The control flag is used to implement control records (see {@link ControlRecordType}).
*
* The offset and timestamp deltas compute the difference relative to the base offset and
* base timestamp of the log entry that this record is contained in.
*/
public class EosLogRecord implements LogRecord {
private static final int MAX_RECORD_OVERHEAD = 21;
private static final int NULL_KEY_MASK = 0x01;
private static final int NULL_VALUE_MASK = 0x02;
private static final int CONTROL_FLAG_MASK = 0x04;
private static final int CONTROL_FLAG_MASK = 0x01;
private static final int NULL_VALUE_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);

private final int sizeInBytes;
private final byte attributes;
Expand Down Expand Up @@ -158,20 +163,27 @@ public static long writeTo(DataOutputStream out,
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value);
ByteUtils.writeVarint(sizeInBytes, out);

byte attributes = computeAttributes(isControlRecord, key, value);
byte attributes = computeAttributes(isControlRecord);
out.write(attributes);

ByteUtils.writeVarlong(timestampDelta, out);
ByteUtils.writeVarint(offsetDelta, out);

if (key != null) {
if (key == null) {
ByteUtils.writeVarint(-1, out);
} else {
int keySize = key.remaining();
ByteUtils.writeVarint(keySize, out);
out.write(key.array(), key.arrayOffset(), keySize);
}

if (value != null)
out.write(value.array(), value.arrayOffset(), value.remaining());
if (value == null) {
ByteUtils.writeVarint(-1, out);
} else {
int valueSize = value.remaining();
ByteUtils.writeVarint(valueSize, out);
out.write(value.array(), value.arrayOffset(), valueSize);
}

return computeChecksum(timestampDelta, key, value);
}
Expand Down Expand Up @@ -200,11 +212,8 @@ private static long computeChecksum(long timestamp,
Crc32 crc = new Crc32();
crc.updateLong(timestamp);

if (key != null) {
int size = key.remaining();
crc.updateInt(size);
crc.update(key.array(), key.arrayOffset(), size);
}
if (key != null)
crc.update(key.array(), key.arrayOffset(), key.remaining());

if (value != null)
crc.update(value.array(), value.arrayOffset(), value.remaining());
Expand Down Expand Up @@ -274,35 +283,31 @@ private static EosLogRecord readFrom(ByteBuffer buffer,
long offset = baseOffset + delta;
int sequence = baseSequence >= 0 ? baseSequence + delta : LogEntry.NO_SEQUENCE;

ByteBuffer key = null;
if (hasKey(attributes)) {
int keySizeInBytes = ByteUtils.readVarint(buffer);
final ByteBuffer key;
int keySize = ByteUtils.readVarint(buffer);
if (keySize < 0) {
key = null;
} else {
key = buffer.slice();
key.limit(keySizeInBytes);
buffer.position(buffer.position() + keySizeInBytes);
key.limit(keySize);
buffer.position(buffer.position() + keySize);
}

ByteBuffer value = null;
if (hasValue(attributes))
final ByteBuffer value;
int valueSize = ByteUtils.readVarint(buffer);
if (valueSize < 0) {
value = null;
} else {
value = buffer.slice();
return new EosLogRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value);
}

private static byte computeAttributes(boolean isControlRecord, ByteBuffer key, ByteBuffer value) {
byte attributes = isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
if (key == null)
attributes |= NULL_KEY_MASK;
if (value == null)
attributes |= NULL_VALUE_MASK;
return attributes;
}
value.limit(valueSize);
buffer.position(buffer.position() + valueSize);
}

private static boolean hasKey(byte attributes) {
return (attributes & NULL_KEY_MASK) == 0;
return new EosLogRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value);
}

private static boolean hasValue(byte attributes) {
return (attributes & NULL_VALUE_MASK) == 0;
private static byte computeAttributes(boolean isControlRecord) {
return isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
}

public static int sizeInBytes(int offsetDelta,
Expand All @@ -328,24 +333,35 @@ private static int sizeOfBodyInBytes(int offsetDelta,
size += ByteUtils.sizeOfVarint(offsetDelta);
size += ByteUtils.sizeOfVarlong(timestamp);

if (key != null) {
if (key == null) {
size += NULL_VALUE_SIZE_BYTES;
} else {
int keySize = key.remaining();
size += ByteUtils.sizeOfVarint(keySize);
size += keySize;
}

if (value != null)
size += value.remaining();
if (value == null) {
size += NULL_VALUE_SIZE_BYTES;
} else {
int valueSize = value.remaining();
size += ByteUtils.sizeOfVarint(valueSize);
size += valueSize;
}

return size;
}

static int recordSizeUpperBound(byte[] key, byte[] value) {
int size = MAX_RECORD_OVERHEAD;
if (key != null)
if (key == null)
size += NULL_VALUE_SIZE_BYTES;
else
size += key.length + ByteUtils.sizeOfVarint(key.length);
if (value != null)
size += value.length;
if (value == null)
size += NULL_VALUE_SIZE_BYTES;
else
size += value.length + ByteUtils.sizeOfVarint(value.length);
return size;
}

Expand Down
Expand Up @@ -254,6 +254,12 @@ public boolean isTransactional() {
return underlying.isTransactional();
}

@Override
public int partitionLeaderEpoch() {
loadUnderlyingEntry();
return underlying.partitionLeaderEpoch();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
14 changes: 14 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
Expand Up @@ -51,6 +51,12 @@ public interface LogEntry extends Iterable<LogRecord> {
short NO_EPOCH = -1;
int NO_SEQUENCE = -1;

/**
* Used to indicate an unknown leader epoch, which will be the case when the record set is
* first created by the producer.
*/
int UNKNOWN_PARTITION_LEADER_EPOCH = -1;

/**
* Check whether the checksum of this entry is correct.
*
Expand Down Expand Up @@ -174,13 +180,21 @@ public interface LogEntry extends Iterable<LogRecord> {
*/
boolean isTransactional();

/**
* Get the partition leader epoch of this entry.
* @return The leader epoch or -1 if it is unknown
*/
int partitionLeaderEpoch();

/**
* A mutable log entry is one that can be modified in place (without copying).
*/
interface MutableLogEntry extends LogEntry {
void setOffset(long offset);

void setMaxTimestamp(TimestampType timestampType, long maxTimestamp);

void setPartitionLeaderEpoch(int epoch);
}

}
Expand Up @@ -275,7 +275,7 @@ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
int writeLimit) {
return new MemoryRecordsBuilder(buffer, LogEntry.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L,
System.currentTimeMillis(), LogEntry.NO_PID, LogEntry.NO_EPOCH, LogEntry.NO_SEQUENCE, false,
writeLimit);
LogEntry.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
}

public static MemoryRecordsBuilder builder(ByteBuffer buffer,
Expand Down Expand Up @@ -315,7 +315,8 @@ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
short epoch,
int baseSequence) {
return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
logAppendTime, pid, epoch, baseSequence, false, buffer.capacity());
logAppendTime, pid, epoch, baseSequence, false, LogEntry.UNKNOWN_PARTITION_LEADER_EPOCH,
buffer.capacity());
}

public static MemoryRecords withRecords(CompressionType compressionType, KafkaRecord ... records) {
Expand Down
Expand Up @@ -60,6 +60,7 @@ public class MemoryRecordsBuilder {
private final short epoch;
private final int baseSequence;
private final boolean isTransactional;
private final int partitionLeaderEpoch;
private final int writeLimit;
private final int initialCapacity;

Expand Down Expand Up @@ -101,6 +102,7 @@ public MemoryRecordsBuilder(ByteBuffer buffer,
short epoch,
int baseSequence,
boolean isTransactional,
int partitionLeaderEpoch,
int writeLimit) {
if (magic > LogEntry.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
Expand Down Expand Up @@ -138,6 +140,7 @@ public MemoryRecordsBuilder(ByteBuffer buffer,
this.epoch = epoch;
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialCapacity = buffer.capacity();

Expand Down Expand Up @@ -234,7 +237,7 @@ private void writeEosLogEntryHeader() {
}

EosLogEntry.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
baseTimestamp, maxTimestamp, pid, epoch, baseSequence, isTransactional);
baseTimestamp, maxTimestamp, pid, epoch, baseSequence, isTransactional, partitionLeaderEpoch);

buffer.position(pos);
}
Expand Down

0 comments on commit 691d838

Please sign in to comment.