Skip to content
Permalink
Browse files
feat: add support and tests for struct fields (#146)
* feat(bigquery): add support and tests for struct fields

* feat(bigquery): bump pyarrow version for python3

* feat(bigquery): nit
  • Loading branch information
HemangChothani committed Aug 3, 2020
1 parent 8360487 commit fee2ba80e338d093ee61565359268da91a5c9913
Showing with 102 additions and 21 deletions.
  1. +8 −7 google/cloud/bigquery/_pandas_helpers.py
  2. +2 −4 setup.py
  3. +44 −0 tests/system.py
  4. +48 −10 tests/unit/test_client.py
@@ -287,13 +287,14 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
"""
if bq_schema:
bq_schema = schema._to_schema_fields(bq_schema)
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)
if six.PY2:
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported under Python2. See: "
"https://github.com/googleapis/python-bigquery/issues/21"
)
bq_schema_index = {field.name: field for field in bq_schema}
bq_schema_unused = set(bq_schema_index.keys())
else:
@@ -47,10 +47,8 @@
],
"pandas": ["pandas>=0.17.1"],
# Exclude PyArrow dependency from Windows Python 2.7.
'pyarrow: platform_system != "Windows" or python_version >= "3.4"': [
# Bad Linux release for 0.14.0.
# https://issues.apache.org/jira/browse/ARROW-5868
"pyarrow>=0.4.1, != 0.14.0"
'pyarrow: platform_system != "Windows" or python_version >= "3.5"': [
"pyarrow>=0.17.0"
],
"tqdm": ["tqdm >= 4.0.0, <5.0.0dev"],
"fastparquet": [
@@ -131,6 +131,8 @@

PANDAS_MINIMUM_VERSION = pkg_resources.parse_version("1.0.0")
PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version
PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")
PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version


def _has_rows(result):
@@ -1075,6 +1077,48 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 3)

@unittest.skipIf(
pyarrow is None or PYARROW_INSTALLED_VERSION < PYARROW_MINIMUM_VERSION,
"Only `pyarrow version >=0.17.0` is supported",
)
@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_w_struct_datatype(self):
"""Test that a DataFrame with struct datatype can be uploaded if a
BigQuery schema is specified.
https://github.com/googleapis/python-bigquery/issues/21
"""
dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_struct_datatype".format(
Config.CLIENT.project, dataset_id
)
table_schema = [
bigquery.SchemaField(
"bar",
"RECORD",
fields=[
bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
],
mode="REQUIRED",
),
]
table = retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

df_data = [{"id": 1, "age": 21}, {"id": 2, "age": 22}, {"id": 2, "age": 23}]
dataframe = pandas.DataFrame(data={"bar": df_data}, columns=["bar"])

load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
load_job.result()

table = Config.CLIENT.get_table(table_id)
self.assertEqual(table.schema, table_schema)
self.assertEqual(table.num_rows, 3)

def test_load_table_from_json_basic_use(self):
table_schema = (
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
@@ -7373,19 +7373,22 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_struct_fields_error(self):
def test_load_table_from_dataframe_struct_fields(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()

records = [{"float_column": 3.14, "struct_column": [{"foo": 1}, {"bar": -1}]}]
dataframe = pandas.DataFrame(data=records)
records = [(3.14, {"foo": 1, "bar": 1})]
dataframe = pandas.DataFrame(
data=records, columns=["float_column", "struct_column"]
)

schema = [
SchemaField("float_column", "FLOAT"),
SchemaField(
"agg_col",
"struct_column",
"RECORD",
fields=[SchemaField("foo", "INTEGER"), SchemaField("bar", "INTEGER")],
),
@@ -7396,14 +7399,49 @@ def test_load_table_from_dataframe_struct_fields_error(self):
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

with pytest.raises(ValueError) as exc_info, load_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
if six.PY2:
with pytest.raises(ValueError) as exc_info, load_patch:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
job_config=job_config,
location=self.LOCATION,
)

err_msg = str(exc_info.value)
assert "struct" in err_msg
assert "not support" in err_msg

else:
get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
side_effect=google.api_core.exceptions.NotFound("Table not found"),
)
with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
job_config=job_config,
location=self.LOCATION,
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
job_id=mock.ANY,
job_id_prefix=None,
location=self.LOCATION,
project=None,
job_config=mock.ANY,
)

err_msg = str(exc_info.value)
assert "struct" in err_msg
assert "not support" in err_msg
sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert sent_config.schema == schema

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")

0 comments on commit fee2ba8

Please sign in to comment.