Skip to content
Permalink
Browse files
deps: require pyarrow for pandas support (#314)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [X] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [X] Ensure the tests and linter pass
- [X] Code coverage does not decrease (if any source code was changed)
- [X] Appropriate docs were updated (if necessary)

Fixes #265 🦕
  • Loading branch information
cguardia committed Oct 12, 2020
1 parent b8f502b commit 801e4c0574b7e421aa3a28cafec6fd6bcce940dd
@@ -26,10 +26,6 @@

import pytest

try:
import fastparquet
except (ImportError, AttributeError):
fastparquet = None
try:
import pandas
except (ImportError, AttributeError):
@@ -38,7 +38,6 @@
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.exceptions import PyarrowMissingWarning
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
from google.cloud.bigquery.external_config import BigtableColumnFamily
@@ -143,8 +142,6 @@
"WriteDisposition",
# EncryptionConfiguration
"EncryptionConfiguration",
# Errors and warnings
"PyarrowMissingWarning",
]


@@ -58,7 +58,6 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.exceptions import PyarrowMissingWarning
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.model import Model
@@ -2135,29 +2134,31 @@ def load_table_from_dataframe(
[Beta] The compression method to use if intermediately
serializing ``dataframe`` to a parquet file.
If ``pyarrow`` and job config schema are used, the argument
is directly passed as the ``compression`` argument to the
underlying ``pyarrow.parquet.write_table()`` method (the
default value "snappy" gets converted to uppercase).
The argument is directly passed as the ``compression``
argument to the underlying ``pyarrow.parquet.write_table()``
method (the default value "snappy" gets converted to uppercase).
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
If either ``pyarrow`` or job config schema are missing, the
argument is directly passed as the ``compression`` argument
to the underlying ``DataFrame.to_parquet()`` method.
If the job config schema is missing, the argument is directly
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
Raises:
ImportError:
ValueError:
If a usable parquet engine cannot be found. This method
requires :mod:`pyarrow` or :mod:`fastparquet` to be
installed.
requires :mod:`pyarrow` to be installed.
TypeError:
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
class.
"""
if pyarrow is None:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

job_id = _make_job_id(job_id, job_id_prefix)

if job_config:
@@ -2222,7 +2223,7 @@ def load_table_from_dataframe(
os.close(tmpfd)

try:
if pyarrow and job_config.schema:
if job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()

@@ -2233,24 +2234,6 @@ def load_table_from_dataframe(
parquet_compression=parquet_compression,
)
else:
if not pyarrow:
warnings.warn(
"Loading dataframe data without pyarrow installed is "
"deprecated and will become unsupported in the future. "
"Please install the pyarrow package.",
PyarrowMissingWarning,
stacklevel=2,
)

if job_config.schema:
warnings.warn(
"job_config.schema is set, but not used to assist in "
"identifying correct types for data serialization. "
"Please install the pyarrow package.",
PendingDeprecationWarning,
stacklevel=2,
)

dataframe.to_parquet(tmppath, compression=parquet_compression)

with open(tmppath, "rb") as parquet_file:

This file was deleted.

@@ -50,7 +50,6 @@
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.exceptions import PyarrowMissingWarning
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

@@ -1679,75 +1678,38 @@ def to_dataframe(
create_bqstorage_client = False
bqstorage_client = None

if pyarrow is not None:
# If pyarrow is available, calling to_arrow, then converting to a
# pandas dataframe is about 2x faster. This is because pandas.concat is
# rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
# usually no-copy.
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)

# When converting timestamp values to nanosecond precision, the result
# can be out of pyarrow bounds. To avoid the error when converting to
# Pandas, we set the timestamp_as_object parameter to True, if necessary.
types_to_check = {
pyarrow.timestamp("us"),
pyarrow.timestamp("us", tz=pytz.UTC),
}

# When converting timestamp values to nanosecond precision, the result
# can be out of pyarrow bounds. To avoid the error when converting to
# Pandas, we set the timestamp_as_object parameter to True, if necessary.
types_to_check = {
pyarrow.timestamp("us"),
pyarrow.timestamp("us", tz=pytz.UTC),
}

for column in record_batch:
if column.type in types_to_check:
try:
column.cast("timestamp[ns]")
except pyarrow.lib.ArrowInvalid:
timestamp_as_object = True
break
else:
timestamp_as_object = False

extra_kwargs = {"timestamp_as_object": timestamp_as_object}

df = record_batch.to_pandas(date_as_object=date_as_object, **extra_kwargs)

for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df
for column in record_batch:
if column.type in types_to_check:
try:
column.cast("timestamp[ns]")
except pyarrow.lib.ArrowInvalid:
timestamp_as_object = True
break
else:
warnings.warn(
"Converting to a dataframe without pyarrow installed is "
"often slower and will become unsupported in the future. "
"Please install the pyarrow package.",
PyarrowMissingWarning,
stacklevel=2,
)
timestamp_as_object = False

# The bqstorage_client is only used if pyarrow is available, so the
# rest of this method only needs to account for tabledata.list.
progress_bar = self._get_progress_bar(progress_bar_type)
extra_kwargs = {"timestamp_as_object": timestamp_as_object}

frames = []
for frame in self.to_dataframe_iterable(dtypes=dtypes):
frames.append(frame)
df = record_batch.to_pandas(date_as_object=date_as_object, **extra_kwargs)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()

# Avoid concatting an empty list.
if not frames:
column_names = [field.name for field in self._schema]
return pandas.DataFrame(columns=column_names)
return pandas.concat(frames, ignore_index=True)
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])

return df


class _EmptyRowIterator(object):
@@ -49,10 +49,7 @@ def default(session):
constraints_path,
)

# fastparquet is not included in .[all] because, in general, it's
# redundant with pyarrow. We still want to run some unit tests with
# fastparquet serialization, though.
session.install("-e", ".[all,fastparquet]", "-c", constraints_path)
session.install("-e", ".[all]", "-c", constraints_path)

session.install("ipython", "-c", constraints_path)

@@ -47,13 +47,12 @@
"grpcio >= 1.32.0, < 2.0dev",
"pyarrow >= 1.0.0, < 2.0dev",
],
"pandas": ["pandas>=0.23.0"],
"pyarrow": [
"pandas": [
"pandas>=0.23.0",
# pyarrow 1.0.0 is required for the use of timestamp_as_object keyword.
"pyarrow >= 1.0.0, < 2.0dev",
],
"tqdm": ["tqdm >= 4.7.4, <5.0.0dev"],
"fastparquet": ["fastparquet", "python-snappy", "llvmlite>=0.34.0"],
"opentelemetry": [
"opentelemetry-api==0.9b0",
"opentelemetry-sdk==0.9b0",
@@ -64,13 +63,6 @@
all_extras = []

for extra in extras:
if extra in (
# Skip fastparquet from "all" because it is redundant with pyarrow and
# creates a dependency on pre-release versions of numpy. See:
# https://github.com/googleapis/google-cloud-python/issues/8549
"fastparquet",
):
continue
all_extras.extend(extras[extra])

extras["all"] = all_extras
@@ -1,4 +1,3 @@
fastparquet==0.4.1
google-api-core==1.22.2
google-cloud-bigquery-storage==2.0.0
google-cloud-core==1.4.1
@@ -1329,3 +1329,11 @@ def test_download_dataframe_tabledata_list_dict_sequence_schema(module_under_tes
)
)
assert result.equals(expected_result)

with pytest.raises(StopIteration):
result = next(results_gen)


def test_table_data_listpage_to_dataframe_skips_stop_iteration(module_under_test):
dataframe = module_under_test._tabledata_list_page_to_dataframe([], [], {})
assert isinstance(dataframe, pandas.DataFrame)
Loading

0 comments on commit 801e4c0

Please sign in to comment.