Skip to content
This repository has been archived by the owner on May 22, 2021. It is now read-only.

Commit

Permalink
[AIRFLOW-6821] Success callback not called when task marked as succes…
Browse files Browse the repository at this point in the history
…s from UI (apache#7447)
  • Loading branch information
saurabhdhupar authored and galuszkak committed Mar 5, 2020
1 parent 5e412fa commit de1327f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ https://developers.google.com/style/inclusive-documentation
-->

### Success Callback will be called when a task in marked as success from UI

When a task is marked as success by a used from Airflow UI - on_success_callback will be called

### Added `airflow dags test` CLI command

A new command was added to the CLI for executing one full run of a DAG for a given execution date, similar to
Expand Down
3 changes: 3 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,8 @@ def heartbeat_callback(self, session=None):
if ti.state == State.FAILED and ti.task.on_failure_callback:
context = ti.get_template_context()
ti.task.on_failure_callback(context)
if ti.state == State.SUCCESS and ti.task.on_success_callback:
context = ti.get_template_context()
ti.task.on_success_callback(context)
self.task_runner.terminate()
self.terminating = True
54 changes: 54 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,57 @@ def check_failure(context):
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())

def test_mark_success_on_success_callback(self):
"""
Test that ensures that where a task is marked suceess in the UI
on_success_callback gets executed
"""
data = {'called': False}

def success_callback(context):
self.assertEqual(context['dag_run'].dag_id,
'test_mark_success')
data['called'] = True

dag = DAG(dag_id='test_mark_success',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

task = DummyOperator(
task_id='test_state_succeeded1',
dag=dag,
on_success_callback=success_callback)

session = settings.Session()

dag.clear()
dag.create_dagrun(run_id="test",
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session)
ti = TI(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti,
ignore_ti_state=True,
executor=SequentialExecutor())
from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
job1.task_runner = StandardTaskRunner(job1)
process = multiprocessing.Process(target=job1.run)
process.start()
ti.refresh_from_db()
for _ in range(0, 50):
if ti.state == State.RUNNING:
break
time.sleep(0.1)
ti.refresh_from_db()
self.assertEqual(State.RUNNING, ti.state)
ti.state = State.SUCCESS
session.merge(ti)
session.commit()

job1.heartbeat_callback(session=None)
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())

0 comments on commit de1327f

Please sign in to comment.