Description
Consider a task with `schedule_interval=timedelta(hours=1)` and `start_date=datetime(2015, 12, 1, 0, 0, 0)`.
| # | execution_date |
|---|---|
| 1 | 2015-12-01 00:00:00 |
| 2 | 2015-12-01 01:00:00 |
| 3 | 2015-12-01 02:00:00 |
| 4 | 2015-12-01 03:00:00 |
| n | ... |
In the example above, task 1 covers a data extraction for 2015-12-01 00:00:00 ~ 2015-12-01 00:59:59, task 2 covers 2015-12-01 01:00:00 ~ 2015-12-01 01:59:59, and so on.
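The interval arithmetic behind the table can be sketched in plain Python. This is only an illustration of the semantics described above; the `date_ini`/`date_end` names and the inclusive one-second end are assumptions taken from the ranges listed:

```python
from datetime import datetime, timedelta

# Each run's execution_date marks the start of the hour it covers.
schedule_interval = timedelta(hours=1)
execution_date = datetime(2015, 12, 1, 0, 0, 0)  # task 1

date_ini = execution_date
# Inclusive end: one second before the next run starts.
date_end = execution_date + schedule_interval - timedelta(seconds=1)

print(date_ini)  # 2015-12-01 00:00:00
print(date_end)  # 2015-12-01 00:59:59
```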
Many of our Python scripts are parameterized with 'initial' and 'end' datetimes.
Nowadays, in Airflow, it is very tricky to pass these parameters to the tasks, since the execution_date is only available within the Jinja templates (is that right?), unlike the schedule_interval, which is a DAG-level parameter and is available directly in both places (the Python code and the Jinja templates).
So, if I want to run a Python script that takes date_ini and date_end as parameters, I need to write a BashOperator task with `export execution_date={{ execution_date }}; export schedule_interval={{ dag.schedule_interval }}; python path/to/python_script.py`. Then in my Python script I use `os.getenv()` to retrieve the values and add the schedule_interval to the execution_date.
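The script side of that workaround might look like the sketch below. This is only an approximation of what I mean: the environment variable names, the timestamp format the Jinja template renders to, and the demo fallback values are all assumptions, not a tested recipe:

```python
import os
from datetime import datetime, timedelta

# Sketch of path/to/python_script.py from the BashOperator workaround.
# Fallback values stand in for what the exports would provide;
# the rendered timestamp format is assumed to be ISO-like.
raw_exec = os.getenv("execution_date", "2015-12-01T01:00:00")
raw_interval = os.getenv("schedule_interval", "1:00:00")

execution_date = datetime.strptime(raw_exec, "%Y-%m-%dT%H:%M:%S")

# str(timedelta(hours=1)) renders as "1:00:00" -> parse h:m:s back.
h, m, s = (int(x) for x in raw_interval.split(":"))
schedule_interval = timedelta(hours=h, minutes=m, seconds=s)

date_ini = execution_date
date_end = execution_date + schedule_interval

print(date_ini, "->", date_end)
```

The fragile part is exactly this string round-tripping: the values cross the process boundary as text, so both sides must agree on formats that Airflow never guarantees.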
Searching the docs, I couldn't find any simple way to access the variables available in Jinja from my Python scripts.
I feel uncomfortable being stuck with a BashOperator and this mess of code. It would be really great if we could keep everything in Python.
Has anyone had a similar use case? Do you know a better way to solve this problem?