[BEAM-6820] Custom Row-Object mapper implementation for CassandraIO #8069
@iemejia, this is the second PR around CassandraIO, which we are expanding. Can you review this one as well?
Run Java PreCommit
org.apache.beam.sdk.io.cassandra.CassandraIOTest.testCustomMapperImplRead did not fail during last run. Rerunning to rule out flaky test.
Run Java PreCommit
Sure, I will take a look.
Awesome, that would be great. I had to clean up some pre-commit issues. Tell me if you want me to open a cleaner PR.
No worries for the moment; we will squash before the final merge. Starting to take a look.
Thanks a lot, a really nice PR/idea; extending beyond Cassandra object mapping is definitely a valuable improvement.
After doing the review I asked myself whether we really need MapperFactory; maybe just adding withMapper will be enough. We already have the default object mapping, and if users want to provide their own they can do it as they wish. Please consider removing it.
Can you also rebase it? (We rolled back a Cassandra PR, which produced a conflict in the tests.)
Then please move the classes from the mapper package to the main cassandra package and make the Default implementation(s) package private.
If you use IntelliJ, please do a pass of Analyze → Inspect Code and fix any issues in the new code.
@@ -172,6 +173,9 @@ private CassandraIO() {}
    @Nullable
    abstract Integer minNumberOfSplits();

    @Nullable
    abstract MapperFactory<T> customMapperFactory();
Can you please rename this to mapperFactory, as well as the methods withMapperFactory() and setMapperFactory().
Done!
    Write<T> spec,
    BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator,
    String operationName) {
Mutator(Write<T> spec, BiFunction<Mapper<T>, T, Future<Void>> mutator, String operationName) {
Just a question out of curiosity: why not ListenableFuture, to be aligned with the Cassandra API?
Ah, good question!
I had to do this because of a catch-22. We enforce vendored guava, and as such the Mapper would have to use the vendored guava in the method signature. The Cassandra mapper's deleteAsync and saveAsync, however, return the non-vendored guava, so that left me with two options:
- Keep the vendored guava signature but deep copy/wrap the non-vendored ListenableFuture to a vendored ListenableFuture inside the DefaultObjectMapper.
- Let the Mapper simply return java.util.concurrent.Future (which ListenableFuture extends anyway).
I went with option 2.
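Option 2 can be illustrated with a minimal sketch. The names `SketchMapper` and `InMemoryMapper` are hypothetical and not part of this PR, and `CompletableFuture` only stands in for the driver's non-vendored `ListenableFuture`; the point is that any `Future` subtype can be returned unchanged when the interface declares the plain JDK type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-in for the Mapper interface discussed in this PR.
interface SketchMapper<T> {
  // Declared with the JDK type, so no Guava flavour leaks into the signature.
  Future<Void> saveAsync(T entity);
}

// Stand-in for an implementation that delegates to a driver returning a richer future type.
class InMemoryMapper<T> implements SketchMapper<T> {
  final List<T> saved = new ArrayList<>();

  @Override
  public Future<Void> saveAsync(T entity) {
    saved.add(entity);
    // CompletableFuture plays the role of the driver's ListenableFuture here:
    // a Future subtype is returned as-is, without wrapping or copying.
    CompletableFuture<Void> driverFuture = CompletableFuture.completedFuture(null);
    return driverFuture;
  }
}
```

The trade-off is that callers only see the `Future` API, so listener-style callbacks are not available through the interface.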
Nice, this improvement is already worth the PR (IMO)!
@@ -172,6 +173,9 @@ private CassandraIO() {}
    @Nullable
    abstract Integer minNumberOfSplits();

    @Nullable
Make this non-nullable and set new DefaultObjectMapperFactory as the default value.
Done.
@@ -1062,20 +1096,26 @@ private static Cluster getCluster(
        spec.localDc(),
        spec.consistencyLevel());
    this.session = cluster.connect(spec.keyspace());
    this.mappingManager = new MappingManager(session);
    this.entitiyClass = spec.entity();
    if (spec.customMapperFactory() != null) {
Remove if/else (not needed if you set the default as described above).
Done.
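The two suggestions above (a non-nullable property with a default, and no if/else at the use site) can be sketched as follows. This is a hand-written illustration, not the AutoValue-generated code: `WriteSpecSketch` is a hypothetical name, and plain strings stand in for mapper instances.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for a spec whose mapper factory used to be @Nullable.
class WriteSpecSketch {
  private final Supplier<String> mapperFactory;

  private WriteSpecSketch(Supplier<String> mapperFactory) {
    this.mapperFactory = mapperFactory;
  }

  static Builder builder() {
    return new Builder();
  }

  // Callers no longer branch on null: there is always a factory to call.
  String createMapper() {
    return mapperFactory.get();
  }

  static class Builder {
    // The default is assigned up front and only replaced by an explicit setter call.
    private Supplier<String> mapperFactory = () -> "default-mapper";

    Builder setMapperFactory(Supplier<String> mapperFactory) {
      this.mapperFactory = Objects.requireNonNull(mapperFactory, "mapperFactory");
      return this;
    }

    WriteSpecSketch build() {
      return new WriteSpecSketch(mapperFactory);
    }
  }
}
```

With the default in place, `WriteSpecSketch.builder().build().createMapper()` uses the default factory, and a custom one is picked up only when `setMapperFactory` is called.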
private final String operationName;
private final Class<T> entitiyClass;
s/entitiyClass/entityClass
Fixed.
 */
public class DefaultObjectMapper<T> implements Mapper<T> {

  com.datastax.driver.mapping.Mapper<T> datastaxMapper;
Call it simply mapper
Fixed.
@@ -711,6 +723,15 @@ public T getCurrent() throws NoSuchElementException {
    public CassandraIO.CassandraSource<T> getCurrentSource() {
      return source;
    }

    private Mapper<T> getMapper(Session session, Class<T> enitity) {
s/enitity/entity
Actually this method can be removed (or at least inlined in their use) since there will always be a valid mapperFactory
> s/enitity/entity
Fixed.
> Actually this method can be removed (or at least inlined in their use) since there will always be a valid mapperFactory
See main comment below.
 */
public interface MapperFactory<T> {

  Mapper<T> getMapper(Session session, Class<T> entity);
WDYT about making this one have no parameters, to make it more generic? (If you think about it, Session could be an instance attribute in the implementations that need it, and entity is only needed for the Cassandra object mapping one, or am I missing something?)
See main comment below.
Force-pushed from 3ab5ec3 to fa92656.
Thank you for reviewing, very valid and insightful comments!
Do we need 1: Remove
2: Use current suggested implementation with
3: Remove MapperFactory and make Mapper abstract with a forced constructor that takes session and entity as arguments. Make withMapper take a Class referring to the Mapper. Instantiate Mapper inside CassandraIO with reflection.
Done!
Yep, will do once we decide on the major discussion above. With that said, what do you think is a good way forward?
This is looking great! Thanks a lot for the clear explanation. I understand the issues now. After thinking a bit about all the pros/cons I decided to go in a different direction, but one closer to your proposal.
Remove the MapperFactory interface. In CassandraIO, update the signature to withMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactoryFn), add the corresponding javadoc, and make DefaultMapperFactory a package-private implementation of it and the default value in the AutoValue.Builder. Of course, update the rest of the CassandraIO code and tests to comply with this new signature.
We should be OK to merge with this final change and the final cleanups. Please squash all your commits into one and update the title of the commit to [BEAM-6820] Add custom Row-Object mapper implementation for CassandraIO
Note: If you want this to be in the next release (2.12) you need to hurry because tomorrow is the branch cut.
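A minimal sketch of the shape this suggestion implies, under stated assumptions: `FakeSession`, `RowMapper`, `SerializableFn` and `ReadSketch` are hypothetical local stand-ins, not the DataStax `Session`, the PR's `Mapper`, or Beam's `SerializableFunction`, and the real `withMapperFactoryFn` may differ in detail.

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-in for the driver session.
class FakeSession {
  final String keyspace;

  FakeSession(String keyspace) {
    this.keyspace = keyspace;
  }
}

// Hypothetical stand-in for the Mapper interface; the method shape is an assumption.
interface RowMapper<T> {
  T map(String rawRow);
}

// Stand-in for a serializable function type that can be shipped to workers.
interface SerializableFn<I, O> extends Function<I, O>, Serializable {}

// Hypothetical transform spec that holds the factory function.
class ReadSketch<T> {
  private final SerializableFn<FakeSession, RowMapper<T>> mapperFactoryFn;

  ReadSketch(SerializableFn<FakeSession, RowMapper<T>> mapperFactoryFn) {
    this.mapperFactoryFn = mapperFactoryFn;
  }

  // Returns a new spec with the user-supplied factory, mirroring the with* style.
  ReadSketch<T> withMapperFactoryFn(SerializableFn<FakeSession, RowMapper<T>> fn) {
    return new ReadSketch<>(fn);
  }

  // The mapper is only created once a session exists.
  T readOne(FakeSession session, String rawRow) {
    return mapperFactoryFn.apply(session).map(rawRow);
  }
}
```

Because the factory is a single-method function from session to mapper, a user could supply it as a lambda, for example `read.withMapperFactoryFn(session -> row -> session.keyspace + "/" + row)`.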
 *
 * @see org.apache.beam.sdk.io.cassandra.DefaultObjectMapperFactory
 */
public class DefaultObjectMapper<T> implements Mapper<T>, Serializable {
package private
 *
 * @see org.apache.beam.sdk.io.cassandra.DefaultObjectMapper
 */
public class DefaultObjectMapperFactory<T> implements MapperFactory<T>, Serializable {
Make package private, remove the implements, and implement SerializableFunction<Session, Mapper>. Pass the entity in the constructor of this class.
@iemejia, the problem with "pass the entity in the constructor" is that at the moment of creation of DefaultObjectMapperFactory we don't know what the entity (POJO) is.
We don't? Isn't this done via the method withEntity(Class entity)?
> Make package private remove the implements and implement SerializableFunction<Session, Mapper>. pass the entity in the constructor of this class.
Done.
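The agreed shape, with the entity fixed in the constructor and only the session supplied later, might look like the sketch below. `StubSession`, `EntityMapper` and `DefaultFactorySketch` are hypothetical names, and `java.util.function.Function` stands in for Beam's `SerializableFunction`; the real class would build the DataStax object mapper from the session instead of returning a stub.

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-in for the driver session.
class StubSession {}

// Hypothetical stand-in for the Mapper interface, reduced to what this sketch needs.
interface EntityMapper<T> {
  Class<T> entity();
}

// Package-private default factory: the entity class is fixed at construction time,
// so only the session has to be supplied when the mapper is actually needed.
class DefaultFactorySketch<T> implements Function<StubSession, EntityMapper<T>>, Serializable {
  private final Class<T> entity;

  DefaultFactorySketch(Class<T> entity) {
    this.entity = entity;
  }

  @Override
  public EntityMapper<T> apply(StubSession session) {
    // A real implementation would create the object mapper from the session here.
    return () -> entity;
  }
}
```

Since the entity is known once `withEntity(Class entity)` has been called, the factory can be constructed at that point and applied to the session later.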
 *
 * @see org.apache.beam.sdk.io.cassandra.MapperFactory
 */
public interface Mapper<T> {
Please document in detail each method in this interface.
Done!
 * @see org.apache.beam.sdk.io.cassandra.MapperFactory
 * @see org.apache.beam.sdk.io.cassandra.Mapper
 */
public interface MapperFactory<T> {
remove
Done!
  }
}

private static class NOOPMapper implements Mapper<String>, Serializable {
update to implement the interface described in the detailed message (SerializableFunction<Session, Mapper> ...)
> update to implement the interface described in the detailed message (SerializableFunction<Session, Mapper> ...)
Fixed!
Force-pushed from c0e67cb to 2638be6.
No worries. Happy that it was understandable.
Awesome suggestion. Done.
Done!
Force-pushed from 2638be6 to a313984.
LGTM. Excellent work @goldfishy thanks!
I will merge manually just to fix some really minor things.
The current Cassandra source/sink is tightly coupled to the Datastax Object Mapper. This requires users of the sink to provide a POJO describing the table in Cassandra. Although the POJO is an easy and powerful way to describe the table, it requires users to recompile the pipeline for each change in the table definition.
I suggest adding an abstraction layer that allows users to inject their own mapper implementation. One example use would be a mapper that works with a generic Row implementation rather than a compile-time POJO.
This abstraction layer could be implemented by simply supplying a MapperFactory through the Sink/Source builder.