Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add priority parameter to BigQueryHook #30655

Merged
merged 14 commits into from Apr 24, 2023
18 changes: 11 additions & 7 deletions airflow/providers/google/cloud/hooks/bigquery.py
Expand Up @@ -76,6 +76,9 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
:param gcp_conn_id: The Airflow connection used for GCP credentials.
:param use_legacy_sql: This specifies whether to use legacy SQL dialect.
:param location: The location of the BigQuery resource.
:param priority: Specifies a priority for the query.
Possible values include INTERACTIVE and BATCH.
The default value is INTERACTIVE.
:param api_resource_configs: This contains params configuration applied for Google BigQuery jobs.
:param impersonation_chain: This is the optional service account to impersonate using short term
credentials.
Expand All @@ -92,6 +95,7 @@ def __init__(
gcp_conn_id: str = GoogleBaseHook.default_conn_name,
use_legacy_sql: bool = True,
location: str | None = None,
priority: str = "INTERACTIVE",
api_resource_configs: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
Expand All @@ -108,6 +112,7 @@ def __init__(
)
self.use_legacy_sql = use_legacy_sql
self.location = location
self.priority = priority
self.running_job_id: str | None = None
self.api_resource_configs: dict = api_resource_configs if api_resource_configs else {}
self.labels = labels
Expand Down Expand Up @@ -2000,7 +2005,7 @@ def run_query(
query_params: list | None = None,
labels: dict | None = None,
schema_update_options: Iterable | None = None,
priority: str = "INTERACTIVE",
priority: str | None = None,
time_partitioning: dict | None = None,
api_resource_configs: dict | None = None,
cluster_fields: list[str] | None = None,
Expand Down Expand Up @@ -2051,7 +2056,7 @@ def run_query(
table to be updated as a side effect of the query job.
:param priority: Specifies a priority for the query.
Possible values include INTERACTIVE and BATCH.
The default value is INTERACTIVE.
If `None`, defaults to `self.priority`.
:param time_partitioning: configure optional time partitioning fields i.e.
partition by field, type and expiration as per API specifications.
:param cluster_fields: Request that the result of this query be stored sorted
Expand All @@ -2076,12 +2081,11 @@ def run_query(
labels = labels or self.labels
schema_update_options = list(schema_update_options or [])

priority = priority or self.priority

if time_partitioning is None:
time_partitioning = {}

if location:
self.location = location

Comment on lines -2082 to -2084
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether this is used in some other way to propagate location to the instance attribute. It isn't the right thing to do, but at the same time I'm worried that an operator was relying on this functionality, especially given that there was a test around it. Can you check whether callers of run_query were previously relying on it?

Copy link
Contributor Author

@ying-w ying-w Apr 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just checking what other calls go to run_query(). In hooks/bigquery.py there's only one other call, from the deprecated BigQueryCursor.run_query, which passes in *args, **kwargs and so might rely on it.

In operators/bigquery.py there are two calls from within the deprecated BigQueryExecuteQueryOperator class; they just pass through values for execute() but don't pass in location. Many of these operators set self.location = location and then pass that to BigQueryHook.

So getting rid of the call from cursor.execute() should remove calls to run_query() from non-deprecated functions.

(I checked all references to run_query within airflow/providers/google/cloud/.)

if not api_resource_configs:
api_resource_configs = self.api_resource_configs
else:
Expand Down Expand Up @@ -2133,7 +2137,7 @@ def run_query(

query_param_list: list[tuple[Any, str, str | bool | None | dict, type | tuple[type]]] = [
(sql, "query", None, (str,)),
(priority, "priority", "INTERACTIVE", (str,)),
(priority, "priority", priority, (str,)),
(use_legacy_sql, "useLegacySql", self.use_legacy_sql, bool),
(query_params, "queryParameters", None, list),
(udf_config, "userDefinedFunctionResources", None, list),
Expand Down Expand Up @@ -2202,7 +2206,7 @@ def run_query(
if encryption_configuration:
configuration["query"]["destinationEncryptionConfiguration"] = encryption_configuration

job = self.insert_job(configuration=configuration, project_id=self.project_id)
job = self.insert_job(configuration=configuration, project_id=self.project_id, location=location)
potiuk marked this conversation as resolved.
Show resolved Hide resolved
self.running_job_id = job.job_id
return job.job_id

Expand Down
9 changes: 0 additions & 9 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Expand Up @@ -93,15 +93,6 @@ def test_bigquery_client_creation(self, mock_build, mock_authorize, mock_bigquer
)
assert mock_bigquery_connection.return_value == result

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_job")
def test_location_propagates_properly(self, run_with_config, _):
# TODO: this creates side effect
assert self.hook.location is None
self.hook.run_query(sql="select 1", location="US")
assert run_with_config.call_count == 1
assert self.hook.location == "US"

def test_bigquery_insert_rows_not_implemented(self):
with pytest.raises(NotImplementedError):
self.hook.insert_rows(table="table", rows=[1, 2])
Expand Down