-
Notifications
You must be signed in to change notification settings - Fork 35
/
KafkaConnectorFunction.java
80 lines (72 loc) · 3.4 KB
/
KafkaConnectorFunction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.connector.kafka.outbound;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.feel.ConnectorsObjectMapperSupplier;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorResponse;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * Outbound connector that publishes one message to a Kafka topic and returns the metadata of the
 * written record (topic, partition, offset, timestamp).
 *
 * <p>A producer is obtained from the configured factory on every invocation and is closed before
 * the invocation returns, whether or not the send succeeded.
 */
@OutboundConnector(
    name = "KAFKA",
    inputVariables = {"authentication", "topic", "message", "additionalProperties"},
    type = "io.camunda:connector-kafka:1")
public class KafkaConnectorFunction implements OutboundConnectorFunction {

  /** Error code carried by the {@link ConnectorException} thrown when publishing fails. */
  private static final String ERROR_CODE = "FAIL";

  /** Error message carried by the {@link ConnectorException} thrown when publishing fails. */
  private static final String ERROR_MESSAGE = "Kafka Producer execution exception";

  /** Maximum time, in seconds, to wait for the send result before giving up. */
  private static final long SEND_TIMEOUT_SECONDS = 45;

  /** Mapper used to serialize non-String message values; single-quoted JSON is accepted. */
  private static final ObjectMapper objectMapper =
      ConnectorsObjectMapperSupplier.getCopy().enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);

  /** Factory that turns assembled client properties into a producer instance. */
  private final Function<Properties, Producer> producerCreatorFunction;

  /** Creates a connector that builds a {@link KafkaProducer} for every invocation. */
  public KafkaConnectorFunction() {
    this(KafkaProducer::new);
  }

  /**
   * Creates a connector with a custom producer factory.
   *
   * @param producerCreatorFunction factory invoked once per execution with the client properties
   *     assembled from the request
   */
  public KafkaConnectorFunction(final Function<Properties, Producer> producerCreatorFunction) {
    this.producerCreatorFunction = producerCreatorFunction;
  }

  /**
   * Binds the job variables to a {@link KafkaConnectorRequest} and publishes the message.
   *
   * @param context connector context the request variables are bound from
   * @return a {@link KafkaConnectorResponse} describing the written record
   * @throws Exception if binding the variables fails or the message cannot be published
   */
  @Override
  public Object execute(final OutboundConnectorContext context) throws Exception {
    final var connectorRequest = context.bindVariables(KafkaConnectorRequest.class);
    return executeConnector(connectorRequest);
  }

  /**
   * Sends the request's message and waits up to {@link #SEND_TIMEOUT_SECONDS} seconds for the
   * result.
   *
   * @param request bound request providing client properties, topic and message
   * @return response populated from the returned record metadata
   * @throws ConnectorException if serialization, sending, or waiting for the result fails; the
   *     original exception is attached as the cause
   */
  private KafkaConnectorResponse executeConnector(final KafkaConnectorRequest request) {
    final Properties props = request.assembleKafkaClientProperties();
    // The factory is declared with the raw Producer type; the connector always sends String
    // keys and values.
    final Producer<String, String> producer = producerCreatorFunction.apply(props);
    try {
      // String values are sent as-is; anything else is serialized to JSON.
      final Object value = request.getMessage().getValue();
      final String transformedValue =
          value instanceof String ? (String) value : objectMapper.writeValueAsString(value);

      final Future<RecordMetadata> kafkaResponse =
          producer.send(
              new ProducerRecord<>(
                  request.getTopic().getTopicName(),
                  request.getMessage().getKey().toString(),
                  transformedValue));

      final RecordMetadata recordMetadata =
          kafkaResponse.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);

      final KafkaConnectorResponse result = new KafkaConnectorResponse();
      result.setTopic(recordMetadata.topic());
      result.setPartition(recordMetadata.partition());
      result.setOffset(recordMetadata.offset());
      result.setTimestamp(recordMetadata.timestamp());
      return result;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the calling thread/executor can still observe the
      // cancellation after we translate the exception.
      Thread.currentThread().interrupt();
      throw new ConnectorException(ERROR_CODE, ERROR_MESSAGE, e);
    } catch (Exception e) {
      throw new ConnectorException(ERROR_CODE, ERROR_MESSAGE, e);
    } finally {
      producer.close();
    }
  }
}