-
Notifications
You must be signed in to change notification settings - Fork 12
/
MessageSystemAttributeArgumentResolver.java
92 lines (76 loc) · 4 KB
/
MessageSystemAttributeArgumentResolver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.jashmore.sqs.argument.attribute;
import static java.time.ZoneOffset.UTC;
import static software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_FIRST_RECEIVE_TIMESTAMP;
import static software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP;
import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.argument.ArgumentResolutionException;
import com.jashmore.sqs.argument.ArgumentResolver;
import com.jashmore.sqs.argument.MethodParameter;
import com.jashmore.sqs.util.annotation.AnnotationUtils;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Optional;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
public class MessageSystemAttributeArgumentResolver implements ArgumentResolver<Object> {
@Override
public boolean canResolveParameter(final MethodParameter methodParameter) {
return AnnotationUtils.findParameterAnnotation(methodParameter, MessageSystemAttribute.class).isPresent();
}
@Override
public Object resolveArgumentForParameter(
final QueueProperties queueProperties,
final MethodParameter methodParameter,
final Message message
) throws ArgumentResolutionException {
final MessageSystemAttribute annotation = AnnotationUtils
.findParameterAnnotation(methodParameter, MessageSystemAttribute.class)
.orElseThrow(() ->
new ArgumentResolutionException("Parameter passed in does not contain the MessageSystemAttribute annotation when it should")
);
final MessageSystemAttributeName messageSystemAttributeName = annotation.value();
final Optional<String> optionalAttributeValue = Optional.ofNullable(message.attributes().get(messageSystemAttributeName));
if (!optionalAttributeValue.isPresent()) {
if (annotation.required()) {
throw new ArgumentResolutionException("Missing system attribute with name: " + messageSystemAttributeName.toString());
}
return null;
}
final String attributeValue = optionalAttributeValue.get();
final Class<?> parameterType = methodParameter.getParameter().getType();
try {
if (parameterType == String.class) {
return attributeValue;
}
if (parameterType == Integer.class || parameterType == int.class) {
return Integer.parseInt(attributeValue);
}
if (parameterType == Long.class || parameterType == long.class) {
return Long.parseLong(attributeValue);
}
} catch (final RuntimeException exception) {
throw new ArgumentResolutionException("Error parsing message attribute: " + messageSystemAttributeName.toString(), exception);
}
if (messageSystemAttributeName == SENT_TIMESTAMP || messageSystemAttributeName == APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) {
return handleTimeStampAttributes(methodParameter.getParameter().getType(), messageSystemAttributeName, attributeValue);
}
throw new ArgumentResolutionException(
"Unsupported parameter type " + parameterType.getName() + " for system attribute " + messageSystemAttributeName.toString()
);
}
private Object handleTimeStampAttributes(
final Class<?> parameterType,
final MessageSystemAttributeName messageSystemAttributeName,
final String attributeValue
) {
if (parameterType == Instant.class) {
return Instant.ofEpochMilli(Long.parseLong(attributeValue));
}
if (parameterType == OffsetDateTime.class) {
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(attributeValue)), UTC);
}
throw new ArgumentResolutionException(
"Unsupported parameter type " + parameterType.getName() + " for system attribute " + messageSystemAttributeName.toString()
);
}
}