[BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper #12611
Conversation
@TheNeuralBit I know I'm merciless to give you such a big PR to review, but I think you're the most up-to-date person about rows and schemas :) There are some unit tests and TODOs left, but overall I think it's almost complete. The integration tests work well on FlinkRunner.
No worries, I'm happy to help review :) It might take me a few days to get to it though. Regarding testing: we could consider adding a spanner instance to apache-beam-testing for integration testing; I'd suggest raising it on dev@ if you want to pursue it. I also just came across https://cloud.google.com/spanner/docs/emulator which could be a good option too. It's a docker container that starts up an in-memory version of spanner to test against.
@TheNeuralBit Great advice as always! I tried to find something like this emulator on dockerhub but without success. I managed to successfully use this emulator; it has much better support than localstack's aws emulation. A few comments about this PR: I am almost certain that the Schema doesn't have to be sent as proto in Read, but I didn't come up with anything else. Another issue is representing the Mutation - for now it's a Row containing 4 fields: operation, table, rows and key_set. It does quite well, but I wonder whether I can do it better. I erased SpannerWriteResult and return PDone for now - I don't see a way to keep it without adding spanner dependencies to java core. Because of that, the failure mode is FAIL_FAST and I didn't include it in the configuration params. Transactions are not supported because they require a ptransform to be transferred. I suppose it's doable though, and it could be a good future improvement. FYI - I'll be OOO the next week so there is absolutely no haste :)
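For illustration, a minimal sketch of that mutation-as-Row shape. The field names come from the description above; the concrete Python types are assumptions for the sketch, not the PR's actual definitions:

import typing

class ExampleRow(typing.NamedTuple):
    id: int
    name: str

class MutationRow(typing.NamedTuple):
    # 'insert', 'update', 'delete', ... (hypothetical operation labels)
    operation: str
    table: str
    # payload rows for write operations
    rows: typing.Optional[typing.List[ExampleRow]] = None
    # keys identifying the rows to delete
    key_set: typing.Optional[typing.List[ExampleRow]] = None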
Codecov Report

@@            Coverage Diff             @@
##           master   #12611      +/-   ##
==========================================
- Coverage   82.48%   82.44%   -0.05%
==========================================
  Files         455      456       +1
  Lines       54876    54975      +99
==========================================
+ Hits        45266    45324      +58
- Misses       9610     9651      +41

Continue to review full report at Codecov.
Thanks for the contribution Piotr! Sorry it took me until after you were back from OOO to get to this :P
I have a few high-level comments and questions
List<Schema.Field> fields = schema.getFields();
Row.FieldValueBuilder valueBuilder = null;
// TODO: Remove this null-checking once nullable fields are supported in cross-language
What is the issue here? Nullable fields should be supported in cross-language
NullableCoder is not a standard coder, as was mentioned here: https://issues.apache.org/jira/browse/BEAM-10529?jql=project%20%3D%20BEAM%20AND%20text%20~%20%22nullable%20python%22
So I suppose the only way to support null values is not to set them.
I noticed that when I tried to read a null field from a Spanner table. But I may be wrong.
Hm, so it should be supported. RowCoder encodes nulls for top-level fields separately, so there's no need for NullableCoder. NullableCoder is only used when you have a nullable type in a container type, e.g. ARRAY<NULLABLE INT>. This wasn't supported in Python until recently - #12426 should have fixed it though.
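As a minimal sketch of the distinction (type names here are illustrative, assuming the standard RowCoder behavior described above):

import typing
from apache_beam import coders

class ExampleRow(typing.NamedTuple):
    id: int
    # A top-level nullable field: RowCoder encodes the None case itself,
    # so no NullableCoder is needed.
    name: typing.Optional[str]
    # A nullable type inside a container is the case that needs NullableCoder
    # and was only recently supported in Python (see #12426).
    scores: typing.List[typing.Optional[int]]

coders.registry.register_coder(ExampleRow, coders.RowCoder)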
I'm not sure where my message has gone, but I wrote that nulls come up with no problems; I had just used ImmutableMap, which does not allow null values. Replacing it with java.util.HashMap solved the issue.
public ReadRows(Read read, Schema schema) {
  super("Read rows");
  this.read = read;
  this.schema = schema;
It would be really great if SpannerIO.ReadRows could determine the schema at pipeline construction time so the user doesn't have to specify it. In SpannerIO.Read#expand we require the user to specify either a query or a list of columns:
(SpannerIO.java, lines 656 to 671 in 2872e37)
if (getReadOperation().getQuery() != null) {
  // TODO: validate query?
} else if (getReadOperation().getTable() != null) {
  // Assume read
  checkNotNull(
      getReadOperation().getColumns(),
      "For a read operation SpannerIO.read() requires a list of "
          + "columns to set with withColumns method");
  checkArgument(
      !getReadOperation().getColumns().isEmpty(),
      "For a read operation SpannerIO.read() requires a"
          + " list of columns to set with withColumns method");
} else {
  throw new IllegalArgumentException(
      "SpannerIO.read() requires configuring query or read operation.");
}
In both cases we're very close to a schema. We just need to analyze the query and/or get the output types for the projected columns. I looked into it a little bit, but I'm not quite sure of the best way to use the spanner client to look up the schema. The only thing I could figure out was to start a read and look at the type of ResultSet#getCurrentRowAsStruct, which seems less than ideal.
CC @nielm who's done some work with SpannerIO recently - do you have any suggestions for a way to determine the types of the Structs that SpannerIO.Read will produce?
We could also punt on this question and file a jira with a TODO here. I recognize this is a little out of scope for BEAM-10139, BEAM-10140.
I'd really like to do it in this PR, but the only thing that comes to mind is to do what you said - perform the read request with the client and then read the schema. The obvious disadvantage is that the Spanner query would be executed twice. I found that adding a LIMIT 1 to the end of the query will not improve performance, so this is not the thing to do for huge result sets.
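To make the cost concrete, a rough sketch of that workaround with the google-cloud-spanner Python client (the instance, database, and query names are made up, and the exact metadata attributes may differ between client versions):

from google.cloud import spanner

client = spanner.Client()
database = client.instance("test-instance").database("test-db")

with database.snapshot() as snapshot:
    results = snapshot.execute_sql("SELECT id, name FROM users")
    next(iter(results), None)  # consume one row so the result metadata is populated
    for field in results.fields:  # row-type fields from the ResultSetMetadata
        print(field.name)  # each field also carries its Spanner type
# ...and then the actual read executes the same query a second time.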
I can reach out to the Spanner team to see if there's a good way to do this, I'll let you know if I learn anything. For now we can just plan on a jira and a TODO
I don't see any good solution here...
When reading an entire table, it would be possible to read the table's schema first and determine the column types, but this does not work for a query, as the query's output columns may not correspond to table columns.
Adding LIMIT 1 would only work for simple queries; anything with joins, GROUP BY, or ORDER BY will require the majority of the query to be executed before a single row is returned.
So the only solution I can see is for the caller to specify the row Schema, as you do here.
It seems like it should be possible to analyze the query and determine the output schema - SqlTransform and JdbcIO both do this.
I got a similar response from my internal queries, though: it doesn't look like there's a good way to do this with the Spanner client.
Thank you @nielm! I thought about the LIMIT approach but then found the same arguments against it.
It appears there exists a JDBC client for Spanner: https://cloud.google.com/spanner/docs/jdbc-drivers . I'll try to figure out whether I can use it.
There is also ResultSetMetadata in Spanner's REST API (https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSetMetadata), but at the end of the day it requires at least partially fetching the data.
I would leave it for another PR, though, as it supposedly requires moving SchemaUtils from io/jdbc to some more general place (extensions/sql?). Also, the Struct type is mapped to String/Varchar, as mentioned in the FAQ, so it may not be the best option:
"The Cloud Spanner STRUCT data type is mapped to a SQL VARCHAR data type, accessible through this driver as String types. All other types have appropriate mappings."
class WriteToSpanner(ExternalTransform): |
It looks like there's already a native SpannerIO in the Python SDK in apache_beam/io/gcp/experimental/spannerio.py. Are we planning on removing that one? Should the API for this one be compliant with that one?
I can try to make the API compliant with the native one. I think it'd be valuable for Beam to compare the performance of both IOs and then decide which one to leave.
Yeah that makes sense. There's definitely still value in adding this even if we end up preferring the native Python one, since we can use it from the Go SDK in the future.
Probably it makes sense to converge on one implementation. I'd prefer the Java implementation (hence cross-language), since it's been around longer and is used by many users. We have to make sure that the cross-language version works for all runners before the native version can be removed. For example, the cross-language version will not work on current production Dataflow (Runner v1), and we have to confirm that it works adequately on Dataflow Runner v2.
@TheNeuralBit I've upgraded it a bit.
This is looking pretty good overall, my biggest hangup is over the API for the Write transform. I suggested an alternative approach in a comment.
Also FYI - I'm going to be out of the office starting tomorrow (Friday), and back next Thursday. If you get blocked on this before then it may make sense to ask Cham to take a look in the meantime.
[('id', int), ('name', unicode)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)

mutation_creator = MutationCreator('table', ExampleRow, 'ExampleMutation')
Overall I think it makes a lot of sense to use Rows for the Mutations, with a nested Row for the data, but this API is pretty tricky. Could you look into adding a separate PTransform (or multiple PTransforms) for converting the Rows to mutations? I think an API like this should be possible:
pc = ...  # some PCollection with a schema
pc | RowToMutation.insert('table')
   | WriteToSpanner(...)
OR
pc | RowToMutation.insertOrUpdate('table')
   | WriteToSpanner(...)
OR
pc | RowToMutation.delete('table')
   | WriteToSpanner(...)
The PTransform would be able to look at the element_type of the input PCollection and create a mutation type that wraps it in the expand method. There aren't a lot of examples of logic like this in the Python SDK (yet); the only one I know of is here:
beam/sdks/python/apache_beam/dataframe/schemas.py, lines 50 to 55 in cfa448d:
def expand(self, pcoll):
  columns = [
      name for name, _ in named_fields_from_element_type(pcoll.element_type)
  ]
  return pcoll | self._batch_elements_transform | beam.Map(
      lambda batch: pd.DataFrame.from_records(batch, columns=columns))
That way the user wouldn't need to pass the type they're planning on using to MutationCreator. What do you think of that?
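A hypothetical sketch of what that could look like (this is not the PR's code; the class and method names are made up for illustration):

import typing
import apache_beam as beam
from apache_beam.typehints.schemas import named_fields_from_element_type

class RowToMutation(beam.PTransform):
    @classmethod
    def insert(cls, table):
        return cls('insert', table)

    def __init__(self, operation, table):
        super().__init__()
        self._operation = operation
        self._table = table

    def expand(self, pcoll):
        # Raises if the input PCollection doesn't have a schema.
        named_fields_from_element_type(pcoll.element_type)
        # Build a mutation type nesting the input's row type, inside expand().
        mutation_type = typing.NamedTuple(
            'Mutation',
            [('operation', str), ('table', str), ('row', pcoll.element_type)])
        beam.coders.registry.register_coder(mutation_type, beam.coders.RowCoder)
        return pcoll | beam.Map(
            lambda row: mutation_type(self._operation, self._table, row)
        ).with_output_types(mutation_type)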
That way we lose the possibility of mixing different kinds of mutations. I don't imagine any sane usage of mixed insert/delete, as the order is not guaranteed, so I agree that removing this assumption is justified.
Since we will always map rows to mutations first anyway, it would be good to enclose that mapping inside WriteToSpanner. How about an API like this?:
pc.with_output_types(CustomRow) | WriteToSpanner(...).insert(table)
pc.with_output_types(CustomRow) | WriteToSpanner(...).delete(table)
pc.with_output_types(List[CustomRow]) | WriteToSpanner(...).delete(table)
It's not consistent with ReadFromSpanner(...), but I think it's better than forcing the user to call RowToMutation each time.
To be more consistent I could do something like ReadFromSpanner(...).from_table(table) and ReadFromSpanner(...).from_sql(sql_query).
@chamikaramj Brian asked me to ask you for further review, as he is going OOO this week. I'd be grateful :)
cc: @allenpradeep @nielm
In addition to Brian's review, @allenpradeep or @nielm, can you briefly look at the Java SpannerIO changes here?
@nielm Could you take a look at this thread? #12611 (comment)
Sorry for dropping the ball on this @piotr-szuberski. I'll look over the changes to the Python API this week.
The way WriteTransform is written, it's not possible to mix mutations that perform different operations. If we're going to have that limitation, I think this could be simplified if we just had a separate xlang transform for each write operation, and sent just the field values over the xlang boundary. Then the Java external transforms would be responsible for making the appropriate Mutation for each operation.
That would remove the need to construct a NamedTuple in RowToMutation.expand.
If we do want to keep using Mutations-as-Rows over xlang, there will need to be more work on the type system. The types for row and keyset should really be a union of the relevant types for each table that might be written to. Unfortunately, I'm not sure Python schemas are mature enough for users to be able to express this well. (Alternatively we might express mutations as a logical type that uses table/operation as a key for the union.)
I think what we should do for now is just have separate xlang transforms for each write operation (beam:external:java:spanner:{delete,insert,update,...}). We can file a follow-on jira to add a generic beam:external:java:spanner:write that will allow mixing mutations with various operations and tables, and note that it's blocked on support for unions of structs in Python/portable schemas. Does that sound reasonable?
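A hedged sketch of what one such per-operation transform might look like on the Python side; the URN and the config fields are illustrative assumptions, not the PR's final API:

import typing
from apache_beam.transforms.external import (
    ExternalTransform, NamedTupleBasedPayloadBuilder)

# Hypothetical URN, following the per-operation naming suggested above.
_INSERT_URN = 'beam:external:java:spanner:insert:v1'

class _InsertConfig(typing.NamedTuple):
    instance_id: str
    database_id: str
    table: str

class SpannerInsert(ExternalTransform):
    def __init__(self, instance_id, database_id, table, expansion_service=None):
        # The expansion service builds the Java insert Mutations, so only the
        # plain field values cross the xlang boundary.
        super().__init__(
            _INSERT_URN,
            NamedTupleBasedPayloadBuilder(
                _InsertConfig(
                    instance_id=instance_id,
                    database_id=database_id,
                    table=table)),
            expansion_service)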
[
    ('operation', unicode),
    ('table', unicode),
    ('keyset', List[row_type]) if is_delete else ('row', row_type),
We should make sure this works when schemas are specified via beam.Row as well; right now I think this will only work with the NamedTuple style.
You could use element_type = named_tuple_from_schema(schema_from_element_type(pcoll.element_type)) to make sure element_type is a NamedTuple that you can use here (it might be worth adding a convenience function for that pattern).
def named_tuple_from_schema(schema):
def schema_from_element_type(element_type):  # (type) -> schema_pb2.Schema
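As a sketch, that convenience function could be a one-liner built from the two helpers quoted above (the function name here is made up):

from apache_beam.typehints.schemas import (
    named_tuple_from_schema, schema_from_element_type)

def named_tuple_from_element_type(element_type):
    # Normalizes any schema'd element type (NamedTuple-style or beam.Row-style)
    # to a NamedTuple whose fields can be inspected and reused.
    return named_tuple_from_schema(schema_from_element_type(element_type))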
Done. I'm not sure whether you meant for me to add that convenience to schemas.py; I'm leaving it in spanner.py for now.
I think most of my comments in that review are actually not relevant any more if we go down the path of separate xlang transforms per operation.
@TheNeuralBit I promise that this is the last big review from me. I've just recently realized how much work I've made you do. Better late than never I guess!
This is moving towards what I had in mind, but I think we should just avoid having a concept of mutations on the Python side for now.
Run Python 3.7 PostCommit
LGTM, thank you @piotr-szuberski, and sorry for the incredibly long review cycle!
I just have one last request, which is to try to eliminate or minimize the places where we're suppressing the nullness warnings.
- https://beam.apache.org/roadmap/portability/

For more information specific to Flink runner see:
- https://beam.apache.org/documentation/runners/flink/
This information is getting duplicated across a lot of docstrings. It looks like #13317 will actually add similar information to the programming guide. I think we should re-write all these docstrings to refer to that once it's complete.
I agree - it refers to all the existing xlang transforms, so it'll be done in another PR?
Yeah it can be done in another PR. Filed BEAM-11269 to track this.
@SuppressWarnings({
  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
Could you try to address any lingering nullness errors here and in the other files that have it suppressed? If there are any intractable issues, we could consider smaller @SuppressWarnings blocks around a few functions, but in general we should make sure that new classes pass the null checker.
Done. Oh, it was quite painful, as all of the row getters return a @Nullable value. Especially since checkNotNull doesn't work with the checker, and there's even no way to check for null inside a helper function (only if (var == null) { throw new NullPointerException("Null var"); } seems to work).
It doesn't even work across chained calls, as in this example:
@Nullable Object var = new Object();
if (var != null) {
  someObject.doSth().doChained(var); // the checker doesn't understand that var was checked for nullness
}
So it's quite unfriendly. In general I'm really excited about dealing with the NPE problem, but for now it adds much more complexity and reduces contributor friendliness. But I guess it's worth it, especially once the checker gets smarter and works with the Guava checks and chained functions (if that's even possible?).
Run Python 3.7 PostCommit
Run Python 3.7 PostCommit
Looks good, merging now. Thanks for all your work on this @piotr-szuberski :)
Thank you too for your reviews! :)