Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pipelines to handle integer timestamp #241

Merged
merged 4 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update `FoldMask` to work with integer timestamp, in `validate_on_dataset` method add validation on presence of `FoldMask` parameters in `ts.index`, add tests for `FoldMask` ([#226](https://github.com/etna-team/etna/pull/226))
- Fix `FourierTransform` on integer index, add inference tests ([#230](https://github.com/etna-team/etna/pull/230))
- Update outliers transforms to handle integer timestamp ([#229](https://github.com/etna-team/etna/pull/229))
- Update pipelines to handle integer timestamp ([#241](https://github.com/etna-team/etna/pull/241))

### Fixed
-
Expand Down
20 changes: 10 additions & 10 deletions etna/datasets/tsdataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ def plot(
n_segments: int = 10,
column: str = "target",
segments: Optional[Sequence[str]] = None,
start: Optional[Union[pd.Timestamp, int, str]] = None,
end: Optional[Union[pd.Timestamp, int, str]] = None,
start: Union[pd.Timestamp, int, str, None] = None,
end: Union[pd.Timestamp, int, str, None] = None,
ostreech1997 marked this conversation as resolved.
Show resolved Hide resolved
seed: int = 1,
figsize: Tuple[int, int] = (10, 5),
):
Expand Down Expand Up @@ -930,10 +930,10 @@ def to_hierarchical_dataset(

def _find_all_borders(
self,
train_start: Optional[Union[pd.Timestamp, int, str]],
train_end: Optional[Union[pd.Timestamp, int, str]],
test_start: Optional[Union[pd.Timestamp, int, str]],
test_end: Optional[Union[pd.Timestamp, int, str]],
train_start: Union[pd.Timestamp, int, str, None],
train_end: Union[pd.Timestamp, int, str, None],
test_start: Union[pd.Timestamp, int, str, None],
test_end: Union[pd.Timestamp, int, str, None],
test_size: Optional[int],
) -> Tuple[Union[pd.Timestamp, int], Union[pd.Timestamp, int], Union[pd.Timestamp, int], Union[pd.Timestamp, int]]:
"""Find borders for train_test_split if some values wasn't specified."""
Expand Down Expand Up @@ -1005,10 +1005,10 @@ def _find_all_borders(

def train_test_split(
self,
train_start: Optional[Union[pd.Timestamp, int, str]] = None,
train_end: Optional[Union[pd.Timestamp, int, str]] = None,
test_start: Optional[Union[pd.Timestamp, int, str]] = None,
test_end: Optional[Union[pd.Timestamp, int, str]] = None,
train_start: Union[pd.Timestamp, int, str, None] = None,
train_end: Union[pd.Timestamp, int, str, None] = None,
test_start: Union[pd.Timestamp, int, str, None] = None,
test_end: Union[pd.Timestamp, int, str, None] = None,
test_size: Optional[int] = None,
) -> Tuple["TSDataset", "TSDataset"]:
"""Split given df with train-test timestamp indices or size of test set.
Expand Down
11 changes: 8 additions & 3 deletions etna/datasets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,19 +298,24 @@


def _check_timestamp_param(
param: Optional[Union[pd.Timestamp, int, str]], param_name: str, freq: Optional[str]
) -> Optional[Union[pd.Timestamp, int]]:
param: Union[pd.Timestamp, int, str, None], param_name: str, freq: Optional[str]
) -> Union[pd.Timestamp, int, None]:
if param is None:
return param

if freq is None:
if not isinstance(param, int):
if not (isinstance(param, int) or isinstance(param, np.integer)):

Check warning on line 307 in etna/datasets/utils.py

View check run for this annotation

Codecov / codecov/patch

etna/datasets/utils.py#L307

Added line #L307 was not covered by tests
raise ValueError(
f"Parameter {param_name} has incorrect type! For integer timestamp only integer parameter type is allowed."
)

return param
else:
if not isinstance(param, str) and not isinstance(param, pd.Timestamp):
raise ValueError(

Check warning on line 315 in etna/datasets/utils.py

View check run for this annotation

Codecov / codecov/patch

etna/datasets/utils.py#L314-L315

Added lines #L314 - L315 were not covered by tests
f"Parameter {param_name} has incorrect type! For datetime timestamp only pd.Timestamp or str parameter type is allowed."
)

new_param = pd.Timestamp(param)
return new_param

Expand Down
5 changes: 3 additions & 2 deletions etna/ensembles/direct_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union

Check warning on line 7 in etna/ensembles/direct_ensemble.py

View check run for this annotation

Codecov / codecov/patch

etna/ensembles/direct_ensemble.py#L7

Added line #L7 was not covered by tests

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -148,8 +149,8 @@
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand Down
5 changes: 3 additions & 2 deletions etna/ensembles/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from copy import deepcopy
from typing import List
from typing import Optional
from typing import Union

Check warning on line 7 in etna/ensembles/mixins.py

View check run for this annotation

Codecov / codecov/patch

etna/ensembles/mixins.py#L7

Added line #L7 was not covered by tests

import pandas as pd
from typing_extensions import Self
Expand Down Expand Up @@ -52,8 +53,8 @@
def _predict_pipeline(
ts: TSDataset,
pipeline: BasePipeline,
start_timestamp: Optional[pd.Timestamp],
end_timestamp: Optional[pd.Timestamp],
start_timestamp: Union[pd.Timestamp, int, str, None],
end_timestamp: Union[pd.Timestamp, int, str, None],
) -> TSDataset:
"""Make predict with given pipeline."""
tslogger.log(msg=f"Start prediction with {pipeline}.")
Expand Down
4 changes: 2 additions & 2 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand Down
4 changes: 2 additions & 2 deletions etna/ensembles/voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand Down
29 changes: 7 additions & 22 deletions etna/pipeline/autoregressive_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Sequence
from typing import cast

import numpy as np

Check warning on line 5 in etna/pipeline/autoregressive_pipeline.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/autoregressive_pipeline.py#L5

Added line #L5 was not covered by tests
import pandas as pd
from typing_extensions import get_args

Expand Down Expand Up @@ -106,10 +107,12 @@

def _create_predictions_template(self, ts: TSDataset) -> pd.DataFrame:
"""Create dataframe to fill with forecasts."""
prediction_df = ts[:, :, "target"]
future_dates = pd.date_range(
start=prediction_df.index.max(), periods=self.horizon + 1, freq=ts.freq, closed="right"
)
prediction_df = ts.to_pandas(features=["target"])
last_timestamp = prediction_df.index[-1]
if ts.freq is None:
future_dates = pd.Index(np.arange(last_timestamp, last_timestamp + self.horizon + 1))[1:]

Check warning on line 113 in etna/pipeline/autoregressive_pipeline.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/autoregressive_pipeline.py#L110-L113

Added lines #L110 - L113 were not covered by tests
else:
future_dates = pd.date_range(start=last_timestamp, periods=self.horizon + 1, freq=ts.freq, closed="right")

Check warning on line 115 in etna/pipeline/autoregressive_pipeline.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/autoregressive_pipeline.py#L115

Added line #L115 was not covered by tests
prediction_df = prediction_df.reindex(prediction_df.index.append(future_dates))
prediction_df.index.name = "timestamp"
return prediction_df
Expand Down Expand Up @@ -172,21 +175,3 @@
prediction_ts.add_target_components(target_components_df=target_components_df)

return prediction_ts

def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool = False,
) -> TSDataset:
return super()._predict(
ts=ts,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
prediction_interval=prediction_interval,
quantiles=quantiles,
return_components=return_components,
)
43 changes: 32 additions & 11 deletions etna/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from etna.core import AbstractSaveable
from etna.core import BaseMixin
from etna.datasets import TSDataset
from etna.datasets.utils import _check_timestamp_param

Check warning on line 28 in etna/pipeline/base.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/base.py#L28

Added line #L28 was not covered by tests
from etna.distributions import BaseDistribution
from etna.loggers import tslogger
from etna.metrics import Metric
Expand Down Expand Up @@ -269,8 +270,8 @@
def predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
start_timestamp: Union[pd.Timestamp, int, str, None] = None,
end_timestamp: Union[pd.Timestamp, int, str, None] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
return_components: bool = False,
Expand All @@ -280,6 +281,8 @@
Currently, in situation when segments start with different timestamps
we only guarantee to work with ``start_timestamp`` >= beginning of all segments.

Parameters ``start_timestamp`` and ``end_timestamp`` of type ``str`` are converted into ``pd.Timestamp``.

Parameters
----------
ts:
Expand All @@ -305,6 +308,10 @@

Raises
------
ValueError:
Value of ``start_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` is less than ``start_timestamp``.
ValueError:
Expand Down Expand Up @@ -553,9 +560,12 @@
@staticmethod
def _make_predict_timestamps(
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
) -> Tuple[pd.Timestamp, pd.Timestamp]:
start_timestamp: Union[pd.Timestamp, int, str, None],
end_timestamp: Union[pd.Timestamp, int, str, None],
) -> Union[Tuple[pd.Timestamp, pd.Timestamp], Tuple[int, int]]:
start_timestamp = _check_timestamp_param(param=start_timestamp, param_name="start_timestamp", freq=ts.freq)
end_timestamp = _check_timestamp_param(param=end_timestamp, param_name="end_timestamp", freq=ts.freq)

Check warning on line 567 in etna/pipeline/base.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/base.py#L566-L567

Added lines #L566 - L567 were not covered by tests

min_timestamp = ts.describe()["start_timestamp"].max()
max_timestamp = ts.index[-1]

Expand All @@ -569,7 +579,7 @@
if end_timestamp > max_timestamp:
raise ValueError("Value of end_timestamp is more than ending of dataset!")

if start_timestamp > end_timestamp:
if start_timestamp > end_timestamp: # type: ignore

Check warning on line 582 in etna/pipeline/base.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/base.py#L582

Added line #L582 was not covered by tests
ostreech1997 marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("Value of end_timestamp is less than start_timestamp!")

return start_timestamp, end_timestamp
Expand All @@ -578,8 +588,8 @@
def _predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp],
end_timestamp: Optional[pd.Timestamp],
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand All @@ -589,8 +599,8 @@
def predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
start_timestamp: Union[pd.Timestamp, int, str, None] = None,
end_timestamp: Union[pd.Timestamp, int, str, None] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
return_components: bool = False,
Expand All @@ -600,6 +610,8 @@
Currently, in situation when segments start with different timestamps
we only guarantee to work with ``start_timestamp`` >= beginning of all segments.

Parameters ``start_timestamp`` and ``end_timestamp`` of type ``str`` are converted into ``pd.Timestamp``.

Parameters
----------
ts:
Expand All @@ -625,6 +637,10 @@

Raises
------
ValueError:
Value of ``start_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` is less than ``start_timestamp``.
ValueError:
Expand Down Expand Up @@ -721,10 +737,15 @@
min_train, max_train = dataset_timestamps[min_train_idx], dataset_timestamps[max_train_idx]
min_test, max_test = dataset_timestamps[min_test_idx], dataset_timestamps[max_test_idx]

if ts.freq is None:
target_timestamps = np.arange(min_test, max_test + 1).tolist()

Check warning on line 741 in etna/pipeline/base.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/base.py#L740-L741

Added lines #L740 - L741 were not covered by tests
else:
target_timestamps = list(pd.date_range(start=min_test, end=max_test, freq=ts.freq))

Check warning on line 743 in etna/pipeline/base.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/base.py#L743

Added line #L743 was not covered by tests

mask = FoldMask(
first_train_timestamp=min_train,
last_train_timestamp=max_train,
target_timestamps=list(pd.date_range(start=min_test, end=max_test, freq=ts.freq)),
target_timestamps=target_timestamps,
)
masks.append(mask)

Expand Down
19 changes: 11 additions & 8 deletions etna/pipeline/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Dict
from typing import Optional
from typing import Sequence
from typing import Union

Check warning on line 8 in etna/pipeline/mixins.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/mixins.py#L8

Added line #L8 was not covered by tests

import numpy as np
import pandas as pd
Expand All @@ -27,7 +28,9 @@
class ModelPipelinePredictMixin:
"""Mixin for pipelines with model inside with implementation of ``_predict`` method."""

def _create_ts(self, ts: TSDataset, start_timestamp: pd.Timestamp, end_timestamp: pd.Timestamp) -> TSDataset:
def _create_ts(

Check warning on line 31 in etna/pipeline/mixins.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/mixins.py#L31

Added line #L31 was not covered by tests
self, ts: TSDataset, start_timestamp: Union[pd.Timestamp, int], end_timestamp: Union[pd.Timestamp, int]
) -> TSDataset:
"""Create ``TSDataset`` to make predictions on."""
self.model: ModelType
self.transforms: Sequence[Transform]
Expand All @@ -37,7 +40,7 @@
freq = deepcopy(ts.freq)
known_future = deepcopy(ts.known_future)

df_to_transform = df[:end_timestamp]
df_to_transform = df.loc[:end_timestamp]

Check warning on line 43 in etna/pipeline/mixins.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/mixins.py#L43

Added line #L43 was not covered by tests

cur_ts = TSDataset(
df=df_to_transform,
Expand All @@ -51,25 +54,25 @@

# correct start_timestamp taking into account context size
timestamp_indices = pd.Series(np.arange(len(df.index)), index=df.index)
start_idx = timestamp_indices[start_timestamp]
start_idx = timestamp_indices.loc[start_timestamp]

Check warning on line 57 in etna/pipeline/mixins.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/mixins.py#L57

Added line #L57 was not covered by tests
start_idx = max(0, start_idx - self.model.context_size)
start_timestamp = timestamp_indices.index[start_idx]

cur_ts.df = cur_ts.df[start_timestamp:end_timestamp]
cur_ts.df = cur_ts.df.loc[start_timestamp:end_timestamp]

Check warning on line 61 in etna/pipeline/mixins.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/mixins.py#L61

Added line #L61 was not covered by tests
return cur_ts

def _determine_prediction_size(
self, ts: TSDataset, start_timestamp: pd.Timestamp, end_timestamp: pd.Timestamp
self, ts: TSDataset, start_timestamp: Union[pd.Timestamp, int], end_timestamp: Union[pd.Timestamp, int]
) -> int:
timestamp_indices = pd.Series(np.arange(len(ts.index)), index=ts.index)
timestamps = timestamp_indices[start_timestamp:end_timestamp]
timestamps = timestamp_indices.loc[start_timestamp:end_timestamp]

Check warning on line 68 in etna/pipeline/mixins.py

View check run for this annotation

Codecov / codecov/patch

etna/pipeline/mixins.py#L68

Added line #L68 was not covered by tests
return len(timestamps)

def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool = False,
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,11 @@ def product_level_constant_hierarchical_ts(product_level_constant_hierarchical_d
return ts


@pytest.fixture
def product_level_constant_hierarchical_ts_int_timestamp(product_level_constant_hierarchical_ts):
return convert_ts_to_int_timestamp(product_level_constant_hierarchical_ts)


@pytest.fixture
def product_level_constant_hierarchical_ts_with_exog(
product_level_constant_hierarchical_df, market_level_constant_hierarchical_df_exog, hierarchical_structure
Expand Down