KafkaStreams

KafkaStreams is the entry point for continuous record stream processing in Kafka Streams.

KafkaStreams is "merely" a Kafka client that consumes messages from one or many Kafka source topics (known as SourceNodes) and publishes the processing results to zero, one or more Kafka target topics (known as SinkNodes).

Note
A Kafka Streams developer defines the processing logic either directly, using a Topology (a DAG of Processors), or indirectly, through a StreamsBuilder that provides the high-level DSL for defining transformations.
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
val topology: Topology = ...
val config: StreamsConfig = ...
val ks = new KafkaStreams(topology, config)

Once created, you should start KafkaStreams so that it begins consuming, processing, and producing records (as described by the Topology).

ks.start()
Table 1. KafkaStreams’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description
globalStreamThread |
adminClient | Administrative client for Kafka, i.e. AdminClient, that supports managing and inspecting topics, brokers, configurations and ACLs.
queryableStoreProvider | QueryableStoreProvider
stateDirectory | StateDirectory
stateLock | Object lock for…FIXME
streamsMetadataState | StreamsMetadataState (for the InternalTopologyBuilder and application.server configuration property)
threads | Stream processor threads (aka StreamThread)

Note
The number of stream processor threads per KafkaStreams instance is controlled by the num.stream.threads configuration property (default: 1). The threads are:
  • Created when KafkaStreams is created

  • Started when KafkaStreams is started

  • Shut down when KafkaStreams is closed
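
The thread-count setting above can be sketched as plain configuration. This is a minimal sketch, not code from Kafka Streams: the class and method names (StreamThreadsConfig, withThreads) are hypothetical, and the application id and broker address used with it are placeholders. Only the property keys are real Kafka Streams settings.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka Streams): builds the Properties
// that could be passed to the KafkaStreams(Topology, Properties) constructor.
final class StreamThreadsConfig {

    static Properties withThreads(String applicationId, String bootstrapServers, int numThreads) {
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be at least 1");
        }
        Properties props = new Properties();
        // Required Kafka Streams settings; the values are supplied by the caller.
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Number of stream processor threads per KafkaStreams instance (default: 1).
        props.setProperty("num.stream.threads", Integer.toString(numThreads));
        return props;
    }
}
```

For example, `StreamThreadsConfig.withThreads("demo-app", "localhost:9092", 4)` yields properties that request four stream processor threads for one instance.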

Tip

Enable the DEBUG logging level for the org.apache.kafka.streams.KafkaStreams logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.KafkaStreams=DEBUG

Cleaning Up Local Directory of State Store — cleanUp Method

void cleanUp()

cleanUp simply requests StateDirectory to clean when KafkaStreams is not running.

Note
cleanUp can only be executed before KafkaStreams is started or after it has been closed.

cleanUp reports an IllegalStateException with the following message when KafkaStreams is running:

Cannot clean up while running.
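
The guard described above can be modelled in a few lines. This is a simplified sketch under stated assumptions: CleanUpGuardModel is a hypothetical stand-in, not the KafkaStreams class, and the counter merely stands in for the request to StateDirectory.

```java
// Hypothetical stand-in for KafkaStreams that models only the cleanUp guard.
final class CleanUpGuardModel {
    private boolean running = false;
    private int cleanCount = 0; // stands in for asking StateDirectory to clean

    void start() { running = true; }

    void close() { running = false; }

    boolean isRunning() { return running; }

    void cleanUp() {
        // Cleaning is rejected while the instance is running.
        if (isRunning()) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        cleanCount++;
    }

    int cleanCount() { return cleanCount; }
}
```

In this model, cleanUp succeeds before start and after close, and throws in between.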

isRunning Internal Method

boolean isRunning()

isRunning…​FIXME

Note
isRunning is used when…​FIXME

Closing KafkaStreams — close Method

void close()  // (1)
synchronized boolean close(final long timeout, final TimeUnit timeUnit)
  1. Calls close(final long timeout, final TimeUnit timeUnit) with 0 timeout

close…​FIXME

Important
Always call close on a KafkaStreams instance, even if you never call start, to avoid resource leaks.
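
The overload delegation and the "always close" advice can be sketched as follows. CloseModel is a hypothetical stand-in rather than the library class; the time unit passed by the no-argument overload and the always-true return value are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaStreams that models only the close overloads.
final class CloseModel {
    private boolean closed = false;
    private long lastTimeout = -1;

    // (1) Delegates to the two-argument overload with a 0 timeout.
    // The unit used here is an assumption of this sketch.
    void close() {
        close(0, TimeUnit.SECONDS);
    }

    synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
        lastTimeout = timeout;
        closed = true; // a real client would release threads and clients here
        return true;   // assumption: the model always reports success
    }

    boolean isClosed() { return closed; }

    long lastTimeout() { return lastTimeout; }
}
```

A caller would typically wrap the work in try/finally so that close runs even when start was never reached:

```java
CloseModel streams = new CloseModel();
try {
    // configure, and possibly start, the client here
} finally {
    streams.close();
}
```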

localThreadsMetadata Method

Set<ThreadMetadata> localThreadsMetadata()

localThreadsMetadata…​FIXME

Note
localThreadsMetadata is used when…​FIXME

Creating KafkaStreams Instance

// public API
KafkaStreams(
  final Topology topology,
  final Properties props) // (1)
KafkaStreams(
  final Topology topology,
  final StreamsConfig config) // (2)

// public API (mostly for testing)
KafkaStreams(
  final Topology topology,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier) // (3)
KafkaStreams(
  final Topology topology,
  final StreamsConfig config,
  final Time time)  // (4)

// private/internal API
KafkaStreams(
  final InternalTopologyBuilder internalTopologyBuilder,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier) // (5)
KafkaStreams(
  final InternalTopologyBuilder internalTopologyBuilder,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier,
  final Time time)  // (6)
  1. Calls KafkaStreams (2) with a StreamsConfig created from the input props

  2. Calls KafkaStreams (3) with DefaultKafkaClientSupplier

  3. Calls the internal KafkaStreams (6) with SystemTime

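KafkaStreams takes the following when created (per the internal constructor (6)):

  • InternalTopologyBuilder

  • StreamsConfig

  • KafkaClientSupplier

  • Time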

KafkaStreams initializes the internal registries and counters.
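
The delegation chain (1) to (2) to (3) to (6) can be modelled with stand-in types. This is a sketch, not the Kafka Streams source: CtorChainModel, Config, ClientSupplier and Clock are hypothetical names, the topology argument is left out for brevity, and the strings only record which default was filled in at each step.

```java
import java.util.Properties;

// Hypothetical model of the constructor chain; none of these types are the real Kafka Streams classes.
final class CtorChainModel {
    static final class Config {               // stands in for StreamsConfig
        final Properties props;
        Config(Properties props) { this.props = props; }
    }
    interface ClientSupplier { String name(); } // stands in for KafkaClientSupplier
    interface Clock { String name(); }          // stands in for Time

    final Config config;
    final ClientSupplier supplier;
    final Clock clock;

    // (1) wraps the input props in a config and calls (2)
    CtorChainModel(Properties props) { this(new Config(props)); }

    // (2) supplies the default client supplier and calls (3)
    CtorChainModel(Config config) { this(config, () -> "DefaultKafkaClientSupplier"); }

    // (3) supplies the system clock and calls the internal constructor (6)
    CtorChainModel(Config config, ClientSupplier supplier) { this(config, supplier, () -> "SystemTime"); }

    // (6) the one constructor that actually initializes the instance
    CtorChainModel(Config config, ClientSupplier supplier, Clock clock) {
        this.config = config;
        this.supplier = supplier;
        this.clock = clock;
    }
}
```

Funnelling every overload into a single constructor keeps the initialization of registries and counters in one place.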

setRunningFromCreated Internal Method

boolean setRunningFromCreated()

setRunningFromCreated…​FIXME

Note
setRunningFromCreated is used exclusively when KafkaStreams is started.

Describing Itself (Text Representation) — toString Method

String toString() // (1)
String toString(final String indent)
  1. Calls toString(final String indent) with an empty indent, i.e. ""

Note
toString with an indent is deprecated and should not be used. Use localThreadsMetadata instead.

toString…​FIXME

Starting KafkaStreams — start Method

synchronized void start()
throws IllegalStateException, StreamsException

start starts the Topology (that in turn starts consuming, processing, and producing records).

Internally, start prints out the following DEBUG message to the logs:

Starting Streams client

start marks KafkaStreams as running (i.e. transitions from CREATED to RUNNING state and notifies StateListeners).

start starts the global stream thread if defined (which is when…FIXME)

start starts the stream threads.

start schedules a thread that requests StateDirectory to cleanRemovedTasks every state.cleanup.delay.ms milliseconds.

You should see the following DEBUG message in the logs:

Started Streams client

If the transition to the RUNNING state fails, start merely prints out the following ERROR message to the logs:

Already stopped, cannot re-start
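
The sequence that start follows can be summarised in a small state-machine model. This is a sketch under stated assumptions: StartModel is a hypothetical stand-in, the CLOSED state is invented for the model, and the in-memory list replaces real logging, thread startup and scheduling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the start sequence; it records steps instead of running threads.
final class StartModel {
    enum State { CREATED, RUNNING, CLOSED } // CLOSED is an assumption of this sketch

    private State state = State.CREATED;
    private final List<String> log = new ArrayList<>();

    // The transition succeeds only from CREATED.
    private boolean setRunningFromCreated() {
        if (state != State.CREATED) {
            return false;
        }
        state = State.RUNNING;
        return true;
    }

    synchronized void start() {
        log.add("DEBUG Starting Streams client");
        if (setRunningFromCreated()) {
            log.add("start global stream thread (if defined)");
            log.add("start stream threads");
            log.add("schedule cleanRemovedTasks every state.cleanup.delay.ms");
            log.add("DEBUG Started Streams client");
        } else {
            // No exception in this model: the failed transition is only logged.
            log.add("ERROR Already stopped, cannot re-start");
        }
    }

    synchronized void close() { state = State.CLOSED; }

    synchronized State state() { return state; }

    synchronized List<String> log() { return new ArrayList<>(log); }
}
```

Calling start on a fresh model ends with the "Started Streams client" entry; calling it again after close ends with the ERROR entry and leaves the state unchanged.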

allMetadataForStore Method

Collection<StreamsMetadata> allMetadataForStore(final String storeName)

allMetadataForStore…​FIXME

Note
allMetadataForStore is used when…​FIXME

store Method

<T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType)

store…​FIXME

Note
store is used when…​FIXME