diff --git a/providers/google/docs/connections/bigquery.rst b/providers/google/docs/connections/bigquery.rst index c596d3e86b1c4..9f6f58a4b9b49 100644 --- a/providers/google/docs/connections/bigquery.rst +++ b/providers/google/docs/connections/bigquery.rst @@ -38,7 +38,9 @@ Impersonation Scopes Use Legacy SQL - Whether or not the connection should utilize legacy SQL. + Whether or not the connection should utilize legacy SQL. GoogleSQL is the recommended dialect for + BigQuery. BigQuery legacy SQL availability is restricted after June 1, 2026, based on legacy SQL + usage during Google's evaluation period. Location One of `BigQuery locations <https://cloud.google.com/bigquery/docs/locations>`_ where the dataset resides. diff --git a/providers/google/docs/operators/cloud/bigquery.rst b/providers/google/docs/operators/cloud/bigquery.rst index 41c49dd5e0311..0da85926f97cc 100644 --- a/providers/google/docs/operators/cloud/bigquery.rst +++ b/providers/google/docs/operators/cloud/bigquery.rst @@ -27,6 +27,16 @@ analyzing data to find meaningful insights using familiar SQL. Airflow provides operators to manage datasets and tables, run queries and validate data. +.. note:: + + GoogleSQL is the recommended dialect for BigQuery. BigQuery legacy SQL availability is restricted + after June 1, 2026, based on legacy SQL usage during Google's evaluation period. In Airflow, the + implicit default for older BigQuery operators that expose ``use_legacy_sql`` is deprecated and will + change from ``True`` to ``False`` in a future provider release. Set ``use_legacy_sql=True`` + explicitly if you still need legacy SQL, or set ``use_legacy_sql=False`` to use GoogleSQL. + For more information, see + `Legacy SQL feature availability <https://cloud.google.com/bigquery/docs/legacy-sql-feature-availability>`__. 
+ Prerequisite Tasks ^^^^^^^^^^^^^^^^^^ diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py index 280ae79422bb1..d0e726b41bf32 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py @@ -63,6 +63,15 @@ from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.utils.helpers import exactly_one +try: + from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet, is_arg_set +except ImportError: + from airflow.utils.types import NOTSET, ArgNotSet # type: ignore[attr-defined,no-redef] + + def is_arg_set(value): # type: ignore[misc,no-redef] + return value is not NOTSET + + if TYPE_CHECKING: from google.api_core.retry import Retry from google.cloud.bigquery import UnknownJob @@ -71,10 +80,27 @@ BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}" +BIGQUERY_LEGACY_SQL_DEFAULT_WARNING = ( + "The default value of `use_legacy_sql` is deprecated and will change from `True` to `False` " + "in a future provider release. Set `use_legacy_sql=True` explicitly if you need legacy SQL, " + "or set `use_legacy_sql=False` to use GoogleSQL." 
+) LABEL_REGEX = re.compile(r"^[\w-]{0,63}$") +def _resolve_use_legacy_sql(use_legacy_sql: bool | ArgNotSet) -> bool: + if is_arg_set(use_legacy_sql): + return use_legacy_sql + + warnings.warn( + BIGQUERY_LEGACY_SQL_DEFAULT_WARNING, + AirflowProviderDeprecationWarning, + stacklevel=3, + ) + return True + + class BigQueryUIColors(enum.Enum): """Hex colors for BigQuery operators.""" @@ -228,7 +254,7 @@ def __init__( sql: str, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -240,7 +266,7 @@ def __init__( ) -> None: super().__init__(sql=sql, **kwargs) self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.impersonation_chain = impersonation_chain self.labels = labels @@ -386,7 +412,7 @@ def __init__( encryption_configuration: dict | None = None, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -397,7 +423,7 @@ def __init__( super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs) self.location = location self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.encryption_configuration = encryption_configuration self.impersonation_chain = impersonation_chain self.labels = labels @@ -548,7 +574,7 @@ def __init__( date_filter_column: str = "ds", days_back: SupportsAbs[int] = -7, gcp_conn_id: str = "google_cloud_default", - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = 
None, encryption_configuration: dict | None = None, impersonation_chain: str | Sequence[str] | None = None, @@ -567,7 +593,7 @@ def __init__( ) self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.encryption_configuration = encryption_configuration self.impersonation_chain = impersonation_chain @@ -700,7 +726,7 @@ def __init__( encryption_configuration: dict | None = None, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -721,7 +747,7 @@ def __init__( self.accept_none = accept_none self.gcp_conn_id = gcp_conn_id self.encryption_configuration = encryption_configuration - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.impersonation_chain = impersonation_chain self.labels = labels @@ -841,7 +867,7 @@ def __init__( partition_clause: str | None = None, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -850,7 +876,7 @@ def __init__( ) -> None: super().__init__(table=table, checks=checks, partition_clause=partition_clause, **kwargs) self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.impersonation_chain = impersonation_chain self.labels = labels @@ -1036,7 +1062,7 @@ def __init__( deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), poll_interval: float = 4.0, as_dict: bool = False, - 
use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, **kwargs, ) -> None: super().__init__(**kwargs) @@ -1056,7 +1082,7 @@ def __init__( self.deferrable = deferrable self.poll_interval = poll_interval self.as_dict = as_dict - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) def _submit_job( self, diff --git a/providers/google/tests/deprecations_ignore.yml b/providers/google/tests/deprecations_ignore.yml index d07c6bdfc21a0..51c4fb2403111 100644 --- a/providers/google/tests/deprecations_ignore.yml +++ b/providers/google/tests/deprecations_ignore.yml @@ -57,6 +57,13 @@ - providers/google/tests/unit/google/cloud/hooks/test_bigquery.py::TestTimePartitioningInRunJob::test_run_query_with_arg - providers/google/tests/unit/google/cloud/hooks/test_bigquery.py::TestTimePartitioningInRunJob::test_run_with_auto_detect - providers/google/tests/unit/google/cloud/hooks/test_gcs.py::TestGCSHook::test_list__error_match_glob_and_invalid_delimiter +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryCheckOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryCheckOperators +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryColumnCheckOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryGetDataOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryIntervalCheckOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryTableCheckOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryValueCheckOperator - providers/google/tests/unit/google/cloud/operators/test_dataproc.py::TestDataprocClusterScaleOperator::test_execute - providers/google/tests/unit/google/cloud/operators/test_dataproc.py::test_create_cluster_operator_extra_links - 
providers/google/tests/unit/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py index 5c8b98de2746b..40491b1090212 100644 --- a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py +++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py @@ -176,6 +176,7 @@ table_id=TABLE_1, max_results=10, selected_fields="value,name", + use_legacy_sql=False, ) # [END howto_operator_bigquery_get_data] @@ -216,6 +217,7 @@ task_id="column_check", table=f"{DATASET_NAME}.{TABLE_1}", column_mapping={"value": {"null_check": {"equal_to": 0}}}, + use_legacy_sql=False, ) # [END howto_operator_bigquery_column_check] @@ -224,6 +226,7 @@ task_id="table_check", table=f"{DATASET_NAME}.{TABLE_1}", checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}}, + use_legacy_sql=False, ) # [END howto_operator_bigquery_table_check] diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py index 70427e794e63b..413be9ebf74a1 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py +++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py @@ -29,6 +29,7 @@ from google.cloud.bigquery import DEFAULT_RETRY, ScalarQueryParameter, Table from google.cloud.exceptions import Conflict +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.common.compat.openlineage.facet import ( DocumentationDatasetFacet, ErrorMessageRunFacet, @@ -2363,6 +2364,16 @@ def test_encryption_configuration_deferrable_mode(self, mock_job, mock_hook): class TestBigQueryCheckOperator: + def test_implicit_legacy_sql_default_warns(self): + with pytest.warns( + AirflowProviderDeprecationWarning, + match="The default 
value of `use_legacy_sql` is deprecated", + ): + BigQueryCheckOperator( + task_id="check_query", + sql="SELECT COUNT(*) FROM Any", + ) + @pytest.mark.db_test @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator._validate_records") @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.defer")