feat: add support for user-defined Table Valued Functions (#724)
* Add auxiliary classes for TVF routines

* Add return_table_type property to Routine

* Add system test for TVF routines

* Use the generated StandardSqlTableType class

* Update docs with new changes

* Add missing space in misc. Sphinx directives
plamut committed Jul 19, 2021
1 parent b8b5433 commit 8c7b839
Showing 8 changed files with 270 additions and 14 deletions.
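
Taken together, the changes below add a RoutineType helper class, a return_table_type property on Routine, docs updates, and a system test, so that user-defined table valued functions (TVFs) can be created through the client. A minimal sketch of the new surface, assuming a hypothetical project and dataset (the IDs below are placeholders, not from this commit):

from google.cloud import bigquery
from google.cloud import bigquery_v2
from google.cloud.bigquery import Routine, RoutineType

client = bigquery.Client()

# Hypothetical routine path; substitute your own project and dataset.
routine = Routine(
    "my-project.my_dataset.my_tvf",
    type_=RoutineType.TABLE_VALUED_FUNCTION,
    body="SELECT x FROM UNNEST([1, 2, 3]) AS x",
    return_table_type=bigquery_v2.types.StandardSqlTableType(
        columns=[
            bigquery_v2.types.StandardSqlField(
                name="x",
                type=bigquery_v2.types.StandardSqlDataType(
                    type_kind=bigquery_v2.types.StandardSqlDataType.TypeKind.INT64
                ),
            )
        ]
    ),
)
routine = client.create_routine(routine)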
@@ -118,6 +118,7 @@ Routine
routine.Routine
routine.RoutineArgument
routine.RoutineReference
+routine.RoutineType

Schema
======
@@ -85,6 +85,7 @@
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineArgument
from google.cloud.bigquery.routine import RoutineReference
+from google.cloud.bigquery.routine import RoutineType
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import PartitionRange
from google.cloud.bigquery.table import RangePartitioning
@@ -162,6 +163,7 @@
"KeyResultStatementKind",
"OperationType",
"QueryPriority",
"RoutineType",
"SchemaUpdateOption",
"SourceFormat",
"SqlTypeNames",
@@ -1386,12 +1386,12 @@ def to_arrow(
This argument does nothing if ``bqstorage_client`` is supplied.
-..versionadded:: 1.24.0
+.. versionadded:: 1.24.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
-..versionadded:: 2.21.0
+.. versionadded:: 2.21.0
Returns:
pyarrow.Table
@@ -1403,7 +1403,7 @@ def to_arrow(
ValueError:
If the :mod:`pyarrow` library cannot be imported.
-..versionadded:: 1.17.0
+.. versionadded:: 1.17.0
"""
query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
return query_result.to_arrow(
@@ -1452,7 +1452,7 @@ def to_dataframe(
:func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
for details.
-..versionadded:: 1.11.0
+.. versionadded:: 1.11.0
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
@@ -1461,18 +1461,18 @@ def to_dataframe(
This argument does nothing if ``bqstorage_client`` is supplied.
-..versionadded:: 1.24.0
+.. versionadded:: 1.24.0
date_as_object (Optional[bool]):
If ``True`` (default), cast dates to objects. If ``False``, convert
to datetime64[ns] dtype.
-..versionadded:: 1.26.0
+.. versionadded:: 1.26.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
-..versionadded:: 2.21.0
+.. versionadded:: 2.21.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
@@ -19,11 +19,13 @@
from google.cloud.bigquery.routine.routine import Routine
from google.cloud.bigquery.routine.routine import RoutineArgument
from google.cloud.bigquery.routine.routine import RoutineReference
+from google.cloud.bigquery.routine.routine import RoutineType


__all__ = (
    "DeterminismLevel",
    "Routine",
    "RoutineArgument",
    "RoutineReference",
+    "RoutineType",
)
@@ -21,6 +21,21 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
import google.cloud.bigquery_v2.types
+from google.cloud.bigquery_v2.types import StandardSqlTableType
+
+
+class RoutineType:
+    """The fine-grained type of the routine.
+
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#routinetype
+
+    .. versionadded:: 2.22.0
+    """
+
+    ROUTINE_TYPE_UNSPECIFIED = "ROUTINE_TYPE_UNSPECIFIED"
+    SCALAR_FUNCTION = "SCALAR_FUNCTION"
+    PROCEDURE = "PROCEDURE"
+    TABLE_VALUED_FUNCTION = "TABLE_VALUED_FUNCTION"
+
class Routine(object):
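
RoutineType mirrors the REST API's routineType enum as plain string constants, so callers can set or compare a routine's type without hard-coding strings. A small sketch, assuming an existing client and a hypothetical dataset ID:

from google.cloud.bigquery import RoutineType

# The constants are plain strings matching the REST API enum values.
assert RoutineType.TABLE_VALUED_FUNCTION == "TABLE_VALUED_FUNCTION"

# List only the TVF routines in a dataset (client and IDs assumed).
tvfs = [
    routine
    for routine in client.list_routines("my-project.my_dataset")
    if routine.type_ == RoutineType.TABLE_VALUED_FUNCTION
]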
@@ -48,6 +63,7 @@ class Routine(object):
"modified": "lastModifiedTime",
"reference": "routineReference",
"return_type": "returnType",
"return_table_type": "returnTableType",
"type_": "routineType",
"description": "description",
"determinism_level": "determinismLevel",
@@ -204,6 +220,35 @@ def return_type(self, value):
            resource = None
        self._properties[self._PROPERTY_TO_API_FIELD["return_type"]] = resource

+    @property
+    def return_table_type(self) -> StandardSqlTableType:
+        """The return type of a Table Valued Function (TVF) routine.
+
+        .. versionadded:: 2.22.0
+        """
+        resource = self._properties.get(
+            self._PROPERTY_TO_API_FIELD["return_table_type"]
+        )
+        if not resource:
+            return resource
+
+        output = google.cloud.bigquery_v2.types.StandardSqlTableType()
+        raw_protobuf = json_format.ParseDict(
+            resource, output._pb, ignore_unknown_fields=True
+        )
+        return type(output).wrap(raw_protobuf)
+
+    @return_table_type.setter
+    def return_table_type(self, value):
+        if not value:
+            resource = None
+        else:
+            resource = {
+                "columns": [json_format.MessageToDict(col._pb) for col in value.columns]
+            }
+
+        self._properties[self._PROPERTY_TO_API_FIELD["return_table_type"]] = resource
+
    @property
    def imported_libraries(self):
        """List[str]: The path of the imported JavaScript libraries.
@@ -1684,7 +1684,7 @@ def to_arrow(
This argument does nothing if ``bqstorage_client`` is supplied.
-..versionadded:: 1.24.0
+.. versionadded:: 1.24.0
Returns:
pyarrow.Table
@@ -1695,7 +1695,7 @@ def to_arrow(
Raises:
ValueError: If the :mod:`pyarrow` library cannot be imported.
-..versionadded:: 1.17.0
+.. versionadded:: 1.17.0
"""
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)
@@ -1775,7 +1775,7 @@ def to_dataframe_iterable(
created by the server. If ``max_queue_size`` is :data:`None`, the queue
size is infinite.
-..versionadded:: 2.14.0
+.. versionadded:: 2.14.0
Returns:
pandas.DataFrame:
@@ -1861,7 +1861,7 @@ def to_dataframe(
Use the :func:`tqdm.tqdm_gui` function to display a
progress bar as a graphical dialog box.
-..versionadded:: 1.11.0
+.. versionadded:: 1.11.0
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
@@ -1870,13 +1870,13 @@ def to_dataframe(
This argument does nothing if ``bqstorage_client`` is supplied.
-..versionadded:: 1.24.0
+.. versionadded:: 1.24.0
date_as_object (Optional[bool]):
If ``True`` (default), cast dates to objects. If ``False``, convert
to datetime64[ns] dtype.
-..versionadded:: 1.26.0
+.. versionadded:: 1.26.0
Returns:
pandas.DataFrame:
@@ -2010,7 +2010,7 @@ def to_dataframe_iterable(
) -> Iterator["pandas.DataFrame"]:
"""Create an iterable of pandas DataFrames, to process the table as a stream.
-..versionadded:: 2.21.0
+.. versionadded:: 2.21.0
Args:
bqstorage_client:
@@ -2228,6 +2228,85 @@ def test_create_routine(self):
        assert len(rows) == 1
        assert rows[0].max_value == 100.0

+    def test_create_tvf_routine(self):
+        from google.cloud.bigquery import Routine, RoutineArgument, RoutineType
+
+        StandardSqlDataType = bigquery_v2.types.StandardSqlDataType
+        StandardSqlField = bigquery_v2.types.StandardSqlField
+        StandardSqlTableType = bigquery_v2.types.StandardSqlTableType
+
+        INT64 = StandardSqlDataType.TypeKind.INT64
+        STRING = StandardSqlDataType.TypeKind.STRING
+
+        client = Config.CLIENT
+
+        dataset = self.temp_dataset(_make_dataset_id("create_tvf_routine"))
+        routine_ref = dataset.routine("test_tvf_routine")
+
+        routine_body = """
+        SELECT int_col, str_col
+        FROM (
+            UNNEST([1, 2, 3]) int_col
+            JOIN
+            (SELECT str_col FROM UNNEST(["one", "two", "three"]) str_col)
+            ON TRUE
+        )
+        WHERE int_col > threshold
+        """
+
+        return_table_type = StandardSqlTableType(
+            columns=[
+                StandardSqlField(
+                    name="int_col", type=StandardSqlDataType(type_kind=INT64),
+                ),
+                StandardSqlField(
+                    name="str_col", type=StandardSqlDataType(type_kind=STRING),
+                ),
+            ]
+        )
+
+        routine_args = [
+            RoutineArgument(
+                name="threshold", data_type=StandardSqlDataType(type_kind=INT64),
+            )
+        ]
+
+        routine_def = Routine(
+            routine_ref,
+            type_=RoutineType.TABLE_VALUED_FUNCTION,
+            arguments=routine_args,
+            return_table_type=return_table_type,
+            body=routine_body,
+        )
+
+        # Create TVF routine.
+        client.delete_routine(routine_ref, not_found_ok=True)
+        routine = client.create_routine(routine_def)
+
+        assert routine.body == routine_body
+        assert routine.return_table_type == return_table_type
+        assert routine.arguments == routine_args
+
+        # Execute the routine to see if it's working as expected.
+        query_job = client.query(
+            f"""
+            SELECT int_col, str_col
+            FROM `{routine.reference}`(1)
+            ORDER BY int_col, str_col ASC
+            """
+        )
+
+        result_rows = [tuple(row) for row in query_job.result()]
+        expected = [
+            (2, "one"),
+            (2, "three"),
+            (2, "two"),
+            (3, "one"),
+            (3, "three"),
+            (3, "two"),
+        ]
+        assert result_rows == expected
+
    def test_create_table_rows_fetch_nested_schema(self):
        table_name = "test_table"
        dataset = self.temp_dataset(_make_dataset_id("create_table_nested_schema"))

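As the test's query shows, a TVF is invoked in the FROM clause with its arguments bound positionally. A standalone sketch of an invocation, assuming the client above and a hypothetical routine path:

query_job = client.query(
    """
    SELECT int_col, str_col
    FROM `my-project.my_dataset.test_tvf_routine`(2)  -- threshold = 2
    ORDER BY int_col, str_col
    """
)
for row in query_job.result():
    print(row.int_col, row.str_col)  # only rows with int_col > 2 remain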