Adds fallback mechanism when Kafka message doesn't have headers #1262

Closed

@Estebangpp Estebangpp commented Oct 2, 2021

This PR relates to this issue
This PR implements a fallback mechanism for cases where a Kafka message doesn't have a header specifying the partition key for Kinesis streams. It also includes 2 new tests for the added logic.

Contributor

@oscerd oscerd left a comment

This is conceptually incorrect. You shouldn't modify the core just to change the behavior of a single connector. Maybe think about an SMT for this purpose. I don't think we can accept this PR.

AlejandroME commented Oct 4, 2021

Hello! 👋🏻

I've seen this PR and I wanted to chime in, because I refrained from using this connector in the past after running into this very same issue not too long ago.
@oscerd why is the OP's proposal "conceptually incorrect"? Technically speaking, the semantics of Kinesis requiring a mandatory partition key is a concern belonging to this connector only.
Given that the current connector's design requires you either to provide this key through a Kafka message header (a feature that is rarely used in practice) or to fall back to the exchange id (which doesn't work, hence this PR), a change in the connector's code makes sense, doesn't it?

Also, why should an SMT solve what is, by definition, a bug in the connector's basic behavior?

I ask these questions towards understanding how to proceed here. I'm very interested in seeing this fix in the connector, given that, in my particular case, we decided to go with AWS's "unofficial" connector, which is not stable.

Thank you very much 🙇🏻

Contributor

orpiske commented Oct 4, 2021

I understand the motivation for the fix, but I think that adding a fix that is specific to Kinesis in the core code is not the right way. Otherwise, the header CamelAwsKinesisPartitionKey, as added in the PR, would leak into other connectors that are not related to AWS Kinesis in any way.

The proposed solution of using an SMT is more appropriate precisely because it can be implemented at the connector level. For instance, if you have a transformation such as:

    @Override
    public R apply(R r) {
        Object value = r.value();

        if (value instanceof MyTypeMessage) {
            // Do any other type-specific transformation you need, then create the
            // record with the newRecord API that lets you include extra headers.
            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), buildSchemaBuilderForType(/* ... code */));
        } else {
            LOG.debug("Unexpected message type: {}", value == null ? "null instance" : value.getClass());

            return r;
        }
    }

I think that if you create an SMT that uses this ConnectRecord API to include the header you need, it would do the trick.
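For readers following along, here is a minimal sketch of the fallback logic such an SMT could apply: if the partition-key header is missing, copy the Kafka record key into it. To stay runnable without the Kafka Connect jars, it models a record with a plain Java class; `SimpleRecord` and `PartitionKeyHeaderFallback` are hypothetical names, not part of this project. In a real SMT the same decision would sit inside `Transformation.apply`, copying `r.headers()` and passing the result to the `newRecord` overload that accepts headers. The header name comes from this thread; the exact Kafka header name the sink reads is an assumption to verify against the connector documentation.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a Kafka Connect record: a key, a value, and string headers. */
final class SimpleRecord {
    final String key;
    final String value;
    final Map<String, String> headers;

    SimpleRecord(String key, String value, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        // Defensive copy so a record never changes after construction.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }
}

/** Hypothetical transform: adds a partition-key header when the producer did not set one. */
final class PartitionKeyHeaderFallback {
    // Name taken from the discussion above; the name the sink actually expects is an assumption.
    static final String HEADER = "CamelAwsKinesisPartitionKey";

    static SimpleRecord apply(SimpleRecord r) {
        if (r.headers.containsKey(HEADER)) {
            return r; // the producer already chose a partition key
        }
        if (r.key == null) {
            return r; // nothing to derive a partition key from
        }
        // Build a new record rather than mutating the input, as newRecord does in Connect.
        Map<String, String> headers = new LinkedHashMap<>(r.headers);
        headers.put(HEADER, r.key);
        return new SimpleRecord(r.key, r.value, headers);
    }
}
```

Because the logic lives in a transform attached to a single connector instance, the core code stays untouched and no other connector ever sees the header.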

Contributor

oscerd commented Oct 4, 2021

This is specific to Kinesis and the fix is in the core. It's conceptually wrong.

Also, this is not a bug because all of these connectors are autogenerated, so the behavior cannot be easily customized. This is the reason why I'm suggesting the SMT.

AlejandroME commented Oct 4, 2021

@orpiske and @oscerd I understood your point. Thank you for the clarification, much appreciated 🙇🏻

@oscerd one last question: when you say:

> Also, this is not a bug because all of these connectors are autogenerated, so the behavior cannot be easily customized

You mean that "targeting" a fix (if there were one) against the connector's codebase (instead of the core one) won't work as well, because that code will always be autogenerated?

In that case, the SMT solution would be the best bet then.

Contributor

oscerd commented Oct 5, 2021

> @orpiske and @oscerd I understood your point. Thank you for the clarification, much appreciated 🙇🏻
>
> @oscerd one last question: when you say:
>
> Also, this is not a bug because all of these connectors are autogenerated, so the behavior cannot be easily customized
>
> You mean that "targeting" a fix (if there were one) against the connector's codebase (instead of the core one) won't work as well, because that code will always be autogenerated?

Yes, the connector code is always autogenerated. The only way to add custom logic would be to exclude the connector from autogeneration and leave it as-is, but then we'd lose the updates from the main Camel project whenever we update to a newer version.

> In that case, the SMT solution would be the best bet then.

Member

valdar commented Oct 20, 2021

@Estebangpp @AlejandroME, I am closing this one. If for any reason you need more clarification or help on the topic, feel free to reopen it or open a specific issue.

Tnx!

@valdar valdar closed this Oct 20, 2021