Skip to content

Commit

Permalink
[AIRFLOW-6821] Success callback not called when task marked as succes…
Browse files Browse the repository at this point in the history
…s from UI (#7447)

cherry-picked from 6cd37da
  • Loading branch information
saurabhdhupar authored and kaxil committed Mar 17, 2020
1 parent fb55967 commit 5d96b66
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
7 changes: 7 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ assists users migrating to a new version.
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of contents**

- [Airflow 1.10.10](#airflow-11010)
- [Airflow 1.10.9](#airflow-1109)
- [Airflow 1.10.8](#airflow-1108)
- [Airflow 1.10.7](#airflow-1107)
Expand All @@ -42,6 +43,12 @@ assists users migrating to a new version.

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

## Airflow 1.10.10

### Success Callback will be called when a task in marked as success from UI

When a task is marked as success by a user from Airflow UI - `on_success_callback` will be called

## Airflow 1.10.9

No breaking changes.
Expand Down
3 changes: 3 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,8 @@ def heartbeat_callback(self, session=None):
if ti.state == State.FAILED and ti.task.on_failure_callback:
context = ti.get_template_context()
ti.task.on_failure_callback(context)
if ti.state == State.SUCCESS and ti.task.on_success_callback:
context = ti.get_template_context()
ti.task.on_success_callback(context)
self.task_runner.terminate()
self.terminating = True
54 changes: 54 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,57 @@ def check_failure(context):
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())

def test_mark_success_on_success_callback(self):
"""
Test that ensures that where a task is marked suceess in the UI
on_success_callback gets executed
"""
data = {'called': False}

def success_callback(context):
self.assertEqual(context['dag_run'].dag_id,
'test_mark_success')
data['called'] = True

dag = DAG(dag_id='test_mark_success',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

task = DummyOperator(
task_id='test_state_succeeded1',
dag=dag,
on_success_callback=success_callback)

session = settings.Session()

dag.clear()
dag.create_dagrun(run_id="test",
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session)
ti = TI(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti,
ignore_ti_state=True,
executor=SequentialExecutor())
from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
job1.task_runner = StandardTaskRunner(job1)
process = multiprocessing.Process(target=job1.run)
process.start()
ti.refresh_from_db()
for _ in range(0, 50):
if ti.state == State.RUNNING:
break
time.sleep(0.1)
ti.refresh_from_db()
self.assertEqual(State.RUNNING, ti.state)
ti.state = State.SUCCESS
session.merge(ti)
session.commit()

job1.heartbeat_callback(session=None)
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())

0 comments on commit 5d96b66

Please sign in to comment.