diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index e182090ee16ae..7dde1530b636d 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -76,6 +76,9 @@ class BigQueryHook(GoogleBaseHook, DbApiHook): :param gcp_conn_id: The Airflow connection used for GCP credentials. :param use_legacy_sql: This specifies whether to use legacy SQL dialect. :param location: The location of the BigQuery resource. + :param priority: Specifies a priority for the query. + Possible values include INTERACTIVE and BATCH. + The default value is INTERACTIVE. :param api_resource_configs: This contains params configuration applied for Google BigQuery jobs. :param impersonation_chain: This is the optional service account to impersonate using short term credentials. @@ -92,6 +95,7 @@ def __init__( gcp_conn_id: str = GoogleBaseHook.default_conn_name, use_legacy_sql: bool = True, location: str | None = None, + priority: str = "INTERACTIVE", api_resource_configs: dict | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -108,6 +112,7 @@ def __init__( ) self.use_legacy_sql = use_legacy_sql self.location = location + self.priority = priority self.running_job_id: str | None = None self.api_resource_configs: dict = api_resource_configs if api_resource_configs else {} self.labels = labels @@ -554,6 +559,9 @@ def create_external_table( https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource + This method is deprecated. + Please use `BigQueryHook.create_empty_table` method with passing the `table_resource` object + for more details about these parameters. :param external_project_dataset_table: @@ -743,6 +751,8 @@ def patch_table( Patch information in an existing table. It only updates fields that are provided in the request object. + This method is deprecated. Please use `BigQueryHook.update_table` + Reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch :param dataset_id: The dataset containing the table to be patched. @@ -928,6 +938,9 @@ def patch_dataset(self, dataset_id: str, dataset_resource: dict, project_id: str """ Patches information in an existing dataset. It only replaces fields that are provided in the submitted dataset resource. + + This method is deprecated. Please use `update_dataset` + More info: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch @@ -972,6 +985,8 @@ def get_dataset_tables_list( Method returns tables list of a BigQuery tables. If table prefix is specified, only tables beginning by it are returned. + This method is deprecated. Please use `get_dataset_tables` + For more information, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list @@ -1162,6 +1177,8 @@ def run_table_delete(self, deletion_dataset_table: str, ignore_if_missing: bool If the table does not exist, return an error unless ignore_if_missing is set to True. + This method is deprecated. Please use `delete_table` + :param deletion_dataset_table: A dotted ``(.|:).`` that indicates which table will be deleted. @@ -1206,6 +1223,9 @@ def get_tabledata( ) -> list[dict]: """ Get the data of a given dataset.table and optionally with selected columns. + + This method is deprecated. Please use `list_rows` + see https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list :param dataset_id: the dataset ID of the requested table. @@ -1574,6 +1594,8 @@ def run_with_configuration(self, configuration: dict) -> str: https://cloud.google.com/bigquery/docs/reference/v2/jobs + This method is deprecated. Please use `BigQueryHook.insert_job` + For more details about the configuration parameter. :param configuration: The configuration parameter maps directly to @@ -1617,6 +1639,8 @@ def run_load( https://cloud.google.com/bigquery/docs/reference/v2/jobs + This method is deprecated. Please use `BigQueryHook.insert_job` method. + For more details about these parameters. :param destination_project_dataset_table: @@ -1841,6 +1865,8 @@ def run_copy( https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy + This method is deprecated. Please use `BigQueryHook.insert_job` method. + For more details about these parameters. :param source_project_dataset_tables: One or more dotted @@ -1928,6 +1954,8 @@ def run_extract( https://cloud.google.com/bigquery/docs/reference/v2/jobs + This method is deprecated. Please use `BigQueryHook.insert_job` method. + For more details about these parameters. :param source_project_dataset_table: The dotted ``.
`` @@ -2000,7 +2028,7 @@ def run_query( query_params: list | None = None, labels: dict | None = None, schema_update_options: Iterable | None = None, - priority: str = "INTERACTIVE", + priority: str | None = None, time_partitioning: dict | None = None, api_resource_configs: dict | None = None, cluster_fields: list[str] | None = None, @@ -2013,6 +2041,8 @@ def run_query( https://cloud.google.com/bigquery/docs/reference/v2/jobs + This method is deprecated. Please use `BigQueryHook.insert_job` method. + For more details about these parameters. :param sql: The BigQuery SQL to execute. @@ -2051,7 +2081,7 @@ def run_query( table to be updated as a side effect of the query job. :param priority: Specifies a priority for the query. Possible values include INTERACTIVE and BATCH. - The default value is INTERACTIVE. + If `None`, defaults to `self.priority`. :param time_partitioning: configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. :param cluster_fields: Request that the result of this query be stored sorted @@ -2076,12 +2106,11 @@ def run_query( labels = labels or self.labels schema_update_options = list(schema_update_options or []) + priority = priority or self.priority + if time_partitioning is None: time_partitioning = {} - if location: - self.location = location - if not api_resource_configs: api_resource_configs = self.api_resource_configs else: @@ -2133,7 +2162,7 @@ def run_query( query_param_list: list[tuple[Any, str, str | bool | None | dict, type | tuple[type]]] = [ (sql, "query", None, (str,)), - (priority, "priority", "INTERACTIVE", (str,)), + (priority, "priority", priority, (str,)), (use_legacy_sql, "useLegacySql", self.use_legacy_sql, bool), (query_params, "queryParameters", None, list), (udf_config, "userDefinedFunctionResources", None, list), @@ -2202,7 +2231,7 @@ def run_query( if encryption_configuration: configuration["query"]["destinationEncryptionConfiguration"] = encryption_configuration - job = self.insert_job(configuration=configuration, project_id=self.project_id) + job = self.insert_job(configuration=configuration, project_id=self.project_id, location=location) self.running_job_id = job.job_id return job.job_id diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py index f57700b7f5627..0f66ca1208bd9 100644 --- a/tests/providers/google/cloud/hooks/test_bigquery.py +++ b/tests/providers/google/cloud/hooks/test_bigquery.py @@ -93,15 +93,6 @@ def test_bigquery_client_creation(self, mock_build, mock_authorize, mock_bigquer ) assert mock_bigquery_connection.return_value == result - @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service") - @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_job") - def test_location_propagates_properly(self, run_with_config, _): - # TODO: this creates side effect - assert self.hook.location is None - self.hook.run_query(sql="select 1", location="US") - assert run_with_config.call_count == 1 - assert self.hook.location == "US" - def test_bigquery_insert_rows_not_implemented(self): with pytest.raises(NotImplementedError): self.hook.insert_rows(table="table", rows=[1, 2]) @@ -1228,7 +1219,7 @@ def test_execute_with_parameters(self, mock_insert, _): "schemaUpdateOptions": [], } } - mock_insert.assert_called_once_with(configuration=conf, project_id=PROJECT_ID) + mock_insert.assert_called_once_with(configuration=conf, project_id=PROJECT_ID, location=None) @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_job") @@ -1239,6 +1230,7 @@ def test_execute_many(self, mock_insert, _): mock_insert.assert_has_calls( [ mock.call( + location=None, configuration={ "query": { "query": "SELECT 'bar'", @@ -1250,6 +1242,7 @@ def test_execute_many(self, mock_insert, _): project_id=PROJECT_ID, ), mock.call( + location=None, configuration={ "query": { "query": "SELECT 'baz'", @@ -1704,13 +1697,14 @@ def test_run_query_with_arg(self, mock_insert): self.hook.run_query( sql="select 1", destination_dataset_table=f"{DATASET_ID}.{TABLE_ID}", + priority="BATCH", time_partitioning={"type": "DAY", "field": "test_field", "expirationMs": 1000}, ) configuration = { "query": { "query": "select 1", - "priority": "INTERACTIVE", + "priority": "BATCH", "useLegacySql": True, "timePartitioning": {"type": "DAY", "field": "test_field", "expirationMs": 1000}, "schemaUpdateOptions": [], @@ -1722,7 +1716,7 @@ def test_run_query_with_arg(self, mock_insert): } } - mock_insert.assert_called_once_with(configuration=configuration, project_id=PROJECT_ID) + mock_insert.assert_called_once_with(configuration=configuration, project_id=PROJECT_ID, location=None) def test_dollar_makes_partition(self): tp_out = _cleanse_time_partitioning("test.teast$20170101", {}) diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py index 93ef73fb44687..d5c3f449fd4aa 100644 --- a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py +++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py @@ -105,6 +105,7 @@ "query": { "query": INSERT_ROWS_QUERY, "useLegacySql": False, + "priority": "BATCH", } }, location=location, diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py index 21410f815abf5..334c2b4026dd3 100644 --- a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py +++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py @@ -134,6 +134,7 @@ "query": { "query": INSERT_ROWS_QUERY, "useLegacySql": False, + "priority": "BATCH", } }, location=LOCATION,