Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def filter_by_available_at(self, available_at: AvailableAt) -> Self:
if available_at_series is None:
return self

cutoff = self.index.floor("D") - pd.Timedelta(available_at.lag_from_day)
cutoff = available_at.apply_index(self.index)
data_filtered = self.data[available_at_series <= cutoff]
return self._copy_with_data(data=data_filtered)

Expand Down
175 changes: 130 additions & 45 deletions packages/openstef-core/src/openstef_core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
key domain types like lead times, availability timestamps, and quantile values.
"""

from __future__ import annotations

import re
from datetime import timedelta
from datetime import datetime, time, timedelta
from datetime import timezone as dt_timezone
from enum import StrEnum
from functools import total_ordering
from typing import Any, Literal, Self, override

import pandas as pd
import pytz
from pydantic import GetCoreSchemaHandler, TypeAdapter
from pydantic_core import CoreSchema, core_schema

Expand Down Expand Up @@ -119,83 +124,163 @@ def to_hours(self) -> float:
class AvailableAt(PydanticStringPrimitive):
    """Represents a time point available relative to a reference day.

    Uses a specialized string format ``DnTHHMM`` where:

    - *n* is the day offset (negative or zero)
    - *HHMM* is the time of day

    An optional timezone suffix makes the availability time timezone-aware:
    either ``Z`` (UTC) or ``[Region/City]`` (RFC 9557 bracket notation).
    Both pytz and stdlib ``datetime.timezone`` objects are accepted by the
    constructor. Serialisation writes ``str(tz)`` inside the brackets and
    parsing resolves that name with ``pytz.timezone(name)``, so only zones
    whose ``str()`` is a name known to pytz round-trip. A fixed-offset
    ``datetime.timezone`` other than UTC serialises as e.g. ``[UTC+01:00]``,
    which ``from_string()`` rejects.

    For example, ``D-1T0600[Europe/Amsterdam]`` means "6:00
    Europe/Amsterdam on the previous day".
    The legacy ``DnTHH:MM`` format (with colon) is also accepted by
    ``from_string()``.

    Example:
        >>> from datetime import time
        >>> import pytz
        >>> tz_at = AvailableAt(day_offset=-1, time_of_day=time(6, 0), tzinfo=pytz.timezone('Europe/Amsterdam'))
        >>> str(tz_at)
        'D-1T0600[Europe/Amsterdam]'
        >>> at = AvailableAt.from_string("D-1T0600")
        >>> at.day_offset, at.time_of_day
        (-1, datetime.time(6, 0))
    """

    def __init__(self, day_offset: int, time_of_day: time, *, tzinfo: pytz.BaseTzInfo | dt_timezone | None = None):
        """Initialise with a day offset, time of day, and optional timezone.

        Args:
            day_offset: Day offset from the reference day (must be ≤ 0).
                ``-1`` means "the previous day", ``0`` means "the same day".
            time_of_day: Clock time when data becomes available. Only the
                hour and minute are used by ``__str__`` and ``apply_index``.
            tzinfo: Optional timezone for the availability time
                (e.g. ``pytz.timezone("Europe/Amsterdam")``, ``pytz.UTC``,
                or ``datetime.timezone.utc``).

        Raises:
            ValueError: If day_offset is positive.
        """
        if day_offset > 0:
            msg = f"Day offset must be negative or zero, got {day_offset}"
            raise ValueError(msg)
        self.day_offset = day_offset
        self.time_of_day = time_of_day
        self.tzinfo = tzinfo

    def __str__(self) -> str:
        """Converts to string in ``DnTHHMM`` or ``DnTHHMM[tz]`` format.

        Returns:
            String representation, with optional ``[timezone]`` suffix.
        """
        base = f"D{self.day_offset}T{self.time_of_day.hour:02}{self.time_of_day.minute:02}"
        if self.tzinfo is not None:
            return f"{base}[{self.tzinfo}]"
        return base

    @classmethod
    def from_string(cls, s: str) -> Self:
        """Creates an instance from a string in ``DnTHHMM[tz]`` format.

        Accepts an optional timezone suffix: ``Z`` for UTC or
        ``[Region/City]`` for a named zone. The legacy colon format
        ``DnTHH:MM`` is also accepted.

        Args:
            s: String to parse.

        Returns:
            AvailableAt instance parsed from the string.

        Raises:
            ValueError: If the string format is invalid, the day offset is
                positive, the time of day is out of range, or the timezone
                name is unknown to pytz.
        """
        match = re.match(r"D(-?\d+)T(\d{2}):?(\d{2})(?:(Z)|\[([^\]]+)\])?$", s)
        if not match:
            error_message = f"Cannot convert {s} to {cls.__name__}"
            raise ValueError(error_message)

        days_part, hours_part, minutes_part, z_part, tz_part = match.groups()

        if int(days_part) > 0:
            msg = f"Day offset must be negative or zero, got {days_part}"
            raise ValueError(msg)

        resolved_tz: pytz.BaseTzInfo | None
        if z_part:
            resolved_tz = pytz.UTC
        elif tz_part:
            try:
                resolved_tz = pytz.timezone(tz_part)
            except pytz.UnknownTimeZoneError as exc:
                # UnknownTimeZoneError derives from KeyError; callers (and the
                # documented contract) expect ValueError for unparsable input.
                msg = f"Cannot convert {s} to {cls.__name__}: unknown timezone {tz_part!r}"
                raise ValueError(msg) from exc
        else:
            resolved_tz = None

        return cls(
            day_offset=int(days_part),
            # time() itself raises ValueError for hour > 23 or minute > 59.
            time_of_day=time(hour=int(hours_part), minute=int(minutes_part)),
            tzinfo=resolved_tz,
        )

    def apply(self, date: datetime) -> datetime:
        """Apply this availability offset to a reference date.

        The reference day and the time-of-day are both interpreted in
        ``self.tzinfo`` (falling back to ``date.tzinfo``). The result is
        returned in the reference date's timezone, or naive when the
        reference date is naive.

        Args:
            date: The reference date to apply the availability offset to.

        Returns:
            The datetime when data is available, in the reference date's
            timezone (or naive when the reference date is naive).
        """
        reference = date
        if self.tzinfo is not None and date.tzinfo is not None:
            # Determine the calendar day in the availability timezone, the
            # same way apply_index() does, so both agree around midnight.
            reference = date.astimezone(self.tzinfo)

        result_date = reference.date() + timedelta(days=self.day_offset)
        naive_result = datetime.combine(result_date, self.time_of_day)

        # A naive reference date always yields a naive result, regardless of
        # whether this instance carries a timezone.
        if date.tzinfo is None:
            return naive_result

        source_tz = self.tzinfo if self.tzinfo is not None else date.tzinfo

        if isinstance(source_tz, pytz.BaseTzInfo):
            # pytz zones must be attached with localize() to pick the right
            # UTC offset for the given wall-clock time.
            aware = source_tz.localize(naive_result)
        else:
            aware = naive_result.replace(tzinfo=source_tz)

        return aware.astimezone(date.tzinfo)

    def apply_index(self, index: pd.DatetimeIndex) -> pd.DatetimeIndex:
        """Vectorized version of :meth:`apply` for a pandas DatetimeIndex.

        The reference day and time-of-day are interpreted in
        ``self.tzinfo`` (falling back to ``index.tz``), then the result is
        converted back to the index's timezone. A naive index yields a
        naive result.

        The cutoff is computed on wall-clock time and only then localised,
        so a DST transition between midnight and the cutoff time does not
        shift the result by an hour. Ambiguous wall-clock times resolve to
        the non-DST occurrence; nonexistent ones are shifted forward to the
        first valid instant.

        Args:
            index: DatetimeIndex of reference dates.

        Returns:
            DatetimeIndex of cutoff timestamps, in the same timezone as *index*.
        """
        import numpy as np  # local: only needed for the ``ambiguous`` mask below

        offset = pd.Timedelta(
            days=self.day_offset,
            hours=self.time_of_day.hour,
            minutes=self.time_of_day.minute,
        )

        data_tz = index.tz
        if data_tz is None:
            # Naive reference dates: plain wall-clock arithmetic, naive result.
            return index.normalize() + offset

        work_tz = self.tzinfo if self.tzinfo is not None else data_tz

        # Drop the timezone after converting so that normalize() and the
        # offset addition operate on local wall-clock time. Adding a
        # Timedelta to a tz-aware index would add absolute elapsed time.
        wall_clock = index.tz_convert(work_tz).tz_localize(None)
        cutoff_wall_clock = wall_clock.normalize() + offset

        cutoff = cutoff_wall_clock.tz_localize(
            work_tz,
            ambiguous=np.zeros(len(cutoff_wall_clock), dtype=bool),
            nonexistent="shift_forward",
        )
        return cutoff.tz_convert(data_tz)


class Quantile(float):
Expand Down Expand Up @@ -259,7 +344,7 @@ def format(self) -> str:
return f"quantile_P{value:.1f}"

@staticmethod
def parse(quantile_str: str) -> "Quantile":
def parse(quantile_str: str) -> Quantile:
"""Static method to parse a quantile string back to a Quantile object.

Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

"""Tests for TimeSeriesDataset parquet serialization."""

from datetime import datetime, timedelta
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import cast

import pandas as pd
import pytest
import pytz

from openstef_core.datasets.timeseries_dataset import TimeSeriesDataset
from openstef_core.testing import create_timeseries_dataset
Expand Down Expand Up @@ -117,8 +118,8 @@ def test_filter_by_available_before(
@pytest.mark.parametrize(
("available_at", "expected_values"),
[
(AvailableAt(timedelta(hours=-13)), [10, 20, 30]),
(AvailableAt(timedelta(hours=-15)), [10, 20, 30, 40, 55, 50]),
(AvailableAt(day_offset=0, time_of_day=time(13, 0)), [10, 20, 30]),
(AvailableAt(day_offset=0, time_of_day=time(15, 0)), [10, 20, 30, 40, 55, 50]),
],
)
def test_filter_by_available_at(
Expand All @@ -131,6 +132,33 @@ def test_filter_by_available_at(
assert list(filtered.data["value1"]) == expected_values


def test_filter_by_available_at_dst_aware():
    """A timezone-aware cutoff moves by one hour in UTC over the CET→CEST switch (2026-03-29)."""
    # Arrange
    amsterdam = pytz.timezone("Europe/Amsterdam")
    rule = AvailableAt(day_offset=-1, time_of_day=time(6, 0), tzinfo=amsterdam)

    reference_times = pd.to_datetime([
        "2026-03-29T12:00:00+00:00",  # cutoff = Mar 28 06:00 CET = 05:00 UTC
        "2026-03-30T12:00:00+00:00",  # cutoff = Mar 29 06:00 CEST = 04:00 UTC
    ])
    availability_times = pd.to_datetime([
        "2026-03-28T04:30:00+00:00",  # before cutoff 05:00 UTC → kept
        "2026-03-29T04:30:00+00:00",  # after cutoff 04:00 UTC → filtered
    ])
    frame = pd.DataFrame(
        data={"available_at": availability_times, "value": [1, 2]},
        index=reference_times,
    )
    dataset = TimeSeriesDataset(data=frame, sample_interval=timedelta(hours=24))

    # Act
    kept = dataset.filter_by_available_at(rule)

    # Assert
    assert list(kept.data["value"]) == [1]


@pytest.mark.parametrize(
("lead_time", "expected_values"),
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

"""Tests for VersionedTimeSeriesDataset parquet serialization."""

from datetime import timedelta
from datetime import time, timedelta
from pathlib import Path

import numpy as np
Expand Down Expand Up @@ -170,7 +170,7 @@ def test_filter_by_available_before(versioned_dataset: VersionedTimeSeriesDatase
def test_filter_by_available_at(versioned_dataset: VersionedTimeSeriesDataset):
"""Filter dataset using relative availability definition."""
# Arrange
available_at = AvailableAt(timedelta(hours=-13))
available_at = AvailableAt(day_offset=0, time_of_day=time(13, 0))

# Act
filtered = versioned_dataset.filter_by_available_at(available_at)
Expand Down
Loading