AssignedStreamsTasks — AssignedTasks For StreamTasks

AssignedStreamsTasks is an AssignedTasks for StreamTasks that…​FIXME

AssignedStreamsTasks is created along with a StreamThread (when KafkaStreams is created).

AssignedStreamsTasks is a RestoringTasks that…​FIXME

It appears that AssignedStreamsTasks simply operates on the running tasks (i.e. the tasks in the running internal registry). When requested to process or punctuate, AssignedStreamsTasks walks over the running internal registry and triggers execution of every task.

AssignedStreamsTasks uses a TaskAction<StreamTask> called maybeCommit (the maybeCommitAction internal property) that the maybeCommit method executes. The task action takes a stream task and checks whether the task needs a commit. If so, the action does the following:

  1. Increments the committed counter

  2. Requests the stream task to commit

  3. Prints out the following DEBUG message to the logs:

    Committed active task [id] per user request in
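The three steps above can be sketched in plain Java. This is a minimal illustration only, not the Kafka Streams source: FakeStreamTask and FakeTaskAction are hypothetical stand-ins for StreamTask and TaskAction<StreamTask>, and the debugLog list stands in for the DEBUG logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; these are NOT the real Kafka Streams classes.
class MaybeCommitActionSketch {

    /** Simplified stand-in for a stream task. */
    static class FakeStreamTask {
        final String id;
        boolean commitNeeded;
        int commits = 0;

        FakeStreamTask(String id, boolean commitNeeded) {
            this.id = id;
            this.commitNeeded = commitNeeded;
        }

        void commit() {
            commits++;
            commitNeeded = false;
        }
    }

    /** Simplified stand-in for TaskAction<StreamTask>. */
    interface FakeTaskAction {
        String name();
        void apply(FakeStreamTask task);
    }

    int committed = 0;                                  // the committed counter
    final List<String> debugLog = new ArrayList<>();    // stands in for DEBUG logging

    final FakeTaskAction maybeCommitAction = new FakeTaskAction() {
        @Override
        public String name() {
            return "maybeCommit";
        }

        @Override
        public void apply(FakeStreamTask task) {
            if (task.commitNeeded) {
                committed++;        // 1. increment the committed counter
                task.commit();      // 2. request the task to commit
                // 3. record the DEBUG message
                debugLog.add("Committed active task " + task.id + " per user request in");
            }
        }
    };
}
```

A task that does not need a commit passes through the action untouched, so the counter only reflects tasks that were actually committed.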

AssignedStreamsTasks takes a LogContext when created.

AssignedStreamsTasks uses stream task for taskTypeName.

Table 1. AssignedStreamsTasks’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description
committed | Number of…​FIXME
log |
maybeCommitAction | TaskAction<StreamTask> that maybeCommit executes

Tip

Enable any of the ERROR, WARN, INFO, DEBUG or TRACE logging levels for the org.apache.kafka.streams.processor.internals.AssignedStreamsTasks logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.AssignedStreamsTasks=TRACE

Requesting Stream Tasks to Process Records — process Method

int process()

process requests every running stream task to process a single record.

In the end, process returns the number of stream tasks that processed a single record successfully.
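The counting behaviour described above might look roughly like the following sketch. It assumes a hypothetical FakeStreamTask whose process method reports whether a record was processed, and a running map keyed by a task id string. Neither is the real Kafka Streams type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins only; these are NOT the real Kafka Streams classes.
class ProcessLoopSketch {

    /** Simplified stand-in for a stream task with a number of buffered records. */
    static class FakeStreamTask {
        int pending;

        FakeStreamTask(int pending) {
            this.pending = pending;
        }

        /** Processes at most one record; returns true only if one was processed. */
        boolean process() {
            if (pending == 0) {
                return false;
            }
            pending--;
            return true;
        }
    }

    /** Stand-in for the running internal registry. */
    final Map<String, FakeStreamTask> running = new LinkedHashMap<>();

    /** Asks every running task to process a single record and counts the successes. */
    int process() {
        int processed = 0;
        for (FakeStreamTask task : running.values()) {
            if (task.process()) {
                processed++;
            }
        }
        return processed;
    }
}
```

Each call advances every running task by at most one record, so the caller has to invoke process repeatedly to drain the buffered records.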

Note
process is used exclusively when TaskManager is requested to process records.

process and TaskMigratedException

In case of a TaskMigratedException, process prints out the following INFO message to the logs:

Failed to process stream task [id] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.

process then closes the task (considering the task a zombie). If this reports a RuntimeException, process re-throws it.

process removes the task from running and throws the TaskMigratedException.

process and RuntimeException

In case of a RuntimeException, process prints out the following ERROR message to the logs followed by the exception.

Failed to process stream task [id] due to the following error:

process re-throws the RuntimeException.
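The two error paths can be sketched as below. All names are assumptions made for illustration: FakeTaskMigratedException stands in for TaskMigratedException, closeAsZombie is a hypothetical method for closing a task as a zombie, and logLines replaces the real logger.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only; these are NOT the real Kafka Streams classes.
class ProcessErrorHandlingSketch {

    /** Stand-in for TaskMigratedException. */
    static class FakeTaskMigratedException extends RuntimeException {
        FakeTaskMigratedException(String message) {
            super(message);
        }
    }

    /** Stand-in for a stream task that can be told to fail while processing. */
    static class FakeStreamTask {
        final RuntimeException failure;   // thrown by process() when non-null
        boolean closed = false;

        FakeStreamTask(RuntimeException failure) {
            this.failure = failure;
        }

        boolean process() {
            if (failure != null) {
                throw failure;
            }
            return true;
        }

        void closeAsZombie() {
            closed = true;
        }
    }

    final Map<String, FakeStreamTask> running = new LinkedHashMap<>();
    final List<String> logLines = new ArrayList<>();   // stands in for the logger

    int process() {
        int processed = 0;
        Iterator<Map.Entry<String, FakeStreamTask>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, FakeStreamTask> entry = it.next();
            String id = entry.getKey();
            FakeStreamTask task = entry.getValue();
            try {
                if (task.process()) {
                    processed++;
                }
            } catch (FakeTaskMigratedException e) {
                logLines.add("INFO Failed to process stream task " + id
                        + " since it got migrated to another thread already."
                        + " Closing it as zombie before triggering a new rebalance.");
                task.closeAsZombie();   // a RuntimeException thrown here propagates to the caller
                it.remove();            // drop the task from running
                throw e;
            } catch (RuntimeException e) {
                logLines.add("ERROR Failed to process stream task " + id
                        + " due to the following error: " + e);
                throw e;
            }
        }
        return processed;
    }
}
```

The migrated-task branch is listed first because the stand-in exception extends RuntimeException; with the order reversed, the more specific catch block would be unreachable and the code would not compile.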

Committing Running Stream Tasks that Requested It — maybeCommit Method

int maybeCommit()

maybeCommit resets committed to 0 and executes the maybeCommitAction task action (that modifies committed).

In the end, maybeCommit returns the number of running stream tasks that needed a commit.
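A rough sketch of the reset-then-apply flow follows. It assumes a hypothetical FakeStreamTask with a commitNeeded flag and models the task action as a java.util.function.Consumer, which is a simplification and not the TaskAction type Kafka Streams uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins only; these are NOT the real Kafka Streams classes.
class MaybeCommitSketch {

    /** Simplified stand-in for a stream task. */
    static class FakeStreamTask {
        boolean commitNeeded;

        FakeStreamTask(boolean commitNeeded) {
            this.commitNeeded = commitNeeded;
        }

        void commit() {
            commitNeeded = false;
        }
    }

    /** Stand-in for the running internal registry. */
    final List<FakeStreamTask> running = new ArrayList<>();

    private int committed = 0;

    /** The action modifies the committed counter as a side effect. */
    private final Consumer<FakeStreamTask> maybeCommitAction = task -> {
        if (task.commitNeeded) {
            committed++;
            task.commit();
        }
    };

    int maybeCommit() {
        committed = 0;                        // reset the counter for this round
        running.forEach(maybeCommitAction);   // apply the action to every running task
        return committed;                     // number of tasks that needed a commit
    }
}
```

Because the counter is reset at the start of each call, the return value covers only the current round and not a running total.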

Note
maybeCommit is used exclusively when TaskManager is requested to maybeCommitActiveTasks.

Creating AssignedStreamsTasks Instance

AssignedStreamsTasks takes the following when created:

  • LogContext

AssignedStreamsTasks initializes the internal registries and counters.

AssignedStreamsTasks is created when…​FIXME

AssignedStreamsTasks is created along with…​FIXME

punctuate Method

int punctuate()

punctuate…​FIXME

Note
punctuate is used exclusively when TaskManager is requested to punctuate.