Skip to content

Commit

Permalink
Add vtimezone tzinfo implementation (#94)
Browse files Browse the repository at this point in the history
* Add vtimezone tzinfo implementation

* Fix dst test coverage and fix incorrect EDT/EST tests

* Add 100% test coverage for iters

* Improve test coverage for date/time validation rules
  • Loading branch information
allenporter authored Aug 27, 2022
1 parent 0943cbe commit 386c98a
Show file tree
Hide file tree
Showing 11 changed files with 565 additions and 162 deletions.
3 changes: 3 additions & 0 deletions ical/calendar.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class Calendar(ComponentModel):
prodid: str = Field(default=_PRODID)
version: str = Field(default=_VERSION)

#
# Calendar components
#

events: list[Event] = Field(alias="vevent", default_factory=list)
todos: list[Todo] = Field(alias="vtodo", default_factory=list)
journal: list[Journal] = Field(alias="vjournal", default_factory=list)
Expand Down
11 changes: 7 additions & 4 deletions ical/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Recur,
RequestStatus,
Uri,
validate_until_dtstart,
)
from .util import dtstamp_factory, normalize_datetime, uid_factory

Expand Down Expand Up @@ -181,7 +182,7 @@ def __ge__(self, other: Any) -> bool:
return NotImplemented
return self._tuple() >= other._tuple()

@root_validator
@root_validator(allow_reuse=True)
def validate_date_types(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate that start and end values are the same date or datetime type."""
if (
Expand All @@ -192,7 +193,7 @@ def validate_date_types(cls, values: dict[str, Any]) -> dict[str, Any]:
raise ValueError("Expected end value type to match start")
return values

@root_validator
@root_validator(allow_reuse=True)
def validate_datetime_timezone(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate that start and end values have the same timezone information."""
if (
Expand All @@ -210,14 +211,14 @@ def validate_datetime_timezone(cls, values: dict[str, Any]) -> dict[str, Any]:
raise ValueError(f"Expected end datetime with timezone but was {dtend}")
return values

@root_validator
@root_validator(allow_reuse=True)
def validate_one_end_or_duration(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate that only one of duration or end date may be set."""
if values.get("dtend") and values.get("duration"):
raise ValueError("Only one of dtend or duration may be set." "")
return values

@root_validator
@root_validator(allow_reuse=True)
def validate_duration_unit(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate the duration is the appropriate units."""
if not (duration := values.get("duration")):
Expand All @@ -229,3 +230,5 @@ def validate_duration_unit(cls, values: dict[str, Any]) -> dict[str, Any]:
if duration < datetime.timedelta(seconds=0):
raise ValueError(f"Expected duration to be positive but was {duration}")
return values

_validate_until_dtstart = root_validator(allow_reuse=True)(validate_until_dtstart)
130 changes: 130 additions & 0 deletions ical/iter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Library for iterators used in ical.
These iterators are primarily used for implementing recurrence rules where an
object should be returned for a series of date/time, with some modification
based on that date/time. Additionally, it is often necessary to handle multiple
recurrence rules together as a single view of recurring date/times.
"""

from __future__ import annotations

import datetime
import heapq
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar, Union

T = TypeVar("T")

ItemAdapter = Callable[[Union[datetime.datetime, datetime.date]], T]
"""An adapter for an object in a sorted container (iterator).
The adapter is invoked with the date/time of the current instance and
the callback returns an object at that time (e.g. event with updated time)
"""


class RecurIterator(Iterator[T]):
"""An iterator for a recurrence rule."""

def __init__(
self,
item_cb: ItemAdapter[T],
recur: Iterator[datetime.datetime | datetime.date],
):
"""Initialize the RecurIterator."""
self._item_cb = item_cb
self._recur = recur

def __iter__(self) -> Iterator[T]:
return self

def __next__(self) -> T:
"""Return the next event in the recurrence."""
dtstart: datetime.datetime | datetime.date = next(self._recur)
return self._item_cb(dtstart)


class RecurIterable(Iterable[T]):
"""A series of events from a recurring event.
The inputs are a callback that creates objects at a specific date/time, and an iterable
of all the relevant date/times (typically a dateutil.rrule or dateutil.rruleset).
"""

def __init__(
self,
item_cb: ItemAdapter[T],
recur: Iterable[datetime.datetime | datetime.date],
) -> None:
"""Initialize timeline."""
self._item_cb = item_cb
self._recur = recur

def __iter__(self) -> Iterator[T]:
"""Return an iterator as a traversal over events in chronological order."""
return RecurIterator(self._item_cb, iter(self._recur))


class PeekingIterator(Iterator[T]):
"""An iterator with a preview of the next item.
The primary purpose is to implement a merged iterator where it is needed to
see the next item in the iterator in order to decide which child iterator
to pull from.
"""

def __init__(self, iterator: Iterator[T]):
"""Initialize PeekingIterator."""
self._iterator = iterator
self._next = next(self._iterator, None)

def __iter__(self) -> Iterator[T]:
"""Return this iterator."""
return self

def peek(self) -> T | None:
"""Peek at the next item without consuming."""
return self._next

def __next__(self) -> T:
"""Produce the next item from the merged set."""
result = self._next
self._next = next(self._iterator, None)
if result is None:
raise StopIteration()
return result


class MergedIterator(Iterator[T]):
"""An iterator with a merged sorted view of the underlying sorted iterators."""

def __init__(self, iters: list[Iterator[T]]):
"""Initialize MergedIterator."""
self._iters = [PeekingIterator(iterator) for iterator in iters]

def __iter__(self) -> Iterator[T]:
"""Return this iterator."""
return self

def __next__(self) -> T:
"""Produce the next item from the merged set."""
heap: list[tuple[T, PeekingIterator[T]]] = []
for iterator in self._iters:

This comment has been minimized.

Copy link
@make-github-pseudonymous-again

make-github-pseudonymous-again Sep 23, 2022

This pulls from all iterators on each next. Better would be to initialize the heap with the first item of each iterator on first call, and pop the top element of the heap an push the next element from the popped element's iterator.

This comment has been minimized.

Copy link
@make-github-pseudonymous-again

make-github-pseudonymous-again Sep 23, 2022

So you would need to attach a pointer to the generating iterator to each value pushed to the heap. It could even be just the index of the iterator in self._iters so it can be used as a tie breaker.

This comment has been minimized.

Copy link
@make-github-pseudonymous-again

make-github-pseudonymous-again Sep 23, 2022

Better would be to initialize the heap with the first item of each iterator on first call,

Or on __init__.

This comment has been minimized.

Copy link
@make-github-pseudonymous-again

make-github-pseudonymous-again Sep 23, 2022

Also PeekingIterator will be unnecessary with this implementation.

This comment has been minimized.

Copy link
@allenporter

allenporter Sep 23, 2022

Author Owner

Makes sense to me, I can give that a try.

This comment has been minimized.

Copy link
@allenporter

allenporter Sep 24, 2022

Author Owner

I've sent #119 if you want to give it a review. (I can't tag you)

peekd: T | None = iterator.peek()
if peekd:
heapq.heappush(heap, (peekd, iterator))
if not heap:
raise StopIteration()
(_, iterator) = heapq.heappop(heap)
return next(iterator)


class MergedIterable(Iterable[T]):
"""An iterator that merges results from underlying sorted iterables."""

def __init__(self, iters: list[Iterable[T]]) -> None:
"""Initialize MergedIterable."""
self._iters = iters

def __iter__(self) -> Iterator[T]:
return MergedIterator([iter(it) for it in self._iters])
5 changes: 4 additions & 1 deletion ical/journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
from typing import Any, Optional, Union

from pydantic import Field
from pydantic import Field, root_validator

from .parsing.property import ParsedProperty
from .types import (
Expand All @@ -19,6 +19,7 @@
Recur,
RequestStatus,
Uri,
validate_until_dtstart,
)
from .util import dtstamp_factory, normalize_datetime, uid_factory

Expand Down Expand Up @@ -89,3 +90,5 @@ def start(self) -> datetime.datetime | datetime.date:
def start_datetime(self) -> datetime.datetime:
"""Return the events start as a datetime."""
return normalize_datetime(self.start).astimezone(tz=datetime.timezone.utc)

_validate_until_dtstart = root_validator(allow_reuse=True)(validate_until_dtstart)
139 changes: 7 additions & 132 deletions ical/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dateutil import rrule

from .event import Event
from .types import Frequency, Recur, Weekday
from .iter import MergedIterable, RecurIterable
from .util import normalize_datetime

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -126,24 +126,17 @@ def __iter__(self) -> Iterator[Event]:
yield event


class RecurIterator(Iterator[Event]):
class RecurAdapter:
"""An iterator for a recurrence rule."""

def __init__(
self, event: Event, recur: Iterator[datetime.datetime | datetime.date]
):
"""Initialize the RecurIterator."""
def __init__(self, event: Event):
"""Initialize the RecurAdapter."""
self._event = event
self._event_duration = event.computed_duration
self._recur = recur
self._is_all_day = not isinstance(self._event.dtstart, datetime.datetime)

def __iter__(self) -> Iterator[Event]:
return self

def __next__(self) -> Event:
def get(self, dtstart: datetime.datetime | datetime.date) -> Event:
"""Return the next event in the recurrence."""
dtstart: datetime.datetime | datetime.date = next(self._recur)
if self._is_all_day and isinstance(dtstart, datetime.datetime):
dtstart = dtstart.date()
return self._event.copy(
Expand All @@ -155,124 +148,6 @@ def __next__(self) -> Event:
)


class RecurIterable(Iterable[Event]):
"""A series of events from a recurring event."""

def __init__(self, event: Event, recur: rrule.rrule | rrule.rruleset) -> None:
"""Initialize timeline."""
self._event = event
self._recur = recur

def __iter__(self) -> Iterator[Event]:
"""Return an iterator as a traversal over events in chronological order."""
return RecurIterator(self._event, iter(self._recur))


RRULE_FREQ = {
Frequency.DAILY: rrule.DAILY,
Frequency.WEEKLY: rrule.WEEKLY,
Frequency.MONTHLY: rrule.MONTHLY,
Frequency.YEARLY: rrule.YEARLY,
}
RRULE_WEEKDAY = {
Weekday.MONDAY: rrule.MO,
Weekday.TUESDAY: rrule.TU,
Weekday.WEDNESDAY: rrule.WE,
Weekday.THURSDAY: rrule.TH,
Weekday.FRIDAY: rrule.FR,
Weekday.SATURDAY: rrule.SA,
Weekday.SUNDAY: rrule.SU,
}


def _create_rrule(
dtstart: datetime.datetime | datetime.date, rule: Recur
) -> rrule.rrule:
"""Create a dateutil rrule for the specified event."""
if (freq := RRULE_FREQ.get(rule.freq)) is None:
raise ValueError(f"Unsupported frequency in rrule: {rule.freq}")

byweekday: list[rrule.weekday] | None = None
if rule.by_weekday:
byweekday = [
RRULE_WEEKDAY[weekday.weekday](
1 if weekday.occurrence is None else weekday.occurrence
)
for weekday in rule.by_weekday
]
return rrule.rrule(
freq=freq,
dtstart=dtstart,
interval=rule.interval,
count=rule.count,
until=rule.until,
byweekday=byweekday,
bymonthday=rule.by_month_day if rule.by_month_day else None,
bymonth=rule.by_month if rule.by_month else None,
cache=True,
)


class PeekingIterator(Iterator[Event]):
"""An iterator with a preview of the next item."""

def __init__(self, iterator: Iterator[Event]):
"""Initialize PeekingIterator."""
self._iterator = iterator
self._next = next(self._iterator, None)

def __iter__(self) -> Iterator[Event]:
"""Return this iterator."""
return self

def peek(self) -> Event | None:
"""Peek at the next item without consuming."""
return self._next

def __next__(self) -> Event:
"""Produce the next item from the merged set."""
result = self._next
self._next = next(self._iterator, None)
if result is None:
raise StopIteration()
return result


class MergedIterator(Iterator[Event]):
"""An iterator with a merged sorted view of the underlying sorted iterators."""

def __init__(self, iters: list[Iterator[Event]]):
"""Initialize MergedIterator."""
self._iters = [PeekingIterator(iterator) for iterator in iters]

def __iter__(self) -> Iterator[Event]:
"""Return this iterator."""
return self

def __next__(self) -> Event:
"""Produce the next item from the merged set."""
heap: list[tuple[datetime.datetime, PeekingIterator]] = []
for iterator in self._iters:
peekd = iterator.peek()
if peekd:
heapq.heappush(heap, (peekd.start_datetime, iterator))
if not heap:
raise StopIteration()
(_, iterator) = heapq.heappop(heap)
return next(iterator)


class MergedIterable(Iterable[Event]):
"""An iterator that merges results from underlying sorted iterables."""

def __init__(self, iters: list[Iterable[Event]]) -> None:
"""Initialize MergedIterable."""
self._iters = iters

def __iter__(self) -> Iterator[Event]:
return MergedIterator([iter(it) for it in self._iters])


def calendar_timeline(events: list[Event]) -> Timeline:
"""Create a timeline for events on a calendar, including recurrence."""
iters: list[Iterable[Event]] = [EventIterable(events)]
Expand All @@ -281,10 +156,10 @@ def calendar_timeline(events: list[Event]) -> Timeline:
continue
ruleset = rrule.rruleset()
if event.rrule:
ruleset.rrule(_create_rrule(event.start, event.rrule))
ruleset.rrule(event.rrule.as_rrule(event.start))
for rdate in event.rdate:
ruleset.rdate(rdate) # type: ignore[no-untyped-call]
for exdate in event.exdate:
ruleset.exdate(exdate) # type: ignore[no-untyped-call]
iters.append(RecurIterable(event, ruleset))
iters.append(RecurIterable(RecurAdapter(event).get, ruleset))
return Timeline(MergedIterable(iters))
Loading

0 comments on commit 386c98a

Please sign in to comment.