Does CloudEventDeserializer deserialize custom extensions (kafka headers)? #587

Closed
alfonz19 opened this issue Sep 13, 2023 · 3 comments

@alfonz19

I have a CloudEvent instance built using the io.cloudevents.core.v1.CloudEventBuilder builder, and I gave it a custom extension using builder.withExtension("whatever", "whatever");. The CloudEvent is built and sent. I can see the corresponding message in Kafka, and it has a header "whatever" with value "whatever". So far so good.

But if I deserialize that message into an io.cloudevents.CloudEvent instance, the header isn't there. I'd expect to find it via io.cloudevents.CloudEventExtensions#getExtensionNames or io.cloudevents.CloudEventExtensions#getExtension, but it's not there or anywhere else.

Looking into io.cloudevents.kafka.CloudEventDeserializer, we can dig down to io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl#read, which is shown below.

Debugging this method, we find our header there, with key and value both "whatever". The value is not null, so we continue; it's not the "content-type" header, so we move on to the else-if branch; and it does not have the "ce_" prefix (isCloudEventsHeader), so we're done. The custom extension is ignored.

Is this expected? Am I overlooking something? If I can create a CloudEvent with a custom extension, I'd expect to be able to read it back as well. Is there some special configuration for that, or anything I forgot to enable?

    @Override
    public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
        CloudEventWriter<V> visitor = writerFactory.create(this.version);

        // Grab from headers the attributes and extensions
        // This implementation avoids to use visitAttributes and visitExtensions
        // in order to complete the visit in one loop
        this.forEachHeader((key, value) -> {
            if (value == null) {
                return;
            }
            if (isContentTypeHeader(key)) {
                visitor.withContextAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
            } else if (isCloudEventsHeader(key)) {
                String name = toCloudEventsKey(key);
                if (name.equals(CloudEventV1.SPECVERSION)) {
                    return;
                }
                visitor.withContextAttribute(name, toCloudEventsValue(value));
            }
        });
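
To make the filtering described above easier to follow, here is a minimal stand-alone sketch that mimics the loop with plain maps instead of Kafka headers. The class HeaderFilterSketch and the method readContext are hypothetical names made up for this illustration; they are not part of the CloudEvents SDK, and the prefix check is only an approximation of what isCloudEventsHeader does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderFilterSketch {
    static final String CE_PREFIX = "ce_";

    // Mimics the loop in read(): null values are skipped, "content-type" maps to
    // datacontenttype, "ce_"-prefixed headers become attributes or extensions,
    // and every other header is silently dropped.
    static Map<String, String> readContext(Map<String, String> headers) {
        Map<String, String> context = new LinkedHashMap<>();
        headers.forEach((key, value) -> {
            if (value == null) {
                return;
            }
            if (key.equals("content-type")) {
                context.put("datacontenttype", value);
            } else if (key.length() > CE_PREFIX.length() && key.startsWith(CE_PREFIX)) {
                String name = key.substring(CE_PREFIX.length());
                if (name.equals("specversion")) {
                    return; // specversion is skipped here, as in the original loop
                }
                context.put(name, value);
            }
        });
        return context;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("ce_id", "1");
        headers.put("whatever", "whatever");     // no prefix: ignored
        headers.put("ce_whatever", "whatever");  // prefixed: read back as an extension
        System.out.println(readContext(headers));
    }
}
```

With this sketch, the unprefixed "whatever" header never reaches the result, which matches what the debugger shows.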

(also posted here: https://stackoverflow.com/questions/77099281/does-cloudeventdeserializer-deserialize-custom-extensions-kafka-headers)

@pierDipi
Member

pierDipi commented Sep 14, 2023

Hi @alfonz19, thanks for reporting. In Kafka, when the event is sent, you should see a header ce_whatever with value whatever, as opposed to a header whatever with value whatever.

This is done by the serializer in

    public BaseKafkaMessageWriterImpl<R> withContextAttribute(String name, String value) throws CloudEventRWException {
        String headerName = KafkaHeaders.ATTRIBUTES_TO_HEADERS.get(name);
        if (headerName == null) {
            headerName = KafkaHeaders.CE_PREFIX + name;
        }
        headers.add(new RecordHeader(headerName, value.getBytes(StandardCharsets.UTF_8)));
        return this;
    }

when calling this:

        return new KafkaSerializerMessageWriterImpl(headers)
            .writeBinary(data);
    }
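
As a rough illustration of that naming rule, the sketch below reproduces the lookup-then-prefix logic with plain Java and no Kafka dependency. HeaderNameSketch and toHeaderName are hypothetical names, and the single entry in the stand-in map (datacontenttype to content-type) is an assumption of this sketch rather than the full content of KafkaHeaders.ATTRIBUTES_TO_HEADERS.

```java
import java.util.Collections;
import java.util.Map;

public class HeaderNameSketch {
    static final String CE_PREFIX = "ce_";

    // Stand-in for KafkaHeaders.ATTRIBUTES_TO_HEADERS; the single entry is an
    // assumption made for this sketch only.
    static final Map<String, String> ATTRIBUTES_TO_HEADERS =
        Collections.singletonMap("datacontenttype", "content-type");

    // Same shape as withContextAttribute: use the mapped header name if there
    // is one, otherwise fall back to "ce_" + name. Custom extensions take the
    // fallback path, so they always end up prefixed.
    static String toHeaderName(String name) {
        String headerName = ATTRIBUTES_TO_HEADERS.get(name);
        if (headerName == null) {
            headerName = CE_PREFIX + name;
        }
        return headerName;
    }

    public static void main(String[] args) {
        System.out.println(toHeaderName("whatever"));
        System.out.println(toHeaderName("datacontenttype"));
    }
}
```

A producer that writes the extension under a bare header name bypasses this rule, and the reader then has nothing to recognize it by.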

@pierDipi
Member

How are you writing records to Kafka? Can you share a small reproducer?

@alfonz19
Author

Sorry for the delay, I had to verify it.
Thank you for your help, you are correct. In our project we have multiple producers, and the one producing this specific record was kafka-connect, which was misconfigured. I had misread the specification: even custom extensions have to have the "ce_" prefix. I thought this rule applied only to the known, CloudEvents-specific headers. So I fixed the kafka-connect configuration and, sure enough, everything is fine.

Thanks again for your help!
