Skip to content

Commit

Permalink
feat: add Client.query_and_wait which directly returns a `RowIterat…
Browse files Browse the repository at this point in the history
…or` of results (#1722)

* perf: use the first page a results when `query(api_method="QUERY")`

* add tests

* respect max_results with cached page

* respect page_size, also avoid bqstorage if almost fully downloaded

* skip true test if bqstorage not installed

* coverage

* feat: add `Client.query_and_wait` which directly returns a `RowIterator` of results

Set the `QUERY_PREVIEW_ENABLED=TRUE` environment variable to use this with the
new JOB_CREATION_OPTIONAL mode (currently in preview).

* implement basic query_and_wait and add code sample to test

* avoid duplicated QueryJob construction

* update unit tests

* fix merge conflict in rowiterator

* support max_results, add tests

* retry tests

* unit test coverage

* dont retry twice

* fix mypy_samples session

* consolidate docstrings for query_and_wait

* remove mention of job ID

* fallback to jobs.insert for unsupported features

* distinguish API timeout from wait timeout

* add test for jobs.insert fallback

* populate default job config

* refactor default config

* add coverage for job_config

* cancel job if hasn't finished

* mypy

* allow unrealeased features in samples

* fix for 3.12

* fix: keep `RowIterator.total_rows` populated after iteration

This was being reset in some cases when
the rows were all available in the
first page of results.

* Update google/cloud/bigquery/table.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* fix comments

---------

Co-authored-by: Anthonios Partheniou <partheniou@google.com>
  • Loading branch information
tswast and parthea committed Dec 8, 2023
1 parent 8482f47 commit 89a647e
Show file tree
Hide file tree
Showing 12 changed files with 1,550 additions and 80 deletions.
316 changes: 303 additions & 13 deletions google/cloud/bigquery/_job_helpers.py
Expand Up @@ -12,9 +12,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client."""
"""Helpers for interacting with the job REST APIs from the client.
For queries, there are three cases to consider:
1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
This sometimes can return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
This sometimes doesn't create a job at all, instead returning the results.
For better debugging, an auto-generated query ID is included in the
response.
Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.
Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object because there are different
predicates where it is safe to generate a new query ID.
"""

import copy
import functools
import os
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional

Expand All @@ -23,6 +46,7 @@

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
Expand Down Expand Up @@ -59,6 +83,25 @@ def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> s
return str(uuid.uuid4())


def job_config_with_defaults(
job_config: Optional[job.QueryJobConfig],
default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
"""Create a copy of `job_config`, replacing unset values with those from
`default_job_config`.
"""
if job_config is None:
return default_job_config

if default_job_config is None:
return job_config

# Both job_config and default_job_config are not None, so make a copy of
# job_config merged with default_job_config. Anything already explicitly
# set on job_config should not be replaced.
return job_config._fill_from_default(default_job_config)


def query_jobs_insert(
client: "Client",
query: str,
Expand All @@ -67,9 +110,9 @@ def query_jobs_insert(
job_id_prefix: Optional[str],
location: Optional[str],
project: str,
retry: retries.Retry,
retry: Optional[retries.Retry],
timeout: Optional[float],
job_retry: retries.Retry,
job_retry: Optional[retries.Retry],
) -> job.QueryJob:
"""Initiate a query using jobs.insert.
Expand Down Expand Up @@ -123,7 +166,13 @@ def do_query():
return future


def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
def _to_query_request(
job_config: Optional[job.QueryJobConfig] = None,
*,
query: str,
location: Optional[str] = None,
timeout: Optional[float] = None,
) -> Dict[str, Any]:
"""Transform from Job resource to QueryRequest resource.
Most of the keys in job.configuration.query are in common with
Expand All @@ -150,6 +199,15 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any
request_body.setdefault("formatOptions", {})
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)

if location is not None:
request_body["location"] = location

request_body["query"] = query

return request_body


Expand Down Expand Up @@ -207,6 +265,10 @@ def _to_query_job(
return query_job


def _to_query_path(project: str) -> str:
return f"/projects/{project}/queries"


def query_jobs_query(
client: "Client",
query: str,
Expand All @@ -217,18 +279,14 @@ def query_jobs_query(
timeout: Optional[float],
job_retry: retries.Retry,
) -> job.QueryJob:
"""Initiate a query using jobs.query.
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
"""
path = f"/projects/{project}/queries"
request_body = _to_query_request(job_config)

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
request_body["location"] = location
request_body["query"] = query
path = _to_query_path(project)
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=timeout
)

def do_query():
request_body["requestId"] = make_job_id()
Expand All @@ -253,3 +311,235 @@ def do_query():
future._job_retry = job_retry

return future


def query_and_wait(
client: "Client",
query: str,
*,
job_config: Optional[job.QueryJobConfig],
location: Optional[str],
project: str,
api_timeout: Optional[float] = None,
wait_timeout: Optional[float] = None,
retry: Optional[retries.Retry],
job_retry: Optional[retries.Retry],
page_size: Optional[int] = None,
max_results: Optional[int] = None,
) -> table.RowIterator:
"""Run the query, wait for it to finish, and return the results.
While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview in the
``jobs.query`` REST API, use the default ``jobCreationMode`` unless
the environment variable ``QUERY_PREVIEW_ENABLED=true``. After
``jobCreationMode`` is GA, this method will always use
``jobCreationMode=JOB_CREATION_OPTIONAL``. See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
Args:
client:
BigQuery client to make API calls.
query (str):
SQL query to be executed. Defaults to the standard SQL
dialect. Use the ``job_config`` parameter to change dialects.
job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
Extra configuration options for the job.
To override any options that were previously set in
the ``default_query_job_config`` given to the
``Client`` constructor, manually set those options to ``None``,
or whatever value is preferred.
location (Optional[str]):
Location where to run the job. Must match the location of the
table used in the query as well as the destination table.
project (Optional[str]):
Project ID of the project of where to run the job. Defaults
to the client's project.
api_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
wait_timeout (Optional[float]):
The number of seconds to wait for the query to finish. If the
query doesn't finish before this timeout, the client attempts
to cancel the query.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
a reasonable default that should only be overridden
with care.
job_retry (Optional[google.api_core.retry.Retry]):
How to retry failed jobs. The default retries
rate-limit-exceeded errors. Passing ``None`` disables
job retry. Not all jobs can be retried.
page_size (Optional[int]):
The maximum number of rows in each page of results from this
request. Non-positive values are ignored.
max_results (Optional[int]):
The maximum total number of rows from this request.
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s. During each
page, the iterator will have the ``total_rows`` attribute
set, which counts the total number of rows **in the result
set** (this is distinct from the total number of rows in the
current page: ``iterator.page.num_items``).
If the query is a special query that produces no results, e.g.
a DDL query, an ``_EmptyRowIterator`` instance is returned.
Raises:
TypeError:
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
"""
# Some API parameters aren't supported by the jobs.query API. In these
# cases, fallback to a jobs.insert call.
if not _supported_by_jobs_query(job_config):
return _wait_or_cancel(
query_jobs_insert(
client=client,
query=query,
job_id=None,
job_id_prefix=None,
job_config=job_config,
location=location,
project=project,
retry=retry,
timeout=api_timeout,
job_retry=job_retry,
),
api_timeout=api_timeout,
wait_timeout=wait_timeout,
retry=retry,
page_size=page_size,
max_results=max_results,
)

path = _to_query_path(project)
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=api_timeout
)

if page_size is not None and max_results is not None:
request_body["maxResults"] = min(page_size, max_results)
elif page_size is not None or max_results is not None:
request_body["maxResults"] = page_size or max_results

if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

def do_query():
request_body["requestId"] = make_job_id()
span_attributes = {"path": path}

# For easier testing, handle the retries ourselves.
if retry is not None:
response = retry(client._call_api)(
retry=None, # We're calling the retry decorator ourselves.
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)
else:
response = client._call_api(
retry=None,
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)

# Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
# to fetch, there will be a job ID for jobs.getQueryResults.
query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
response
)
page_token = query_results.page_token
more_pages = page_token is not None

if more_pages or not query_results.complete:
# TODO(swast): Avoid a call to jobs.get in some cases (few
# remaining pages) by waiting for the query to finish and calling
# client._list_rows_from_query_results directly. Need to update
# RowIterator to fetch destination table via the job ID if needed.
return _wait_or_cancel(
_to_query_job(client, query, job_config, response),
api_timeout=api_timeout,
wait_timeout=wait_timeout,
retry=retry,
page_size=page_size,
max_results=max_results,
)

return table.RowIterator(
client=client,
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
path=None,
schema=query_results.schema,
max_results=max_results,
page_size=page_size,
total_rows=query_results.total_rows,
first_page_response=response,
location=query_results.location,
job_id=query_results.job_id,
query_id=query_results.query_id,
project=query_results.project,
)

if job_retry is not None:
return job_retry(do_query)()
else:
return do_query()


def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool:
"""True if jobs.query can be used. False if jobs.insert is needed."""
if job_config is None:
return True

return (
# These features aren't supported by jobs.query.
job_config.clustering_fields is None
and job_config.destination is None
and job_config.destination_encryption_configuration is None
and job_config.range_partitioning is None
and job_config.table_definitions is None
and job_config.time_partitioning is None
)


def _wait_or_cancel(
job: job.QueryJob,
api_timeout: Optional[float],
wait_timeout: Optional[float],
retry: Optional[retries.Retry],
page_size: Optional[int],
max_results: Optional[int],
) -> table.RowIterator:
"""Wait for a job to complete and return the results.
If we can't return the results within the ``wait_timeout``, try to cancel
the job.
"""
try:
return job.result(
page_size=page_size,
max_results=max_results,
retry=retry,
timeout=wait_timeout,
)
except Exception:
# Attempt to cancel the job since we can't return the results.
try:
job.cancel(retry=retry, timeout=api_timeout)
except Exception:
# Don't eat the original exception if cancel fails.
pass
raise

0 comments on commit 89a647e

Please sign in to comment.