Skip to content

Commit

Permalink
Remove NoSchedule and IntervalSchedule
Browse files Browse the repository at this point in the history
Closes #321
  • Loading branch information
jlowin committed Nov 4, 2018
1 parent 1e1f6f1 commit e149892
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 149 deletions.
2 changes: 0 additions & 2 deletions docs/generate_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@
"page": "schedules.md",
"classes": [
prefect.schedules.Schedule,
prefect.schedules.NoSchedule,
prefect.schedules.IntervalSchedule,
prefect.schedules.CronSchedule,
prefect.schedules.DateSchedule,
],
"title": "Schedules",
},
Expand Down
5 changes: 2 additions & 3 deletions src/prefect/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def my_task():
- name (str, optional): The name of the flow
- version (str, optional): The flow's version
- project (str, optional): The flow's project
- schedule (prefect.schedules.Schedule, optional): A schedule used to
represent when the flow should run
- schedule (prefect.schedules.Schedule, optional): A default schedule for the flow
- description (str, optional): Descriptive information about the flow
- environment (prefect.environments.Environment, optional): The environment
type that the flow should be run in
Expand Down Expand Up @@ -140,7 +139,7 @@ def __init__(
self.version = version or prefect.config.flows.default_version # type: ignore
self.project = project or prefect.config.flows.default_project # type: ignore
self.description = description or None
self.schedule = schedule or prefect.schedules.NoSchedule()
self.schedule = schedule
self.environment = environment

self.tasks = set() # type: Set[Task]
Expand Down
47 changes: 0 additions & 47 deletions src/prefect/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,6 @@ def serialize(self):
return ScheduleSchema().dump(self)


class NoSchedule(Schedule):
"""
No schedule; this Flow will only run on demand.
"""

def next(self, n: int, on_or_after: datetime = None) -> List[datetime]:
"""
Retrieve next scheduled dates.
Args:
- n (int): the number of future scheduled dates to return
- on_or_after (datetime, optional): date to begin returning from
Returns:
- list: list of next scheduled dates; in this case, always the empty list
"""
return []


class IntervalSchedule(Schedule):
"""
A schedule formed by adding `timedelta` increments to a start_date.
Expand Down Expand Up @@ -125,31 +106,3 @@ def next(self, n: int, on_or_after: datetime = None) -> List[datetime]:

cron = croniter.croniter(self.cron, on_or_after)
return list(itertools.islice(cron.all_next(datetime), n))


class DateSchedule(Schedule):
"""
Schedule for running on a manually created list of dates.
Args:
- dates ([datetime]): a list of datetimes to run on
"""

def __init__(self, dates: Iterable[datetime]) -> None:
self.dates = dates

def next(self, n: int, on_or_after: datetime = None) -> List[datetime]:
"""
Retrieve next scheduled dates.
Args:
- n (int): the number of future scheduled dates to return
- on_or_after (datetime, optional): date to begin returning from
Returns:
- list: list of next scheduled dates
"""
if on_or_after is None:
on_or_after = datetime.utcnow()
dates = sorted([d for d in self.dates if d >= on_or_after])
return dates[:n]
2 changes: 1 addition & 1 deletion src/prefect/serialization/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Meta:
version = fields.String(allow_none=True)
description = fields.String(allow_none=True)
type = fields.Function(lambda flow: to_qualified_name(type(flow)), lambda x: x)
schedule = fields.Nested(ScheduleSchema)
schedule = fields.Nested(ScheduleSchema, allow_none=True)
environment = JSONField(allow_none=True)
environment_key = fields.String(allow_none=True)
parameters = NestedField(
Expand Down
16 changes: 0 additions & 16 deletions src/prefect/serialization/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@
)


@version("0.3.3")
class NoScheduleSchema(VersionedSchema):
class Meta:
object_class = prefect.schedules.NoSchedule


@version("0.3.3")
class IntervalScheduleSchema(VersionedSchema):
class Meta:
Expand All @@ -33,23 +27,13 @@ class Meta:
cron = fields.String(required=True)


@version("0.3.3")
class DateScheduleSchema(VersionedSchema):
class Meta:
object_class = prefect.schedules.DateSchedule

dates = fields.List(fields.DateTime(), required=True)


class ScheduleSchema(OneOfSchema):
"""
Field that chooses between several nested schemas
"""

# map class name to schema
type_schemas = {
"NoSchedule": NoScheduleSchema,
"IntervalSchedule": IntervalScheduleSchema,
"CronSchedule": CronScheduleSchema,
"DateSchedule": DateScheduleSchema,
}
11 changes: 8 additions & 3 deletions tests/core/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_create_flow_with_description(self):

def test_create_flow_with_schedule(self):
f1 = Flow()
assert isinstance(f1.schedule, prefect.schedules.NoSchedule)
assert f1.schedule is None

cron = prefect.schedules.CronSchedule("* * * * *")
f2 = Flow(schedule=cron)
Expand Down Expand Up @@ -1170,7 +1170,12 @@ def test_serialization(self):
def test_deserialization(self):
p1, t2, t3, = Parameter("1"), Task("2"), Task("3")

f = Flow(name="hi", version="2", tasks=[p1, t2, t3])
f = Flow(
name="hi",
version="2",
tasks=[p1, t2, t3],
schedule=prefect.schedules.CronSchedule("* * 0 0 0"),
)
f.add_edge(p1, t2)
f.add_edge(p1, t3)

Expand All @@ -1183,4 +1188,4 @@ def test_deserialization(self):
assert {t.name for t in f2.reference_tasks()} == {"2", "3"}
assert f2.name == f.name
assert f2.version == f.version
assert isinstance(f2.schedule, prefect.schedules.NoSchedule)
assert isinstance(f2.schedule, prefect.schedules.CronSchedule)
33 changes: 0 additions & 33 deletions tests/serialization/test_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ def cron_schedule():
return schedules.CronSchedule(cron="0 0 * * *")


@pytest.fixture()
def date_schedule():
return schedules.DateSchedule(
dates=[datetime.datetime(2020, 1, 1), datetime.datetime(2021, 1, 1)]
)


def test_all_schedules_have_serialization_schemas():
"""
Tests that all Schedule subclasses in prefect.schedules have corresponding schemas
Expand Down Expand Up @@ -67,24 +60,6 @@ def test_deserialize_bad_type_fails():
schemas.ScheduleSchema().load({"type": "BadSchedule"})


def test_no_schedule_serialization():
schedule = schedules.NoSchedule()
serialized = schemas.ScheduleSchema().dump(schedule)
assert serialized == {"type": "NoSchedule", "__version__": __version__}


def test_no_schedule_deserialization():
schedule = schedules.NoSchedule()
serialized = schemas.ScheduleSchema().dump(schedule)
deserialized = schemas.ScheduleSchema().load(serialized)
assert isinstance(deserialized, schedules.NoSchedule)


def test_serialize_no_schedule():
schema = schemas.NoScheduleSchema()
assert schema.dump(schedules.NoSchedule()) == {"__version__": __version__}


def test_serialize_cron_schedule(cron_schedule):
schema = schemas.CronScheduleSchema()
assert schema.dump(cron_schedule) == {
Expand All @@ -100,11 +75,3 @@ def test_serialize_interval_schedule(interval_schedule):
"interval": interval_schedule.interval.total_seconds(),
"__version__": __version__,
}


def test_serialize_date_schedule(date_schedule):
schema = schemas.DateScheduleSchema()
assert schema.dump(date_schedule) == {
"dates": [d.isoformat() + "+00:00" for d in date_schedule.dates],
"__version__": __version__,
}
44 changes: 0 additions & 44 deletions tests/test_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ def test_base_schedule_next_no_implemented():
s.next(1)


def test_create_no_schedule():
assert schedules.NoSchedule()


def test_no_schedule_returns_no_dates():
s = schedules.NoSchedule()
assert s.next(1) == []


def test_create_interval_schedule():
assert schedules.IntervalSchedule(start_date=START_DATE, interval=timedelta(days=1))

Expand Down Expand Up @@ -80,34 +71,7 @@ def test_cron_schedule_next_n_with_on_or_after_argument():
assert s.next(3, on_or_after=TODAY + timedelta(days=4)) == DATES[3:]


def test_create_date_schedule():
s = schedules.DateSchedule(DATES)
assert s.dates == DATES


def test_date_schedule_next_n():
s = schedules.DateSchedule(DATES)
assert s.next(3) == DATES[:3]


def test_date_schedule_next_n_with_on_or_after_argument():
s = schedules.DateSchedule(DATES)
assert s.next(3, on_or_after=TODAY + timedelta(days=4)) == DATES[3:]


def test_date_schedule_after_last_date_returns_empty_list():
s = schedules.DateSchedule(DATES)
assert s.next(3, on_or_after=TODAY + timedelta(days=100)) == []


class TestSerialization:
def test_serialize_no_schedule(self):
schedule = schedules.NoSchedule()
assert schedule.serialize() == {
"type": "NoSchedule",
"__version__": __version__,
}

def test_serialize_cron_schedule(self):
schedule = schedules.CronSchedule("0 0 * * *")
assert schedule.serialize() == {
Expand All @@ -126,11 +90,3 @@ def test_serialize_interval_schedule(self):
"interval": 3600,
"__version__": __version__,
}

def test_serialize_date_schedule(self):
schedule = schedules.DateSchedule(dates=DATES)
assert schedule.serialize() == {
"type": "DateSchedule",
"dates": [d.isoformat() + "+00:00" for d in DATES],
"__version__": __version__,
}

0 comments on commit e149892

Please sign in to comment.