forked from AxonFramework/extension-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DefaultKafkaMessageConverter.java
313 lines (282 loc) · 14 KB
/
DefaultKafkaMessageConverter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/*
* Copyright (c) 2010-2018. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.extensions.kafka.eventhandling;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.LazyDeserializingObject;
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.extensions.kafka.eventhandling.HeaderUtils.*;
import static org.axonframework.messaging.Headers.*;
/**
 * Converts an {@link EventMessage} to a {@link ProducerRecord} Kafka message and from a {@link ConsumerRecord} Kafka
* message back to an EventMessage (if possible).
* <p>
* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other
* message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the
* configured {@link Serializer} and passed as the Kafka record's body.
* <p>
* This implementation will suffice in most cases.
*
* @author Nakul Mishra
* @author Steven van Beelen
* @since 4.0
*/
public class DefaultKafkaMessageConverter implements KafkaMessageConverter<String, byte[]> {

    // Fixed: the logger was previously bound to KafkaMessageConverter.class (the interface),
    // which mislabelled the origin of log statements emitted by this implementation.
    private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaMessageConverter.class);

    private final Serializer serializer;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final BiFunction<String, Object, RecordHeader> headerValueMapper;
    private final EventUpcasterChain upcasterChain;

    /**
     * Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}.
     * <p>
     * Will assert that the {@link Serializer} is not {@code null} and will throw an {@link AxonConfigurationException}
     * if it is {@code null}.
     *
     * @param builder the {@link Builder} used to instantiate a {@link DefaultKafkaMessageConverter} instance
     */
    @SuppressWarnings("WeakerAccess")
    protected DefaultKafkaMessageConverter(Builder builder) {
        builder.validate();
        this.serializer = builder.serializer;
        this.sequencingPolicy = builder.sequencingPolicy;
        this.headerValueMapper = builder.headerValueMapper;
        this.upcasterChain = builder.upcasterChain;
    }

    /**
     * Instantiate a Builder to be able to create a {@link DefaultKafkaMessageConverter}.
     * <p>
     * The {@link SequencingPolicy} is defaulted to a {@link SequentialPerAggregatePolicy}, and the {@code
     * headerValueMapper} is defaulted to the {@link HeaderUtils#byteMapper()} function. The {@link Serializer} is a
     * <b>hard requirement</b> and as such should be provided.
     *
     * @return a Builder to be able to create a {@link DefaultKafkaMessageConverter}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Note that the {@link ProducerRecord} created through this method sets the {@link ProducerRecord#timestamp()} to
     * {@code null}. Doing so will ensure the used Producer sets a timestamp itself for the record. The {@link
     * EventMessage#getTimestamp()} field is however still taken into account, but as headers.
     * <p>
     * Additional note that the ProducerRecord will be given a {@code null} {@link ProducerRecord#partition()} value. In
     * return, the {@link ProducerRecord#key()} field is defined by using the configured {@link SequencingPolicy} to
     * retrieve the given {@code eventMessage}'s {@code sequenceIdentifier}. The combination of a {@code null} partition
     * and the possibly present or empty key will define which partition the Producer will choose to dispatch the record
     * on.
     *
     * @see ProducerRecord
     */
    @Override
    public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        SerializedObject<byte[]> serializedObject = eventMessage.serializePayload(serializer, byte[].class);
        // partition and timestamp are deliberately null — see the Javadoc above for the rationale
        return new ProducerRecord<>(
                topic, null, null, recordKey(eventMessage),
                serializedObject.getData(),
                toHeaders(eventMessage, serializedObject, headerValueMapper)
        );
    }

    /**
     * Derives the Kafka record key from the configured {@link SequencingPolicy}, or {@code null} when the policy
     * yields no sequence identifier (letting the Producer pick a partition freely).
     *
     * @param eventMessage the {@link EventMessage} to derive a record key for
     * @return the {@code sequenceIdentifier}'s {@code String} form, or {@code null} if there is none
     */
    private String recordKey(EventMessage<?> eventMessage) {
        Object sequenceIdentifier = sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        return sequenceIdentifier != null ? sequenceIdentifier.toString() : null;
    }

    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            Headers headers = consumerRecord.headers();
            if (isAxonMessage(headers)) {
                byte[] messageBody = consumerRecord.value();
                final Optional<SerializedMessage<?>> message;
                // Domain events may have been written under an older revision, so they are run
                // through the upcaster chain; plain events are deserialized as-is.
                if (isDomainEvent(headers)) {
                    message = createDomainEventAndUpcast(headers, messageBody);
                } else {
                    message = createEvent(headers, messageBody);
                }
                return message.flatMap(serializedMessage -> buildMessage(headers, serializedMessage));
            }
        } catch (Exception e) {
            // Deliberately swallow and return empty: a single unreadable ("poison pill") record
            // should not break the consumer loop. Logged at trace to avoid log flooding.
            logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
        }
        return Optional.empty();
    }

    /**
     * A record is considered an Axon message when both the message identifier and message type headers are present.
     *
     * @param headers the Kafka {@link Headers} to inspect
     * @return {@code true} if the required Axon headers are present, {@code false} otherwise
     */
    private boolean isAxonMessage(Headers headers) {
        return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
    }

    /**
     * Builds a {@link SerializedMessage} for a non-domain event. Deserialization of payload and metadata is lazy, so
     * no deserialization cost is paid until the message is actually consumed.
     *
     * @param headers     the Kafka {@link Headers} carrying the Axon message attributes
     * @param messageBody the serialized payload bytes
     * @return an {@link Optional} containing the {@link SerializedMessage}
     */
    private Optional<SerializedMessage<?>> createEvent(Headers headers, byte[] messageBody) {
        SimpleSerializedObject<byte[]> serializedObject = new SimpleSerializedObject<>(
                messageBody,
                byte[].class,
                valueAsString(headers, MESSAGE_TYPE),
                valueAsString(headers, MESSAGE_REVISION, null)
        );
        return Optional.of(new SerializedMessage<>(
                valueAsString(headers, MESSAGE_ID),
                new LazyDeserializingObject<>(serializedObject, serializer),
                new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers)))
        ));
    }

    /**
     * Builds a {@link SerializedMessage} for a domain event, first running it through the configured
     * {@link EventUpcasterChain} so events stored under an older revision can still be read.
     * <p>
     * Note that only the first upcast result is used; an upcaster producing zero representations drops the event,
     * yielding an empty {@link Optional}.
     *
     * @param headers     the Kafka {@link Headers} carrying the Axon message and aggregate attributes
     * @param messageBody the serialized payload bytes
     * @return an {@link Optional} containing the upcast {@link SerializedMessage}, or empty if upcasting dropped it
     */
    private Optional<SerializedMessage<?>> createDomainEventAndUpcast(Headers headers, byte[] messageBody) {
        GenericDomainEventEntry<Object> domainEventEntry = new GenericDomainEventEntry<>(
                valueAsString(headers, AGGREGATE_TYPE),
                valueAsString(headers, AGGREGATE_ID),
                valueAsLong(headers, AGGREGATE_SEQ),
                valueAsString(headers, MESSAGE_ID),
                valueAsLong(headers, MESSAGE_TIMESTAMP),
                valueAsString(headers, MESSAGE_TYPE),
                valueAsString(headers, MESSAGE_REVISION, null),
                messageBody,
                // metadata must be re-serialized since GenericDomainEventEntry expects serialized metadata bytes
                serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData()
        );
        return upcasterChain.upcast(
                Stream.of(new InitialEventRepresentation(domainEventEntry, serializer))
        ).findFirst().map(upcastedEventData -> new SerializedMessage<>(
                valueAsString(headers, MESSAGE_ID),
                new LazyDeserializingObject<>(upcastedEventData.getData(), serializer),
                upcastedEventData.getMetaData()
        ));
    }

    /**
     * A record represents a domain event when the aggregate identifier header is present.
     *
     * @param headers the Kafka {@link Headers} to inspect
     * @return {@code true} if the record stems from a domain event, {@code false} otherwise
     */
    private boolean isDomainEvent(Headers headers) {
        return headers.lastHeader(AGGREGATE_ID) != null;
    }

    /**
     * Wraps the given {@link SerializedMessage} in the appropriate {@link EventMessage} implementation, restoring the
     * original event timestamp from the message headers.
     *
     * @param headers the Kafka {@link Headers} carrying the timestamp and (optionally) aggregate attributes
     * @param message the deserializable message to wrap
     * @return an {@link Optional} containing the resulting {@link EventMessage}
     */
    private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
        long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP);
        return isDomainEvent(headers)
                ? buildDomainEvent(headers, message, timestamp)
                : buildEvent(message, timestamp);
    }

    private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMessage<?> message, long timestamp) {
        return Optional.of(new GenericDomainEventMessage<>(
                valueAsString(headers, AGGREGATE_TYPE),
                valueAsString(headers, AGGREGATE_ID),
                valueAsLong(headers, AGGREGATE_SEQ),
                message,
                () -> Instant.ofEpochMilli(timestamp)
        ));
    }

    private Optional<EventMessage<?>> buildEvent(SerializedMessage<?> message, long timestamp) {
        return Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp)));
    }

    /**
     * Builder class to instantiate a {@link DefaultKafkaMessageConverter}.
     * <p>
     * The {@link SequencingPolicy} is defaulted to a {@link SequentialPerAggregatePolicy}, and the {@code
     * headerValueMapper} is defaulted to the {@link HeaderUtils#byteMapper()} function. The {@link Serializer} is a
     * <b>hard requirement</b> and as such should be provided.
     */
    public static class Builder {

        private Serializer serializer;
        private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
        private BiFunction<String, Object, RecordHeader> headerValueMapper = byteMapper();
        private EventUpcasterChain upcasterChain = new EventUpcasterChain();

        /**
         * Sets the serializer to serialize the Event Message's payload with.
         *
         * @param serializer The serializer to serialize the Event Message's payload with
         *
         * @return the current Builder instance, for fluent interfacing
         */
        public Builder serializer(Serializer serializer) {
            assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        /**
         * Sets the {@link SequencingPolicy}, with a generic of being a super of {@link EventMessage}, used to generate
         * the key for the {@link ProducerRecord}. Defaults to a {@link SequentialPerAggregatePolicy} instance.
         *
         * @param sequencingPolicy a {@link SequencingPolicy} used to generate the key for the {@link ProducerRecord}
         *
         * @return the current Builder instance, for fluent interfacing
         */
        public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
            assertNonNull(sequencingPolicy, "SequencingPolicy may not be null");
            this.sequencingPolicy = sequencingPolicy;
            return this;
        }

        /**
         * Sets the {@code headerValueMapper}, a {@link BiFunction} of {@link String}, {@link Object} and {@link
         * RecordHeader}, used for mapping values to Kafka headers. Defaults to the {@link HeaderUtils#byteMapper()}
         * function.
         *
         * @param headerValueMapper a {@link BiFunction} of {@link String}, {@link Object} and {@link RecordHeader},
         *                          used for mapping values to Kafka headers
         *
         * @return the current Builder instance, for fluent interfacing
         */
        public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> headerValueMapper) {
            // Fixed: the assertion message was the literal "{}" placeholder, which would have been
            // thrown verbatim in the AxonConfigurationException instead of naming the field.
            assertNonNull(headerValueMapper, "HeaderValueMapper may not be null");
            this.headerValueMapper = headerValueMapper;
            return this;
        }

        /**
         * Sets the {@code upcasterChain} to be used during the consumption of events.
         *
         * @param upcasterChain upcaster chain to be used on event reading.
         *
         * @return the current Builder instance, for fluent interfacing
         */
        public Builder upcasterChain(EventUpcasterChain upcasterChain) {
            assertNonNull(upcasterChain, "UpcasterChain must not be null");
            this.upcasterChain = upcasterChain;
            return this;
        }

        /**
         * Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder.
         *
         * @return a {@link DefaultKafkaMessageConverter} as specified through this Builder
         */
        public DefaultKafkaMessageConverter build() {
            return new DefaultKafkaMessageConverter(this);
        }

        /**
         * Validates whether the fields contained in this Builder are set accordingly.
         *
         * @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's
         *                                    specifications
         */
        @SuppressWarnings("WeakerAccess")
        protected void validate() throws AxonConfigurationException {
            assertNonNull(serializer, "The Serializer is a hard requirement and should be provided");
        }
    }
}