Skip to content

Commit

Permalink
feat(#399): handle boolean outcomes without value column automatically
Browse files Browse the repository at this point in the history
Fixes #399
  • Loading branch information
MartinBernstorff committed Feb 16, 2024
1 parent d069b14 commit b4e09bc
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 70 deletions.
14 changes: 7 additions & 7 deletions src/timeseriesflattenerv2/_process_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,23 @@ def _mask_outside_lookperiod(


def _aggregate_masked_frame(
    masked_frame: TimeMaskedFrame, aggregators: Sequence[Aggregator], fallback: ValueType
) -> pl.LazyFrame:
    """Apply each aggregator to the value column within every prediction-time group.

    After aggregation, nulls in the resulting value columns are replaced by
    `fallback`, and each value column is renamed to record which fallback was
    applied (e.g. "value_mean" -> "value_mean_fallback_0").
    """
    # One polars expression per aggregator, all targeting the value column.
    aggregation_exprs = [agg(masked_frame.value_col_name) for agg in aggregators]

    grouped = masked_frame.init_df.group_by(
        masked_frame.pred_time_uuid_col_name, maintain_order=True
    ).agg(aggregation_exprs)

    # Encode the fallback into every aggregated value column's name.
    rename_mapping = {
        column: f"{column}_fallback_{fallback}"
        for column in grouped.columns
        if masked_frame.value_col_name in column
    }

    return grouped.with_columns(
        cs.contains(masked_frame.value_col_name).fill_null(fallback)
    ).rename(rename_mapping)
Expand Down Expand Up @@ -132,7 +132,7 @@ def process_spec(
predictiontime_frame=predictiontime_frame, value_frame=spec.value_frame
),
masked_aggregator=lambda sliced_frame: _aggregate_masked_frame(
aggregators=spec.aggregators, fallback=spec.fallback, sliced_frame=sliced_frame
aggregators=spec.aggregators, fallback=spec.fallback, masked_frame=sliced_frame
),
time_masker=lambda timedelta_frame: _mask_outside_lookperiod(
timedelta_frame=timedelta_frame,
Expand Down
159 changes: 108 additions & 51 deletions src/timeseriesflattenerv2/feature_specs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime as dt
from abc import ABC, abstractmethod
from dataclasses import InitVar, dataclass
from typing import Literal, NewType, Sequence, Union
from typing import Literal, NewType, Sequence, TypeAlias, Union

import pandas as pd
import polars as pl
Expand All @@ -18,6 +18,35 @@
InitDF_T = pl.LazyFrame | pl.DataFrame | pd.DataFrame


def _anyframe_to_lazyframe(init_df: InitDF_T) -> pl.LazyFrame:
    """Normalise any supported dataframe type (pandas, polars eager/lazy) to a polars LazyFrame."""
    if isinstance(init_df, pd.DataFrame):
        return pl.from_pandas(init_df).lazy()
    if isinstance(init_df, pl.DataFrame):
        return init_df.lazy()
    if isinstance(init_df, pl.LazyFrame):
        return init_df
    raise ValueError(f"Unsupported type: {type(init_df)}.")


FrameTypes: TypeAlias = "PredictionTimeFrame | ValueFrame | TimeMaskedFrame | AggregatedValueFrame | TimedeltaFrame | TimestampValueFrame | PredictorSpec | OutcomeSpec | BooleanOutcomeSpec"


def _validate_col_name_columns_exist(obj: FrameTypes):
missing_columns = (
Iter(dir(obj))
.filter(lambda attr_name: attr_name.endswith("_col_name"))
.map(lambda attr_name: getattr(obj, attr_name))
.filter(lambda col_name: col_name not in obj.df.columns)
.to_list()
)

if len(missing_columns) > 0:
raise SpecColumnError(
f"""Missing columns: {missing_columns} in dataframe.
Current columns are: {obj.df.columns}."""
)


@dataclass
class PredictionTimeFrame:
init_df: InitVar[InitDF_T]
Expand All @@ -26,13 +55,7 @@ class PredictionTimeFrame:
pred_time_uuid_col_name: str = default_pred_time_uuid_col_name

def __post_init__(self, init_df: InitDF_T):
if isinstance(init_df, pl.LazyFrame):
self.df: pl.LazyFrame = init_df
elif isinstance(init_df, pd.DataFrame):
self.df: pl.LazyFrame = pl.from_pandas(init_df).lazy()
elif isinstance(init_df, pl.DataFrame):
self.df: pl.LazyFrame = init_df.lazy()

self.df = _anyframe_to_lazyframe(init_df)
self.df = self.df.with_columns(
pl.concat_str(
pl.col(self.entity_id_col_name), pl.lit("-"), pl.col(self.timestamp_col_name)
Expand All @@ -41,21 +64,7 @@ def __post_init__(self, init_df: InitDF_T):
.alias(self.pred_time_uuid_col_name)
)

self._validate_columns_exist()

def _validate_columns_exist(self):
missing_columns = (
Iter(dir(self))
.filter(lambda attr_name: attr_name.endswith("_col_name"))
.map(lambda attr_name: getattr(self, attr_name))
.filter(lambda col_name: col_name not in self.df.columns)
)

if missing_columns.count() > 0:
raise SpecColumnError(
f"""Missing columns: {missing_columns} in dataframe.
Current columns are: {self.df.columns}."""
)
_validate_col_name_columns_exist(obj=self)

def collect(self) -> pl.DataFrame:
if isinstance(self.df, pl.DataFrame):
Expand All @@ -81,24 +90,26 @@ class ValueFrame:
value_timestamp_col_name: str = "timestamp"

def __post_init__(self, init_df: InitDF_T):
if isinstance(init_df, pl.LazyFrame):
self.df: pl.LazyFrame = init_df
elif isinstance(init_df, pd.DataFrame):
self.df: pl.LazyFrame = pl.from_pandas(init_df).lazy()
elif isinstance(init_df, pl.DataFrame):
self.df: pl.LazyFrame = init_df.lazy()
# validate that the required columns are present in the dataframe
required_columns = [
self.entity_id_col_name,
self.value_col_name,
self.value_timestamp_col_name,
]
missing_columns = [col for col in required_columns if col not in self.df.columns]
if missing_columns:
raise SpecColumnError(
f"""Missing columns: {missing_columns} in the {self.value_col_name} specification.
Current columns are: {self.df.columns}."""
)
self.df = _anyframe_to_lazyframe(init_df)
_validate_col_name_columns_exist(obj=self)

def collect(self) -> pl.DataFrame:
if isinstance(self.df, pl.DataFrame):
return self.df
return self.df.collect()


@dataclass
class TimestampValueFrame:
"""A frame that contains the values of a time series."""

init_df: InitVar[InitDF_T]
value_timestamp_col_name: str = "timestamp"
entity_id_col_name: str = default_entity_id_col_name

def __post_init__(self, init_df: InitDF_T):
self.df = _anyframe_to_lazyframe(init_df)
_validate_col_name_columns_exist(obj=self)

def collect(self) -> pl.DataFrame:
if isinstance(self.df, pl.DataFrame):
Expand All @@ -114,6 +125,11 @@ class TimeMaskedFrame:
value_col_name: str
timestamp_col_name: str = default_timestamp_col_name
pred_time_uuid_col_name: str = default_pred_time_uuid_col_name
validate_cols_exist: bool = True

def __post_init__(self):
if self.validate_cols_exist:
_validate_col_name_columns_exist(obj=self)

@property
def df(self) -> pl.LazyFrame:
Expand All @@ -129,6 +145,9 @@ class AggregatedValueFrame:
value_col_name: str
pred_time_uuid_col_name: str = default_pred_time_uuid_col_name

def __post_init__(self):
_validate_col_name_columns_exist(obj=self)

def fill_nulls(self, fallback: ValueType) -> "AggregatedValueFrame":
filled = self.df.with_columns(
pl.col(self.value_col_name)
Expand Down Expand Up @@ -190,38 +209,73 @@ def _lookdistance_to_normalised_lookperiod(
)


LookDistances = Sequence[LookDistance | tuple[LookDistance, LookDistance]]


@dataclass
class PredictorSpec:
value_frame: ValueFrame
lookbehind_distances: InitVar[Sequence[LookDistance | tuple[LookDistance, LookDistance]]]
lookbehind_distances: InitVar[LookDistances]
aggregators: Sequence[Aggregator]
fallback: ValueType
column_prefix: str = "pred"

def __post_init__(
self, lookbehind_distances: Sequence[LookDistance | tuple[LookDistance, LookDistance]]
):
def __post_init__(self, lookbehind_distances: LookDistances):
self.normalised_lookperiod = [
_lookdistance_to_normalised_lookperiod(lookdistance=lookdistance, direction="behind")
for lookdistance in lookbehind_distances
]
_validate_col_name_columns_exist(obj=self)

@property
def df(self) -> pl.LazyFrame:
return self.value_frame.df

@dataclass()

@dataclass
class OutcomeSpec:
value_frame: ValueFrame
lookahead_distances: InitVar[Sequence[LookDistance | tuple[LookDistance, LookDistance]]]
lookahead_distances: InitVar[LookDistances]
aggregators: Sequence[Aggregator]
fallback: ValueType
column_prefix: str = "outc"

def __post_init__(
self, lookahead_distances: Sequence[LookDistance | tuple[LookDistance, LookDistance]]
):
def __post_init__(self, lookahead_distances: LookDistances):
self.normalised_lookperiod = [
_lookdistance_to_normalised_lookperiod(lookdistance=lookdistance, direction="ahead")
for lookdistance in lookahead_distances
]
_validate_col_name_columns_exist(obj=self)

@property
def df(self) -> pl.LazyFrame:
return self.value_frame.df


@dataclass
class BooleanOutcomeSpec:
    """Outcome spec for timestamped events that carry no value column.

    Every event in `init_frame` is assigned the constant value 1, so
    aggregation over the lookahead windows yields a boolean-style outcome.
    """

    init_frame: InitVar[TimestampValueFrame]
    lookahead_distances: LookDistances
    aggregators: Sequence[Aggregator]
    fallback: ValueType
    column_prefix: str = "outc"

    def __post_init__(self, init_frame: TimestampValueFrame):
        # Materialise a ValueFrame whose "value" column is a constant 1.
        self.value_frame = ValueFrame(
            init_df=init_frame.df.with_columns(pl.lit(1).alias("value")),
            value_col_name="value",
            entity_id_col_name=init_frame.entity_id_col_name,
            value_timestamp_col_name=init_frame.value_timestamp_col_name,
        )

        self.normalised_lookperiod = [
            _lookdistance_to_normalised_lookperiod(lookdistance=distance, direction="ahead")
            for distance in self.lookahead_distances
        ]

    @property
    def df(self) -> pl.LazyFrame:
        return self.value_frame.df


@dataclass
Expand All @@ -232,14 +286,17 @@ class TimedeltaFrame:
pred_time_uuid_col_name: str = default_pred_time_uuid_col_name
timedelta_col_name: str = "time_from_prediction_to_value"

def __post_init__(self):
_validate_col_name_columns_exist(obj=self)

def get_timedeltas(self) -> Sequence[dt.datetime]:
return self.collect().get_column(self.timedelta_col_name).to_list()

def collect(self) -> pl.DataFrame:
return self.df.collect()


ValueSpecification = Union[PredictorSpec, OutcomeSpec]
ValueSpecification = Union[PredictorSpec, OutcomeSpec, BooleanOutcomeSpec]


@dataclass(frozen=True)
Expand Down
8 changes: 6 additions & 2 deletions src/timeseriesflattenerv2/test_aggregators.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class SingleVarAggregatorExample:
@property
def input(self) -> pl.LazyFrame:
return pl.LazyFrame(
{"pred_time_uuid": [1] * len(self.input_values), "value": self.input_values}
{
"pred_time_uuid": [1] * len(self.input_values),
"value": self.input_values,
"timestamp": ["2021-01-01"] * len(self.input_values),
}
)

@property
Expand Down Expand Up @@ -135,7 +139,7 @@ def expected_output(self) -> pl.DataFrame:
)
def test_aggregator(example: AggregatorExampleType):
result = _aggregate_masked_frame(
sliced_frame=TimeMaskedFrame(
masked_frame=TimeMaskedFrame(
init_df=example.input,
value_col_name="value",
pred_time_uuid_col_name="pred_time_uuid",
Expand Down
32 changes: 22 additions & 10 deletions src/timeseriesflattenerv2/test_process_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@


def test_aggregate_over_fallback():
sliced_frame = TimeMaskedFrame(
masked_frame = TimeMaskedFrame(
validate_cols_exist=False,
init_df=pl.LazyFrame(
{"pred_time_uuid": ["1-2021-01-03", "1-2021-01-03"], "value": [None, None]}
{
"pred_time_uuid": ["1-2021-01-03", "1-2021-01-03"],
"value": [None, None],
"timestamp": ["2021-01-01", "2021-01-02"],
}
),
value_col_name="value",
)

aggregated_values = process_spec._aggregate_masked_frame(
sliced_frame=sliced_frame, aggregators=[MeanAggregator()], fallback=0
masked_frame=masked_frame, aggregators=[MeanAggregator()], fallback=0
)

expected = str_to_pl_df(
Expand All @@ -37,15 +42,20 @@ def test_aggregate_over_fallback():


def test_aggregate_with_null():
sliced_frame = TimeMaskedFrame(
masked_frame = TimeMaskedFrame(
validate_cols_exist=False,
init_df=pl.LazyFrame(
{"pred_time_uuid": ["1-2021-01-03", "1-2021-01-03"], "value": [1, None]}
{
"pred_time_uuid": ["1-2021-01-03", "1-2021-01-03"],
"value": [1, None],
"timestamp": ["2021-01-01", "2021-01-02"],
}
),
value_col_name="value",
)

aggregated_values = process_spec._aggregate_masked_frame(
sliced_frame=sliced_frame, aggregators=[MeanAggregator()], fallback=0
masked_frame=masked_frame, aggregators=[MeanAggregator()], fallback=0
)

expected = str_to_pl_df(
Expand All @@ -57,7 +67,8 @@ def test_aggregate_with_null():


def test_aggregate_within_slice():
sliced_frame = TimeMaskedFrame(
masked_frame = TimeMaskedFrame(
validate_cols_exist=False,
init_df=str_to_pl_df(
"""pred_time_uuid,value
1-2021-01-03,1
Expand All @@ -69,7 +80,7 @@ def test_aggregate_within_slice():
)

aggregated_values = process_spec._aggregate_masked_frame(
sliced_frame=sliced_frame, aggregators=[MeanAggregator()], fallback=0
masked_frame=masked_frame, aggregators=[MeanAggregator()], fallback=0
)

expected = str_to_pl_df(
Expand Down Expand Up @@ -141,7 +152,8 @@ def test_slice_without_any_within_window():


def test_multiple_aggregators():
sliced_frame = TimeMaskedFrame(
masked_frame = TimeMaskedFrame(
validate_cols_exist=False,
init_df=str_to_pl_df(
"""pred_time_uuid,value
1-2021-01-03,1
Expand All @@ -153,7 +165,7 @@ def test_multiple_aggregators():
)

aggregated_values = process_spec._aggregate_masked_frame(
sliced_frame=sliced_frame, aggregators=[MeanAggregator(), MaxAggregator()], fallback=0
masked_frame=masked_frame, aggregators=[MeanAggregator(), MaxAggregator()], fallback=0
)

expected = str_to_pl_df(
Expand Down

0 comments on commit b4e09bc

Please sign in to comment.