Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add api_method parameter to Client.query to select INSERT or QUERY API #967

Merged
merged 25 commits into from Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
575966d
feat: add `api_method` parameter to `Client.query` to select `insert`…
tswast Sep 13, 2021
094e3cb
WIP: begin implementation of jobs.query usage
tswast Sep 13, 2021
17994f5
remove extra files
tswast Oct 5, 2021
c963e25
insert query with jobs.query
tswast Oct 6, 2021
5093378
fix merge between job config and query request
tswast Oct 6, 2021
e2e1c7d
add tests
tswast Oct 6, 2021
2e4af92
update todo with thoughts on future perf update
tswast Oct 7, 2021
d700df5
clarify TODO comment
tswast Oct 7, 2021
1e73a56
add placeholders for needed tests
tswast Oct 8, 2021
7435c8d
add schema property
tswast Oct 11, 2021
d383847
feat: add `QueryJob.schema` property for dry run queries
tswast Oct 11, 2021
8bc2458
add more job properties
tswast Oct 13, 2021
a2b4c2b
add tests for differences in API error behavior between jobs.query an…
tswast Oct 14, 2021
8b970f2
update docs to show differences
tswast Oct 14, 2021
e7e5e17
cover error conversion
tswast Oct 14, 2021
b572188
restore missing modules
tswast Oct 14, 2021
7bb1200
add unit tests
tswast Oct 15, 2021
7fbf966
Merge remote-tracking branch 'upstream/v3' into issue589-queries-endp…
tswast Nov 18, 2021
0598ace
adjust query job construction
tswast Nov 19, 2021
4f36bae
avoid conflicting table IDs
tswast Nov 19, 2021
3058498
mock query response
tswast Nov 19, 2021
ba785d9
fix unit test coverage
tswast Nov 19, 2021
b67ac5a
fix type errors
tswast Nov 19, 2021
a3223b1
fix docs formatting
tswast Nov 19, 2021
b257609
comments and additional unit tests
tswast Nov 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
248 changes: 248 additions & 0 deletions google/cloud/bigquery/_job_helpers.py
@@ -0,0 +1,248 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client."""

import copy
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
from google.cloud.bigquery.client import Client


_TIMEOUT_BUFFER_MILLIS = 100
tswast marked this conversation as resolved.
Show resolved Hide resolved


def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
"""Construct an ID for a new job.

Args:
job_id (Optional[str]): the user-provided job ID.
prefix (Optional[str]): the user-provided prefix for a job ID.
tswast marked this conversation as resolved.
Show resolved Hide resolved

Returns:
str: A job ID
"""
if job_id is not None:
return job_id
elif prefix is not None:
return str(prefix) + str(uuid.uuid4())
else:
return str(uuid.uuid4())


def query_jobs_insert(
client: "Client",
query: str,
job_config: job.QueryJobConfig,
job_id: str,
job_id_prefix: str,
location: str,
project: str,
retry: retries.Retry,
timeout: Optional[float],
job_retry: retries.Retry,
):
tswast marked this conversation as resolved.
Show resolved Hide resolved
"""Initiate a query using jobs.insert.

See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
"""
job_id_given = job_id is not None
job_id_save = job_id
job_config_save = job_config

def do_query():
# Make a copy now, so that original doesn't get changed by the process
# below and to facilitate retry
job_config = copy.deepcopy(job_config_save)

job_id = make_job_id(job_id_save, job_id_prefix)
job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

try:
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc

try:
query_job = client.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
else:
return query_job
else:
return query_job

future = do_query()
# The future might be in a failed state now, but if it's
# unrecoverable, we'll find out when we ask for it's result, at which
# point, we may retry.
if not job_id_given:
future._retry_do_query = do_query # in case we have to retry later
future._job_retry = job_retry

return future


def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
"""Transform from Job resource to QueryRequest resource.

Most of the keys in job.configuration.query are in common with
QueryRequest. If any configuration property is set that is not available in
jobs.query, it will result in a server-side error.
"""
request_body = {}
job_config_resource = job_config.to_api_repr() if job_config else {}
query_config_resource = job_config_resource.get("query", {})

request_body.update(query_config_resource)

# These keys are top level in job resource and query resource.
if "labels" in job_config_resource:
request_body["labels"] = job_config_resource["labels"]
if "dryRun" in job_config_resource:
request_body["dryRun"] = job_config_resource["dryRun"]

# Default to standard SQL.
request_body.setdefault("useLegacySql", False)

# Since jobs.query can return results, ensure we use the lossless timestamp
# format. See: https://github.com/googleapis/python-bigquery/issues/395
request_body.setdefault("formatOptions", {})
request_body["formatOptions"]["useInt64Timestamp"] = True

return request_body


def _to_query_job(
client: "Client",
query: str,
request_config: Optional[job.QueryJobConfig],
query_response: Dict[str, Any],
) -> job.QueryJob:
job_ref_resource = query_response["jobReference"]
job_ref = job._JobReference._from_api_repr(job_ref_resource)
query_job = job.QueryJob(job_ref, query, client=client)

# Not all relevant properties are in the jobs.query response, so
if request_config is not None:
query_job._properties["configuration"].update(request_config.to_api_repr())
query_job._properties["configuration"]["query"]["query"] = query
query_job._properties["configuration"]["query"].setdefault(
"useLegacySql", False
)

query_job._properties.setdefault("statistics", {})
query_job._properties["statistics"].setdefault("query", {})
query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
"cacheHit"
)
query_job._properties["statistics"]["query"]["schema"] = query_response.get(
"schema"
)
query_job._properties["statistics"]["query"][
"totalBytesProcessed"
] = query_response.get("totalBytesProcessed")

# Set errors if any were encountered.
query_job._properties.setdefault("status", {})
if "errors" in query_response:
# Set errors but not errorResult. If there was an error that failed
# the job, jobs.query behaves like jobs.getQueryResults and returns a
# non-success HTTP status code.
errors = query_response["errors"]
query_job._properties["status"]["errors"] = errors

# Transform job state so that QueryJob doesn't try to restart the query.
job_complete = query_response.get("jobComplete")
if job_complete:
query_job._properties["status"]["state"] = "DONE"
# TODO: https://github.com/googleapis/python-bigquery/issues/589
# Set the first page of results if job is "complete" and there is
# only 1 page of results. Otherwise, use the existing logic that
# refreshes the job stats.
#
# This also requires updates to `to_dataframe` and the DB API connector
# so that they don't try to read from a destination table if all the
# results are present.
else:
query_job._properties["status"]["state"] = "PENDING"

return query_job


def query_jobs_query(
client: "Client",
query: str,
job_config: Optional[job.QueryJobConfig],
location: str,
project: str,
retry: retries.Retry,
timeout: Optional[float],
job_retry: retries.Retry,
):
"""Initiate a query using jobs.query.

See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
"""
path = f"/projects/{project}/queries"
request_body = _to_query_request(job_config)

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
request_body["location"] = location
request_body["query"] = query

def do_query():
request_body["requestId"] = make_job_id()
span_attributes = {"path": path}
api_response = client._call_api(
retry,
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=timeout,
)
return _to_query_job(client, query, job_config, api_response)

future = do_query()

# The future might be in a failed state now, but if it's
# unrecoverable, we'll find out when we ask for it's result, at which
# point, we may retry.
future._retry_do_query = do_query # in case we have to retry later
future._job_retry = job_retry

return future
95 changes: 32 additions & 63 deletions google/cloud/bigquery/client.py
Expand Up @@ -60,6 +60,8 @@
DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
)

from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
Expand All @@ -69,6 +71,7 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
Expand Down Expand Up @@ -3164,6 +3167,7 @@ def query(
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
job_retry: retries.Retry = DEFAULT_JOB_RETRY,
api_method: enums.QueryApiMethod = enums.QueryApiMethod.INSERT,
) -> job.QueryJob:
"""Run a SQL query.

Expand Down Expand Up @@ -3215,6 +3219,11 @@ def query(
called on the job returned. The ``job_retry``
specified here becomes the default ``job_retry`` for
``result()``, where it can also be specified.
api_method:
Method with which to start the query job.

See :class:`google.cloud.bigquery.enums.QueryApiMethod` for
details on the difference between the query start methods.

Returns:
google.cloud.bigquery.job.QueryJob: A new query job instance.
Expand All @@ -3238,7 +3247,10 @@ def query(
" provided."
)

job_id_save = job_id
if job_id_given and api_method == enums.QueryApiMethod.QUERY:
raise TypeError(
"`job_id` was provided, but the 'QUERY' `api_method` was requested."
)

if project is None:
project = self.project
Expand Down Expand Up @@ -3269,50 +3281,25 @@ def query(

# Note that we haven't modified the original job_config (or
# _default_query_job_config) up to this point.
job_config_save = job_config

def do_query():
# Make a copy now, so that original doesn't get changed by the process
# below and to facilitate retry
job_config = copy.deepcopy(job_config_save)

job_id = _make_job_id(job_id_save, job_id_prefix)
job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)

try:
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
else:
return query_job
else:
return query_job

future = do_query()
# The future might be in a failed state now, but if it's
# unrecoverable, we'll find out when we ask for it's result, at which
# point, we may retry.
if not job_id_given:
future._retry_do_query = do_query # in case we have to retry later
future._job_retry = job_retry

return future
if api_method == enums.QueryApiMethod.QUERY:
return _job_helpers.query_jobs_query(
self, query, job_config, location, project, retry, timeout, job_retry,
)
elif api_method == enums.QueryApiMethod.INSERT:
return _job_helpers.query_jobs_insert(
self,
query,
job_config,
job_id,
job_id_prefix,
location,
project,
retry,
timeout,
job_retry,
)
else:
raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")

def insert_rows(
self,
Expand Down Expand Up @@ -3985,24 +3972,6 @@ def _extract_job_reference(job, project=None, location=None):
return (project, location, job_id)


def _make_job_id(job_id: Optional[str], prefix: Optional[str] = None) -> str:
"""Construct an ID for a new job.

Args:
job_id: the user-provided job ID.
prefix: the user-provided prefix for a job ID.

Returns:
str: A job ID
"""
if job_id is not None:
return job_id
elif prefix is not None:
return str(prefix) + str(uuid.uuid4())
else:
return str(uuid.uuid4())


def _check_mode(stream):
"""Check that a stream was opened in read-binary mode.

Expand Down