Implement to_dbq to write a dask DataFrame to a BigQuery table via intermediary parquet in GCS #44

Conversation
Nits:
Thanks Irina! Am I right that as written, this leaves the parquet dataset around forever? It'd be nice to help people with cleanup / bucket lifecycle management. If it's possible to do something automated in code, that could be really nice. Otherwise we should document our recommendations. (Also, just to clarify Nat's nit: I don't think anyone's objecting to this approach, or to closing #3. Just looking to avoid confusion for anyone who comes back here later and would be misled by the description saying this does the "no intermediate storage" version.)
I agree, "closes" is the wrong word. "Motivated by" is more like it. It addresses the problem described in the issue, but as of this moment we don't have an easy way to use the Storage Write API and bypass the intermediate storage, since that API accepts data as protobuf.
👍 Yes, that will be useful in the docstring. This PR is still WIP. I also plan to add more parameters to the method, so users have more control over how the data is written to intermediary storage, and to BQ.
Cool, thanks for sharing a draft!
Probably on your mind already, but it'd be nice for users to be able to control the authentication with Google Cloud a bit. When the need for this came up recently, it was in a case with a dask cluster where setting up …
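For context, gcsfs already exposes a token option that dask forwards through storage_options when writing parquet, so per-call credentials for the intermediate write could look roughly like this sketch (bucket path and key file are placeholders, and whether the wrapper exposes this is an open question):

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": [1, 2]}), npartitions=1)

# gcsfs accepts token as a path to a service-account JSON key, a credentials
# dict, or "cloud" to use the metadata server on GCE/GKE workers.
ddf.to_parquet(
    "gs://my-temp-bucket/prefix",              # placeholder path
    engine="pyarrow",
    storage_options={"token": "sa-key.json"},  # placeholder key file
)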
This piece does the cleanup: dask_bigquery/core.py, lines 272 to 274 (as of 1b3a308).
I'm also going to update the docstring to advise configuring a bucket with a retention policy.
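As a backstop to that advice, a lifecycle rule can also be set in code; a minimal sketch using the google-cloud-storage client (the bucket name is a placeholder):

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket("my-temp-bucket")  # placeholder name
# Auto-delete objects older than one day, catching leftovers from failed runs.
bucket.add_lifecycle_delete_rule(age=1)
bucket.patch()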
I wish it did! Until the Storage Write API supports Arrow in addition to protobuf, I think we're out of luck there.
Oh right, thanks! Sorry!
@ncclementi @dchudz @ntabris I addressed the suggestions; please take another look.
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
Small comments
parquet_kwargs_used.update(parquet_kwargs)

# override the following kwargs, even if user specified them
parquet_kwargs_used["engine"] = "pyarrow"
Raising a warning here might make sense as well?
I pointed out in the docstring that these will be enforced. A warning feels like overkill.
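For illustration, the warning variant floated above (not what the PR adopts) could slot into the diff like this; parquet_kwargs and parquet_kwargs_used are the names from the snippet:

import warnings

enforced = {"engine": "pyarrow"}
overridden = [k for k in enforced if parquet_kwargs.get(k) not in (None, enforced[k])]
if overridden:
    # Tell the user their kwargs are being replaced rather than silently dropping them.
    warnings.warn(
        f"The following parquet kwargs are enforced and will be overridden: {overridden}",
        UserWarning,
    )
parquet_kwargs_used.update(enforced)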
dask_bigquery/tests/test_core.py
Outdated
def test_read_gbq(df, dataset, client):
    project_id, dataset_id, table_id = dataset
    ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id)

    assert list(ddf.columns) == ["name", "number", "timestamp", "idx"]
    assert ddf.npartitions == 2
Why are you removing these?
It's not 2 anymore, it's 1. I think BigQuery changed the number of streams it gives back by default. Without this change, the test breaks.
@bnaul do you know what could be happening here? I would have expected that, since we have defined a time_partitioning, we would be reading 2 different partitions. Did something change in the default streams that you are aware of?
Nope, like @j-bennet said, I guess they must have changed the chunking upstream. I don't think we actually care one way or the other; we didn't end up including any time-partitioning-specific logic (our original implementation does have something along those lines, where we read every partition separately).
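For background, read_gbq's partition count comes from the number of read streams the BigQuery Storage API hands back, and the server is free to return fewer than requested; a simplified sketch of the relevant call (project and table names are placeholders):

from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import types

client = bigquery_storage.BigQueryReadClient()
session = client.create_read_session(
    parent="projects/my-project",  # placeholder
    read_session=types.ReadSession(
        table="projects/my-project/datasets/my_dataset/tables/my_table",
        data_format=types.DataFormat.ARROW,
    ),
    max_stream_count=0,  # 0 = let the server decide; small tables may get 1
)
print(len(session.streams))  # -> becomes ddf.npartitions in read_gbq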
Currently, we have a test that does read_gbq from a BigQuery table that was created with the BigQuery API. Can we add a roundtrip test to make sure that we can read the tables we write with to_gbq?
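A roundtrip test along those lines might look like the sketch below; fixture names follow the existing tests, and to_gbq's signature here is an assumption:

import dask.dataframe as dd
from dask.dataframe.utils import assert_eq

from dask_bigquery import read_gbq, to_gbq


def test_roundtrip(df, dataset, client):
    project_id, dataset_id, _ = dataset
    ddf = dd.from_pandas(df, npartitions=2)
    # Write with to_gbq, read back with read_gbq, and compare.
    to_gbq(ddf, project_id=project_id, dataset_id=dataset_id, table_id="roundtrip")
    result = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id="roundtrip")
    # Index on a stable column, since BigQuery doesn't preserve row order.
    assert_eq(ddf.set_index("idx"), result.set_index("idx"), check_divisions=False)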
@ncclementi and @phofl, I addressed your comments. Please take a look.
@j-bennet This looks good to me. I left one comment regarding an older pin we had on grpcio. I understand we can leave this to a future PR since it's not necessarily part of the scope, but if you can give it a try and see if CI is green, that would be great.
@@ -9,6 +9,7 @@ dependencies:
   - pandas
   - pyarrow
   - pytest
+  - gcsfs
   - grpcio<1.45
We had this pin due to a seg fault on Windows, but it's been a year since this happened. Can we try to unpin this and see what happens? I understand it's not part of this PR; we can leave it for a separate PR.
I'll try in a separate PR. Thanks for the tip!
lgtm
Thank you @phofl @ncclementi @bnaul.
Implement to_dbq using parquet in GCS as intermediary storage. It does what people have been doing already, in a more convenient wrapper. Addresses #3.
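For anyone landing here later, usage would look roughly like the sketch below; the bucket parameter and the exact signature are assumptions based on the discussion above, not a confirmed API:

import dask.dataframe as dd
import pandas as pd

from dask_bigquery import to_gbq  # the PR title says to_dbq; the merged name may differ

ddf = dd.from_pandas(
    pd.DataFrame({"name": ["a", "b"], "number": [1, 2]}),
    npartitions=2,
)

# Writes ddf as parquet to the GCS bucket, loads it into
# my-project.my_dataset.my_table, then removes the intermediate files.
to_gbq(
    ddf,
    project_id="my-project",   # placeholder
    dataset_id="my_dataset",   # placeholder
    table_id="my_table",       # placeholder
    bucket="my-temp-bucket",   # intermediary GCS bucket (assumed parameter)
)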