Skip to content
Permalink
Browse files
feat: Support using GeoPandas for GEOGRAPHY columns (#848)
  • Loading branch information
jimfulton committed Aug 24, 2021
1 parent 5c5b4b8 commit 16f65e6ae15979217ceea6c6d398c9057a363a13
@@ -366,6 +366,8 @@
"grpc": ("https://grpc.github.io/grpc/python/", None),
"proto-plus": ("https://proto-plus-python.readthedocs.io/en/latest/", None),
"protobuf": ("https://googleapis.dev/python/protobuf/latest/", None),
"pandas": ("http://pandas.pydata.org/pandas-docs/dev", None),
"geopandas": ("https://geopandas.org/", None),
}


@@ -37,6 +37,21 @@ To retrieve table rows as a :class:`pandas.DataFrame`:
:start-after: [START bigquery_list_rows_dataframe]
:end-before: [END bigquery_list_rows_dataframe]


Retrieve BigQuery GEOGRAPHY data as a GeoPandas GeoDataFrame
------------------------------------------------------------

`GeoPandas <https://geopandas.org/>`_ adds geospatial analytics
capabilities to Pandas. To retrieve query results containing
GEOGRAPHY data as a :class:`geopandas.GeoDataFrame`:

.. literalinclude:: ../samples/geography/to_geodataframe.py
:language: python
:dedent: 4
:start-after: [START bigquery_query_results_geodataframe]
:end-before: [END bigquery_query_results_geodataframe]


Load a Pandas DataFrame to a BigQuery Table
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -24,6 +24,36 @@
import pandas
except ImportError: # pragma: NO COVER
pandas = None
else:
import numpy

try:
# _BaseGeometry is used to detect shapely objevys in `bq_to_arrow_array`
from shapely.geometry.base import BaseGeometry as _BaseGeometry
except ImportError: # pragma: NO COVER
# No shapely, use NoneType for _BaseGeometry as a placeholder.
_BaseGeometry = type(None)
else:
if pandas is not None: # pragma: NO COVER

def _to_wkb():
# Create a closure that:
# - Adds a not-null check. This allows the returned function to
# be used directly with apply, unlike `shapely.wkb.dumps`.
# - Avoid extra work done by `shapely.wkb.dumps` that we don't need.
# - Caches the WKBWriter (and write method lookup :) )
# - Avoids adding WKBWriter, lgeos, and notnull to the module namespace.
from shapely.geos import WKBWriter, lgeos

write = WKBWriter(lgeos).write
notnull = pandas.notnull

def _to_wkb(v):
return write(v) if notnull(v) else v

return _to_wkb

_to_wkb = _to_wkb()

try:
import pyarrow
@@ -69,6 +99,7 @@
"uint8": "INTEGER",
"uint16": "INTEGER",
"uint32": "INTEGER",
"geometry": "GEOGRAPHY",
}


@@ -193,14 +224,16 @@ def bq_to_arrow_data_type(field):
return data_type_constructor()


def bq_to_arrow_field(bq_field):
def bq_to_arrow_field(bq_field, array_type=None):
"""Return the Arrow field, corresponding to a given BigQuery column.
Returns:
None: if the Arrow type cannot be determined.
"""
arrow_type = bq_to_arrow_data_type(bq_field)
if arrow_type:
if arrow_type is not None:
if array_type is not None:
arrow_type = array_type # For GEOGRAPHY, at least initially
is_nullable = bq_field.mode.upper() == "NULLABLE"
return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable)

@@ -225,7 +258,24 @@ def bq_to_arrow_schema(bq_schema):


def bq_to_arrow_array(series, bq_field):
arrow_type = bq_to_arrow_data_type(bq_field)
if bq_field.field_type.upper() == "GEOGRAPHY":
arrow_type = None
first = _first_valid(series)
if first is not None:
if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry):
arrow_type = pyarrow.binary()
# Convert shapey geometry to WKB binary format:
series = series.apply(_to_wkb)
elif isinstance(first, bytes):
arrow_type = pyarrow.binary()
elif series.dtype.name == "geometry":
# We have a GeoSeries containing all nulls, convert it to a pandas series
series = pandas.Series(numpy.array(series))

if arrow_type is None:
arrow_type = bq_to_arrow_data_type(bq_field)
else:
arrow_type = bq_to_arrow_data_type(bq_field)

field_type_upper = bq_field.field_type.upper() if bq_field.field_type else ""

@@ -279,6 +329,12 @@ def list_columns_and_indexes(dataframe):
return columns_and_indexes


def _first_valid(series):
first_valid_index = series.first_valid_index()
if first_valid_index is not None:
return series.at[first_valid_index]


def dataframe_to_bq_schema(dataframe, bq_schema):
"""Convert a pandas DataFrame schema to a BigQuery schema.
@@ -319,6 +375,13 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# Otherwise, try to automatically determine the type based on the
# pandas dtype.
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
if bq_type is None:
sample_data = _first_valid(dataframe[column])
if (
isinstance(sample_data, _BaseGeometry)
and sample_data is not None # Paranoia
):
bq_type = "GEOGRAPHY"
bq_field = schema.SchemaField(column, bq_type)
bq_schema_out.append(bq_field)

@@ -450,11 +513,11 @@ def dataframe_to_arrow(dataframe, bq_schema):
arrow_names = []
arrow_fields = []
for bq_field in bq_schema:
arrow_fields.append(bq_to_arrow_field(bq_field))
arrow_names.append(bq_field.name)
arrow_arrays.append(
bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
)
arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type))

if all((field is not None for field in arrow_fields)):
return pyarrow.Table.from_arrays(
@@ -53,6 +53,7 @@
# Assumption: type checks are only used by library developers and CI environments
# that have all optional dependencies installed, thus no conditional imports.
import pandas
import geopandas
import pyarrow
from google.api_core import retry as retries
from google.cloud import bigquery_storage
@@ -1487,6 +1488,7 @@ def to_dataframe(
create_bqstorage_client: bool = True,
date_as_object: bool = True,
max_results: Optional[int] = None,
geography_as_object: bool = False,
) -> "pandas.DataFrame":
"""Return a pandas DataFrame from a QueryJob
@@ -1538,13 +1540,27 @@ def to_dataframe(
.. versionadded:: 2.21.0
geography_as_object (Optional[bool]):
If ``True``, convert GEOGRAPHY data to :mod:`shapely`
geometry objects. If ``False`` (default), don't cast
geography data to :mod:`shapely` geometry objects.
.. versionadded:: 2.24.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
headers from the query results. The column headers are derived
from the destination table's schema.
pandas.DataFrame:
A :class:`~pandas.DataFrame` populated with row data
and column headers from the query results. The column
headers are derived from the destination table's
schema.
Raises:
ValueError: If the `pandas` library cannot be imported.
ValueError:
If the :mod:`pandas` library cannot be imported, or
the :mod:`google.cloud.bigquery_storage_v1` module is
required but cannot be imported. Also if
`geography_as_object` is `True`, but the
:mod:`shapely` library cannot be imported.
"""
query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
return query_result.to_dataframe(
@@ -1553,6 +1569,101 @@ def to_dataframe(
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
date_as_object=date_as_object,
geography_as_object=geography_as_object,
)

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_dataframe(), except for the max_results parameter
# that should only exist here in the QueryJob method.
def to_geodataframe(
self,
bqstorage_client: "bigquery_storage.BigQueryReadClient" = None,
dtypes: Dict[str, Any] = None,
progress_bar_type: str = None,
create_bqstorage_client: bool = True,
date_as_object: bool = True,
max_results: Optional[int] = None,
geography_column: Optional[str] = None,
) -> "geopandas.GeoDataFrame":
"""Return a GeoPandas GeoDataFrame from a QueryJob
Args:
bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
A BigQuery Storage API client. If supplied, use the faster
BigQuery Storage API to fetch rows from BigQuery. This
API is a billable API.
This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.
Reading from a specific partition or snapshot is not
currently supported by this method.
dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
A dictionary of column names pandas ``dtype``s. The provided
``dtype`` is used when constructing the series for the column
specified. Otherwise, the default pandas behavior is used.
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.
See
:func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
for details.
.. versionadded:: 1.11.0
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
is a faster way to fetch rows from BigQuery. See the
``bqstorage_client`` parameter for more information.
This argument does nothing if ``bqstorage_client`` is supplied.
.. versionadded:: 1.24.0
date_as_object (Optional[bool]):
If ``True`` (default), cast dates to objects. If ``False``, convert
to datetime64[ns] dtype.
.. versionadded:: 1.26.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
.. versionadded:: 2.21.0
geography_column (Optional[str]):
If there are more than one GEOGRAPHY column,
identifies which one to use to construct a GeoPandas
GeoDataFrame. This option can be ommitted if there's
only one GEOGRAPHY column.
Returns:
geopandas.GeoDataFrame:
A :class:`geopandas.GeoDataFrame` populated with row
data and column headers from the query results. The
column headers are derived from the destination
table's schema.
Raises:
ValueError:
If the :mod:`geopandas` library cannot be imported, or the
:mod:`google.cloud.bigquery_storage_v1` module is
required but cannot be imported.
.. versionadded:: 2.24.0
"""
query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
return query_result.to_geodataframe(
bqstorage_client=bqstorage_client,
dtypes=dtypes,
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
date_as_object=date_as_object,
geography_column=geography_column,
)

def __iter__(self):
Loading

0 comments on commit 16f65e6

Please sign in to comment.