
[WIP] [AIRFLOW-1623] Clearing task in UI does not trigger on_kill method in operator #2877

Closed
wants to merge 5 commits

Conversation

@ghost commented Dec 13, 2017

With this PR I want to initiate the discussion on https://issues.apache.org/jira/browse/AIRFLOW-1623.

Some context on the expected behavior of Airflow:

  • Submit Spark job to cluster
  • Clear Airflow job from UI
  • The Spark job on the cluster is also killed by the on_kill method in the hook

The last part of this behavior currently does not happen, due to the bug described in the issue. As mentioned in the issue description, the line of code added in this PR does not solve the problem, because the reference to the task is lost.
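
For context, the on_kill contract this relies on looks roughly like the following (a minimal sketch; the hook is a stub and the class names are illustrative, loosely modeled on the SparkSubmit operator/hook pair, not the real implementation):

    from airflow.models import BaseOperator

    class _StubSparkHook(object):
        # Stand-in for a real hook that talks to the cluster and remembers
        # the cluster-side job id.
        def submit(self, application):
            self.job_id = 'application_123'

        def on_kill(self):
            print('cancelling %s on the cluster' % self.job_id)

    class IllustrativeSparkOperator(BaseOperator):
        def execute(self, context):
            # Submit the job; only this hook instance knows the job id.
            self._hook = _StubSparkHook()
            self._hook.submit('my_app.py')

        def on_kill(self):
            # Airflow should call this when the task is killed/cleared, so
            # the cluster-side job gets cancelled too. That call is what
            # never happens here, and by kill time this operator instance
            # (and self._hook) no longer exists anyway.
            if getattr(self, '_hook', None):
                self._hook.on_kill()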

I'm not sure who can help with this issue, as it touches a lot of core parts of Airflow that are not very clear to me.

One solution I can think of is, instead of relying on the task reference, to create a kill CLI command that uses the task_id.
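
Purely to illustrate that idea (nothing below exists in Airflow; the kill subcommand and both helpers are hypothetical):

    import argparse

    def _load_persisted_job_id(dag_id, task_id):
        # Hypothetical: read back whatever external-job id the operator
        # persisted for this task (see the state discussion further down).
        return 'application_123'

    def _cancel_external_job(job_id):
        # Hypothetical: tell the external system (e.g. the Spark cluster)
        # to cancel the job.
        print('cancelling %s' % job_id)

    def kill(args):
        _cancel_external_job(_load_persisted_job_id(args.dag_id, args.task_id))

    parser = argparse.ArgumentParser(prog='airflow')
    subparsers = parser.add_subparsers()
    kill_parser = subparsers.add_parser('kill')
    kill_parser.add_argument('dag_id')
    kill_parser.add_argument('task_id')
    kill_parser.set_defaults(func=kill)

    args = parser.parse_args(['kill', 'my_dag', 'my_task'])
    args.func(args)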

JIRA

Description

  • Here are some details about my PR, including screenshots of any UI changes:

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":

    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"
  • Passes git diff upstream/master -u -- "*.py" | flake8 --diff

@gwax (Contributor) commented Dec 22, 2017

Can we add tests for this new functionality?

@ghost (Author) commented Dec 23, 2017

@gwax As mentioned in the description and the JIRA issue, this line of code does not fix the issue. I made this PR in the hope that someone with knowledge of Airflow's core code could help me out.

@milanvdm (Contributor) commented

After some investigation, I think I understand where the problem is coming from.

In the current setup of operators, it is not possible to save state inside an operator, such as a pid or, in our case, the ID of a Spark job.
The reason is that there is a crucial difference between the saved task (the operator) and a job (a TaskInstance).

When you clear a job, it clears the TaskInstances and not the tasks (and therefore there is no previous state that you saved in the operator).

A possible solution is a mechanism that lets you specify, inside an operator, what kind of information you want to save, so that this state can be accessed from a TaskInstance.
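
To make that concrete, here is a sketch of the idea, using XCom (Airflow's existing per-TaskInstance key/value store) as a stand-in for such a mechanism; the submit/cancel helpers are invented:

    from airflow.models import BaseOperator

    def _submit_spark_job(application):
        # Stand-in for a real cluster submit; returns the cluster-side job id.
        return 'application_123'

    def _cancel_spark_job(job_id):
        # Stand-in for a real cluster kill call.
        print('cancelling %s' % job_id)

    class StatefulSparkOperator(BaseOperator):
        def execute(self, context):
            job_id = _submit_spark_job('my_app.py')
            # Persist the id keyed on the TaskInstance, not on this operator
            # object, so it survives the process the operator ran in.
            context['ti'].xcom_push(key='spark_job_id', value=job_id)

    # Later, whatever process handles the kill can recover the state from the
    # TaskInstance and clean up the external job:
    def kill_external_job(ti):
        job_id = ti.xcom_pull(task_ids=ti.task_id, key='spark_job_id')
        if job_id:
            _cancel_spark_job(job_id)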

milanvdm and others added 2 commits January 17, 2018 17:06
Create a persistent context which gets saved to the database. This
context can be used in the Operators to save KV pairs.
@ashb (Member) commented Jan 17, 2018

I think the proper fix is to install a signal handler inside the right process, and then have the listener process (this is the LocalExecutor I think, the airflow run --local process) send a SIGINT to the "operator" process (the one with airflow run --raw in the args). That way we don't have to worry about what is or isn't pickleable, or even about persisting extra things to the DB.
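
In outline, that mechanism could look like this (a sketch using only the standard signal and os modules, with SIGTERM standing in for whichever signal is chosen; the wiring into the two airflow run processes is left out):

    import os
    import signal

    # In the "operator" process (airflow run --raw): install a handler that
    # runs the task's on_kill before exiting.
    def install_kill_handler(task):
        def handler(signum, frame):
            task.on_kill()
            raise SystemExit(128 + signum)
        signal.signal(signal.SIGTERM, handler)

    # In the "listener" process (airflow run --local): on clear/kill, signal
    # the "operator" process by pid.
    def signal_operator_process(pid):
        os.kill(pid, signal.SIGTERM)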

@milanvdm (Contributor) commented

@ashb
I'm not familiar with signal handling in Python.
But how does it handle state in a distributed context? I don't see how a signal handler can share state between different workers.

@jgao54 commented Jan 18, 2018

Thanks for working on this @milanvdm.

@ashb
Your approach makes sense. Looks like the signal handler is already installed in the "operator" process (airflow run --raw) here, but this process never receives any SIGTERM signal from the "listener" (airflow run --local).

I think you may be referring to the LocalTaskJob rather than the LocalExecutor; it is what airflow run --local instantiates, and it starts/terminates the "operator" process (via the task runner). So imho task_runner.terminate() should be responsible for sending the signal to the "operator" process. Right now it only sends the signal to the children of the "operator" process via kill_process_tree, but not to the process itself.

One way to fix the issue is to modify the terminate method to something roughly like the following:

    def terminate(self):
        if self.process and psutil.pid_exists(self.process.pid):
            # Kill the children of the "operator" process, as before...
            kill_process_tree(self.log, self.process.pid)
            # ...and then also terminate the root "operator" process itself
            # (SIGTERM), so its signal handler gets a chance to run on_kill.
            root_process = psutil.Process(self.process.pid)
            root_process.terminate()

I'm not sure why the root process is not signaled in the first place, so perhaps there are other implications I'm overlooking.

That said, I don't have any experience running Airflow with Celery (we run the LocalExecutor), but I am also curious about the question of handling state in a distributed context.

Review comment on a test DAG change in the diff:

    run_this_2.set_upstream(run_this_1)
    run_this_3 = DummyOperator(task_id='run_this_3', dag=dag)
    run_this_3.set_upstream(run_this_2)
    run_this_1 = BashOperator(

Contributor: Why did you change this test?

Contributor: Just for a quick local test. Will be cleaned up once the approach has been validated (or not) :)

@ghost closed this Feb 6, 2018
This pull request was closed.