
AIRFLOW-20: Improving the scheduler by making dag runs more coherent #1431

Merged: 1 commit into apache:master on May 10, 2016

Conversation

@bolkedebruin (Contributor) commented Apr 26, 2016

This particular issue arises from an alignment problem between
start_date and schedule_interval. It can only happen with cron-based
schedule_intervals that describe absolute points in time (like "1am"),
as opposed to time deltas (like "every hour").

In the past (and in the docs) we have simply said that users must make
sure the two params agree, but this is counterintuitive: in these
cases, start_date is more like telling the scheduler to "start paying
attention" than "this is my first execution date".

This patch changes the behavior of the scheduler. The next run date of
the dag will be treated as "start_date + interval", unless start_date
falls exactly on the (previous) interval, in which case start_date
itself becomes the next run date.
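
A minimal sketch of that intended behavior (a hypothetical helper using croniter, the library the scheduler uses for cron schedules; not the actual patch):

    from datetime import datetime
    from croniter import croniter

    def first_run_date(start_date, cron_expr):
        """Return start_date itself if it falls exactly on the schedule,
        otherwise the first schedule point after start_date."""
        following = croniter(cron_expr, start_date).get_next(datetime)
        # the schedule point just before `following` is the last point
        # at or before start_date
        previous = croniter(cron_expr, following).get_prev(datetime)
        return start_date if previous == start_date else following

    # start_date at midnight with a "daily at 1am" schedule: the first
    # run is 1am the same day, not midnight.
    print(first_run_date(datetime(2016, 4, 26, 0, 0), "0 1 * * *"))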

@bolkedebruin (Contributor, Author) commented Apr 26, 2016

@r39132 @jlowin @aoen @plypaul @mistercrunch

You probably have some strong opinions about this. I kindly ask for your feedback (also see the post by @jlowin on dev@).

if latest_run:
    # Migrating from previous version
    # make the past 5 runs active
    # todo: remove this?
Review comment from a Member:

+1 this causes lots of confusion

Review comment from a Contributor:

@mistercrunch Is this line still needed? I suspect it was needed to handle some airbnb migration. Can you add a comment explaining what this (next_run_date = dag.date_range(latest_run, -5)[0]) does?

@r39132 (Contributor) commented Apr 26, 2016

+1 on the PR, though I would like to see some testing and have other comments addressed

@bolkedebruin bolkedebruin changed the title Align start_date with the schedule_interval WIP: Align start_date with the schedule_interval Apr 26, 2016
if dag.start_date:
    next_run_date = dag.start_date if not next_run_date else max(next_run_date, dag.start_date)
    if next_run_date == dag.start_date:
        next_run_date = dag.following_schedule(dag.start_date)
next_run_date = self._normalize_schedule(dag,
Review comment from a Member:

With the DAG.normalize_schedule I suggested above, you'd have a single, simpler line of code here instead of two.
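
A sketch of what that suggested DAG.normalize_schedule could look like (an assumption based on this comment, reusing the DAG's existing following_schedule/previous_schedule helpers; not code from this PR):

    def normalize_schedule(self, dttm):
        """Return dttm if it falls exactly on this DAG's schedule,
        otherwise the following schedule point."""
        following = self.following_schedule(dttm)
        # stepping back from `following` lands on dttm only if dttm
        # is itself on the schedule
        if self.previous_schedule(following) != dttm:
            return following
        return dttm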

@mistercrunch (Member) commented Apr 27, 2016

So if I understand right, the previous DagRun.id handling is to account for DAGs that have a mixed workload of scheduled and triggered DagRuns, right?

Do we have real-world use cases for those? I always thought a DAG would either be scheduled or externally triggered, not a mix of both, but we never made that a real restriction, so I'm sure some people mix them.

If we do want to support both, the notion of depends_on_past becomes mushy: people may expect or desire either a depends_on_past_schedule or a depends_on_past_run.

Since it's a pretty rare edge case (the combination of mixed scheduling AND depends_on_past=True) and the desired behavior is unclear, I'd vote for doing something simple in terms of logic and documenting it clearly. It could be something like: "if the DAG is scheduled, we use the latest DagRun that is >= the previous schedule; if not scheduled, we take the previous DagRun, whatever it is at that time". This would not require handling previous_dag_run_id.
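
A rough sketch of that proposed rule (hypothetical names and simplified data access; this illustrates the comment's proposal, not the merged implementation):

    def previous_dag_run(dag, dag_runs, previous_schedule_date):
        """Pick the DagRun that depends_on_past should look at.

        dag_runs: all DagRuns for this dag, ordered by execution_date.
        """
        if dag.schedule_interval is not None:
            # scheduled DAG: latest DagRun at or after the previous
            # schedule point
            candidates = [run for run in dag_runs
                          if run.execution_date >= previous_schedule_date]
            return candidates[-1] if candidates else None
        # unscheduled DAG: simply the latest DagRun, whatever it is
        return dag_runs[-1] if dag_runs else None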

@bolkedebruin (Contributor, Author):

@mistercrunch I will respond on dev@; it is not the entire story, I think.

op.drop_constraint("dag_id", "dag_run", "unique")
op.create_unique_constraint("uq_dag_run_dag_id_execution_date_run_id",
                            "dag_run", ["dag_id", "execution_date", "run_id"])
elif url.find("sqlite") > -1:
Review comment from a Member:

Check out alembic's batch_op -- it does the sqlite rename/create/insert/drop table stuff for you. Here is an example: https://github.com/jlowin/airflow/blob/dagrun-refactor/airflow/migrations/versions/89614830774c_dagrun_refactor.py#L21-L30
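
For reference, a minimal sketch of that batch_op approach (assuming the constraint names used in this PR):

    from alembic import op

    def upgrade():
        # batch_alter_table transparently does the sqlite
        # rename/create/copy/drop table dance when needed
        with op.batch_alter_table("dag_run") as batch_op:
            batch_op.drop_constraint("dag_id", type_="unique")
            batch_op.create_unique_constraint(
                "uq_dag_run_dag_id_execution_date_run_id",
                ["dag_id", "execution_date", "run_id"])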

Review comment from a Member:

Also, I am running into an unrelated bug that required me to delete a unique constraint called "dag_id_2" for MySQL ONLY. I don't understand why I had to do that, where it came from, or -- most bizarre -- why it only decided to pop up in one unit test even though the code is used in every test... but just in case it helps you, I had to use your code with that constraint name.

Review comment from @bolkedebruin (Contributor, Author), Apr 29, 2016:

@jlowin is that with my PR? In any case, you might want to look at AIRFLOW-18 in Jira to understand why I am doing this.

@bolkedebruin bolkedebruin changed the title WIP: Align start_date with the schedule_interval Align start_date with the schedule_interval Apr 29, 2016
@bolkedebruin bolkedebruin self-assigned this Apr 29, 2016
@bolkedebruin bolkedebruin changed the title Align start_date with the schedule_interval [AIRFLOW-20] Align start_date with the schedule_interval Apr 29, 2016
@criccomini criccomini changed the title [AIRFLOW-20] Align start_date with the schedule_interval AIRFLOW-20: Align start_date with the schedule_interval Apr 29, 2016
@syvineckruyk (Contributor):

@bolkedebruin not sure if this can cause the multiple executions we saw ...

@bolkedebruin (Contributor, Author):

No, you can have multiple processes now that multiprocessing is available; it breaks up the dag bag per dag, so I don't think that is the issue. My first hunch is that there is indeed a race condition, but one caused by the subdag not yet detecting a run and scheduling too many runs (hence the overrides, which should not occur here).

@bolkedebruin (Contributor, Author):

I also think that the Pool issue is a race condition. The pool is checked for enough slots in the SubDagOperator, but a slot is not claimed at the same time. If you have multiple concurrent schedulers -- and you essentially have just that with a SubDagOperator that fires off its own executor -- they can check at the same time, and both checks will pass. (@jlowin)
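
A self-contained sketch of that check-then-claim race (a hypothetical, simplified Pool, not Airflow's model; a barrier forces the problematic interleaving that normally happens by chance):

    import threading

    class Pool:
        def __init__(self, slots):
            self.slots = slots
            self.used = 0

        def open_slots(self):
            return self.slots - self.used   # step 1: the check

        def claim(self):
            self.used += 1                  # step 2: the claim, done separately

    pool = Pool(slots=1)
    queued = []
    barrier = threading.Barrier(2)

    def scheduler(name):
        if pool.open_slots() > 0:   # both schedulers see the single free slot
            barrier.wait()          # both checks complete before any claim
            pool.claim()
            queued.append(name)

    threads = [threading.Thread(target=scheduler, args=(name,))
               for name in ("scheduler", "subdag_executor")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(queued)  # both queued: one slot, two claims, pool oversubscribed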

@syvineckruyk (Contributor) commented May 3, 2016

@bolkedebruin of the two dags in this test ... in one the SDO is using SequentialExecutor ... in the other the default is left. Both dags hit the SDO issue.

@bolkedebruin (Contributor, Author):

@syvineckruyk For the Pool stuff I created the following Jira issue: https://issues.apache.org/jira/browse/AIRFLOW-41

@bolkedebruin (Contributor, Author):

@syvineckruyk So what I don't understand yet is why there are dagruns overriding each other. This means that a dagrun is scheduled at the same execution_date as one that happened earlier. For example, hanging_subdags_n16_sqe.level_1_14 with dag_run.id 27 is overridden by 391. I don't think that should occur, and I wonder if we got ourselves another race condition, but I don't understand yet why it would occur.

@syvineckruyk (Contributor):

@bolkedebruin I don't think we should use any of the 04/10 dagruns in the analysis, as I believe I had to clear some stuff to set up the test... I'm more focused on the execution dates > 04/10.

@bolkedebruin (Contributor, Author):

788 hanging_subdags_n5.level_3_1 28-04-16 00:00 overridden backfill__85b5f44b-040f-45e6-b197-498eb5e1a1e5 0 NULL NULL 02-05-16 19:05 679
789 hanging_subdags_n5.level_3_1 28-04-16 00:00 running backfill__b686b303-2fd1-48d1-8846-c6873bec7240 0 NULL NULL 02-05-16 19:05 679

The overriding one gets scheduled very quickly after the first one. It also comes from a SubDagOperator (the run_id is named backfill).

@syvineckruyk (Contributor):

Is that from my logs? Or are you running the DAG?

@bolkedebruin (Contributor, Author):

That is from your database :)

@bolkedebruin (Contributor, Author) commented May 3, 2016

Ah, but you might be right that there are two (or maybe even more) schedulers running. See your first log above: it shows the dagbag being loaded twice (or even more often).

Can you try your Dag withOUT "-n 5" and see what it does? (Sorry that didn't occur to me earlier.)

@syvineckruyk (Contributor):

@bolkedebruin sorry, not sure what you mean there; the scheduler is already being run with "-n 5", as mentioned above.

@bolkedebruin (Contributor, Author) commented May 3, 2016

Sorry: without that parameter (and make sure no scheduler processes are running before you start)

@syvineckruyk (Contributor):

ok, doing that ... I'll need to run many days again ... as you saw, I ran both dags for a bunch of days before the issue occurred.

@bolkedebruin (Contributor, Author):

Got it. Thanks, I really hope we can nail this down.

@bolkedebruin (Contributor, Author):

@syvineckruyk I have run your Dags without "-n 5" for the scheduler, and now the "override" issues are gone. Some of the tasks are still stuck in "queued" status; @jlowin has told me he was working on that too, and I am looking at integrating what he has done in a smart way to solve this remaining issue.

(attachments: dag1, dag2)

@syvineckruyk (Contributor):

@bolkedebruin I'm testing your version of the branch from yesterday afternoon (with the task dependencies). Let me know if I should be testing on another branch or if the changes will be applied to yours.

Thanks

@syvineckruyk (Contributor):

@bolkedebruin I have one dag (an hourly dag) reporting the following ... Do you know how I could resolve this?

[2016-05-04 17:10:07,277] {jobs.py:746} ERROR - Instance <TaskInstance at 0x7fef3d98c6d0> is not bound to a Session; attribute refresh operation cannot proceed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/src/airflow/airflow/jobs.py", line 743, in _do_dags
    self.process_dag(dag, tis_out)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/src/airflow/airflow/jobs.py", line 538, in process_dag
    active_dag_runs = dag.get_active_runs()
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/src/airflow/airflow/models.py", line 2771, in get_active_runs
    for ti in task_instances
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/src/airflow/airflow/models.py", line 2772, in <genexpr>
    if ti.state in State.unfinished()))
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py", line 237, in __get__
    return self.impl.get(instance_state(instance), dict_)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py", line 578, in get
    value = state._load_expired(state, passive)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/state.py", line 474, in _load_expired
    self.manager.deferred_scalar_loader(self, toload)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 610, in load_scalar_attributes
    (state_str(state)))
DetachedInstanceError: Instance <TaskInstance at 0x7fef3d98c6d0> is not bound to a Session; attribute refresh operation cannot proceed

@bolkedebruin (Contributor, Author):

I suspect this can be due to the db session closing prematurely. You could try increasing the number of sessions your db allows, or allowing sessions to stay open longer.
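
For illustration, a minimal standalone sketch of how a DetachedInstanceError like this arises in SQLAlchemy (a hypothetical model, not Airflow's code): attributes are expired on commit, the session then closes and detaches the object, and the next attribute access tries to refresh from a session that is no longer there.

    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class TaskInstance(Base):  # hypothetical stand-in, not airflow.models.TaskInstance
        __tablename__ = "task_instance"
        id = Column(Integer, primary_key=True)
        state = Column(String(50))

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    session.add(TaskInstance(state="queued"))
    ti = session.query(TaskInstance).first()
    session.commit()  # expire_on_commit=True expires ti's loaded attributes
    session.close()   # detaches ti from its session
    print(ti.state)   # raises DetachedInstanceError: refresh cannot proceed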

@bolkedebruin (Contributor, Author) commented May 5, 2016

@syvineckruyk you could try the current state of this PR for your dags. I merged @jlowin's bits and adjusted them slightly.

The Travis tests still need to be fixed, but that is for later.

@syvineckruyk (Contributor):

@bolkedebruin I'll be able to try it over the weekend ... unfortunately, with the issues around subdags and the growing airflow user base at our office, I had to revert to avoiding them ...

I am planning on bringing them back as soon as the issues are resolved ... I'll be setting up a separate environment to test.

I have deployed 1.7.1rc3 at work ... However, I am experiencing an issue which has been reported but which I am not too sure has been resolved ... I have tasks (not subdags) getting queued but never getting picked up. I have even tried removing all our pools, but it did not help... Any ideas what version I could run to calm things down while I continue testing subdags and dev branches on the side?

@bolkedebruin (Contributor, Author):

This PR should solve all those issues, including the queued-tasks thing (NOT the oversubscribing).

We are working hard toward a 1.7.1 release that should include the fix for queued tasks not being picked up.

Keep in touch over the weekend. I'll work with you to iron out any issues we find that can be considered blockers.

@syvineckruyk (Contributor):

@bolkedebruin I will test this, most likely today when I get a new environment set up ... I noticed that you opened a blocking JIRA regarding the queues ... I did not notice this particular issue on your branch in the past ... can we confirm that this is present in rc3 but not in the branch from this PR?

I had to go back to rc3 since I was running branches with cherry-picked commits ... I don't mind testing with the issues ... but I was exposing them to our devs, and as we have been facing issues with SDOs for some time, I did not want to discourage or weaken the adoption of Airflow that we have been advocating for at my company.

So for our stage and prod environments I will now try to avoid going rogue with custom branches ... however, RCs should be fine.

So currently I have moved the environments to rc3 ... the queued-task issue is currently an annoyance but not a show stopper ... other things seem to be working better. Just wondering what others are doing in prod ... Would I be better off reverting to 1.7.0 for the time being? Curious to know your thoughts.

Thanks

@bolkedebruin (Contributor, Author):

@syvineckruyk Yes, it is present in rc3 and not in this PR. I wouldn't use this PR in production for sure, as it incurs database changes that might not end up in master.

So for you I would definitely maintain a known state. If that is 1.7.0, you could use that, but 1.7.1 will mostly contain bugfixes. I hope we will fix the remaining blockers over the weekend; then we should have a pretty solid release with known issues, but ones that can be worked around.

If you were on 1.6 and were happy with that for the time being, then you could also use that for now. 1.7.0 really has some issues that I don't like.

Are you on the dev list? If you can weigh in on the discussion about the scheduler and on what works for you from this PR, that would be nice. As this PR touches the core of the scheduler, there are some strong feelings about it :)

@syvineckruyk (Contributor):

thanks @bolkedebruin, going back to 1.6.2 is not an option. The biggest issues for us on that version were the unlimited retries within SDOs ... and the ShortCircuitOperator not functioning so well ... especially not marking downstream tasks as skipped and keeping the DagRun running, which caused a bunch of issues.

Outside of SDOs, my biggest pain point with rc3 is the queue, which I can live with for now ...

I am indeed now on the dev list. Thanks for all your help, you guys have been really helpful. Will let you know once I am running your branch ... I'm fine with manipulating the Alembic migrations ... been doing it for a while now ... but will no longer be doing it on any of our official environments.

I'll be in touch soon with the testing results of this branch.

Thanks

@asfgit asfgit merged commit dddfd3b into apache:master May 10, 2016
@bolkedebruin bolkedebruin removed their assignment May 10, 2016