Add priority parameter to BigQueryHook (#30655)
ying-w committed Apr 24, 2023
1 parent 62ea0ff commit 48c9625
Showing 4 changed files with 44 additions and 19 deletions.
43 changes: 36 additions & 7 deletions airflow/providers/google/cloud/hooks/bigquery.py
@@ -76,6 +76,9 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
:param gcp_conn_id: The Airflow connection used for GCP credentials.
:param use_legacy_sql: This specifies whether to use legacy SQL dialect.
:param location: The location of the BigQuery resource.
+ :param priority: Specifies a priority for the query.
+     Possible values include INTERACTIVE and BATCH.
+     The default value is INTERACTIVE.
:param api_resource_configs: This contains params configuration applied for Google BigQuery jobs.
:param impersonation_chain: This is the optional service account to impersonate using short term
credentials.
@@ -92,6 +95,7 @@ def __init__(
gcp_conn_id: str = GoogleBaseHook.default_conn_name,
use_legacy_sql: bool = True,
location: str | None = None,
+ priority: str = "INTERACTIVE",
api_resource_configs: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
@@ -108,6 +112,7 @@
)
self.use_legacy_sql = use_legacy_sql
self.location = location
+ self.priority = priority
self.running_job_id: str | None = None
self.api_resource_configs: dict = api_resource_configs if api_resource_configs else {}
self.labels = labels
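
With the new parameter in place, a hook-level default can be set once at construction time. A minimal sketch (the connection id is illustrative and assumes the Google provider is installed):

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    # Every query issued through this hook now defaults to BATCH priority
    # unless a call site overrides it; previously run_query was hard-coded
    # to default to INTERACTIVE.
    hook = BigQueryHook(
        gcp_conn_id="google_cloud_default",  # illustrative connection id
        use_legacy_sql=False,
        priority="BATCH",
    )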
@@ -554,6 +559,9 @@ def create_external_table(
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
+ This method is deprecated.
+ Please use `BigQueryHook.create_empty_table` method with passing the `table_resource` object
for more details about these parameters.
:param external_project_dataset_table:
@@ -743,6 +751,8 @@ def patch_table(
Patch information in an existing table.
It only updates fields that are provided in the request object.
+ This method is deprecated. Please use `BigQueryHook.update_table`
Reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch
:param dataset_id: The dataset containing the table to be patched.
@@ -928,6 +938,9 @@ def patch_dataset(self, dataset_id: str, dataset_resource: dict, project_id: str
"""
Patches information in an existing dataset.
It only replaces fields that are provided in the submitted dataset resource.
+ This method is deprecated. Please use `update_dataset`
More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch
@@ -972,6 +985,8 @@ def get_dataset_tables_list(
Method returns tables list of a BigQuery tables. If table prefix is specified,
only tables beginning by it are returned.
+ This method is deprecated. Please use `get_dataset_tables`
For more information, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
@@ -1162,6 +1177,8 @@ def run_table_delete(self, deletion_dataset_table: str, ignore_if_missing: bool
If the table does not exist, return an error unless ignore_if_missing
is set to True.
+ This method is deprecated. Please use `delete_table`
:param deletion_dataset_table: A dotted
``(<project>.|<project>:)<dataset>.<table>`` that indicates which table
will be deleted.
@@ -1206,6 +1223,9 @@ def get_tabledata(
) -> list[dict]:
"""
Get the data of a given dataset.table and optionally with selected columns.
+ This method is deprecated. Please use `list_rows`
see https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list
:param dataset_id: the dataset ID of the requested table.
@@ -1574,6 +1594,8 @@ def run_with_configuration(self, configuration: dict) -> str:
https://cloud.google.com/bigquery/docs/reference/v2/jobs
+ This method is deprecated. Please use `BigQueryHook.insert_job`
For more details about the configuration parameter.
:param configuration: The configuration parameter maps directly to
@@ -1617,6 +1639,8 @@ def run_load(
https://cloud.google.com/bigquery/docs/reference/v2/jobs
+ This method is deprecated. Please use `BigQueryHook.insert_job` method.
For more details about these parameters.
:param destination_project_dataset_table:
@@ -1841,6 +1865,8 @@ def run_copy(
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy
+ This method is deprecated. Please use `BigQueryHook.insert_job` method.
For more details about these parameters.
:param source_project_dataset_tables: One or more dotted
@@ -1928,6 +1954,8 @@ def run_extract(
https://cloud.google.com/bigquery/docs/reference/v2/jobs
+ This method is deprecated. Please use `BigQueryHook.insert_job` method.
For more details about these parameters.
:param source_project_dataset_table: The dotted ``<dataset>.<table>``
@@ -2000,7 +2028,7 @@ def run_query(
query_params: list | None = None,
labels: dict | None = None,
schema_update_options: Iterable | None = None,
- priority: str = "INTERACTIVE",
+ priority: str | None = None,
time_partitioning: dict | None = None,
api_resource_configs: dict | None = None,
cluster_fields: list[str] | None = None,
@@ -2013,6 +2041,8 @@
https://cloud.google.com/bigquery/docs/reference/v2/jobs
+ This method is deprecated. Please use `BigQueryHook.insert_job` method.
For more details about these parameters.
:param sql: The BigQuery SQL to execute.
@@ -2051,7 +2081,7 @@ def run_query(
table to be updated as a side effect of the query job.
:param priority: Specifies a priority for the query.
Possible values include INTERACTIVE and BATCH.
- The default value is INTERACTIVE.
+ If `None`, defaults to `self.priority`.
:param time_partitioning: configure optional time partitioning fields i.e.
partition by field, type and expiration as per API specifications.
:param cluster_fields: Request that the result of this query be stored sorted
@@ -2076,12 +2106,11 @@
labels = labels or self.labels
schema_update_options = list(schema_update_options or [])

+ priority = priority or self.priority

if time_partitioning is None:
time_partitioning = {}

- if location:
-     self.location = location

if not api_resource_configs:
api_resource_configs = self.api_resource_configs
else:
@@ -2133,7 +2162,7 @@ def run_query(

query_param_list: list[tuple[Any, str, str | bool | None | dict, type | tuple[type]]] = [
(sql, "query", None, (str,)),
(priority, "priority", "INTERACTIVE", (str,)),
(priority, "priority", priority, (str,)),
(use_legacy_sql, "useLegacySql", self.use_legacy_sql, bool),
(query_params, "queryParameters", None, list),
(udf_config, "userDefinedFunctionResources", None, list),
@@ -2202,7 +2231,7 @@ def run_query(
if encryption_configuration:
configuration["query"]["destinationEncryptionConfiguration"] = encryption_configuration

- job = self.insert_job(configuration=configuration, project_id=self.project_id)
+ job = self.insert_job(configuration=configuration, project_id=self.project_id, location=location)
self.running_job_id = job.job_id
return job.job_id

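Taken together, the changes above give run_query a simple resolution order, sketched below with the hook from the earlier example (run_query itself remains deprecated in favor of BigQueryHook.insert_job, per the docstrings):

    # Explicit argument wins for this one call.
    job_id = hook.run_query(sql="SELECT 1", priority="INTERACTIVE")

    # With priority=None (the default), `priority = priority or self.priority`
    # falls back to the hook-level "BATCH" set at construction time.
    job_id = hook.run_query(sql="SELECT 1")

Note also that run_query no longer mutates self.location as a side effect; the location argument is now passed straight through to insert_job, which is the behavior the removed test_location_propagates_properly test used to pin down.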
18 changes: 6 additions & 12 deletions tests/providers/google/cloud/hooks/test_bigquery.py
@@ -93,15 +93,6 @@ def test_bigquery_client_creation(self, mock_build, mock_authorize, mock_bigquer
)
assert mock_bigquery_connection.return_value == result

- @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")
- @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_job")
- def test_location_propagates_properly(self, run_with_config, _):
-     # TODO: this creates side effect
-     assert self.hook.location is None
-     self.hook.run_query(sql="select 1", location="US")
-     assert run_with_config.call_count == 1
-     assert self.hook.location == "US"

def test_bigquery_insert_rows_not_implemented(self):
with pytest.raises(NotImplementedError):
self.hook.insert_rows(table="table", rows=[1, 2])
@@ -1228,7 +1219,7 @@ def test_execute_with_parameters(self, mock_insert, _):
"schemaUpdateOptions": [],
}
}
- mock_insert.assert_called_once_with(configuration=conf, project_id=PROJECT_ID)
+ mock_insert.assert_called_once_with(configuration=conf, project_id=PROJECT_ID, location=None)

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_job")
@@ -1239,6 +1230,7 @@ def test_execute_many(self, mock_insert, _):
mock_insert.assert_has_calls(
[
mock.call(
+ location=None,
configuration={
"query": {
"query": "SELECT 'bar'",
@@ -1250,6 +1242,7 @@
project_id=PROJECT_ID,
),
mock.call(
+ location=None,
configuration={
"query": {
"query": "SELECT 'baz'",
@@ -1704,13 +1697,14 @@ def test_run_query_with_arg(self, mock_insert):
self.hook.run_query(
sql="select 1",
destination_dataset_table=f"{DATASET_ID}.{TABLE_ID}",
priority="BATCH",
time_partitioning={"type": "DAY", "field": "test_field", "expirationMs": 1000},
)

configuration = {
"query": {
"query": "select 1",
"priority": "INTERACTIVE",
"priority": "BATCH",
"useLegacySql": True,
"timePartitioning": {"type": "DAY", "field": "test_field", "expirationMs": 1000},
"schemaUpdateOptions": [],
@@ -1722,7 +1716,7 @@
}
}

- mock_insert.assert_called_once_with(configuration=configuration, project_id=PROJECT_ID)
+ mock_insert.assert_called_once_with(configuration=configuration, project_id=PROJECT_ID, location=None)

def test_dollar_makes_partition(self):
tp_out = _cleanse_time_partitioning("test.teast$20170101", {})
@@ -105,6 +105,7 @@
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=location,
@@ -134,6 +134,7 @@
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
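
For completeness, a hypothetical operator task mirroring the updated example DAGs (INSERT_ROWS_QUERY and the location value stand in for the DAGs' own variables):

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    # The raw job configuration sets the priority directly; this maps to the
    # BigQuery REST field configuration.query.priority and is independent of
    # the new hook-level default.
    insert_query_job = BigQueryInsertJobOperator(
        task_id="insert_query_job",
        configuration={
            "query": {
                "query": "INSERT INTO ...",  # stands in for INSERT_ROWS_QUERY
                "useLegacySql": False,
                "priority": "BATCH",
            }
        },
        location="US",  # stands in for the DAG's location/LOCATION value
    )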
