From 0c960448808f2c0037dde870906bacd0fe0fe776 Mon Sep 17 00:00:00 2001 From: Oleksii Ivanov Date: Wed, 27 Mar 2024 00:36:50 +0200 Subject: [PATCH] feat(kafka): migrate kafka inbound connectors to template generator --- .../kafka-connector-message-start.json | 314 ---------- .../kafka-inbound-connector-boundary.json | 280 --------- .../kafka-inbound-connector-hybrid.json | 249 -------- .../kafka-inbound-connector-intermediate.json | 540 +++++++++--------- .../kafka-inbound-connector.json | 239 -------- .../kafka-outbound-connector-hybrid.json | 254 -------- connectors/kafka/pom.xml | 40 +- .../kafka/inbound/KafkaConnectorConsumer.java | 8 +- .../inbound/KafkaConnectorProperties.java | 198 +++---- .../kafka/inbound/KafkaExecutable.java | 38 ++ .../inbound/KafkaPropertyTransformer.java | 14 +- .../outbound/model/KafkaConnectorRequest.java | 9 +- .../kafka/inbound/KafkaExecutableTest.java | 38 +- .../integration/KafkaIntegrationTest.java | 91 ++- 14 files changed, 541 insertions(+), 1771 deletions(-) delete mode 100644 connectors/kafka/element-templates/kafka-connector-message-start.json delete mode 100644 connectors/kafka/element-templates/kafka-inbound-connector-boundary.json delete mode 100644 connectors/kafka/element-templates/kafka-inbound-connector-hybrid.json delete mode 100644 connectors/kafka/element-templates/kafka-inbound-connector.json delete mode 100644 connectors/kafka/element-templates/kafka-outbound-connector-hybrid.json diff --git a/connectors/kafka/element-templates/kafka-connector-message-start.json b/connectors/kafka/element-templates/kafka-connector-message-start.json deleted file mode 100644 index 35af573f67..0000000000 --- a/connectors/kafka/element-templates/kafka-connector-message-start.json +++ /dev/null @@ -1,314 +0,0 @@ -{ - "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", - "name": "Kafka Message Start Event Connector", - "id": "io.camunda.connectors.inbound.KafkaMessageStart.v1", - "version": 2, - "description": "Consume Kafka messages", - "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=inbound", - "category": { - "id": "connectors", - "name": "Connectors" - }, - "appliesTo": [ - "bpmn:StartEvent" - ], - "elementType": { - "value": "bpmn:StartEvent", - "eventDefinition": "bpmn:MessageEventDefinition" - }, - "groups": [ - { - "id": "authentication", - "label": "Authentication" - }, - { - "id": "kafka", - "label": "Kafka" - }, - { - "id": "activation", - "label": "Activation" - }, - { - "id": "correlation", - "label": "Subprocess correlation" - }, - { - "id": "variable-mapping", - "label": "Variable mapping" - } - ], - "properties": [ - { - "type": "Hidden", - "value": "io.camunda:connector-kafka-inbound:1", - "binding": { - "type": "zeebe:property", - "name": "inbound.type" - } - }, - { - "type": "Hidden", - "generatedValue": { - "type": "uuid" - }, - "binding": { - "type": "bpmn:Message#property", - "name": "name" - } - }, - { - "label": "Authentication type", - "description": "Username/password or custom", - "id": "authenticationType", - "group": "authentication", - "type": "Dropdown", - "value": "credentials", - "choices": [ - { - "name": "Credentials", - "value": "credentials" - }, - { - "name": "Custom", - "value": "custom" - } - ], - "binding": { - "type": "zeebe:property", - "name": "authenticationType" - } - }, - { - "label": "Username", - "description": "Provide the username (must have permissions to produce message to the topic)", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.username" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Password", - "description": "Provide a password for the user", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.password" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Bootstrap servers", - "description": "Provide bootstrap server(s), comma-delimited if there are multiple", - "group": "kafka", - "type": "String", - "feel": "optional", - "binding": { - "type": "zeebe:property", - "name": "topic.bootstrapServers" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Topic", - "description": "Provide the topic name", - "group": "kafka", - "type": "String", - "binding": { - "type": "zeebe:property", - "name": "topic.topicName" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Consumer Group ID", - "description": "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one", - "group": "kafka", - "type": "String", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "groupId" - }, - "constraints": { - "notEmpty": false, - "maxLength": 250 - } - }, - { - "label": "Additional properties", - "description": "Provide additional Kafka consumer properties in JSON", - "group": "kafka", - "type": "String", - "optional": true, - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "additionalProperties" - } - }, - { - "label": "Offsets", - "description": "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions", - "group": "kafka", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "offsets" - } - }, - { - "label": "Auto offset reset", - "description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets", - "id": "autoOffsetReset", - "group": "kafka", - "type": "Dropdown", - "value": "latest", - "choices": [ - { - "name": "Latest", - "value": "latest" - }, - { - "name": "Earliest", - "value": "earliest" - }, - { - "name": "None", - "value": "none" - } - ], - "binding": { - "type": "zeebe:property", - "name": "autoOffsetReset" - } - }, - { - "label": "Message ID expression", - "feel": "required", - "type": "String", - "optional": true, - "group": "activation", - "binding": { - "type": "zeebe:property", - "name": "messageIdExpression" - }, - "description": "Expression to extract unique identifier of a message" - }, - { - "label": "Activation condition", - "type": "String", - "group": "activation", - "feel": "required", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "activationCondition" - }, - "description": "Condition under which the Connector triggers. Leave empty to catch all events" - }, - { - "label": "Correlation required", - "description": "Indicates whether correlation is required. This is needed for event-based subprocess message start events", - "id": "correlationRequired", - "group": "correlation", - "type": "Dropdown", - "value": "notRequired", - "choices": [ - { - "name": "Correlation not required", - "value": "notRequired" - }, - { - "name": "Correlation required", - "value": "required" - } - ], - "binding": { - "type": "zeebe:property", - "name": "correlationRequired" - } - }, - { - "label": "Correlation key (process)", - "type": "String", - "group": "correlation", - "feel": "required", - "description": "Sets up the correlation key from process variables", - "binding": { - "type": "bpmn:Message#zeebe:subscription#property", - "name": "correlationKey" - }, - "constraints": { - "notEmpty": true - }, - "condition": { - "property": "correlationRequired", - "equals": "required" - } - }, - { - "label": "Correlation key (payload)", - "type": "String", - "group": "correlation", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "correlationKeyExpression" - }, - "description": "Extracts the correlation key from the incoming message payload", - "constraints": { - "notEmpty": true - }, - "condition": { - "property": "correlationRequired", - "equals": "required" - } - }, - { - "label": "Result variable", - "type": "String", - "group": "variable-mapping", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "resultVariable" - }, - "description": "Name of variable to store the result of the connector in" - }, - { - "label": "Result expression", - "description": "Expression to map the inbound payload to process variables", - "group": "variable-mapping", - "type": "Text", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "resultExpression" - } - } - ], - "icon": { - "contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E" - } -} \ No newline at end of file diff --git a/connectors/kafka/element-templates/kafka-inbound-connector-boundary.json b/connectors/kafka/element-templates/kafka-inbound-connector-boundary.json deleted file mode 100644 index ae6cf5fb51..0000000000 --- a/connectors/kafka/element-templates/kafka-inbound-connector-boundary.json +++ /dev/null @@ -1,280 +0,0 @@ -{ - "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", - "name": "Kafka Boundary Event Connector", - "id": "io.camunda.connectors.inbound.KafkaBoundary.v1", - "version": 3, - "description": "Consume Kafka messages", - "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=inbound", - "category": { - "id": "connectors", - "name": "Connectors" - }, - "appliesTo": [ - "bpmn:BoundaryEvent" - ], - "elementType": { - "value": "bpmn:BoundaryEvent", - "eventDefinition": "bpmn:MessageEventDefinition" - }, - "groups": [ - { - "id": "authentication", - "label": "Authentication" - }, - { - "id": "kafka", - "label": "Kafka" - }, - { - "id": "activation", - "label": "Activation" - }, - { - "id": "variable-mapping", - "label": "Variable mapping" - } - ], - "properties": [ - { - "type": "Hidden", - "value": "io.camunda:connector-kafka-inbound:1", - "binding": { - "type": "zeebe:property", - "name": "inbound.type" - } - }, - { - "type": "Hidden", - "generatedValue": { - "type": "uuid" - }, - "binding": { - "type": "bpmn:Message#property", - "name": "name" - } - }, - { - "label": "Authentication type", - "description": "Username/password or custom", - "id": "authenticationType", - "group": "authentication", - "type": "Dropdown", - "value": "credentials", - "choices": [ - { - "name": "Credentials", - "value": "credentials" - }, - { - "name": "Custom", - "value": "custom" - } - ], - "binding": { - "type": "zeebe:property", - "name": "authenticationType" - } - }, - { - "label": "Username", - "description": "Provide the username (must have permissions to produce message to the topic)", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.username" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Password", - "description": "Provide a password for the user", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.password" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Bootstrap servers", - "description": "Provide bootstrap server(s), comma-delimited if there are multiple", - "group": "kafka", - "type": "String", - "feel": "optional", - "binding": { - "type": "zeebe:property", - "name": "topic.bootstrapServers" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Topic", - "description": "Provide the topic name", - "group": "kafka", - "type": "String", - "binding": { - "type": "zeebe:property", - "name": "topic.topicName" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Consumer Group ID", - "description": "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one", - "group": "kafka", - "type": "String", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "groupId" - }, - "constraints": { - "notEmpty": false, - "maxLength": 250 - } - }, - { - "label": "Additional properties", - "description": "Provide additional Kafka consumer properties in JSON", - "group": "kafka", - "type": "String", - "optional": true, - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "additionalProperties" - } - }, - { - "label": "Offsets", - "description": "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions", - "group": "kafka", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "offsets" - } - }, - { - "label": "Auto offset reset", - "description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets", - "id": "autoOffsetReset", - "group": "kafka", - "type": "Dropdown", - "value": "latest", - "choices": [ - { - "name": "Latest", - "value": "latest" - }, - { - "name": "Earliest", - "value": "earliest" - }, - { - "name": "None", - "value": "none" - } - ], - "binding": { - "type": "zeebe:property", - "name": "autoOffsetReset" - } - }, - { - "label": "Correlation key (process)", - "type": "String", - "group": "activation", - "feel": "required", - "description": "Sets up the correlation key from process variables", - "binding": { - "type": "bpmn:Message#zeebe:subscription#property", - "name": "correlationKey" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Correlation key (payload)", - "type": "String", - "group": "activation", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "correlationKeyExpression" - }, - "description": "Extracts the correlation key from the incoming message payload", - "constraints": { - "notEmpty": true - } - }, - { - "label": "Message ID expression", - "feel": "required", - "type": "String", - "optional": true, - "group": "activation", - "binding": { - "type": "zeebe:property", - "name": "messageIdExpression" - }, - "description": "Expression to extract unique identifier of a message" - }, - { - "label": "Activation condition", - "type": "String", - "group": "activation", - "feel": "required", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "activationCondition" - }, - "description": "Condition under which the Connector triggers. Leave empty to catch all events" - }, - { - "label": "Result variable", - "type": "String", - "group": "variable-mapping", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "resultVariable" - }, - "description": "Name of variable to store the result of the connector in" - }, - { - "label": "Result expression", - "description": "Expression to map the inbound payload to process variables", - "group": "variable-mapping", - "type": "Text", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "resultExpression" - } - } - ], - "icon": { - "contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E" - } -} \ No newline at end of file diff --git a/connectors/kafka/element-templates/kafka-inbound-connector-hybrid.json b/connectors/kafka/element-templates/kafka-inbound-connector-hybrid.json deleted file mode 100644 index 6e8d6dac71..0000000000 --- a/connectors/kafka/element-templates/kafka-inbound-connector-hybrid.json +++ /dev/null @@ -1,249 +0,0 @@ -{ - "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", - "name": "Kafka Start Event Connector", - "id": "io.camunda.connectors.inbound.KAFKA.v1", - "version": 3, - "description": "Consume Kafka messages in hybrid mode", - "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=inbound", - "category": { - "id": "connectors", - "name": "Connectors" - }, - "appliesTo": [ - "bpmn:StartEvent" - ], - "elementType": { - "value": "bpmn:StartEvent" - }, - "groups": [ - { - "id": "taskDefinitionType", - "label": "Task definition type" - }, - { - "id": "authentication", - "label": "Authentication" - }, - { - "id": "kafka", - "label": "Kafka" - }, - { - "id": "activation", - "label": "Activation" - }, - { - "id": "variable-mapping", - "label": "Variable mapping" - } - ], - "properties": [ - { - "type": "String", - "group": "taskDefinitionType", - "value": "io.camunda:connector-kafka-inbound:2", - "binding": { - "type": "zeebe:property", - "name": "inbound.type" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Authentication type", - "description": "Username/password or custom", - "id": "authenticationType", - "group": "authentication", - "type": "Dropdown", - "value": "credentials", - "choices": [ - { - "name": "Credentials", - "value": "credentials" - }, - { - "name": "Custom", - "value": "custom" - } - ], - "binding": { - "type": "zeebe:property", - "name": "authenticationType" - } - }, - { - "label": "Username", - "description": "Provide the username (must have permissions to produce message to the topic)", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.username" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Password", - "description": "Provide a password for the user", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.password" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Bootstrap servers", - "description": "Provide bootstrap server(s), comma-delimited if there are multiple", - "group": "kafka", - "type": "String", - "feel": "optional", - "binding": { - "type": "zeebe:property", - "name": "topic.bootstrapServers" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Topic", - "description": "Provide the topic name", - "group": "kafka", - "type": "String", - "binding": { - "type": "zeebe:property", - "name": "topic.topicName" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Consumer Group ID", - "description": "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one", - "group": "kafka", - "type": "String", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "groupId" - }, - "constraints": { - "notEmpty": false, - "maxLength": 250 - } - }, - { - "label": "Headers", - "description": "Provide Kafka producer headers in JSON", - "group": "kafka", - "type": "String", - "optional": true, - "feel": "required", - "binding": { - "type": "zeebe:input", - "name": "headers" - } - }, - { - "label": "Additional properties", - "description": "Provide additional Kafka consumer properties in JSON", - "group": "kafka", - "type": "String", - "optional": true, - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "additionalProperties" - } - }, - { - "label": "Offsets", - "description": "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions", - "group": "kafka", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "offsets" - } - }, - { - "label": "Auto offset reset", - "description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets", - "id": "autoOffsetReset", - "group": "kafka", - "type": "Dropdown", - "value": "latest", - "choices": [ - { - "name": "Latest", - "value": "latest" - }, - { - "name": "Earliest", - "value": "earliest" - }, - { - "name": "None", - "value": "none" - } - ], - "binding": { - "type": "zeebe:property", - "name": "autoOffsetReset" - } - }, - { - "label": "Activation condition", - "type": "String", - "group": "activation", - "feel": "required", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "activationCondition" - }, - "description": "Condition under which the Connector triggers. Leave empty to catch all events" - }, - { - "label": "Result variable", - "type": "String", - "group": "variable-mapping", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "resultVariable" - }, - "description": "Name of variable to store the result of the connector in" - }, - { - "label": "Result expression", - "description": "Expression to map the inbound payload to process variables", - "group": "variable-mapping", - "type": "Text", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "resultExpression" - } - } - ], - "icon": { - "contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E" - } -} \ No newline at end of file diff --git a/connectors/kafka/element-templates/kafka-inbound-connector-intermediate.json b/connectors/kafka/element-templates/kafka-inbound-connector-intermediate.json index dacfeaa103..15f0746b3d 100644 --- a/connectors/kafka/element-templates/kafka-inbound-connector-intermediate.json +++ b/connectors/kafka/element-templates/kafka-inbound-connector-intermediate.json @@ -1,281 +1,307 @@ { - "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", - "name": "Kafka Intermediate Catch Event Connector", - "id": "io.camunda.connectors.inbound.KafkaIntermediate.v1", - "version": 4, - "description": "Consume Kafka messages", - "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=inbound", - "category": { - "id": "connectors", - "name": "Connectors" + "$schema" : "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", + "name" : "Kafka Intermediate Catch Event Connector", + "id" : "io.camunda.connectors.inbound.KafkaIntermediate.v1", + "description" : "Consume Kafka messages", + "documentationRef" : "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=inbound", + "version" : 4, + "category" : { + "id" : "connectors", + "name" : "Connectors" }, - "appliesTo": [ - "bpmn:IntermediateCatchEvent", - "bpmn:IntermediateThrowEvent" - ], - "elementType": { - "value": "bpmn:IntermediateCatchEvent", - "eventDefinition": "bpmn:MessageEventDefinition" + "appliesTo" : [ "bpmn:IntermediateCatchEvent", "bpmn:IntermediateThrowEvent" ], + "elementType" : { + "value" : "bpmn:IntermediateCatchEvent", + "eventDefinition" : "bpmn:MessageEventDefinition" }, - "groups": [ - { - "id": "authentication", - "label": "Authentication" + "groups" : [ { + "id" : "authentication", + "label" : "Authentication" + }, { + "id" : "kafka", + "label" : "Kafka" + }, { + "id" : "message", + "label" : "Message deserialization" + }, { + "id" : "activation", + "label" : "Activation" + }, { + "id" : "correlation", + "label" : "Correlation" + }, { + "id" : "output", + "label" : "Output mapping" + } ], + "properties" : [ { + "value" : "io.camunda:connector-kafka-inbound:1", + "binding" : { + "name" : "inbound.type", + "type" : "zeebe:property" }, - { - "id": "kafka", - "label": "Kafka" + "type" : "Hidden" + }, { + "id" : "authenticationType", + "label" : "Authentication type", + "description" : "Username/password or custom", + "optional" : false, + "value" : "credentials", + "constraints" : { + "notEmpty" : true }, - { - "id": "activation", - "label": "Activation" + "group" : "authentication", + "binding" : { + "name" : "authenticationType", + "type" : "zeebe:property" }, - { - "id": "variable-mapping", - "label": "Variable mapping" - } - ], - "properties": [ - { - "type": "Hidden", - "value": "io.camunda:connector-kafka-inbound:1", - "binding": { - "type": "zeebe:property", - "name": "inbound.type" - } + "type" : "Dropdown", + "choices" : [ { + "name" : "Credentials", + "value" : "credentials" + }, { + "name" : "Custom", + "value" : "custom" + } ] + }, { + "id" : "authentication.username", + "label" : "Username", + "description" : "Provide the username (must have permissions to produce message to the topic)", + "optional" : true, + "feel" : "optional", + "group" : "authentication", + "binding" : { + "name" : "authentication.username", + "type" : "zeebe:property" }, - { - "type": "Hidden", - "generatedValue": { - "type": "uuid" - }, - "binding": { - "type": "bpmn:Message#property", - "name": "name" - } + "type" : "String" + }, { + "id" : "authentication.password", + "label" : "Password", + "description" : "Provide a password for the user", + "optional" : true, + "feel" : "optional", + "group" : "authentication", + "binding" : { + "name" : "authentication.password", + "type" : "zeebe:property" }, - { - "label": "Authentication type", - "description": "Username/password or custom", - "id": "authenticationType", - "group": "authentication", - "type": "Dropdown", - "value": "credentials", - "choices": [ - { - "name": "Credentials", - "value": "credentials" - }, - { - "name": "Custom", - "value": "custom" - } - ], - "binding": { - "type": "zeebe:property", - "name": "authenticationType" - } + "type" : "String" + }, { + "id" : "serializationType", + "label" : "Deserialization type", + "description" : "Select the deserialization type. For details, visit the documentation", + "optional" : false, + "value" : "json", + "group" : "kafka", + "binding" : { + "name" : "serializationType", + "type" : "zeebe:property" }, - { - "label": "Username", - "description": "Provide the username (must have permissions to produce message to the topic)", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.username" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } + "type" : "Dropdown", + "choices" : [ { + "name" : "Default (JSON)", + "value" : "json" + }, { + "name" : "AVRO (experimental)", + "value" : "avro" + } ] + }, { + "id" : "topic.bootstrapServers", + "label" : "Bootstrap servers", + "description" : "Provide bootstrap server(s), comma-delimited if there are multiple", + "optional" : false, + "constraints" : { + "notEmpty" : true }, - { - "label": "Password", - "description": "Provide a password for the user", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.password" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } + "feel" : "optional", + "group" : "kafka", + "binding" : { + "name" : "topic.bootstrapServers", + "type" : "zeebe:property" }, - { - "label": "Bootstrap servers", - "description": "Provide bootstrap server(s), comma-delimited if there are multiple", - "group": "kafka", - "type": "String", - "feel": "optional", - "binding": { - "type": "zeebe:property", - "name": "topic.bootstrapServers" - }, - "constraints": { - "notEmpty": true - } + "type" : "String" + }, { + "id" : "topic.topicName", + "label" : "Topic", + "description" : "Provide topic name", + "optional" : false, + "constraints" : { + "notEmpty" : true }, - { - "label": "Topic", - "description": "Provide the topic name", - "group": "kafka", - "type": "String", - "binding": { - "type": "zeebe:property", - "name": "topic.topicName" - }, - "constraints": { - "notEmpty": true - } + "feel" : "optional", + "group" : "kafka", + "binding" : { + "name" : "topic.topicName", + "type" : "zeebe:property" }, - { - "label": "Consumer Group ID", - "description": "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one", - "group": "kafka", - "type": "String", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "groupId" - }, - "constraints": { - "notEmpty": false, - "maxLength": 250 - } + "type" : "String" + }, { + "id" : "groupId", + "label" : "Consumer Group ID", + "description" : "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one", + "optional" : false, + "feel" : "optional", + "group" : "kafka", + "binding" : { + "name" : "groupId", + "type" : "zeebe:property" }, - { - "label": "Additional properties", - "description": "Provide additional Kafka consumer properties in JSON", - "group": "kafka", - "type": "String", - "optional": true, - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "additionalProperties" - } + "type" : "String" + }, { + "id" : "additionalProperties", + "label" : "Additional properties", + "description" : "Provide additional Kafka consumer properties in JSON", + "optional" : true, + "feel" : "required", + "group" : "kafka", + "binding" : { + "name" : "additionalProperties", + "type" : "zeebe:property" }, - { - "label": "Offsets", - "description": "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions", - "group": "kafka", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "offsets" - } + "type" : "String" + }, { + "id" : "offsets", + "label" : "Offsets", + "description" : "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions", + "optional" : false, + "feel" : "optional", + "group" : "kafka", + "binding" : { + "name" : "offsets", + "type" : "zeebe:property" }, - { - "label": "Auto offset reset", - "description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets", - "id": "autoOffsetReset", - "group": "kafka", - "type": "Dropdown", - "value": "latest", - "choices": [ - { - "name": "Latest", - "value": "latest" - }, - { - "name": "Earliest", - "value": "earliest" - }, - { - "name": "None", - "value": "none" - } - ], - "binding": { - "type": "zeebe:property", - "name": "autoOffsetReset" - } + "type" : "String" + }, { + "id" : "autoOffsetReset", + "label" : "Auto offset reset", + "description" : "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets", + "optional" : false, + "value" : "latest", + "constraints" : { + "notEmpty" : true }, - { - "label": "Correlation key (process)", - "type": "String", - "group": "activation", - "feel": "required", - "description": "Sets up the correlation key from process variables", - "binding": { - "type": "bpmn:Message#zeebe:subscription#property", - "name": "correlationKey" - }, - "constraints": { - "notEmpty": true - } + "group" : "kafka", + "binding" : { + "name" : "autoOffsetReset", + "type" : "zeebe:property" }, - { - "label": "Correlation key (payload)", - "type": "String", - "group": "activation", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "correlationKeyExpression" - }, - "description": "Extracts the correlation key from the incoming message payload", - "constraints": { - "notEmpty": true - } + "type" : "Dropdown", + "choices" : [ { + "name" : "None", + "value" : "none" + }, { + "name" : "Latest", + "value" : "latest" + }, { + "name" : "Earliest", + "value" : "earliest" + } ] + }, { + "id" : "avro.schema", + "label" : "Avro schema", + "description" : "Avro schema for the message value", + "optional" : false, + "constraints" : { + "notEmpty" : true }, - { - "label": "Message ID expression", - "feel": "required", - "type": "String", - "optional": true, - "group": "activation", - "binding": { - "type": "zeebe:property", - "name": "messageIdExpression" - }, - "description": "Expression to extract unique identifier of a message" + "feel" : "required", + "group" : "message", + "binding" : { + "name" : "avro.schema", + "type" : "zeebe:property" }, - { - "label": "Activation condition", - "type": "String", - "group": "activation", - "feel": "required", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "activationCondition" - }, - "description": "Condition under which the Connector triggers. Leave empty to catch all events" + "condition" : { + "property" : "serializationType", + "equals" : "avro", + "type" : "simple" }, - { - "label": "Result variable", - "type": "String", - "group": "variable-mapping", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "resultVariable" - }, - "description": "Name of variable to store the result of the connector in" + "type" : "Text" + }, { + "id" : "activationCondition", + "label" : "Activation condition", + "description" : "Condition under which the Connector triggers. Leave empty to catch all events", + "optional" : true, + "feel" : "required", + "group" : "activation", + "binding" : { + "name" : "activationCondition", + "type" : "zeebe:property" }, - { - "label": "Result expression", - "description": "Expression to map the inbound payload to process variables", - "group": "variable-mapping", - "type": "Text", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "resultExpression" - } - } - ], - "icon": { - "contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E" + "type" : "String" + }, { + "id" : "correlationKeyProcess", + "label" : "Correlation key (process)", + "description" : "Sets up the correlation key from process variables", + "constraints" : { + "notEmpty" : true + }, + "feel" : "required", + "group" : "correlation", + "binding" : { + "name" : "correlationKey", + "type" : "bpmn:Message#zeebe:subscription#property" + }, + "type" : "String" + }, { + "id" : "correlationKeyPayload", + "label" : "Correlation key (payload)", + "description" : "Extracts the correlation key from the incoming message payload", + "constraints" : { + "notEmpty" : true + }, + "feel" : "required", + "group" : "correlation", + "binding" : { + "name" : "correlationKeyExpression", + "type" : "zeebe:property" + }, + "type" : "String" + }, { + "id" : "messageIdExpression", + "label" : "Message ID expression", + "description" : "Expression to extract unique identifier of a message", + "optional" : true, + "feel" : "required", + "group" : "correlation", + "binding" : { + "name" : "messageIdExpression", + "type" : "zeebe:property" + }, + "type" : "String" + }, { + "id" : "messageNameUuid", + "generatedValue" : { + "type" : "uuid" + }, + "group" : "correlation", + "binding" : { + "name" : "name", + "type" : "bpmn:Message#property" + }, + "type" : "Hidden" + }, { + "id" : "resultVariable", + "label" : "Result variable", + "description" : "Name of variable to store the response in", + "group" : "output", + "binding" : { + "name" : "resultVariable", + "type" : "zeebe:property" + }, + "type" : "String" + }, { + "id" : "resultExpression", + "label" : "Result expression", + "description" : "Expression to map the response into process variables", + "feel" : "required", + "group" : "output", + "binding" : { + "name" : "resultExpression", + "type" : "zeebe:property" + }, + "type" : "Text" + } ], + "icon" : { + "contents" : "" } } \ No newline at end of file diff --git a/connectors/kafka/element-templates/kafka-inbound-connector.json b/connectors/kafka/element-templates/kafka-inbound-connector.json deleted file mode 100644 index 911ed24699..0000000000 --- a/connectors/kafka/element-templates/kafka-inbound-connector.json +++ /dev/null @@ -1,239 +0,0 @@ -{ - "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", - "name": "Kafka Start Event Connector", - "id": "io.camunda.connectors.inbound.KAFKA.v1", - "version": 3, - "description": "Consume Kafka messages", - "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=inbound", - "category": { - "id": "connectors", - "name": "Connectors" - }, - "appliesTo": [ - "bpmn:StartEvent" - ], - "elementType": { - "value": "bpmn:StartEvent" - }, - "groups": [ - { - "id": "authentication", - "label": "Authentication" - }, - { - "id": "kafka", - "label": "Kafka" - }, - { - "id": "activation", - "label": "Activation" - }, - { - "id": "variable-mapping", - "label": "Variable mapping" - } - ], - "properties": [ - { - "type": "Hidden", - "value": "io.camunda:connector-kafka-inbound:1", - "binding": { - "type": "zeebe:property", - "name": "inbound.type" - } - }, - { - "label": "Authentication type", - "id": "authenticationType", - "description": "Username/password or custom", - "group": "authentication", - "type": "Dropdown", - "value": "credentials", - "choices": [ - { - "name": "Credentials", - "value": "credentials" - }, - { - "name": "Custom", - "value": "custom" - } - ], - "binding": { - "type": "zeebe:property", - "name": "authenticationType" - } - }, - { - "label": "Username", - "id": "authentication.username", - "description": "Provide the username (must have permissions to produce message to the topic)", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.username" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Password", - "id": "authentication.password", - "description": "Provide a password for the user", - "group": "authentication", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "authentication.password" - }, - "condition": { - "property": "authenticationType", - "equals": "credentials" - } - }, - { - "label": "Bootstrap servers", - "id": "topic.bootstrapServers", - "description": "Provide bootstrap server(s), comma-delimited if there are multiple", - "group": "kafka", - "type": "String", - "feel": "optional", - "binding": { - "type": "zeebe:property", - "name": "topic.bootstrapServers" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Topic", - "id": "topic.topicName", - "description": "Provide the topic name", - "group": "kafka", - "type": "String", - "binding": { - "type": "zeebe:property", - "name": "topic.topicName" - }, - "constraints": { - "notEmpty": true - } - }, - { - "label": "Consumer Group ID", - "id": "groupId", - "description": "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one", - "group": "kafka", - "type": "String", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "groupId" - }, - "constraints": { - "notEmpty": false, - "maxLength": 250 - } - }, - { - "label": "Additional properties", - "id": "additionalProperties", - "description": "Provide additional Kafka consumer properties in JSON", - "group": "kafka", - "type": "String", - "optional": true, - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "additionalProperties" - } - }, - { - "label": "Offsets", - "id": "offsets", - "description": "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions", - "group": "kafka", - "type": "String", - "feel": "optional", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "offsets" - } - }, - { - "label": "Auto offset reset", - "id": "autoOffsetReset", - "description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets", - "group": "kafka", - "type": "Dropdown", - "value": "latest", - "choices": [ - { - "name": "Latest", - "value": "latest" - }, - { - "name": "Earliest", - "value": "earliest" - }, - { - "name": "None", - "value": "none" - } - ], - "binding": { - "type": "zeebe:property", - "name": "autoOffsetReset" - } - }, - { - "label": "Activation condition", - "id": "activationCondition", - "type": "String", - "group": "activation", - "feel": "required", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "activationCondition" - }, - "description": "Condition under which the Connector triggers. Leave empty to catch all events" - }, - { - "label": "Result variable", - "id": "resultVariable", - "type": "String", - "group": "variable-mapping", - "optional": true, - "binding": { - "type": "zeebe:property", - "name": "resultVariable" - }, - "description": "Name of variable to store the result of the connector in" - }, - { - "label": "Result expression", - "id": "resultExpression", - "description": "Expression to map the inbound payload to process variables", - "group": "variable-mapping", - "type": "Text", - "feel": "required", - "binding": { - "type": "zeebe:property", - "name": "resultExpression" - } - } - ], - "icon": { - "contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E" - } -} \ No newline at end of file diff --git a/connectors/kafka/element-templates/kafka-outbound-connector-hybrid.json b/connectors/kafka/element-templates/kafka-outbound-connector-hybrid.json deleted file mode 100644 index 714476ba1a..0000000000 --- a/connectors/kafka/element-templates/kafka-outbound-connector-hybrid.json +++ /dev/null @@ -1,254 +0,0 @@ -{ - "$schema" : "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", - "name" : "Kafka Outbound Connector", - "id" : "io.camunda.connectors.KAFKA.v1", - "description" : "Produce Kafka message", - "documentationRef" : "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=outbound", - "version" : 4, - "category" : { - "id" : "connectors", - "name" : "Connectors" - }, - "appliesTo" : [ "bpmn:Task" ], - "elementType" : { - "value" : "bpmn:ServiceTask" - }, - "groups" : [ { - "id" : "taskDefinitionType", - "label" : "Task definition type" - }, { - "id" : "authentication", - "label" : "Authentication" - }, { - "id" : "kafka", - "label" : "Kafka" - }, { - "id" : "message", - "label" : "Message" - }, { - "id" : "output", - "label" : "Output mapping" - }, { - "id" : "error", - "label" : "Error handling" - }, { - "id" : "retries", - "label" : "Retries" - } ], - "properties" : [ { - "id" : "taskDefinitionType", - "value" : "io.camunda:connector-kafka:1", - "group" : "taskDefinitionType", - "binding" : { - "property" : "type", - "type" : "zeebe:taskDefinition" - }, - "type" : "String" - }, { - "id" : "authentication.username", - "label" : "Username", - "description" : "Provide the username (must have permissions to produce message to the topic)", - "optional" : true, - "feel" : "optional", - "group" : "authentication", - "binding" : { - "name" : "authentication.username", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "authentication.password", - "label" : "Password", - "description" : "Provide a password for the user", - "optional" : true, - "feel" : "optional", - "group" : "authentication", - "binding" : { - "name" : "authentication.password", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "serializationType", - "label" : "Serialization type", - "description" : "Select the serialization type. For details, visit the documentation", - "optional" : false, - "value" : "json", - "group" : "kafka", - "binding" : { - "name" : "serializationType", - "type" : "zeebe:input" - }, - "type" : "Dropdown", - "choices" : [ { - "name" : "Default (JSON)", - "value" : "json" - }, { - "name" : "AVRO (experimental)", - "value" : "avro" - } ] - }, { - "id" : "topic.bootstrapServers", - "label" : "Bootstrap servers", - "description" : "Provide bootstrap server(s), comma-delimited if there are multiple", - "optional" : false, - "constraints" : { - "notEmpty" : true - }, - "feel" : "optional", - "group" : "kafka", - "binding" : { - "name" : "topic.bootstrapServers", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "topic.topicName", - "label" : "Topic", - "description" : "Provide topic name", - "optional" : false, - "constraints" : { - "notEmpty" : true - }, - "feel" : "optional", - "group" : "kafka", - "binding" : { - "name" : "topic.topicName", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "headers", - "label" : "Headers", - "description" : "Provide Kafka producer headers in JSON", - "optional" : true, - "feel" : "required", - "group" : "kafka", - "binding" : { - "name" : "headers", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "additionalProperties", - "label" : "Additional properties", - "description" : "Provide additional Kafka producer properties in JSON", - "optional" : true, - "feel" : "required", - "group" : "kafka", - "binding" : { - "name" : "additionalProperties", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "message.key", - "label" : "Key", - "description" : "Provide message key", - "optional" : false, - "constraints" : { - "notEmpty" : true - }, - "feel" : "optional", - "group" : "message", - "binding" : { - "name" : "message.key", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "message.value", - "label" : "Value", - "description" : "Provide message value", - "optional" : false, - "constraints" : { - "notEmpty" : true - }, - "feel" : "optional", - "group" : "message", - "binding" : { - "name" : "message.value", - "type" : "zeebe:input" - }, - "type" : "String" - }, { - "id" : "avro.schema", - "label" : "Avro schema", - "description" : "Avro schema for the message value", - "optional" : false, - "constraints" : { - "notEmpty" : true - }, - "feel" : "required", - "group" : "message", - "binding" : { - "name" : "avro.schema", - "type" : "zeebe:input" - }, - "condition" : { - "property" : "serializationType", - "equals" : "avro", - "type" : "simple" - }, - "type" : "Text" - }, { - "id" : "resultVariable", - "label" : "Result variable", - "description" : "Name of variable to store the response in", - "group" : "output", - "binding" : { - "key" : "resultVariable", - "type" : "zeebe:taskHeader" - }, - "type" : "String" - }, { - "id" : "resultExpression", - "label" : "Result expression", - "description" : "Expression to map the response into process variables", - "feel" : "required", - "group" : "output", - "binding" : { - "key" : "resultExpression", - "type" : "zeebe:taskHeader" - }, - "type" : "Text" - }, { - "id" : "errorExpression", - "label" : "Error expression", - "description" : "Expression to handle errors. Details in the documentation.", - "feel" : "required", - "group" : "error", - "binding" : { - "key" : "errorExpression", - "type" : "zeebe:taskHeader" - }, - "type" : "Text" - }, { - "id" : "retryCount", - "label" : "Retries", - "description" : "Number of retries", - "value" : "3", - "feel" : "optional", - "group" : "retries", - "binding" : { - "property" : "retries", - "type" : "zeebe:taskDefinition" - }, - "type" : "String" - }, { - "id" : "retryBackoff", - "label" : "Retry backoff", - "description" : "ISO-8601 duration to wait between retries", - "value" : "PT0S", - "feel" : "optional", - "group" : "retries", - "binding" : { - "key" : "retryBackoff", - "type" : "zeebe:taskHeader" - }, - "type" : "String" - } ], - "icon" : { - "contents" : "" - } -} \ No newline at end of file diff --git a/connectors/kafka/pom.xml b/connectors/kafka/pom.xml index 68e686ef06..0f3ef40c59 100644 --- a/connectors/kafka/pom.xml +++ b/connectors/kafka/pom.xml @@ -85,13 +85,41 @@ except in compliance with the proprietary license. io.camunda.connector element-template-generator-maven-plugin - ${project.version} - - io.camunda.connector.kafka.outbound.KafkaConnectorFunction - - true - kafka-outbound-connector + + + io.camunda.connector.kafka.outbound.KafkaConnectorFunction + + + io.camunda.connectors.KAFKA.v1 + kafka-outbound-connector.json + + + true + + + io.camunda.connector.kafka.inbound.KafkaExecutable + + + io.camunda.connectors.inbound.KAFKA.v1 + kafka-inbound-connector-start-event.json + + + io.camunda.connectors.inbound.KafkaMessageStart.v1 + kafka-inbound-start-message.json + + + io.camunda.connectors.inbound.KafkaIntermediate.v1 + kafka-inbound-connector-intermediate.json + + + iio.camunda.connectors.inbound.KafkaBoundary.v1 + kafka-inbound-connector-boundary.json + + + true + + diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.java index d500287991..4dd7fb5fcf 100644 --- a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.java +++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.java @@ -88,8 +88,8 @@ public KafkaConnectorConsumer( } public void startConsumer() { - if (elementProps.getAvro() != null) { - var schemaString = StringEscapeUtils.unescapeJson(elementProps.getAvro().schema()); + if (elementProps.avro() != null) { + var schemaString = StringEscapeUtils.unescapeJson(elementProps.avro().schema()); Schema schema = new Schema.Parser().setValidate(true).parse(schemaString); AvroSchema avroSchema = new AvroSchema(schema); AvroMapper avroMapper = new AvroMapper(); @@ -107,8 +107,8 @@ public void startConsumer() { private void prepareConsumer() { try { this.consumer = consumerCreatorFunction.apply(getKafkaProperties(elementProps, context)); - var partitions = assignTopicPartitions(consumer, elementProps.getTopic().topicName()); - Optional.ofNullable(elementProps.getOffsets()) + var partitions = assignTopicPartitions(consumer, elementProps.topic().topicName()); + Optional.ofNullable(elementProps.offsets()) .ifPresent(offsets -> seekOffsets(consumer, partitions, offsets)); reportUp(); } catch (Exception ex) { diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorProperties.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorProperties.java index ce61fa0f61..d5ecadb5e4 100644 --- a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorProperties.java +++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorProperties.java @@ -8,34 +8,95 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.camunda.connector.feel.annotation.FEEL; +import io.camunda.connector.generator.dsl.Property; +import io.camunda.connector.generator.java.annotation.TemplateProperty; import io.camunda.connector.kafka.model.Avro; import io.camunda.connector.kafka.model.KafkaAuthentication; import io.camunda.connector.kafka.model.KafkaTopic; +import io.camunda.connector.kafka.model.SerializationType; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; -import java.util.HashMap; import java.util.List; import java.util.Map; -public final class KafkaConnectorProperties { - - @NotNull private AuthenticationType authenticationType; - - @Valid private KafkaAuthentication authentication; - - @Valid @NotNull private KafkaTopic topic; - - @FEEL private Map additionalProperties = new HashMap<>(); - - private String activationCondition; - - @FEEL private List offsets; - - @NotNull private AutoOffsetReset autoOffsetReset = AutoOffsetReset.NONE; - - private String groupId; - - @Valid private Avro avro; +public record KafkaConnectorProperties( + @TemplateProperty( + group = "kafka", + label = "Deserialization type", + id = "serializationType", + defaultValue = "json", + type = TemplateProperty.PropertyType.Dropdown, + choices = { + @TemplateProperty.DropdownPropertyChoice(value = "json", label = "Default (JSON)"), + @TemplateProperty.DropdownPropertyChoice( + value = "avro", + label = "AVRO (experimental)") + }, + description = + "Select the deserialization type. For details, visit the documentation") + SerializationType serializationType, + @NotNull + @TemplateProperty( + group = "authentication", + label = "Authentication type", + defaultValue = "credentials", + type = TemplateProperty.PropertyType.Dropdown, + choices = { + @TemplateProperty.DropdownPropertyChoice( + value = "credentials", + label = "Credentials"), + @TemplateProperty.DropdownPropertyChoice(value = "custom", label = "Custom") + }, + description = "Username/password or custom") + AuthenticationType authenticationType, + @Valid KafkaAuthentication authentication, + @Valid @NotNull + // @TemplateProperty( + // group = "kafka", + // label = "Consumer Group ID", + // feel = Property.FeelMode.disabled, + // description = "Provide the topic name") + // todo override property + // @NestedProperties(overrideProperty = true, id = "", feel = + // Property.FeelMode.disabled) + KafkaTopic topic, + @TemplateProperty( + group = "kafka", + label = "Consumer Group ID", + feel = Property.FeelMode.disabled, + description = + "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one") + String groupId, + @FEEL + @TemplateProperty( + group = "kafka", + label = "Additional properties", + optional = true, + feel = Property.FeelMode.required, + description = "Provide additional Kafka consumer properties in JSON") + Map additionalProperties, + @FEEL + @TemplateProperty( + group = "kafka", + label = "Offsets", + description = + "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions") + List offsets, + @NotNull + @TemplateProperty( + group = "kafka", + label = "Auto offset reset", + defaultValue = "latest", + type = TemplateProperty.PropertyType.Dropdown, + choices = { + @TemplateProperty.DropdownPropertyChoice(value = "none", label = "None"), + @TemplateProperty.DropdownPropertyChoice(value = "latest", label = "Latest"), + @TemplateProperty.DropdownPropertyChoice(value = "earliest", label = "Earliest") + }, + description = + "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets") + AutoOffsetReset autoOffsetReset, // = AutoOffsetReset.NONE; + @Valid Avro avro) { public enum AutoOffsetReset { @JsonProperty("none") @@ -61,101 +122,4 @@ public enum AuthenticationType { credentials, custom } - - public AuthenticationType getAuthenticationType() { - return authenticationType; - } - - public void setAuthenticationType(AuthenticationType authenticationType) { - this.authenticationType = authenticationType; - } - - public AutoOffsetReset getAutoOffsetReset() { - return autoOffsetReset; - } - - public List getOffsets() { - return offsets; - } - - public void setOffsets(List offsets) { - this.offsets = offsets; - } - - public void setAutoOffsetReset(AutoOffsetReset autoOffsetReset) { - this.autoOffsetReset = autoOffsetReset; - } - - public String getActivationCondition() { - return activationCondition; - } - - public void setActivationCondition(String activationCondition) { - this.activationCondition = activationCondition; - } - - public KafkaAuthentication getAuthentication() { - return authentication; - } - - public void setAuthentication(KafkaAuthentication authentication) { - this.authentication = authentication; - } - - public KafkaTopic getTopic() { - return topic; - } - - public void setTopic(KafkaTopic topic) { - this.topic = topic; - } - - public Map getAdditionalProperties() { - return additionalProperties; - } - - public void setAdditionalProperties(Map additionalProperties) { - this.additionalProperties = additionalProperties; - } - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public Avro getAvro() { - return avro; - } - - public void setAvro(Avro avro) { - this.avro = avro; - } - - @Override - public String toString() { - return "KafkaConnectorProperties{" - + "authenticationType='" - + authenticationType - + '\'' - + ", authentication=" - + authentication - + ", topic=" - + topic - + ", additionalProperties=" - + additionalProperties - + ", activationCondition='" - + activationCondition - + '\'' - + ", offsets=" - + offsets - + ", autoOffsetReset=" - + autoOffsetReset - + ", groupId='" - + groupId - + '\'' - + '}'; - } } diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaExecutable.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaExecutable.java index 312a01611d..841be71b5b 100644 --- a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaExecutable.java +++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaExecutable.java @@ -10,6 +10,8 @@ import io.camunda.connector.api.inbound.Health; import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; +import io.camunda.connector.generator.dsl.BpmnType; +import io.camunda.connector.generator.java.annotation.ElementTemplate; import java.util.Properties; import java.util.function.Function; import org.apache.kafka.clients.consumer.Consumer; @@ -18,6 +20,42 @@ import org.slf4j.LoggerFactory; @InboundConnector(name = "Kafka Consumer", type = "io.camunda:connector-kafka-inbound:1") +@ElementTemplate( + id = "io.camunda.connectors.webhook", + name = "Kafka Event Connector", + icon = "icon.svg", + version = 4, + inputDataClass = KafkaConnectorProperties.class, + description = "Consume Kafka messages", + documentationRef = + "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=inbound", + propertyGroups = { + @ElementTemplate.PropertyGroup(id = "authentication", label = "Authentication"), + @ElementTemplate.PropertyGroup(id = "kafka", label = "Kafka"), + @ElementTemplate.PropertyGroup(id = "message", label = "Message deserialization"), + }, + elementTypes = { + @ElementTemplate.ConnectorElementType( + appliesTo = BpmnType.START_EVENT, + elementType = BpmnType.START_EVENT, + templateIdOverride = "io.camunda.connectors.inbound.KAFKA.v1", + templateNameOverride = "Kafka Start Event Connector"), + @ElementTemplate.ConnectorElementType( + appliesTo = BpmnType.START_EVENT, + elementType = BpmnType.MESSAGE_START_EVENT, + templateIdOverride = "io.camunda.connectors.inbound.KafkaMessageStart.v1", + templateNameOverride = "Kafka Message Start Event Connector"), + @ElementTemplate.ConnectorElementType( + appliesTo = {BpmnType.INTERMEDIATE_THROW_EVENT, BpmnType.INTERMEDIATE_CATCH_EVENT}, + elementType = BpmnType.INTERMEDIATE_CATCH_EVENT, + templateIdOverride = "io.camunda.connectors.inbound.KafkaIntermediate.v1", + templateNameOverride = "Kafka Intermediate Catch Event Connector"), + @ElementTemplate.ConnectorElementType( + appliesTo = BpmnType.BOUNDARY_EVENT, + elementType = BpmnType.BOUNDARY_EVENT, + templateIdOverride = "io.camunda.connectors.inbound.KafkaBoundary.v1", + templateNameOverride = "Kafka Boundary Event Connector") + }) public class KafkaExecutable implements InboundConnectorExecutable { private static final Logger LOG = LoggerFactory.getLogger(KafkaExecutable.class); private final Function> consumerCreatorFunction; diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaPropertyTransformer.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaPropertyTransformer.java index b78de79569..73288ce6be 100644 --- a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaPropertyTransformer.java +++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaPropertyTransformer.java @@ -39,12 +39,12 @@ public static Properties getKafkaProperties( KafkaConnectorRequest connectorRequest = new KafkaConnectorRequest( SerializationType.JSON, - props.getAuthentication(), - props.getTopic(), + props.authentication(), + props.topic(), null, null, null, - props.getAdditionalProperties()); + props.additionalProperties() == null ? new HashMap<>() : props.additionalProperties()); final Properties kafkaProps = KafkaPropertiesUtil.assembleKafkaClientProperties(connectorRequest); @@ -53,13 +53,13 @@ public static Properties getKafkaProperties( // GROUP_ID_CONFIG is mandatory. It will be used to assign a client id kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig); } - kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.getAutoOffsetReset().toString()); + kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.autoOffsetReset().toString()); kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); kafkaProps.put(TopicConfig.RETENTION_MS_CONFIG, -1); kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER); - if (props.getAvro() == null) { + if (props.avro() == null) { kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER); } else { kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BYTE_ARRAY_DESERIALIZER); @@ -70,8 +70,8 @@ public static Properties getKafkaProperties( private static String resolveGroupId( KafkaConnectorProperties kafkaConnectorProperties, InboundConnectorContext context) { - var clientId = kafkaConnectorProperties.getGroupId(); - if (kafkaConnectorProperties.getGroupId() == null) { + var clientId = kafkaConnectorProperties.groupId(); + if (kafkaConnectorProperties.groupId() == null) { clientId = computeGroupId(context); } return clientId.substring(0, Math.min(clientId.length(), 250)); diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequest.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequest.java index f9bbe674eb..f205088fbc 100644 --- a/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequest.java +++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequest.java @@ -35,14 +35,7 @@ public record KafkaConnectorRequest( @Valid KafkaAuthentication authentication, @Valid @NotNull KafkaTopic topic, @Valid @NotNull KafkaMessage message, - @TemplateProperty( - group = "kafka", - label = "Headers", - optional = true, - feel = Property.FeelMode.required, - description = "Provide Kafka producer headers in JSON") - @Valid - Avro avro, + @Valid Avro avro, @TemplateProperty( group = "kafka", label = "Headers", diff --git a/connectors/kafka/src/test/java/io/camunda/connector/kafka/inbound/KafkaExecutableTest.java b/connectors/kafka/src/test/java/io/camunda/connector/kafka/inbound/KafkaExecutableTest.java index 1b9d352702..67e6cd4aa1 100644 --- a/connectors/kafka/src/test/java/io/camunda/connector/kafka/inbound/KafkaExecutableTest.java +++ b/connectors/kafka/src/test/java/io/camunda/connector/kafka/inbound/KafkaExecutableTest.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.camunda.connector.kafka.model.KafkaAuthentication; import io.camunda.connector.kafka.model.KafkaTopic; +import io.camunda.connector.kafka.model.SerializationType; import io.camunda.connector.test.inbound.InboundConnectorContextBuilder; import io.camunda.connector.test.inbound.InboundConnectorDefinitionBuilder; import io.camunda.connector.validation.impl.DefaultValidationProvider; @@ -54,6 +56,7 @@ public class KafkaExecutableTest { private InboundConnectorContextBuilder.TestInboundConnectorContext originalContext; private List topicPartitions; private KafkaConnectorProperties kafkaConnectorProperties; + private KafkaTopic kafkaTopic; @Mock private KafkaConsumer mockConsumer; private String topic; @@ -67,12 +70,19 @@ public void setUp() { Arrays.asList( new PartitionInfo(topic, 0, null, null, null), new PartitionInfo(topic, 1, null, null, null)); - KafkaTopic kafkaTopic = new KafkaTopic("localhost:9092", topic); - kafkaConnectorProperties = new KafkaConnectorProperties(); - kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.NONE); - kafkaConnectorProperties.setAuthenticationType( - KafkaConnectorProperties.AuthenticationType.custom); - kafkaConnectorProperties.setTopic(kafkaTopic); + kafkaTopic = new KafkaTopic("localhost:9092", topic); + KafkaAuthentication kafkaAuthentication = new KafkaAuthentication(null, null); + kafkaConnectorProperties = + new KafkaConnectorProperties( + SerializationType.JSON, + KafkaConnectorProperties.AuthenticationType.custom, + kafkaAuthentication, + kafkaTopic, + null, + null, + null, + KafkaConnectorProperties.AutoOffsetReset.NONE, + null); context = InboundConnectorContextBuilder.create() @@ -161,7 +171,18 @@ void testGetKafkaProperties() { @Test void testGroupIdUsage() { // When - kafkaConnectorProperties.setGroupId("my-group-id"); + KafkaAuthentication kafkaAuthentication = new KafkaAuthentication(null, null); + KafkaConnectorProperties kafkaConnectorProperties = + new KafkaConnectorProperties( + SerializationType.JSON, + KafkaConnectorProperties.AuthenticationType.custom, + kafkaAuthentication, + kafkaTopic, + "my-group-id", + null, + List.of(0L, 0L), + KafkaConnectorProperties.AutoOffsetReset.EARLIEST, + null); Properties properties = KafkaPropertyTransformer.getKafkaProperties(kafkaConnectorProperties, context); // Then @@ -216,6 +237,7 @@ public void testOffsets(Object input, List expected) { properties.put("topic", new KafkaTopic("test", "test")); properties.put("offsets", input); properties.put("authenticationType", "custom"); + properties.put("autoOffsetReset", "none"); context = InboundConnectorContextBuilder.create() @@ -226,7 +248,7 @@ public void testOffsets(Object input, List expected) { .build(); var boundProps = context.bindProperties(KafkaConnectorProperties.class); - assertThat(boundProps.getOffsets()).isEqualTo(expected); + assertThat(boundProps.offsets()).isEqualTo(expected); } private static Stream provideStringsForGetOffsets() { diff --git a/connectors/kafka/src/test/java/io/camunda/connector/kafka/integration/KafkaIntegrationTest.java b/connectors/kafka/src/test/java/io/camunda/connector/kafka/integration/KafkaIntegrationTest.java index e138d9429c..0aae0f033c 100644 --- a/connectors/kafka/src/test/java/io/camunda/connector/kafka/integration/KafkaIntegrationTest.java +++ b/connectors/kafka/src/test/java/io/camunda/connector/kafka/integration/KafkaIntegrationTest.java @@ -180,13 +180,19 @@ void publishStringMessageWithOutboundConnector() throws Exception { void setInvalidOffsetForInboundConnectorWhenAutoOffsetResetIsNone() { // Given KafkaTopic kafkaTopic = new KafkaTopic(BOOTSTRAP_SERVERS, TOPIC); + KafkaAuthentication kafkaAuthentication = new KafkaAuthentication(null, null); - KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties(); - kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.NONE); - kafkaConnectorProperties.setAuthenticationType( - KafkaConnectorProperties.AuthenticationType.custom); - kafkaConnectorProperties.setOffsets(List.of(9999L, 8888L)); - kafkaConnectorProperties.setTopic(kafkaTopic); + KafkaConnectorProperties kafkaConnectorProperties = + new KafkaConnectorProperties( + SerializationType.JSON, + KafkaConnectorProperties.AuthenticationType.custom, + kafkaAuthentication, + kafkaTopic, + null, + null, + List.of(9999L, 8888L), + KafkaConnectorProperties.AutoOffsetReset.NONE, + null); InboundConnectorContextBuilder.TestInboundConnectorContext context = InboundConnectorContextBuilder.create() @@ -220,11 +226,19 @@ void setInvalidOffsetForInboundConnectorWhenAutoOffsetResetIsNone() { void consumeMessageWithInboundConnector() throws Exception { // Given KafkaTopic kafkaTopic = new KafkaTopic(BOOTSTRAP_SERVERS, TOPIC); - KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties(); - kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.EARLIEST); - kafkaConnectorProperties.setAuthenticationType( - KafkaConnectorProperties.AuthenticationType.custom); - kafkaConnectorProperties.setTopic(kafkaTopic); + KafkaAuthentication kafkaAuthentication = new KafkaAuthentication(null, null); + + KafkaConnectorProperties kafkaConnectorProperties = + new KafkaConnectorProperties( + SerializationType.JSON, + KafkaConnectorProperties.AuthenticationType.custom, + kafkaAuthentication, + kafkaTopic, + null, + null, + null, + KafkaConnectorProperties.AutoOffsetReset.EARLIEST, + null); InboundConnectorContextBuilder.TestInboundConnectorContext context = InboundConnectorContextBuilder.create() @@ -262,12 +276,19 @@ void consumeMessageWithInboundConnector() throws Exception { void consumeSameMessageWithInboundConnectorAgainWithOffsets() throws Exception { // Given KafkaTopic kafkaTopic = new KafkaTopic(BOOTSTRAP_SERVERS, TOPIC); - KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties(); - kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.EARLIEST); - kafkaConnectorProperties.setAuthenticationType( - KafkaConnectorProperties.AuthenticationType.custom); - kafkaConnectorProperties.setOffsets(List.of(0L, 0L)); - kafkaConnectorProperties.setTopic(kafkaTopic); + + KafkaAuthentication kafkaAuthentication = new KafkaAuthentication(null, null); + KafkaConnectorProperties kafkaConnectorProperties = + new KafkaConnectorProperties( + SerializationType.JSON, + KafkaConnectorProperties.AuthenticationType.custom, + kafkaAuthentication, + kafkaTopic, + null, + null, + List.of(0L, 0L), + KafkaConnectorProperties.AutoOffsetReset.EARLIEST, + null); InboundConnectorContextBuilder.TestInboundConnectorContext context = InboundConnectorContextBuilder.create() @@ -345,12 +366,18 @@ void consumeAvro() { // Given KafkaTopic kafkaTopic = new KafkaTopic(BOOTSTRAP_SERVERS, AVRO_TOPIC); - KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties(); - kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.EARLIEST); - kafkaConnectorProperties.setAuthenticationType( - KafkaConnectorProperties.AuthenticationType.custom); - kafkaConnectorProperties.setTopic(kafkaTopic); - kafkaConnectorProperties.setAvro(avro); + KafkaAuthentication kafkaAuthentication = new KafkaAuthentication(null, null); + KafkaConnectorProperties kafkaConnectorProperties = + new KafkaConnectorProperties( + SerializationType.AVRO, + KafkaConnectorProperties.AuthenticationType.custom, + kafkaAuthentication, + kafkaTopic, + null, + null, + List.of(0L, 0L), + KafkaConnectorProperties.AutoOffsetReset.EARLIEST, + avro); InboundConnectorContextBuilder.TestInboundConnectorContext context = InboundConnectorContextBuilder.create() @@ -413,11 +440,19 @@ void publishMessageWithHeaders() throws Exception { void consumeMessageWithHeaders() { // Given KafkaTopic kafkaTopic = new KafkaTopic(BOOTSTRAP_SERVERS, TOPIC); - KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties(); - kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.EARLIEST); - kafkaConnectorProperties.setAuthenticationType( - KafkaConnectorProperties.AuthenticationType.custom); - kafkaConnectorProperties.setTopic(kafkaTopic); + + KafkaAuthentication kafkaAuthentication = new KafkaAuthentication(null, null); + KafkaConnectorProperties kafkaConnectorProperties = + new KafkaConnectorProperties( + SerializationType.JSON, + KafkaConnectorProperties.AuthenticationType.custom, + kafkaAuthentication, + kafkaTopic, + null, + null, + List.of(0L, 0L), + KafkaConnectorProperties.AutoOffsetReset.EARLIEST, + null); InboundConnectorContextBuilder.TestInboundConnectorContext context = InboundConnectorContextBuilder.create()