Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add annotation for id class mapping #1549

Open
markbanierink opened this issue Jul 27, 2020 · 8 comments · May be fixed by #1562
Open

Add annotation for id class mapping #1549

markbanierink opened this issue Jul 27, 2020 · 8 comments · May be fixed by #1562

Comments

@markbanierink
Copy link

Feature request

PROBLEM
Currently, mapping id's to classes is, for example, done through configuration beans. There a TypePrecedence, mapping and TypeMapper can be configured. For any additional message, you also have to add changes to your configuration.

SUGGESTED SOLUTION
It would be nice if setting this configuration could be simplified by annotating a class with, for example:

@KafkaMessage(typeId = "foo")
public class Bar {}

Upon setting this annotation, the TypePrecedence and a default TypeMapper could be set as well.

@garyrussell
Copy link
Contributor

Does the new Using Methods to Determine Types feature not meet your needs?

private static final JavaType barType = TypeFactory.defaultInstance()
				.constructType(Bar.class);

public static JavaType whichType(byte[] data, Headers headers) {
	Header header = headers.lastHeader("foo");
	if (header != null) {
		return this.barType;
	}
	else {
		return TypeFactory.defaultInstance().constructType(Object.class);
	}
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.MyClass.returnType

@markbanierink
Copy link
Author

But that still requires you to define the mapping in a centralized place, right?

@markbanierink
Copy link
Author

markbanierink commented Jul 30, 2020

I have now created the following for our project, which allows you to just annotate a class with @KafkaMessage and a type id:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface KafkaMessage {

    String[] typeId() default "";

}
public class KafkaMessageAnnotationPostProcessor implements BeanPostProcessor {

    private static final String SCAN_PACKAGE_PROPERTY_KEY = "spring.kafka.message.package";
    private static final String DEFAULT_SCAN_PACKAGE = "*";

    private final Environment environment;

    public KafkaMessageAnnotationPostProcessor(Environment environment) {
        this.environment = environment;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (JsonMessageConverter.class.isAssignableFrom(bean.getClass())) {
            setTypeMapper((JsonMessageConverter)bean, getMapping());
        }
        return bean;
    }

    private Map<String, Class<?>> getMapping() {
        Map<String, Class<?>> mapping = new HashMap<>();
        for (BeanDefinition beanDefinition : findAnnotatedClasses(getScanPackage())) {
            addToMapping(mapping, getClass(beanDefinition));
        }
        return mapping;
    }

    private String getScanPackage() {
        return environment.getProperty(SCAN_PACKAGE_PROPERTY_KEY, DEFAULT_SCAN_PACKAGE);
    }

    private Set<BeanDefinition> findAnnotatedClasses(String scanPackage) {
        ClassPathScanningCandidateComponentProvider provider = createComponentProvider();
        return provider.findCandidateComponents(scanPackage);
    }

    private ClassPathScanningCandidateComponentProvider createComponentProvider() {
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
        provider.addIncludeFilter(new AnnotationTypeFilter(KafkaMessage.class));
        return provider;
    }

    private Class<?> getClass(BeanDefinition beanDefinition) {
        try {
            return Class.forName(beanDefinition.getBeanClassName());
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot find " + beanDefinition.getBeanClassName(), e);
        }
    }

    private void addToMapping(Map<String, Class<?>> mapping, Class<?> clazz) {
        for (String i : getTypeIds(clazz)) {
            mapping.put(i, clazz);
        }
    }

    private List<String> getTypeIds(Class<?> clazz) {
        KafkaMessage annotation = clazz.getAnnotation(KafkaMessage.class);
        // Use simple name as default
        return annotation.typeId().length > 0 ? Arrays.asList(annotation.typeId()) : Collections.singletonList(clazz.getSimpleName());
    }

    private void setTypeMapper(JsonMessageConverter converter, Map<String, Class<?>> mapping) {
        // Only set a mapping if an @KafkaMessage annotation is set on any class
        if (!mapping.isEmpty()) {
            Jackson2JavaTypeMapper typeMapper = converter.getTypeMapper();
            if (!AbstractJavaTypeMapper.class.isAssignableFrom(typeMapper.getClass())) {
                throw new IllegalStateException("TypeMapper must be of type AbstractJavaTypeMapper");
            }
            typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
            ((AbstractJavaTypeMapper)typeMapper).setIdClassMapping(mapping);
        }
    }

}

Maybe something like the above could be adopted by the spring-kafka project?

@garyrussell
Copy link
Contributor

Sure; will take a look.

@frosiere
Copy link
Contributor

frosiere commented Aug 17, 2023

Would be great to have such kind of support. Mapping and method resolutions are a bit intrusive compared to a single annotation. Any plans to have it in a coming version of Spring Kafka?

  1. An additional property may also be added to enforce the usage of that annotation. Otherwise, deserialization may suddenly start to fail in case of refactoring of the data model.
  2. Since Spring Kafka relies on Jackson, I wonder if the KafkaMessage annotation could be avoided by using the existing JsonTypeName annotation?

@garyrussell
Copy link
Contributor

I am not sure I really understand the requirement here.

The JsonMessageConverter will use the concrete type in the @KafkaListener method signature for its conversion. Type mapping is not needed.

Perhaps a real use case would help me understand what the requirement is.

@frosiere
Copy link
Contributor

Thanks for the reply. According to my understanding, the event type is propagated through the TypeId.
This type is then used to allow deserializing a Kafka event using Jackson.
As this type is the fully qualified name of the class, it's not resistant to any refactoring (moving a class or renaming a class).

So, instead, it would be great to have an identifier/name to uniquely identify the type.
A bit like the JsonTypeName annotation.

Maybe I missed something, but the main idea is to be agnostic of the real java type of the event.
To be agnostic, we may use type mappings but the proposal here looks better.

Let me know if you need more details.

@garyrussell
Copy link
Contributor

While it is true that the type id is set to the FQCN of the source class (by default), it can be changed to set a token via type mapping and, on the receiving side, the token can be mapped to the destination type.

However, by default, the type id header is ignored and the type is inferred from the method signature. See

/**
* Set the precedence for evaluating type information in message properties.
* When using {@code @KafkaListener} at the method level, the framework attempts
* to determine the target type for payload conversion from the method signature.
* If so, this type is provided by the {@code MessagingMessageListenerAdapter}.
* <p> By default, if the type is concrete (not abstract, not an interface), this will
* be used ahead of type information provided in the {@code __TypeId__} and
* associated headers provided by the sender.
* <p> If you wish to force the use of the {@code __TypeId__} and associated headers
* (such as when the actual type is a subclass of the method argument type),
* set the precedence to {@link Jackson2JavaTypeMapper.TypePrecedence#TYPE_ID}.
* @param typePrecedence the precedence.
* @since 2.2
*/
default void setTypePrecedence(TypePrecedence typePrecedence) {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants