
RecordQueue

RecordQueue is a FIFO queue of StampedRecords that a StreamTask uses for…FIXME

RecordQueue is created exclusively when a StreamTask is created.

Figure 1. RecordQueue, StreamTask and TaskCreator
Table 1. RecordQueue’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description
fifoQueue | Java’s java.util.ArrayDeque of StampedRecords (i.e. orderable Kafka ConsumerRecords with a timestamp). Used when…FIXME
recordDeserializer | RecordDeserializer (for the SourceNode and DeserializationExceptionHandler). Used when…FIXME
timeTracker | TimestampTracker. Used when…FIXME
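To make the fifoQueue entry concrete, here is a minimal sketch, assuming a hypothetical StampedRecord stand-in (a value plus a timestamp) rather than the Kafka Streams class. It shows that a java.util.ArrayDeque used as a FIFO (addLast at the tail, pollFirst at the head) hands records back in arrival order, whatever their timestamps.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Illustration only: StampedRecord below is a hypothetical stand-in
// (a value plus a timestamp), not the Kafka Streams class.
class FifoQueueSketch {

    static final class StampedRecord {
        final String value;
        final long timestamp;

        StampedRecord(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Appends one record per timestamp at the tail, then drains from the head
    // and returns the timestamps in the order the records came out.
    static List<Long> arrivalOrder(long... timestamps) {
        ArrayDeque<StampedRecord> fifoQueue = new ArrayDeque<>();
        for (long ts : timestamps) {
            fifoQueue.addLast(new StampedRecord("value-" + ts, ts));
        }
        List<Long> polled = new ArrayList<>();
        StampedRecord next;
        while ((next = fifoQueue.pollFirst()) != null) {
            polled.add(next.timestamp);
        }
        return polled;
    }
}
```

For example, arrivalOrder(30L, 10L, 20L) returns [30, 10, 20]: the deque itself never reorders by timestamp, which is why timestamps are tracked separately by the timeTracker.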

Tip

Enable TRACE logging level for org.apache.kafka.streams.processor.internals.RecordQueue logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.RecordQueue=TRACE

Creating RecordQueue Instance

RecordQueue takes the following when created:

RecordQueue initializes the internal registries and counters.

Adding Kafka ConsumerRecords (as StampedRecords) — addRawRecords Method

int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords)

For every Kafka ConsumerRecord in the input rawRecords, addRawRecords does the following:

  1. Requests the RecordDeserializer to deserialize the record (with the ProcessorContext)

  2. Requests the TimestampTracker for the currently tracked timestamp and passes it to the TimestampExtractor to extract the timestamp of the (deserialized) record

  3. Creates a StampedRecord for the record and the record timestamp

  4. Inserts the StampedRecord at the end of the fifoQueue

  5. Requests the TimestampTracker to add the StampedRecord

While processing ConsumerRecords, addRawRecords prints out the following TRACE message to the logs:

Source node [name] extracted timestamp [timestamp] for record [record]

Once all ConsumerRecords are processed, addRawRecords sets the partitionTime to the currently tracked timestamp (from the TimestampTracker), but only if that timestamp is greater than the current partitionTime.

In the end, addRawRecords returns the size of the queue, i.e. the number of StampedRecords in the fifoQueue (including records added by earlier calls).

Note

addRawRecords skips (drops) Kafka ConsumerRecords when one of the following conditions holds:

  • The RecordDeserializer could not deserialize the record and the DeserializationExceptionHandler is not set to fail upon a deserialization error

  • The extracted timestamp is negative (and hence invalid)

Note
addRawRecords is used exclusively when PartitionGroup is requested to add records to a RecordQueue for a Kafka partition.
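The addRawRecords flow above can be sketched without any Kafka dependency. This is a hedged illustration, not the real implementation: RawRecord, StampedRecord, TimestampExtractor and SimpleMinTracker are hypothetical stand-ins, a deserializer returning null models a record the DeserializationExceptionHandler chose to skip, the tracker is assumed to report the smallest timestamp added so far, and NOT_KNOWN is assumed to be -1L.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.function.Function;

// Dependency-free sketch of addRawRecords. All nested types are hypothetical
// stand-ins for ConsumerRecord, StampedRecord, TimestampExtractor and TimestampTracker.
class AddRawRecordsSketch {
    static final long NOT_KNOWN = -1L; // assumed "no timestamp yet" sentinel

    // Stand-in for ConsumerRecord<byte[], byte[]>.
    static final class RawRecord {
        final byte[] value;
        final long timestamp;
        RawRecord(byte[] value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // Stand-in for StampedRecord: deserialized value plus extracted timestamp.
    static final class StampedRecord {
        final String value;
        final long timestamp;
        StampedRecord(String value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // Hypothetical extractor contract: the record plus the currently tracked timestamp.
    interface TimestampExtractor {
        long extract(RawRecord record, long currentlyTrackedTimestamp);
    }

    // Simplified tracker: reports the smallest timestamp added so far, or NOT_KNOWN.
    static final class SimpleMinTracker {
        private final PriorityQueue<Long> timestamps = new PriorityQueue<>();
        void addElement(StampedRecord record) { timestamps.add(record.timestamp); }
        long get() { return timestamps.isEmpty() ? NOT_KNOWN : timestamps.peek(); }
    }

    private final ArrayDeque<StampedRecord> fifoQueue = new ArrayDeque<>();
    private final SimpleMinTracker timeTracker = new SimpleMinTracker();
    private final Function<RawRecord, String> deserializer; // null result = skip the record
    private final TimestampExtractor timestampExtractor;
    long partitionTime = NOT_KNOWN;

    AddRawRecordsSketch(Function<RawRecord, String> deserializer, TimestampExtractor timestampExtractor) {
        this.deserializer = deserializer;
        this.timestampExtractor = timestampExtractor;
    }

    int addRawRecords(Iterable<RawRecord> rawRecords) {
        for (RawRecord rawRecord : rawRecords) {
            // 1. Deserialize; null models a deserialization error that is skipped.
            String value = deserializer.apply(rawRecord);
            if (value == null) {
                continue;
            }
            // 2. Extract the timestamp, handing over the currently tracked timestamp.
            long timestamp = timestampExtractor.extract(rawRecord, timeTracker.get());
            // (Per the text above, the TRACE message is logged at this point.)
            // Drop records whose timestamp is negative, i.e. invalid.
            if (timestamp < 0) {
                continue;
            }
            // 3-5. Wrap the record, append it at the tail, register it with the tracker.
            StampedRecord stamped = new StampedRecord(value, timestamp);
            fifoQueue.addLast(stamped);
            timeTracker.addElement(stamped);
        }
        // Advance partitionTime only if the tracked timestamp is greater.
        long tracked = timeTracker.get();
        if (tracked > partitionTime) {
            partitionTime = tracked;
        }
        // Total records queued, including those from earlier calls.
        return fifoQueue.size();
    }
}
```

With an extractor that simply returns the raw record's own timestamp, a batch with timestamps 20, an undeserializable record, -1 and 10 leaves two records queued and partitionTime at 10. A later batch with timestamp 5 grows the queue to three records but leaves partitionTime at 10, because it only moves forward.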

Clearing (Resetting) RecordQueue — clear Method

void clear()

clear removes all of the elements from the fifoQueue and requests the TimestampTracker to clear itself.

In the end, clear (re)sets the partitionTime to NOT_KNOWN.

Note
clear is used exclusively when PartitionGroup is requested to clear.
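The reset performed by clear can be sketched as follows. This is a simplified illustration under stated assumptions: plain Long timestamps stand in for StampedRecords, a java.util.PriorityQueue stands in for the TimestampTracker, NOT_KNOWN is assumed to be -1L, and the add and size methods exist only so the effect of clear is observable.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;

// Hypothetical sketch of clear(); Long timestamps stand in for StampedRecords
// and a PriorityQueue stands in for the TimestampTracker.
class ClearSketch {
    static final long NOT_KNOWN = -1L; // assumed sentinel for "partition time not known"

    private final ArrayDeque<Long> fifoQueue = new ArrayDeque<>();
    private final PriorityQueue<Long> timeTracker = new PriorityQueue<>();
    long partitionTime = NOT_KNOWN;

    // Simplified enqueue: append at the tail and advance partitionTime
    // if the smallest tracked timestamp is greater.
    void add(long timestamp) {
        fifoQueue.addLast(timestamp);
        timeTracker.add(timestamp);
        long tracked = timeTracker.peek();
        if (tracked > partitionTime) {
            partitionTime = tracked;
        }
    }

    void clear() {
        fifoQueue.clear();         // remove all queued records
        timeTracker.clear();       // let the tracker forget its timestamps
        partitionTime = NOT_KNOWN; // (re)set the partition time
    }

    int size() {
        return fifoQueue.size();   // number of records currently in the FIFO
    }
}
```

After add(7L) and add(3L), size() is 2 and partitionTime is 7; after clear(), size() is 0 and partitionTime is back to NOT_KNOWN, so the next batch establishes the partition time from scratch.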

Requesting Number of Records in Queue — size Method

int size()

size simply returns the number of records in the fifoQueue.

Note

size is used when: