Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ https://developers.google.com/style/inclusive-documentation

-->

### Success callback will be called when a task is marked as success from the UI

When a task is marked as success by a user from the Airflow UI, its `on_success_callback` will be called.

### Added `airflow dags test` CLI command

A new command was added to the CLI for executing one full run of a DAG for a given execution date, similar to
Expand Down
3 changes: 3 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,8 @@ def heartbeat_callback(self, session=None):
# If the task instance was externally set to FAILED (e.g. marked failed
# from the UI while this job was still running), fire the task's
# on_failure_callback with the task's template context.
if ti.state == State.FAILED and ti.task.on_failure_callback:
context = ti.get_template_context()
ti.task.on_failure_callback(context)
# Likewise, if the task was externally marked SUCCESS, fire
# on_success_callback so user hooks still run for UI-driven success.
if ti.state == State.SUCCESS and ti.task.on_success_callback:
context = ti.get_template_context()
ti.task.on_success_callback(context)
# In either externally-changed-state case, stop the task runner and
# flag this job as terminating.
self.task_runner.terminate()
self.terminating = True
54 changes: 54 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,57 @@ def check_failure(context):
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())

def test_mark_success_on_sucess_callback(self):
    """
    Test that when a task is marked success in the UI (i.e. its state is
    changed in the DB while the task is running), on_success_callback
    gets executed by the heartbeat.
    """
    # Mutable flag flipped by the callback so the test can observe the call.
    data = {'called': False}

    def success_callback(context):
        # The callback must receive the template context of the run we created.
        self.assertEqual(context['dag_run'].dag_id,
                         'test_mark_success')
        data['called'] = True

    dag = DAG(dag_id='test_mark_success',
              start_date=DEFAULT_DATE,
              default_args={'owner': 'owner1'})

    task = DummyOperator(
        task_id='test_state_succeeded1',
        dag=dag,
        on_success_callback=success_callback)

    session = settings.Session()

    dag.clear()
    dag.create_dagrun(run_id="test",
                      state=State.RUNNING,
                      execution_date=DEFAULT_DATE,
                      start_date=DEFAULT_DATE,
                      session=session)
    ti = TI(task=task, execution_date=DEFAULT_DATE)
    ti.refresh_from_db()
    job1 = LocalTaskJob(task_instance=ti,
                        ignore_ti_state=True,
                        executor=SequentialExecutor())
    from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
    job1.task_runner = StandardTaskRunner(job1)
    process = multiprocessing.Process(target=job1.run)
    process.start()
    # Wait (up to ~5s) for the task instance to reach RUNNING. The refresh
    # must happen *inside* the loop: the state changes in the DB in another
    # process, so polling a stale in-memory value would never see it.
    for _ in range(0, 50):
        ti.refresh_from_db()
        if ti.state == State.RUNNING:
            break
        time.sleep(0.1)
    self.assertEqual(State.RUNNING, ti.state)
    # Simulate a user marking the running task as success from the UI.
    ti.state = State.SUCCESS
    session.merge(ti)
    session.commit()

    # The heartbeat should detect the external SUCCESS and run the callback.
    job1.heartbeat_callback(session=None)
    self.assertTrue(data['called'])
    process.join(timeout=10)
    self.assertFalse(process.is_alive())