Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"partitionAssignor": { "kind": "property", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used" },
"pollOnError": { "kind": "property", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY will let the consumer retry polling the same message again STOP will stop the consumer (have to be manually started\/restarted if the consumer should be able to consume messages again)" },
"pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
"resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This option allows the user to set a custom resume strategy. The resume strategy is executed when partitions are assigned (i.e.: when connecting or reconnecting). It allows implementations to customize how to resume operations and serve as more flexible alternative to the seekTo and the offsetRepository mechanisms. See the ResumeStrategy for implementation details. This option does not affect the auto commit setting. It is likely that implementations using this setting will also want to evaluate using the manual commit option along with this." },
"seekTo": { "kind": "property", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning" },
"sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
"specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platform schema registry and the io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only available in the Confluent Platform (not standard Apache Kafka)" },
Expand Down Expand Up @@ -160,6 +161,7 @@
"partitionAssignor": { "kind": "parameter", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used" },
"pollOnError": { "kind": "parameter", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY will let the consumer retry polling the same message again STOP will stop the consumer (have to be manually started\/restarted if the consumer should be able to consume messages again)" },
"pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
"resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This option allows the user to set a custom resume strategy. The resume strategy is executed when partitions are assigned (i.e.: when connecting or reconnecting). It allows implementations to customize how to resume operations and serve as more flexible alternative to the seekTo and the offsetRepository mechanisms. See the ResumeStrategy for implementation details. This option does not affect the auto commit setting. It is likely that implementations using this setting will also want to evaluate using the manual commit option along with this." },
"seekTo": { "kind": "parameter", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning" },
"sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
"specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platform schema registry and the io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only available in the Confluent Platform (not standard Apache Kafka)" },
Expand Down
2 changes: 2 additions & 0 deletions components-starter/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ Number of Camel components: 339 in 274 JAR artifacts (1 deprecated)

| xref:latest@components::http-component.adoc[HTTP] | camel-http-starter | Stable | 2.3 | Send requests to external HTTP servers using Apache HTTP Client 4.x.

| xref:latest@components::hwcloud-image-component.adoc[Huawei Cloud Image Recognition] | camel-huaweicloud-imagerecognition-starter | Preview | 3.12 | To identify objects, scenes, and concepts in images on Huawei Cloud

| xref:latest@components::hwcloud-imagerecognition-component.adoc[Huawei Cloud Image Recognition] | camel-huaweicloud-imagerecognition-starter | Preview | 3.12 | To identify objects, scenes, and concepts in images on Huawei Cloud

| xref:latest@components::hwcloud-dms-component.adoc[Huawei Distributed Message Service (DMS)] | camel-huaweicloud-dms-starter | Preview | 3.12 | To integrate with a fully managed, high-performance message queuing service on Huawei Cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ When using kafka with Spring Boot make sure to use the following Maven dependenc
----


The component supports 104 options, which are listed below.
The component supports 105 options, which are listed below.



Expand Down Expand Up @@ -93,6 +93,7 @@ The component supports 104 options, which are listed below.
| *camel.component.kafka.record-metadata* | Whether the producer should store the RecordMetadata results from sending to Kafka. The results are stored in a List containing the RecordMetadata metadata's. The list is stored on a header with the key KafkaConstants#KAFKA_RECORDMETA | true | Boolean
| *camel.component.kafka.request-required-acks* | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. | 1 | String
| *camel.component.kafka.request-timeout-ms* | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client. | 30000 | Integer
| *camel.component.kafka.resume-strategy* | This option allows the user to set a custom resume strategy. The resume strategy is executed when partitions are assigned (i.e.: when connecting or reconnecting). It allows implementations to customize how to resume operations and serve as more flexible alternative to the seekTo and the offsetRepository mechanisms. See the ResumeStrategy for implementation details. This option does not affect the auto commit setting. It is likely that implementations using this setting will also want to evaluate using the manual commit option along with this. The option is a org.apache.camel.component.kafka.consumer.support.ResumeStrategy type. | | ResumeStrategy
| *camel.component.kafka.retries* | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first. | 0 | Integer
| *camel.component.kafka.retry-backoff-ms* | Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. | 100 | Integer
| *camel.component.kafka.sasl-jaas-config* | Expose the kafka sasl.jaas.config parameter Example: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD; | | String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.camel.component.kafka.KafkaManualCommitFactory;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.apache.camel.component.kafka.PollOnError;
import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
Expand Down Expand Up @@ -283,6 +284,18 @@ public class KafkaComponentConfiguration
* java.lang.Long type.
*/
private Long pollTimeoutMs = 5000L;
/**
* This option allows the user to set a custom resume strategy. The resume
* strategy is executed when partitions are assigned (i.e.: when connecting
* or reconnecting). It allows implementations to customize how to resume
* operations and serve as more flexible alternative to the seekTo and the
* offsetRepository mechanisms. See the ResumeStrategy for implementation
* details. This option does not affect the auto commit setting. It is
* likely that implementations using this setting will also want to evaluate
* using the manual commit option along with this. The option is a
* org.apache.camel.component.kafka.consumer.support.ResumeStrategy type.
*/
private ResumeStrategy resumeStrategy;
/**
* Set if KafkaConsumer will read from beginning or end on startup:
* beginning : read from beginning end : read from end This is replacing the
Expand Down Expand Up @@ -1015,6 +1028,14 @@ public void setPollTimeoutMs(Long pollTimeoutMs) {
this.pollTimeoutMs = pollTimeoutMs;
}

public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}

public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}

public String getSeekTo() {
return seekTo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public Set<ConvertiblePair> getConvertibleTypes() {
answer.add(new ConvertiblePair(String.class, org.apache.camel.spi.HeaderFilterStrategy.class));
answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer.class));
answer.add(new ConvertiblePair(String.class, org.apache.camel.spi.StateRepository.class));
answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class));
answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.KafkaManualCommitFactory.class));
answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.PollExceptionStrategy.class));
answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.serde.KafkaHeaderSerializer.class));
Expand All @@ -71,6 +72,7 @@ public Object convert(
case "org.apache.camel.spi.HeaderFilterStrategy": return applicationContext.getBean(ref, org.apache.camel.spi.HeaderFilterStrategy.class);
case "org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer": return applicationContext.getBean(ref, org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer.class);
case "org.apache.camel.spi.StateRepository": return applicationContext.getBean(ref, org.apache.camel.spi.StateRepository.class);
case "org.apache.camel.component.kafka.consumer.support.ResumeStrategy": return applicationContext.getBean(ref, org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class);
case "org.apache.camel.component.kafka.KafkaManualCommitFactory": return applicationContext.getBean(ref, org.apache.camel.component.kafka.KafkaManualCommitFactory.class);
case "org.apache.camel.component.kafka.PollExceptionStrategy": return applicationContext.getBean(ref, org.apache.camel.component.kafka.PollExceptionStrategy.class);
case "org.apache.camel.component.kafka.serde.KafkaHeaderSerializer": return applicationContext.getBean(ref, org.apache.camel.component.kafka.serde.KafkaHeaderSerializer.class);
Expand Down
Loading