Skip to content

Commit

Permalink
Add missing poll_interval in Bigquery operator (#30132)
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Mar 16, 2023
1 parent 85e8cca commit 75fd5e8
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion airflow/providers/google/cloud/operators/bigquery.py
Expand Up @@ -177,6 +177,8 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
account from the list granting this role to the originating account (templated).
:param labels: a dictionary containing labels for the table, passed to BigQuery
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand All @@ -198,6 +200,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
deferrable: bool = False,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(sql=sql, **kwargs)
Expand All @@ -208,6 +211,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.labels = labels
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -240,6 +244,7 @@ def execute(self, context: Context):
conn_id=self.gcp_conn_id,
job_id=job.job_id,
project_id=hook.project_id,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -288,6 +293,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
account from the list granting this role to the originating account (templated).
:param labels: a dictionary containing labels for the table, passed to BigQuery
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand All @@ -312,6 +319,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
deferrable: bool = False,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs)
Expand All @@ -321,6 +329,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.labels = labels
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -360,6 +369,7 @@ def execute(self, context: Context) -> None: # type: ignore[override]
sql=self.sql,
pass_value=self.pass_value,
tolerance=self.tol,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -414,6 +424,8 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
account from the list granting this role to the originating account (templated).
:param labels: a dictionary containing labels for the table, passed to BigQuery
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand All @@ -439,6 +451,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
deferrable: bool = False,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(
Expand All @@ -455,6 +468,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.labels = labels
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -498,6 +512,7 @@ def execute(self, context: Context):
days_back=self.days_back,
ratio_formula=self.ratio_formula,
ignore_zero=self.ignore_zero,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -794,6 +809,8 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled. Deprecated.
Expand Down Expand Up @@ -822,6 +839,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
deferrable: bool = False,
delegate_to: str | None = None,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -840,6 +858,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.project_id = project_id
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -915,6 +934,7 @@ def execute(self, context: Context):
dataset_id=self.dataset_id,
table_id=self.table_id,
project_id=hook.project_id,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -2630,7 +2650,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
:param result_retry: How to retry the `result` call that retrieves rows
:param result_timeout: The number of seconds to wait for `result` method before using `result_retry`
:param deferrable: Run operator in the deferrable mode
:param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds.
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand Down

0 comments on commit 75fd5e8

Please sign in to comment.