diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 08eb9b7a74ce0..9de5663420bf6 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -177,6 +177,8 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator): account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -198,6 +200,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, deferrable: bool = False, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__(sql=sql, **kwargs) @@ -208,6 +211,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.labels = labels self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, @@ -240,6 +244,7 @@ def execute(self, context: Context): conn_id=self.gcp_conn_id, job_id=job.job_id, project_id=hook.project_id, + poll_interval=self.poll_interval, ), method_name="execute_complete", ) @@ -288,6 +293,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator): account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -312,6 +319,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, deferrable: bool = False, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs) @@ -321,6 +329,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.labels = labels self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, @@ -360,6 +369,7 @@ def execute(self, context: Context) -> None: # type: ignore[override] sql=self.sql, pass_value=self.pass_value, tolerance=self.tol, + poll_interval=self.poll_interval, ), method_name="execute_complete", ) @@ -414,6 +424,8 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -439,6 +451,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, deferrable: bool = False, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__( @@ -455,6 +468,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.labels = labels self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, @@ -498,6 +512,7 @@ def execute(self, context: Context): days_back=self.days_back, ratio_formula=self.ratio_formula, ignore_zero=self.ignore_zero, + poll_interval=self.poll_interval, ), method_name="execute_complete", ) @@ -794,6 +809,8 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator): Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). :param deferrable: Run operator in the deferrable mode + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job. + Defaults to 4 seconds. :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled. Deprecated. @@ -822,6 +839,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, deferrable: bool = False, delegate_to: str | None = None, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__(**kwargs) @@ -840,6 +858,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.project_id = project_id self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, @@ -915,6 +934,7 @@ def execute(self, context: Context): dataset_id=self.dataset_id, table_id=self.table_id, project_id=hook.project_id, + poll_interval=self.poll_interval, ), method_name="execute_complete", ) @@ -2630,7 +2650,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator): :param result_retry: How to retry the `result` call that retrieves rows :param result_timeout: The number of seconds to wait for `result` method before using `result_retry` :param deferrable: Run operator in the deferrable mode - :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = (