[BEAM-3157] Generate BeamRecord types from Pojos #4204
R: @kennknowles @reuvenlax @iemejia |
iemejia
left a comment
I just took a quick look and left some remarks, but I feel a bit confused. I expected a simple function that extracted the schema (aka BeamRecordType) from a POJO, plus the machinery to ensure the correct order and correspondence of types/coders, names, and values (lists) to build a BeamRecord from it, but this seems to go way farther than that. So I am not sure if I am missing something or if this is more complex than I expected (or than it should be). I feel particularly uneasy about all the complexity of those injected FieldValueGetters when the object values have the same information as the Pojo.
| /** | ||
| * An interface to access a field of a class. | ||
| * | ||
| * <p>Implementations of this interface are generated at rutime by {@link RecordFactory} |
| import org.junit.Test; | ||
|
|
||
| /** | ||
| * Unit tests for {@link RecordFactoryTest}. |
s/RecordFactoryTest/RecordFactory
| */ | ||
| public class DefaultRecordTypeFactory implements RecordTypeFactory { | ||
|
|
||
| private static final ImmutableMap<Class, Coder> JAVA_TYPES_CODERS = ImmutableMap |
I wonder if this could somehow come from CoderRegistry#getCoder(java.lang.Class) to avoid the repeated mapping?
+1 on using the registry to figure this out.
| * Takes field names and types from the getters, maps them to coders, returns | ||
| * an instance of the {@link BeamRecordType}. | ||
| * | ||
| * <p>Coders not for all field types are implemented. |
This line is hard to read, maybe remove.
| * | ||
| * <p>Generated record types are cached in the instance of this factory. | ||
| * | ||
| * <p>Currently only 1-1 mapping is supported between pojo classes and record types. |
Maybe add that only 'primitive' types are supported because it does not support more complex types, Lists/Sets/Maps, etc.
| * | ||
| * <p>Currently only 1-1 mapping is supported between pojo classes and record types. | ||
| */ | ||
| public class RecordFactory { |
BeamRecordFactory ? Probably to remind us in the future to rename this when we move BeamRecord -> Row (or Record).
| * <p>Field names for getters are stripped of the 'get' prefix. | ||
| * For example record field 'name' will be generated for 'getName()' pojo method. | ||
| */ | ||
| public BeamRecord newRecordCopyOf(Object pojo) { |
I wanted to make clear it's a copy, not a pass-through wrapper backed by the original pojo
| * For example record field 'name' will be generated for 'getName()' pojo method. | ||
| */ | ||
| public BeamRecord newRecordCopyOf(Object pojo) { | ||
| RecordTypeGetters getters = getRowType(pojo.getClass()); |
We should probably not mix the terms Record and Row (we should stick to one, and for the moment it is Record), but in the discussion on schema-aware PCollections the future seems to be Row.
Ah, yes, these are typos, will double check to use Record everywhere. I don't have a strong opinion whether to use Row or Record, so will continue to use Record for now until there's a decision to rename to Rows.
| * See {@link ByteBuddyUtils#makeNewGetterInstance(String, DynamicType.Builder)} | ||
| * and ByteBuddy documentation for details. | ||
| */ | ||
| public class DirectAccessGetterFactory { |
This assumes a contract different from the Pojo contract. I am not 100% sure that this is a good idea; maybe we should stick to the basic case (what is exposed via public getters), or is there a strong reason to do this?
Main reason is that current Nexmark model has data objects only with public fields. It's relatively easy to convert them to getters but I didn't see any harm in supporting this as well.
I see. In that case we will change Nexmark to have public getters; this implementation should stick to the POJO 'semantics' and avoid moving away from them (to avoid possible issues).
| * <p>Implementations of this interface are generated at rutime by {@link RecordFactory} | ||
| * to map pojo fields to BeamRecord fields. | ||
| */ | ||
| public interface FieldValueGetter<T> { |
I am a bit puzzled by this Interface, is this really needed ? I mean with the Pojo object we have all that is needed, we have the name/type by reflection and the value just by invoking the method.
If you use reflection, then you probably don't need this specific interface. But I don't think that getting rid of it, or switching to reflection will reduce complexity a lot without sacrificing something.
This is how I understand it:
Reflection or other implementation details aside, there are 2 problems:
- map a pojo class to a record type;
- given a pojo instance, create a record from it, populating the fields in the order they are specified in the record type;
Creating a record type is straightforward, you need to map pojo fields/getters to record field names and coders. You can do it either by convention or using an explicit spec. Record types can be either cached for pojo classes, or re-created for every pojo instance. Overall this feels straightforward.
Second problem creating a record from a pojo conceptually looks like this:
- look up or create the record type for the pojo class;
- get the fields in correct order (same as in record type);
- for each field invoke the corresponding getter on the pojo;
This means that now we need a mechanism to map record field types to pojo getters (here I assume that even with reflection we will still need to cache Methods).
Given the above you seem to need few abstractions no matter whether you use reflection or not:
- main component to create records based on pojos which will:
- get the record type for a pojo class;
- while we are creating a record type we know which fields order we're using and which getters correspond to those, so we are also storing a mapping of ordered list of getters to corresponding record type;
- invoke pojo getters in correct order corresponding to record type;
- component to create record types, which will:
So I feel like most of the components in this PR will be needed in some way or another, and the main question is how you map getters to fields and invoke them.
Using reflection:
- store each getter in a List<Method>;
- invoke a getter with reflection;
Using code generation:
- generate a class to represent a Method call;
- invoke the getter class directly;
There are pros and cons for both. How I see it:
Reflection:
- simpler code because you don't have the code generation logic and custom class to represent the getter method;
- the implementation is java runtime specific on multiple levels;
- feels like it might require more serialization code if we want to pass the schema information around;
- easy to shoot yourself in the foot if you start doing more reflection than needed;
Code generation:
- more code;
- potentially can be optimized further by JIT;
- doesn't have to be at run-time;
- similar interfaces / mechanisms can be implemented in other languages for schema support;
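To make the two options concrete, here is a minimal sketch, not the code from this PR. `GetterSketch` is a hypothetical stand-in for `FieldValueGetter` (the real interface may have a different shape), `SampleBid` is an invented pojo, and the "generated" variant is written by hand to show roughly what a code generator such as ByteBuddy would be expected to emit: a class that calls the getter directly.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for FieldValueGetter; the real interface may differ. */
interface GetterSketch<T> {
  String name();

  Object get(T pojo);
}

/** Reflection variant: the Method is looked up once and cached in the instance. */
final class ReflectiveGetter<T> implements GetterSketch<T> {
  private final String name;
  private final Method method;

  ReflectiveGetter(Class<T> pojoClass, String fieldName) {
    String getterName =
        "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    try {
      this.method = pojoClass.getMethod(getterName);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No getter for field '" + fieldName + "'", e);
    }
    this.name = fieldName;
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public Object get(T pojo) {
    try {
      return method.invoke(pojo);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Getter call failed for field '" + name + "'", e);
    }
  }
}

/** Hand-written equivalent of a generated class: a direct, non-reflective call. */
final class SampleBidPriceGetter implements GetterSketch<SampleBid> {
  @Override
  public String name() {
    return "price";
  }

  @Override
  public Object get(SampleBid pojo) {
    return pojo.getPrice();
  }
}

/** Invented pojo used only for this illustration. */
class SampleBid {
  private final long price;

  SampleBid(long price) {
    this.price = price;
  }

  public long getPrice() {
    return price;
  }
}
```

Both variants satisfy the same interface, so the caller that builds a record does not need to know which one it was given.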
Thanks for specifying this, I now understand the motivation better. Sorry, I forgot to send some points after checking this. I think we agree on the core ideas/goal; my only complaint is more API-based. Some notes inline:
Reflection or other implementation details aside, there are 2 problems:
- map a pojo class to a record type;
- given a pojo instance, create a record from it, populating the fields in the order they are specified in the record type;
Sure !
You can quickly achieve this by mapping the POJO class into a BeamRecord type. This is just getting the public get* methods and creating a LinkedHashMap<String, Coder> where the key is the name of the field and the value is the coder for the field's type.
Caching this mapping seems like an excellent idea because this is a time consuming step. So it should be cached in some sort of Map<Class, BeamRecordType> where the Key is the Pojo Class.
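A minimal sketch of that mapping, under stated assumptions: it has no Beam dependency, so the map value is the getter's return `Class<?>` where the suggestion above would put a `Coder`. `PojoSchemaSketch` and `SamplePerson` are invented names. Because `Class.getMethods()` makes no ordering guarantee, the sketch sorts getters by name to get a stable field order.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: maps a pojo class to an ordered field-name to type map. */
final class PojoSchemaSketch {
  // Cache keyed by pojo class, so the reflective scan happens once per class.
  private static final Map<Class<?>, LinkedHashMap<String, Class<?>>> CACHE =
      new ConcurrentHashMap<>();

  static LinkedHashMap<String, Class<?>> schemaOf(Class<?> pojoClass) {
    return CACHE.computeIfAbsent(pojoClass, PojoSchemaSketch::extract);
  }

  private static LinkedHashMap<String, Class<?>> extract(Class<?> pojoClass) {
    Method[] methods = pojoClass.getMethods();
    // getMethods() has no guaranteed order; sort by name for a stable field order.
    Arrays.sort(methods, Comparator.comparing(Method::getName));
    LinkedHashMap<String, Class<?>> fields = new LinkedHashMap<>();
    for (Method method : methods) {
      if (isGetter(method)) {
        // In Beam the value would be a Coder for this type; a Class stands in here.
        fields.put(fieldName(method), method.getReturnType());
      }
    }
    return fields;
  }

  static boolean isGetter(Method method) {
    return method.getName().startsWith("get")
        && method.getName().length() > 3
        && method.getParameterCount() == 0
        && method.getReturnType() != void.class
        && !Modifier.isStatic(method.getModifiers())
        && method.getDeclaringClass() != Object.class; // skips getClass()
  }

  /** Strips the 'get' prefix: "getName" becomes "name". */
  static String fieldName(Method getter) {
    String stripped = getter.getName().substring(3);
    return Character.toLowerCase(stripped.charAt(0)) + stripped.substring(1);
  }
}

/** Invented pojo used only for this illustration. */
class SamplePerson {
  public String getName() {
    return "Ada";
  }

  public int getAge() {
    return 36;
  }
}
```

Calling `PojoSchemaSketch.schemaOf(SamplePerson.class)` twice returns the same cached map instance, with the fields `age` and `name` in that order.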
Second problem creating a record from a pojo conceptually looks like this:
- look up or create the record type for the pojo class;
- get the fields in correct order (same as in record type);
- for each field invoke the corresponding getter on the pojo;
This means that now we need a mechanism to map record field types to pojo getters (here I assume that even with reflection we will still need to cache Methods).
Given the above you seem to need few abstractions no matter whether you use reflection or not:
I disagree, you don’t really need extra abstractions for this if you do it by reflection, but more important than whether these abstractions are needed is the fact that they are exposed in the API. You can do this by simply having a method with a signature like this:
List<Object> getValues(Object myPojo, List<String> attributes);
and then with that List of values and the BeamRecordType you can build the BeamRecord.
The way you implement this method to make it faster is another story and it is great if we can add some tricks to improve this, but we should try to avoid the optimization from leaking into the API. We are doing this in sdks/java/core, trying not to leak abstractions at this level is important because if we don’t take care here they will be used everywhere.
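For illustration, a naive reflective body for the `getValues` signature proposed above might look like the following. This is only a sketch of the unoptimized case: the class name `ReflectiveValuesSketch` and the pojo `SampleAuction` are invented, and it looks the getter up on every call, which is exactly the part an implementation would want to cache or replace behind the same signature.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for the getValues method discussed above. */
final class ReflectiveValuesSketch {

  /** Returns the pojo's values in the same order as the given attribute names. */
  static List<Object> getValues(Object myPojo, List<String> attributes) {
    List<Object> values = new ArrayList<>(attributes.size());
    for (String attribute : attributes) {
      // "name" maps back to the getter "getName".
      String getterName =
          "get" + Character.toUpperCase(attribute.charAt(0)) + attribute.substring(1);
      try {
        // Uncached lookup on every call: simple, but the slow part.
        Method getter = myPojo.getClass().getMethod(getterName);
        values.add(getter.invoke(myPojo));
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot read attribute '" + attribute + "'", e);
      }
    }
    return values;
  }
}

/** Invented pojo used only for this illustration. */
class SampleAuction {
  public String getItem() {
    return "lamp";
  }

  public long getReserve() {
    return 100L;
  }
}
```

The order of the returned values follows the `attributes` argument, so the caller is responsible for passing the names in the same order as the record type.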
I am not sure I understand your point about exposing the abstractions in the API. The only thing I am planning to expose is BeamRecordFactory.create(pojo). This currently creates the BeamRecords using all public getters from the pojo instances. This PR scope is to only implement this.
Probably eventually there will be another parameter added to specify a schema. Whether it's backed by reflection or code generation should not be leaked at this point. Actual schema support or even getting the BeamRecord for a subset of the getters of a pojo is out of the scope here.
Imagine that it is plugged into SQL to convert from pojos to records somewhat like this:
class PojoToRecord extends DoFn<Pojo, BeamRecord> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Convert each incoming pojo into a BeamRecord.
    c.output(BeamRecordFactory.instance().create(c.element()));
  }
}
List<Object> getValues(Object myPojo, List<String> attributes);
Yes, I agree. Something like this is needed. But to actually create a record you also need to make sure that the attributes here and the BeamRecordType have the same attributes in the same order.
To achieve that, it would make sense to store that order somewhere, like Map<Class, List<String>>, and construct it at the same time you create the BeamRecordType because you're analyzing getters anyway. This way the list of attributes is available in the map when you are creating the beam records later. Then you use this list of attributes to get the values by calling getValues(myPojo, attributes).
I agree it's all pretty straightforward here, but I argue that it's roughly what's happening here anyway, minus the actual code generation bit.
For example, consider following:
- you can combine the values from Map<Class, BeamRecordType> and Map<Class, List<String>> and put them in the utility class: RecordTypeGetters { beamRecordType, attributes }, and have a single map Map<Class, RecordTypeGetters>.
- when calling getValues(myPojo, attributes) the most expensive part is actually looking up the methods on the class, so I would cache them:
  - instead of storing the attribute names, just store the getter methods in a List<Method>;
  - so now you can change getValues(Object myPojo, List<String> attributes) to getValues(Object myPojo, List<Method> getters);
  - now something like this is already in BeamRecordFactory: List<Object> getFieldValues(List<FieldValueGetter> fieldValueGetters, Object pojo);
- you still need to wrap everything into some utility/factory class:
  - getting the getters from pojo;
  - mapping getter return types to coders for sql and non-sql versions;
  - caching;
  - getting the values from pojo;
- now if you replace super low-level Method with our own class FieldValueGetter you have my implementation here. The only thing different is whether we get FieldValueGetter/Method by reflection or by codegen;
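The steps in the list above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's RecordTypeGetters class: `TypeAndGettersSketch` is an invented name, a plain list of field names stands in for BeamRecordType, and `SampleSeller` is a made-up pojo. The point is that the field names and the cached `Method` objects are built in one pass, so they share one order, and live in a single per-class cache.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical pairing of a record type with getters in the same order. */
final class TypeAndGettersSketch {
  private static final Map<Class<?>, TypeAndGettersSketch> CACHE = new ConcurrentHashMap<>();

  final List<String> fieldNames; // stands in for BeamRecordType
  final List<Method> getters; // same order as fieldNames

  private TypeAndGettersSketch(List<String> fieldNames, List<Method> getters) {
    this.fieldNames = fieldNames;
    this.getters = getters;
  }

  /** Single map lookup per pojo class; the analysis runs once per class. */
  static TypeAndGettersSketch of(Class<?> pojoClass) {
    return CACHE.computeIfAbsent(pojoClass, TypeAndGettersSketch::analyze);
  }

  private static TypeAndGettersSketch analyze(Class<?> pojoClass) {
    List<Method> getters = new ArrayList<>();
    for (Method method : pojoClass.getMethods()) {
      if (method.getName().startsWith("get")
          && method.getName().length() > 3
          && method.getParameterCount() == 0
          && method.getDeclaringClass() != Object.class) {
        getters.add(method);
      }
    }
    // Sort once; names are derived from the sorted getters, so both lists line up.
    getters.sort(Comparator.comparing(Method::getName));
    List<String> names = new ArrayList<>();
    for (Method getter : getters) {
      String stripped = getter.getName().substring(3);
      names.add(Character.toLowerCase(stripped.charAt(0)) + stripped.substring(1));
    }
    return new TypeAndGettersSketch(names, getters);
  }

  /** Invokes the cached getters in record-type order. */
  List<Object> valuesOf(Object pojo) {
    List<Object> values = new ArrayList<>(getters.size());
    for (Method getter : getters) {
      try {
        values.add(getter.invoke(pojo));
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Getter call failed: " + getter.getName(), e);
      }
    }
    return values;
  }
}

/** Invented pojo used only for this illustration. */
class SampleSeller {
  public String getName() {
    return "Ann";
  }

  public String getCity() {
    return "Oslo";
  }
}
```

Swapping `Method` for a small getter interface in this sketch would give the shape described in the last bullet, with reflection or code generation as interchangeable ways of producing the getter instances.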
d2e4a41 to
4658d10
Compare
| * | ||
| * <p>Currently only 1-1 mapping is supported between pojo classes and record types. | ||
| */ | ||
| public class RecordFactory { |
iemejia wrote:
BeamRecordFactory ? Probably to remind us in the future to rename this when we move BeamRecord -> Row (or Record).
Sure, renamed this one as it is the main interface. Kept other classes without Beam prefix for now. Let me know if you think other ones would be also better with Beam
| */ | ||
| public class DefaultRecordTypeFactory implements RecordTypeFactory { | ||
|
|
||
| private static final ImmutableMap<Class, Coder> JAVA_TYPES_CODERS = ImmutableMap |
lukecwik wrote:
+1 on using the registry to figure this out.
Changed to use CoderRegistry.createDefault(). Will update this to get the registry from the pipeline as soon as I wire it up (future PRs).
| * Takes field names and types from the getters, maps them to coders, returns | ||
| * an instance of the {@link BeamRecordType}. | ||
| * | ||
| * <p>Coders not for all field types are implemented. |
iemejia wrote:
This line is hard to read, maybe remove.
Updated
| /** | ||
| * An interface to access a field of a class. | ||
| * | ||
| * <p>Implementations of this interface are generated at rutime by {@link RecordFactory} |
iemejia wrote:
s/rutime/runtime
Fixed
| import org.junit.Test; | ||
|
|
||
| /** | ||
| * Unit tests for {@link RecordFactoryTest}. |
iemejia wrote:
s/RecordFactoryTest/RecordFactory
Fixed
| * | ||
| * <p>Generated record types are cached in the instance of this factory. | ||
| * | ||
| * <p>Currently only 1-1 mapping is supported between pojo classes and record types. |
iemejia wrote:
Maybe add that only 'primitive' types are supported because it does not support more complex types, Lists/Sets/Maps, etc.
This class doesn't explicitly limit what types are supported. It uses RecordTypeFactory to create record types. After updating DefaultRecordTypeFactory to use CoderRegistry it should allow using a lot more types, e.g. byte arrays, sets, serializables. Not sure what would be the use of it at the moment, because it's not used in SQL. SqlRecordTypeFactory is the one used for SQL and it does not support complex types at the moment.
| * See {@link ByteBuddyUtils#makeNewGetterInstance(String, DynamicType.Builder)} | ||
| * and ByteBuddy documentation for details. | ||
| */ | ||
| public class DirectAccessGetterFactory { |
iemejia wrote:
I see. In that case we will change Nexmark to have public getters; this implementation should stick to the POJO 'semantics' and avoid moving away from them (to avoid possible issues).
Removed the direct field accessor implementation.
Although my understanding is that Pojo doesn't imply getter-only. Beans imply getters, but Pojos only imply independence, e.g. no interfaces implementation, custom annotations etc.
| * <p>Field names for getters are stripped of the 'get' prefix. | ||
| * For example record field 'name' will be generated for 'getName()' pojo method. | ||
| */ | ||
| public BeamRecord newRecordCopyOf(Object pojo) { |
iemejia wrote:
create() then ?
done
|
If I understand correctly, we may want to make different choices about generating bytecode (and how to do it) versus cached reflection (and how to do this) so the best thing to do would be to put the interface in a place that allows us to do each of these things and compare their performance. We can then change our mind over time. Is there something in the API that would be a problem for this? |
akedin
left a comment
kennknowles wrote:
If I understand correctly, we may want to make different choices about generating bytecode (and how to do it) versus cached reflection (and how to do this) so the best thing to do would be to put the interface in a place that allows us to do each of these things and compare their performance. We can then change our mind over time. Is there something in the API that would be a problem for this?
I don't believe there is currently anything that makes it hard to extend or make this more configurable in the future.
The only API I think should be used externally is BeamRecordFactory.create(pojo) which doesn't expose implementation details.
I did a brief exercise of implementing different ways of generating getters, including a different kind of code generation and reflection:
The public API BeamRecordFactory.create(pojo) internally calls GetterMethodGetterFactory.generateGetters(). This generateGetters() call is the main piece that wraps the calls to the pojo's field getters into instances of FieldValueGetter.
At the moment, if you wanted to create instances of FieldValueGetter backed by reflection or different kind of code generation, you would need to implement a factory class similar to GetterMethodGetterFactory which would use reflection, and then swap the call to GetterMethodGetterFactory.generateGetters().
Right now there is no explicit java interface for the GetterMethodGetterFactory, and its generateGetters() call accepts a ByteBuddy instance. It is trivial to hide the ByteBuddy and introduce a general use interface though.
In one of the previous iterations of this PR there was already another implementation of the getters factory hooked up in addition to GetterMethodGetterFactory.generateGetters(). That implementation generated accessors for public fields instead of public getters. Both of these factories shared the same interface and it was a 1 line change in the top level BeamRecordFactory to remove it.
Similarly when I did a quick perf test I created a small ad-hoc reflection-based variant of GetterMethodGetterFactory, and hooking it up to BeamRecordFactory was also very simple.
If we go forward with some version of this code, then BeamRecordFactory can be modified to be configurable with instances of GetterMethodGetterFactory instead of hardcoding the calls. This change seems trivial to me.
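As a rough sketch of what "configurable with a getter factory" could mean, assuming nothing about the final code: all names below (`ValueGetter`, `GetterFactorySketch`, `ConfigurableRecordFactorySketch`, `ReflectiveGetterFactorySketch`, `SampleItem`) are invented for illustration, and a plain `List<Object>` of values stands in for a BeamRecord. The factory receives the getter-producing strategy through its constructor instead of hardcoding one.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for FieldValueGetter. */
interface ValueGetter {
  String name();

  Object get(Object pojo);
}

/** Hypothetical strategy interface: reflection or codegen can both implement it. */
interface GetterFactorySketch {
  List<ValueGetter> generateGetters(Class<?> pojoClass);
}

/** Top-level factory that only depends on the strategy interface. */
final class ConfigurableRecordFactorySketch {
  private final GetterFactorySketch getterFactory;

  ConfigurableRecordFactorySketch(GetterFactorySketch getterFactory) {
    this.getterFactory = getterFactory;
  }

  /** Returns field values in getter order; a real version would build a BeamRecord. */
  List<Object> create(Object pojo) {
    List<Object> values = new ArrayList<>();
    for (ValueGetter getter : getterFactory.generateGetters(pojo.getClass())) {
      values.add(getter.get(pojo));
    }
    return values;
  }
}

/** Reflection-backed strategy; a codegen-backed one would have the same signature. */
final class ReflectiveGetterFactorySketch implements GetterFactorySketch {
  @Override
  public List<ValueGetter> generateGetters(Class<?> pojoClass) {
    List<Method> methods = new ArrayList<>();
    for (Method method : pojoClass.getMethods()) {
      if (method.getName().startsWith("get")
          && method.getName().length() > 3
          && method.getParameterCount() == 0
          && method.getDeclaringClass() != Object.class) {
        methods.add(method);
      }
    }
    methods.sort(Comparator.comparing(Method::getName)); // stable field order
    List<ValueGetter> getters = new ArrayList<>();
    for (final Method method : methods) {
      String stripped = method.getName().substring(3);
      final String name = Character.toLowerCase(stripped.charAt(0)) + stripped.substring(1);
      getters.add(
          new ValueGetter() {
            @Override
            public String name() {
              return name;
            }

            @Override
            public Object get(Object pojo) {
              try {
                return method.invoke(pojo);
              } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Getter call failed: " + name, e);
              }
            }
          });
    }
    return getters;
  }
}

/** Invented pojo used only for this illustration. */
class SampleItem {
  public String getLabel() {
    return "chair";
  }

  public int getCount() {
    return 4;
  }
}
```

With this shape, comparing reflection against generated bytecode becomes a matter of passing a different `GetterFactorySketch` to the constructor; this sketch regenerates getters on every `create` call, so a real version would also cache them per class.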
a805d0a to
4596783
Compare
rebased, squashed commits, added interface for getter factory, reflective implementation. Quick perf
4596783 to
40c4ff1
Compare
|
retest this please |
|
Reviewed 12 of 18 files at r1, 12 of 12 files at r2.
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/BeamRecordFactory.java, line 53 at r2 (raw file):
Recommended pattern for classes like this: A single private constructor that takes all the arguments. Then for each way of building one, have a
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/BeamRecordFactory.java, line 81 at r2 (raw file):
Incidentally,
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRecordTypeFactory.java, line 30 at r2 (raw file):
Can you describe what its behavior is like? Why is it the default? Also - is it
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/package-info.java, line 22 at r2 (raw file):
Is there a special reason to make a new package? It does mean there might be things that are
And the other thing to do with the package is to set up static analysis with
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/field/FieldValueGetter.java, line 29 at r2 (raw file):
While this is a pretty harmless interface, I think it should still be
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/field/GeneratedGetterFactory.java, line 69 at r2 (raw file):
With the DoFn stuff, we had issue where the stacktrace had pseudo-random bits in it so aggregated errors across workers didn't work. I think these bits are expected to be correct-by-construction and never allowed to throw exceptions, yes?
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/field/package-info.java, line 22 at r2 (raw file):
Is there a benefit to even more deeply nesting packages? I'd keep it a bit more flat just to make things easy to find and make it most flexible in terms of managing public/private.
sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/BeamRecordFactoryTest.java, line 39 at r2 (raw file):
Nice use
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java, line 90 at r2 (raw file):
Is this class for internal use only or also do users interact with it? It should either be
Comments from Reviewable
692a1ae to
a15ecb6
Compare
|
Review status: all files reviewed at latest revision, 8 unresolved discussions, some commit checks failed.
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/BeamRecordFactory.java, line 53 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
Agree. Hidden the constructors, added
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/BeamRecordFactory.java, line 81 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
I agree. But there appears to be more than 70 usages of the constructors. I'd prefer to do this in another PR.
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRecordTypeFactory.java, line 30 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
Updated the javadoc with this. Short version: This factory creates record types from the list of getters. Record types are represented by Normally you can do this by mapping java types of the getters to coders using Problem with SQL is that it uses
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/package-info.java, line 22 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
The parent Haven't thought about
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/field/FieldValueGetter.java, line 29 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
I agree. Updated
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/field/GeneratedGetterFactory.java, line 69 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
Hm, haven't really thought about this. Generated code itself doesn't throw but technically nothing stops the underlying getters from doing so. Do you have any ideas on how to handle this better?
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/field/package-info.java, line 22 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
Combined with parent package.
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java, line 90 at r2 (raw file): Previously, kennknowles (Kenn Knowles) wrote…
Added javadoc. It's public. Until we have the code generation wired up the only way to create a record is to manually create a
a15ecb6 to
0763851
Compare
|
This has been open for over 2 months now. @kennknowles , @iemejia do you see a path to get this merged? Otherwise I will close it |
|
Sorry about that. I think we should merge it. Technically, it has only been lying stale for 12 days... If you would just write less code, then I could get through the reviews! :-) |
|
Any more comments from others? (don't let me LGTM dissuade further debate, please!)
Reviewed 1 of 22 files at r1, 3 of 20 files at r2, 20 of 31 files at r3.
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/BeamRecordFactory.java, line 81 at r2 (raw file): Previously, akedin (Anton Kedin) wrote…
Yes, just a drive-by comment on past code.
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRecordTypeFactory.java, line 30 at r2 (raw file): Previously, akedin (Anton Kedin) wrote…
I see. LGTM at this juncture.
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/GeneratedGetterFactory.java, line 69 at r2 (raw file): Previously, akedin (Anton Kedin) wrote…
This was the fix for DoFnInvoker: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/StableInvokerNamingStrategy.java I think we can go with what you have for now anyhow and rethink later if it comes up. Not a big issue for POJOs, etc.
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/package-info.java, line 22 at r2 (raw file): Previously, akedin (Anton Kedin) wrote…
OK, works for me!
0763851 to
436d0ce
Compare
|
retest this please |
|
tis green! |
|
My apologies @akedin, I had totally forgotten about this one, and somehow I wasn't receiving any notifications. It is really nicer now. Thanks, great work! |
|
@akedin Can you please just resolve the existing conflict so I or @kennknowles can merge this immediately after. |
a896c88 to
d408c7e
Compare
|
found this with similar gradle failures: #4139 |
This is needed to avoid manual conversion from PCollection<Pojo> to PCollection<BeamRecord>. The current use case is Beam SQL and SQL Nexmark. This can also be an example of how schema generation can be implemented internally for schema-aware PCollections.
d408c7e to
f9f70e2
Compare
|
yay it is green |
|
👍 Finally won the war against Jenkins! |
This implements automatic generation of BeamRecordTypes and BeamRecordSqlTypes from pojo types. Work is being done as part of BEAM-3157.
Main piece is RecordFactory which exposes a method to generate BeamRecords from pojos. See RecordFactoryTest for usage examples.
The plan is to integrate this into the Beam SQL framework. Integration into SQL will be done in the future PRs.
Records generation is a major step to simplify conversion of pojo model to BeamRecords. Immediate use case is implementation of Nexmark queries in Beam SQL using existing pojo models.
This can also be used as a starting point for code generation for schema-aware collections.