-
Notifications
You must be signed in to change notification settings - Fork 215
/
KafkaConsumerStreamFactory.java
144 lines (128 loc) · 6.32 KB
/
KafkaConsumerStreamFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;
import static org.eclipse.ditto.connectivity.service.EnforcementFactoryFactory.newEnforcementFilterFactory;
import static org.eclipse.ditto.placeholders.PlaceholderFactory.newHeadersPlaceholder;
import java.util.Map;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.Enforcement;
import org.eclipse.ditto.connectivity.model.EnforcementFilterFactory;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import akka.NotUsed;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
/**
 * A factory for building different {@link KafkaConsumerStream} implementations, e.g. for different quality of services.
 * <p>
 * The factory holds the consumer data, throttling config and dry-run flag shared by all streams it creates and
 * builds a fresh {@link KafkaMessageTransformer} for every stream.
 */
final class KafkaConsumerStreamFactory {

    private final ConsumerData consumerData;
    private final boolean dryRun;
    // May be null: the test-only constructor does not receive a PropertiesFactory.
    private final PropertiesFactory propertiesFactory;
    private final AtMostOnceKafkaConsumerSourceSupplier atMostOnceKafkaConsumerSourceSupplier;
    private final AtLeastOnceKafkaConsumerSourceSupplier atLeastOnceKafkaConsumerSourceSupplier;
    private final ConnectionThrottlingConfig throttlingConfig;

    /**
     * Creates a factory which builds its own source suppliers for "at most once" and "at least once" consumption
     * from the given properties factory, the address of the given consumer data and the dry-run flag.
     *
     * @param throttlingConfig the throttling config handed to every created stream.
     * @param propertiesFactory the factory used to create the source suppliers and to obtain the committer settings
     * of "at least once" streams.
     * @param consumerData the consumer data providing the source and the address to consume from.
     * @param dryRun indicates whether the connection runs in a dry run.
     */
    KafkaConsumerStreamFactory(final ConnectionThrottlingConfig throttlingConfig,
            final PropertiesFactory propertiesFactory,
            final ConsumerData consumerData,
            final boolean dryRun) {

        this.throttlingConfig = throttlingConfig;
        this.consumerData = consumerData;
        this.dryRun = dryRun;
        this.propertiesFactory = propertiesFactory;
        atMostOnceKafkaConsumerSourceSupplier =
                new AtMostOnceKafkaConsumerSourceSupplier(propertiesFactory, consumerData.getAddress(), dryRun);
        atLeastOnceKafkaConsumerSourceSupplier =
                new AtLeastOnceKafkaConsumerSourceSupplier(propertiesFactory, consumerData.getAddress(), dryRun);
    }

    /**
     * Only used for testing purposes.
     * <p>
     * A factory created this way has no {@code PropertiesFactory}; consequently
     * {@link #newAtLeastOnceConsumerStream} cannot obtain committer settings and throws an
     * {@link IllegalStateException}.
     *
     * @param throttlingConfig the throttling config to use during the test.
     * @param atMostOnceKafkaConsumerSourceSupplier source supplier for "at most once"
     * @param atLeastOnceKafkaConsumerSourceSupplier source supplier for "at least once"
     * @param consumerData the consumer data
     * @param dryRun indicates whether the connection runs in a dry run.
     */
    KafkaConsumerStreamFactory(final ConnectionThrottlingConfig throttlingConfig,
            final AtMostOnceKafkaConsumerSourceSupplier atMostOnceKafkaConsumerSourceSupplier,
            final AtLeastOnceKafkaConsumerSourceSupplier atLeastOnceKafkaConsumerSourceSupplier,
            final ConsumerData consumerData,
            final boolean dryRun) {

        this.consumerData = consumerData;
        this.dryRun = dryRun;
        this.propertiesFactory = null;
        this.throttlingConfig = throttlingConfig;
        this.atMostOnceKafkaConsumerSourceSupplier = atMostOnceKafkaConsumerSourceSupplier;
        this.atLeastOnceKafkaConsumerSourceSupplier = atLeastOnceKafkaConsumerSourceSupplier;
    }

    /**
     * Creates a new {@link AtMostOnceConsumerStream} backed by the "at most once" source supplier of this factory.
     *
     * @param materializer the materializer handed to the created stream.
     * @param inboundMonitor the monitor handed to the created stream and to its message transformer.
     * @param messageMappingSink the sink for {@link AcknowledgeableMessage}s handed to the created stream.
     * @param dreSink the sink for {@link DittoRuntimeException}s handed to the created stream.
     * @param connectionId the ID of the connection the stream belongs to.
     * @param consumerId the ID of the consumer the stream belongs to.
     * @return the new consumer stream.
     */
    KafkaConsumerStream newAtMostOnceConsumerStream(
            final Materializer materializer,
            final ConnectionMonitor inboundMonitor,
            final Sink<AcknowledgeableMessage, NotUsed> messageMappingSink,
            final Sink<DittoRuntimeException, ?> dreSink,
            final ConnectionId connectionId,
            final String consumerId) {

        final KafkaMessageTransformer kafkaMessageTransformer = buildKafkaMessageTransformer(inboundMonitor,
                connectionId);
        return new AtMostOnceConsumerStream(atMostOnceKafkaConsumerSourceSupplier,
                throttlingConfig,
                kafkaMessageTransformer,
                dryRun,
                materializer,
                inboundMonitor,
                messageMappingSink,
                dreSink,
                connectionId,
                consumerId);
    }

    /**
     * Creates a new {@link AtLeastOnceConsumerStream} backed by the "at least once" source supplier of this factory
     * and the committer settings of the properties factory.
     *
     * @param materializer the materializer handed to the created stream.
     * @param inboundMonitor the monitor handed to the created stream and to its message transformer.
     * @param ackMonitor the acknowledgement monitor handed to the created stream.
     * @param messageMappingSink the sink for {@link AcknowledgeableMessage}s handed to the created stream.
     * @param dreSink the sink for {@link DittoRuntimeException}s handed to the created stream.
     * @param connectionId the ID of the connection the stream belongs to.
     * @param consumerId the ID of the consumer the stream belongs to.
     * @return the new consumer stream.
     * @throws IllegalStateException if this factory was created without a {@code PropertiesFactory}, i.e. via the
     * test-only constructor.
     */
    KafkaConsumerStream newAtLeastOnceConsumerStream(
            final Materializer materializer,
            final ConnectionMonitor inboundMonitor,
            final ConnectionMonitor ackMonitor,
            final Sink<AcknowledgeableMessage, NotUsed> messageMappingSink,
            final Sink<DittoRuntimeException, ?> dreSink,
            final ConnectionId connectionId,
            final String consumerId) {

        // Fail fast with a meaningful message instead of an anonymous NullPointerException further below.
        if (propertiesFactory == null) {
            throw new IllegalStateException("No PropertiesFactory available: this factory was created via the " +
                    "test-only constructor and cannot provide committer settings for an at-least-once stream.");
        }

        final KafkaMessageTransformer kafkaMessageTransformer = buildKafkaMessageTransformer(inboundMonitor,
                connectionId);
        return new AtLeastOnceConsumerStream(atLeastOnceKafkaConsumerSourceSupplier,
                propertiesFactory.getCommitterSettings(),
                throttlingConfig,
                kafkaMessageTransformer,
                dryRun,
                materializer,
                inboundMonitor,
                ackMonitor,
                messageMappingSink,
                dreSink,
                connectionId,
                consumerId);
    }

    /**
     * Builds the message transformer for a new stream from the source and address of the consumer data.
     *
     * @param inboundMonitor the monitor handed to the transformer.
     * @param connectionId the ID of the connection the transformer belongs to.
     * @return the new message transformer.
     */
    private KafkaMessageTransformer buildKafkaMessageTransformer(final ConnectionMonitor inboundMonitor,
            final ConnectionId connectionId) {

        final Source source = consumerData.getSource();
        final String address = consumerData.getAddress();
        final Enforcement enforcement = source.getEnforcement().orElse(null);
        // Without a configured enforcement the factory yields no filter (null) for any input.
        final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory =
                enforcement != null
                        ? newEnforcementFilterFactory(enforcement, newHeadersPlaceholder())
                        : input -> null;
        return new KafkaMessageTransformer(connectionId, source, address, headerEnforcementFilterFactory,
                inboundMonitor);
    }

}