Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MediaMessage annotation support. #2058

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,48 @@
/*
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import org.springframework.util.MimeTypeUtils;

import java.lang.annotation.*;
scruel marked this conversation as resolved.
Show resolved Hide resolved

/**
* Used to define the content type of {@code Message} with {@code MediaMessageResolver}.
*
* @author Scruel Tao
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
public @interface ResolvableType {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a org.springframework.core.ResolvableType.
So, this one is going to confuse.
Plus it doesn't look like we really resolve a type, but talk more about a content-type.

I'm not fully sure that I follow with the goal of this feature, but probably better to have it as an @ExpectedContentType since there are many ContentType classes around 😸 .
I would say this is similar to consumes on the @RequestMapping, but you really have here a fallback to the expected if no one provided in the record.
Anyway I'd like to understand better what is going on.
Looks like you want to have a common composite converter and choose an appropriate one according the content-type header or fallback.
How about have an attribute on the @KafkaListener instead? Kinda fallbackContentType?
Populate it to the message (if needed) and still rely on that composite ability to choose an expected converter.

Just thinking aloud...

Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Artem, for your perspective; I agree coercing the content type in the listener adapter via a @KafkaListener property would be better than polluting the method signature.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Artem, for your perspective; I agree coercing the content type in the listener adapter via a @KafkaListener property would be better than polluting the method signature.

In spring, annotation on parameters is very common, I think it can not treat as polluting the method signature, for example @RequestBody.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we already have enough param annotations to pollute enough.
And your idea makes me think that this kind of fallbackContentType should go to the existing @Payload annotation which we rely in Spring Kafka as well.
But again: your current goal could be achieved with an explicit converter for the particular @KakfaListener. The rest I believe is already covered by the composition if contentType header is present.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, @Payload annotation is in org.springframework.messaging package but not this project, I treat the @MediaMessage as an enhanced annotation for spring-kafka.
I know it was covered if contentType header is present, but sometimes, you can not control the producers for which message they will send.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks; it needs some docs; probably in this section https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka-docs/src/main/asciidoc/kafka.adoc#kafkalistener-annotation

Also, maybe a "how to use" here https://docs.spring.io/spring-kafka/docs/current/reference/html/#tips-n-tricks

I will fulfill the docs after you make your decision.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Artem, for your perspective; I agree coercing the content type in the listener adapter via a @KafkaListener property would be better than polluting the method signature.

Would you need me also to implement this idea?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, and why we should implement such a feature only in Spring for Apache Kafka?
Why other projects cannot benefit from the feature? For example Spring AMQP, Spring Cloud AWS. Spring Integration at all, at last. And so on. The same @MessageMapping for WebScokets and RSockets.
That's why I'm talking about that @Payload as a general place for such a fallback option.
But again: I believe even fallback option can be implemented right now with just a composition with desired converter in the end of chain.
I even think something like this is resent in Spring Cloud Stream with its JSON converter as a fallback.

I really against new annotations because end-user needs to know what to look for. With just a new attribute on the well-known @KafkaListener he/she gets a clue what is possible.
However independently of the end-user hook for the method, they still need to support an infrastructure for converter composition and so on.

I guess we need to think more what and how could be done and where.
Right now I'm not fully convinced in a new annotation, but I'm opened for arguments anyway.

Thanks for understanding

Copy link
Author

@scruel scruel Jan 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I agree that the other projects have the rights to benefit from this.

they still need to support an infrastructure for converter composition and so on.

I do want to add this in default configuration, but maybe we should consider how to implement this first, then consider should or not should to add this support feature as a default.
I will wait for your decision, feel free to comment your ideas.

/**
* The string value of the content type of message, which can support parse,
* Default application/json.
*
* @return the string value of mime type.
* @see org.springframework.util.MimeType
*/
String type() default MimeTypeUtils.APPLICATION_JSON_VALUE;

/**
* Regardless of whether there is a type header in the {@code Message}, the parse
* progress will set the header of {@code Message} by this annotation {@link #type()}.
*
* @return whether to force parse with the annotation {@link #type()}
*/
boolean force() default false;
}
@@ -0,0 +1,65 @@
/*
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.resolver;

import org.springframework.core.MethodParameter;
import org.springframework.kafka.annotation.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.messaging.support.MessageBuilder;

/**
* A resolver to extract and convert the payload of a message using a MessageConverter.
* This HandlerMethodArgumentResolver only work with MediaMessage annotation now.
*
* @author Scruel Tao
*/
public class ResolvableTypeResolver extends PayloadMethodArgumentResolver {
/**
* Create a new {@code MediaMessageResolver} with the given
* {@link MessageConverter}.
*
* @param messageConverter the MessageConverter to use (required)
*/
public ResolvableTypeResolver(MessageConverter messageConverter) {
super(messageConverter);
}

@Override
public boolean supportsParameter(MethodParameter parameter) {
return parameter.hasParameterAnnotation(ResolvableType.class);
}

@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
ResolvableType ann = parameter.getParameterAnnotation(ResolvableType.class);
if (null == ann) {
throw new IllegalStateException("Annotation parsing failed.");
}
// If not present, parse content type from annotation
Object type = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
if (ann.force() || type == null) {
message = MessageBuilder.fromMessage(message)
.setHeader(MessageHeaders.CONTENT_TYPE, ann.type())
.build();
}
return super.resolveArgument(parameter, message);
}

}
@@ -0,0 +1,165 @@
/*
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.resolver;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.annotation.ResolvableType;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.*;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.messaging.support.GenericMessage;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* @author Scruel Tao
*/
public class ResolvableTypeResolverTests {
private static final String TEXT = "kafka";

@Test
void jsonFoo() throws Exception {
InvocableHandlerMethod method = getFooInvocableHandlerMethod();
String s = new ObjectMapper().writeValueAsString(new Foo().setText(TEXT));
method.invoke(new GenericMessage<>(s));
}

@Test
void bytesToFoo() throws Exception {
InvocableHandlerMethod method = getFooInvocableHandlerMethod();
String s = new ObjectMapper().writeValueAsString(new Foo().setText(TEXT));
method.invoke(new GenericMessage<>(s.getBytes(StandardCharsets.UTF_8)));
}

public void onMessageWithFoo(@ResolvableType Foo payload) {
assertThat(payload.getText()).isEqualTo(TEXT);
}

@Test
void stringToFooForceJson() throws Exception {
final InvocableHandlerMethod method = getFooInvocableHandlerMethod();
final String s = new ObjectMapper().writeValueAsString(new Foo().setText(TEXT));
final Map<String, Object> contentType = Map.of(MessageHeaders.CONTENT_TYPE, "application/xml");
// No converters can process xml media type, so will cause MessageConversionException.
assertThatThrownBy(() ->
method.invoke(new GenericMessage<>(s, contentType)))
.isExactlyInstanceOf(MessageConversionException.class);
// Ignore the header, and force using own converter.
InvocableHandlerMethod forceMethod = getForceJsonFooInvocableHandlerMethod();
forceMethod.invoke(new GenericMessage<>(s, contentType));
}

public void onForceJsonMessageWithFoo(@ResolvableType(force = true) Foo payload) {
assertThat(payload.getText()).isEqualTo(TEXT);
}

@NotNull
private InvocableHandlerMethod getFooInvocableHandlerMethod() throws NoSuchMethodException {
MessageHandlerMethodFactory factory = getMessageHandlerMethodFactory();
return factory.createInvocableHandlerMethod(this, getClass().getDeclaredMethod(
"onMessageWithFoo", Foo.class));
}

@NotNull
private InvocableHandlerMethod getForceJsonFooInvocableHandlerMethod() throws NoSuchMethodException {
MessageHandlerMethodFactory factory = getMessageHandlerMethodFactory();
return factory.createInvocableHandlerMethod(this, getClass().getDeclaredMethod(
"onForceJsonMessageWithFoo", Foo.class));
}

@SuppressWarnings("rawtypes")
private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>();
beanPostProcessorConfig(bpp);
return bpp.getMessageHandlerMethodFactory();
}

@SuppressWarnings("rawtypes")
private void beanPostProcessorConfig(KafkaListenerAnnotationBeanPostProcessor processor) {
processor.setBeanFactory(new KafkaBeanFactory());
try {
processor.afterSingletonsInstantiated();
} catch (NoSuchBeanDefinitionException ignore) {
}
}

@SuppressWarnings("unchecked")
static class KafkaBeanFactory extends DefaultListableBeanFactory {
scruel marked this conversation as resolved.
Show resolved Hide resolved
@Override
public <T> Map<String, T> getBeansOfType(Class<T> type) throws BeansException {
return new HashMap<>() {
{
if (KafkaListenerConfigurer.class.equals(type)) {
put("bean", (T) new MediaMessageConfig());
}
}
};
}
}

static class MediaMessageConfig implements KafkaListenerConfigurer {
scruel marked this conversation as resolved.
Show resolved Hide resolved
public CompositeMessageConverter createMessageConverter() {
List<MessageConverter> converters = new ArrayList<>();
converters.add(new StringMessageConverter());
converters.add(new ByteArrayMessageConverter());
converters.add(createJacksonConverter());
return new CompositeMessageConverter(converters);
}

protected MappingJackson2MessageConverter createJacksonConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setContentTypeResolver(new DefaultContentTypeResolver());
scruel marked this conversation as resolved.
Show resolved Hide resolved
return converter;
}

@Override
public void configureKafkaListeners(@JsonDeserialize KafkaListenerEndpointRegistrar registrar) {
scruel marked this conversation as resolved.
Show resolved Hide resolved
registrar.setCustomMethodArgumentResolvers(new ResolvableTypeResolver(createMessageConverter()));
}
}

static class Foo {
private String text;

public String getText() {
return text;
}

public Foo setText(String text) {
this.text = text;
return this;
}
}

}