Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle scheduled start time in local runs (plus tests) #1418

Merged
merged 10 commits into from
Aug 29, 2019

Conversation

jlowin
Copy link
Member

@jlowin jlowin commented Aug 28, 2019

Thanks for contributing to Prefect!

Please describe your work and make sure your PR:

  • adds new tests (if appropriate)
  • updates CHANGELOG.md (if appropriate)
  • updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

Note that your PR will not be reviewed unless all three boxes are checked.

What does this PR change?

Properly handles scheduled_start_time for local flow.run() execution, including providing it for any retries.

docs/guide/core_concepts/flows.md Show resolved Hide resolved
src/prefect/core/flow.py Outdated Show resolved Hide resolved
@codecov
Copy link

codecov bot commented Aug 29, 2019

Codecov Report

Merging #1418 into master will decrease coverage by 0.01%.
The diff coverage is 100%.

@cicdw cicdw merged commit 71e053d into master Aug 29, 2019
@cicdw cicdw deleted the scheduled-start-time branch August 29, 2019 04:30
@roveo
Copy link

roveo commented Aug 30, 2019

I think there's something wrong with this. Here's a very simple flow logging it's context:

import prefect
import time
from datetime import timedelta
from pprint import pprint


class Task1(prefect.Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        run = prefect.context["scheduled_start_time"].strftime("%Y-%m-%d %H:%M:%S")
        self.logger.warning(pprint(prefect.context.__dict__))
        return 5


@prefect.task
def task2(x):
    pass


schedule = prefect.schedules.CronSchedule("* * * * *")
flow = prefect.Flow("test_flow_name", schedule=schedule)

with flow:
    task2(Task1())


if __name__ == "__main__":
    flow.run()

And here's the output of the first two runs. scheduled_start_time doesn't change between runs.

 ❯ python test_scheduled_start_time.py
[2019-08-30 11:07:17,364] INFO - prefect.Flow: test_flow_name | Waiting for next scheduled run at 2019-08-30T11:08:00+00:00
[2019-08-30 11:08:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow_name'
[2019-08-30 11:08:00,002] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-30 11:08:00,014] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
{'caches': {},
 'checkpointing': False,
 'date': DateTime(2019, 8, 30, 11, 8, 0, 2173, tzinfo=Timezone('UTC')),
 'flow_name': 'test_flow_name',
 'logger': <Logger prefect.Task: Task1 (INFO)>,
 'map_index': None,
 'scheduled_start_time': DateTime(2019, 8, 30, 11, 8, 0, tzinfo=Timezone('UTC')),
 'task_full_name': 'Task1',
 'task_name': 'Task1',
 'task_run_count': 1,
 'task_slug': 'a6bf86d3-2c63-4ad7-b591-0a9c9bd93eeb',
 'task_tags': set(),
 'today': '2019-08-30',
 'today_nodash': '20190830',
 'tomorrow': '2019-08-31',
 'tomorrow_nodash': '20190831',
 'yesterday': '2019-08-29',
 'yesterday_nodash': '20190829'}
[2019-08-30 11:08:00,015] WARNING - prefect.Task: Task1 | None
[2019-08-30 11:08:00,016] INFO - prefect.TaskRunner | Task 'Task1': finished task run for task with final state: 'Success'
[2019-08-30 11:08:00,017] INFO - prefect.TaskRunner | Task 'task2': Starting task run...
[2019-08-30 11:08:00,017] INFO - prefect.TaskRunner | Task 'task2': finished task run for task with final state: 'Success'
[2019-08-30 11:08:00,020] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2019-08-30 11:08:00,028] INFO - prefect.Flow: test_flow_name | Waiting for next scheduled run at 2019-08-30T11:09:00+00:00
[2019-08-30 11:09:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow_name'
[2019-08-30 11:09:00,003] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-30 11:09:00,015] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
{'caches': {'Task1': [], 'task2': []},
 'checkpointing': False,
 'date': DateTime(2019, 8, 30, 11, 9, 0, 3252, tzinfo=Timezone('UTC')),
 'flow_name': 'test_flow_name',
 'logger': <Logger prefect.Task: Task1 (INFO)>,
 'map_index': None,
 'scheduled_start_time': DateTime(2019, 8, 30, 11, 8, 0, tzinfo=Timezone('UTC')),
 'task_full_name': 'Task1',
 'task_name': 'Task1',
 'task_run_count': 1,
 'task_slug': 'a6bf86d3-2c63-4ad7-b591-0a9c9bd93eeb',
 'task_tags': set(),
 'today': '2019-08-30',
 'today_nodash': '20190830',
 'tomorrow': '2019-08-31',
 'tomorrow_nodash': '20190831',
 'yesterday': '2019-08-29',
 'yesterday_nodash': '20190829'}
[2019-08-30 11:09:00,023] WARNING - prefect.Task: Task1 | None
[2019-08-30 11:09:00,024] INFO - prefect.TaskRunner | Task 'Task1': finished task run for task with final state: 'Success'
[2019-08-30 11:09:00,024] INFO - prefect.TaskRunner | Task 'task2': Starting task run...
[2019-08-30 11:09:00,025] INFO - prefect.TaskRunner | Task 'task2': finished task run for task with final state: 'Success'
[2019-08-30 11:09:00,025] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

@roveo
Copy link

roveo commented Aug 30, 2019

The version from PyPi works correctly:

❯ python test_scheduled_start_time.py
[2019-08-30 11:26:41,345] INFO - prefect.Flow | Waiting for next scheduled run at 2019-08-30T11:27:00+00:00
[2019-08-30 11:27:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow_name'
[2019-08-30 11:27:00,004] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-30 11:27:00,013] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
{'caches': {},
 'checkpointing': False,
 'date': DateTime(2019, 8, 30, 11, 27, 0, 1302, tzinfo=Timezone('UTC')),
 'flow_name': 'test_flow_name',
 'logger': <Logger prefect.Task (INFO)>,
 'map_index': None,
 'scheduled_start_time': DateTime(2019, 8, 30, 11, 27, 0, 1262, tzinfo=Timezone('UTC')),
 'task_full_name': 'Task1',
 'task_name': 'Task1',
 'task_run_count': 1,
 'task_slug': 'c59d838e-3c75-4a38-a36f-fb01a6a70b40',
 'task_tags': set(),
 'today': '2019-08-30',
 'today_nodash': '20190830',
 'tomorrow': '2019-08-31',
 'tomorrow_nodash': '20190831',
 'yesterday': '2019-08-29',
 'yesterday_nodash': '20190829'}
[2019-08-30 11:27:00,015] WARNING - prefect.Task | None
[2019-08-30 11:27:00,015] INFO - prefect.TaskRunner | Task 'Task1': finished task run for task with final state: 'Success'
[2019-08-30 11:27:00,016] INFO - prefect.TaskRunner | Task 'task2': Starting task run...
[2019-08-30 11:27:00,017] INFO - prefect.TaskRunner | Task 'task2': finished task run for task with final state: 'Success'
[2019-08-30 11:27:00,021] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2019-08-30 11:27:00,022] INFO - prefect.Flow | Waiting for next scheduled run at 2019-08-30T11:28:00+00:00
[2019-08-30 11:28:00,002] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow_name'
[2019-08-30 11:28:00,005] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-30 11:28:00,013] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
{'caches': {'Task1': [], 'task2': []},
 'checkpointing': False,
 'date': DateTime(2019, 8, 30, 11, 28, 0, 3289, tzinfo=Timezone('UTC')),
 'flow_name': 'test_flow_name',
 'logger': <Logger prefect.Task (INFO)>,
 'map_index': None,
 'scheduled_start_time': DateTime(2019, 8, 30, 11, 28, 0, 3228, tzinfo=Timezone('UTC')),
 'task_full_name': 'Task1',
 'task_name': 'Task1',
 'task_run_count': 1,
 'task_slug': 'c59d838e-3c75-4a38-a36f-fb01a6a70b40',
 'task_tags': set(),
 'today': '2019-08-30',
 'today_nodash': '20190830',
 'tomorrow': '2019-08-31',
 'tomorrow_nodash': '20190831',
 'yesterday': '2019-08-29',
 'yesterday_nodash': '20190829'}
[2019-08-30 11:28:00,016] WARNING - prefect.Task | None
[2019-08-30 11:28:00,017] INFO - prefect.TaskRunner | Task 'Task1': finished task run for task with final state: 'Success'
[2019-08-30 11:28:00,017] INFO - prefect.TaskRunner | Task 'task2': Starting task run...
[2019-08-30 11:28:00,018] INFO - prefect.TaskRunner | Task 'task2': finished task run for task with final state: 'Success'
[2019-08-30 11:28:00,018] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded```

@jlowin
Copy link
Member Author

jlowin commented Aug 30, 2019

Thanks @roveo, looking into why that's happening now!

zanieb added a commit that referenced this pull request Apr 13, 2022
Allow `temporary_settings` to restore values to their default value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants