
[AIRFLOW-1623] Trigger on_kill method in operators#2975

Closed
jgao54 wants to merge 1 commit into apache:master from jgao54:fix-on-kill

Conversation

@jgao54

@jgao54 jgao54 commented Jan 25, 2018

Make sure you have checked all steps below.

JIRA

Description

  • Here are some details about my PR, including screenshots of any UI changes:
    Fixing a previous TODO item to also kill the root process whenever the process tree is being killed. By sending a SIGTERM signal to the root process (airflow run --raw ...), the signal handler will catch the signal and complete necessary cleanups (i.e. operator on_kill).
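The signal-handler flow described above can be sketched in isolation like this (a minimal stand-alone sketch, not Airflow's actual code; `DemoOperator` and `TaskKilled` are made-up names for illustration):

```python
import os
import signal


class DemoOperator:
    """Stand-in for an operator with cleanup logic in on_kill."""
    def __init__(self):
        self.cleaned_up = False

    def on_kill(self):
        self.cleaned_up = True


class TaskKilled(Exception):
    pass


def install_sigterm_handler(operator):
    # Mirrors the idea above: when the raw process receives SIGTERM,
    # run the operator's on_kill cleanup, then abort the task.
    def handler(signum, frame):
        operator.on_kill()
        raise TaskKilled("Task received SIGTERM signal")
    signal.signal(signal.SIGTERM, handler)


op = DemoOperator()
install_sigterm_handler(op)
try:
    # Simulate the task runner sending SIGTERM to the root (raw) process.
    os.kill(os.getpid(), signal.SIGTERM)
except TaskKilled:
    pass

assert op.cleaned_up  # on_kill ran before the task aborted
```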

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
  • Unit test
  • Run Airflow locally with LocalExecutor. Cleared a running task from the UI and verified that operator's on_kill method is triggered.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":

    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"
  • Passes git diff upstream/master -u -- "*.py" | flake8 --diff

@milanvdm @ashb
I think this approach (which @ashb suggested in #2877) is simpler. I've tested it with LocalExecutor and it behaves as expected, so it should at least fix some of the issues folks have been experiencing.

@milanvdm
Contributor

@jgao54 I agree, but I'm still not convinced this will work in a distributed setup.
It's probably worth merging this PR already to fix it for some people, but the discussion on the distributed setup still needs to happen :)

@jgao54 jgao54 force-pushed the fix-on-kill branch 3 times, most recently from f133220 to eb2fa58 Compare January 26, 2018 22:29
@jgao54
Author

jgao54 commented Jan 27, 2018

@milanvdm Thanks for taking a look!

I'm not 100% convinced myself either, since I haven't run the code with CeleryExecutor :P, but I'm hopeful that this could solve the issue in the distributed setup as well. The reasoning:

When clear is called via the UI, the webserver sets the state of the LocalTaskJob to shutdown; this information is stored in the jobs table in the Airflow metadata db.

The LocalTaskJob (which is created via the airflow run --local command and runs on one of the workers) heartbeats periodically. On each heartbeat it reads its own state from the db, and once it detects that it needs to shut down, it attempts to "kill" itself, which calls the on_kill method that eventually sends a SIGTERM signal to the raw process doing the work...

So the main idea is: the detection of a shutdown happens via the db, so it should be able to successfully terminate the running process.

It would be great to have someone who runs CeleryExecutor, or has experience with Celery, test this out. I'll see what I can do on my end as well.
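A toy model of that db-mediated shutdown detection, using an in-memory sqlite table as a stand-in for Airflow's jobs table (table and column names are simplified and purely illustrative):

```python
import sqlite3

# In-memory stand-in for the jobs table in the Airflow metadata db.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, state TEXT)")
db.execute("INSERT INTO jobs VALUES (1, 'running')")


def heartbeat(job_id):
    """Re-read our own state from the db; True means take the poison pill."""
    (state,) = db.execute(
        "SELECT state FROM jobs WHERE id = ?", (job_id,)).fetchone()
    return state == "shutdown"


assert not heartbeat(1)  # still running

# The webserver handling a UI 'clear' sets the state to shutdown...
db.execute("UPDATE jobs SET state = 'shutdown' WHERE id = 1")

# ...and the worker notices on its next heartbeat, wherever it runs.
assert heartbeat(1)
```

Because the check goes through the database rather than local process state, the same mechanism should work whether the worker is local or remote.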

@milanvdm
Contributor

@jgao54 We have a Celery setup so I will try to find some time for testing this in a distributed context :)

@milanvdm
Contributor

@jgao54 I've set aside some time next week to test this in a distributed setup :)

@jgao54
Author

jgao54 commented Jan 30, 2018

@milanvdm awesome, thanks!

@ashb
Member

ashb commented Jan 30, 2018

I found a few spare moments across the day to test this, and the change looks good to me. An extra pair of eyes to double-check would be useful, as this was done in moments snatched here and there.

Full log (with the log format adjusted to show PID just after the time stamp)
[2018-01-30 17:10:02,760] 149 {{celery_executor.py:50}} INFO - Executing command in Celery: airflow run kill_test x 2018-01-25T22:00:00 --local -sd /usr/local/airflow/dags/example/kill_test.py
[2018-01-30 17:10:11,448] 1898 {{__init__.py:45}} INFO - Using executor CeleryExecutor
[2018-01-30 17:10:12,176] 1898 {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/example/kill_test.py
[2018-01-30 17:10:12,581] 1898 {{bash_task_runner.py:29}} INFO - BashTaskRunner.start called
[2018-01-30 17:10:12,582] 1898 {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run kill_test x 2018-01-25T22:00:00 --job_id 19 --raw -sd /usr/local/airflow/dags/example/kill_test.py']
[2018-01-30 17:10:17,205] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:10:17,204] 1915 {{__init__.py:45}} INFO - Using executor CeleryExecutor
[2018-01-30 17:10:17,535] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:10:17,534] 1915 {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/example/kill_test.py
[2018-01-30 17:13:03,300] 1898 {{jobs.py:2585}} WARNING - State of this instance has been externally set to shutdown. Taking the poison pill.
[2018-01-30 17:13:03,300] 1898 {{bash_task_runner.py:36}} INFO - BashTaskRunner.terminate called
[2018-01-30 17:13:03,328] 1898 {{helpers.py:258}} INFO - Terminating root process ['/usr/local/bin/python /usr/local/bin/airflow run kill_test x 2018-01-25T22:00:00 --job_id 19 --raw -sd /usr/local/airflow/dags/example/kill_test.py'] PID 1915
[2018-01-30 17:13:03,400] 1898 {{helpers.py:195}} ERROR - b''
[2018-01-30 17:13:03,402] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:13:03,401] 1915 {{kill_test.py:25}} INFO - On kill called
[2018-01-30 17:13:03,403] 1898 {{helpers.py:196}} INFO - Killed process 1915 with signal 15
[2018-01-30 17:13:08,406] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:13:08,405] 1915 {{kill_test.py:27}} INFO - On kill called
[2018-01-30 17:13:08,608] 1898 {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
[2018-01-30 17:13:08,613] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-01-30 17:13:08,641] 1898 {{base_task_runner.py:98}} INFO - Subtask:     args.func(args)
[2018-01-30 17:13:08,780] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/Users/ash/code/python/incubator-airflow/airflow/bin/cli.py", line 392, in run
[2018-01-30 17:13:08,917] 1898 {{base_task_runner.py:98}} INFO - Subtask:     pool=args.pool,
[2018-01-30 17:13:09,011] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/Users/ash/code/python/incubator-airflow/airflow/utils/db.py", line 50, in wrapper
[2018-01-30 17:13:09,123] 1898 {{base_task_runner.py:98}} INFO - Subtask:     result = func(*args, **kwargs)
[2018-01-30 17:13:09,139] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/Users/ash/code/python/incubator-airflow/airflow/models.py", line 1493, in _run_raw_task
[2018-01-30 17:13:09,145] 1898 {{base_task_runner.py:98}} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-01-30 17:13:09,377] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/Users/ash/code/python/incubator-airflow/airflow/operators/python_operator.py", line 89, in execute
[2018-01-30 17:13:09,452] 1898 {{base_task_runner.py:98}} INFO - Subtask:     return_value = self.execute_callable()
[2018-01-30 17:13:09,455] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/Users/ash/code/python/incubator-airflow/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-01-30 17:13:09,459] 1898 {{base_task_runner.py:98}} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-01-30 17:13:09,535] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/usr/local/airflow/dags/example/kill_test.py", line 39, in <lambda>
[2018-01-30 17:13:09,540] 1898 {{base_task_runner.py:98}} INFO - Subtask:     Op(task_id='x', python_callable=lambda: time.sleep(3600))
[2018-01-30 17:13:09,573] 1898 {{base_task_runner.py:98}} INFO - Subtask:   File "/Users/ash/code/python/incubator-airflow/airflow/models.py", line 1472, in signal_handler
[2018-01-30 17:13:09,682] 1898 {{base_task_runner.py:98}} INFO - Subtask:     raise AirflowException("Task received SIGTERM signal")
[2018-01-30 17:13:09,712] 1898 {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Task received SIGTERM signal
[2018-01-30 17:13:10,428] 1898 {{jobs.py:2521}} INFO - Task exited with return code 0
[2018-01-30 17:13:10,429] 1898 {{bash_task_runner.py:36}} INFO - BashTaskRunner.terminate called

The interesting lines are these

[2018-01-30 17:10:12,581] 1898 {{bash_task_runner.py:29}} INFO - BashTaskRunner.start called
[2018-01-30 17:10:12,582] 1898 {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run kill_test x 2018-01-25T22:00:00 --job_id 19 --raw -sd /usr/local/airflow/dags/example/kill_test.py']
[2018-01-30 17:10:17,205] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:10:17,204] 1915 {{__init__.py:45}} INFO - Using executor CeleryExecutor
[2018-01-30 17:10:17,535] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:10:17,534] 1915 {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/example/kill_test.py
[2018-01-30 17:13:03,300] 1898 {{jobs.py:2585}} WARNING - State of this instance has been externally set to shutdown. Taking the poison pill.
[2018-01-30 17:13:03,300] 1898 {{bash_task_runner.py:36}} INFO - BashTaskRunner.terminate called
[2018-01-30 17:13:03,328] 1898 {{helpers.py:258}} INFO - Terminating root process ['/usr/local/bin/python /usr/local/bin/airflow run kill_test x 2018-01-25T22:00:00 --job_id 19 --raw -sd /usr/local/airflow/dags/example/kill_test.py'] PID 1915
[2018-01-30 17:13:03,400] 1898 {{helpers.py:195}} ERROR - b''
[2018-01-30 17:13:03,402] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:13:03,401] 1915 {{kill_test.py:25}} INFO - Operator on_kill called
[2018-01-30 17:13:03,403] 1898 {{helpers.py:196}} INFO - Killed process 1915 with signal 15
[2018-01-30 17:13:08,406] 1898 {{base_task_runner.py:98}} INFO - Subtask: [2018-01-30 17:13:08,405] 1915 {{kill_test.py:27}} INFO - End of Operator on_kill

It shows that PID 1898 is the run --local process, PID 1915 is the run --raw process, and that the on_kill command runs in the context of PID 1915: Subtask: [2018-01-30 17:13:08,405] 1915 {{kill_test.py:27}} INFO - End of Operator on_kill (logging from my custom on_kill)

@Fokko
Contributor

Fokko commented Jan 31, 2018

Please rebase against master to trigger a new build. The CI should be fixed.

@jgao54 jgao54 force-pushed the fix-on-kill branch 2 times, most recently from db6d916 to f24b18a Compare February 1, 2018 09:16
@jgao54
Author

jgao54 commented Feb 5, 2018

@ashb thanks for testing this!
@milanvdm this should take care of #2877 as well; it would be great if you get a chance to double-check this one.

@milanvdm
Contributor

milanvdm commented Feb 6, 2018

@jgao54 I tested and it works! :)
I'll close my other PR. I will also submit a PR tomorrow to add on_kill logic to the SparkSubmitOperator.

@Fokko
Contributor

Fokko commented Feb 6, 2018

Looking forward to your contribution to the SparkSubmitOperator @milanvdm

@jgao54
Author

jgao54 commented Feb 6, 2018

Hi @Fokko! I rebased a few times but Travis still isn't happy in some environments. Are you familiar with the issue?

@Fokko
Contributor

Fokko commented Feb 6, 2018

Hi @jgao54

This looks like failing tests, Python 2.7 Postgres:

======================================================================
3) FAIL: test_terminate_task (tests.CoreTest)
----------------------------------------------------------------------
   Traceback (most recent call last):
    tests/core.py line 848 in test_terminate_task
      self.assertEqual(State.FAILED, ti.state)
   AssertionError: u'failed' != None

The same for Python 3:
https://travis-ci.org/apache/incubator-airflow/jobs/336013133#L4719
https://travis-ci.org/apache/incubator-airflow/jobs/336013132#L4326

I think you need to fix some tests ;)

Contributor

What are the benefits of using the shell to do this? I only see downsides

Author

@bolkedebruin the alternative could be along the lines of what @milanvdm proposed in #2877. I think that's a more flexible/safer solution, assuming we remove this shell-based code altogether. My only concern is having to do a db migration on TaskInstance, which could be difficult for long-running DBs.

Member

The other alternative would be os.kill(), or, given we already have psutil.Process objects, descendant.terminate() and then descendant.kill()

See also http://psutil.readthedocs.io/en/latest/#terminate-my-children
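The graceful-then-forceful pattern suggested here can be sketched with the stdlib alone (a sketch of the same terminate/wait/kill idea psutil provides, written with subprocess; not the PR's actual code):

```python
import signal
import subprocess
import sys


def terminate_then_kill(proc, timeout=3.0):
    """SIGTERM first so the process can clean up; SIGKILL if it lingers."""
    proc.terminate()                      # polite: SIGTERM, cleanup possible
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()                       # forceful: SIGKILL, no cleanup
        proc.wait()
    return proc.returncode


# A child that would otherwise run for a minute.
p = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
rc = terminate_then_kill(p, timeout=5.0)
assert rc == -signal.SIGTERM  # on Unix: died from SIGTERM, not SIGKILL
```

psutil's wait_procs applies the same idea across a whole process tree, returning (gone, alive) lists so the survivors can then be killed.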

Author

@jgao54 jgao54 Feb 8, 2018

@ashb ah gotcha. I assume that's what @bolkedebruin meant too. Sure let me take a look.

@Fokko
Contributor

Fokko commented Mar 5, 2018

@jgao54 Any updates on this?

@jgao54
Author

jgao54 commented Mar 6, 2018

Haven't gotten a chance to revisit this yet, will do sometime this week for sure

@jgao54
Author

jgao54 commented Mar 7, 2018

@Fokko one unit test is failing in some environments; it doesn't look related, and I can't replicate it locally.

@Fokko
Contributor

Fokko commented Mar 7, 2018

So, please rebase onto master. The Python 3 failures are unrelated; the Python 2.7 one feels related to me:

======================================================================
6) FAIL: test_terminate_task (tests.CoreTest)
----------------------------------------------------------------------
   Traceback (most recent call last):
    tests/core.py line 848 in test_terminate_task
      self.assertEqual(State.FAILED, ti.state)
   AssertionError: u'failed' != None

@Fokko
Contributor

Fokko commented Mar 7, 2018

Let me check if I can figure out what's happening

@Fokko
Contributor

Fokko commented Mar 7, 2018

So I'm able to replicate it on my local machine:

[2018-03-07 12:16:30,529] {jobs.py:2592} WARNING - State of this instance has been externally set to None. Taking the poison pill.

@Fokko
Contributor

Fokko commented Mar 7, 2018

So I suspect that somewhere in the code, the TaskInstance is upserted with the failed state.

@Fokko
Contributor

Fokko commented Mar 7, 2018

This makes sense: db816ee. The test is from two years back: 7c0f837. And the ticket: https://issues.apache.org/jira/projects/AIRFLOW/issues/AIRFLOW-234

Personally, I have a strong suspicion that this query:

ti = session.query(TI).filter_by(
    dag_id=task.dag_id,
    task_id=task.task_id,
    execution_date=DEFAULT_DATE
).one()

returns more than one result, and therefore fetches a TI with state FAILED. @mistercrunch, any thoughts on this?

@Fokko
Contributor

Fokko commented Mar 7, 2018

When I run the test in isolation on Python 3 locally, I get the same error as with Python 2.7.

@ashb
Member

ashb commented Mar 7, 2018

Cuuurious. I wonder if this might be related to some of the sporadic reports we get of "losing track of tasks"?

@Fokko
Contributor

Fokko commented Mar 7, 2018

If the task gets "lost", i.e. deleted from the database, the task will be killed.

@Fokko
Contributor

Fokko commented Mar 7, 2018

I can't think of a situation where the task_instance records would be deleted from the database.

@jgao54
Author

jgao54 commented Mar 7, 2018

Will rebase, just in case this is no longer an issue.

@jgao54
Author

jgao54 commented Mar 7, 2018

@Fokko Thanks for looking into this! I traced back from the warning message you provided, and I definitely agree that the failure is caused by my PR.

In the past we only sent SIGTERM to the child processes of airflow run --raw; after this PR, it also sends SIGTERM to the root process itself. Unfortunately, I think we rely on airflow run --raw to update the ti state, so there's now a race condition between the process setting the ti state to failed and the signal killing that process (probably why I can't replicate it locally). I think one potential fix is to let the task runner handle the ti state update when it terminates the process, but I'm not 100% sure whether that would have other side effects or even makes sense. Given it's a core change, I'd want to see how @mistercrunch and @bolkedebruin feel about this.

Also note that the query you mentioned above should not return more than one row: the primary key of TaskInstance is task_id + dag_id + execution_date, so the db guarantees uniqueness.
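The race can be made concrete with a deterministic stand-in: the child below plays the role of airflow run --raw, persisting its final state only on the way out, so a hard kill means the write never happens (illustrative stdlib code, not Airflow's):

```python
import os
import signal
import subprocess
import sys
import tempfile
import textwrap
import time

# The child persists its final "failed" state only as it exits (like the
# raw process updating the ti state). SIGKILL can't be caught, so if the
# kill lands first, the state is never written -- the db still shows None.
child_code = textwrap.dedent("""
    import sys, time
    try:
        time.sleep(60)                    # the task's "work"
    finally:
        with open(sys.argv[1], "w") as f:
            f.write("failed")
""")

statefile = os.path.join(tempfile.mkdtemp(), "ti_state")
p = subprocess.Popen([sys.executable, "-c", child_code, statefile])
time.sleep(1.0)
p.send_signal(signal.SIGKILL)             # the signal wins the race
p.wait()

assert not os.path.exists(statefile)      # 'failed' was never persisted
```

With SIGTERM the handler gets a chance to run, but whether its state update reaches the db before the process dies is exactly the timing-dependent part, which is why the failure only shows up in some environments.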

@ashb
Member

ashb commented Mar 7, 2018

I think one potential fix is to let the task runner handle the ti state update when it's terminating the process.. but not 100% sure if it would have other side effects or if it even makes sense

I think moving the setting of the failed state to the "monitor" process actually sounds like a plus, as there are other cases (such as a segfault or OOM kill) where the raw process can't report its own failure. Not likely, but... yay? I haven't looked at the code to see if that makes sense though.

Contributor

Please add an else with an exception if they don't match

@Fokko
Contributor

Fokko commented Mar 7, 2018

I'm still a bit confused about how this test works out. Since we delete the record, I don't see how it gets upserted again with the failed state. I must be missing something, since the test seemed to work before.

@@ -183,87 +183,65 @@ def f(t):
return s


Contributor

@Fokko Fokko Mar 8, 2018

Please add :)

    """
    Kills a list of processes with a given signal, with a configurable timeout in seconds.
    Returns a tuple of (gone, alive) lists of processes based on their state

    :param logger: logger
    :type logger: logging.Logger
    :param processes: ids of the processes to kill
    :type processes: list of int
    :param sig: The signal to send, either SIGTERM or SIGKILL (Default: SIGKILL)
    :type sig: signal
    :param timeout: time (seconds) to wait on a process to terminate before
                    attempting to SIGKILL
    :type timeout: int
    """

[x for x in root_process.children(recursive=True) if x.is_running()]

if len(descendant_processes) != 0:
if len(running_descendants) != 0:
Contributor

Can we make this a > instead of !=?

@Fokko
Contributor

Fokko commented Mar 8, 2018

So here is what happens:

  • The record cannot be found anymore at a certain heartbeat, because it has been deleted by the test
  • The job will terminate the bash_task_runner (jobs.py:2594)
  • The bash_task_runner will terminate

In the case of the test there are no descendants:

bash,1
  ├─pstree,2213 -a -p
  └─pytest,2187 /usr/local/bin/pytest tests/core.py
      └─pytest,2203 /usr/local/bin/pytest tests/core.py
          ├─airflow,2205 /usr/local/bin/airflow run test_utils sleeps_forever 2015-01-01T00:00:00+00:00 --job_id 17 --raw -sd DAGS_FOLDER/example_dags/test_utils.py --cfg_path /tmp/tmpoiInKx
          └─{pytest},2206

After some tests I noticed that the exit handler is not invoked:
https://github.com/apache/incubator-airflow/blob/a5d9d5038b3ab59dcc1f64a1c161fb63f41c58cf/airflow/jobs.py#L2500

Maybe this is because we've changed the way the processes are killed.
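One plausible mechanism, sketched below: Python exit handlers (atexit) run on normal interpreter exit but not when the process dies from an uncaught signal, so a change from graceful to hard killing would silently skip them (illustrative stdlib sketch, not the jobs.py handler itself):

```python
import os
import signal
import subprocess
import sys
import tempfile
import textwrap
import time

# The child registers an exit handler that drops a marker file.
code = textwrap.dedent("""
    import atexit, sys, time
    atexit.register(lambda: open(sys.argv[1], "w").write("ran"))
    if len(sys.argv) > 2:
        sys.exit(0)          # normal exit: atexit handlers run
    time.sleep(60)           # otherwise wait around to be killed
""")

tmp = tempfile.mkdtemp()
marker_ok = os.path.join(tmp, "normal_exit")
marker_dead = os.path.join(tmp, "killed")

# Normal exit: the handler fires and the marker file appears.
subprocess.run([sys.executable, "-c", code, marker_ok, "exit"], check=True)
assert os.path.exists(marker_ok)

# Death by signal: the handler never fires.
p = subprocess.Popen([sys.executable, "-c", code, marker_dead])
time.sleep(1.0)
p.send_signal(signal.SIGKILL)
p.wait()
assert not os.path.exists(marker_dead)
```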

@Fokko
Contributor

Fokko commented Mar 8, 2018

I have a strong suspicion that the test is not working properly: when I run the individual test on master, I get the same result:

root@a06072e45c2d:/airflow# rm /root/airflow/airflow.db; airflow initdb; pytest -k "terminate" tests/core.py 
[2018-03-08 12:23:17,873] {settings.py:141} DEBUG - Setting up DB connection pool (PID 3203)
[2018-03-08 12:23:19,039] {__init__.py:45} INFO - Using executor SequentialExecutor
DB: sqlite:////root/airflow/airflow.db
[2018-03-08 12:23:19,384] {db.py:301} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/usr/local/lib/python2.7/site-packages/alembic/util/messaging.py:69: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
  warnings.warn(msg)
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_isntance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
Done.
=========================================================================================================================================================================== test session starts ============================================================================================================================================================================
platform linux2 -- Python 2.7.14, pytest-3.4.2, py-1.5.2, pluggy-0.6.0
rootdir: /airflow, inifile:
collected 126 items                                                                                                                                                                                                                                                                                                                                                        

tests/core.py F                                                                                                                                                                                                                                                                                                                                                      [100%]

================================================================================================================================================================================= FAILURES =================================================================================================================================================================================
_______________________________________________________________________________________________________________________________________________________________________ CoreTest.test_terminate_task _______________________________________________________________________________________________________________________________________________________________________

self = <tests.core.CoreTest testMethod=test_terminate_task>

    def test_terminate_task(self):
        """If a task instance's db state get deleted, it should fail"""
        TI = models.TaskInstance
        dag = self.dagbag.dags.get('test_utils')
        task = dag.task_dict.get('sleeps_forever')
    
        ti = TI(task=task, execution_date=DEFAULT_DATE)
        job = jobs.LocalTaskJob(
            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
    
        # Running task instance asynchronously
        p = multiprocessing.Process(target=job.run)
        p.start()
        sleep(5)
        settings.engine.dispose()
        session = settings.Session()
        ti.refresh_from_db(session=session)
        # making sure it's actually running
        self.assertEqual(State.RUNNING, ti.state)
        ti = session.query(TI).filter_by(
            dag_id=task.dag_id,
            task_id=task.task_id,
            execution_date=DEFAULT_DATE
        ).one()
    
        # deleting the instance should result in a failure
        session.delete(ti)
        session.commit()
        # waiting for the async task to finish
        p.join()
    
        # making sure that the task ended up as failed
        ti.refresh_from_db(session=session)
>       self.assertEqual(State.FAILED, ti.state)
E       AssertionError: u'failed' != None

tests/core.py:848: AssertionError
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[2018-03-08 12:23:32,405] {models.py:196} INFO - Filling up the DagBag from /dev/null
[2018-03-08 12:23:32,560] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_latest_only.py
[2018-03-08 12:23:32,594] {models.py:397} DEBUG - Loaded DAG <DAG: latest_only>
[2018-03-08 12:23:32,610] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_python_operator.py
[2018-03-08 12:23:32,627] {models.py:397} DEBUG - Loaded DAG <DAG: example_python_operator>
[2018-03-08 12:23:32,652] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/test_utils.py
[2018-03-08 12:23:32,659] {models.py:397} DEBUG - Loaded DAG <DAG: test_utils>
[2018-03-08 12:23:32,682] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_bash_operator.py
[2018-03-08 12:23:32,693] {models.py:397} DEBUG - Loaded DAG <DAG: example_bash_operator>
[2018-03-08 12:23:32,708] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_short_circuit_operator.py
[2018-03-08 12:23:32,721] {models.py:397} DEBUG - Loaded DAG <DAG: example_short_circuit_operator>
[2018-03-08 12:23:32,736] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_branch_operator.py
[2018-03-08 12:23:32,752] {models.py:397} DEBUG - Loaded DAG <DAG: example_branch_operator>
[2018-03-08 12:23:32,775] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_docker_operator.py
[2018-03-08 12:23:32,799] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/tutorial.py
[2018-03-08 12:23:32,809] {models.py:397} DEBUG - Loaded DAG <DAG: tutorial>
[2018-03-08 12:23:32,834] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/docker_copy_data.py
[2018-03-08 12:23:32,889] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_passing_params_via_test_command.py
[2018-03-08 12:23:32,898] {models.py:397} DEBUG - Loaded DAG <DAG: example_passing_params_via_test_command>
[2018-03-08 12:23:32,931] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_kubernetes_operator.py
[2018-03-08 12:23:33,508] {models.py:397} DEBUG - Loaded DAG <DAG: example_kubernetes_operator>
[2018-03-08 12:23:33,548] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_latest_only_with_trigger.py
[2018-03-08 12:23:33,559] {models.py:397} DEBUG - Loaded DAG <DAG: latest_only_with_trigger>
[2018-03-08 12:23:33,622] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_xcom.py
[2018-03-08 12:23:33,632] {models.py:397} DEBUG - Loaded DAG <DAG: example_xcom>
[2018-03-08 12:23:33,682] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_http_operator.py
[2018-03-08 12:23:33,757] {models.py:397} DEBUG - Loaded DAG <DAG: example_http_operator>
[2018-03-08 12:23:33,781] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_skip_dag.py
[2018-03-08 12:23:33,794] {models.py:397} DEBUG - Loaded DAG <DAG: example_skip_dag>
[2018-03-08 12:23:33,823] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_trigger_target_dag.py
[2018-03-08 12:23:33,832] {models.py:397} DEBUG - Loaded DAG <DAG: example_trigger_target_dag>
[2018-03-08 12:23:33,862] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_branch_python_dop_operator_3.py
[2018-03-08 12:23:33,872] {models.py:397} DEBUG - Loaded DAG <DAG: example_branch_dop_operator_v3>
[2018-03-08 12:23:33,895] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_subdag_operator.py
[2018-03-08 12:23:33,946] {models.py:397} DEBUG - Loaded DAG <DAG: example_subdag_operator.section-2>
[2018-03-08 12:23:33,946] {models.py:397} DEBUG - Loaded DAG <DAG: example_subdag_operator.section-1>
[2018-03-08 12:23:33,946] {models.py:397} DEBUG - Loaded DAG <DAG: example_subdag_operator>
[2018-03-08 12:23:33,963] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/example_trigger_controller_dag.py
[2018-03-08 12:23:33,970] {models.py:397} DEBUG - Loaded DAG <DAG: example_trigger_controller_dag>
[2018-03-08 12:23:33,997] {models.py:283} DEBUG - Importing /airflow/airflow/example_dags/subdags/subdag.py
[2018-03-08 12:23:34,430] {configuration.py:211} WARNING - section/key [core/hostname_callable] not found in config
[2018-03-08 12:23:34,602] {configuration.py:211} WARNING - section/key [core/hostname_callable] not found in config
[2018-03-08 12:23:35,446] {configuration.py:211} WARNING - section/key [core/hostname_callable] not found in config
[2018-03-08 12:23:35,447] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:36,456] {configuration.py:211} WARNING - section/key [core/hostname_callable] not found in config
[2018-03-08 12:23:36,457] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:37,524] {configuration.py:211} WARNING - section/key [core/hostname_callable] not found in config
[2018-03-08 12:23:37,525] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:38,594] {configuration.py:211} WARNING - section/key [core/hostname_callable] not found in config
[2018-03-08 12:23:38,595] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:39,581] {configuration.py:211} WARNING - section/key [core/hostname_callable] not found in config
[2018-03-08 12:23:39,582] {jobs.py:2592} WARNING - State of this instance has been externally set to None. Taking the poison pill.
[2018-03-08 12:23:39,587] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:40,572] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:41,757] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:42,664] {jobs.py:183} DEBUG - [heartbeat]
[2018-03-08 12:23:42,664] {jobs.py:2528} INFO - Task exited with return code 1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
models.py                  196 INFO     Filling up the DagBag from /dev/null
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_latest_only.py
models.py                  397 DEBUG    Loaded DAG <DAG: latest_only>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_python_operator.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_python_operator>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/test_utils.py
models.py                  397 DEBUG    Loaded DAG <DAG: test_utils>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_bash_operator.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_bash_operator>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_short_circuit_operator.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_short_circuit_operator>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_branch_operator.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_branch_operator>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_docker_operator.py
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/tutorial.py
models.py                  397 DEBUG    Loaded DAG <DAG: tutorial>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/docker_copy_data.py
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_passing_params_via_test_command.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_passing_params_via_test_command>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_kubernetes_operator.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_kubernetes_operator>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_latest_only_with_trigger.py
models.py                  397 DEBUG    Loaded DAG <DAG: latest_only_with_trigger>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_xcom.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_xcom>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_http_operator.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_http_operator>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_skip_dag.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_skip_dag>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_trigger_target_dag.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_trigger_target_dag>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_branch_python_dop_operator_3.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_branch_dop_operator_v3>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_subdag_operator.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_subdag_operator.section-2>
models.py                  397 DEBUG    Loaded DAG <DAG: example_subdag_operator.section-1>
models.py                  397 DEBUG    Loaded DAG <DAG: example_subdag_operator>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/example_trigger_controller_dag.py
models.py                  397 DEBUG    Loaded DAG <DAG: example_trigger_controller_dag>
models.py                  283 DEBUG    Importing /airflow/airflow/example_dags/subdags/subdag.py
configuration.py           211 WARNING  section/key [core/hostname_callable] not found in config
=========================================================================================================================================================================== 125 tests deselected ===========================================================================================================================================================================
================================================================================================================================================================ 1 failed, 125 deselected in 17.80 seconds =================================================================================================================================================================
[2018-03-08 12:23:42,782] {settings.py:165} DEBUG - Disposing DB connection pool (PID 3208)
root@a06072e45c2d:/airflow# 

I would suggest altering the test. Maybe @bolkedebruin or @mistercrunch has some input on this?
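For context, the "Taking the poison pill" line in the log corresponds to a heartbeat loop that re-reads the task instance's state from the database and shuts itself down when that state was changed externally (e.g. cleared from the UI). A minimal sketch of that check (class and names hypothetical, not Airflow's actual code):

```python
class HeartbeatSketch:
    """Toy model of the heartbeat: terminate when the task instance's
    state was changed externally (e.g. cleared from the UI)."""

    def __init__(self, fetch_state, expected_state="running"):
        self.fetch_state = fetch_state  # callable reading the TI state from the DB
        self.expected_state = expected_state
        self.terminated = False

    def heartbeat(self):
        if self.fetch_state() != self.expected_state:
            # "Taking the poison pill": someone set the state externally.
            self.terminated = True
        return self.terminated

states = iter(["running", "running", None])  # third read: state cleared to None
job = HeartbeatSketch(fetch_state=lambda: next(states))
results = [job.heartbeat() for _ in range(3)]
print(results)  # [False, False, True]
```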

@milanvdm
Contributor

@bolkedebruin @mistercrunch Kind reminder :)

@milanvdm
Contributor

milanvdm commented Apr 6, 2018

@bolkedebruin @mistercrunch Kind reminder :)

@bolkedebruin
Contributor

@Fokko

Query returns more than one result, and therefore fetches a TI with state FAILED

That can't happen, by definition of one() in SQLAlchemy: when the query matches more than one record it raises an exception (MultipleResultsFound) instead of returning a row.
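To illustrate, one()'s contract can be modelled in a few lines (a plain-Python sketch of the semantics, not SQLAlchemy's implementation):

```python
class NoResultFound(Exception):
    pass

class MultipleResultsFound(Exception):
    pass

def one(rows):
    """Mimics SQLAlchemy's Query.one(): exactly one row, or an exception."""
    if len(rows) == 0:
        raise NoResultFound("No row was found for one()")
    if len(rows) > 1:
        raise MultipleResultsFound("Multiple rows were found for one()")
    return rows[0]

print(one([("ti", "RUNNING")]))  # ('ti', 'RUNNING')
try:
    one([("ti", "RUNNING"), ("ti", "FAILED")])
except MultipleResultsFound as exc:
    print("raised:", exc)
```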

@jgao54 The signal handler of a --raw task should be able (and have the time) to set the right state. Only if a SIGKILL is issued will it exit immediately, without any chance to clean up.
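The idea that SIGTERM gives the --raw process a chance to run its cleanup can be sketched as follows (simplified; not Airflow's actual handler, which would also set the task state and exit):

```python
import os
import signal

cleanup_ran = []

def on_kill():
    # Operator-level cleanup, e.g. cancelling a remote job or query.
    cleanup_ran.append(True)
    print("on_kill called")

def sigterm_handler(signum, frame):
    # A real handler would also mark the task instance and exit;
    # here we only run the cleanup so the effect is observable.
    on_kill()

signal.signal(signal.SIGTERM, sigterm_handler)
os.kill(os.getpid(), signal.SIGTERM)  # simulate the parent sending SIGTERM
print("cleanup ran:", cleanup_ran == [True])  # cleanup ran: True
```

SIGKILL, by contrast, cannot be caught by a handler, so this path never runs.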

logger.debug("There are no descendant processes to kill")
logger.debug("There are no descendant processes to terminate.")

logger.info("Terminating root process PID: {}.".format(root_process.pid))

@jgao54 I don't understand this part. The root_process is the monitoring --local task, which is the parent of the --raw task and its underlying processes. So it is trying to SIGTERM itself here, and eventually SIGKILL itself. That doesn't make sense.

If --local finds the need to terminate its children, it should exit gracefully itself, maybe with a non-zero exit code.

sig=signal.SIGTERM, timeout=timeout)

if running_root or running_descendants:
    kill_processes(logger, running_root + running_descendants, sig=signal.SIGKILL)
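For reference, the terminate-then-kill escalation shown in this snippet can be sketched with the stdlib alone (a simplified sketch; the actual helper also walks the descendant process tree):

```python
import subprocess
import sys
import time

def terminate_then_kill(proc, timeout=3.0):
    """Escalation sketch: SIGTERM first (catchable, allows cleanup),
    SIGKILL only if the process is still alive after the timeout."""
    proc.terminate()  # sends SIGTERM
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            return "terminated"
        time.sleep(0.1)
    proc.kill()       # sends SIGKILL, which cannot be caught or ignored
    proc.wait()
    return "killed"

# A child that ignores SIGTERM, forcing the escalation path.
child = subprocess.Popen([
    sys.executable, "-c",
    "import signal, time; signal.signal(signal.SIGTERM, signal.SIG_IGN); time.sleep(60)",
])
time.sleep(0.5)  # give the child time to install its SIG_IGN handler
outcome = terminate_then_kill(child, timeout=1.0)
print(outcome)  # killed
```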

see above comment.

@Fokko
Contributor

Fokko commented Apr 17, 2018

@jgao54 maybe close this one for now, since @bolkedebruin pushed a fix in #3204

@jgao54 jgao54 closed this Apr 18, 2018