
@chunyang chunyang commented Mar 22, 2022

Second attempt to avoid storing a generator in _CustomBigQuerySource. See the description from the last attempt (#17100) below.

This time, the export table schema is retrieved before the temp dataset is deleted. Because TableSchema is not picklable (see #17100 (comment)), I convert it to a JSON string before storing it as an instance variable.


Avoid storing a generator in _CustomBigQuerySource so that it can be used with the Python interactive runner. See details in https://issues.apache.org/jira/browse/BEAM-14112 .

Likely related to #15610
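The idea behind both attempts can be sketched as follows. This is a simplified, hypothetical stand-in (class name and schema shape are illustrative, not the actual Beam implementation): the source holds the schema only as a plain JSON string, so the whole object stays picklable.

```python
import json
import pickle

class SchemaHoldingSource:
  """Illustrative stand-in for a source that must remain picklable."""

  def __init__(self, schema_dict):
    # Store the schema as a JSON string: plain strings always pickle,
    # while a TableSchema-style proto object may not.
    self._schema = json.dumps(schema_dict)

  @property
  def schema(self):
    # Reconstruct the dict form on demand.
    return json.loads(self._schema)

src = SchemaHoldingSource({'fields': [{'name': 'id', 'type': 'INTEGER'}]})
restored = pickle.loads(pickle.dumps(src))  # round-trips without error
assert restored.schema == {'fields': [{'name': 'id', 'type': 'INTEGER'}]}
```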


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

Generators cannot be pickled. The previous implementation causes errors
when used with the Python interactive runner.
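The failure mode named in the commit message is easy to demonstrate; the generator below is a hypothetical stand-in for the one previously stored on the source.

```python
import pickle

def row_iterator():
  # Stand-in for the generator that used to live on _CustomBigQuerySource.
  yield {'id': 1}

gen = row_iterator()
try:
  pickle.dumps(gen)
except TypeError:
  # CPython raises TypeError, e.g. "cannot pickle 'generator' object".
  print('generators are not picklable')
```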
@chunyang
Contributor Author

Run Python 3.8 PostCommit

@chunyang
Contributor Author

Run PythonLint PreCommit


  @schema.setter
  def schema(self, value):
    self._schema = json.dumps(bigquery_tools.table_schema_to_dict(value))
Contributor Author

Are there any guarantees that json.dumps(bigquery_tools.table_schema_to_dict(x)) and bigquery_tools.parse_table_schema_from_json(x) are reciprocal functions?

Contributor

The implementation is based around https://cloud.google.com/bigquery/docs/schemas#creating_a_json_schema_file, so they do seem to be reciprocal functions. Need @chamikaramj to double-check this.

Contributor

bigquery_tools.parse_table_schema_from_json is just an implementation detail of the BQ connector and can change in the future, so I would not assume that. "json.dumps" and "json.loads" are guaranteed to be reciprocal, so I would use that instead.

Contributor Author

I think json.dumps/json.loads can't be used directly since TableSchema is not a dict.
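The point can be shown without the real bigquery.TableSchema class; the stand-in below is hypothetical, but json.dumps fails on any non-JSON-native object in the same way, which is why a conversion to plain dicts has to happen first.

```python
import json

class FakeTableSchema:
  # Hypothetical stand-in for bigquery.TableSchema: an object, not a dict.
  def __init__(self, fields):
    self.fields = fields

schema = FakeTableSchema([{'name': 'id', 'type': 'INTEGER'}])
try:
  json.dumps(schema)
except TypeError:
  # json.dumps only handles JSON-native types (dict, list, str, ...).
  print('json.dumps cannot serialize arbitrary objects')

# After converting to plain dicts/lists, dumps/loads round-trip exactly.
schema_dict = {'fields': schema.fields}
assert json.loads(json.dumps(schema_dict)) == schema_dict
```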

Contributor

Seems like the types are also different? The input is a bigquery.TableSchema object and the output is a JSON dict. I would implement new functions here without depending on existing functions, and add extensive testing to make sure that this is valid/correct for various bigquery.TableSchema objects.

Contributor Author

Yeah, that sounds like the right approach. I will likely try to extract _convert_to_tuple from _JsonToDictCoder and build on top of that. I won't be able to get to it before the release cut today though.

Contributor

As a higher-level point though: I think all state of a source object being picklable (including temporary state) is not a guarantee we provide, and it will be very difficult to guarantee this behavior in the future for BQ and other sources. Also, that can make source implementations more restrictive.

Contributor

+1 to Cham that the behavior of pickling a PTransform is not guaranteed. The workaround is limited by the current InteractiveRunner implementation.

@chunyang I left a comment in the PR that explores saving the coder and paths as attributes instead of the TableSchema and metadata_list. Could you please check if that is an appropriate approach?

@chunyang
Contributor Author

R: @kevingg
R: @chamikaramj

@codecov

codecov bot commented Mar 22, 2022

Codecov Report

Merging #17153 (cd6fea8) into master (9a36490) will increase coverage by 0.00%.
The diff coverage is 52.94%.

@@           Coverage Diff           @@
##           master   #17153   +/-   ##
=======================================
  Coverage   73.95%   73.95%           
=======================================
  Files         671      671           
  Lines       88245    88257   +12     
=======================================
+ Hits        65264    65273    +9     
- Misses      21869    21872    +3     
  Partials     1112     1112           
Flag | Coverage Δ
python | 83.63% <52.94%> (-0.01%) ⬇️

Flags with carried-forward coverage won't be shown.

Impacted Files | Coverage Δ
sdks/python/apache_beam/io/gcp/bigquery.py | 63.67% <52.94%> (+0.04%) ⬆️
...hon/apache_beam/runners/worker/bundle_processor.py | 93.39% <0.00%> (ø)
...che_beam/runners/interactive/interactive_runner.py | 93.57% <0.00%> (+0.71%) ⬆️

Continue to review the full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9a36490...cd6fea8.

@chunyang
Contributor Author

Python integration tests passed in postcommit AFAICT, but there was a failure in :sdks:java:io:google-cloud-platform:expansion-service:shadowJar.

@chunyang chunyang marked this pull request as ready for review March 22, 2022 21:47
@nika-qubit
Contributor

> Python integration tests passed in postcommit AFAICT, but there was a failure in :sdks:java:io:google-cloud-platform:expansion-service:shadowJar.

Could you please share information about the failure? The PR looks nice, if I understand it correctly: similar to the reverted one, but it exports the files before the temp dataset gets cleaned up. The intermediate JSON conversion is needed because InteractiveRunner has to pickle the PTransform, and neither the table schema nor a generator is picklable as an attribute.

@chunyang
Contributor Author

> Could you please share information about the failure?

I don't know much other than what the console output is showing. Something about failing to load/store cache entries.

14:29:57 [...]
14:29:57 FAILURE: Build completed with 2 failures.
14:29:57 
14:29:57 1: Task failed with an exception.
14:29:57 -----------
14:29:57 * What went wrong:
14:29:57 Execution failed for task ':runners:flink:1.14:job-server:shadowJar'.
14:29:57 > Failed to load cache entry for task ':runners:flink:1.14:job-server:shadowJar'
14:29:57 [...]
14:29:57 2: Task failed with an exception.
14:29:57 -----------
14:29:57 * What went wrong:
14:29:57 Execution failed for task ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar'.
14:29:57 > Failed to store cache entry for task ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
14:29:57 [...]

@chunyang
Contributor Author

Just to explore some options other than converting the schema to JSON: prior to #15610, there was no generator or TableSchema to cause pickling errors.

Instead of storing the generator as an instance attribute, a list of TextSources (notably including their coder attribute) was stored instead (we can assume use_json_exports=True for this discussion). The default coder, _JsonToDictCoder, had a method _convert_to_tuple to marshal the TableSchema into an object more amenable to pickling.

Perhaps I can use the same _convert_to_tuple method to create a picklable version of TableSchema and store that as an attribute rather than going the JSON route?
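A hedged sketch of that marshalling idea (the real method lives on _JsonToDictCoder; the field layout and function name below are simplified illustrations, not Beam code): nested schema fields become plain tuples, which pickle without any custom classes.

```python
import pickle

def convert_to_tuple(fields):
  # fields: list of dicts like {'name': ..., 'type': ..., 'fields': [...]}.
  # Recurse into nested RECORD fields so the whole tree becomes tuples.
  return tuple(
      (f['name'], f['type'], convert_to_tuple(f.get('fields') or []))
      for f in fields)

schema_fields = [
    {'name': 'user', 'type': 'RECORD',
     'fields': [{'name': 'id', 'type': 'INTEGER'}]},
    {'name': 'ts', 'type': 'TIMESTAMP'},
]
tup = convert_to_tuple(schema_fields)
# Tuples of strings pickle with the plain stdlib pickler.
assert pickle.loads(pickle.dumps(tup)) == tup
```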

@nika-qubit
Contributor

> I don't know much other than what the console output is showing. Something about failing to load/store cache entries.
> [build log quoted above]

I doubt this Python code change would affect the target :runners:flink:1.14:job-server:shadowJar; it should be unrelated.

@nika-qubit
Contributor

> Perhaps I can use the same _convert_to_tuple method to create a picklable version of TableSchema and store that as an attribute rather than going the JSON route?

How about we store the coders directly in the _BigQueryExportResult class? We don't really need the TableSchema later, only a coder built from it. I already tested that the coder itself is picklable.

_JsonToDictCoder is the default self.coder used to create the coder from a TableSchema, but it is not necessarily the only possible implementation.

So your _BigQueryExportResult class could be:

@dataclass
class _BigQueryExportResult:
  coder: beam.coders.Coder
  paths: List[str]

And

export_result = _BigQueryExportResult(
    coder=self.coder(table_schema),
    paths=[metadata.path for metadata in metadata_list])
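A runnable version of that suggestion, with a toy picklable coder stand-in (the real attribute would be a beam.coders.Coder subclass; the stub class and the paths here are hypothetical):

```python
import pickle
from dataclasses import dataclass
from typing import List

class JsonToDictCoderStub:
  """Hypothetical stand-in for _JsonToDictCoder: holds only plain data,
  so instances pickle without trouble."""

  def __init__(self, schema_json):
    self.schema_json = schema_json

@dataclass
class BigQueryExportResult:  # leading underscore dropped for the sketch
  coder: object
  paths: List[str]

result = BigQueryExportResult(
    coder=JsonToDictCoderStub('{"fields": []}'),
    paths=['gs://bucket/export-000.json'])
# The whole export result round-trips through pickle.
restored = pickle.loads(pickle.dumps(result))
assert restored.paths == result.paths
```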

@nika-qubit
Contributor

@chunyang, thanks for contributing to Beam and providing quality solutions to the issue! Is this PR still in active development?

@chunyang
Contributor Author

chunyang commented Oct 11, 2022 via email

@nika-qubit
Contributor

Hi chunyang, I thought we had already fixed this in the other PR #17306. We can close this PR permanently if this is no longer an issue.

@chunyang
Contributor Author

Woah, strange, there's some email necromancy going on: I sent that last reply in April, but for some reason GitHub shows it's from 9 hours ago. Anyway, we can close this as it's fixed in #17306, as you mentioned. Thanks!

@chunyang chunyang closed this Oct 11, 2022
@chunyang chunyang deleted the cyang/rfbq-interactive-fix branch October 11, 2022 17:13