[BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types #10974
Conversation
Force-pushed from 61f1c49 to afe344b.
Force-pushed from afe344b to e6e4a11.
getSchema(typeDescriptor),
typeDescriptor,
getToRowFunction(typeDescriptor),
getFromRowFunction(typeDescriptor));
This looks fine, but can you explain the use case? The goal was for users never to have to deal with SchemaCoder (the fact that schemas are implemented via a special coder should be a Beam implementation detail), but I'd understand if we have cases where the coder is still needed.
See #10978 for a concrete case. PTransform authors may benefit from being able to infer a SchemaCoder for a given type from the SchemaRegistry.
We could even add this to PubsubIO to read JavaBeans and make them queryable with Beam SQL.
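To illustrate the registry pattern under discussion, here is a minimal, JDK-only analogue: a registry that maps a registered type to a to-row/from-row pair, from which a coder-like object can be looked up on demand. All names here (`RegistryDemo`, `RowCodec`, `getCodec`) are hypothetical stand-ins for illustration, not Beam's actual SchemaRegistry/SchemaCoder API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative analogue of looking up a coder for a registered type.
// Names are hypothetical; Beam's real API is SchemaRegistry.getSchemaCoder.
public class RegistryDemo {
  // A trivial "coder": a pair of conversion functions to and from a row form.
  static class RowCodec<T> {
    final Function<T, String> toRow;
    final Function<String, T> fromRow;
    RowCodec(Function<T, String> toRow, Function<String, T> fromRow) {
      this.toRow = toRow;
      this.fromRow = fromRow;
    }
  }

  static final Map<Class<?>, RowCodec<?>> REGISTRY = new HashMap<>();

  static <T> void register(Class<T> type, Function<T, String> toRow,
                           Function<String, T> fromRow) {
    REGISTRY.put(type, new RowCodec<>(toRow, fromRow));
  }

  @SuppressWarnings("unchecked")
  static <T> RowCodec<T> getCodec(Class<T> type) {
    RowCodec<T> codec = (RowCodec<T>) REGISTRY.get(type);
    if (codec == null) {
      throw new IllegalArgumentException("No codec registered for " + type);
    }
    return codec;
  }

  public static void main(String[] args) {
    register(Integer.class, Object::toString, Integer::valueOf);
    RowCodec<Integer> codec = getCodec(Integer.class);
    String row = codec.toRow.apply(42);
    System.out.println(row);                      // prints 42
    System.out.println(codec.fromRow.apply(row)); // prints 42
  }
}
```

The point of the lookup method is exactly what the comment above describes: a transform author who already registered a type can recover a ready-made coder for it instead of building one by hand.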
I agree with you that SchemaCoder is an internal detail that regular users (authors of pipelines) should not care about. I was tempted to mark this method as @Internal; however, PTransform authors (e.g. IO authors) will find it useful (as I did in the PR mentioned above for KafkaIO schema support), so it is probably worth keeping it available. Also, I cannot think of a better place to put this method than here.
It's useful when integrating schema code with code that does not yet understand schemas.
In the KafkaIO example I think the ideal solution would be to allow a Schema on KafkaRecord (this probably requires adding Java generic type awareness to schema inference, though), in which case the keyCoder and valueCoder aren't needed. I agree that allowing easy inference of SchemaCoder allows for lower-effort integration of schemas in code like this, though hopefully this is just a short-term solution.
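The "generic type awareness" mentioned here boils down to recovering type arguments at runtime. A minimal JDK-only sketch of the standard trick (capturing a parameterized type via an anonymous subclass, the same idea behind Beam's TypeDescriptor and Guava's TypeToken) — `TypeCapture` is a hypothetical name used only for this illustration:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Sketch of generic type capture: the anonymous subclass pins down the
// type argument, which plain reflection on a Class<?> cannot see due to erasure.
public abstract class TypeCapture<T> {
  public Type captured() {
    ParameterizedType superType =
        (ParameterizedType) getClass().getGenericSuperclass();
    return superType.getActualTypeArguments()[0];
  }

  public static void main(String[] args) {
    Type t = new TypeCapture<List<String>>() {}.captured();
    System.out.println(t); // e.g. java.util.List<java.lang.String>
  }
}
```

With this kind of capture, schema inference could in principle see through a generic container like KafkaRecord<K, V> to the key and value types.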
Interesting, I had not thought about making KafkaRecord schema-like; good point. There are some consequences of that that are still not clear to me (like how we will deal with the runtime resolution part of Schemas for KV that we do now with the Confluent Schema Registry support). I am going to give it a try and ping you once I have something in the other PR #10978. Let's continue that discussion there.
Run seed job
retest this please
I thought retesting would clear the nonexistent job :(
Force-pushed from e6e4a11 to 17d8c10.
@mwalenia I just rebased and force-pushed to see the unavailable Java11 job disappear, thanks for the tip!
… registered types
Force-pushed from 17d8c10 to 612d3d1.
@@ -59,7 +60,7 @@
  @Experimental(Kind.SCHEMAS)
  public class SchemaCoder<T> extends CustomCoder<T> {
    // This contains a map of primitive types to their coders.
-   public static final ImmutableMap<TypeName, Coder> CODER_MAP =
+   public static final Map<TypeName, Coder> CODER_MAP =
This change is required because we should not be exposing Guava (even our own vendored version) in the public API; this was surfaced by a GcpCoreApiSurfaceTest break.
Finally the tests are green. Can you PTAL @reuvenlax.
lgtm
R: @aromanenko-dev @reuvenlax