
Initial read_gbq support #4

Merged
merged 46 commits on Sep 23, 2021
Changes from 19 commits
Commits (46)
94c41f6
add precommit config
ncclementi Aug 5, 2021
48becdb
add read_gbq
ncclementi Aug 5, 2021
a934259
add setup and req
ncclementi Aug 5, 2021
04bdd80
modifications suggested by bnaul
ncclementi Aug 6, 2021
ab16a32
raise error when table type is VIEW
ncclementi Aug 6, 2021
455f749
add linting github actions
ncclementi Aug 6, 2021
c417d5f
add comment on context manager related to possible upstram solution
ncclementi Aug 6, 2021
4839bbb
avoid scanning table when creating partitions
ncclementi Aug 11, 2021
774e79b
add first read_gbq test
ncclementi Aug 17, 2021
7bdd66a
add partitioning test
ncclementi Aug 17, 2021
31a1253
use pytest fixtures
ncclementi Aug 18, 2021
db4edb4
use context manager on test
ncclementi Aug 18, 2021
be1efbd
ignore bare except for now
ncclementi Aug 18, 2021
35cbdc6
remove prefix from delayed kwargs
ncclementi Aug 18, 2021
40de1ea
make dataset name random, remove annotate
ncclementi Aug 18, 2021
45e0004
better name for delayed _read_rows_arrow
ncclementi Aug 18, 2021
de93e88
implementation of HLG - wip
ncclementi Aug 19, 2021
3070ae3
Slight refactor
jrbourbeau Aug 20, 2021
b43daf6
Minor test tweaks
jrbourbeau Aug 20, 2021
50f3c6a
Update requirements.txt
ncclementi Sep 16, 2021
f8a578c
use context manager for bq client
ncclementi Sep 17, 2021
a91c73c
remove with_storage_api since it is always true
ncclementi Sep 17, 2021
548f2fb
remove partition fields option
ncclementi Sep 17, 2021
d3ffa79
add test github actions setup
ncclementi Sep 17, 2021
44096a1
add ci environments
ncclementi Sep 17, 2021
b19dca4
trigger ci
ncclementi Sep 17, 2021
982a5f5
trigger ci again
ncclementi Sep 17, 2021
4292ac3
add pytest to envs
ncclementi Sep 17, 2021
14ba56c
Only run CI on push events
jrbourbeau Sep 20, 2021
32b6686
Minor cleanup
jrbourbeau Sep 20, 2021
97b5d21
Use mamba
jrbourbeau Sep 20, 2021
e03e731
update docstrings
ncclementi Sep 21, 2021
d73b686
missing docstring
ncclementi Sep 21, 2021
3f8e397
trigger ci - testing workflow
ncclementi Sep 21, 2021
64fe0ec
use env variable for project id
ncclementi Sep 21, 2021
6f94825
add test for read with row_filter
ncclementi Sep 21, 2021
1a51981
add test for read with kwargs
ncclementi Sep 21, 2021
acb404e
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
d78c2a9
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
2b46c4f
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
5ac1358
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
216a4e7
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
46e4923
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
3204bc2
tweak on docstrings
ncclementi Sep 22, 2021
f17cfb8
add readme content
ncclementi Sep 22, 2021
d1398c2
Minor updates
jrbourbeau Sep 23, 2021
16 changes: 16 additions & 0 deletions .github/workflows/pre-commit.yml
@@ -0,0 +1,16 @@
name: Linting

on:
  push:
    branches: main
  pull_request:
    branches: main

jobs:
  checks:
    name: "pre-commit hooks"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - uses: pre-commit/action@v2.0.0
17 changes: 17 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,17 @@
repos:
  - repo: https://github.com/psf/black
    rev: 20.8b1
    hooks:
      - id: black
        language_version: python3
        exclude: versioneer.py
  - repo: https://gitlab.com/pycqa/flake8
    rev: 3.8.3
    hooks:
      - id: flake8
        language_version: python3
  - repo: https://github.com/pycqa/isort
    rev: 5.8.0
    hooks:
      - id: isort
        language_version: python3
1 change: 1 addition & 0 deletions dask_bigquery/__init__.py
@@ -0,0 +1 @@
from .core import read_gbq
260 changes: 260 additions & 0 deletions dask_bigquery/core.py
@@ -0,0 +1,260 @@
from __future__ import annotations

import logging
from collections.abc import Iterable
from contextlib import contextmanager
from functools import partial

import pandas as pd
import pyarrow
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_client(project_id=None, with_storage_api=False):
Member:
It appears we only ever use this context manager with with_storage_api=True. If this is the case (I could be missing something), I'd suggest we remove with_storage_api as an option and just always yield both a bigquery.Client and a bigquery_storage.BigQueryReadClient.

Contributor Author:
From what I see, every time we use the created context manager we call it with with_storage_api=True, which enables the use of the Storage API. My understanding is that we want to use this based on these two comments: dask/dask#3121 (comment) and dask/dask#3121 (comment)

Member:
That makes sense. I'm suggesting that since we never use with_storage_api=False, we remove it as an option from our custom bigquery_client context manager. We can always add it back in the future if needed, but right now it's just an unnecessary keyword argument (since we always call it with with_storage_api=True)

"""This context manager is a temporary solution until there is an
upstream solution to handle this.
See googleapis/google-cloud-python#9457
and googleapis/gapic-generator-python#575 for reference.
"""

bq_storage_client = None
bq_client = bigquery.Client(project_id)
Member:
Starting with version 2.11.0 of bigquery we can use bigquery.Client as a context manager. Given that there have been many bigquery releases since then (the latest release is 2.26.0), I think it's safe to use 2.11.0 as the minimum supported version.

Contributor Author:
From what I understand, we now have a context manager for the bigquery client but not for the bigquery_storage client. Would you suggest using at least the one we have? Something like:

bq_storage_client = None
with bigquery.Client(project_id) as bq_client:
    try:
        if with_storage_api:
            bq_storage_client = bigquery_storage.BigQueryReadClient(
                credentials=bq_client._credentials
            )
            yield bq_client, bq_storage_client
        else:
            yield bq_client

Member:
Yep, that's the right direction. Though since bq_client.close() is all that was being called in the finally block before, we can remove the try/finally blocks since bq_client.close() will be called when we exit the scope of the bq_client context manager

Contributor Author:
Since we have to include the closing of the storage client, maybe we should keep the try/finally, but in the finally call bigquery_storage.transport.channel.close().

    try:
        if with_storage_api:
            bq_storage_client = bigquery_storage.BigQueryReadClient(
                credentials=bq_client._credentials
            )
            yield bq_client, bq_storage_client
        else:
            yield bq_client
    finally:
        bq_client.close()
Member:
Do we also need to close bq_storage_client?

Contributor Author:
From what I understand, the bq storage client doesn't have a .close() method. But there is a workaround explained in this comment googleapis/gapic-generator-python#575 (comment), and there are some active discussions about this here: googleapis/gapic-generator-python#987

We can probably get away with doing bqs.transport.channel.close() as recommended here: dask/dask#3121 (comment)

Member:
Thanks for investigating. What you proposed sounds good
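
Pulling this thread together, here is a minimal sketch of the approach being discussed, assuming bigquery >= 2.11.0 (so bigquery.Client can be used as a context manager), always yielding both clients, and closing the storage client's gRPC channel via the linked workaround. The exact channel attribute is taken from the comments above and may differ across client versions (newer GAPIC releases expose it as transport.grpc_channel); this is a sketch, not the final merged code.

from contextlib import contextmanager

from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_client(project_id=None):
    # bigquery.Client supports the context manager protocol since 2.11.0,
    # so bq_client.close() is called automatically on exit
    with bigquery.Client(project_id) as bq_client:
        bq_storage_client = bigquery_storage.BigQueryReadClient(
            credentials=bq_client._credentials
        )
        try:
            yield bq_client, bq_storage_client
        finally:
            # the storage client has no public close() method, so shut down
            # its gRPC channel explicitly (workaround from the thread; the
            # attribute may be transport.grpc_channel on newer releases)
            bq_storage_client.transport.channel.close()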



def _stream_to_dfs(bqs_client, stream_name, schema, timeout):
"""Given a Storage API client and a stream name, yield all dataframes."""
return [
pyarrow.ipc.read_record_batch(
pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
schema,
).to_pandas()
for message in bqs_client.read_rows(name=stream_name, offset=0, timeout=timeout)
]


def bigquery_read_partition_field(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    partition_field: str,
    row_filter: str,
) -> pd.DataFrame:
    """Read a single batch of rows via BQ Storage API, in Arrow binary format.
    Args:
        project_id: BigQuery project
        create_read_session_request: kwargs to pass to `bqs_client.create_read_session`
            as `request`
        partition_field: BigQuery field for partitions, to be used as Dask index col for
            divisions
            NOTE: Please set if specifying `row_restriction` filters in TableReadOptions.
    Adapted from
    https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py.
    """
    with bigquery_client(project_id, with_storage_api=True) as (bq_client, bqs_client):
        session = bqs_client.create_read_session(
            make_create_read_session_request(row_filter=row_filter)
        )
        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
        )

        shards = [
            df
            for stream in session.streams
            for df in _stream_to_dfs(bqs_client, stream.name, schema, timeout=timeout)
        ]
        # NOTE: if no rows satisfying the row_restriction, then `shards` will be empty list
        if len(shards) == 0:
            shards = [schema.empty_table().to_pandas()]
        shards = [shard.set_index(partition_field, drop=True) for shard in shards]

        return pd.concat(shards)


def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    stream_name: str,
) -> pd.DataFrame:
    """Read a single batch of rows via BQ Storage API, in Arrow binary format.
    Args:
        project_id: BigQuery project
        create_read_session_request: kwargs to pass to `bqs_client.create_read_session`
            as `request`
        stream_name: BigQuery Storage API Stream "name".
            NOTE: Please set if reading from Storage API without any `row_restriction`.
            https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
        NOTE: `partition_field` and `stream_name` kwargs are mutually exclusive.
    Adapted from
    https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py.
    """
    with bigquery_client(project_id, with_storage_api=True) as (bq_client, bqs_client):
        session = bqs_client.create_read_session(make_create_read_session_request())
        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
        )
        shards = _stream_to_dfs(bqs_client, stream_name, schema, timeout=timeout)
        # NOTE: BQ Storage API can return empty streams
        if len(shards) == 0:
            shards = [schema.empty_table().to_pandas()]

        return pd.concat(shards)


def read_gbq(
    project_id: str,
    dataset_id: str,
    table_id: str,
    partition_field: str = None,
    partitions: Iterable[str] = None,
    row_filter="",
    fields: list[str] = (),
    read_timeout: int = 3600,
):
"""Read table as dask dataframe using BigQuery Storage API via Arrow format.
If `partition_field` and `partitions` are specified, then the resulting dask dataframe
will be partitioned along the same boundaries. Otherwise, partitions will be approximately
balanced according to BigQuery stream allocation logic.
If `partition_field` is specified but not included in `fields` (either implicitly by requesting
all fields, or explicitly by inclusion in the list `fields`), then it will still be included
in the query in order to have it available for dask dataframe indexing.
Args:
project_id: BigQuery project
dataset_id: BigQuery dataset within project
table_id: BigQuery table within dataset
partition_field: to specify filters of form "WHERE {partition_field} = ..."
partitions: all values to select of `partition_field`
Member:
Specifying the index and specific values of the index in this way seems unusual to me (at least compared to other Dask I/O functions). @bnaul I'm wondering if this API will be familiar to BigQuery users, or if these options were useful for a specific use case when this prototype was initially developed?

        fields: names of the fields (columns) to select (default None to "SELECT *")
        read_timeout: # of seconds an individual read request has before timing out
    Returns:
        dask dataframe
    See https://github.com/dask/dask/issues/3121 for additional context.
    """
    if (partition_field is None) and (partitions is not None):
        raise ValueError("Specified `partitions` without `partition_field`.")

    # If `partition_field` is not part of the `fields` filter, fetch it anyway to be able
    # to set it as dask dataframe index. We want this to be able to have consistent:
    # BQ partitioning + dask divisions + pandas index values
    if (partition_field is not None) and fields and (partition_field not in fields):
        fields = (partition_field, *fields)

    # These read tasks seem to cause deadlocks (or at least long stuck workers out of touch with
    # the scheduler), particularly when mixed with other tasks that execute C code. Anecdotally
    # annotating the tasks with a higher priority seems to help (but not fully solve) the issue at
    # the expense of higher cluster memory usage.
    with bigquery_client(project_id, with_storage_api=True) as (
        bq_client,
        bqs_client,
    ):
        table_ref = bq_client.get_table(".".join((dataset_id, table_id)))
        if table_ref.table_type == "VIEW":
            raise TypeError("Table type VIEW not supported")

        # The protobuf types can't be pickled (may be able to tweak w/ copyreg), so instead use a
        # generator func.
        def make_create_read_session_request(row_filter=""):
            return bigquery_storage.types.CreateReadSessionRequest(
                max_stream_count=100,  # 0 -> use as many streams as BQ Storage will provide
                parent=f"projects/{project_id}",
Member:
Should projects be hard coded here?

Nvm, looking at the docstring for CreateReadSessionRequest, it appears the answer is "yes, projects should be hardcoded"

                read_session=bigquery_storage.types.ReadSession(
                    data_format=bigquery_storage.types.DataFormat.ARROW,
                    read_options=bigquery_storage.types.ReadSession.TableReadOptions(
                        row_restriction=row_filter,
Contributor Author:
@jrbourbeau I'm not quite sure this works if we don't use the default. Should we remove this, since we removed the partition field options? We seem to always leave it as "".
Here is some documentation to review: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1beta2/types.html#google.cloud.bigquery_storage_v1beta2.types.ReadSession.TableReadOptions

Contributor:
row_filter is quite useful, as it performs the filtering server-side and can avoid a lot of extraneous IO. I don't see any reason to remove it, but it should probably be made into a more generic TableReadOptions object like the one you linked to, so that it can be used for column selection as well.

One other small note: the doc you linked is for the beta API; v1 was released since our original implementation: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1/types.html#google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions. There are a couple of other references to the beta docs throughout as well.

                        selected_fields=fields,
                    ),
                    table=table_ref.to_bqstorage(),
                ),
            )

        # Create a read session in order to detect the schema.
        # Read sessions are light weight and will be auto-deleted after 24 hours.
        session = bqs_client.create_read_session(
            make_create_read_session_request(row_filter=row_filter)
        )
Comment on lines +123 to +125

Member:
Does session have a close method, or some other cleanup method, we should call?

        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
        )
        meta = schema.empty_table().to_pandas()

        label = "read-gbq-"
        output_name = label + tokenize(
            project_id,
            dataset_id,
            table_id,
            partition_field,
            partitions,
            row_filter,
            fields,
            read_timeout,
        )

        if partition_field is not None:
            if row_filter:
                raise ValueError("Cannot pass both `partition_field` and `row_filter`")

            meta = meta.set_index(partition_field, drop=True)

            if partitions is None:
                logging.info(
                    "Specified `partition_field` without `partitions`; reading full table."
                )
                partitions = [
                    p
                    for p in bq_client.list_partitions(f"{dataset_id}.{table_id}")
                    if p != "__NULL__"
                ]
                # TODO generalize to ranges (as opposed to discrete values)

            partitions = sorted(partitions)
            row_filters = [
                f'{partition_field} = "{partition_value}"'
                for partition_value in partitions
            ]
            layer = DataFrameIOLayer(
                output_name,
                meta.columns,
                row_filters,
                partial(
                    bigquery_read_partition_field,
                    make_create_read_session_request,
                    project_id,
                    read_timeout,
                    partition_field,
                ),
                label=label,
            )
            divisions = (*partitions, partitions[-1])
        else:
            layer = DataFrameIOLayer(
                output_name,
                meta.columns,
                [stream.name for stream in session.streams],
                partial(
                    bigquery_read,
                    make_create_read_session_request,
                    project_id,
                    read_timeout,
                ),
                label=label,
            )
            divisions = tuple([None] * (len(session.streams) + 1))

        graph = HighLevelGraph({output_name: layer}, {output_name: set()})
        return new_dd_object(graph, output_name, meta, divisions)
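
For orientation, a small usage sketch of the read_gbq function added in this file. The project, dataset, table, and column names below are placeholders, and the row_filter / fields values just illustrate the server-side filtering and column selection wired through TableReadOptions above.

import dask_bigquery

# One output partition per BigQuery Storage stream (or per partition value
# when partition_field/partitions are used)
ddf = dask_bigquery.read_gbq(
    project_id="my-project",    # placeholder project
    dataset_id="my_dataset",    # placeholder dataset
    table_id="my_table",        # placeholder table
    row_filter='state = "NY"',  # pushed down as TableReadOptions.row_restriction
    fields=["name", "number"],  # column projection via selected_fields
)

result = ddf.compute()  # materialize as a pandas DataFrame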
95 changes: 95 additions & 0 deletions dask_bigquery/tests/test_core.py
@@ -0,0 +1,95 @@
import random
import uuid

import pandas as pd
import pytest
from dask.dataframe.utils import assert_eq
from distributed.utils_test import cluster_fixture # noqa: F401
from distributed.utils_test import client, loop # noqa: F401
from google.cloud import bigquery

from dask_bigquery import read_gbq

# These tests are run locally and assume the user is already authenticated.
# They also assume that the user has created a project called dask-bigquery.


@pytest.fixture
def df():
    records = [
        {
            "name": random.choice(["fred", "wilma", "barney", "betty"]),
            "number": random.randint(0, 100),
            "idx": i,
        }
        for i in range(10)
    ]

    yield pd.DataFrame(records)


@pytest.fixture
def dataset(df):
"Push some data to BigQuery using pandas gbq"
project_id = "dask-bigquery"
dataset_id = uuid.uuid4().hex
table_id = "table_test"
# push data to gbq
pd.DataFrame.to_gbq(
df,
destination_table=f"{dataset_id}.{table_id}",
project_id=project_id,
chunksize=5,
if_exists="append",
)
yield (project_id, dataset_id, table_id)

    with bigquery.Client() as bq_client:
        bq_client.delete_dataset(
            dataset=f"{project_id}.{dataset_id}",
            delete_contents=True,
        )


# test simple read
def test_read_gbq(df, dataset, client):
"""Test simple read of data pushed to BigQuery using pandas-gbq"""
    project_id, dataset_id, table_id = dataset
    ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id)

    assert list(ddf.columns) == ["name", "number", "idx"]
    assert ddf.npartitions == 2
    assert assert_eq(ddf.set_index("idx"), df.set_index("idx"))


# test partitioned data: this test requires a copy of the public dataset
# bigquery-public-data.covid19_public_forecasts.county_14d into the
# project dask-bigquery


@pytest.mark.parametrize(
    "fields",
    ([], ["county_name"], ["county_name", "county_fips_code"]),
    ids=["no_fields", "missing_partition_field", "fields"],
)
def test_read_gbq_partitioning(fields, client):
partitions = ["Teton", "Loudoun"]
ddf = read_gbq(
project_id="dask-bigquery",
dataset_id="covid19_public_forecasts",
table_id="county_14d",
partition_field="county_name",
partitions=partitions,
fields=fields,
)

    assert len(ddf)  # check it's not empty
    loaded = set(ddf.columns) | {ddf.index.name}

    if fields:
        assert loaded == set(fields) | {"county_name"}
    else:  # all columns loaded
        assert loaded >= set(["county_name", "county_fips_code"])

    assert ddf.npartitions == len(partitions)
    assert list(ddf.divisions) == sorted(ddf.divisions)