Skip to content
Permalink
Browse files
feat: support CSV format in load_table_from_dataframe pandas connec…
…tor (#399)

* WIP: support alternative serialization formats
for load_table_from_dataframe

* fix: address review comments

* docs: make clear repeated fields are not supportedin csv
  • Loading branch information
cguardia committed Dec 7, 2020
1 parent a9d8ae8 commit 0046742abdd2b5eab3c3e935316f91e7eef44d44
Showing with 239 additions and 27 deletions.
  1. +55 −27 google/cloud/bigquery/client.py
  2. +134 −0 tests/system.py
  3. +50 −0 tests/unit/test_client.py
@@ -2111,9 +2111,12 @@ def load_table_from_dataframe(
.. note::
Due to the way REPEATED fields are encoded in the ``parquet`` file
format, a mismatch with the existing table schema can occur, and
100% compatibility cannot be guaranteed for REPEATED fields.
REPEATED fields are NOT supported when using the CSV source format.
They are supported when using the PARQUET source format, but
due to the way they are encoded in the ``parquet`` file,
a mismatch with the existing table schema can occur, so
100% compatibility cannot be guaranteed for REPEATED fields when
using the parquet format.
https://github.com/googleapis/python-bigquery/issues/17
@@ -2153,6 +2156,14 @@ def load_table_from_dataframe(
column names matching those of the dataframe. The BigQuery
schema is used to determine the correct data type conversion.
Indexes are not loaded. Requires the :mod:`pyarrow` library.
By default, this method uses the parquet source format. To
override this, supply a value for
:attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format`
with the format name. Currently only
:attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
:attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
supported.
parquet_compression (Optional[str]):
[Beta] The compression method to use if intermittently
serializing ``dataframe`` to a parquet file.
@@ -2181,10 +2192,6 @@ def load_table_from_dataframe(
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
class.
"""
if pyarrow is None:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

job_id = _make_job_id(job_id, job_id_prefix)

if job_config:
@@ -2197,15 +2204,20 @@ def load_table_from_dataframe(
else:
job_config = job.LoadJobConfig()

if job_config.source_format:
if job_config.source_format != job.SourceFormat.PARQUET:
raise ValueError(
"Got unexpected source_format: '{}'. Currently, only PARQUET is supported".format(
job_config.source_format
)
)
else:
supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
if job_config.source_format is None:
# default value
job_config.source_format = job.SourceFormat.PARQUET
if job_config.source_format not in supported_formats:
raise ValueError(
"Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
job_config.source_format
)
)

if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

if location is None:
location = self.location
@@ -2245,27 +2257,43 @@ def load_table_from_dataframe(
stacklevel=2,
)

tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
tmpfd, tmppath = tempfile.mkstemp(
suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower())
)
os.close(tmpfd)

try:
if job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()

_pandas_helpers.dataframe_to_parquet(
dataframe,
job_config.schema,
if job_config.source_format == job.SourceFormat.PARQUET:

if job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()

_pandas_helpers.dataframe_to_parquet(
dataframe,
job_config.schema,
tmppath,
parquet_compression=parquet_compression,
)
else:
dataframe.to_parquet(tmppath, compression=parquet_compression)

else:

dataframe.to_csv(
tmppath,
parquet_compression=parquet_compression,
index=False,
header=False,
encoding="utf-8",
float_format="%.17g",
date_format="%Y-%m-%d %H:%M:%S.%f",
)
else:
dataframe.to_parquet(tmppath, compression=parquet_compression)

with open(tmppath, "rb") as parquet_file:
with open(tmppath, "rb") as tmpfile:
file_size = os.path.getsize(tmppath)
return self.load_table_from_file(
parquet_file,
tmpfile,
destination,
num_retries=num_retries,
rewind=True,
@@ -1165,6 +1165,140 @@ def test_load_table_from_json_basic_use(self):
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
from google.cloud.bigquery.job import SourceFormat

table_schema = (
bigquery.SchemaField("bool_col", "BOOLEAN"),
bigquery.SchemaField("bytes_col", "BYTES"),
bigquery.SchemaField("date_col", "DATE"),
bigquery.SchemaField("dt_col", "DATETIME"),
bigquery.SchemaField("float_col", "FLOAT"),
bigquery.SchemaField("geo_col", "GEOGRAPHY"),
bigquery.SchemaField("int_col", "INTEGER"),
bigquery.SchemaField("num_col", "NUMERIC"),
bigquery.SchemaField("str_col", "STRING"),
bigquery.SchemaField("time_col", "TIME"),
bigquery.SchemaField("ts_col", "TIMESTAMP"),
)
df_data = collections.OrderedDict(
[
("bool_col", [True, None, False]),
("bytes_col", ["abc", None, "def"]),
(
"date_col",
[datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
),
(
"dt_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
),
("float_col", [float("-inf"), float("nan"), float("inf")]),
(
"geo_col",
[
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
),
("int_col", [-9223372036854775808, None, 9223372036854775807]),
(
"num_col",
[
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
("str_col", [u"abc", None, u"def"]),
(
"time_col",
[datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
),
(
"ts_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
),
]
)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format(
Config.CLIENT.project, dataset_id
)

job_config = bigquery.LoadJobConfig(
schema=table_schema, source_format=SourceFormat.CSV
)
load_job = Config.CLIENT.load_table_from_dataframe(
dataframe, table_id, job_config=job_config
)
load_job.result()

table = Config.CLIENT.get_table(table_id)
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 3)

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_w_explicit_schema_source_format_csv_floats(self):
from google.cloud.bigquery.job import SourceFormat

table_schema = (bigquery.SchemaField("float_col", "FLOAT"),)
df_data = collections.OrderedDict(
[
(
"float_col",
[
0.14285714285714285,
0.51428571485748,
0.87128748,
1.807960649,
2.0679610649,
2.4406779661016949,
3.7148514257,
3.8571428571428572,
1.51251252e40,
],
),
]
)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format(
Config.CLIENT.project, dataset_id
)

job_config = bigquery.LoadJobConfig(
schema=table_schema, source_format=SourceFormat.CSV
)
load_job = Config.CLIENT.load_table_from_dataframe(
dataframe, table_id, job_config=job_config
)
load_job.result()

table = Config.CLIENT.get_table(table_id)
rows = self._fetch_single_page(table)
floats = [r.values()[0] for r in rows]
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 9)
self.assertEqual(floats, df_data["float_col"])

def test_load_table_from_json_schema_autodetect(self):
json_rows = [
{"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},
@@ -8410,6 +8410,56 @@ def test_load_table_from_dataframe_w_invaild_job_config(self):
err_msg = str(exc.value)
assert "Expected an instance of LoadJobConfig" in err_msg

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_with_csv_source_format(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
dataframe = pandas.DataFrame(records)
job_config = job.LoadJobConfig(
write_disposition=job.WriteDisposition.WRITE_TRUNCATE,
source_format=job.SourceFormat.CSV,
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
return_value=mock.Mock(
schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
),
)
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)
with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, job_config=job_config
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
size=mock.ANY,
job_id=mock.ANY,
job_id_prefix=None,
location=None,
project=None,
job_config=mock.ANY,
timeout=None,
)

sent_file = load_table_from_file.mock_calls[0][1][1]
assert sent_file.closed

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.CSV

def test_load_table_from_json_basic_use(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job

0 comments on commit 0046742

Please sign in to comment.