diff --git a/airflow/providers/google/cloud/sensors/bigquery.py b/airflow/providers/google/cloud/sensors/bigquery.py
index 13bc84b9d45f..b2ce055ba1de 100644
--- a/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/airflow/providers/google/cloud/sensors/bigquery.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """This module contains a Google Bigquery sensor."""
+import warnings
 from typing import TYPE_CHECKING, Optional, Sequence, Union
 
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -25,6 +26,11 @@
     from airflow.utils.context import Context
 
 
+_DEPRECATION_MSG = (
+    "The bigquery_conn_id parameter has been deprecated. You should pass the gcp_conn_id parameter."
+)
+
+
 class BigQueryTableExistenceSensor(BaseSensorOperator):
     """
     Checks for the existence of a table in Google Bigquery.
@@ -35,8 +41,9 @@ class BigQueryTableExistenceSensor(BaseSensorOperator):
     :param dataset_id: The name of the dataset in which to look for the table.
         storage bucket.
     :param table_id: The name of the table to check the existence of.
-    :param bigquery_conn_id: The connection ID to use when connecting to
-        Google BigQuery.
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
     :param delegate_to: The account to impersonate using domain-wide delegation of authority,
         if any. For this to work, the service account making the request must have
         domain-wide delegation enabled.
@@ -64,17 +71,23 @@ def __init__(
         project_id: str,
         dataset_id: str,
         table_id: str,
-        bigquery_conn_id: str = 'google_cloud_default',
+        gcp_conn_id: str = 'google_cloud_default',
+        bigquery_conn_id: Optional[str] = None,
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+
+        if bigquery_conn_id:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            gcp_conn_id = bigquery_conn_id
+
         self.project_id = project_id
         self.dataset_id = dataset_id
         self.table_id = table_id
-        self.bigquery_conn_id = bigquery_conn_id
+        self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
@@ -82,7 +95,7 @@ def poke(self, context: 'Context') -> bool:
         table_uri = f'{self.project_id}:{self.dataset_id}.{self.table_id}'
         self.log.info('Sensor checks existence of table: %s', table_uri)
         hook = BigQueryHook(
-            bigquery_conn_id=self.bigquery_conn_id,
+            gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
@@ -102,8 +115,9 @@ class BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
         storage bucket.
     :param table_id: The name of the table to check the existence of.
     :param partition_id: The name of the partition to check the existence of.
-    :param bigquery_conn_id: The connection ID to use when connecting to
-        Google BigQuery.
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
     :param delegate_to: The account to impersonate, if any.
         For this to work, the service account making the request must have
         domain-wide delegation enabled.
@@ -133,18 +147,24 @@ def __init__(
         dataset_id: str,
         table_id: str,
         partition_id: str,
-        bigquery_conn_id: str = 'google_cloud_default',
+        gcp_conn_id: str = 'google_cloud_default',
+        bigquery_conn_id: Optional[str] = None,
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+
+        if bigquery_conn_id:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            gcp_conn_id = bigquery_conn_id
+
         self.project_id = project_id
         self.dataset_id = dataset_id
         self.table_id = table_id
         self.partition_id = partition_id
-        self.bigquery_conn_id = bigquery_conn_id
+        self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
@@ -152,7 +172,7 @@ def poke(self, context: 'Context') -> bool:
         table_uri = f'{self.project_id}:{self.dataset_id}.{self.table_id}'
         self.log.info('Sensor checks existence of partition: "%s" in table: %s', self.partition_id, table_uri)
         hook = BigQueryHook(
-            bigquery_conn_id=self.bigquery_conn_id,
+            gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
diff --git a/tests/providers/google/cloud/sensors/test_bigquery.py b/tests/providers/google/cloud/sensors/test_bigquery.py
index 488224f6249d..87ec3dbacb1b 100644
--- a/tests/providers/google/cloud/sensors/test_bigquery.py
+++ b/tests/providers/google/cloud/sensors/test_bigquery.py
@@ -39,7 +39,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
             project_id=TEST_PROJECT_ID,
             dataset_id=TEST_DATASET_ID,
             table_id=TEST_TABLE_ID,
-            bigquery_conn_id=TEST_GCP_CONN_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
             delegate_to=TEST_DELEGATE_TO,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
@@ -49,7 +49,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
         assert results is True
 
         mock_hook.assert_called_once_with(
-            bigquery_conn_id=TEST_GCP_CONN_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
             delegate_to=TEST_DELEGATE_TO,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
@@ -67,7 +67,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
             dataset_id=TEST_DATASET_ID,
             table_id=TEST_TABLE_ID,
             partition_id=TEST_PARTITION_ID,
-            bigquery_conn_id=TEST_GCP_CONN_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
             delegate_to=TEST_DELEGATE_TO,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
@@ -77,7 +77,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
         assert results is True
 
         mock_hook.assert_called_once_with(
-            bigquery_conn_id=TEST_GCP_CONN_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
             delegate_to=TEST_DELEGATE_TO,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
        )
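
For reference, a minimal sketch of the migration path this change enables. It is not part of the patch; it assumes a provider version carrying this change is installed, and the task and connection IDs below are illustrative.

import warnings

from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor

# Preferred spelling after this change: pass gcp_conn_id explicitly.
sensor = BigQueryTableExistenceSensor(
    task_id='check_table',
    project_id='my-project',
    dataset_id='my_dataset',
    table_id='my_table',
    gcp_conn_id='google_cloud_default',
)

# The legacy parameter still works: the shim copies it into gcp_conn_id
# and emits a DeprecationWarning at construction time.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    legacy = BigQueryTableExistenceSensor(
        task_id='check_table_legacy',
        project_id='my-project',
        dataset_id='my_dataset',
        table_id='my_table',
        bigquery_conn_id='my_legacy_conn',
    )

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
assert legacy.gcp_conn_id == 'my_legacy_conn'

The stacklevel=3 in the shim appears intended to attribute the warning to the user's instantiation site rather than to the sensor's own constructor frames, though the exact frame depth depends on how the operator is instantiated.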