Initial read_gbq support #4
Changes from 1 commit
@@ -19,33 +19,29 @@ def bigquery_client(project_id=None):
     See googleapis/google-cloud-python#9457
     and googleapis/gapic-generator-python#575 for reference.
     """
-    bq_storage_client = None
     with bigquery.Client(project_id) as bq_client:
-        try:
-            bq_storage_client = bigquery_storage.BigQueryReadClient(
-                credentials=bq_client._credentials
-            )
-            yield bq_client, bq_storage_client
-        finally:
-            bq_storage_client.transport.grpc_channel.close()
+        bq_storage_client = bigquery_storage.BigQueryReadClient(
+            credentials=bq_client._credentials
+        )
+        yield bq_client, bq_storage_client
+        bq_storage_client.transport.grpc_channel.close()
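For context, here is the new context manager as a self-contained sketch (the imports are assumptions; this diff does not show the module header). The explicit grpc_channel.close() is needed because BigQueryReadClient does not expose a close() of its own, per the googleapis issues linked in the docstring:

from contextlib import contextmanager

from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_client(project_id=None):
    # Build both clients from one set of credentials so callers can
    # use them together and tear them down in one place.
    with bigquery.Client(project_id) as bq_client:
        bq_storage_client = bigquery_storage.BigQueryReadClient(
            credentials=bq_client._credentials
        )
        yield bq_client, bq_storage_client
        # Close the underlying gRPC channel by hand; see the linked
        # googleapis issues for why this workaround is needed.
        bq_storage_client.transport.grpc_channel.close()

Note that without the earlier try/finally, the channel is only closed when the caller exits the context cleanly.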
-def _stream_to_dfs(bqs_client, stream_name, schema, timeout):
+def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
     """Given a Storage API client and a stream name, yield all dataframes."""
     return [
         pyarrow.ipc.read_record_batch(
             pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
             schema,
         ).to_pandas()
-        for message in bqs_client.read_rows(name=stream_name, offset=0, timeout=timeout)
+        for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
     ]
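Since read_kwargs is now splatted directly into BigQueryReadClient.read_rows, the old timeout behavior stays available to callers. A hypothetical call (project and table names invented for illustration):

ddf = read_gbq(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
    read_kwargs={"timeout": 3600},  # what the old read_timeout default passed
)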
 def bigquery_read(
     make_create_read_session_request: callable,
     project_id: str,
-    timeout: int,
+    read_kwargs: int,

Review comment:
@jrbourbeau I was going over the docs and realized that this still shows as an `int`:

def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    stream_name: str,
    *,
    read_kwargs: dict=None)

If this is correct I can modify it, and we should probably add a test that this works, although I'm not sure what's easiest to test for these kwargs. Any ideas?

Review comment:
Since …

     stream_name: str,
 ) -> pd.DataFrame:
     """Read a single batch of rows via BQ Storage API, in Arrow binary format.
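On the testing question raised above, one plausible approach (an assumption, not necessarily what the PR adopted) is to pass a timeout so small that the read must fail; the error surfacing proves the kwargs actually reach read_rows:

import pytest


def test_read_kwargs_forwarded():
    # Hypothetical table fixture names; the exact exception type and
    # message depend on google-api-core (often "504 Deadline Exceeded").
    ddf = read_gbq(
        project_id="my-project",
        dataset_id="my_dataset",
        table_id="my_table",
        read_kwargs={"timeout": 1e-12},
    )
    with pytest.raises(Exception, match="Deadline"):
        ddf.compute()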
@@ -65,7 +61,7 @@ def bigquery_read(
     schema = pyarrow.ipc.read_schema(
         pyarrow.py_buffer(session.arrow_schema.serialized_schema)
     )
-    shards = _stream_to_dfs(bqs_client, stream_name, schema, timeout=timeout)
+    shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
     # NOTE: BQ Storage API can return empty streams
     if len(shards) == 0:
         shards = [schema.empty_table().to_pandas()]
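The empty-stream fallback works because a pyarrow schema can materialize a zero-row table with the right columns and dtypes. A quick illustration (column names invented):

import pyarrow

schema = pyarrow.schema([("name", pyarrow.string()), ("value", pyarrow.int64())])
df = schema.empty_table().to_pandas()  # zero rows, correct columns/dtypes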
@@ -78,7 +74,7 @@ def read_gbq(
     dataset_id: str,
     table_id: str,
     row_filter="",
-    read_timeout: int = 3600,
+    read_kwargs=None,
 ):
     """Read table as dask dataframe using BigQuery Storage API via Arrow format.
     If `partition_field` and `partitions` are specified, then the resulting dask dataframe
@@ -99,12 +95,9 @@ def read_gbq(
         dask dataframe
     See https://github.com/dask/dask/issues/3121 for additional context.
     """
-    with bigquery_client(project_id) as (
-        bq_client,
-        bqs_client,
-    ):
-        table_ref = bq_client.get_table(".".join((dataset_id, table_id)))
+    read_kwargs = read_kwargs or {}
+    with bigquery_client(project_id) as (bq_client, bqs_client):
+        table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
         if table_ref.table_type == "VIEW":
             raise TypeError("Table type VIEW not supported")
@@ -139,7 +132,7 @@ def make_create_read_session_request(row_filter=""):
             dataset_id,
             table_id,
             row_filter,
-            read_timeout,
+            read_kwargs,
         )

     layer = DataFrameIOLayer(
@@ -150,7 +143,7 @@ def make_create_read_session_request(row_filter=""):
             bigquery_read,
             make_create_read_session_request,
             project_id,
-            read_timeout,
+            read_kwargs,
         ),
         label=label,
     )
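The positional slot matters here because bigquery_read is partially applied and dask supplies each stream name last. A sketch of my reading of the surrounding code (the partial application itself is not shown in this hunk):

from functools import partial

# One callable per output partition; dask appends stream_name when it
# materializes each partition.
io_func = partial(
    bigquery_read,
    make_create_read_session_request,
    project_id,
    read_kwargs,
)
# io_func(stream_name) -> pd.DataFrame for a single BQ Storage stream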
Review comment:
Doesn't have to be this PR, but it would be really helpful if we could attribute these requests to Dask/Dask-BigQuery. #6
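For reference on that attribution request, one common pattern (a sketch, not necessarily how #6 was resolved) is to set a user_agent through client_info when constructing the client:

from google.api_core.client_info import ClientInfo
from google.cloud import bigquery

# Hypothetical version string; a real implementation would derive it
# from the installed dask_bigquery package.
client_info = ClientInfo(user_agent="dask-bigquery/0.1.0")
bq_client = bigquery.Client("my-project", client_info=client_info)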