Clarify how schedule_interval works #221

Closed · maraca opened this issue Aug 5, 2015 · 15 comments

Labels: kind:bug (This is clearly a bug)

Comments

@maraca commented Aug 5, 2015

Hi,

This is a dummy example that consists of 4 tasks, back to back, all attached to the same DAG, events_redshift.
I've set schedule_interval to 1 minute for now, as I am trying to see it execute, but that's not a real-life example.
This is running the CeleryExecutor and PostgreSQL.

"""
Extracts events from S3 and loads them into Redshift.
"""

from airflow import DAG 
from airflow.operators import DummyOperator
from datetime import datetime
from datetime import timedelta


default_args = { 
    'owner': 'airflow',
    'start_date': datetime(2015, 8, 5, 8, 4), 
    'schedule_interval': timedelta(minutes=1),
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('events_redshift', default_args=default_args)

t_download_from_s3 = DummyOperator(
    task_id='download_from_s3',
    dag=dag,
)

t_cleanup = DummyOperator(
    task_id='cleanup',
    dag=dag,
)

t_upload_to_s3 = DummyOperator(
    task_id='upload_to_s3',
    dag=dag,
)

t_load_to_redshift = DummyOperator(
    task_id='load_to_redshift',
    dag=dag,
)

t_cleanup.set_upstream(t_download_from_s3)
t_upload_to_s3.set_upstream(t_cleanup)
t_load_to_redshift.set_upstream(t_upload_to_s3)

I can see the DAG in the web UI; however, the only way to get it to execute the tasks is by clicking on it and running it manually, as you can see with download_from_s3.

[screenshot]

This is the celery worker:
[screenshot]

And the scheduler's output, refreshing every 5 seconds.

[screenshot]

My expectation is that this should run every minute, with each task executed back to back; however, none of this is happening.

So I guess my question is: do I have the wrong expectations, or am I doing something wrong?

Thanks a lot for your help!

@mistercrunch (Member) commented

Thanks for all the details; they should make it much easier to zero in on what's going on (or, actually, what is NOT going on...).

Oh, and your expectations are right, though Airflow typically isn't used for every-minute batch jobs; I understand you did this for testing purposes.

The schedule_interval is really just how often the DAG should run, and it's generally a bad idea to change it around, because the scheduler adds the schedule_interval to each task's last run; if you go from minutes to hours or to days, things might not line up across tasks. If task A last ran at 00:33 and task B at 00:35, the dependencies will not line up in time when you switch to a daily job and a day is added to both. Anyhow, for testing purposes every minute should work, though daily should work just as well. Just make sure to clear the history and set your start_date to where you want it when you change the schedule.
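
To illustrate the misalignment, here is a minimal sketch in plain Python (not Airflow internals; the timestamps are hypothetical):

from datetime import datetime, timedelta

# Hypothetical last runs for two dependent tasks (times made up for illustration).
last_run_a = datetime(2015, 8, 5, 0, 33)
last_run_b = datetime(2015, 8, 5, 0, 35)

# The scheduler derives the next run by adding the interval to the last run,
# so after switching from a minutely to a daily interval the two runs still
# differ by two minutes and no longer fall on the same daily boundary.
new_interval = timedelta(days=1)
print(last_run_a + new_interval)  # 2015-08-06 00:33:00
print(last_run_b + new_interval)  # 2015-08-06 00:35:00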

So it looks like your scheduler is aware of your DAG, and the worker too. The first thing I would do is look at the log file on the worker (if you cannot see it from the UI). Was that job triggered by the scheduler or by the UI "run" button?

Is the scheduler emitting messages? Are the workers picking them up? If so, it may be that the worker and the scheduler are pointing to different metadata databases; make sure the configurations are in sync.
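
One quick way to compare the two hosts is to print those settings on each machine. A minimal sketch, assuming the config lives at ~/airflow/airflow.cfg and uses the standard [core] sql_alchemy_conn key (Python 3 configparser shown; adjust the path and section names to your setup):

import os
from configparser import ConfigParser

# Print the settings that must match between the scheduler host and the
# Celery worker host: the metadata DB and (if set) the Celery broker.
cfg = ConfigParser(interpolation=None)
cfg.read(os.path.expanduser("~/airflow/airflow.cfg"))
print("metadata db  :", cfg.get("core", "sql_alchemy_conn"))
print("celery broker:", cfg.get("celery", "broker_url", fallback="<not set>"))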

@maraca (Author) commented Aug 5, 2015

Thanks for the quick response @mistercrunch. The job was definitely triggered by the UI run button.

The scheduler isn't emitting any messages other than the ones below, every 5 seconds.

2015-08-05 10:36:02,680 - root - INFO - Importing ./dags/events.py
2015-08-05 10:36:02,685 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-05 10:36:02,690 - root - INFO - Getting latest instance for all task in dag events_redshift

You are right - it looks to me like the scheduler and the workers are talking past each other. But at the same time, I am not seeing any action from the scheduler :\

@maraca (Author) commented Aug 5, 2015

Oh - probably also worth noting that when I click the UI run button, the job type is marked as LocalTaskJob - which I assume is expected since it's not "scheduled" per se - but I wanted to double-check with you.

[screenshot]

@mistercrunch (Member) commented

The job view can be misleading; it looks fine to me. You can disregard it for now.

The scheduler should be emitting logs saying "First run for ..." or "Queuing next run:".

From the context where you run the scheduler, can you issue the command:

airflow list_tasks events_redshift

I'd also wipe out the task_instance table if there's nothing in there you care about. You can do it straight in the DB, from the UI, or with the airflow clear command; see the example below.
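
For example, to wipe everything for the DAG in this thread from the command line (a sketch; with no filters it operates on all of the DAG's task instances):

airflow clear events_redshift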

@maraca (Author) commented Aug 5, 2015

This is the outcome of the command.

$ airflow list_tasks events_redshift
cleanup
download_from_s3
load_to_redshift
upload_to_s3

Full output:

2015-08-05 12:11:14,375 - root - INFO - Importing ./dags/events.py
2015-08-05 12:11:14,381 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-05 12:11:14,385 - root - INFO - Getting latest instance for all task in dag events_redshift
2015-08-05 12:11:19,376 - root - INFO - Importing ./dags/events.py
2015-08-05 12:11:19,381 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-05 12:11:19,387 - root - INFO - Getting latest instance for all task in dag events_redshift
^C2015-08-05 12:11:19,580 - root - ERROR - SIGINT (ctrl-c) received
(env)martin@tipunch:~/cotap/coflow$ airflow list_tasks events_redshift
cleanup
download_from_s3
load_to_redshift
upload_to_s3
(env)martin@tipunch:~/cotap/coflow$ kitchen login
Welcome to Ubuntu 12.04.4 LTS (GNU/Linux 3.11.0-15-generic x86_64)

 * Documentation:  https://help.ubuntu.com/
New release '14.04.2 LTS' available.
Run 'do-release-upgrade' to upgrade to it.

Last login: Wed Aug  5 14:40:49 2015 from 10.0.2.2
vagrant@default-ubuntu-1204:~$ sudo su postgres
postgres@default-ubuntu-1204:/home/vagrant$ psql airbox
psql (9.2.10)
Type "help" for help.

airbox=# TRUNCATE TABLE task_instance;
TRUNCATE TABLE
airbox=# \q
postgres@default-ubuntu-1204:/home/vagrant$ exit
vagrant@default-ubuntu-1204:~$ logout
Connection to 127.0.0.1 closed.
(env)martin@tipunch:~/cotap/coflow$ airflow scheduler 
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/

2015-08-05 12:13:22,704 - root - INFO - Starting the scheduler
2015-08-05 12:13:22,705 - root - INFO - Filling up the DagBag from ./dags
2015-08-05 12:13:22,705 - root - INFO - Importing ./dags/events.py
2015-08-05 12:13:22,709 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-05 12:13:22,722 - root - INFO - Importing ./dags/events.py
2015-08-05 12:13:22,727 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-05 12:13:22,732 - root - INFO - Getting latest instance for all task in dag events_redshift
2015-08-05 12:13:27,690 - root - INFO - Importing ./dags/events.py
2015-08-05 12:13:27,692 - root - INFO - Loaded DAG <DAG: events_redshift>

Still no luck, but thanks again for your help :)

@maraca (Author) commented Aug 5, 2015

Here is a gist of the config I am using, too: https://gist.github.com/maraca/6fb10e135cadbb0b65bd

@mistercrunch (Member) commented

A bit of a wild guess here, but can you try changing your config to use non-relative folders? ~ for home is supported, but go for the full path if you can.

@mistercrunch (Member) commented

Also, can you throw a file called airflow_local_settings.py somewhere on your PYTHONPATH? The file only needs these two lines:

import logging
LOGGING_LEVEL = logging.DEBUG

I think you'll get a few more logging events from the scheduler, which may help.

@mistercrunch (Member) commented

Also, please share the output of airflow version.

@mistercrunch (Member) commented

We'll get to the bottom of this!

@maraca (Author) commented Aug 6, 2015

@mistercrunch sorry for the delayed response; I was traveling last night.

The following is not going to be very useful, as I cannot for the life of me figure out why it started working. Here is what I did:

  • Updated airflow.cfg to use full paths -> didn't work
  • Added the folder containing the code to $PYTHONPATH (which was empty) and enabled debug logging
  • Cleared the DB, flushed all keys in Redis, reloaded my bashrc, restarted all processes - and it started working...

I attempted to revert all those steps to see where it breaks, and although I am back to where I was yesterday config-wise, the scheduler is still scheduling tasks and the worker is still picking them up.

$ airflow version
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.3.0
2015-08-06 08:16:09,582 - root - INFO - Starting the scheduler
2015-08-06 08:16:09,582 - root - INFO - Filling up the DagBag from ./dags
2015-08-06 08:16:09,582 - root - INFO - Importing ./dags/events.py
2015-08-06 08:16:09,591 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-06 08:16:09,604 - root - INFO - Importing ./dags/events.py
2015-08-06 08:16:09,606 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-06 08:16:09,609 - root - INFO - Getting latest instance for all task in dag events_redshift
2015-08-06 08:16:09,617 - root - INFO - First run for <TaskInstance: events_redshift.download_from_s3 2015-08-05 08:04:00 [None]>
2015-08-06 08:16:09,617 - root - INFO - Adding to queue: airflow run events_redshift download_from_s3 2015-08-05T08:04:00   --local     -sd DAGS_FOLDER/events.py  
2015-08-06 08:16:14,566 - root - INFO - Importing ./dags/events.py
2015-08-06 08:16:14,572 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-06 08:16:19,565 - root - INFO - Importing ./dags/events.py
2015-08-06 08:16:19,568 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-06 08:16:19,572 - root - INFO - Getting latest instance for all task in dag events_redshift
2015-08-06 08:16:19,590 - root - INFO - First run for <TaskInstance: events_redshift.cleanup 2015-08-05 08:04:00 [None]>
2015-08-06 08:16:19,590 - root - INFO - Adding to queue: airflow run events_redshift cleanup 2015-08-05T08:04:00   --local     -sd DAGS_FOLDER/events.py
2015-08-06 08:16:24,573 - root - INFO - Importing ./dags/events.py
2015-08-06 08:16:24,577 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-06 08:16:24,582 - root - INFO - Getting latest instance for all task in dag events_redshift
2015-08-06 08:16:24,606 - root - INFO - First run for <TaskInstance: events_redshift.upload_to_s3 2015-08-05 08:04:00 [None]>
2015-08-06 08:16:24,606 - root - INFO - Adding to queue: airflow run events_redshift upload_to_s3 2015-08-05T08:04:00   --local     -sd DAGS_FOLDER/events.py
2015-08-06 08:16:29,573 - root - INFO - Importing ./dags/events.py
2015-08-06 08:16:29,577 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-06 08:16:34,574 - root - INFO - Importing ./dags/events.py
2015-08-06 08:16:34,580 - root - INFO - Loaded DAG <DAG: events_redshift>
2015-08-06 08:16:34,585 - root - INFO - Getting latest instance for all task in dag events_redshift
2015-08-06 08:16:34,612 - root - INFO - First run for <TaskInstance: events_redshift.load_to_redshift 2015-08-05 08:04:00 [None]>
2015-08-06 08:16:34,612 - root - INFO - Adding to queue: airflow run events_redshift load_to_redshift 2015-08-05T08:04:00   --local     -sd DAGS_FOLDER/events.py

This is great news because it now works; however, I am not able to identify what I did that got this to run now :(
I will keep investigating and get back to you ASAP!

@maraca (Author) commented Aug 6, 2015

I think I found the culprit...

-    'start_date': datetime(2015, 8, 6, 8, 4),
+    'start_date': datetime(2015, 8, 5, 8, 4),

If I set the start_date to today, it doesn't schedule the job. If I set it to yesterday's date, it works.

>>> from datetime import datetime
>>> start_date = datetime(2015, 8, 6, 8, 4) # 2015-08-06 08:04am
>>> (datetime.now() - start_date).total_seconds()
2003.481128

What's strange is that the start_date is not in the future (I was thinking maybe it had to do with time zones or something).

In any case, having a start_date one day prior to today seems to fix the issue!
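
A minimal sketch of the rule that turns out to explain this (see the next comment): a run for execution_date = start_date only becomes due once start_date + schedule_interval has elapsed, and the effective interval here is one day rather than one minute. The "now" value below is hypothetical:

from datetime import datetime, timedelta

start_today = datetime(2015, 8, 6, 8, 4)
start_yesterday = datetime(2015, 8, 5, 8, 4)
effective_interval = timedelta(days=1)      # the DAG's actual (default) interval
now = datetime(2015, 8, 6, 8, 37)           # roughly "now" from the snippet above

# The first run only becomes due once start_date + interval has passed.
print(start_today + effective_interval <= now)      # False -> nothing scheduled yet
print(start_yesterday + effective_interval <= now)  # True  -> first run is due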

@mistercrunch (Member) commented

Oh, I should have caught this earlier, but the issue is that your DAG is actually a daily DAG, since at the moment schedule_interval is based on the argument you pass to the DAG object, as in

dag = DAG("my_dag_id", schedule_interval=timedelta(hours=1))

I need to clarify that in the docs / API.
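
Applied to the example at the top of this thread, the fix (as the next comment confirms) is to pass schedule_interval to the DAG constructor instead of leaving it in default_args; a sketch, keeping the original one-minute interval:

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2015, 8, 5, 8, 4),
    'retry_delay': timedelta(minutes=1),
}

# schedule_interval is a DAG-level setting; default_args only feeds
# keyword arguments to the individual tasks.
dag = DAG(
    'events_redshift',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
)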

mistercrunch changed the title from "DAG not being scheduled." to "Clarify how schedule_interval works" on Aug 7, 2015
mistercrunch added the kind:bug label on Aug 7, 2015
@guang commented Aug 7, 2015

Thanks for the clarification, @mistercrunch! I had the same issue, and moving schedule_interval from default_args into DAG() solved it nicely.

@mistercrunch (Member) commented

Updated the tutorials / docs here: #238
