From 4ba640e7fe58051dd0a40e169639360ee164c3f5 Mon Sep 17 00:00:00 2001
From: Apurva Mehta
Date: Fri, 16 Jun 2017 17:10:57 -0700
Subject: [PATCH 1/6] Update message format in implementation docs

---
 docs/implementation.html | 101 +++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 46 deletions(-)

diff --git a/docs/implementation.html b/docs/implementation.html
index 2cf401a5ece16..1179dc1475eb3 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -22,61 +22,70 @@

5.1 Network Layer

5.2 Messages

- Messages consist of a fixed-size header, a variable length opaque key byte array and a variable length opaque value byte array. The header contains the following fields: -

- Leaving the key and value opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The MessageSet interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO Channel. + Messages consist of a fixed-size header, a variable length opaque key byte array and a variable length opaque value byte array. The format of the header is described in he following section. + Leaving the key and value opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The RecordBatch interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO Channel.

5.3 Message Format

+

+ Messages (aka Records) are always written in batches. The technical term for a batch of messages is a RecordBatch, and a RecordBatch contains one or more Records. In the degenerate case, we could have a RecordBatch containing a single Record. + RecordBatches and Records have their own headers. The format of each is described below for Kafka version 0.11.0 and later (message format version v2). Click here for details about message formats 0 and 1.

+ +

5.3.1 Record Batch

+

The following is the on-disk format of a RecordBatch.

+

+		firstOffset: int64
+		batchLength: int32
+		partitionLeaderEpoch: int32
+		magic: int8 (current magic value is 2)
+		crc: int32
+		attributes: int16
+			bit 0~2:
+				0: no compression
+				1: gzip
+				2: snappy
+				3: lz4
+			bit 3: timestampType
+			bit 4: isTransactional
+			bit 5: isControlRecord
+			bit 6~15: unused
+		lastOffsetDelta: int32
+		firstTimestamp: int64
+		lastTimestamp: int64
+		producerId: int64
+		producerEpoch: int16
+		firstSequence: int32
+		records: [Record]
+	
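As an illustration of the attributes bit layout in the listing above, the following Python sketch unpacks the int16 into its parts. The function name `decode_batch_attributes` and the dictionary keys are hypothetical and chosen only for this example; the bit positions and codec ids are taken from the listing.

```python
# Codec ids as given in the listing (bits 0~2 of the attributes field).
COMPRESSION_CODECS = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4"}


def decode_batch_attributes(attributes: int) -> dict:
    """Split the batch-level attributes value into its documented bit fields.

    Hypothetical helper for illustration; bits 6~15 are unused and ignored.
    """
    codec_id = attributes & 0x07  # bits 0~2
    return {
        "compression": COMPRESSION_CODECS.get(codec_id, "unknown"),
        "timestamp_type": (attributes >> 3) & 1,         # bit 3, raw value
        "is_transactional": bool((attributes >> 4) & 1),  # bit 4
        "is_control": bool((attributes >> 5) & 1),        # bit 5
    }


if __name__ == "__main__":
    # 0b010011: lz4 (3), timestampType 0, transactional, not a control batch.
    print(decode_batch_attributes(0b010011))
```

Masking and shifting keeps each field independent, so a reader can ignore the unused high bits without special handling.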

+ +

5.3.2 Record

+

Record level headers were introduced in Kafka 0.11.0. The on-disk format of a Record with Headers is delineated below.

+

+		length: varint
+		attributes: int8
+			bit 0~7: unused
+		timestampDelta: varint
+		offsetDelta: varint
+		keyLength: varint
+		key: byte[]
+		valueLen: varint
+		value: byte[]
+		Headers => [Header]
+	
+
+		headerKeyLength: varint
+		headerKey: String
+		headerValueLength: varint
+		Value: byte[]
+	

-
-       /**
-        * 1. 4 byte CRC32 of the message
-        * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
-        * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
-        *    bit 0 ~ 2 : Compression codec.
-        *      0 : no compression
-        *      1 : gzip
-        *      2 : snappy
-        *      3 : lz4
-        *    bit 3 : Timestamp type
-        *      0 : create time
-        *      1 : log append time
-        *    bit 4 ~ 7 : reserved
-        * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
-        * 5. 4 byte key length, containing length K
-        * 6. K byte key
-        * 7. 4 byte payload length, containing length V
-        * 8. V byte payload
-        */
-    
-

5.4 Log

A log for a topic named "my_topic" with two partitions consists of two directories (namely my_topic_0 and my_topic_1) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries""; each log entry is a 4 byte integer N storing the message length which is followed by the N message bytes. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly S bytes from the previous file where S is the max log file size given in the configuration.
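To make the file-naming rule concrete, here is a minimal Python sketch of locating the log file that holds a given offset. It assumes the base offsets of the existing files are available as a sorted list and that names are zero-padded to 11 digits as in the `00000000000.kafka` example; `segment_file_name` and `find_segment` are hypothetical names, not part of any Kafka API.

```python
from bisect import bisect_right


def segment_file_name(base_offset: int) -> str:
    """Name a log file after the offset of the first message it contains."""
    return f"{base_offset:011d}.kafka"


def find_segment(base_offsets: list, target_offset: int) -> str:
    """Return the file whose base offset is the largest one <= target_offset.

    base_offsets must be sorted in ascending order (an assumption of this sketch).
    """
    index = bisect_right(base_offsets, target_offset) - 1
    if index < 0:
        raise ValueError("offset precedes the first log file")
    return segment_file_name(base_offsets[index])


if __name__ == "__main__":
    print(find_segment([0, 1000, 2000], 1500))
```

Because every file is named after its first offset, a binary search over the names is enough to pick the file; no separate index from offset to file is required for this step.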

- The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. This format is as follows: + The exact binary format for Records is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. The previous section included details about the on-disk format of Records.

-
-    On-disk format of a message
-
-    offset         : 8 bytes 
-    message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
-    crc            : 4 bytes
-    magic value    : 1 byte
-    attributes     : 1 byte
-    timestamp      : 8 bytes (Only exists when magic value is greater than zero)
-    key length     : 4 bytes
-    key            : K bytes
-    value length   : 4 bytes
-    value          : V bytes
-    
-

+

The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore, the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural—both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.

From 75dd9bdaa7a9bab1003c63d2b5e23b918693bff5 Mon Sep 17 00:00:00 2001
From: Apurva Mehta
Date: Fri, 16 Jun 2017 17:14:31 -0700
Subject: [PATCH 2/6] Add semantics of isTransactional and IsControl bits in the attributes

---
 docs/implementation.html | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/implementation.html b/docs/implementation.html
index 1179dc1475eb3..f3986002100e3 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -45,8 +45,8 @@

5.3.1 Record Batch

 				2: snappy
 				3: lz4
 			bit 3: timestampType
-			bit 4: isTransactional
-			bit 5: isControlRecord
+			bit 4: isTransactional (0 means not transactional)
+			bit 5: isControlRecord (0 means not a control batch)
 			bit 6~15: unused
 		lastOffsetDelta: int32
 		firstTimestamp: int64

From 8981fe95268c5afbf70d657cff9d93ed7f19b39f Mon Sep 17 00:00:00 2001
From: Apurva Mehta
Date: Fri, 16 Jun 2017 17:31:14 -0700
Subject: [PATCH 3/6] Minor cleanups

---
 docs/implementation.html | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/docs/implementation.html b/docs/implementation.html
index f3986002100e3..56aebef5e0ede 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -46,7 +46,7 @@

5.3.1 Record Batch

 				3: lz4
 			bit 3: timestampType
 			bit 4: isTransactional (0 means not transactional)
-			bit 5: isControlRecord (0 means not a control batch)
+			bit 5: isControlBatch (0 means not a control batch)
 			bit 6~15: unused
 		lastOffsetDelta: int32
 		firstTimestamp: int64
@@ -70,8 +70,9 @@

5.3.2 Record

 		valueLen: varint
 		value: byte[]
 		Headers => [Header]
-
-
+	

+
5.4.2.1 Record Header
+

 		headerKeyLength: varint
 		headerKey: String
 		headerValueLength: varint

From 92a82f7f11f3d7e13a1a9c95f5110877666a6ea5 Mon Sep 17 00:00:00 2001
From: Apurva Mehta 
Date: Tue, 27 Jun 2017 11:45:08 -0700
Subject: [PATCH 4/6] Address PR comments

---
 docs/implementation.html | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/docs/implementation.html b/docs/implementation.html
index 56aebef5e0ede..5b3090c1fef89 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -22,18 +22,18 @@ 

5.1 Network Layer

5.2 Messages

- Messages consist of a fixed-size header, a variable length opaque key byte array and a variable length opaque value byte array. The format of the header is described in he following section. + Messages consist of a variable-length header, a variable length opaque key byte array and a variable length opaque value byte array. The format of the header is described in the following section. Leaving the key and value opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The RecordBatch interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO Channel.

5.3 Message Format

- Messages (aka Records) are always written in batches. The technical term for a batch of messages is a RecordBatch, and a RecordBatch contains one or more Records. In the degenerate case, we could have a RecordBatch containing a single Record. - RecordBatches and Records have their own headers. The format of each is described below for Kafka version 0.11.0 and later (message format version v2). Click here for details about message formats 0 and 1.

+ Messages (aka Records) are always written in batches. The technical term for a batch of messages is a record batch, and a record batch contains one or more records. In the degenerate case, we could have a record batch containing a single record. + Record batches and records have their own headers. The format of each is described below for Kafka version 0.11.0 and later (message format version v2). Click here for details about message formats 0 and 1.

5.3.1 Record Batch

The following is the on-disk format of a RecordBatch.

-		firstOffset: int64
+		baseOffset: int64
 		batchLength: int32
 		partitionLeaderEpoch: int32
 		magic: int8 (current magic value is 2)
@@ -53,9 +53,20 @@ 

5.3.1 Record Batch

 		lastTimestamp: int64
 		producerId: int64
 		producerEpoch: int16
-		firstSequence: int32
+		baseSequence: int32
 		records: [Record]

+

Note that when compression is enabled, the compressed record data is serialized directly following the count of the number of records.
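The fixed-width part of the batch header can be read with a single `struct` format. The sketch below is illustrative only: it assumes big-endian (network) byte order and assumes the record count mentioned above is an int32 that directly follows baseSequence. The name `parse_batch_header` and the `recordCount` key are hypothetical; the other field names follow the listing in this revision.

```python
import struct

# Fixed-width header fields in listing order; ">" means big-endian (assumed).
# q=int64, i=int32, b=int8, I=uint32 (crc read as unsigned), h=int16.
HEADER = struct.Struct(">qiibIhiqqqhi")
FIELDS = (
    "baseOffset", "batchLength", "partitionLeaderEpoch", "magic", "crc",
    "attributes", "lastOffsetDelta", "firstTimestamp", "lastTimestamp",
    "producerId", "producerEpoch", "baseSequence",
)


def parse_batch_header(buf: bytes) -> dict:
    """Decode the fixed-width header of one magic=2 record batch."""
    header = dict(zip(FIELDS, HEADER.unpack_from(buf, 0)))
    if header["magic"] != 2:
        raise ValueError("this sketch only understands magic value 2")
    # Assumption: the number of records is an int32 right after baseSequence.
    (header["recordCount"],) = struct.unpack_from(">i", buf, HEADER.size)
    return header


if __name__ == "__main__":
    raw = HEADER.pack(100, 49, 0, 2, 0, 0, 4, 1000, 1004, -1, -1, -1)
    raw += struct.pack(">i", 5)
    print(parse_batch_header(raw))
```

Checking the magic byte before trusting the remaining fields mirrors the advice that clients parse the magic value first, since it determines how the rest of the header is laid out.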

+ +

The CRC covers the data from the attributes to the end of the batch (i.e. all the bytes that follow the CRC). It is located after the magic byte, which + means that clients must parse the magic byte before deciding how to interpret the bytes between the batch length and the magic byte. The partition leader + epoch field is not included in the CRC computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by + the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.
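The CRC coverage rule can be sketched in a few lines of Python. The table-driven `crc32c` below is a plain software implementation of the Castagnoli polynomial written for this example, not the code Kafka uses; `batch_crc_is_valid` is a hypothetical helper that assumes `batch` holds exactly one complete batch in big-endian byte order, with field widths taken from the header listing.

```python
def _make_table() -> list:
    """Build the lookup table for the reflected Castagnoli polynomial."""
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table


_TABLE = _make_table()


def crc32c(data: bytes) -> int:
    """Compute CRC-32C (Castagnoli) over data."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


# Offsets derived from the header listing:
# baseOffset (8) + batchLength (4) + partitionLeaderEpoch (4) + magic (1).
CRC_OFFSET = 8 + 4 + 4 + 1
ATTRIBUTES_OFFSET = CRC_OFFSET + 4


def batch_crc_is_valid(batch: bytes) -> bool:
    """Check the stored CRC against everything from attributes onward."""
    stored = int.from_bytes(batch[CRC_OFFSET:ATTRIBUTES_OFFSET], "big")
    return stored == crc32c(batch[ATTRIBUTES_OFFSET:])
```

Because the checksum starts at the attributes field, rewriting the partition leader epoch in place leaves a previously valid CRC valid, which is the property the paragraph above describes.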

+ +

On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence numbers from the original batch when the log is cleaned. This is required in order to be able to restore the + producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must + be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying that the first and last sequence numbers of the incoming batch match the last from that producer). As a result, + it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer's last sequence number.

5.3.2 Record

Record level headers were introduced in Kafka 0.11.0. The on-disk format of a Record with Headers is delineated below.

@@ -78,13 +89,15 @@
5.4.2.1 Record Header
 		headerValueLength: varint
 		Value: byte[]

+

We use the same varint encoding as Protobuf. More information on the latter can be found here. The count of headers in a record + is also encoded as a varint.
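For readers unfamiliar with the Protobuf varint scheme, here is a minimal Python sketch of it: seven payload bits per byte, least significant group first, with the high bit set on every byte except the last. The function names are hypothetical. The ZigZag helpers show how Protobuf maps signed values onto unsigned ones; whether a particular field in the record format uses that mapping is an assumption of this sketch rather than something stated above.

```python
def encode_unsigned_varint(value: int) -> bytes:
    """Encode a non-negative integer as a Protobuf-style varint."""
    if value < 0:
        raise ValueError("value must be non-negative")
    out = bytearray()
    while value > 0x7F:
        out.append((value & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        value >>= 7
    out.append(value)                      # final byte, continuation bit clear
    return bytes(out)


def decode_unsigned_varint(buf: bytes, pos: int = 0) -> tuple:
    """Decode one varint starting at pos; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def zigzag_encode(n: int) -> int:
    """Map signed to unsigned: 0, -1, 1, -2, ... become 0, 1, 2, 3, ..."""
    return (n << 1) if n >= 0 else ((-n) << 1) - 1


def zigzag_decode(z: int) -> int:
    """Inverse of zigzag_encode."""
    return (z >> 1) ^ -(z & 1)


if __name__ == "__main__":
    print(encode_unsigned_varint(300).hex())
```

Small values occupy a single byte, which is why per-record fields such as deltas and lengths are cheap to store in this encoding.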

5.4 Log

A log for a topic named "my_topic" with two partitions consists of two directories (namely my_topic_0 and my_topic_1) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries""; each log entry is a 4 byte integer N storing the message length which is followed by the N message bytes. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly S bytes from the previous file where S is the max log file size given in the configuration.

- The exact binary format for Records is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. The previous section included details about the on-disk format of Records.

+ The exact binary format for Records is versioned and maintained as a standard interface so record batches can be transferred between producer, broker, and client without recopying or conversion when desirable. The previous section included details about the on-disk format of Records.

The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore, the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural—both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.

From 4ec2a6b421bca06040dc7f787045afb14be9b8f8 Mon Sep 17 00:00:00 2001
From: Apurva Mehta
Date: Wed, 28 Jun 2017 16:16:10 -0700
Subject: [PATCH 5/6] Address PR Comments

---
 docs/implementation.html | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/docs/implementation.html b/docs/implementation.html
index 5b3090c1fef89..e1bd3858320f8 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -28,7 +28,7 @@

5.2 Messages

5.3 Message Format

Messages (aka Records) are always written in batches. The technical term for a batch of messages is a record batch, and a record batch contains one or more records. In the degenerate case, we could have a record batch containing a single record. - Record batches and records have their own headers. The format of each is described below for Kafka version 0.11.0 and later (message format version v2). Click here for details about message formats 0 and 1.

+ Record batches and records have their own headers. The format of each is described below for Kafka version 0.11.0 and later (message format version v2, or magic=2). Click here for details about message formats 0 and 1.

5.3.1 Record Batch

The following is the on-disk format of a RecordBatch.

@@ -50,7 +50,7 @@

5.3.1 Record Batch

 			bit 6~15: unused
 		lastOffsetDelta: int32
 		firstTimestamp: int64
-		lastTimestamp: int64
+		maxTimestamp: int64
 		producerId: int64
 		producerEpoch: int16
 		baseSequence: int32
@@ -66,10 +66,19 @@

5.3.1 Record Batch

On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence numbers from the original batch when the log is cleaned. This is required in order to be able to restore the producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying that the first and last sequence numbers of the incoming batch match the last from that producer). As a result, - it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer's last sequence number.

+ it is possible to have empty batches in the log when all the records in the batch are cleaned but the batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the firstTimestamp + field is not preserved during compaction, so it will change if the first record in the batch is compacted away.

+ +
5.3.1.1 Control Batches
+

A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.

+

A control record has an empty value. The key of a control record conforms to the following schema:

+

+       version: int16
+       type: int16 (0 indicates an abort marker, 1 indicates a commit)
+    

5.3.2 Record

-

Record level headers were introduced in Kafka 0.11.0. The on-disk format of a Record with Headers is delineated below.

+

Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below.

 		length: varint
 		attributes: int8
@@ -97,7 +106,7 @@ 

5.4 Log

A log for a topic named "my_topic" with two partitions consists of two directories (namely my_topic_0 and my_topic_1) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries""; each log entry is a 4 byte integer N storing the message length which is followed by the N message bytes. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly S bytes from the previous file where S is the max log file size given in the configuration.

- The exact binary format for Records is versioned and maintained as a standard interface so record batches can be transferred between producer, broker, and client without recopying or conversion when desirable. The previous section included details about the on-disk format of Records.

+ The exact binary format for records is versioned and maintained as a standard interface so record batches can be transferred between producer, broker, and client without recopying or conversion when desirable. The previous section included details about the on-disk format of records.

The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore, the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural—both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.

From ff00ae82710a1279a13c9152bb3ed54a651c3a65 Mon Sep 17 00:00:00 2001
From: Apurva Mehta
Date: Wed, 28 Jun 2017 18:04:55 -0700
Subject: [PATCH 6/6] Address PR comments

---
 docs/implementation.html | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/docs/implementation.html b/docs/implementation.html
index e1bd3858320f8..af234ea626fea 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -71,11 +71,12 @@

5.3.1 Record Batch

5.3.1.1 Control Batches

A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.

-

A control record has an empty value. The key of a control record conforms to the following schema:

+

The key of a control record conforms to the following schema:

-       version: int16
+       version: int16 (current version is 0)
        type: int16 (0 indicates an abort marker, 1 indicates a commit)
     

+

The schema for the value of a control record is dependent on the type. The value is opaque to clients.
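A consumer-side reader might interpret the control record key along these lines. This is a sketch under the assumption that both int16 fields are big-endian; `parse_control_key` and the `CONTROL_TYPES` table are hypothetical names for this example, and the value is left untouched because it is opaque to clients.

```python
import struct

# Type codes from the schema: 0 is an abort marker, 1 is a commit.
CONTROL_TYPES = {0: "abort", 1: "commit"}


def parse_control_key(key: bytes) -> tuple:
    """Return (version, marker name) from a control record key.

    Assumes big-endian int16 fields; unrecognized type codes map to "unknown".
    """
    version, type_id = struct.unpack_from(">hh", key, 0)
    return version, CONTROL_TYPES.get(type_id, "unknown")


if __name__ == "__main__":
    print(parse_control_key(struct.pack(">hh", 0, 1)))
```

Mapping unrecognized type codes to "unknown" instead of raising lets a reader skip markers it does not understand, which is one possible design choice for forward compatibility.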

5.3.2 Record

Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below.