Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize *_conn_id parameters in BigQuery sensors #21430

Merged
merged 3 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions airflow/providers/google/cloud/sensors/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Bigquery sensor."""
import warnings
from typing import TYPE_CHECKING, Optional, Sequence, Union

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
Expand All @@ -25,6 +26,11 @@
from airflow.utils.context import Context


_DEPRECATION_MSG = (
"The bigquery_conn_id parameter has been deprecated. You should pass the gcp_conn_id parameter."
)


class BigQueryTableExistenceSensor(BaseSensorOperator):
"""
Checks for the existence of a table in Google Bigquery.
Expand All @@ -35,8 +41,9 @@ class BigQueryTableExistenceSensor(BaseSensorOperator):
:param dataset_id: The name of the dataset in which to look for the table.
storage bucket.
:param table_id: The name of the table to check the existence of.
:param bigquery_conn_id: The connection ID to use when connecting to
Google BigQuery.
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
Expand Down Expand Up @@ -64,25 +71,31 @@ def __init__(
project_id: str,
dataset_id: str,
table_id: str,
bigquery_conn_id: str = 'google_cloud_default',
gcp_conn_id: str = 'google_cloud_default',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an idea for further improvement - maybe it would be good to introduce constant for this value and use it in all cases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:

super().__init__(**kwargs)

if bigquery_conn_id:
warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
gcp_conn_id = bigquery_conn_id

self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.bigquery_conn_id = bigquery_conn_id
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain

def poke(self, context: 'Context') -> bool:
table_uri = f'{self.project_id}:{self.dataset_id}.{self.table_id}'
self.log.info('Sensor checks existence of table: %s', table_uri)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
Expand All @@ -102,8 +115,9 @@ class BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
storage bucket.
:param table_id: The name of the table to check the existence of.
:param partition_id: The name of the partition to check the existence of.
:param bigquery_conn_id: The connection ID to use when connecting to
Google BigQuery.
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must
have domain-wide delegation enabled.
Expand Down Expand Up @@ -133,26 +147,32 @@ def __init__(
dataset_id: str,
table_id: str,
partition_id: str,
bigquery_conn_id: str = 'google_cloud_default',
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:

super().__init__(**kwargs)

if bigquery_conn_id:
warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
gcp_conn_id = bigquery_conn_id

self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.partition_id = partition_id
self.bigquery_conn_id = bigquery_conn_id
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain

def poke(self, context: 'Context') -> bool:
table_uri = f'{self.project_id}:{self.dataset_id}.{self.table_id}'
self.log.info('Sensor checks existence of partition: "%s" in table: %s', self.partition_id, table_uri)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
Expand Down
8 changes: 4 additions & 4 deletions tests/providers/google/cloud/sensors/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
project_id=TEST_PROJECT_ID,
dataset_id=TEST_DATASET_ID,
table_id=TEST_TABLE_ID,
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand All @@ -49,7 +49,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
assert results is True

mock_hook.assert_called_once_with(
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand All @@ -67,7 +67,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
dataset_id=TEST_DATASET_ID,
table_id=TEST_TABLE_ID,
partition_id=TEST_PARTITION_ID,
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand All @@ -77,7 +77,7 @@ def test_passing_arguments_to_hook(self, mock_hook):
assert results is True

mock_hook.assert_called_once_with(
bigquery_conn_id=TEST_GCP_CONN_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
delegate_to=TEST_DELEGATE_TO,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
Expand Down