Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove NoSchedule and DateSchedule #324

Merged
merged 4 commits into from
Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

### Breaking Changes

- Remove `timestamp` property from `State` classes - [#305](https://github.com/PrefectHQ/prefect/pull/305)
- Remove `NoSchedule` and `DateSchedule` schedule classes - [#324](https://github.com/PrefectHQ/prefect/pull/324)
- Change `serialize()` method to use schemas rather than custom dict - [#318](https://github.com/PrefectHQ/prefect/pull/318)
- Remove `timestamp` property from `State` classes - [#305](https://github.com/PrefectHQ/prefect/pull/305)

## 0.3.3 <Badge text="alpha" type="warn"/>

Expand Down
2 changes: 0 additions & 2 deletions docs/generate_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@
"page": "schedules.md",
"classes": [
prefect.schedules.Schedule,
prefect.schedules.NoSchedule,
prefect.schedules.IntervalSchedule,
prefect.schedules.CronSchedule,
prefect.schedules.DateSchedule,
],
"title": "Schedules",
},
Expand Down
5 changes: 2 additions & 3 deletions src/prefect/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ def my_task():
- name (str, optional): The name of the flow
- version (str, optional): The flow's version
- project (str, optional): The flow's project
- schedule (prefect.schedules.Schedule, optional): A schedule used to
represent when the flow should run
- schedule (prefect.schedules.Schedule, optional): A default schedule for the flow
- description (str, optional): Descriptive information about the flow
- environment (prefect.environments.Environment, optional): The environment
type that the flow should be run in
Expand Down Expand Up @@ -146,7 +145,7 @@ def __init__(
self.version = version or prefect.config.flows.default_version # type: ignore
self.project = project or prefect.config.flows.default_project # type: ignore
self.description = description or None
self.schedule = schedule or prefect.schedules.NoSchedule()
self.schedule = schedule
self.environment = environment

self.tasks = set() # type: Set[Task]
Expand Down
47 changes: 0 additions & 47 deletions src/prefect/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,6 @@ def serialize(self) -> tuple:
return ScheduleSchema().dump(self)


class NoSchedule(Schedule):
"""
No schedule; this Flow will only run on demand.
"""

def next(self, n: int, on_or_after: datetime = None) -> List[datetime]:
"""
Retrieve next scheduled dates.

Args:
- n (int): the number of future scheduled dates to return
- on_or_after (datetime, optional): date to begin returning from

Returns:
- list: list of next scheduled dates; in this case, always the empty list
"""
return []


class IntervalSchedule(Schedule):
"""
A schedule formed by adding `timedelta` increments to a start_date.
Expand Down Expand Up @@ -125,31 +106,3 @@ def next(self, n: int, on_or_after: datetime = None) -> List[datetime]:

cron = croniter.croniter(self.cron, on_or_after)
return list(itertools.islice(cron.all_next(datetime), n))


class DateSchedule(Schedule):
"""
Schedule for running on a manually created list of dates.

Args:
- dates ([datetime]): a list of datetimes to run on
"""

def __init__(self, dates: Iterable[datetime]) -> None:
self.dates = dates

def next(self, n: int, on_or_after: datetime = None) -> List[datetime]:
"""
Retrieve next scheduled dates.

Args:
- n (int): the number of future scheduled dates to return
- on_or_after (datetime, optional): date to begin returning from

Returns:
- list: list of next scheduled dates
"""
if on_or_after is None:
on_or_after = datetime.utcnow()
dates = sorted([d for d in self.dates if d >= on_or_after])
return dates[:n]
2 changes: 1 addition & 1 deletion src/prefect/serialization/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Meta:
version = fields.String(allow_none=True)
description = fields.String(allow_none=True)
type = fields.Function(lambda flow: to_qualified_name(type(flow)), lambda x: x)
schedule = fields.Nested(ScheduleSchema)
schedule = fields.Nested(ScheduleSchema, allow_none=True)
environment = JSONField(allow_none=True)
environment_key = fields.String(allow_none=True)
parameters = NestedField(
Expand Down
16 changes: 0 additions & 16 deletions src/prefect/serialization/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@
from prefect.utilities.serialization import VersionedSchema, version, to_qualified_name


@version("0.3.3")
class NoScheduleSchema(VersionedSchema):
class Meta:
object_class = prefect.schedules.NoSchedule


@version("0.3.3")
class IntervalScheduleSchema(VersionedSchema):
class Meta:
Expand All @@ -29,23 +23,13 @@ class Meta:
cron = fields.String(required=True)


@version("0.3.3")
class DateScheduleSchema(VersionedSchema):
class Meta:
object_class = prefect.schedules.DateSchedule

dates = fields.List(fields.DateTime(), required=True)


class ScheduleSchema(OneOfSchema):
"""
Field that chooses between several nested schemas
"""

# map class name to schema
type_schemas = {
"NoSchedule": NoScheduleSchema,
"IntervalSchedule": IntervalScheduleSchema,
"CronSchedule": CronScheduleSchema,
"DateSchedule": DateScheduleSchema,
}
11 changes: 8 additions & 3 deletions tests/core/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_create_flow_with_description(self):

def test_create_flow_with_schedule(self):
f1 = Flow()
assert isinstance(f1.schedule, prefect.schedules.NoSchedule)
assert f1.schedule is None

cron = prefect.schedules.CronSchedule("* * * * *")
f2 = Flow(schedule=cron)
Expand Down Expand Up @@ -1194,7 +1194,12 @@ def test_serialization(self):
def test_deserialization(self):
p1, t2, t3, = Parameter("1"), Task("2"), Task("3")

f = Flow(name="hi", version="2", tasks=[p1, t2, t3])
f = Flow(
name="hi",
version="2",
tasks=[p1, t2, t3],
schedule=prefect.schedules.CronSchedule("* * 0 0 0"),
)
f.add_edge(p1, t2)
f.add_edge(p1, t3)

Expand All @@ -1207,7 +1212,7 @@ def test_deserialization(self):
assert {t.name for t in f2.reference_tasks()} == {"2", "3"}
assert f2.name == f.name
assert f2.version == f.version
assert isinstance(f2.schedule, prefect.schedules.NoSchedule)
assert isinstance(f2.schedule, prefect.schedules.CronSchedule)

def test_serialize_validates_invalid_flows(self):
t1, t2 = Task(), Task()
Expand Down
33 changes: 0 additions & 33 deletions tests/serialization/test_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ def cron_schedule():
return schedules.CronSchedule(cron="0 0 * * *")


@pytest.fixture()
def date_schedule():
return schedules.DateSchedule(
dates=[datetime.datetime(2020, 1, 1), datetime.datetime(2021, 1, 1)]
)


def test_all_schedules_have_serialization_schemas():
"""
Tests that all Schedule subclasses in prefect.schedules have corresponding schemas
Expand Down Expand Up @@ -67,24 +60,6 @@ def test_deserialize_bad_type_fails():
schemas.ScheduleSchema().load({"type": "BadSchedule"})


def test_no_schedule_serialization():
schedule = schedules.NoSchedule()
serialized = schemas.ScheduleSchema().dump(schedule)
assert serialized == {"type": "NoSchedule", "__version__": __version__}


def test_no_schedule_deserialization():
schedule = schedules.NoSchedule()
serialized = schemas.ScheduleSchema().dump(schedule)
deserialized = schemas.ScheduleSchema().load(serialized)
assert isinstance(deserialized, schedules.NoSchedule)


def test_serialize_no_schedule():
schema = schemas.NoScheduleSchema()
assert schema.dump(schedules.NoSchedule()) == {"__version__": __version__}


def test_serialize_cron_schedule(cron_schedule):
schema = schemas.CronScheduleSchema()
assert schema.dump(cron_schedule) == {
Expand All @@ -100,11 +75,3 @@ def test_serialize_interval_schedule(interval_schedule):
"interval": interval_schedule.interval.total_seconds(),
"__version__": __version__,
}


def test_serialize_date_schedule(date_schedule):
schema = schemas.DateScheduleSchema()
assert schema.dump(date_schedule) == {
"dates": [d.isoformat() + "+00:00" for d in date_schedule.dates],
"__version__": __version__,
}
44 changes: 0 additions & 44 deletions tests/test_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ def test_base_schedule_next_no_implemented():
s.next(1)


def test_create_no_schedule():
assert schedules.NoSchedule()


def test_no_schedule_returns_no_dates():
s = schedules.NoSchedule()
assert s.next(1) == []


def test_create_interval_schedule():
assert schedules.IntervalSchedule(start_date=START_DATE, interval=timedelta(days=1))

Expand Down Expand Up @@ -80,34 +71,7 @@ def test_cron_schedule_next_n_with_on_or_after_argument():
assert s.next(3, on_or_after=TODAY + timedelta(days=4)) == DATES[3:]


def test_create_date_schedule():
s = schedules.DateSchedule(DATES)
assert s.dates == DATES


def test_date_schedule_next_n():
s = schedules.DateSchedule(DATES)
assert s.next(3) == DATES[:3]


def test_date_schedule_next_n_with_on_or_after_argument():
s = schedules.DateSchedule(DATES)
assert s.next(3, on_or_after=TODAY + timedelta(days=4)) == DATES[3:]


def test_date_schedule_after_last_date_returns_empty_list():
s = schedules.DateSchedule(DATES)
assert s.next(3, on_or_after=TODAY + timedelta(days=100)) == []


class TestSerialization:
def test_serialize_no_schedule(self):
schedule = schedules.NoSchedule()
assert schedule.serialize() == {
"type": "NoSchedule",
"__version__": __version__,
}

def test_serialize_cron_schedule(self):
schedule = schedules.CronSchedule("0 0 * * *")
assert schedule.serialize() == {
Expand All @@ -126,11 +90,3 @@ def test_serialize_interval_schedule(self):
"interval": 3600,
"__version__": __version__,
}

def test_serialize_date_schedule(self):
schedule = schedules.DateSchedule(dates=DATES)
assert schedule.serialize() == {
"type": "DateSchedule",
"dates": [d.isoformat() + "+00:00" for d in DATES],
"__version__": __version__,
}