feat: ensure "bigframes-api" label is always set on jobs, even if the API is unknown #722

Merged (6 commits) on May 24, 2024
7 changes: 5 additions & 2 deletions bigframes/core/log_adapter.py
@@ -99,9 +99,12 @@ def add_api_method(api_method_name):
_api_methods = _api_methods[:MAX_LABELS_COUNT]


def get_and_reset_api_methods():
def get_and_reset_api_methods(dry_run: bool = False):
global _lock
with _lock:
previous_api_methods = list(_api_methods)
_api_methods.clear()

# dry_run might not make a job resource, so only reset the log on real queries.
if not dry_run:
_api_methods.clear()
return previous_api_methods
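
The new `dry_run` flag matters because a dry-run query may never create a job resource, so the recorded API methods are kept until a real query consumes them. A minimal sketch of that behavior (hypothetical standalone usage, assuming nothing else has recorded methods in the meantime):

```python
from bigframes.core import log_adapter

log_adapter.add_api_method("dataframe-head")

# A dry run peeks at the recorded methods without clearing them...
peeked = log_adapter.get_and_reset_api_methods(dry_run=True)
# ...so the next real query still sees (and then clears) the same log.
consumed = log_adapter.get_and_reset_api_methods()

assert peeked == consumed == ["dataframe-head"]
assert log_adapter.get_and_reset_api_methods() == []
```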
12 changes: 9 additions & 3 deletions bigframes/dataframe.py
@@ -2912,7 +2912,9 @@ def to_csv(
field_delimiter=sep,
header=header,
)
_, query_job = self._block.expr.session._start_query(export_data_statement)
_, query_job = self._block.expr.session._start_query(
export_data_statement, api_name="dataframe-to_csv"
)
self._set_internal_query_job(query_job)

def to_json(
@@ -2954,7 +2956,9 @@ def to_json(
format="JSON",
export_options={},
)
_, query_job = self._block.expr.session._start_query(export_data_statement)
_, query_job = self._block.expr.session._start_query(
export_data_statement, api_name="dataframe-to_json"
)
self._set_internal_query_job(query_job)

def to_gbq(
@@ -3086,7 +3090,9 @@ def to_parquet(
format="PARQUET",
export_options=export_options,
)
_, query_job = self._block.expr.session._start_query(export_data_statement)
_, query_job = self._block.expr.session._start_query(
export_data_statement, api_name="dataframe-to_parquet"
)
self._set_internal_query_job(query_job)

def to_dict(
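
Taken together, the to_csv/to_json/to_parquet changes attribute the EXPORT DATA jobs to a specific user-facing API instead of relying on the method log. A hedged usage sketch (the bucket path is a placeholder, not from this PR):

```python
import bigframes.pandas as bpd

df = bpd.read_gbq("SELECT 1 AS x")

# The EXPORT DATA job started by this call is now labeled
# bigframes-api=dataframe-to_csv, even if no API method was recorded for it.
df.to_csv("gs://my-bucket/out/*.csv")  # placeholder destination bucket
```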
15 changes: 9 additions & 6 deletions bigframes/functions/remote_function.py
@@ -24,7 +24,7 @@
import sys
import tempfile
import textwrap
from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union
from typing import cast, List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union
import warnings

import ibis
@@ -133,6 +133,8 @@ def __init__(
cloud_function_service_account,
cloud_function_kms_key_name,
cloud_function_docker_repository,
*,
session: Session,
):
self._gcp_project_id = gcp_project_id
self._cloud_function_region = cloud_function_region
@@ -145,6 +147,7 @@ def __init__(
self._cloud_function_service_account = cloud_function_service_account
self._cloud_function_kms_key_name = cloud_function_kms_key_name
self._cloud_function_docker_repository = cloud_function_docker_repository
self._session = session

def create_bq_remote_function(
self,
@@ -216,10 +219,8 @@ def create_bq_remote_function(
# This requires bigquery.datasets.create IAM permission
self._bq_client.create_dataset(dataset, exists_ok=True)

# TODO: Use session._start_query() so we get progress bar
query_job = self._bq_client.query(create_function_ddl) # Make an API request.
query_job.result() # Wait for the job to complete.

# TODO(swast): plumb through the original, user-facing api_name.
_, query_job = self._session._start_query(create_function_ddl)
logger.info(f"Created remote function {query_job.ddl_target_routine}")

def get_cloud_function_fully_qualified_parent(self):
@@ -910,6 +911,7 @@ def remote_function(
is_row_processor = False

import bigframes.series
import bigframes.session

if input_types == bigframes.series.Series:
warnings.warn(
@@ -928,7 +930,7 @@
# Some defaults may be used from the session if not provided otherwise
import bigframes.pandas as bpd

session = session or bpd.get_global_session()
session = cast(bigframes.session.Session, session or bpd.get_global_session())

# A BigQuery client is required to perform BQ operations
if not bigquery_client:
@@ -1040,6 +1042,7 @@ def wrapper(f):
cloud_function_service_account,
cloud_function_kms_key_name,
cloud_function_docker_repository,
session=session, # type: ignore
)

rf_name, cf_name = remote_function_client.provision_bq_remote_function(
8 changes: 7 additions & 1 deletion bigframes/pandas/__init__.py
@@ -399,6 +399,9 @@ def _set_default_session_location_if_possible(query):
bqclient = clients_provider.bqclient

if bigframes.session._io.bigquery.is_query(query):
# Intentionally run outside of the session so that we can detect the
# location before creating the session. Since it's a dry_run, labels
# aren't necessary.
job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
options.bigquery.location = job.location
else:
@@ -773,7 +776,10 @@ def clean_up_by_session_id(
dataset = session._anonymous_dataset
else:
dataset = bigframes.session._io.bigquery.create_bq_dataset_reference(
client, location=location, project=project
client,
location=location,
project=project,
api_name="clean_up_by_session_id",
)

bigframes.session._io.bigquery.delete_tables_matching_session_id(
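
The clean_up_by_session_id path never records a DataFrame API method, so it now passes its own api_name for attribution. A small hedged sketch of the effect (the session id is a placeholder):

```python
import bigframes.pandas as bpd

# Queries issued while locating the anonymous dataset and deleting session
# tables are now labeled bigframes-api=clean_up_by_session_id.
bpd.clean_up_by_session_id("0123456789abcdef")  # placeholder session id
```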
40 changes: 26 additions & 14 deletions bigframes/session/__init__.py
@@ -235,7 +235,9 @@ def __init__(

self._anonymous_dataset = (
bigframes.session._io.bigquery.create_bq_dataset_reference(
self.bqclient, location=self._location
self.bqclient,
location=self._location,
api_name="session-__init__",
)
)

@@ -420,9 +422,11 @@ def _query_to_destination(
# bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
dry_run_config = bigquery.QueryJobConfig()
dry_run_config.dry_run = True
_, dry_run_job = self._start_query(query, job_config=dry_run_config)
_, dry_run_job = self._start_query(
query, job_config=dry_run_config, api_name=api_name
)
if dry_run_job.statement_type != "SELECT":
_, query_job = self._start_query(query)
_, query_job = self._start_query(query, api_name=api_name)
return query_job.destination, query_job

# Create a table to workaround BigQuery 10 GB query results limit. See:
@@ -451,23 +455,25 @@ def _query_to_destination(
bigquery.QueryJobConfig,
bigquery.QueryJobConfig.from_api_repr(configuration),
)
job_config.labels["bigframes-api"] = api_name
job_config.destination = temp_table

try:
# Write to temp table to workaround BigQuery 10 GB query results
# limit. See: internal issue 303057336.
job_config.labels["error_caught"] = "true"
_, query_job = self._start_query(
query, job_config=job_config, timeout=timeout
query,
job_config=job_config,
timeout=timeout,
api_name=api_name,
)
return query_job.destination, query_job
except google.api_core.exceptions.BadRequest:
# Some SELECT statements still aren't compatible with cluster
# tables as the destination. For example, if the query has a
# top-level ORDER BY, this conflicts with our ability to cluster
# the table by the index column(s).
_, query_job = self._start_query(query, timeout=timeout)
_, query_job = self._start_query(query, timeout=timeout, api_name=api_name)
return query_job.destination, query_job

def read_gbq_query(
@@ -811,7 +817,7 @@ def _read_gbq_table(
dry_run_config = bigquery.QueryJobConfig()
dry_run_config.dry_run = True
try:
self._start_query(sql, job_config=dry_run_config)
self._start_query(sql, job_config=dry_run_config, api_name=api_name)
except google.api_core.exceptions.NotFound:
# note that a notfound caused by a simple typo will be
# caught above when the metadata is fetched, not here
@@ -1777,12 +1783,6 @@ def _prepare_query_job_config(
bigframes.options.compute.maximum_bytes_billed
)

current_labels = job_config.labels if job_config.labels else {}
for key, value in bigframes.options.compute.extra_query_labels.items():
    if key not in current_labels:
        current_labels[key] = value
job_config.labels = current_labels

Review comment (author): Moved to bigframes.session._io.bigquery.create_job_configs_labels

if self._bq_kms_key_name:
job_config.destination_encryption_configuration = (
bigquery.EncryptionConfiguration(kms_key_name=self._bq_kms_key_name)
@@ -1818,13 +1818,19 @@ def _start_query(
job_config: Optional[bigquery.job.QueryJobConfig] = None,
max_results: Optional[int] = None,
timeout: Optional[float] = None,
api_name: Optional[str] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""
Starts BigQuery query job and waits for results.
"""
job_config = self._prepare_query_job_config(job_config)
return bigframes.session._io.bigquery.start_query_with_client(
self.bqclient, sql, job_config, max_results, timeout
self.bqclient,
sql,
job_config,
max_results,
timeout,
api_name=api_name,
)

def _start_query_ml_ddl(
@@ -1970,6 +1976,9 @@ def _execute(
job_config = bigquery.QueryJobConfig(dry_run=dry_run)
else:
job_config.dry_run = dry_run

# TODO(swast): plumb through the api_name of the user-facing api that
# caused this query.
return self._start_query(
sql=sql,
job_config=job_config,
@@ -1982,6 +1991,9 @@ def _peek(
if not tree_properties.peekable(self._with_cached_executions(array_value.node)):
warnings.warn("Peeking this value cannot be done efficiently.")
sql = self._compile_unordered(array_value).peek_sql(n_rows)

# TODO(swast): plumb through the api_name of the user-facing api that
# caused this query.
return self._start_query(
sql=sql,
)
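
The common thread in this file is that _start_query now accepts an optional api_name and forwards it to start_query_with_client, where the labeling actually happens. A minimal sketch of the new call shape (_start_query is internal, so this is purely illustrative):

```python
import bigframes.pandas as bpd

session = bpd.get_global_session()

# Callers attribute their queries explicitly; the label is attached in
# bigframes.session._io.bigquery.start_query_with_client via add_labels().
_, query_job = session._start_query(
    "SELECT 1",
    api_name="dataframe-to_csv",  # example value from the dataframe.py changes
)
```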
47 changes: 38 additions & 9 deletions bigframes/session/_io/bigquery/__init__.py
@@ -28,6 +28,7 @@
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
import google.api_core.exceptions
import google.cloud.bigquery as bigquery
import google.cloud.bigquery.table

import bigframes
from bigframes.core import log_adapter
@@ -40,19 +41,34 @@
# will be limited to this many tables

LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."


def create_job_configs_labels(
job_configs_labels: Optional[Dict[str, str]],
api_methods: typing.List[str],
api_name: Optional[str] = None,
) -> Dict[str, str]:
if job_configs_labels is None:
job_configs_labels = {}

if api_methods:
# If the user has labels they wish to set, make sure we set those first so
# they are preserved.
for key, value in bigframes.options.compute.extra_query_labels.items():
job_configs_labels[key] = value

if api_name is not None:
job_configs_labels["bigframes-api"] = api_name

if api_methods and "bigframes-api" not in job_configs_labels:
job_configs_labels["bigframes-api"] = api_methods[0]
del api_methods[0]

# Make sure we always populate bigframes-api with _something_, even if we
# have a code path which doesn't populate the list of api_methods. See
# internal issue 336521938.
job_configs_labels.setdefault("bigframes-api", "unknown")

labels = list(
itertools.chain(
job_configs_labels.keys(),
@@ -193,27 +209,33 @@ def format_option(key: str, value: Union[bool, str]) -> str:
return f"{key}={repr(value)}"


def add_labels(job_config, api_name: Optional[str] = None):
api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
job_config.labels = create_job_configs_labels(
job_configs_labels=job_config.labels,
api_methods=api_methods,
api_name=api_name,
)


def start_query_with_client(
bq_client: bigquery.Client,
sql: str,
job_config: bigquery.job.QueryJobConfig,
max_results: Optional[int] = None,
timeout: Optional[float] = None,
api_name: Optional[str] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""
Starts query job and waits for results.
"""
if not job_config.dry_run:
api_methods = log_adapter.get_and_reset_api_methods()
job_config.labels = create_job_configs_labels(
job_configs_labels=job_config.labels, api_methods=api_methods
)
add_labels(job_config, api_name=api_name)

try:
query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
except google.api_core.exceptions.Forbidden as ex:
if "Drive credentials" in ex.message:
ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
ex.message += CHECK_DRIVE_PERMISSIONS
raise

opts = bigframes.options.display
@@ -286,7 +308,10 @@ def delete_tables_matching_session_id(


def create_bq_dataset_reference(
bq_client: bigquery.Client, location=None, project=None
bq_client: bigquery.Client,
location=None,
project=None,
api_name: str = "unknown",
) -> bigquery.DatasetReference:
"""Create and identify dataset(s) for temporary BQ resources.

@@ -307,7 +332,11 @@
Returns:
bigquery.DatasetReference: The constructed reference to the anonymous dataset.
"""
query_job = bq_client.query("SELECT 1", location=location, project=project)
job_config = google.cloud.bigquery.QueryJobConfig()
add_labels(job_config, api_name=api_name)
query_job = bq_client.query(
"SELECT 1", location=location, project=project, job_config=job_config
)
query_job.result() # blocks until finished

# The anonymous dataset is used by BigQuery to write query results and
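
The label precedence applied by add_labels/create_job_configs_labels is the core of this PR: user-supplied extra_query_labels are set first, then an explicit api_name, then the first recorded API method, and finally an "unknown" fallback so the label is never missing. A minimal sketch of the fallback chain (assuming no extra_query_labels are configured in options):

```python
from bigframes.session._io.bigquery import create_job_configs_labels

# An explicit api_name wins over any recorded API methods.
labels = create_job_configs_labels(
    job_configs_labels=None,
    api_methods=["dataframe-head"],
    api_name="dataframe-to_csv",
)
assert labels["bigframes-api"] == "dataframe-to_csv"

# Without an api_name, the first recorded method is used.
labels = create_job_configs_labels(
    job_configs_labels=None,
    api_methods=["dataframe-head"],
)
assert labels["bigframes-api"] == "dataframe-head"

# With neither, the label is still populated (internal issue 336521938).
labels = create_job_configs_labels(job_configs_labels=None, api_methods=[])
assert labels["bigframes-api"] == "unknown"
```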