Skip to content
Permalink
Browse files
feat: add BIGNUMERIC support (#527)
* feat: add support of BIGNUMERIC

* feat: add BIGNUMERIC support

* Add bignumeric_type extra

* Add additional BIGNUMERIC tests

* Prevent import time error if no BIGNUMERIC support

* Add/improve a few comments

* Add feature flag for BIGNUMERIC suppport

Co-authored-by: HemangChothani <hemang.chothani@qlogic.io>
  • Loading branch information
plamut and HemangChothani committed Feb 23, 2021
1 parent 696c443 commit cc3394f80934419eb00c2029bb81c92a696e7d88
@@ -20,6 +20,7 @@
import queue
import warnings

from packaging import version

try:
import pandas
@@ -80,6 +81,10 @@ def pyarrow_numeric():
return pyarrow.decimal128(38, 9)


def pyarrow_bignumeric():
return pyarrow.decimal256(76, 38)


def pyarrow_time():
return pyarrow.time64("us")

@@ -128,14 +133,23 @@ def pyarrow_timestamp():
pyarrow.date64().id: "DATETIME", # because millisecond resolution
pyarrow.binary().id: "BYTES",
pyarrow.string().id: "STRING", # also alias for pyarrow.utf8()
# The exact scale and precision don't matter, see below.
pyarrow.decimal128(38, scale=9).id: "NUMERIC",
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal128 instances.
}

if version.parse(pyarrow.__version__) >= version.parse("3.0.0"):
BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal256 instances.
ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC"
_BIGNUMERIC_SUPPORT = True
else:
_BIGNUMERIC_SUPPORT = False

else: # pragma: NO COVER
BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
_BIGNUMERIC_SUPPORT = False # pragma: NO COVER


def bq_to_arrow_struct_data_type(field):
@@ -19,6 +19,11 @@
import functools
import numbers

try:
import pyarrow
except ImportError: # pragma: NO COVER
pyarrow = None

from google.cloud import bigquery
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import exceptions
@@ -184,7 +189,12 @@ def bigquery_scalar_type(value):
elif isinstance(value, numbers.Real):
return "FLOAT64"
elif isinstance(value, decimal.Decimal):
return "NUMERIC"
# We check for NUMERIC before BIGNUMERIC in order to support pyarrow < 3.0.
scalar_object = pyarrow.scalar(value)
if isinstance(scalar_object, pyarrow.Decimal128Scalar):
return "NUMERIC"
else:
return "BIGNUMERIC"
elif isinstance(value, str):
return "STRING"
elif isinstance(value, bytes):
@@ -78,7 +78,7 @@ def __eq__(self, other):
STRING = "STRING"
BINARY = _DBAPITypeObject("BYTES", "RECORD", "STRUCT")
NUMBER = _DBAPITypeObject(
"INTEGER", "INT64", "FLOAT", "FLOAT64", "NUMERIC", "BOOLEAN", "BOOL"
"INTEGER", "INT64", "FLOAT", "FLOAT64", "NUMERIC", "BIGNUMERIC", "BOOLEAN", "BOOL"
)
DATETIME = _DBAPITypeObject("TIMESTAMP", "DATE", "TIME", "DATETIME")
ROWID = "ROWID"
@@ -83,7 +83,7 @@ class ScalarQueryParameter(_AbstractQueryParameter):
type_ (str):
Name of parameter type. One of 'STRING', 'INT64',
'FLOAT64', 'NUMERIC', 'BOOL', 'TIMESTAMP', 'DATETIME', or
'FLOAT64', 'NUMERIC', 'BIGNUMERIC', 'BOOL', 'TIMESTAMP', 'DATETIME', or
'DATE'.
value (Union[str, int, float, decimal.Decimal, bool, datetime.datetime, datetime.date]):
@@ -102,7 +102,7 @@ def positional(cls, type_, value):
Args:
type_ (str):
Name of parameter type. One of 'STRING', 'INT64',
'FLOAT64', 'NUMERIC', 'BOOL', 'TIMESTAMP', 'DATETIME', or
'FLOAT64', 'NUMERIC', 'BIGNUMERIC', 'BOOL', 'TIMESTAMP', 'DATETIME', or
'DATE'.
value (Union[str, int, float, decimal.Decimal, bool, datetime.datetime, datetime.date]):
@@ -186,7 +186,7 @@ class ArrayQueryParameter(_AbstractQueryParameter):
array_type (str):
Name of type of array elements. One of `'STRING'`, `'INT64'`,
`'FLOAT64'`, `'NUMERIC'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`.
`'FLOAT64'`, `'NUMERIC'`, `'BIGNUMERIC'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`.
values (List[appropriate scalar type]): The parameter array values.
"""
@@ -203,7 +203,7 @@ def positional(cls, array_type, values):
Args:
array_type (str):
Name of type of array elements. One of `'STRING'`, `'INT64'`,
`'FLOAT64'`, `'NUMERIC'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`.
`'FLOAT64'`, `'NUMERIC'`, `'BIGNUMERIC'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`.
values (List[appropriate scalar type]): The parameter array values.
@@ -32,6 +32,7 @@
"FLOAT": types.StandardSqlDataType.TypeKind.FLOAT64,
"FLOAT64": types.StandardSqlDataType.TypeKind.FLOAT64,
"NUMERIC": types.StandardSqlDataType.TypeKind.NUMERIC,
"BIGNUMERIC": types.StandardSqlDataType.TypeKind.BIGNUMERIC,
"BOOLEAN": types.StandardSqlDataType.TypeKind.BOOL,
"BOOL": types.StandardSqlDataType.TypeKind.BOOL,
"GEOGRAPHY": types.StandardSqlDataType.TypeKind.GEOGRAPHY,
@@ -33,6 +33,7 @@
"proto-plus >= 1.10.0",
"google-cloud-core >= 1.4.1, < 2.0dev",
"google-resumable-media >= 0.6.0, < 2.0dev",
"packaging >= 14.3",
"protobuf >= 3.12.0",
]
extras = {
@@ -48,6 +49,7 @@
"pyarrow >= 1.0.0, < 4.0dev",
],
"pandas": ["pandas>=0.23.0", "pyarrow >= 1.0.0, < 4.0dev",],
"bignumeric_type": ["pyarrow >= 3.0.0, < 4.0dev"],
"tqdm": ["tqdm >= 4.7.4, <5.0.0dev"],
"opentelemetry": [
"opentelemetry-api==0.11b0",
@@ -65,6 +65,7 @@
from google.api_core.iam import Policy
from google.cloud import bigquery
from google.cloud import bigquery_v2
from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import Table
@@ -891,6 +892,9 @@ def test_load_table_from_dataframe_w_nulls(self):
bigquery.SchemaField("time_col", "TIME"),
bigquery.SchemaField("ts_col", "TIMESTAMP"),
)
if _BIGNUMERIC_SUPPORT:
scalars_schema += (bigquery.SchemaField("bignum_col", "BIGNUMERIC"),)

table_schema = scalars_schema + (
# TODO: Array columns can't be read due to NULLABLE versus REPEATED
# mode mismatch. See:
@@ -902,21 +906,22 @@ def test_load_table_from_dataframe_w_nulls(self):
)
num_rows = 100
nulls = [None] * num_rows
df_data = collections.OrderedDict(
[
("bool_col", nulls),
("bytes_col", nulls),
("date_col", nulls),
("dt_col", nulls),
("float_col", nulls),
("geo_col", nulls),
("int_col", nulls),
("num_col", nulls),
("str_col", nulls),
("time_col", nulls),
("ts_col", nulls),
]
)
df_data = [
("bool_col", nulls),
("bytes_col", nulls),
("date_col", nulls),
("dt_col", nulls),
("float_col", nulls),
("geo_col", nulls),
("int_col", nulls),
("num_col", nulls),
("str_col", nulls),
("time_col", nulls),
("ts_col", nulls),
]
if _BIGNUMERIC_SUPPORT:
df_data.append(("bignum_col", nulls))
df_data = collections.OrderedDict(df_data)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
@@ -1003,6 +1008,9 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
bigquery.SchemaField("time_col", "TIME"),
bigquery.SchemaField("ts_col", "TIMESTAMP"),
)
if _BIGNUMERIC_SUPPORT:
scalars_schema += (bigquery.SchemaField("bignum_col", "BIGNUMERIC"),)

table_schema = scalars_schema + (
# TODO: Array columns can't be read due to NULLABLE versus REPEATED
# mode mismatch. See:
@@ -1012,57 +1020,65 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
# https://jira.apache.org/jira/browse/ARROW-2587
# bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
)
df_data = collections.OrderedDict(
[
("bool_col", [True, None, False]),
("bytes_col", [b"abc", None, b"def"]),
(
"date_col",
[datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
),
# (
# "dt_col",
# [
# datetime.datetime(1, 1, 1, 0, 0, 0),
# None,
# datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
# ],
# ),
("float_col", [float("-inf"), float("nan"), float("inf")]),
(
"geo_col",
[
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
),
("int_col", [-9223372036854775808, None, 9223372036854775807]),
(
"num_col",
[
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
("str_col", [u"abc", None, u"def"]),
(
"time_col",
[datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
),

df_data = [
("bool_col", [True, None, False]),
("bytes_col", [b"abc", None, b"def"]),
("date_col", [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)]),
# (
# "dt_col",
# [
# datetime.datetime(1, 1, 1, 0, 0, 0),
# None,
# datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
# ],
# ),
("float_col", [float("-inf"), float("nan"), float("inf")]),
(
"geo_col",
[
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
),
("int_col", [-9223372036854775808, None, 9223372036854775807]),
(
"num_col",
[
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
("str_col", [u"abc", None, u"def"]),
(
"time_col",
[datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
),
(
"ts_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
),
]
if _BIGNUMERIC_SUPPORT:
df_data.append(
(
"ts_col",
"bignum_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
decimal.Decimal("{d38}.{d38}".format(d38="9" * 38)),
],
),
]
)
)
)
df_data = collections.OrderedDict(df_data)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
@@ -1172,6 +1188,7 @@ def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
bigquery.SchemaField("geo_col", "GEOGRAPHY"),
bigquery.SchemaField("int_col", "INTEGER"),
bigquery.SchemaField("num_col", "NUMERIC"),
bigquery.SchemaField("bignum_col", "BIGNUMERIC"),
bigquery.SchemaField("str_col", "STRING"),
bigquery.SchemaField("time_col", "TIME"),
bigquery.SchemaField("ts_col", "TIMESTAMP"),
@@ -1210,6 +1227,14 @@ def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
(
"bignum_col",
[
decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)),
None,
decimal.Decimal("{d38}.{d38}".format(d38="9" * 38)),
],
),
("str_col", [u"abc", None, u"def"]),
(
"time_col",
@@ -2157,6 +2182,10 @@ def test_query_w_query_params(self):
pi_numeric_param = ScalarQueryParameter(
name="pi_numeric_param", type_="NUMERIC", value=pi_numeric
)
bignum = decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38))
bignum_param = ScalarQueryParameter(
name="bignum_param", type_="BIGNUMERIC", value=bignum
)
truthy = True
truthy_param = ScalarQueryParameter(name="truthy", type_="BOOL", value=truthy)
beef = b"DEADBEEF"
@@ -2302,6 +2331,15 @@ def test_query_w_query_params(self):
"query_parameters": [with_friends_param],
},
]
if _BIGNUMERIC_SUPPORT:
examples.append(
{
"sql": "SELECT @bignum_param",
"expected": bignum,
"query_parameters": [bignum_param],
}
)

for example in examples:
jconfig = QueryJobConfig()
jconfig.query_parameters = example["query_parameters"]
Loading

0 comments on commit cc3394f

Please sign in to comment.