Skip to content
Permalink
Browse files
feat: add session and connection properties to QueryJobConfig (#1024)
* feat: add session and connection properties to QueryJobConfig

* add unit tests

* adjust types and add versionadded

* add missing url

* link to ConnectionProperty docs

* add resource classes to root module
  • Loading branch information
tswast committed Oct 27, 2021
1 parent e37380a commit e4c94f4
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 8 deletions.
@@ -52,6 +52,7 @@
from google.cloud.bigquery.external_config import ExternalSourceFormat
from google.cloud.bigquery.format_options import AvroOptions
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.job.base import SessionInfo
from google.cloud.bigquery.job import Compression
from google.cloud.bigquery.job import CopyJob
from google.cloud.bigquery.job import CopyJobConfig
@@ -77,6 +78,7 @@
from google.cloud.bigquery.model import ModelReference
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ArrayQueryParameterType
from google.cloud.bigquery.query import ConnectionProperty
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameterType
from google.cloud.bigquery.query import StructQueryParameter
@@ -104,6 +106,7 @@
"__version__",
"Client",
# Queries
"ConnectionProperty",
"QueryJob",
"QueryJobConfig",
"ArrayQueryParameter",
@@ -132,6 +135,7 @@
"ExtractJobConfig",
"LoadJob",
"LoadJobConfig",
"SessionInfo",
"UnknownJob",
# Models
"Model",
@@ -202,6 +202,19 @@ def script_statistics(self) -> Optional["ScriptStatistics"]:
return None
return ScriptStatistics(resource)

@property
def session_info(self) -> Optional["SessionInfo"]:
"""[Preview] Information of the session if this job is part of one.
.. versionadded:: 2.29.0
"""
resource = _helpers._get_sub_prop(
self._properties, ["statistics", "sessionInfo"]
)
if resource is None:
return None
return SessionInfo(resource)

@property
def num_child_jobs(self):
"""The number of child jobs executed.
@@ -990,6 +1003,24 @@ def evaluation_kind(self) -> Optional[str]:
return self._properties.get("evaluationKind")


class SessionInfo:
"""[Preview] Information of the session if this job is part of one.
.. versionadded:: 2.29.0
Args:
resource (Map[str, Any]): JSON representation of object.
"""

def __init__(self, resource):
self._properties = resource

@property
def session_id(self) -> Optional[str]:
"""The ID of the session."""
return self._properties.get("sessionId")


class UnknownJob(_AsyncJob):
"""A job whose type cannot be determined."""

@@ -18,7 +18,7 @@
import copy
import re
import typing
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Union

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
@@ -31,11 +31,14 @@
from google.cloud.bigquery.enums import KeyResultStatementKind
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.query import _query_param_from_api_repr
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import StructQueryParameter
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.query import (
_query_param_from_api_repr,
ArrayQueryParameter,
ConnectionProperty,
ScalarQueryParameter,
StructQueryParameter,
UDFResource,
)
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
@@ -269,6 +272,24 @@ def allow_large_results(self):
def allow_large_results(self, value):
self._set_sub_prop("allowLargeResults", value)

@property
def connection_properties(self) -> List[ConnectionProperty]:
"""Connection properties.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties
.. versionadded:: 2.29.0
"""
resource = self._get_sub_prop("connectionProperties", [])
return [ConnectionProperty.from_api_repr(prop) for prop in resource]

@connection_properties.setter
def connection_properties(self, value: Iterable[ConnectionProperty]):
self._set_sub_prop(
"connectionProperties", [prop.to_api_repr() for prop in value],
)

@property
def create_disposition(self):
"""google.cloud.bigquery.job.CreateDisposition: Specifies behavior
@@ -283,6 +304,27 @@ def create_disposition(self):
def create_disposition(self, value):
self._set_sub_prop("createDisposition", value)

@property
def create_session(self) -> Optional[bool]:
"""[Preview] If :data:`True`, creates a new session, where
:attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
random server generated session id.
If :data:`False`, runs query with an existing ``session_id`` passed in
:attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
otherwise runs query in non-session mode.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session
.. versionadded:: 2.29.0
"""
return self._get_sub_prop("createSession")

@create_session.setter
def create_session(self, value: Optional[bool]):
self._set_sub_prop("createSession", value)

@property
def default_dataset(self):
"""google.cloud.bigquery.dataset.DatasetReference: the default dataset
@@ -613,7 +655,7 @@ def schema_update_options(self, values):

@property
def script_options(self) -> ScriptOptions:
"""Connection properties which can modify the query behavior.
"""Options controlling the execution of scripts.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
"""
@@ -694,13 +736,31 @@ def allow_large_results(self):
"""
return self._configuration.allow_large_results

@property
def connection_properties(self) -> List[ConnectionProperty]:
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.
.. versionadded:: 2.29.0
"""
return self._configuration.connection_properties

@property
def create_disposition(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
"""
return self._configuration.create_disposition

@property
def create_session(self) -> Optional[bool]:
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.
.. versionadded:: 2.29.0
"""
return self._configuration.create_session

@property
def default_dataset(self):
"""See
@@ -18,7 +18,7 @@
import copy
import datetime
import decimal
from typing import Optional, Union
from typing import Any, Optional, Dict, Union

from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import _rows_from_json
@@ -31,6 +31,65 @@
]


class ConnectionProperty:
"""A connection-level property to customize query behavior.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/ConnectionProperty
Args:
key:
The key of the property to set, for example, ``'time_zone'`` or
``'session_id'``.
value: The value of the property to set.
"""

def __init__(self, key: str = "", value: str = ""):
self._properties = {
"key": key,
"value": value,
}

@property
def key(self) -> str:
"""Name of the property.
For example:
* ``time_zone``
* ``session_id``
"""
return self._properties["key"]

@property
def value(self) -> str:
"""Value of the property."""
return self._properties["value"]

@classmethod
def from_api_repr(cls, resource) -> "ConnectionProperty":
"""Construct :class:`~google.cloud.bigquery.query.ConnectionProperty`
from JSON resource.
Args:
resource: JSON representation.
Returns:
A connection property.
"""
value = cls()
value._properties = resource
return value

def to_api_repr(self) -> Dict[str, Any]:
"""Construct JSON API representation for the connection property.
Returns:
JSON mapping
"""
return self._properties


class UDFResource(object):
"""Describe a single user-defined function (UDF) resource.
@@ -27,3 +27,29 @@ def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str):
assert query_job.dry_run is True
assert query_job.total_bytes_processed > 0
assert len(query_job.schema) > 0


def test_session(bigquery_client: bigquery.Client):
initial_config = bigquery.QueryJobConfig()
initial_config.create_session = True
initial_query = """
CREATE TEMPORARY TABLE numbers(id INT64)
AS
SELECT * FROM UNNEST([1, 2, 3, 4, 5]) AS id;
"""
initial_job = bigquery_client.query(initial_query, job_config=initial_config)
initial_job.result()
session_id = initial_job.session_info.session_id
assert session_id is not None

second_config = bigquery.QueryJobConfig()
second_config.connection_properties = [
bigquery.ConnectionProperty("session_id", session_id),
]
second_job = bigquery_client.query(
"SELECT COUNT(*) FROM numbers;", job_config=second_config
)
rows = list(second_job.result())

assert len(rows) == 1
assert rows[0][0] == 5
@@ -228,6 +228,15 @@ def test_script_statistics(self):
self.assertEqual(stack_frame.end_column, 14)
self.assertEqual(stack_frame.text, "QUERY TEXT")

def test_session_info(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)

self.assertIsNone(job.session_info)
job._properties["statistics"] = {"sessionInfo": {"sessionId": "abcdefg"}}
self.assertIsNotNone(job.session_info)
self.assertEqual(job.session_info.session_id, "abcdefg")

def test_transaction_info(self):
from google.cloud.bigquery.job.base import TransactionInfo

@@ -281,6 +281,8 @@ def test_from_api_repr_bare(self):
job = klass.from_api_repr(RESOURCE, client=client)
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)
self.assertEqual(len(job.connection_properties), 0)
self.assertIsNone(job.create_session)

def test_from_api_repr_with_encryption(self):
self._setUpConstants()
@@ -152,6 +152,27 @@ def test_clustering_fields(self):
config.clustering_fields = None
self.assertIsNone(config.clustering_fields)

def test_connection_properties(self):
from google.cloud.bigquery.job.query import ConnectionProperty

config = self._get_target_class()()
self.assertEqual(len(config.connection_properties), 0)

session_id = ConnectionProperty("session_id", "abcd")
time_zone = ConnectionProperty("time_zone", "America/Chicago")
config.connection_properties = [session_id, time_zone]
self.assertEqual(len(config.connection_properties), 2)
self.assertEqual(config.connection_properties[0].key, "session_id")
self.assertEqual(config.connection_properties[0].value, "abcd")
self.assertEqual(config.connection_properties[1].key, "time_zone")
self.assertEqual(config.connection_properties[1].value, "America/Chicago")

def test_create_session(self):
config = self._get_target_class()()
self.assertIsNone(config.create_session)
config.create_session = True
self.assertTrue(config.create_session)

def test_from_api_repr_empty(self):
klass = self._get_target_class()
config = klass.from_api_repr({})

0 comments on commit e4c94f4

Please sign in to comment.