StreamsConfig
is Apache Kafka’s AbstractConfig for the configuration properties for the following Kafka clients (that Kafka Streams uses under the covers):
import org.apache.kafka.streams.StreamsConfig
val props = new java.util.Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId")
import org.apache.kafka.clients.consumer.ConsumerConfig
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8082")
val conf = new StreamsConfig(props)
// Consumer Properties
val consumerConfigs = conf.getConsumerConfigs("groupId", "clientId")
import collection.JavaConverters._
scala> consumerConfigs.asScala.map { case (key, value) => s"$key -> $value" }.foreach(println)
replication.factor -> 1
num.standby.replicas -> 0
max.poll.records -> 1000
group.id -> groupId
partition.assignment.strategy -> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor
bootstrap.servers -> localhost:8082
enable.auto.commit -> false
admin.retries -> 5
application.server ->
max.poll.interval.ms -> 2147483647
auto.offset.reset -> earliest
windowstore.changelog.additional.retention.ms -> 86400000
internal.leave.group.on.close -> false
application.id -> groupId
client.id -> clientId-consumer
StreamsConfig
does not allow users to configure certain Kafka configurations (e.g. for consumer) that are simply removed (with a WARN message in the logs).
Name | Value | Description |
---|---|---|
|
||
|
||
|
The maximum number of records to buffer per partition |
|
|
||
|
||
|
StreamsConfig
uses consumer
prefix for custom Kafka configurations of a Kafka consumer.
Map<String, Object> getConsumerConfigs(
final String groupId,
final String clientId)
getConsumerConfigs
gets the common consumer configuration first.
getConsumerConfigs
then adds the additional consumer configuration properties.
Name | Value | Description |
---|---|---|
|
||
|
||
|
Caution
|
FIXME |
Note
|
getConsumerConfigs is used exclusively when StreamThread is created.
|
Map<String, Object> getProducerConfigs(final String clientId)
getProducerConfigs
…FIXME
Note
|
getProducerConfigs is used when…FIXME
|
Map<String, Object> getAdminConfigs(final String clientId)
getAdminConfigs
…FIXME
Note
|
getAdminConfigs is used when…FIXME
|
Map<String, Object> clientProps(
final Set<String> configNames,
final Map<String, Object> originals)
clientProps
collects the configuration properties from originals
that have their names in the input configNames
, i.e. includes the properties that have been listed in configNames
.
Note
|
clientProps is used exclusively when StreamsConfig is requested to getClientPropsWithPrefix.
|
Getting Subset of User Configuration by Given Names and Prefix — getClientPropsWithPrefix
Internal Method
Map<String, Object> getClientPropsWithPrefix(
final String prefix,
final Set<String> configNames)
getClientPropsWithPrefix
takes only the properties (as passed in by a user) that have their keys in configNames
and adds all properties with the given prefix
.
Internally, getClientPropsWithPrefix
collects the configuration properties from the original values of Kafka properties as passed in by a user that have their names in configNames
.
getClientPropsWithPrefix
then copies all original settings with the given prefix
(stripping the prefix before adding them) to the collected properties (and possibly overwriting some).
Note
|
getClientPropsWithPrefix uses AbstractConfig.originals to get the original values of Kafka properties as passed in by the user.
|
Note
|
getClientPropsWithPrefix is used when StreamsConfig is requested for getAdminConfigs, getCommonConsumerConfigs, getConsumerConfigs and getProducerConfigs.
|
Map<String, Object> getCommonConsumerConfigs()
getCommonConsumerConfigs
gets a subset of user configuration for a Kafka consumer as well as the properties with consumer prefix.
Note
|
getCommonConsumerConfigs uses ConsumerConfig.configNames for the list of the Kafka Consumer-specific configuration keys.
|
Caution
|
FIXME |
Note
|
getCommonConsumerConfigs is used when StreamsConfig is requested for getConsumerConfigs and getRestoreConsumerConfigs.
|
Removing "Illegal" User-Defined Configuration Properties — checkIfUnexpectedUserSpecifiedConsumerConfig
Internal Method
void checkIfUnexpectedUserSpecifiedConsumerConfig(
final Map<String, Object> clientProvidedProps,
final String[] nonConfigurableConfigs)
checkIfUnexpectedUserSpecifiedConsumerConfig
removes non-configurable configurations (nonConfigurableConfigs
) from user-defined configurations (clientProvidedProps
) and logging a warning.
Internally, checkIfUnexpectedUserSpecifiedConsumerConfig
iterates over nonConfigurableConfigs
…FIXME
Note
|
checkIfUnexpectedUserSpecifiedConsumerConfig is used when StreamsConfig is requested for getCommonConsumerConfigs and getProducerConfigs.
|