Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): make rowIterator._to_dataframe_iterable public #10017

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,11 +1554,44 @@ def to_arrow(
arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)

def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
"""Create an iterable of pandas DataFrames, to process the table as a stream.

See ``to_dataframe`` for argument descriptions.
Args:
bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
**Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery.

This method requires the ``pyarrow`` and
``google-cloud-bigquery-storage`` libraries.

Reading from a specific partition or snapshot is not
currently supported by this method.

**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. When a problem
is encountered reading a table, the tabledata.list method
from the BigQuery API is used, instead.
dtypes (Map[str, Union[str, pandas.Series.dtype]]):
Optional. A dictionary of column names pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.

Returns:
pandas.DataFrame:
A generator of :class:`~pandas.DataFrame`.

Raises:
ValueError:
If the :mod:`pandas` library cannot be imported.
"""
if pandas is None:
raise ValueError(_NO_PANDAS_ERROR)
if dtypes is None:
dtypes = {}

column_names = [field.name for field in self._schema]
bqstorage_download = functools.partial(
_pandas_helpers.download_dataframe_bqstorage,
Expand Down Expand Up @@ -1683,7 +1716,7 @@ def to_dataframe(
progress_bar = self._get_progress_bar(progress_bar_type)

frames = []
for frame in self._to_dataframe_iterable(
for frame in self.to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)
Expand Down
62 changes: 62 additions & 0 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,68 @@ def test_to_arrow_w_pyarrow_none(self):
with self.assertRaises(ValueError):
row_iterator.to_arrow()

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_iterable(self):
from google.cloud.bigquery.schema import SchemaField
import types

schema = [
SchemaField("name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
]

path = "/foo"
api_request = mock.Mock(
side_effect=[
{
"rows": [{"f": [{"v": "Bengt"}, {"v": "32"}]}],
"pageToken": "NEXTPAGE",
},
{"rows": [{"f": [{"v": "Sven"}, {"v": "33"}]}]},
]
)

row_iterator = self._make_one(
_mock_client(), api_request, path, schema, page_size=1, max_results=5
)
dfs = row_iterator.to_dataframe_iterable()

self.assertIsInstance(dfs, types.GeneratorType)

df_1 = next(dfs)
self.assertIsInstance(df_1, pandas.DataFrame)
self.assertEqual(df_1.name.dtype.name, "object")
self.assertEqual(df_1.age.dtype.name, "int64")
self.assertEqual(len(df_1), 1) # verify the number of rows
self.assertEqual(
df_1["name"][0], "Bengt"
) # verify the first value of 'name' column
self.assertEqual(df_1["age"][0], 32) # verify the first value of 'age' column

df_2 = next(dfs)
self.assertEqual(len(df_2), 1) # verify the number of rows
self.assertEqual(df_2["name"][0], "Sven")
self.assertEqual(df_2["age"][0], 33)

@mock.patch("google.cloud.bigquery.table.pandas", new=None)
def test_to_dataframe_iterable_error_if_pandas_is_none(self):
from google.cloud.bigquery.schema import SchemaField

schema = [
SchemaField("name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
]
rows = [
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
]
path = "/foo"
api_request = mock.Mock(return_value={"rows": rows})
row_iterator = self._make_one(_mock_client(), api_request, path, schema)

with pytest.raises(ValueError, match="pandas"):
row_iterator.to_dataframe_iterable()

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe(self):
from google.cloud.bigquery.schema import SchemaField
Expand Down