Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6685: Added Exception to distinguish message Key from Value during deserializing. #4765

Conversation

jadireddi
Copy link
Contributor

@jadireddi jadireddi commented Mar 23, 2018

https://issues.apache.org/jira/browse/KAFKA-6685

Added Exception message in WorkerSinkTask.convertMessages to distinguish message Key from Value during deserialization to Kafka connect format.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jadireddi
Copy link
Contributor Author

retest this please

@jadireddi
Copy link
Contributor Author

Hi @rhauch,
could you please help in assigning Reviewers to review this ticket. I missed this during Pull request creation.

Copy link
Contributor

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jadireddi thanks for creating this PR and jumping in to add the better exception logic. I do have a suggestion for making this more readable and concise.

log.error("Failed to convert message Value to Kafka Connect format", e);
}
throw new ConnectException("Exiting WorkerSinkTask due to unconverted message to Kafka Connect format exception", e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps the following would be a bit more straightforward without the boolean flag while providing more meaningful content in the error message?

        final SchemaAndValue keyAndSchema;
        final SchemaAndValue valueAndSchema;
        try {
            keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
        } catch (Throwable e) {
            String str = String.format("Error converting message key in topic '%s' partition %d at offset %d and timestamp %d",
                                       msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
            throw new ConnectException(str, e);
        }
        try {
            valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
        } catch (Throwable e) {
            String str = String.format("Error converting message value in topic '%s' partition %d at offset %d and timestamp %d",
                                       msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
            throw new ConnectException(str, e);
        }

Also, note that we don't want to log the exception here since exceptions will ultimately be caught and logged in WorkerTask.doRun(). Logging it here is unnecessary.

And actually, since there's a fair amount of duplicate logic, a better way of reducing that logic is to extract it into a method:

        SchemaAndValue keyAndSchema = toConnectData(keyConverter, "key", msg, msg.key());
        SchemaAndValue valueAndSchema = toConnectData(valueConverter, "value", msg, msg.value());

with the method:

private SchemaAndValue toConnectData(Converter converter, String converterName, ConsumerRecord<byte[], byte[]> msg, byte[] data) {
    try {
        return converter.toConnectData(msg.topic(), data);
    } catch (Throwable e) {
        String str = String.format("Error converting message %s in topic '%s' partition %d at offset %d and timestamp %d",
                                   converterName, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
        throw new ConnectException(str, e);
    }
}

This is even more concise and readable. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @rhauch for your time to review and provide valuable suggestions. Code looks really concise and readable. Making changes.

@jadireddi
Copy link
Contributor Author

Hi @rhauch , Incorporated review comments. Kindly review them.

@jadireddi
Copy link
Contributor Author

@rhauch, Can you please review changes and let me know if anything needed.

@jadireddi
Copy link
Contributor Author

retest this please

Copy link
Contributor

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jadireddi one more suggestion, but otherwise looks good.

final SchemaAndValue valueAndSchema;

keyAndSchema = toConnectData(keyConverter, "key", msg, msg.key());
valueAndSchema = toConnectData(valueConverter, "value", msg, msg.value());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just declaring and initializing each variable in one statement?

SchemaAndValue keyAndSchema = toConnectData(keyConverter, "key", msg, msg.key());
SchemaAndValue valueAndSchema = toConnectData(valueConverter, "value", msg, msg.value());

Also, in this case I'm not sure final adds much if any value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rhauch, Fixed review comments. It's true final doesn't add much value in this case..Removed it. Thank you for the review comments..

Copy link
Contributor

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. @ewencp, would you mind reviewing and merging if all looks good?

Thanks!

Copy link
Contributor

@ewencp ewencp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the contribution to debuggability!

@ewencp ewencp closed this in 95b46a1 May 22, 2018
@jadireddi jadireddi deleted the KAFKA-6685---log-message-should-distinguish-key-from-value branch June 11, 2018 22:01
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…ing deserializing.

https://issues.apache.org/jira/browse/KAFKA-6685

Added Exception message in `WorkerSinkTask.convertMessages` to distinguish message Key from Value during deserialization to Kafka connect format.

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

Author: Jagadesh Adireddi <adireddijagadesh@gmail.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#4765 from jadireddi/KAFKA-6685---log-message-should-distinguish-key-from-value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
4 participants