Skip to content

Commit

Permalink
Add on_kill method to BigQueryInsertJobOperator (#10866)
Browse files Browse the repository at this point in the history
* Add on_kill method to BigQueryInsertJobOperator
* BigQueryInsertJobOperator pylint disable=too-many-arguments
  • Loading branch information
tszerszen committed Sep 11, 2020
1 parent 56bd9b7 commit 41a6273
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
12 changes: 12 additions & 0 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,7 @@ def execute(self, context):
)


# pylint: disable=too-many-arguments
class BigQueryInsertJobOperator(BaseOperator):
"""
Executes a BigQuery job. Waits for the job to complete and returns job id.
Expand Down Expand Up @@ -1990,6 +1991,8 @@ class BigQueryInsertJobOperator(BaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
:param cancel_on_kill: Flag which indicates whether cancel the hook's job or not, when on_kill is called
:type cancel_on_kill: bool
"""

template_fields = (
Expand All @@ -2011,6 +2014,7 @@ def __init__(
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
cancel_on_kill: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -2023,6 +2027,8 @@ def __init__(
self.force_rerun = force_rerun
self.reattach_states: Set[str] = reattach_states or set()
self.impersonation_chain = impersonation_chain
self.cancel_on_kill = cancel_on_kill
self.hook: Optional[BigQueryHook] = None

def prepare_template(self) -> None:
# If .json is passed then we have to read the file
Expand Down Expand Up @@ -2071,6 +2077,7 @@ def execute(self, context: Any):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
self.hook = hook

job_id = self._job_id(context)

Expand All @@ -2096,4 +2103,9 @@ def execute(self, context: Any):
f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
)

self.job_id = job.job_id
return job.job_id

def on_kill(self):
if self.job_id and self.cancel_on_kill:
self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, location=self.location)
37 changes: 37 additions & 0 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,43 @@ def test_execute_success(self, mock_hook, mock_md5):

assert result == real_job_id

@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_on_kill(self, mock_hook, mock_md5):
job_id = "123456"
hash_ = "hash"
real_job_id = f"{job_id}_{hash_}"
mock_md5.return_value.hexdigest.return_value = hash_

configuration = {
"query": {
"query": "SELECT * FROM any",
"useLegacySql": False,
}
}
mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)

op = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
cancel_on_kill=False,
)
op.execute({})

op.on_kill()
mock_hook.return_value.cancel_job.assert_not_called()

op.cancel_on_kill = True
op.on_kill()
mock_hook.return_value.cancel_job.assert_called_once_with(
job_id=real_job_id,
location=TEST_DATASET_LOCATION,
project_id=TEST_GCP_PROJECT_ID,
)

@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_execute_failure(self, mock_hook, mock_md5):
Expand Down

0 comments on commit 41a6273

Please sign in to comment.