diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml
new file mode 100644
index 0000000..229c576
--- /dev/null
+++ b/.github/workflows/pre-commit.yml
@@ -0,0 +1,16 @@
+name: Linting
+
+on:
+  push:
+    branches: main
+  pull_request:
+    branches: main
+
+jobs:
+  checks:
+    name: "pre-commit hooks"
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v2
+      - uses: pre-commit/action@v2.0.0
\ No newline at end of file
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
new file mode 100644
index 0000000..fd7704d
--- /dev/null
+++ b/.github/workflows/tests.yml
@@ -0,0 +1,54 @@
+name: Tests
+
+on: push
+
+# When this workflow is queued, automatically cancel any previous running
+# or pending jobs from the same branch
+concurrency:
+  group: ${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  test:
+    runs-on: ${{ matrix.os }}
+    defaults:
+      run:
+        shell: bash -l {0}
+    strategy:
+      fail-fast: false
+      matrix:
+        os: ["windows-latest", "ubuntu-latest", "macos-latest"]
+        python-version: ["3.7", "3.8", "3.9"]
+
+    steps:
+      - name: Checkout source
+        uses: actions/checkout@v2
+        with:
+          fetch-depth: 0  # Needed by codecov.io
+
+      - name: Setup Conda Environment
+        uses: conda-incubator/setup-miniconda@v2
+        with:
+          miniforge-variant: Mambaforge
+          miniforge-version: latest
+          use-mamba: true
+          channel-priority: strict
+          python-version: ${{ matrix.python-version }}
+          environment-file: ci/environment-${{ matrix.python-version }}.yaml
+          activate-environment: test-environment
+          auto-activate-base: false
+
+      - name: Install dask-bigquery
+        run: python -m pip install --no-deps -e .
+
+      - name: Set up Cloud SDK
+        uses: google-github-actions/setup-gcloud@master
+        with:
+          project_id: ${{ secrets.GCP_PROJECT_ID }}
+          service_account_key: ${{ secrets.GCP_SA_KEY }}
+          export_default_credentials: true
+
+      - name: Run tests
+        env:
+          DASK_BIGQUERY_PROJECT_ID: "${{ secrets.GCP_PROJECT_ID }}"
+        run: pytest -v dask_bigquery
\ No newline at end of file
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..ee7164d
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,17 @@
+repos:
+- repo: https://github.com/psf/black
+  rev: 20.8b1
+  hooks:
+    - id: black
+      language_version: python3
+      exclude: versioneer.py
+- repo: https://gitlab.com/pycqa/flake8
+  rev: 3.8.3
+  hooks:
+    - id: flake8
+      language_version: python3
+- repo: https://github.com/pycqa/isort
+  rev: 5.8.0
+  hooks:
+    - id: isort
+      language_version: python3
\ No newline at end of file
diff --git a/README.md b/README.md
index 020194d..c0db199 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,45 @@
-# dask-bigquery
\ No newline at end of file
+# Dask-BigQuery
+
+[![Tests](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml) [![Linting](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml)
+
+Read data from Google BigQuery with Dask.
+
+## Installation
+
+## Example
+
+`dask-bigquery` assumes that you are already authenticated with Google Cloud.
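+For example, one way to provide credentials (a sketch, assuming you have a
+service account key file; the path below is a placeholder) is to point
+Application Default Credentials at the key before calling `read_gbq`:
+
+```python
+import os
+
+# Placeholder path: tell Google's client libraries where the key file lives
+os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account-key.json"
+```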
+
+```python
+import dask_bigquery
+
+ddf = dask_bigquery.read_gbq(
+    project_id="your_project_id",
+    dataset_id="your_dataset",
+    table_id="your_table",
+)
+
+ddf.head()
+```
+
+## History
+
+This project stems from the discussion in
+[this Dask issue](https://github.com/dask/dask/issues/3121) and
+[this initial implementation](https://gist.github.com/bnaul/4819f045ccbee160b60a530b6cfc0c98#file-dask_bigquery-py)
+developed by [Brett Naul](https://github.com/bnaul), [Jacob Hayes](https://github.com/JacobHayes),
+and [Steven Soojin Kim](https://github.com/mikss).
+
+## License
+
+[BSD-3](LICENSE)
\ No newline at end of file
diff --git a/ci/environment-3.7.yaml b/ci/environment-3.7.yaml
new file mode 100644
index 0000000..a8f48ce
--- /dev/null
+++ b/ci/environment-3.7.yaml
@@ -0,0 +1,14 @@
+name: test-environment
+channels:
+  - conda-forge
+dependencies:
+  - python=3.7
+  - dask
+  - distributed
+  - pandas
+  - pyarrow
+  - pytest
+  - grpcio
+  - pandas-gbq
+  - google-cloud-bigquery
+  - google-cloud-bigquery-storage
\ No newline at end of file
diff --git a/ci/environment-3.8.yaml b/ci/environment-3.8.yaml
new file mode 100644
index 0000000..ba179fb
--- /dev/null
+++ b/ci/environment-3.8.yaml
@@ -0,0 +1,14 @@
+name: test-environment
+channels:
+  - conda-forge
+dependencies:
+  - python=3.8
+  - dask
+  - distributed
+  - pandas
+  - pyarrow
+  - pytest
+  - grpcio
+  - pandas-gbq
+  - google-cloud-bigquery
+  - google-cloud-bigquery-storage
\ No newline at end of file
diff --git a/ci/environment-3.9.yaml b/ci/environment-3.9.yaml
new file mode 100644
index 0000000..c9cec6f
--- /dev/null
+++ b/ci/environment-3.9.yaml
@@ -0,0 +1,14 @@
+name: test-environment
+channels:
+  - conda-forge
+dependencies:
+  - python=3.9
+  - dask
+  - distributed
+  - pandas
+  - pyarrow
+  - pytest
+  - grpcio
+  - pandas-gbq
+  - google-cloud-bigquery
+  - google-cloud-bigquery-storage
\ No newline at end of file
diff --git a/dask_bigquery/__init__.py b/dask_bigquery/__init__.py
new file mode 100644
index 0000000..80a7c9c
--- /dev/null
+++ b/dask_bigquery/__init__.py
@@ -0,0 +1 @@
+from .core import read_gbq
diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py
new file mode 100644
index 0000000..1858180
--- /dev/null
+++ b/dask_bigquery/core.py
@@ -0,0 +1,161 @@
+from __future__ import annotations
+
+from contextlib import contextmanager
+from functools import partial
+
+import pandas as pd
+import pyarrow
+from dask.base import tokenize
+from dask.dataframe.core import new_dd_object
+from dask.highlevelgraph import HighLevelGraph
+from dask.layers import DataFrameIOLayer
+from google.cloud import bigquery, bigquery_storage
+
+
+@contextmanager
+def bigquery_clients(project_id):
+    """This context manager is a temporary solution until there is an
+    upstream solution to handle this.
+    See googleapis/google-cloud-python#9457
+    and googleapis/gapic-generator-python#575 for reference.
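+
+    It yields a `(bigquery.Client, BigQueryReadClient)` pair; the storage
+    client reuses the BigQuery client's credentials, and its gRPC channel is
+    closed explicitly on exit.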
+    """
+    with bigquery.Client(project_id) as bq_client:
+        bq_storage_client = bigquery_storage.BigQueryReadClient(
+            credentials=bq_client._credentials
+        )
+        yield bq_client, bq_storage_client
+        bq_storage_client.transport.grpc_channel.close()
+
+
+def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
+    """Given a Storage API client and a stream name, yield all dataframes."""
+    return [
+        pyarrow.ipc.read_record_batch(
+            pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
+            schema,
+        ).to_pandas()
+        for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
+    ]
+
+
+def bigquery_read(
+    make_create_read_session_request: callable,
+    project_id: str,
+    read_kwargs: dict,
+    stream_name: str,
+) -> pd.DataFrame:
+    """Read a single stream of rows via BQ Storage API, in Arrow binary format.
+
+    Parameters
+    ----------
+    make_create_read_session_request: callable
+        Callable that returns the request to pass to `bqs_client.create_read_session`
+    project_id: str
+        Name of the BigQuery project.
+    read_kwargs: dict
+        kwargs to pass to read_rows()
+    stream_name: str
+        BigQuery Storage API Stream "name"
+        NOTE: Please set if reading from Storage API without any `row_restriction`.
+        https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
+    """
+    with bigquery_clients(project_id) as (_, bqs_client):
+        session = bqs_client.create_read_session(make_create_read_session_request())
+        schema = pyarrow.ipc.read_schema(
+            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
+        )
+        shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
+        # NOTE: BQ Storage API can return empty streams
+        if len(shards) == 0:
+            shards = [schema.empty_table().to_pandas()]
+
+        return pd.concat(shards)
+
+
+def read_gbq(
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    row_filter="",
+    read_kwargs: dict = None,
+):
+    """Read a BigQuery table as a Dask DataFrame, using the BigQuery Storage API via Arrow format.
+    Partitions will be approximately balanced according to BigQuery stream allocation logic.
+
+    Parameters
+    ----------
+    project_id: str
+        Name of the BigQuery project.
+    dataset_id: str
+        BigQuery dataset within project
+    table_id: str
+        BigQuery table within dataset
+    row_filter: str
+        SQL text filtering statement to pass to `row_restriction`
+    read_kwargs: dict
+        kwargs to pass to read_rows()
+
+    Returns
+    -------
+    Dask DataFrame
+    """
+    read_kwargs = read_kwargs or {}
+    with bigquery_clients(project_id) as (bq_client, bqs_client):
+        table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
+        if table_ref.table_type == "VIEW":
+            raise TypeError("Table type VIEW not supported")
+
+        def make_create_read_session_request(row_filter=""):
+            return bigquery_storage.types.CreateReadSessionRequest(
+                max_stream_count=100,  # 0 -> use as many streams as BQ Storage will provide
+                parent=f"projects/{project_id}",
+                read_session=bigquery_storage.types.ReadSession(
+                    data_format=bigquery_storage.types.DataFormat.ARROW,
+                    read_options=bigquery_storage.types.ReadSession.TableReadOptions(
+                        row_restriction=row_filter,
+                    ),
+                    table=table_ref.to_bqstorage(),
+                ),
+            )
+
+        # Create a read session in order to detect the schema.
+        # Read sessions are lightweight and will be auto-deleted after 24 hours.
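+        # The streams in the session are later mapped one-to-one onto the
+        # partitions of the resulting Dask DataFrame.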
+        session = bqs_client.create_read_session(
+            make_create_read_session_request(row_filter=row_filter)
+        )
+        schema = pyarrow.ipc.read_schema(
+            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
+        )
+        meta = schema.empty_table().to_pandas()
+
+        label = "read-gbq-"
+        output_name = label + tokenize(
+            project_id,
+            dataset_id,
+            table_id,
+            row_filter,
+            read_kwargs,
+        )
+
+        layer = DataFrameIOLayer(
+            output_name,
+            meta.columns,
+            [stream.name for stream in session.streams],
+            partial(
+                bigquery_read,
+                make_create_read_session_request,
+                project_id,
+                read_kwargs,
+            ),
+            label=label,
+        )
+        divisions = tuple([None] * (len(session.streams) + 1))
+
+        graph = HighLevelGraph({output_name: layer}, {output_name: set()})
+        return new_dd_object(graph, output_name, meta, divisions)
diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py
new file mode 100644
index 0000000..6da48c0
--- /dev/null
+++ b/dask_bigquery/tests/test_core.py
@@ -0,0 +1,84 @@
+import os
+import random
+import uuid
+
+import pandas as pd
+import pytest
+from dask.dataframe.utils import assert_eq
+from distributed.utils_test import cluster_fixture  # noqa: F401
+from distributed.utils_test import client, loop  # noqa: F401
+from google.cloud import bigquery
+
+from dask_bigquery import read_gbq
+
+
+@pytest.fixture
+def df():
+    records = [
+        {
+            "name": random.choice(["fred", "wilma", "barney", "betty"]),
+            "number": random.randint(0, 100),
+            "idx": i,
+        }
+        for i in range(10)
+    ]
+
+    yield pd.DataFrame(records)
+
+
+@pytest.fixture
+def dataset(df):
+    project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID", "dask-bigquery")
+    dataset_id = uuid.uuid4().hex
+    table_id = "table_test"
+    # push data to gbq
+    pd.DataFrame.to_gbq(
+        df,
+        destination_table=f"{dataset_id}.{table_id}",
+        project_id=project_id,
+        chunksize=5,
+        if_exists="append",
+    )
+    yield (project_id, dataset_id, table_id)
+
+    with bigquery.Client() as bq_client:
+        bq_client.delete_dataset(
+            dataset=f"{project_id}.{dataset_id}",
+            delete_contents=True,
+        )
+
+
+def test_read_gbq(df, dataset, client):
+    project_id, dataset_id, table_id = dataset
+    ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id)
+
+    assert list(ddf.columns) == ["name", "number", "idx"]
+    assert ddf.npartitions == 2
+    assert_eq(ddf.set_index("idx"), df.set_index("idx"))
+
+
+def test_read_row_filter(df, dataset, client):
+    project_id, dataset_id, table_id = dataset
+    ddf = read_gbq(
+        project_id=project_id,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        row_filter="idx < 5",
+    )
+
+    assert list(ddf.columns) == ["name", "number", "idx"]
+    assert ddf.npartitions == 2
+    assert_eq(ddf.set_index("idx").loc[:4], df.set_index("idx").loc[:4])
+
+
+def test_read_kwargs(dataset, client):
+    project_id, dataset_id, table_id = dataset
+    ddf = read_gbq(
+        project_id=project_id,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        read_kwargs={"timeout": 1e-12},
+    )
+
+    with pytest.raises(Exception, match="504 Deadline Exceeded"):
+        ddf.compute()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..14c5f49
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+dask
+distributed
+google-cloud-bigquery >= 2.11.0
+google-cloud-bigquery-storage
+pandas
+pandas-gbq
+pyarrow
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..4921ff4
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,4 @@
+[flake8]
+exclude = __init__.py
+max-line-length = 120
+ignore = F811
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..48525a0
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+
+from setuptools import setup
+
+with open("README.md", "r", encoding="utf-8") as f:
+    long_description = f.read()
+
+setup(
+    name="dask-bigquery",
+    version="0.0.1",
+    description="Dask + BigQuery integration",
+    license="BSD",
+    packages=["dask_bigquery"],
+    long_description=long_description,
+    long_description_content_type="text/markdown",
+    python_requires=">=3.7",
+    install_requires=open("requirements.txt").read().strip().split("\n"),
+    extras_require={"test": ["pytest"]},
+    include_package_data=True,
+    zip_safe=False,
+)
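
For reference, `read_gbq` also supports server-side filtering via `row_filter` and extra `read_rows()` options via `read_kwargs`, as exercised by the tests above. A minimal usage sketch (the project, dataset, table, and column names are placeholders):

```python
import dask_bigquery

# Push a SQL row restriction down to the BigQuery Storage API and pass an
# extra keyword argument (a read timeout, in seconds) through to read_rows().
ddf = dask_bigquery.read_gbq(
    project_id="your_project_id",
    dataset_id="your_dataset",
    table_id="your_table",
    row_filter="idx < 5",
    read_kwargs={"timeout": 120},
)
```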