Skip to content

Commit

Permalink
Update pipelines to handle integer timestamp (#241)
Browse files Browse the repository at this point in the history
  • Loading branch information
d-a-bunin committed Feb 16, 2024
1 parent 8c8d0ca commit 7c5aa0e
Show file tree
Hide file tree
Showing 27 changed files with 1,480 additions and 448 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update `FoldMask` to work with integer timestamp, in `validate_on_dataset` method add validation on presence of `FoldMask` parameters in `ts.index`, add tests for `FoldMask` ([#226](https://github.com/etna-team/etna/pull/226))
- Fix `FourierTransform` on integer index, add inference tests ([#230](https://github.com/etna-team/etna/pull/230))
- Update outliers transforms to handle integer timestamp ([#229](https://github.com/etna-team/etna/pull/229))
- Update pipelines to handle integer timestamp ([#241](https://github.com/etna-team/etna/pull/241))

### Fixed
-
Expand Down
20 changes: 10 additions & 10 deletions etna/datasets/tsdataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ def plot(
n_segments: int = 10,
column: str = "target",
segments: Optional[Sequence[str]] = None,
start: Optional[Union[pd.Timestamp, int, str]] = None,
end: Optional[Union[pd.Timestamp, int, str]] = None,
start: Union[pd.Timestamp, int, str, None] = None,
end: Union[pd.Timestamp, int, str, None] = None,
seed: int = 1,
figsize: Tuple[int, int] = (10, 5),
):
Expand Down Expand Up @@ -930,10 +930,10 @@ def to_hierarchical_dataset(

def _find_all_borders(
self,
train_start: Optional[Union[pd.Timestamp, int, str]],
train_end: Optional[Union[pd.Timestamp, int, str]],
test_start: Optional[Union[pd.Timestamp, int, str]],
test_end: Optional[Union[pd.Timestamp, int, str]],
train_start: Union[pd.Timestamp, int, str, None],
train_end: Union[pd.Timestamp, int, str, None],
test_start: Union[pd.Timestamp, int, str, None],
test_end: Union[pd.Timestamp, int, str, None],
test_size: Optional[int],
) -> Tuple[Union[pd.Timestamp, int], Union[pd.Timestamp, int], Union[pd.Timestamp, int], Union[pd.Timestamp, int]]:
"""Find borders for train_test_split if some values wasn't specified."""
Expand Down Expand Up @@ -1005,10 +1005,10 @@ def _find_all_borders(

def train_test_split(
self,
train_start: Optional[Union[pd.Timestamp, int, str]] = None,
train_end: Optional[Union[pd.Timestamp, int, str]] = None,
test_start: Optional[Union[pd.Timestamp, int, str]] = None,
test_end: Optional[Union[pd.Timestamp, int, str]] = None,
train_start: Union[pd.Timestamp, int, str, None] = None,
train_end: Union[pd.Timestamp, int, str, None] = None,
test_start: Union[pd.Timestamp, int, str, None] = None,
test_end: Union[pd.Timestamp, int, str, None] = None,
test_size: Optional[int] = None,
) -> Tuple["TSDataset", "TSDataset"]:
"""Split given df with train-test timestamp indices or size of test set.
Expand Down
11 changes: 8 additions & 3 deletions etna/datasets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,19 +298,24 @@ def inverse_transform_target_components(


def _check_timestamp_param(
param: Optional[Union[pd.Timestamp, int, str]], param_name: str, freq: Optional[str]
) -> Optional[Union[pd.Timestamp, int]]:
param: Union[pd.Timestamp, int, str, None], param_name: str, freq: Optional[str]
) -> Union[pd.Timestamp, int, None]:
if param is None:
return param

if freq is None:
if not isinstance(param, int):
if not (isinstance(param, int) or isinstance(param, np.integer)):
raise ValueError(
f"Parameter {param_name} has incorrect type! For integer timestamp only integer parameter type is allowed."
)

return param
else:
if not isinstance(param, str) and not isinstance(param, pd.Timestamp):
raise ValueError(
f"Parameter {param_name} has incorrect type! For datetime timestamp only pd.Timestamp or str parameter type is allowed."
)

new_param = pd.Timestamp(param)
return new_param

Expand Down
5 changes: 3 additions & 2 deletions etna/ensembles/direct_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -148,8 +149,8 @@ def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand Down
5 changes: 3 additions & 2 deletions etna/ensembles/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from copy import deepcopy
from typing import List
from typing import Optional
from typing import Union

import pandas as pd
from typing_extensions import Self
Expand Down Expand Up @@ -52,8 +53,8 @@ def _forecast_pipeline(pipeline: BasePipeline, ts: TSDataset) -> TSDataset:
def _predict_pipeline(
ts: TSDataset,
pipeline: BasePipeline,
start_timestamp: Optional[pd.Timestamp],
end_timestamp: Optional[pd.Timestamp],
start_timestamp: Union[pd.Timestamp, int, str, None],
end_timestamp: Union[pd.Timestamp, int, str, None],
) -> TSDataset:
"""Make predict with given pipeline."""
tslogger.log(msg=f"Start prediction with {pipeline}.")
Expand Down
4 changes: 2 additions & 2 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand Down
4 changes: 2 additions & 2 deletions etna/ensembles/voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand Down
29 changes: 7 additions & 22 deletions etna/pipeline/autoregressive_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Sequence
from typing import cast

import numpy as np
import pandas as pd
from typing_extensions import get_args

Expand Down Expand Up @@ -106,10 +107,12 @@ def fit(self, ts: TSDataset, save_ts: bool = True) -> "AutoRegressivePipeline":

def _create_predictions_template(self, ts: TSDataset) -> pd.DataFrame:
"""Create dataframe to fill with forecasts."""
prediction_df = ts[:, :, "target"]
future_dates = pd.date_range(
start=prediction_df.index.max(), periods=self.horizon + 1, freq=ts.freq, closed="right"
)
prediction_df = ts.to_pandas(features=["target"])
last_timestamp = prediction_df.index[-1]
if ts.freq is None:
future_dates = pd.Index(np.arange(last_timestamp, last_timestamp + self.horizon + 1))[1:]
else:
future_dates = pd.date_range(start=last_timestamp, periods=self.horizon + 1, freq=ts.freq, closed="right")
prediction_df = prediction_df.reindex(prediction_df.index.append(future_dates))
prediction_df.index.name = "timestamp"
return prediction_df
Expand Down Expand Up @@ -172,21 +175,3 @@ def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
prediction_ts.add_target_components(target_components_df=target_components_df)

return prediction_ts

def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool = False,
) -> TSDataset:
return super()._predict(
ts=ts,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
prediction_interval=prediction_interval,
quantiles=quantiles,
return_components=return_components,
)
43 changes: 32 additions & 11 deletions etna/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from etna.core import AbstractSaveable
from etna.core import BaseMixin
from etna.datasets import TSDataset
from etna.datasets.utils import _check_timestamp_param
from etna.distributions import BaseDistribution
from etna.loggers import tslogger
from etna.metrics import Metric
Expand Down Expand Up @@ -269,8 +270,8 @@ def forecast(
def predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
start_timestamp: Union[pd.Timestamp, int, str, None] = None,
end_timestamp: Union[pd.Timestamp, int, str, None] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
return_components: bool = False,
Expand All @@ -280,6 +281,8 @@ def predict(
Currently, in situation when segments start with different timestamps
we only guarantee to work with ``start_timestamp`` >= beginning of all segments.
Parameters ``start_timestamp`` and ``end_timestamp`` of type ``str`` are converted into ``pd.Timestamp``.
Parameters
----------
ts:
Expand All @@ -305,6 +308,10 @@ def predict(
Raises
------
ValueError:
Value of ``start_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` is less than ``start_timestamp``.
ValueError:
Expand Down Expand Up @@ -553,9 +560,12 @@ def forecast(
@staticmethod
def _make_predict_timestamps(
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
) -> Tuple[pd.Timestamp, pd.Timestamp]:
start_timestamp: Union[pd.Timestamp, int, str, None],
end_timestamp: Union[pd.Timestamp, int, str, None],
) -> Union[Tuple[pd.Timestamp, pd.Timestamp], Tuple[int, int]]:
start_timestamp = _check_timestamp_param(param=start_timestamp, param_name="start_timestamp", freq=ts.freq)
end_timestamp = _check_timestamp_param(param=end_timestamp, param_name="end_timestamp", freq=ts.freq)

min_timestamp = ts.describe()["start_timestamp"].max()
max_timestamp = ts.index[-1]

Expand All @@ -569,7 +579,7 @@ def _make_predict_timestamps(
if end_timestamp > max_timestamp:
raise ValueError("Value of end_timestamp is more than ending of dataset!")

if start_timestamp > end_timestamp:
if start_timestamp > end_timestamp: # type: ignore
raise ValueError("Value of end_timestamp is less than start_timestamp!")

return start_timestamp, end_timestamp
Expand All @@ -578,8 +588,8 @@ def _make_predict_timestamps(
def _predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp],
end_timestamp: Optional[pd.Timestamp],
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
Expand All @@ -589,8 +599,8 @@ def _predict(
def predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
start_timestamp: Union[pd.Timestamp, int, str, None] = None,
end_timestamp: Union[pd.Timestamp, int, str, None] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
return_components: bool = False,
Expand All @@ -600,6 +610,8 @@ def predict(
Currently, in situation when segments start with different timestamps
we only guarantee to work with ``start_timestamp`` >= beginning of all segments.
Parameters ``start_timestamp`` and ``end_timestamp`` of type ``str`` are converted into ``pd.Timestamp``.
Parameters
----------
ts:
Expand All @@ -625,6 +637,10 @@ def predict(
Raises
------
ValueError:
Value of ``start_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` doesn't match the type of timestamp in ``ts``.
ValueError:
Value of ``end_timestamp`` is less than ``start_timestamp``.
ValueError:
Expand Down Expand Up @@ -721,10 +737,15 @@ def _generate_masks_from_n_folds(
min_train, max_train = dataset_timestamps[min_train_idx], dataset_timestamps[max_train_idx]
min_test, max_test = dataset_timestamps[min_test_idx], dataset_timestamps[max_test_idx]

if ts.freq is None:
target_timestamps = np.arange(min_test, max_test + 1).tolist()
else:
target_timestamps = list(pd.date_range(start=min_test, end=max_test, freq=ts.freq))

mask = FoldMask(
first_train_timestamp=min_train,
last_train_timestamp=max_train,
target_timestamps=list(pd.date_range(start=min_test, end=max_test, freq=ts.freq)),
target_timestamps=target_timestamps,
)
masks.append(mask)

Expand Down
19 changes: 11 additions & 8 deletions etna/pipeline/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Dict
from typing import Optional
from typing import Sequence
from typing import Union

import numpy as np
import pandas as pd
Expand All @@ -27,7 +28,9 @@
class ModelPipelinePredictMixin:
"""Mixin for pipelines with model inside with implementation of ``_predict`` method."""

def _create_ts(self, ts: TSDataset, start_timestamp: pd.Timestamp, end_timestamp: pd.Timestamp) -> TSDataset:
def _create_ts(
self, ts: TSDataset, start_timestamp: Union[pd.Timestamp, int], end_timestamp: Union[pd.Timestamp, int]
) -> TSDataset:
"""Create ``TSDataset`` to make predictions on."""
self.model: ModelType
self.transforms: Sequence[Transform]
Expand All @@ -37,7 +40,7 @@ def _create_ts(self, ts: TSDataset, start_timestamp: pd.Timestamp, end_timestamp
freq = deepcopy(ts.freq)
known_future = deepcopy(ts.known_future)

df_to_transform = df[:end_timestamp]
df_to_transform = df.loc[:end_timestamp]

cur_ts = TSDataset(
df=df_to_transform,
Expand All @@ -51,25 +54,25 @@ def _create_ts(self, ts: TSDataset, start_timestamp: pd.Timestamp, end_timestamp

# correct start_timestamp taking into account context size
timestamp_indices = pd.Series(np.arange(len(df.index)), index=df.index)
start_idx = timestamp_indices[start_timestamp]
start_idx = timestamp_indices.loc[start_timestamp]
start_idx = max(0, start_idx - self.model.context_size)
start_timestamp = timestamp_indices.index[start_idx]

cur_ts.df = cur_ts.df[start_timestamp:end_timestamp]
cur_ts.df = cur_ts.df.loc[start_timestamp:end_timestamp]
return cur_ts

def _determine_prediction_size(
self, ts: TSDataset, start_timestamp: pd.Timestamp, end_timestamp: pd.Timestamp
self, ts: TSDataset, start_timestamp: Union[pd.Timestamp, int], end_timestamp: Union[pd.Timestamp, int]
) -> int:
timestamp_indices = pd.Series(np.arange(len(ts.index)), index=ts.index)
timestamps = timestamp_indices[start_timestamp:end_timestamp]
timestamps = timestamp_indices.loc[start_timestamp:end_timestamp]
return len(timestamps)

def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
start_timestamp: Union[pd.Timestamp, int],
end_timestamp: Union[pd.Timestamp, int],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool = False,
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,11 @@ def product_level_constant_hierarchical_ts(product_level_constant_hierarchical_d
return ts


@pytest.fixture
def product_level_constant_hierarchical_ts_int_timestamp(product_level_constant_hierarchical_ts):
return convert_ts_to_int_timestamp(product_level_constant_hierarchical_ts)


@pytest.fixture
def product_level_constant_hierarchical_ts_with_exog(
product_level_constant_hierarchical_df, market_level_constant_hierarchical_df_exog, hierarchical_structure
Expand Down

0 comments on commit 7c5aa0e

Please sign in to comment.