
refactor deserialization to be safer #82

Merged · 6 commits into main · Aug 7, 2023

Conversation

@nextdude (Collaborator) commented Aug 5, 2023

Several changes make deserialization more robust:

  • ConfluentAvroRegistryKafkaRecordDeserializationSchema now respects the specific.avro.reader config. If you do not specify a setting, one is selected based on whether the embedded Avro record class is generic or specific. If you set specific.avro.reader = false while the embedded Avro record class is specific, the schema deserializes the payload as a generic record and then attempts to convert it to a specific record instance.
  • The utility method that converts generic records to specific instances (AvroUtils.toEmbeddedAvroInstance) is now wrapped in a Try, as it may fail.
  • Replaced the AvroUtils.toSpecific utility method implementation with a call to SpecificData.deepCopy().
  • Processing of headers in the incoming Kafka consumer record now accounts for null header values (and ignores them) to avoid throwing a NullPointerException.
  • GlueAvroRegistryKafkaRecordDeserializationSchema has a smarter key deserializer that will fall back to Confluent if you have configured a Confluent fallback (so that string keys are processed properly).
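The null-header handling described above can be sketched in plain Scala. This is a hypothetical, dependency-free illustration (the real code operates on Kafka consumer-record `Headers`, which the sketch models as key/value pairs, and `HeaderUtil.headerMap` is an invented name):

```scala
import java.nio.charset.StandardCharsets

// Hypothetical sketch: build a header map from (key, value) pairs,
// skipping entries whose value is null instead of dereferencing them
// (which would throw a NullPointerException).
object HeaderUtil {
  def headerMap(headers: Seq[(String, Array[Byte])]): Map[String, String] =
    headers.collect {
      case (key, value) if value != null =>
        key -> new String(value, StandardCharsets.UTF_8)
    }.toMap
}
```

Entries with null values are simply dropped from the resulting map rather than surfacing as runtime errors downstream.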

@nextdude nextdude requested a review from isterin August 5, 2023 21:58
@@ -104,6 +104,7 @@ object AvroUtils extends LazyLogging {
headers
)
)
}

implicit class RichGenericRecord(genericRecord: GenericRecord) {
@isterin (Collaborator) commented Aug 6, 2023

I think you can replace the whole toSpecific method with this:

def toSpecific[A <: GenericRecord](instance: A): A =
  SpecificData.get().deepCopy(instance.getSchema, genericRecord).asInstanceOf[A]

@isterin (Collaborator)

I tested it briefly and it seems to work

@isterin (Collaborator) commented Aug 6, 2023

"Seems to work" meaning it works the way it does now. I do have an issue, though, with logical type conversions. Still trying to figure it out. It's not an issue with your changes; I think it's an avrohugger limitation and/or something else I'm missing. I'll explain in Slack.

@nextdude (Collaborator, Author)

That's brilliant! Good find.

@nextdude (Collaborator, Author)

Given that this can fail, we should wrap it in a Try, no?
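The wrap-it-in-a-Try idea can be sketched generically in plain Scala. This is a hypothetical helper (the name `toSpecificSafely` and the `convert` parameter are invented; `convert` stands in for the deepCopy-based generic-to-specific conversion, which can throw):

```scala
import scala.util.Try

// Hypothetical sketch: run a conversion that may throw and surface the
// outcome as a Try, so callers handle failure explicitly instead of
// catching exceptions at the call site.
def toSpecificSafely[A](record: AnyRef, convert: AnyRef => A): Try[A] =
  Try(convert(record))
```

A caller can then pattern-match on `Success`/`Failure` or use `toOption`, rather than wrapping every conversion site in its own try/catch.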

@isterin (Collaborator)

Yes!

isterin previously approved these changes Aug 7, 2023

@isterin left a comment
LGTM

@nextdude force-pushed the refactor-deserialization-again branch from 6c16b49 to 246638b on August 7, 2023 13:39
@nextdude nextdude merged commit a4d9c8b into main Aug 7, 2023
2 checks passed