
[AIRFLOW-2760] Decouple DAG parsing loop from scheduler loop #3873

Merged
merged 1 commit into apache:master on Oct 26, 2018

9 participants
KevinYang21 (Contributor) commented Sep 10, 2018

Jira

Description

  • Here are some details about my PR, including screenshots of any UI changes:

Before this change, the scheduler loop is responsible for spinning up new DAG file processors after it harvests the results of DAG parsing. Thus the speed at which new DAG file processors are spun up is bounded by the scheduler loop speed. Like this:
[diagram]

After the change, the scheduler loop will no longer be responsible for spinning up new DAG file processors; the DAGFileProcessorManager will be responsible for that instead. DAG parsing will be much faster and no longer bounded by the scheduler loop speed, so the scheduler will be able to schedule more tasks. Like this:
[diagram]

Summary of big behavior changes:

  1. Scheduler loop and DAG parsing loop are decoupled and run in two different processes.
  2. DAG parsing manager logging will go into a separate file.
  3. Zombie tasks will be calculated by the DAG parsing manager and sent to the DAG parsing processors to kill. This reduces DB CPU load (zombie detection was identified as producing 80% of the CPU load during stress testing; CPU usage went down from 80%+ to ~40% after this change).

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    test_dag_processing.py:TestDagFileProcessorAgent
    jobs.py:SchedulerJobTest.test_no_orphan_process_will_be_left
    plan to add zombie detecting test in test_dag_processing.py
    The change mostly does the same thing in a different way, and thus doesn't add a lot of unit tests. I'm still thinking about proper unit tests to add, and suggestions on what unit tests to add would be greatly appreciated.

Tested in our stress-testing cluster with 4k files and 30k running tasks: scheduling delay has been kept within 5 minutes (except when we need to schedule a huge number of tasks at once, which will be dealt with later) for 1+ month, and our production cluster has been running with the change for 1+ week.

Before (#running tasks is lower because the DB was stressed out and refusing connections, which caused tasks to fail):

The scheduling delay is measured by a monitoring task running on the cluster at a 5-minute interval. It compares the current timestamp against the expected scheduling timestamp (execution_date + 5m) and sends the time difference in minutes as one data point on the metric graph; e.g. a monitoring task with execution_date 2018-01-01T00:00:00 that started at 2018-01-01T00:06:00 will put a 1m scheduling-delay data point onto the metric graph.
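The delay computation described above can be sketched as follows (a minimal illustration of the described metric, not the actual monitoring task):

```python
from datetime import datetime, timedelta

SCHEDULE_INTERVAL = timedelta(minutes=5)

def scheduling_delay_minutes(execution_date, started_at):
    # The expected start is one schedule interval after the logical
    # execution_date, per the description above.
    expected = execution_date + SCHEDULE_INTERVAL
    return (started_at - expected).total_seconds() / 60

# The example from the description: execution_date 2018-01-01T00:00:00,
# task actually started at 2018-01-01T00:06:00.
delay = scheduling_delay_minutes(
    datetime(2018, 1, 1, 0, 0), datetime(2018, 1, 1, 0, 6))
print(delay)  # 1.0 minute of scheduling delay
```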
[screenshot: scheduling-delay metric, 2018-07-31 12:23 pm]

After:
[screenshot: scheduling-delay metric, 2018-07-30 5:17 pm]

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

Code Quality

  • Passes git diff upstream/master -u -- "*.py" | flake8 --diff

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch 2 times, most recently from ebf4b09 to 4055e43 Sep 10, 2018

@KevinYang21 KevinYang21 changed the title [WIP][Airflow-2760] Decouple DAG parsing loop from scheduler loop [Airflow-2760] Decouple DAG parsing loop from scheduler loop Sep 10, 2018


@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch from 4055e43 to 2cdb90d Sep 10, 2018

ashb (Member) commented Sep 10, 2018

First thought: this sounds like a good thing, but could we put some of the description of this PR into the docs too? https://airflow.apache.org/scheduler.html seems like a likely place to put them.

(I haven't looked at the code for this PR yet)

KevinYang21 (Author) commented Sep 10, 2018

@ashb We definitely can do that. I was not aware of that page before. Could you guide me on how to update it, please? Thank you.


KevinYang21 (Author) commented Sep 10, 2018

@kaxil Thank you!

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch from 2cdb90d to a3e23dd Sep 10, 2018

feng-tao (Contributor) commented Sep 12, 2018

will take a look later this week.

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch from a3e23dd to 61d9411 Sep 15, 2018

KevinYang21 (Author) commented Sep 16, 2018

A friendly reminder for reviews :D

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch 4 times, most recently from da03da2 to 932bf4f Sep 16, 2018

feng-tao (Contributor) commented Sep 17, 2018

Sorry for the delay, I haven't fully reviewed yet. One thing worth mentioning: the community has been discussing DagFetcher (#3138) for some time, and I wonder whether you have considered your change's compatibility with DagFetcher.

feng-tao (Contributor) left a review comment

haven't finished, a quick pass

UPDATING.md Outdated
@@ -17,6 +17,11 @@ so you might need to update your config.
The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to
some bugs.

### New `log_processor_manager_location` config option

feng-tao (Contributor), Sep 17, 2018:

could you move it before L8? I think the file should be sorted based on the feature added.

(resolved review thread on airflow/jobs.py)
UPDATING.md Outdated
@@ -31,6 +31,11 @@ some bugs.
The new `sync_parallelism` config option will control how many processes CeleryExecutor will use to
fetch celery task state in parallel. Default value is max(1, number of cores - 1)

### New `log_processor_manager_location` config option

feng-tao (Contributor), Sep 17, 2018:

I am not sure, but I thought the list is sorted by descending chronological order?

KevinYang21 (Author), Sep 17, 2018:

Oh sorry, I didn't know that; if that's the case I'll update the order.

'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'airflow',
'filename': LOG_PROCESSOR_MANAGER_LOCATION,
'mode': 'a',

feng-tao (Contributor), Sep 17, 2018:

n00b qq: are these mode options defined in RotatingFileHandler?

KevinYang21 (Author), Sep 17, 2018:

Sorry, I didn't get your question... Are you asking what these options are for? That is the file-opening mode, set to 'a' so we keep appending to the file (so we don't overwrite the log if we restart the scheduler). 'a' is indeed the default mode, but I wanted to keep it here to make it clearer.
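For context, a minimal sketch of how a handler entry like the one quoted above plugs into a logging dict config. The filename, size limits, and logger name here are illustrative assumptions, not the exact values in airflow_local_settings.py:

```python
import logging
import logging.config

# Assumed location for the sketch; the real value comes from Airflow config.
LOG_PROCESSOR_MANAGER_LOCATION = "/tmp/dag_processor_manager.log"

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {"format": "%(asctime)s %(levelname)s - %(message)s"},
    },
    "handlers": {
        "processor_manager": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "airflow",
            "filename": LOG_PROCESSOR_MANAGER_LOCATION,
            "mode": "a",            # append, so restarts don't truncate the log
            "maxBytes": 104857600,  # rotate at 100 MB (illustrative)
            "backupCount": 5,
        },
    },
    "loggers": {
        "processor_manager": {
            "handlers": ["processor_manager"],
            "level": "INFO",
        },
    },
})

logging.getLogger("processor_manager").info("manager started")
```

`mode`, `maxBytes`, and `backupCount` are all regular `RotatingFileHandler` constructor arguments, passed through by `dictConfig`.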

'processor_manager']
directory = os.path.dirname(processor_manager_handler_config['filename'])
if not os.path.exists(directory):
mkdirs(directory, 0o777)

feng-tao (Contributor), Sep 17, 2018:

nit: use a constant for 777; and qq: why set 777 permissions?

KevinYang21 (Author), Sep 17, 2018:

My bad here, I misread the comment from here and thought I needed 777 for mkdirs to work. Will update it to 755.

ashb (Member), Sep 21, 2018:

Rather than checking whether the path already exists, it's better to try to create it and catch the FileExists error - we had a race condition elsewhere (in utils.file_handler or something) that was fixed by doing this.

It's less likely to apply here, but better safe than sorry.

KevinYang21 (Author), Sep 23, 2018:

You are perfectly correct. Did that before but forgot about it entirely :P

KevinYang21 (Author), Sep 23, 2018:

Actually our mkdirs util method already covers this... I'll just use mkdirs(directory, 0o755).
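The race-safe pattern ashb describes can be sketched like this (a simplified illustration of the create-then-tolerate-EEXIST approach; Airflow's own mkdirs util differs in detail):

```python
import errno
import os

def mkdirs(path, mode):
    # Attempt the create and tolerate another process winning the race,
    # instead of the racy check-then-act os.path.exists() pattern.
    try:
        os.makedirs(path, mode)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

mkdirs("/tmp/processor_manager_logs", 0o755)
mkdirs("/tmp/processor_manager_logs", 0o755)  # second call is a harmless no-op
```

On Python 3 the same effect is available directly as `os.makedirs(path, mode, exist_ok=True)`.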

(resolved review thread on airflow/jobs.py)
from airflow.utils.state import State

python_version_info = sys.version_info
if python_version_info.major > 2:

feng-tao (Contributor), Sep 17, 2018:

Maybe add a comment here; also, how do we handle this kind of version-dependent change elsewhere in Airflow?

KevinYang21 (Author), Sep 17, 2018:

Definitely, I'll put a comment in the next iteration. We have a couple of other places branching on sys.version_info (mostly because of the str-to-unicode mess).

KevinYang21 (Author), Sep 21, 2018:

Saw this change in #3298 and will try to use that more decent approach.

(resolved review thread on airflow/utils/dag_processing.py)
@@ -308,6 +369,249 @@ def file_path(self):
raise NotImplementedError()


class DagParsingStat(object):

feng-tao (Contributor), Sep 17, 2018:

same, why not namedTuple?

KevinYang21 (Author), Sep 17, 2018:

This is definitely a better candidate for a namedtuple, will try to update. Ty.

feng-tao (Contributor) commented Sep 17, 2018

Given this change has correlation with DagFetcher(#3138), it would be good to get some input from Max @mistercrunch .

feng-tao (Contributor) commented Sep 17, 2018

Great work BTW !

KevinYang21 (Author) commented Sep 17, 2018

@feng-tao Thank you for bringing up the DagFetcher discussion; I was not aware of that PR before (quite an interesting PR, though; I'll keep an eye on it and help if needed). I took a quick look at the PR and I believe it will work perfectly with this change, since this change mostly decouples existing logic rather than changing foundations, and DAG fetching is an independent step that happens before parsing.

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch from 932bf4f to c45cc19 Sep 18, 2018

self.result_count = result_count


class DagParsingSignal(object):

dlamblin (Contributor), Sep 21, 2018:

This is either some immutable singleton, or:

DagParsingSignal = namedtuple(
    'DagParsingSignal',
    'AGENT_HEARTBEAT,   MANAGER_DONE,   TERMINATE_MANAGER,   END_MANAGER')(
    'agent_heartbeat', 'manager_done', 'terminate_manager', 'end_manager')

ashb (Member), Sep 21, 2018:

Or Python3 Enum, with https://pypi.org/project/enum34/ as the backport

KevinYang21 (Author), Sep 21, 2018:

👍 @feng-tao had the same comment; I'll try to use a namedtuple. Ty!
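The Enum alternative ashb suggests could look like this (a sketch; member names and values are taken from the original class's signal strings, and on Python 2 the enum34 backport would provide the same module):

```python
from enum import Enum

class DagParsingSignal(Enum):
    # Signals exchanged between the agent and the parsing manager.
    AGENT_HEARTBEAT = "agent_heartbeat"
    MANAGER_DONE = "manager_done"
    TERMINATE_MANAGER = "terminate_manager"
    END_MANAGER = "end_manager"

# Enum members compare by identity, are iterable, and cannot be
# reassigned, which gives the "immutable singleton" property for free.
print(DagParsingSignal.MANAGER_DONE.value)
```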

@@ -172,6 +196,20 @@

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':

XD-DENG (Contributor), Sep 21, 2018:

Hi @yrqls21, this may be an invalid question: could you advise what this environment variable CONFIG_PROCESSOR_MANAGER_LOGGER is meant to be? Is it going to be specified by the user, or is it environment-specific?

It's not documented anywhere else.

XD-DENG (Contributor), Sep 21, 2018:

Turns out I DID ask an invalid question... I have noticed the relevant chunk in airflow/utils/dag_processing.py.

So this environment variable is a sort of "flag" that determines whether the few lines that follow here in airflow/config_templates/airflow_local_settings.py should be run when we load/reload airflow_local_settings, if I'm not mistaken?

ashb (Member), Sep 21, 2018:

This feels like it should be a config option somewhere in the config, and then the normal AIRFLOW__${SECT}__${OPT} environment variables could be used.

Also there is zero documentation or mention anywhere else of what this block is doing. Adding it to the default config (with comments explaining its use) kills two birds with one stone.

KevinYang21 (Author), Sep 21, 2018:

@XD-DENG You are perfectly correct. The purpose is to avoid the RotatingFileHandler being initialized in multiple processes, which would cause exceptions. I thought I had some comments around it, but obviously I'm wrong :(

@ashb This is more like an internal flag controlling the behavior of different processes, similar to this line: https://github.com/apache/incubator-airflow/blob/master/airflow/bin/cli.py#L868. I'll definitely add comments here.
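The gating XD-DENG describes can be sketched like this (a hypothetical, simplified settings module; the handler keys and filename are illustrative, not the real airflow_local_settings.py contents):

```python
import os

# In the real code the manager child process sets this flag before
# reloading settings; we set it here so the gated branch runs.
os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"

# Base config that every process gets.
LOGGING_CONFIG = {"handlers": {"console": {"class": "logging.StreamHandler"}}}

# The file handler is only wired up in the process that asked for it,
# so the RotatingFileHandler is not initialized everywhere the settings
# module is imported.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    LOGGING_CONFIG["handlers"]["processor_manager"] = {
        "class": "logging.handlers.RotatingFileHandler",
        "filename": "/tmp/dag_processor_manager.log",
    }

print(sorted(LOGGING_CONFIG["handlers"]))
```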

"""
Helper method to clean up processor_agent to avoid leaving orphan processes.
"""
self.log.info("Exiting gracefully with signal {}".format(signum))

ashb (Member), Sep 21, 2018:

Nit: "Exiting gracefully after receiving signal {}" please

@@ -1298,8 +1286,9 @@ def query(result, items):
.all())
task_instance_str = "\n\t".join(
["{}".format(x) for x in tis_to_be_queued])
self.log.info("Setting the follow tasks to queued state:\n\t%s",
task_instance_str)
self.log.info("Setting the follow {} tasks to queued state:\n\t{}"

ashb (Member), Sep 21, 2018:

nit: following

self.result_count = result_count


class DagParsingSignal(object):

ashb (Member), Sep 21, 2018:

Or Python3 Enum, with https://pypi.org/project/enum34/ as the backport

if self._parent_signal_conn.poll() \
and self._parent_signal_conn.recv() == DagParsingSignal.MANAGER_DONE:
break
time.sleep(0.1)

ashb (Member), Sep 21, 2018:

According to the docs:

If timeout is None then an infinite timeout is used.

So I think we can do self._parent_signal_conn.poll(None) and then the kernel/OS will deal with waking us when there's something to read, and we wouldn't need a sleep here either.

I think.

Alternatively, an even simpler approach: Just call recv() - that will block until there is something to read. No poll or sleep needed that way.

KevinYang21 (Author), Sep 21, 2018:

You are perfectly correct. I totally forgot that recv() is blocking. Will update to just use recv(). Ty!

# do not recreate the SQLA connection pool.
os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
reload(airflow.config_templates.airflow_local_settings)
reload(airflow.settings)

ashb (Member), Sep 21, 2018:

This reload is done solely to reconfigure the logger, right? Wouldn't it be more immediate and less "hacky" (using reload is a bit of a kludge) to reconfigure the logger instead?

ashb (Member), Sep 21, 2018:

Or even better: could we use a different logger name and have it already configured with normal logging, but just only used by this class/logger?

KevinYang21 (Author), Sep 21, 2018:

Without reloading airflow_local_settings, the custom logic here would not be evaluated, and thus we'd end up reconfiguring with the same logging config again...

Also, as mentioned in the comment, right now the major purposes of reloading settings are to reload the logger and the connection pool.

Though I do agree reload is a bit of a kludge - I would avoid it if I had another solution :(

ashb (Member), Sep 21, 2018:

Can you do something like this?

logging.config.dictConfig({
    'version': 1,
    # ... new handler definition here ...
    'disable_existing_loggers': False,
})

KevinYang21 (Author), Sep 21, 2018:

Nice trick here 👍 But moving the definition of the handler and logger here would prevent people from using their own custom handler and logger for DAG parsing, right? (e.g. a logger with both processor_manager and console handlers) Also, we recently encountered a rare case where the same logger on multiple processes creates a lock issue if we do not reload, so it feels safer to just reload all loggers.

qsize = self._result_count
else:
qsize = self._result_queue.qsize()
for _ in xrange(qsize):

ashb (Member), Sep 21, 2018:

Minor nit: since Python 2 is going away in less than a year, can we reverse this:

i.e. in py2 re-define range = xrange and then just use range here.

KevinYang21 (Author), Sep 21, 2018:

👍
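The reversed shim ashb suggests would look something like this (a sketch; the `qsize` loop mirrors the quoted snippet):

```python
import sys

# On Python 2, alias the py3 name so the rest of the module uses range
# everywhere; on Python 3 this branch is simply skipped.
if sys.version_info[0] == 2:
    range = xrange  # noqa: F821 -- xrange only exists on Python 2

qsize = 3
drained = [i for i in range(qsize)]  # py3 spelling in both interpreters
print(drained)
```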

Send termination signal to DAG parsing processor manager
and expect it to terminate all DAG file processors.
"""
self.log.info("Sending termination signal to manager.")

ashb (Member), Sep 21, 2018:

Nit: "Sending termination message" - signal could be confused with OS level signals, but this is a message/request.

self.log.info(
"Terminating manager process: {}".format(manager_process.pid))
manager_process.terminate()
timeout = 5

ashb (Member), Sep 21, 2018:

Does this timeout need to be configurable?

KevinYang21 (Author), Sep 21, 2018:

I'll add a TODO comment here to keep it aligned with the existing code. I might later start another discussion around this magic number.

"""
Helper method to clean up DAG file processors to avoid leaving orphan processes.
"""
self.log.info("Exiting gracefully with signal {}".format(signum))

ashb (Member), Sep 21, 2018:

Nit: "upon receiving signal" (the same in both cases of this please)

self.last_dag_dir_refresh_time = timezone.utcnow()
# Last time stats were printed
self.last_stat_print_time = timezone.datetime(2000, 1, 1)
self._zombie_query_interval = 10

XD-DENG (Contributor), Sep 22, 2018:

Is this another magic number?

KevinYang21 (Author), Sep 22, 2018:

Yes it is. I'll put a similar TODO here and revisit it together with the termination-timeout magic number later.

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch 2 times, most recently from 8191ca9 to 86f1b5e Oct 12, 2018

KevinYang21 (Author) commented Oct 12, 2018

@Fokko Thanks for reviewing. I actually created a ticket for further refactoring of the code base to use with blocks, as there are a lot of such cases and I feel it is beyond the scope of this PR to change all of them (I updated all the usages I touched).

Would you elaborate a bit more on the "weird stuff in the number of connections", please? It might be something we didn't catch.

Also, this reminds me of one thing: with the new logic, it is more likely that the scheduler will leave orphan processes in extreme cases, e.g. if for some reason the agent process crashes (or gets a SIGTERM) or cannot exit gracefully, the DAG parsing processes will be left behind. Should we put a note somewhere about this? Actually, we already have a similar problem, e.g. when the scheduler process gets a SIGTERM. Do we just expect people to know this and clean up the leftover processes?

kbl commented Oct 12, 2018

What @Fokko mentioned about session leakage could be true. I deployed this a while ago as a patch on top of 1.10. We're running Airflow with 2 airworkers x 12 celery processors on each node. We've seen an increase in the number of open connections to our database after the upgrade from 1.9 to 1.10+patch.

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch from 86f1b5e to 0ca9713 Oct 12, 2018

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch from 0ca9713 to 69e9311 Oct 15, 2018

@KevinYang21 KevinYang21 force-pushed the KevinYang21:kevin_yang_decouple_dag_parsing branch from 69e9311 to 14cf345 Oct 15, 2018

KevinYang21 (Author) commented Oct 18, 2018

Is there more I can do to have this PR merged?

Fokko approved these changes Oct 20, 2018

Fokko (Contributor) commented Oct 20, 2018

@feng-tao @ashb @XD-DENG Any further comments? Otherwise I'll merge this one.

feng-tao (Contributor) commented Oct 20, 2018

Sorry @KevinYang21, I was on vacation for the last month and didn't go through the whole PR. But I assume @Fokko has, so I am OK from my side.

ashb (Member) commented Oct 22, 2018

@Fokko Go for it - I didn't have the time to give this PR the attention it deserved.

@ashb ashb changed the title [Airflow-2760] Decouple DAG parsing loop from scheduler loop [AIRFLOW-2760] Decouple DAG parsing loop from scheduler loop Oct 22, 2018

XD-DENG (Contributor) commented Oct 25, 2018

Hi @Fokko , I don't have further comment. Thanks

XD-DENG (Contributor) commented Oct 25, 2018

A side point: we may want to update the "Scheduler Basics" doc on Confluence (https://cwiki.apache.org/confluence/display/AIRFLOW/Scheduler+Basics) accordingly after this PR is merged/released.

kaxil (Contributor) commented Oct 26, 2018

I think it's a unanimous decision to merge this one now :)

Thanks everyone, and especially @KevinYang21

@kaxil kaxil merged commit 75e2288 into apache:master Oct 26, 2018

1 check passed

continuous-integration/travis-ci/pr - The Travis CI build passed

wyndhblb added a commit to ASAPPinc/incubator-airflow that referenced this pull request Nov 9, 2018

Revert "[Airflow-2760] Decouple DAG parsing loop from scheduler loop (apache#3873)"

This reverts commit 75e2288.

# Conflicts:
#	airflow/utils/dag_processing.py

aliceabe pushed a commit to aliceabe/incubator-airflow that referenced this pull request Jan 3, 2019

ashb added a commit to ashb/airflow that referenced this pull request Jan 10, 2019

cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Jan 23, 2019
