Skip to content

Latest commit

 

History

History
704 lines (545 loc) · 31.4 KB

kafka-component.adoc

File metadata and controls

704 lines (545 loc) · 31.4 KB

Kafka

Since Camel 2.13

Both producer and consumer are supported

The Kafka component is used for communicating with Apache Kafka message broker.

Maven users will need to add the following dependency to their pom.xml for this component.

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

kafka:topic[?options]

For more information about Producer/Consumer configuration:

If you want to send a message to a dynamic topic then use KafkaConstants.OVERRIDE_TOPIC as it is used as a one-time header that is not sent along the message, and actually is removed in the producer.

Usage

Consumer error handling

While kafka consumer is polling messages from the kafka broker, then errors can happen. This section describes what happens and what you can configure.

The consumer may throw exception when invoking the Kafka poll API. For example, if the message cannot be deserialized due to invalid data, and many other kinds of errors. Those errors are in the form of KafkaException which are either retriable or not. The exceptions which can be retried (RetriableException) will be retried again (with a poll timeout in between). All other kinds of exceptions are handled according to the pollOnError configuration. This configuration has the following values:

  • DISCARD will discard the message and continue to poll the next message.

  • ERROR_HANDLER will use Camel’s error handler to process the exception, and afterwards continue to poll the next message.

  • RECONNECT will re-connect the consumer and try to poll the message again.

  • RETRY will let the consumer retry polling the same message again

  • STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).

The default is ERROR_HANDLER, which will let Camel’s error handler (if any configured) process the caused exception. Afterwards continue to poll the next message. This behavior is similar to the bridgeErrorHandler option that Camel components have.

For advanced control a custom implementation of org.apache.camel.component.kafka.PollExceptionStrategy can be configured on the component level, which allows controlling which of the strategies to use for each exception.

Consumer error handling (advanced)

By default, Camel will poll using the ERROR_HANDLER to process exceptions. How Camel handles a message that results in an exception can be altered using the breakOnFirstError attribute in the configuration. Instead of continuing to poll the next message, Camel will instead commit the offset so that the message that caused the exception will be retried. This is similar to the RETRY polling strategy above.

KafkaComponent kafka = new KafkaComponent();
kafka.setBreakOnFirstError(true);
...
camelContext.addComponent("kafka", kafka);

It is recommended that you read the section below "Using manual commit with Kafka consumer" to understand how breakOnFirstError will work based on the CommitManager that is configured.

The Kafka idempotent repository

The camel-kafka library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository’s process instance through event sourcing. The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions, as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic. Each repository instance that uses the topic, (e.g., typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic, each will control its own offset. On startup, the instance subscribes to the topic, rewinds the offset to the beginning and rebuilds the cache to the latest state. The cache will not be considered warmed up until one poll of pollDurationMs in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens, the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic. Be mindful of the format of the header used for the uniqueness check. By default, it uses Strings as the data types. When using primitive numeric formats, the header must be deserialized accordingly. Check the samples below for examples.

A KafkaIdempotentRepository has the following properties:

Property Default Description

topic

Required The name of the Kafka topic to use to broadcast changes. (required)

bootstrapServers

Required The bootstrap.servers property on the internal Kafka producer and consumer. Use this as shorthand if not setting consumerConfig and producerConfig. If used, this component will apply sensible default configurations for the producer and consumer.

groupId

The groupId to assign to the idempotent consumer.

startupOnly

false

Whether to sync on startup only, or to continue syncing while Camel is running.

maxCacheSize

1000

How many of the most recently used keys should be stored in memory (default 1000).

pollDurationMs

100

The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository’s consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will operate in an inconsistent state relative to its peers until it catches up.

producerConfig

Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides bootstrapServers, so must define the Kafka bootstrap.servers property itself

consumerConfig

Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides bootstrapServers, so must define the Kafka bootstrap.servers property itself

The repository can be instantiated by defining the topic and bootstrapServers, or the producerConfig and consumerConfig property sets can be explicitly defined to enable features such as SSL/SASL. To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring, as it is CamelContext aware.

Sample usage is as follows:

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo")
        // once-only insert into the database
    .end()

In XML:

<!-- simple -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>

There are 3 alternatives to choose from when using idempotency with numeric identifiers. The first one is to use the static method numericHeader method from org.apache.camel.component.kafka.serde.KafkaSerdeHelper to perform the conversion for you:

from("direct:performInsert")
    .idempotentConsumer(numericHeader("id")).idempotentRepository("insertDbIdemRepo")
        // once-only insert into the database
    .end()

Alternatively, it is possible to use a custom serializer configured via the route URL to perform the conversion:

public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);

    @Override
    public Object deserialize(String key, byte[] value) {
        if (key.equals("id")) {
            BigInteger bi = new BigInteger(value);

            return String.valueOf(bi.longValue());
        } else {
            return super.deserialize(key, value);
        }
    }
}

Lastly, it is also possible to do so in a processor:

from(from).routeId("foo")
    .process(exchange -> {
        byte[] id = exchange.getIn().getHeader("id", byte[].class);

        BigInteger bi = new BigInteger(id);
        exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
    })
    .idempotentConsumer(header("id"))
    .idempotentRepository("kafkaIdempotentRepository")
    .to(to);

Manual commits with the Kafka consumer

By default, the Kafka consumer will use auto commit, where the offset will be committed automatically in the background using a given interval.

In case you want to force manual commits, you can use KafkaManualCommit API from the Camel Exchange, stored on the message header. This requires turning on manual commits by either setting the option allowManualCommit to true on the KafkaComponent or on the endpoint, for example:

KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
// ...
camelContext.addComponent("kafka", kafka);

By default, it uses the NoopCommitManager behind the scenes. To commit an offset, you will require you to use the KafkaManualCommit from Java code such as a Camel Processor:

public void process(Exchange exchange) {
    KafkaManualCommit manual =
        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    manual.commit();
}

The KafkaManualCommit will force a synchronous commit which will block until the commit is acknowledged on Kafka, or if it fails an exception is thrown. You can use an asynchronous commit as well by configuring the KafkaManualCommitFactory with the DefaultKafkaManualAsyncCommitFactory implementation.

Then the commit will be done in the next consumer loop using the kafka asynchronous commit api.

If you want to use a custom implementation of KafkaManualCommit then you can configure a custom KafkaManualCommitFactory on the KafkaComponent that creates instances of your custom implementation.

When configuring a consumer to use manual commit and a specific CommitManager it is important to understand how these influence the behavior of breakOnFirstError

KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
kafka.setBreakOnFirstError(true);
kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory());
...
camelContext.addComponent("kafka", kafka);

When the CommitManager is left to the default NoopCommitManager then breakOnFirstError will not automatically commit the offset so that the message with an error is retried. The consumer must manage that in the route using KafkaManualCommit.

When the CommitManager is changed to either the synchronous or asynchronous manager then breakOnFirstError will automatically commit the offset so that the message with an error is retried. This message will be continually retried until it can be processed without an error.

Note 1: records from a partition must be processed and committed by the same thread as the consumer. This means that certain EIPs, async or concurrent operations in the DSL may cause the commit to fail. In such circumstances, tyring to commit the transaction will cause the Kafka client to throw a java.util.ConcurrentModificationException exception with the message KafkaConsumer is not safe for multi-threaded access. To prevent this from happening, redesign your route to avoid those operations.

*Note 2: this is mostly useful with aggregation’s completion timeout strategies.

Pausable Consumers

The Kafka component supports pausable consumers. This type of consumer can pause consuming data based on conditions external to the component itself, such as an external system being unavailable or other transient conditions.

from("kafka:topic")
    .pausable(new KafkaConsumerListener(), () -> canContinue()) // the pausable check gets called if the exchange fails to be processed ...
    .routeId("pausable-route")
    .process(this::process) // Kafka consumer will be paused if this one throws an exception ...
    .to("some:destination"); // or this one

In this example, consuming messages can pause (by calling the Kafka’s Consumer pause method) if the result from canContinue is false.

Important
The pausable EIP is meant to be used as a support mechanism when there is an exception somewhere in the route that prevents the exchange from being processed. More specifically, the check called by the pausable EIP should be used to test for transient conditions preventing the exchange from being processed.
Note
most users should prefer using the RoutePolicy, which offers better control of the route.

Kafka Headers propagation

When consuming messages from Kafka, headers will be propagated to camel exchange headers automatically. Producing flow backed by same behaviour - camel headers of particular exchange will be propagated to kafka message headers.

Since kafka headers allow only byte[] values, in order camel exchange header to be propagated its value should be serialized to bytes[], otherwise header will be skipped. The following header value types are supported: String, Integer, Long, Double, Boolean, byte[]. Note: all headers propagated from kafka to camel exchange will contain byte[] value by default. To override default functionality, these uri parameters can be set: headerDeserializer for from route and headerSerializer for to route. For example:

from("kafka:my_topic?headerDeserializer=#myDeserializer")
...
.to("kafka:my_topic?headerSerializer=#mySerializer")

By default, all headers are being filtered by KafkaHeaderFilterStrategy. Strategy filters out headers which start with Camel or org.apache.camel prefixes. Default strategy can be overridden by using headerFilterStrategy uri parameter in both to and from routes:

from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")

myStrategy object should be a subclass of HeaderFilterStrategy and must be placed in the Camel registry, either manually or by registration as a bean in Spring, as it is CamelContext aware.

Kafka Transaction

You need to add transactional.id, enable.idempotence and retries in additional-properties to enable kafka transaction with the producer.

from("direct:transaction")
.to("kafka:my_topic?additional-properties[transactional.id]=1234&additional-properties[enable.idempotence]=true&additional-properties[retries]=5");

At the end of exchange routing, the kafka producer would commit the transaction or abort it if there is an Exception throwing or the exchange is RollbackOnly. Since Kafka does not support transactions in multi threads, it will throw ProducerFencedException if there is another producer with the same transaction.id to make the transactional request.

It would work with JTA camel-jta by using transacted() and if it involves some resources (SQL or JMS), which supports XA, then they would work in tandem, where they both will either commit or rollback at the end of the exchange routing. In some cases, if the JTA transaction manager fails to commit (during the 2PC processing), but kafka transaction has been committed before and there is no chance to roll back the changes since the kafka transaction does not support JTA/XA spec. There is still a risk with the data consistency.

Setting Kerberos config file

Configure the 'krb5.conf' file directly through the API:

static {
    KafkaComponent.setKerberosConfigLocation("path/to/config/file");
}

Batching Consumer

To use a Kafka batching consumer with Camel, an application has to set the configuration batching to true.

The received records are stored in a list in the exchange used in the pipeline. As such, it is possible to commit individually every record or the whole batch at once by committing the last exchange on the list.

The size of the batch is controlled by the option maxPollRecords.

To avoid blocking for too long, waiting for the whole set of records to fill the batch, it is possible to use the pollTimeoutMs option to set a timeout for the polling. In this case, the batch may contain less messages than set in the maxPollRecords.

Automatic Commits

By default, Camel uses automatic commits when using batch processing. In this case, Camel automatically commits the records after they have been successfully processed by the application.

In case of failures, the records will not be processed.

The code below provides an example of this approach:

public void configure() {
    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
        for (Object obj : exchanges) {
            if (obj instanceof Exchange exchange) {
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}
Handling Errors with Automatic Commits

When using automatic commits, Camel will not commit records if there is a failure in processing. Because of this, there is a risk that records could be reprocessed multiple times.

It is recommended to implement appropriate error handling mechanisms and patterns (i.e.; such as dead-letter queues), to prevent failed records from blocking processing progress.

The code below provides an example of handling errors with automatic commits:

public void configure() {
    /*
     We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
     production scenario, applications should probably send these records to a separate topic or fix the condition
     that lead to the failure
     */
    onException(IllegalArgumentException.class).process(exchange -> {
        LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
        LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
    }).continued(true);

    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        // The records from the batch are stored in a list of exchanges in the original exchange.
        int i = 0;
        for (Object o : exchanges) {
            if (o instanceof Exchange exchange) {
                i++;
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));

                if (i == 4) {
                    throw new IllegalArgumentException("Failed to process record");
                }
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}

Manual Commits

When working with batch processing with manual commits, it’s up to the application to commit the records, and handle the outcome of potentially invalid records.

The code below provides an example of this approach:

public void configure() {
    from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
    .process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        /*
        Every exchange in that list should contain a reference to the manual commit object. We use the reference
        for the last exchange in the list to commit the whole batch
         */
        final Object tmp = exchanges.getLast();
        if (tmp instanceof Exchange exchange) {
            KafkaManualCommit manual =
                    exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
            LOG.debug("Performing manual commit");
            manual.commit();
            LOG.debug("Done performing manual commit");
        }
    });
}

Dealing with long polling timeouts

In some cases, applications may want the polling process to have a long timeout (see: pollTimeoutMs).

To properly do so, first make sure to have a max polling interval that is higher than the polling timeout (see: maxPollIntervalMs).

Then, increase the shutdown timeout to ensure that committing, closing and other Kafka operations are not abruptly aborted. For instance:

public void configure() {
    // Note that this can be configured in other ways
    getCamelContext().getShutdownStrategy().setTimeout(10000);

    // route setup ...
}

Custom Subscription Adapters

Applications with complex subscription logic may provide a custom bean to handle the subscription process. To so, it is necessary to implement the interface SubscribeAdapter.

Example subscriber adapter that subscribes to a set of Kafka topics or patterns
public class CustomSubscribeAdapter implements SubscribeAdapter {
    @Override
    public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) {
        if (topicInfo.isPattern()) {
            consumer.subscribe(topicInfo.getPattern(), reBalanceListener);
        } else {
            consumer.subscribe(topicInfo.getTopics(), reBalanceListener);
        }
    }
}

Then, it is necessary to add it as named bean instance to the registry:

Add to registry example
context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());

Interoperability

JMS

When interoperating Kafka and JMS, it may be necessary to coerce the JMS headers into their expected type.

For instance, when consuming messages from Kafka carrying JMS headers and then sending them to a JMS broker, those headers are first deserialized into a byte array. Then, the camel-jms component tries to coerce this byte array into the specific type used by. However, both the origin endpoint as well as how this was setup on the code itsef may affect how the data is serialized and deserialized. As such, it is not feasible to naively assume the data type of the byte array.

To address this issue, we provide a custom header deserializer to force Kafka to de-serialize the JMS data according to the JMS specification. This approach ensures that the headers are properly interpreted and processed by the camel-jms component.

Due to the inherent complexity of each possible system and endpoint, it may not be possible for this deserializer to cover all possible scenarios. As such, it is provided as model that can be modified and/or adapted for the specific needs of each application.

To utilize this solution, you need to modify the route URI on the consumer end of the pipeline by including the headerDeserializer option. For example:

Route snippet
from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
    .to("...");

Examples

Consuming messages from Kafka

Here is the minimal route you need to read messages from Kafka.

from("kafka:test?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

If you need to consume messages from multiple topics, you can use a comma separated list of topic names.

from("kafka:test,test1,test2?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

It’s also possible to subscribe to multiple topics giving a pattern as the topic name and using the topicIsPattern option.

from("kafka:test.*?brokers=localhost:9092&topicIsPattern=true")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

When consuming messages from Kafka, you can use your own offset management and not delegate this management to Kafka. To keep the offsets, the component needs a StateRepository implementation such as FileStateRepository. This bean should be available in the registry. Here how to use it :

// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));

// Bind this repository into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);

// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        fromF("kafka:%s?brokers=localhost:{{kafkaPort}}" +
                     // Set up the topic and broker address
                     "&groupId=A" +
                     // The consumer processor group ID
                     "&autoOffsetReset=earliest" +
                     // Ask to start from the beginning if we have unknown offset
                     "&offsetRepository=#offsetRepo", TOPIC)
                     // Keep the offsets in the previously configured repository
                .to("mock:result");
    }
});

Producing messages to Kafka

Here is the minimal route you need to produce messages to Kafka.

from("direct:start")
    .setBody(constant("Message from Camel"))          // Message to send
    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
    .to("kafka:test?brokers=localhost:9092");

SSL configuration

You have two different ways to configure the SSL communication on the Kafka component.

The first way is through the many SSL endpoint parameters:

from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
             "&groupId=A" +
             "&sslKeystoreLocation=/path/to/keystore.jks" +
             "&sslKeystorePassword=changeit" +
             "&sslKeyPassword=changeit" +
             "&securityProtocol=SSL")
        .to("mock:result");

The second way is to use the sslContextParameters endpoint parameter:

// Configure the SSLContextParameters object
KeyStoreParameters ksp = new KeyStoreParameters();
ksp.setResource("/path/to/keystore.jks");
ksp.setPassword("changeit");
KeyManagersParameters kmp = new KeyManagersParameters();
kmp.setKeyStore(ksp);
kmp.setKeyPassword("changeit");
SSLContextParameters scp = new SSLContextParameters();
scp.setKeyManagers(kmp);

// Bind this SSLContextParameters into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("ssl", scp);

// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                     // Set up the topic and broker address
                     "&groupId=A" +
                     // The consumer processor group ID
                     "&sslContextParameters=#ssl" +
                     // The security protocol
                     "&securityProtocol=SSL)
                     // Reference the SSL configuration
                .to("mock:result");
    }
});

spring-boot:partial$starter.adoc