StreamTask

StreamTask is a concrete stream processor task that uses a PartitionGroup for…​FIXME

StreamTask is created exclusively when TaskCreator is requested to create a StreamTask.

StreamTask may need a commit, i.e…​.FIXME

StreamTask uses commitOffsetNeeded flag to…​FIXME

StreamTask uses buffered.records.per.partition configuration property to control when to resume a partition (when processing a single record).
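As a minimal sketch, the property can be set through a plain java.util.Properties object. The key is the one named above; the helper class BufferedRecordsConfigExample and the value passed to it are illustrative assumptions only.

```java
import java.util.Properties;

// Hypothetical helper class, used only for illustration.
final class BufferedRecordsConfigExample {

    // Builds a Properties object that carries the per-partition buffer limit.
    static Properties streamsProperties(int maxBufferedRecords) {
        Properties props = new Properties();
        // The key is the configuration property named in the text above.
        props.setProperty("buffered.records.per.partition",
                Integer.toString(maxBufferedRecords));
        return props;
    }
}
```

The resulting Properties would typically be combined with the other required Streams settings before the application is started.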

Table 1. StreamTask’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

partitionGroup

consumedOffsets

Offsets by topic partitions, i.e. Map<TopicPartition, Long>, that StreamTask has processed successfully.

Tip

Enable TRACE logging level for org.apache.kafka.streams.processor.internals.StreamTask logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=TRACE

closeTopology Internal Method

void closeTopology()

closeTopology…​FIXME

Note
closeTopology is used exclusively when StreamTask is requested to suspend.

suspend Method

void suspend()  // (1)

// PRIVATE API
void suspend(final boolean clean)
  1. Uses clean flag enabled, i.e. true

Note
suspend is part of Task Contract to…​FIXME.

suspend…​FIXME

Note
The private suspend is used exclusively when StreamTask is requested to close.

commit Method

void commit()
Note
commit is part of Task Contract to…​FIXME.

commit…​FIXME

close Method

void close(
  final boolean clean,
  final boolean isZombie)
Note
close is part of Task Contract to…​FIXME.

close…​FIXME

Creating StreamTask Instance

StreamTask takes the following when created:

StreamTask initializes the internal registries and counters.

initTopology Internal Method

void initTopology()

initTopology…​FIXME

Note
initTopology is used when…​FIXME

initializeTopology Method

void initializeTopology()
Note
initializeTopology is part of Task Contract to…​FIXME.

initializeTopology first calls initTopology. It then requests the InternalProcessorContext to initialize and, in the end, sets the taskInitialized flag to true.

updateProcessorContext Internal Method

void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode)

updateProcessorContext…​FIXME

Note
updateProcessorContext is used when…​FIXME

Processing Single Record — process Method

boolean process()

process requests PartitionGroup for nextRecord (with RecordInfo).

process prints out the following TRACE message to the logs:

Start processing one record [record]

process requests RecordInfo for the source processor node.

process then calls updateProcessorContext (with the current record and the source processor node).

process requests the source processor node to process the key and the value of the record.

process prints out the following TRACE message to the logs:

Completed processing one record [record]

process requests RecordInfo for the topic partition and stores the partition and the record’s offset in consumedOffsets.

process turns commitOffsetNeeded flag on.

process requests the Kafka consumer to resume the partition if the size of the queue of the RecordInfo is exactly maxBufferedSize.

process always requests InternalProcessorContext to setCurrentNode as null.

In case of a ProducerFencedException, process reports a TaskMigratedException.

In case of a KafkaException, process reports a StreamsException.

In the end, process gives true when processing a single record was successful, and false when there were no records to process.

Note
process is used exclusively when AssignedStreamsTasks is requested to process.
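The flow of process described above can be sketched with a simplified, self-contained model. Everything in it is an assumption made for illustration: the class ProcessSketch, the Rec stand-in for a record, the string-based partition names, the "first non-empty queue" selection, and the paused set that stands in for the Kafka consumer. Exception translation and logging are left out.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Simplified, hypothetical model of the steps above; this is not the real StreamTask.
final class ProcessSketch {

    // Stand-in for a buffered record: only the partition name and the offset.
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    final Map<String, Queue<Rec>> queues = new LinkedHashMap<>(); // per-partition buffers
    final Map<String, Long> consumedOffsets = new HashMap<>();    // last processed offset per partition
    final Set<String> paused = new HashSet<>();                   // stand-in for paused consumer partitions
    final int maxBufferedSize;
    boolean commitOffsetNeeded = false;
    String currentNode = null;

    ProcessSketch(int maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Demo helper: buffers a record without any pause logic.
    void buffer(String partition, long offset) {
        queues.computeIfAbsent(partition, p -> new ArrayDeque<>())
              .add(new Rec(partition, offset));
    }

    boolean process() {
        // Assumption: take the first non-empty queue; the real selection policy is not modelled.
        Queue<Rec> queue = null;
        for (Queue<Rec> q : queues.values()) {
            if (!q.isEmpty()) {
                queue = q;
                break;
            }
        }
        if (queue == null) {
            return false; // no records to process
        }
        Rec record = queue.poll();
        try {
            currentNode = "source"; // stands in for updateProcessorContext
            // The source node would process the record's key and value here.
            consumedOffsets.put(record.partition, record.offset);
            commitOffsetNeeded = true;
            // Resume only when the queue size is exactly the limit.
            if (queue.size() == maxBufferedSize) {
                paused.remove(record.partition);
            }
        } finally {
            currentNode = null; // cleared whether or not processing succeeded
        }
        return true;
    }
}
```

Checking for equality rather than "less than or equal" means the resume request is issued once, at the moment the queue drains back to the limit, and not on every later record.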

numBuffered Method

int numBuffered()

numBuffered…​FIXME

Note
numBuffered is used when…​FIXME

closeSuspended Method

void closeSuspended(
  boolean clean,
  final boolean isZombie,
  RuntimeException firstException)
Note
closeSuspended is part of Task Contract to…​FIXME.

closeSuspended…​FIXME

addRecords Method

int addRecords(
  final TopicPartition partition,
  final Iterable<ConsumerRecord<byte[], byte[]>> records)

addRecords prints out the following TRACE message to the logs:

Added records into the buffered queue of partition [partition], new queue size is [newQueueSize]

addRecords requests the Kafka consumer to pause the partition if the new queue size of the partition exceeds the buffered.records.per.partition configuration property.

In the end, addRecords returns the number of records added.

Note
addRecords is used exclusively when StreamThread is requested to add records to active stream tasks (and report skipped records).
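The pause rule of addRecords can be sketched in isolation. This is a simplified model under stated assumptions: the class AddRecordsSketch, the string-based partitions and records, and the paused set (a stand-in for the Kafka consumer) are all hypothetical and only mirror the behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Simplified, hypothetical model of addRecords; this is not the real StreamTask.
final class AddRecordsSketch {

    private final Map<String, Queue<String>> queues = new HashMap<>(); // per-partition buffers
    final Set<String> paused = new HashSet<>();                        // stand-in for paused consumer partitions
    private final int maxBufferedSize;                                 // models buffered.records.per.partition

    AddRecordsSketch(int maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Buffers the records and returns how many were added.
    int addRecords(String partition, List<String> records) {
        Queue<String> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        int oldQueueSize = queue.size();
        queue.addAll(records);
        int newQueueSize = queue.size();
        // Pause only when the buffer has grown past the limit.
        if (newQueueSize > maxBufferedSize) {
            paused.add(partition);
        }
        return newQueueSize - oldQueueSize;
    }
}
```

In this model a partition that holds exactly maxBufferedSize records is not paused; only going past the limit triggers the pause.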

flushState Method

void flushState()
Note
flushState is part of AbstractTask Contract to…​FIXME.

flushState…​FIXME

recordCollectorOffsets Method

Map<TopicPartition, Long> recordCollectorOffsets()
Note
recordCollectorOffsets is part of AbstractTask Contract to…​FIXME.

recordCollectorOffsets…​FIXME

Executing Scheduled Periodic Action — punctuate Method

void punctuate(
  final ProcessorNode node,
  final long timestamp,
  final PunctuationType type,
  final Punctuator punctuator)
Note
punctuate is part of ProcessorNodePunctuator Contract to execute a scheduled periodic action.

punctuate…​FIXME

maybePunctuateStreamTime Method

boolean maybePunctuateStreamTime()

maybePunctuateStreamTime…​FIXME

Note
maybePunctuateStreamTime is used exclusively when AssignedStreamsTasks is requested to punctuate.

maybePunctuateSystemTime Method

boolean maybePunctuateSystemTime()

maybePunctuateSystemTime…​FIXME

Note
maybePunctuateSystemTime is used exclusively when AssignedStreamsTasks is requested to punctuate.

schedule Method

// PUBLIC API
Cancellable schedule(
  final long interval,
  final PunctuationType type,
  final Punctuator punctuator)
// PACKAGE PROTECTED
Cancellable schedule(
  final long startTime,
  final long interval,
  final PunctuationType type,
  final Punctuator punctuator)

schedule…​FIXME

Note
schedule is used exclusively when ProcessorContextImpl is requested to schedule.