-
Notifications
You must be signed in to change notification settings - Fork 13
/
AutoVisibilityExtenderMessageProcessingDecorator.java
331 lines (299 loc) · 14.8 KB
/
AutoVisibilityExtenderMessageProcessingDecorator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package com.jashmore.sqs.decorator;
import com.jashmore.documentation.annotations.ThreadSafe;
import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.util.collections.CollectionUtils;
import com.jashmore.sqs.util.thread.ThreadUtils;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.immutables.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;
/**
* {@link MessageProcessingDecorator} that will continually extend the visibility of the message while it is being processed.
*
* <p>No effort is made to guarantee that a message is successfully extended and therefore if the request fails or partially fails (some messages are not
* extended) it will not re-attempt to extend them and just assume that they passed. Therefore if you desire a higher certainty that the visibility
* extension will succeed you can configure the {@link AutoVisibilityExtenderMessageProcessingDecoratorProperties#bufferDuration()} to be a higher value.
* For example, you could have the {@link AutoVisibilityExtenderMessageProcessingDecoratorProperties#visibilityTimeout()} to be 30 seconds but the
* {@link AutoVisibilityExtenderMessageProcessingDecoratorProperties#bufferDuration()} to be 20 seconds and therefore you will have 3 attempts to successfully
* extend the message.
*
* <p>Note that this only works with synchronous implementations of the message listener, e.g. functions that are executed using the
* {@link com.jashmore.sqs.processor.LambdaMessageProcessor} or the {@link com.jashmore.sqs.processor.CoreMessageProcessor} where the function does not
* return a {@link CompletableFuture}. This is because it is not easy to interrupt the processing of a message if it has been placed onto a different
* thread to process.
*
* <p>This {@link MessageProcessingDecorator} is thread safe and will work safely when multiple messages are all being processed at once.
*
* @see AutoVisibilityExtenderMessageProcessingDecoratorProperties for configuration options
*/
@ThreadSafe
public class AutoVisibilityExtenderMessageProcessingDecorator implements MessageProcessingDecorator {
private static final Logger log = LoggerFactory.getLogger(AutoVisibilityExtenderMessageProcessingDecorator.class);
private final SqsAsyncClient sqsAsyncClient;
private final QueueProperties queueProperties;
private final AutoVisibilityExtenderMessageProcessingDecoratorProperties decoratorProperties;
private final Map<Message, MessageProcessingState> currentMessagesProcessing;
private final Object waitingLock = new Object();
public AutoVisibilityExtenderMessageProcessingDecorator(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
final AutoVisibilityExtenderMessageProcessingDecoratorProperties decoratorProperties
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.decoratorProperties = decoratorProperties;
this.currentMessagesProcessing = new HashMap<>();
}
@Override
public void onPreMessageProcessing(final MessageProcessingContext context, final Message message) {
synchronized (waitingLock) {
final Instant timeNow = Instant.now();
log.debug("Registering message {} with visibility auto extender", message.messageId());
currentMessagesProcessing.put(
message,
ImmutableMessageProcessingState
.builder()
.thread(Thread.currentThread())
.startTime(timeNow)
.nextVisibilityExtensionTime(nextExtensionTime(timeNow, message, decoratorProperties.bufferDuration()))
.build()
);
if (currentMessagesProcessing.size() == 1) {
CompletableFuture
.runAsync(
this::performBackgroundThread,
Executors.newSingleThreadExecutor(
ThreadUtils.singleNamedThreadFactory(context.getListenerIdentifier() + "-auto-visibility-extender")
)
)
.whenComplete(
(ignored, throwable) -> {
if (throwable != null) {
log.error("Unexpected error with visibility timeout extender", throwable);
}
}
);
}
// We need to notify the background thread to recalculate the updated time in case it has configured this message to have a smaller visibility
// timeout then the current wait time
waitingLock.notify();
}
}
@Override
public void onMessageProcessingThreadComplete(final MessageProcessingContext context, final Message message) {
removeMessageFromAutoVisibilityExtender(message);
}
@Override
public void onMessageResolve(MessageProcessingContext context, Message message) {
// Needed in case the message listener is manually acknowledging the message
removeMessageFromAutoVisibilityExtender(message);
}
private void removeMessageFromAutoVisibilityExtender(final Message message) {
synchronized (waitingLock) {
final MessageProcessingState valueStored = currentMessagesProcessing.remove(message);
// Makes sure we only do this once for the message
if (valueStored != null) {
decoratorProperties.messageDoneProcessing(message);
waitingLock.notify();
}
}
}
private void performBackgroundThread() {
log.debug("Starting background thread for auto visibility extender");
synchronized (waitingLock) {
while (!currentMessagesProcessing.isEmpty()) {
final Instant timeNow = Instant.now();
final Duration maxDuration = decoratorProperties.maxDuration();
final Duration bufferDuration = decoratorProperties.bufferDuration();
interruptLongRunningThreads(timeNow, maxDuration);
extendThreadsWithMoreTime(timeNow, bufferDuration);
try {
waitUntilNextIteration(maxDuration);
} catch (final InterruptedException interruptedException) {
break;
}
}
}
log.debug("Finished background thread for auto visibility extender");
}
private void interruptLongRunningThreads(final Instant timeNow, final Duration maxDuration) {
final Map<Message, MessageProcessingState> messagesToInterrupt = currentMessagesProcessing
.entrySet()
.stream()
.filter(messageStateEntry -> timeNow.compareTo(messageStateEntry.getValue().startTime().plus(maxDuration)) >= 0)
.collect(CollectionUtils.pairsToMap());
messagesToInterrupt.forEach(
(message, state) -> {
log.info("Interrupting message processing thread due to exceeded time for message {}", message.messageId());
state.thread().interrupt();
currentMessagesProcessing.remove(message);
}
);
}
/**
* For each message that has hit the visibility timeout extension time, attempt to extend the visibility.
*
* <p>This method does not wait for the response from the visibility timeout extension and just assumes that it works.
*
* @param timeNow the time that this iteration started at
* @param bufferDuration the amount of buffer time for the next visibility timeout extension
*/
private void extendThreadsWithMoreTime(final Instant timeNow, final Duration bufferDuration) {
final Map<Message, MessageProcessingState> messagesToExtend = currentMessagesProcessing
.entrySet()
.stream()
.filter(messageStateEntry -> timeNow.compareTo(messageStateEntry.getValue().nextVisibilityExtensionTime()) >= 0)
.collect(CollectionUtils.pairsToMap());
List<Message> messageBatch = new ArrayList<>(AwsConstants.MAX_NUMBER_OF_MESSAGES_IN_BATCH);
for (final Map.Entry<Message, MessageProcessingState> stateEntry : messagesToExtend.entrySet()) {
final Message message = stateEntry.getKey();
final MessageProcessingState state = stateEntry.getValue();
log.info("Automatically extending visibility timeout of message {}", message.messageId());
messageBatch.add(message);
if (messageBatch.size() == AwsConstants.MAX_NUMBER_OF_MESSAGES_IN_BATCH) {
extendMessageBatch(messageBatch);
messageBatch.clear();
}
currentMessagesProcessing.put(
message,
ImmutableMessageProcessingState
.builder()
.from(state)
.nextVisibilityExtensionTime(timeNow.plus(decoratorProperties.visibilityTimeout(message).minus(bufferDuration)))
.build()
);
}
if (!messageBatch.isEmpty()) {
extendMessageBatch(messageBatch);
}
}
private void extendMessageBatch(final List<Message> messageBatch) {
sqsAsyncClient
.changeMessageVisibilityBatch(
builder ->
builder
.queueUrl(queueProperties.getQueueUrl())
.entries(
messageBatch
.stream()
.map(
message ->
ChangeMessageVisibilityBatchRequestEntry
.builder()
.id(message.messageId())
.receiptHandle(message.receiptHandle())
.visibilityTimeout((int) decoratorProperties.visibilityTimeout(message).getSeconds())
.build()
)
.collect(Collectors.toList())
)
)
.whenComplete(
(ignoredResponse, throwable) -> {
if (throwable != null) {
log.error(
"Error changing visibility timeout for message. The following messages were not extended: " +
messageBatch.stream().map(Message::messageId).collect(Collectors.toList()),
throwable
);
}
if (ignoredResponse.hasFailed()) {
log.error(
"Some messages failed to be have their visibility timeout changed: {}",
ignoredResponse.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toList())
);
}
}
);
}
/**
* If there are more messages that are currently processing, determine the next time that a message needs to be interrupted or extended and wait until
* that.
*
* @param maxDuration the maximum amount of time to wait for a message
* @throws InterruptedException if the thread was interrupted while waiting
*/
private void waitUntilNextIteration(final Duration maxDuration) throws InterruptedException {
final Optional<Instant> optionalEarliestNextUpdateTime = currentMessagesProcessing
.values()
.stream()
.map(state -> determineEarliestTrigger(state, maxDuration))
.min(Instant::compareTo);
if (!optionalEarliestNextUpdateTime.isPresent()) {
return;
}
final long nextTime = Instant.now().until(optionalEarliestNextUpdateTime.get(), ChronoUnit.MILLIS);
if (nextTime <= 0) {
return;
}
log.debug("Waiting {}ms to change visibility timeout", nextTime);
waitingLock.wait(nextTime);
}
/**
* Determines the next time that the message needs to be extended to stop its visibility from expiring.
*
* @param timeNow the time that this iteration started at
* @param message the message to determine the visibility timeout for
* @param bufferDuration the buffer to change the visibility timeout before it actually expires
* @return the time to extend the message's visibility
*/
private Instant nextExtensionTime(final Instant timeNow, final Message message, final Duration bufferDuration) {
return timeNow.plus(decoratorProperties.visibilityTimeout(message)).minus(bufferDuration);
}
/**
* Determines whether the earliest time for this message should be when it should be interrupted or the next visibility extension time.
*
* @param state the state of this message
* @param maxDuration the maximum time the message should process
* @return the next time that the message should be extended or interrupted
*/
private static Instant determineEarliestTrigger(final MessageProcessingState state, final Duration maxDuration) {
final Instant maxTime = state.startTime().plus(maxDuration);
final Instant nextVisibilityExtensionTime = state.nextVisibilityExtensionTime();
if (maxTime.isBefore(nextVisibilityExtensionTime)) {
return maxTime;
} else {
return nextVisibilityExtensionTime;
}
}
@Value.Immutable
interface MessageProcessingState {
/**
* The thread that is processing this message.
*
* <p> This is used to interrupt the processing if has run too long.
*
* @return the thread processing the message
*/
Thread thread();
/**
* The time that the message began processing.
*
* @return the start time for the message
*/
Instant startTime();
/**
* The next time that the visibility of the message will need to be extended.
*
* <p> This includes the buffer time and therefore will occur before the message's timeout actually expires.
*
* @return the next visibility extension time
*/
Instant nextVisibilityExtensionTime();
}
}