Skip to content

Commit

Permalink
feat(#360): ensure correct ordering before concatenating
Browse files Browse the repository at this point in the history
Fixes #360
  • Loading branch information
MartinBernstorff committed Feb 12, 2024
1 parent dba3063 commit 3af55e3
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 35 deletions.
23 changes: 15 additions & 8 deletions src/timeseriesflattenerv2/aggregators.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
from dataclasses import dataclass

import polars as pl
from polars.lazyframe.group_by import LazyGroupBy

from .feature_specs import AggregatedValueFrame, Aggregator, SlicedFrame
from timeseriesflattenerv2.feature_specs import Aggregator

from .feature_specs import AggregatedValueFrame


@dataclass
class MeanAggregator(Aggregator):
    """Aggregates each group to the mean of its values.

    The output column is named ``{column_name}_mean`` so that several
    aggregators applied to the same source column produce distinct,
    non-colliding output columns.
    """

    def apply(self, grouped_frame: LazyGroupBy, column_name: str) -> AggregatedValueFrame:
        """Compute the per-group mean of ``column_name``.

        Args:
            grouped_frame: a lazy group-by, already grouped by prediction-time uuid.
            column_name: the value column to aggregate.

        Returns:
            An AggregatedValueFrame whose ``value_col_name`` records the
            suffixed output column name.
        """
        value_col_name = f"{column_name}_mean"
        df = grouped_frame.agg(pl.col(column_name).mean().alias(value_col_name))
        return AggregatedValueFrame(df=df, value_col_name=value_col_name)
@dataclass
class MaxAggregator(Aggregator):
    """Aggregates each group to the maximum of its values.

    Output column is named ``{column_name}_max``, mirroring MeanAggregator's
    naming scheme so multiple aggregators can be concatenated side by side.
    """

    def apply(self, grouped_frame: LazyGroupBy, column_name: str) -> AggregatedValueFrame:
        """Compute the per-group maximum of ``column_name``.

        Args:
            grouped_frame: a lazy group-by, already grouped by prediction-time uuid.
            column_name: the value column to aggregate.

        Returns:
            An AggregatedValueFrame whose ``value_col_name`` records the
            suffixed output column name.
        """
        value_col_name = f"{column_name}_max"
        df = grouped_frame.agg(pl.col(column_name).max().alias(value_col_name))
        return AggregatedValueFrame(df=df, value_col_name=value_col_name)
8 changes: 4 additions & 4 deletions src/timeseriesflattenerv2/feature_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pandas as pd
import polars as pl
from polars.lazyframe.group_by import LazyGroupBy

ValueType = Union[int, float, str]
LookDistance = dt.timedelta
Expand Down Expand Up @@ -65,8 +66,8 @@ class SlicedFrame:
@dataclass(frozen=True)
class AggregatedValueFrame:
df: pl.LazyFrame
value_col_name: str
pred_time_uuid_col_name: str = default_pred_time_uuid_col_name
value_col_name: str = "value"

def fill_nulls(self, fallback: ValueType) -> "SlicedFrame":
filled = self.df.with_columns(pl.col(self.value_col_name).fill_null(fallback))
Expand All @@ -79,9 +80,7 @@ def fill_nulls(self, fallback: ValueType) -> "SlicedFrame":


class Aggregator(Protocol):
    """Structural interface for group-wise aggregation strategies.

    Implementations (e.g. mean, max) take an already-grouped lazy frame and
    reduce the given value column to one row per group, aliasing the result
    to an aggregator-specific output column.
    """

    def apply(self, value_frame: LazyGroupBy, column_name: str) -> AggregatedValueFrame:
        ...


Expand Down Expand Up @@ -117,5 +116,6 @@ def get_timedeltas(self) -> Sequence[dt.datetime]:

@dataclass(frozen=True)
class AggregatedFrame:
    """Final flattened result: aggregated feature columns keyed by prediction time.

    df: lazy frame holding the horizontally concatenated feature columns.
    pred_time_uuid_col_name: name of the unique prediction-time id column in df.
    timestamp_col_name: name of the prediction timestamp column in df.
    """

    df: pl.LazyFrame
    pred_time_uuid_col_name: str
    timestamp_col_name: str
43 changes: 28 additions & 15 deletions src/timeseriesflattenerv2/flattener.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from iterpy._iter import Iter

from .feature_specs import (
AggregatedFrame,
AggregatedValueFrame,
Aggregator,
LookDistance,
Expand All @@ -22,9 +23,12 @@
def _aggregate_within_slice(
sliced_frame: SlicedFrame, aggregators: Sequence[Aggregator], fallback: ValueType
) -> Sequence[AggregatedValueFrame]:
grouped_frame = sliced_frame.df.groupby(
sliced_frame.pred_time_uuid_col_name, maintain_order=True
)

aggregated_value_frames = [
agg.apply(SlicedFrame(sliced_frame.df), column_name=sliced_frame.value_col_name)
for agg in aggregators
agg.apply(grouped_frame, column_name=sliced_frame.value_col_name) for agg in aggregators
]

with_fallback = [frame.fill_nulls(fallback=fallback) for frame in aggregated_value_frames]
Expand Down Expand Up @@ -69,9 +73,12 @@ def _normalise_lookdistances(spec: ValueSpecification) -> Sequence[LookDistance]
return lookdistances


def horizontally_concatenate_dfs(
    dfs: Sequence[pl.LazyFrame], pred_time_uuid_col_name: str
) -> pl.LazyFrame:
    """Concatenate frames side by side, keeping one copy of the id column.

    Every frame after the first has its ``pred_time_uuid_col_name`` column
    dropped so the identifier appears exactly once in the output.

    NOTE(review): horizontal concat aligns by position, so all frames are
    assumed to share the same row ordering — callers must guarantee this
    (see #360); verify ordering is enforced upstream.

    Args:
        dfs: frames to combine; must be non-empty and row-aligned.
        pred_time_uuid_col_name: name of the shared identifier column.

    Returns:
        A single lazy frame with the first frame's columns followed by the
        de-duplicated columns of the remaining frames.
    """
    # Only drop the id column from the trailing frames; the first frame
    # keeps it as the single surviving copy.
    deduplicated_tail = [df.drop([pred_time_uuid_col_name]) for df in dfs[1:]]
    return pl.concat([dfs[0], *deduplicated_tail], how="horizontal")


def _get_timedelta_frame(
Expand All @@ -96,16 +103,13 @@ def _get_timedelta_frame(
def _process_spec(
predictiontime_frame: PredictionTimeFrame, spec: ValueSpecification
) -> ValueFrame:
lookdistances = _normalise_lookdistances(spec)
timedelta_frame = _get_timedelta_frame(
predictiontime_frame=predictiontime_frame, value_frame=spec.value_frame
)

aggregated_value_frames = (
Iter(lookdistances)
Iter(_normalise_lookdistances(spec))
.map(
lambda distance: _slice_and_aggregate_spec(
timedelta_frame=timedelta_frame,
timedelta_frame=_get_timedelta_frame(
predictiontime_frame=predictiontime_frame, value_frame=spec.value_frame
),
distance=distance,
aggregators=spec.aggregators,
fallback=spec.fallback,
Expand All @@ -115,7 +119,10 @@ def _process_spec(
)

return ValueFrame(
df=_horizontally_concatenate_dfs([f.df for f in aggregated_value_frames.to_list()]),
df=horizontally_concatenate_dfs(
[f.df for f in aggregated_value_frames.to_list()],
pred_time_uuid_col_name=predictiontime_frame.pred_time_uuid_col_name,
),
value_type=spec.value_frame.value_type,
entity_id_col_name=spec.value_frame.entity_id_col_name,
value_timestamp_col_name=spec.value_frame.value_timestamp_col_name,
Expand All @@ -126,7 +133,7 @@ def _process_spec(
class Flattener:
predictiontime_frame: PredictionTimeFrame

def aggregate_timeseries(self, specs: Sequence[ValueSpecification]) -> AggregatedValueFrame:
def aggregate_timeseries(self, specs: Sequence[ValueSpecification]) -> AggregatedFrame:
dfs = (
Iter(specs)
.map(
Expand All @@ -137,4 +144,10 @@ def aggregate_timeseries(self, specs: Sequence[ValueSpecification]) -> Aggregate
.map(lambda x: x.lazyframe)
.to_list()
)
return AggregatedValueFrame(df=_horizontally_concatenate_dfs(dfs))
return AggregatedFrame(
df=horizontally_concatenate_dfs(
dfs, pred_time_uuid_col_name=self.predictiontime_frame.pred_time_uuid_col_name
),
pred_time_uuid_col_name=self.predictiontime_frame.pred_time_uuid_col_name,
timestamp_col_name=self.predictiontime_frame.timestamp_col_name,
)
43 changes: 35 additions & 8 deletions src/timeseriesflattenerv2/test_flattener.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import polars.testing as polars_testing
from timeseriesflattener.testing.utils_for_testing import str_to_pl_df

from timeseriesflattenerv2.aggregators import MeanAggregator
from timeseriesflattenerv2.aggregators import MaxAggregator, MeanAggregator

from . import flattener
from .feature_specs import (
AggregatedValueFrame,
AggregatedFrame,
PredictionTimeFrame,
PredictorSpec,
SlicedFrame,
Expand All @@ -17,9 +17,7 @@


def assert_frame_equal(left: pl.DataFrame, right: pl.DataFrame) -> None:
    """Assert equality ignoring dtypes and column order.

    Row order IS checked, since #360 is specifically about output ordering
    before horizontal concatenation.
    """
    polars_testing.assert_frame_equal(left, right, check_dtype=False, check_column_order=False)


def test_flattener():
Expand Down Expand Up @@ -48,7 +46,7 @@ def test_flattener():
]
)

assert isinstance(result, AggregatedValueFrame)
assert isinstance(result, AggregatedFrame)


def test_get_timedelta_frame():
Expand Down Expand Up @@ -90,7 +88,7 @@ def test_aggregate_within_slice():
)

expected = str_to_pl_df(
"""pred_time_uuid,value
"""pred_time_uuid,value_mean
1-2021-01-03,1.5
2-2021-01-03,3"""
)
Expand All @@ -108,8 +106,37 @@ def test_aggregate_over_fallback():
)

expected = str_to_pl_df(
"""pred_time_uuid,value
"""pred_time_uuid,value_mean
1-2021-01-03,0"""
)

assert_frame_equal(aggregated_values[0].df.collect(), expected)


def test_multiple_aggregators():
    """Applying several aggregators to one slice yields one suffixed column each,
    and the results can be horizontally concatenated on the shared uuid column."""
    sliced_frame = SlicedFrame(
        df=str_to_pl_df(
            """pred_time_uuid,value
1-2021-01-03,1
1-2021-01-03,2
2-2021-01-03,2
2-2021-01-03,4"""
        ).lazy()
    )

    aggregated_values = flattener._aggregate_within_slice(
        sliced_frame=sliced_frame, aggregators=[MeanAggregator(), MaxAggregator()], fallback=0
    )

    expected = str_to_pl_df(
        """pred_time_uuid,value_mean,value_max
1-2021-01-03,1.5,2
2-2021-01-03,3,4"""
    )

    assert_frame_equal(
        flattener.horizontally_concatenate_dfs(
            [agg.df for agg in aggregated_values], pred_time_uuid_col_name="pred_time_uuid"
        ).collect(),
        expected,
    )

0 comments on commit 3af55e3

Please sign in to comment.