Skip to content

Commit

Permalink
Normalize *_conn_id parameters in BigQuery sensors (#21430)
Browse files Browse the repository at this point in the history
It fixes a deprecation warning in `BigQueryHook` caused by use of the deprecated `bigquery_conn_id` parameter in the sensors code.
Note: a similar change for the BigQuery operators was already performed in commit 042a9ba
  • Loading branch information
Rafał Harabień committed Feb 15, 2022
1 parent a9b8ac5 commit 6692e91
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
40 changes: 30 additions & 10 deletions airflow/providers/google/cloud/sensors/bigquery.py
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Bigquery sensor."""
import warnings
from typing import TYPE_CHECKING, Optional, Sequence, Union

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
Expand All @@ -25,6 +26,11 @@
from airflow.utils.context import Context


# Warning text emitted when a caller still passes the deprecated
# ``bigquery_conn_id`` argument instead of ``gcp_conn_id``.
_DEPRECATION_MSG = "The bigquery_conn_id parameter has been deprecated. You should pass the gcp_conn_id parameter."


class BigQueryTableExistenceSensor(BaseSensorOperator):
"""
Checks for the existence of a table in Google Bigquery.
Expand All @@ -35,8 +41,9 @@ class BigQueryTableExistenceSensor(BaseSensorOperator):
:param dataset_id: The name of the dataset in which to look for the table.
storage bucket.
:param table_id: The name of the table to check the existence of.
:param bigquery_conn_id: The connection ID to use when connecting to
Google BigQuery.
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
Expand Down Expand Up @@ -64,25 +71,31 @@ def __init__(
project_id: str,
dataset_id: str,
table_id: str,
bigquery_conn_id: str = 'google_cloud_default',
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:

super().__init__(**kwargs)

if bigquery_conn_id:
warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
gcp_conn_id = bigquery_conn_id

self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.bigquery_conn_id = bigquery_conn_id
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain

def poke(self, context: 'Context') -> bool:
table_uri = f'{self.project_id}:{self.dataset_id}.{self.table_id}'
self.log.info('Sensor checks existence of table: %s', table_uri)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
Expand All @@ -102,8 +115,9 @@ class BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
storage bucket.
:param table_id: The name of the table to check the existence of.
:param partition_id: The name of the partition to check the existence of.
:param bigquery_conn_id: The connection ID to use when connecting to
Google BigQuery.
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must
have domain-wide delegation enabled.
Expand Down Expand Up @@ -133,26 +147,32 @@ def __init__(
dataset_id: str,
table_id: str,
partition_id: str,
bigquery_conn_id: str = 'google_cloud_default',
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:

super().__init__(**kwargs)

if bigquery_conn_id:
warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
gcp_conn_id = bigquery_conn_id

self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.partition_id = partition_id
self.bigquery_conn_id = bigquery_conn_id
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain

def poke(self, context: 'Context') -> bool:
table_uri = f'{self.project_id}:{self.dataset_id}.{self.table_id}'
self.log.info('Sensor checks existence of partition: "%s" in table: %s', self.partition_id, table_uri)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
Expand Down
8 changes: 4 additions & 4 deletions tests/providers/google/cloud/sensors/test_bigquery.py
Expand Up @@ -39,7 +39,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
project_id=TEST_PROJECT_ID,
dataset_id=TEST_DATASET_ID,
table_id=TEST_TABLE_ID,
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand All @@ -49,7 +49,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
assert results is True

mock_hook.assert_called_once_with(
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand All @@ -67,7 +67,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
dataset_id=TEST_DATASET_ID,
table_id=TEST_TABLE_ID,
partition_id=TEST_PARTITION_ID,
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand All @@ -77,7 +77,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
assert results is True

mock_hook.assert_called_once_with(
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand Down

0 comments on commit 6692e91

Please sign in to comment.