feat: ensure "bigframes-api" label is always set on jobs, even if t…
Browse files Browse the repository at this point in the history
…he API is unknown (#722)

* feat: ensure `"bigframes-api"` label is always set on jobs, even if the API is unknown

* remove some dead code; plumb `api_name` through

* avoid `.` in label values (BigQuery label values allow only lowercase letters, digits, underscores, and hyphens)

* add tests
tswast committed May 24, 2024
1 parent 354abc1 commit 1832778
Showing 12 changed files with 131 additions and 99 deletions.
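
Taken together, the diffs below move all label handling onto one code path with a guaranteed fallback. A minimal sketch of the resulting precedence, using illustrative values rather than the actual bigframes internals:

# Sketch of the label precedence after this commit (illustrative only).
labels = {}                                 # starts from the job config's labels
extra_query_labels = {"team": "analytics"}  # stands in for bigframes.options.compute.extra_query_labels
api_name = None                             # explicit name passed by a user-facing API
api_methods = []                            # method log from bigframes.core.log_adapter

labels.update(extra_query_labels)
if api_name is not None:
    labels["bigframes-api"] = api_name
elif api_methods:
    labels["bigframes-api"] = api_methods.pop(0)

# The key fix: the label is always present, even when nothing was recorded.
labels.setdefault("bigframes-api", "unknown")
assert labels["bigframes-api"] == "unknown"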
2 changes: 1 addition & 1 deletion .kokoro/continuous/e2e.cfg
@@ -3,7 +3,7 @@
 # Only run this nox session.
 env_vars: {
   key: "NOX_SESSION"
-  value: "unit_prerelease system_prerelease system_noextras e2e notebook"
+  value: "e2e doctest notebook unit_prerelease system_prerelease system_noextras"
 }

 env_vars: {

2 changes: 1 addition & 1 deletion .kokoro/presubmit/e2e.cfg
@@ -3,7 +3,7 @@
 # Only run this nox session.
 env_vars: {
   key: "NOX_SESSION"
-  value: "unit_prerelease system_prerelease system_noextras e2e notebook"
+  value: "e2e doctest notebook unit_prerelease system_prerelease system_noextras"
 }

 env_vars: {

7 changes: 5 additions & 2 deletions bigframes/core/log_adapter.py
@@ -99,9 +99,12 @@ def add_api_method(api_method_name):
         _api_methods = _api_methods[:MAX_LABELS_COUNT]


-def get_and_reset_api_methods():
+def get_and_reset_api_methods(dry_run: bool = False):
     global _lock
     with _lock:
         previous_api_methods = list(_api_methods)
-        _api_methods.clear()
+
+        # dry_run might not make a job resource, so only reset the log on real queries.
+        if not dry_run:
+            _api_methods.clear()
     return previous_api_methods
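
The dry_run flag exists because a dry-run query may never create a job resource to attach labels to; clearing the log there would lose the recorded method names before the real query runs. A usage sketch, assuming a fresh process where the module-level log starts empty (the method name is illustrative):

from bigframes.core import log_adapter

log_adapter.add_api_method("dataframe-head")

# A dry run reads the log but leaves it intact...
assert log_adapter.get_and_reset_api_methods(dry_run=True) == ["dataframe-head"]

# ...so the real query that follows can still consume and clear it.
assert log_adapter.get_and_reset_api_methods() == ["dataframe-head"]
assert log_adapter.get_and_reset_api_methods() == []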
12 changes: 9 additions & 3 deletions bigframes/dataframe.py
@@ -2912,7 +2912,9 @@ def to_csv(
             field_delimiter=sep,
             header=header,
         )
-        _, query_job = self._block.expr.session._start_query(export_data_statement)
+        _, query_job = self._block.expr.session._start_query(
+            export_data_statement, api_name="dataframe-to_csv"
+        )
         self._set_internal_query_job(query_job)

     def to_json(
@@ -2954,7 +2956,9 @@ def to_json(
             format="JSON",
             export_options={},
         )
-        _, query_job = self._block.expr.session._start_query(export_data_statement)
+        _, query_job = self._block.expr.session._start_query(
+            export_data_statement, api_name="dataframe-to_json"
+        )
         self._set_internal_query_job(query_job)

     def to_gbq(
@@ -3086,7 +3090,9 @@ def to_parquet(
             format="PARQUET",
             export_options=export_options,
         )
-        _, query_job = self._block.expr.session._start_query(export_data_statement)
+        _, query_job = self._block.expr.session._start_query(
+            export_data_statement, api_name="dataframe-to_parquet"
+        )
         self._set_internal_query_job(query_job)

     def to_dict(
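
Each export path now passes its own api_name, since EXPORT DATA statements don't flow through the method log the way pandas-style calls do. A hedged sketch of the effect (hypothetical destination bucket; the job is also surfaced on the DataFrame, e.g. via its query_job property):

import bigframes.pandas as bpd

df = bpd.read_gbq("SELECT 1 AS x")
df.to_csv("gs://my-bucket/out-*.csv")  # hypothetical bucket

# The EXPORT DATA job submitted above now carries
# {"bigframes-api": "dataframe-to_csv"} in its labels.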
15 changes: 9 additions & 6 deletions bigframes/functions/remote_function.py
@@ -24,7 +24,7 @@
 import sys
 import tempfile
 import textwrap
-from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union
+from typing import cast, List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union
 import warnings

 import ibis
@@ -133,6 +133,8 @@ def __init__(
         cloud_function_service_account,
         cloud_function_kms_key_name,
         cloud_function_docker_repository,
+        *,
+        session: Session,
     ):
         self._gcp_project_id = gcp_project_id
         self._cloud_function_region = cloud_function_region
@@ -145,6 +147,7 @@ def __init__(
         self._cloud_function_service_account = cloud_function_service_account
         self._cloud_function_kms_key_name = cloud_function_kms_key_name
         self._cloud_function_docker_repository = cloud_function_docker_repository
+        self._session = session

     def create_bq_remote_function(
         self,
@@ -216,10 +219,8 @@ def create_bq_remote_function(
         # This requires bigquery.datasets.create IAM permission
         self._bq_client.create_dataset(dataset, exists_ok=True)

-        # TODO: Use session._start_query() so we get progress bar
-        query_job = self._bq_client.query(create_function_ddl)  # Make an API request.
-        query_job.result()  # Wait for the job to complete.
-
+        # TODO(swast): plumb through the original, user-facing api_name.
+        _, query_job = self._session._start_query(create_function_ddl)
         logger.info(f"Created remote function {query_job.ddl_target_routine}")

     def get_cloud_function_fully_qualified_parent(self):
@@ -910,6 +911,7 @@ def remote_function(
     is_row_processor = False

     import bigframes.series
+    import bigframes.session

     if input_types == bigframes.series.Series:
         warnings.warn(
@@ -928,7 +930,7 @@
     # Some defaults may be used from the session if not provided otherwise
     import bigframes.pandas as bpd

-    session = session or bpd.get_global_session()
+    session = cast(bigframes.session.Session, session or bpd.get_global_session())

     # A BigQuery client is required to perform BQ operations
     if not bigquery_client:
@@ -1040,6 +1042,7 @@ def wrapper(f):
             cloud_function_service_account,
             cloud_function_kms_key_name,
             cloud_function_docker_repository,
+            session=session,  # type: ignore
         )

         rf_name, cf_name = remote_function_client.provision_bq_remote_function(
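
Routing the CREATE FUNCTION DDL through the session instead of the raw BigQuery client is what guarantees the provisioning job gets labeled at all; until the user-facing name is plumbed through (per the TODO), it is labeled "unknown". A simplified before/after sketch with local names:

# Before: the raw client call bypassed bigframes' label handling entirely.
query_job = bq_client.query(create_function_ddl)
query_job.result()

# After: the session wrapper stamps labels centrally (and can report
# progress), so even this DDL job carries {"bigframes-api": "unknown"}.
_, query_job = session._start_query(create_function_ddl)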
8 changes: 7 additions & 1 deletion bigframes/pandas/__init__.py
@@ -399,6 +399,9 @@ def _set_default_session_location_if_possible(query):
     bqclient = clients_provider.bqclient

     if bigframes.session._io.bigquery.is_query(query):
+        # Intentionally run outside of the session so that we can detect the
+        # location before creating the session. Since it's a dry_run, labels
+        # aren't necessary.
         job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
         options.bigquery.location = job.location
     else:
@@ -773,7 +776,10 @@ def clean_up_by_session_id(
         dataset = session._anonymous_dataset
     else:
         dataset = bigframes.session._io.bigquery.create_bq_dataset_reference(
-            client, location=location, project=project
+            client,
+            location=location,
+            project=project,
+            api_name="clean_up_by_session_id",
         )

     bigframes.session._io.bigquery.delete_tables_matching_session_id(
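
Skipping labels on the location-detection probe is deliberate: no session exists yet, and a dry run creates no job resource to label anyway. A standalone sketch of that probe (the table name is hypothetical):

from google.cloud import bigquery

bqclient = bigquery.Client()
job = bqclient.query(
    "SELECT COUNT(*) FROM `my-project.my_dataset.my_table`",
    bigquery.QueryJobConfig(dry_run=True),
)
print(job.location)  # e.g. "US"; used to set options.bigquery.location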
40 changes: 26 additions & 14 deletions bigframes/session/__init__.py
@@ -235,7 +235,9 @@ def __init__(

         self._anonymous_dataset = (
             bigframes.session._io.bigquery.create_bq_dataset_reference(
-                self.bqclient, location=self._location
+                self.bqclient,
+                location=self._location,
+                api_name="session-__init__",
             )
         )

@@ -420,9 +422,11 @@ def _query_to_destination(
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
         dry_run_config = bigquery.QueryJobConfig()
         dry_run_config.dry_run = True
-        _, dry_run_job = self._start_query(query, job_config=dry_run_config)
+        _, dry_run_job = self._start_query(
+            query, job_config=dry_run_config, api_name=api_name
+        )
         if dry_run_job.statement_type != "SELECT":
-            _, query_job = self._start_query(query)
+            _, query_job = self._start_query(query, api_name=api_name)
             return query_job.destination, query_job

         # Create a table to workaround BigQuery 10 GB query results limit. See:
@@ -451,23 +455,25 @@ def _query_to_destination(
             bigquery.QueryJobConfig,
             bigquery.QueryJobConfig.from_api_repr(configuration),
         )
-        job_config.labels["bigframes-api"] = api_name
         job_config.destination = temp_table

         try:
             # Write to temp table to workaround BigQuery 10 GB query results
             # limit. See: internal issue 303057336.
             job_config.labels["error_caught"] = "true"
             _, query_job = self._start_query(
-                query, job_config=job_config, timeout=timeout
+                query,
+                job_config=job_config,
+                timeout=timeout,
+                api_name=api_name,
             )
             return query_job.destination, query_job
         except google.api_core.exceptions.BadRequest:
             # Some SELECT statements still aren't compatible with cluster
             # tables as the destination. For example, if the query has a
             # top-level ORDER BY, this conflicts with our ability to cluster
             # the table by the index column(s).
-            _, query_job = self._start_query(query, timeout=timeout)
+            _, query_job = self._start_query(query, timeout=timeout, api_name=api_name)
             return query_job.destination, query_job

     def read_gbq_query(
@@ -811,7 +817,7 @@ def _read_gbq_table(
         dry_run_config = bigquery.QueryJobConfig()
         dry_run_config.dry_run = True
         try:
-            self._start_query(sql, job_config=dry_run_config)
+            self._start_query(sql, job_config=dry_run_config, api_name=api_name)
         except google.api_core.exceptions.NotFound:
             # note that a notfound caused by a simple typo will be
             # caught above when the metadata is fetched, not here
@@ -1777,12 +1783,6 @@ def _prepare_query_job_config(
                 bigframes.options.compute.maximum_bytes_billed
             )

-        current_labels = job_config.labels if job_config.labels else {}
-        for key, value in bigframes.options.compute.extra_query_labels.items():
-            if key not in current_labels:
-                current_labels[key] = value
-        job_config.labels = current_labels
-
         if self._bq_kms_key_name:
             job_config.destination_encryption_configuration = (
                 bigquery.EncryptionConfiguration(kms_key_name=self._bq_kms_key_name)
@@ -1818,13 +1818,19 @@ def _start_query(
         job_config: Optional[bigquery.job.QueryJobConfig] = None,
         max_results: Optional[int] = None,
         timeout: Optional[float] = None,
+        api_name: Optional[str] = None,
     ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
         """
        Starts BigQuery query job and waits for results.
        """
         job_config = self._prepare_query_job_config(job_config)
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient, sql, job_config, max_results, timeout
+            self.bqclient,
+            sql,
+            job_config,
+            max_results,
+            timeout,
+            api_name=api_name,
         )

     def _start_query_ml_ddl(
@@ -1970,6 +1976,9 @@ def _execute(
             job_config = bigquery.QueryJobConfig(dry_run=dry_run)
         else:
             job_config.dry_run = dry_run
+
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
         return self._start_query(
             sql=sql,
             job_config=job_config,
@@ -1982,6 +1991,9 @@ def _peek(
         if not tree_properties.peekable(self._with_cached_executions(array_value.node)):
             warnings.warn("Peeking this value cannot be done efficiently.")
         sql = self._compile_unordered(array_value).peek_sql(n_rows)
+
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
         return self._start_query(
             sql=sql,
         )
47 changes: 38 additions & 9 deletions bigframes/session/_io/bigquery/__init__.py
@@ -28,6 +28,7 @@
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import google.api_core.exceptions
 import google.cloud.bigquery as bigquery
+import google.cloud.bigquery.table

 import bigframes
 from bigframes.core import log_adapter
@@ -40,19 +41,34 @@
 # will be limited to this many tables

 LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
+CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."


 def create_job_configs_labels(
     job_configs_labels: Optional[Dict[str, str]],
     api_methods: typing.List[str],
+    api_name: Optional[str] = None,
 ) -> Dict[str, str]:
     if job_configs_labels is None:
         job_configs_labels = {}

+    # If the user has labels they wish to set, make sure we set those first so
+    # they are preserved.
+    for key, value in bigframes.options.compute.extra_query_labels.items():
+        job_configs_labels[key] = value
+
+    if api_name is not None:
+        job_configs_labels["bigframes-api"] = api_name
+
-    if api_methods:
+    if api_methods and "bigframes-api" not in job_configs_labels:
         job_configs_labels["bigframes-api"] = api_methods[0]
         del api_methods[0]

+    # Make sure we always populate bigframes-api with _something_, even if we
+    # have a code path which doesn't populate the list of api_methods. See
+    # internal issue 336521938.
+    job_configs_labels.setdefault("bigframes-api", "unknown")
+
     labels = list(
         itertools.chain(
             job_configs_labels.keys(),
@@ -193,27 +209,33 @@ def format_option(key: str, value: Union[bool, str]) -> str:
     return f"{key}={repr(value)}"


+def add_labels(job_config, api_name: Optional[str] = None):
+    api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
+    job_config.labels = create_job_configs_labels(
+        job_configs_labels=job_config.labels,
+        api_methods=api_methods,
+        api_name=api_name,
+    )
+
+
 def start_query_with_client(
     bq_client: bigquery.Client,
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
     max_results: Optional[int] = None,
     timeout: Optional[float] = None,
+    api_name: Optional[str] = None,
 ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
     """
     Starts query job and waits for results.
     """
-    if not job_config.dry_run:
-        api_methods = log_adapter.get_and_reset_api_methods()
-        job_config.labels = create_job_configs_labels(
-            job_configs_labels=job_config.labels, api_methods=api_methods
-        )
+    add_labels(job_config, api_name=api_name)

     try:
         query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
     except google.api_core.exceptions.Forbidden as ex:
         if "Drive credentials" in ex.message:
-            ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
+            ex.message += CHECK_DRIVE_PERMISSIONS
         raise

     opts = bigframes.options.display
@@ -286,7 +308,10 @@ def delete_tables_matching_session_id(


 def create_bq_dataset_reference(
-    bq_client: bigquery.Client, location=None, project=None
+    bq_client: bigquery.Client,
+    location=None,
+    project=None,
+    api_name: str = "unknown",
 ) -> bigquery.DatasetReference:
     """Create and identify dataset(s) for temporary BQ resources.
@@ -307,7 +332,11 @@ def create_bq_dataset_reference(
     Returns:
         bigquery.DatasetReference: The constructed reference to the anonymous dataset.
     """
-    query_job = bq_client.query("SELECT 1", location=location, project=project)
+    job_config = google.cloud.bigquery.QueryJobConfig()
+    add_labels(job_config, api_name=api_name)
+    query_job = bq_client.query(
+        "SELECT 1", location=location, project=project, job_config=job_config
+    )
     query_job.result()  # blocks until finished

     # The anonymous dataset is used by BigQuery to write query results and
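
A hedged example of create_job_configs_labels under the new rules, assuming bigframes.options.compute.extra_query_labels is empty: user labels survive, an explicit api_name beats the method log, and the "unknown" fallback fires only when both are absent:

import bigframes.session._io.bigquery as bf_io_bigquery

# Explicit api_name takes precedence over the recorded method log.
labels = bf_io_bigquery.create_job_configs_labels(
    job_configs_labels={"team": "analytics"},
    api_methods=["dataframe-head"],
    api_name="dataframe-to_csv",
)
assert labels["bigframes-api"] == "dataframe-to_csv"
assert labels["team"] == "analytics"

# With no api_name and an empty method log, the fallback applies
# (internal issue 336521938).
labels = bf_io_bigquery.create_job_configs_labels(
    job_configs_labels=None,
    api_methods=[],
)
assert labels["bigframes-api"] == "unknown"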