Skip to content

Commit

Permalink
Speed up TimeSeriesImputerTransform (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-hse-repository committed Feb 6, 2024
1 parent 6a6f180 commit e978ad0
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 72 deletions.
4 changes: 3 additions & 1 deletion etna/transforms/math/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
result: pd.DataFrame
dataframe with results
"""
window = self.window if self.window != -1 else len(df)
window = self.window
if self.window == -1:
window = (len(df) - 1) // self.seasonality + 1
self._alpha_range = np.array([self.alpha**i for i in range(window)])
self._alpha_range = np.expand_dims(self._alpha_range, axis=0) # (1, window)
return super()._transform(df)
Expand Down
160 changes: 105 additions & 55 deletions etna/transforms/missing_values/imputation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,69 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import cast
from typing import Sequence

import bottleneck as bn
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer

from etna.distributions import BaseDistribution
from etna.distributions import CategoricalDistribution
from etna.distributions import IntDistribution
from etna.transforms import LagTransform
from etna.transforms import MeanTransform
from etna.transforms.base import ReversibleTransform
from etna.transforms.utils import check_new_segments


class SimpleImputerSubsegment(SimpleImputer):
def __init__(
self,
missing_values=np.nan,
strategy="mean",
fill_value=None,
verbose=0,
copy=True,
add_indicator=False,
):
super().__init__(
missing_values=missing_values,
strategy=strategy,
fill_value=fill_value,
verbose=verbose,
copy=copy,
add_indicator=add_indicator,
)
self._segment_to_index: Optional[Dict[str, int]] = None

def fit(self, X, y=None): # noqa: N803
X.sort_index(axis=1, inplace=True)
segments = sorted(X.columns.get_level_values("segment").unique())
self._segment_to_index = {segment: i for i, segment in enumerate(segments)}
super().fit(X.values)

def transform(self, X): # noqa: N803
X.sort_index(axis=1, inplace=True)
segments = X.columns.get_level_values("segment").unique()
old_statistics = self.statistics_.copy()
self.n_features_in_ = len(segments)
self.statistics_ = self.statistics_[[self._segment_to_index[segment] for segment in segments]]
try:
super().transform(X.values) # noqa: N803
finally:
self.n_features_in_ = len(old_statistics)
self.statistics_ = old_statistics


class ImputerMode(str, Enum):
"""Enum for different imputation strategy."""

mean = "mean"
running_mean = "running_mean"
forward_fill = "forward_fill"
seasonal = "seasonal"
seasonal_nonautoreg = "seasonal_nonautoreg"
constant = "constant"

@classmethod
Expand Down Expand Up @@ -70,7 +114,11 @@ def __init__(
- If "forward_fill" then replace missing dates using last existing value
- If "seasonal" then replace missing dates using seasonal moving average
- If "seasonal" then replace missing dates using seasonal moving average in autoregressive manner,
point are imputed one by one in time order, already imputed points are used to impute the next points
- If "seasonal_nonautoreg" then replace missing dates using seasonal moving average of existing values,
all points are imputed using only existing points
- If "constant" then replace missing dates using constant value.
Expand Down Expand Up @@ -101,8 +149,9 @@ def __init__(
self.default_value = default_value
self.constant_value = constant_value
self._strategy = ImputerMode(strategy)
self._fill_value: Optional[Dict[str, float]] = None
self._nan_timestamps: Optional[Dict[str, List[pd.Timestamp]]] = None
self._fit_segments: Optional[Sequence[str]] = None
self._nans_to_impute_mask: Optional[pd.DataFrame] = None
self._mean_imputer: Optional[SimpleImputerSubsegment] = None

def get_regressors_info(self) -> List[str]:
"""Return the list with regressors created by the transform."""
Expand All @@ -116,26 +165,17 @@ def _fit(self, df: pd.DataFrame):
df:
Dataframe in etna wide format.
"""
segments = sorted(set(df.columns.get_level_values("segment")))
features = df.loc[:, pd.IndexSlice[segments, self.in_column]]
if features.isna().all().any():
if df.isna().all().any():
raise ValueError("Series hasn't non NaN values which means it is empty and can't be filled.")

nan_timestamps = {}
for segment in segments:
series = features.loc[:, pd.IndexSlice[segment, self.in_column]]
series = series[series.first_valid_index() :]
nan_timestamps[segment] = series[series.isna()].index
self._fit_segments = sorted(set(df.columns.get_level_values("segment")))

fill_value = {}
if self._strategy is ImputerMode.mean:
mean_values = features.mean().to_dict()
# take only segment from multiindex key
mean_values = {key[0]: value for key, value in mean_values.items()}
fill_value = mean_values
self._mean_imputer = SimpleImputerSubsegment(strategy="mean", copy=False)
self._mean_imputer.fit(df)

self._nan_timestamps = nan_timestamps
self._fill_value = fill_value
_beginning_nans_mask = df.fillna(method="ffill").isna()
self._nans_to_impute_mask = df.isna() & (~_beginning_nans_mask)

def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Transform dataframe.
Expand All @@ -150,24 +190,20 @@ def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
:
Transformed Dataframe in etna wide format.
"""
if self._fill_value is None or self._nan_timestamps is None:
if self._fit_segments is None or self._nans_to_impute_mask is None:
raise ValueError("Transform is not fitted!")

segments = sorted(set(df.columns.get_level_values("segment")))
check_new_segments(transform_segments=segments, fit_segments=self._nan_timestamps.keys())
segments = df.columns.get_level_values("segment").unique()
check_new_segments(transform_segments=segments, fit_segments=self._fit_segments)

cur_nans = {}
for segment in segments:
series = df.loc[:, pd.IndexSlice[segment, self.in_column]]
cur_nans[segment] = series[series.isna()].index
index_intersection = df.index.intersection(self._nans_to_impute_mask.index)
nans_to_restore_mask = df.isna()
nans_to_restore_mask.loc[index_intersection] = nans_to_restore_mask.loc[index_intersection].mask(
self._nans_to_impute_mask.loc[index_intersection], False
) # Mask is broadcasted with True values

result_df = self._fill(df)

# restore nans not in self.nan_timestamps
for segment in segments:
restore_nans = cur_nans[segment].difference(self._nan_timestamps[segment])
result_df.loc[restore_nans, pd.IndexSlice[segment, self.in_column]] = np.nan

result_df.mask(nans_to_restore_mask, inplace=True)
return result_df

def _fill(self, df: pd.DataFrame) -> pd.DataFrame:
Expand All @@ -185,29 +221,44 @@ def _fill(self, df: pd.DataFrame) -> pd.DataFrame:
:
Filled Dataframe.
"""
self._fill_value = cast(Dict[str, float], self._fill_value)
self._nan_timestamps = cast(Dict[str, List[pd.Timestamp]], self._nan_timestamps)
segments = sorted(set(df.columns.get_level_values("segment")))
if self._nans_to_impute_mask is None or (self.strategy is ImputerMode.mean and self._mean_imputer is None):
raise ValueError("Transform is not fitted!")

if self._strategy is ImputerMode.constant:
new_values = df.loc[:, pd.IndexSlice[:, self.in_column]].fillna(value=self.constant_value)
df.loc[:, pd.IndexSlice[:, self.in_column]] = new_values
df.fillna(value=self.constant_value, inplace=True)
elif self._strategy is ImputerMode.forward_fill:
new_values = df.loc[:, pd.IndexSlice[:, self.in_column]].fillna(method="ffill")
df.loc[:, pd.IndexSlice[:, self.in_column]] = new_values
df.fillna(method="ffill", inplace=True)
elif self._strategy is ImputerMode.mean:
for segment in segments:
df.loc[:, pd.IndexSlice[segment, self.in_column]].fillna(value=self._fill_value[segment], inplace=True)
self._mean_imputer.transform(df) # type: ignore
elif self._strategy is ImputerMode.running_mean or self._strategy is ImputerMode.seasonal:
timestamp_to_index = {timestamp: i for i, timestamp in enumerate(df.index)}
for segment in segments:
history = self.seasonality * self.window if self.window != -1 else len(df)
for timestamp in self._nan_timestamps[segment]:
i = timestamp_to_index[timestamp]
indexes = np.arange(i - self.seasonality, i - self.seasonality - history, -self.seasonality)
indexes = indexes[indexes >= 0]
values = df.loc[df.index[indexes], pd.IndexSlice[segment, self.in_column]]
df.loc[timestamp, pd.IndexSlice[segment, self.in_column]] = np.nanmean(values)
history = self.seasonality * self.window if self.window != -1 else len(df)
nan_mask = df.isna().values
nan_indexes = np.arange(len(df))[nan_mask.any(axis=1)]
for i in nan_indexes:
indexes = np.arange(i - self.seasonality, i - self.seasonality - history, -self.seasonality)
indexes = indexes[indexes >= 0]
if len(indexes) > 0:
impute_values = bn.nanmean(df.iloc[indexes], axis=0)
df.iloc[i] = np.where(nan_mask[i], impute_values, df.iloc[i])
elif self._strategy is ImputerMode.seasonal_nonautoreg:
lag_transform = LagTransform(in_column=self.in_column, lags=[self.seasonality], out_column="lag")
sma_transform = MeanTransform(
in_column=f"lag_{self.seasonality}",
window=self.window,
seasonality=self.seasonality,
fillna=np.NaN,
out_column="sma",
)
df_filled = (
sma_transform._transform(lag_transform._transform(df))
.loc[:, pd.IndexSlice[:, "sma"]]
.rename(columns={"sma": self.in_column})
)

index_intersection = df.index.intersection(self._nans_to_impute_mask.index)
df.loc[index_intersection] = df.loc[index_intersection].mask(
self._nans_to_impute_mask.loc[index_intersection], df_filled.loc[index_intersection]
)

if self.default_value is not None:
df.fillna(value=self.default_value, inplace=True)
Expand All @@ -226,15 +277,14 @@ def _inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
:
Dataframe after applying inverse transformation.
"""
if self._fill_value is None or self._nan_timestamps is None:
if self._fit_segments is None or self._nans_to_impute_mask is None:
raise ValueError("Transform is not fitted!")

segments = sorted(set(df.columns.get_level_values("segment")))
check_new_segments(transform_segments=segments, fit_segments=self._nan_timestamps.keys())
check_new_segments(transform_segments=segments, fit_segments=self._fit_segments)

for segment in segments:
index = df.index.intersection(self._nan_timestamps[segment])
df.loc[index, pd.IndexSlice[segment, self.in_column]] = np.NaN
index_intersection = df.index.intersection(self._nans_to_impute_mask.index)
df.loc[index_intersection] = df.mask(self._nans_to_impute_mask.loc[index_intersection])
return df

def params_to_tune(self) -> Dict[str, BaseDistribution]:
Expand Down
14 changes: 12 additions & 2 deletions tests/test_transforms/test_inference/test_inverse_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ def _test_inverse_transform_train_subset_segments(self, ts, transform, segments)
),
"ts_to_resample",
),
(TimeSeriesImputerTransform(in_column="target"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="constant"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="forward_fill"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="running_mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal_nonautoreg"), "ts_to_fill"),
# outliers
(DensityOutliersTransform(in_column="target"), "ts_with_outliers"),
(MedianOutliersTransform(in_column="target"), "ts_with_outliers"),
Expand Down Expand Up @@ -430,7 +435,12 @@ def _test_inverse_transform_future_subset_segments(self, ts, transform, segments
),
"ts_to_resample",
),
(TimeSeriesImputerTransform(in_column="target"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="constant"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="forward_fill"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="running_mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal_nonautoreg"), "ts_to_fill"),
# outliers
(DensityOutliersTransform(in_column="target"), "ts_with_outliers"),
(MedianOutliersTransform(in_column="target"), "ts_with_outliers"),
Expand Down
14 changes: 12 additions & 2 deletions tests/test_transforms/test_inference/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ def _test_transform_train_subset_segments(self, ts, transform, segments):
),
"ts_to_resample",
),
(TimeSeriesImputerTransform(in_column="target"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="constant"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="forward_fill"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="running_mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal_nonautoreg"), "ts_to_fill"),
# outliers
(DensityOutliersTransform(in_column="target"), "ts_with_outliers"),
(MedianOutliersTransform(in_column="target"), "ts_with_outliers"),
Expand Down Expand Up @@ -412,7 +417,12 @@ def _test_transform_future_subset_segments(self, ts, transform, segments, horizo
),
"ts_to_resample",
),
(TimeSeriesImputerTransform(in_column="target"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="constant"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="forward_fill"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="running_mean"), "ts_to_fill"),
(TimeSeriesImputerTransform(in_column="target", strategy="seasonal_nonautoreg"), "ts_to_fill"),
# outliers
(DensityOutliersTransform(in_column="target"), "ts_with_outliers"),
(MedianOutliersTransform(in_column="target"), "ts_with_outliers"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def test_interface_quantile(simple_ts_for_agg: TSDataset, out_column: str):
(
(10, 1, 1, 1, 0, np.array([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5])),
(-1, 1, 1, 1, 0, np.array([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5])),
(-1, 2, 1, 1, 0, np.array([0, 1, 1, 2, 2, 3, 3, 4, 4, 5])),
(3, 1, 1, 1, -17, np.array([0, 0.5, 1, 2, 3, 4, 5, 6, 7, 8])),
(3, 1, 0.5, 1, -17, np.array([0, 0.5, 2.5 / 3, 4.25 / 3, 2, 7.75 / 3, 9.5 / 3, 11.25 / 3, 13 / 3, 14.75 / 3])),
(
Expand Down

0 comments on commit e978ad0

Please sign in to comment.