Skip to content

Commit

Permalink
fix(#447): implement earliest and latest without requiring sorting (#450)
Browse files Browse the repository at this point in the history

Fixes #447
  • Loading branch information
MartinBernstorff committed Feb 16, 2024
2 parents bf6e2d4 + 24acab1 commit f88fccb
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
22 changes: 18 additions & 4 deletions src/timeseriesflattenerv2/aggregators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@


# Aggregator that reduces a column to its minimum value.
class MinAggregator(Aggregator):
# Diff view: the un-annotated `name` line is the pre-commit version; the
# annotated `name: str` line directly after it is its replacement.
name = "min"
name: str = "min"

# Returns a polars expression that takes the minimum of `column_name` and
# aliases it to self.new_col_name(column_name) (helper defined outside this view).
def __call__(self, column_name: str) -> pl.Expr:
return pl.col(column_name).min().alias(self.new_col_name(column_name))


# Aggregator that reduces a column to its maximum value.
class MaxAggregator(Aggregator):
# Diff view: the un-annotated `name` line is the pre-commit version; the
# annotated `name: str` line directly after it is its replacement.
name = "max"
name: str = "max"

# Returns a polars expression that takes the maximum of `column_name` and
# aliases it to self.new_col_name(column_name) (helper defined outside this view).
def __call__(self, column_name: str) -> pl.Expr:
return pl.col(column_name).max().alias(self.new_col_name(column_name))
Expand All @@ -34,18 +34,32 @@ def __call__(self, column_name: str) -> pl.Expr:
return pl.col(column_name).count().alias(self.new_col_name(column_name))


# Aggregator that picks the value whose timestamp is the smallest, so the result
# does not depend on the row order of the input.
@dataclass(frozen=True)
class EarliestAggregator(Aggregator):
# Name of the column holding the timestamps that define "earliest".
timestamp_col_name: str
name: str = "earliest"

# Returns a polars expression selecting the value of `column_name` at the
# minimum timestamp, aliased to self.new_col_name(column_name).
def __call__(self, column_name: str) -> pl.Expr:
# Diff view: the single-line return below is the pre-commit implementation
# (it took the first row and so relied on the input order); the
# parenthesised return after it is the replacement.
return pl.col(column_name).first().alias(self.new_col_name(column_name))
return (
pl.col(column_name)
# Keep only rows whose timestamp equals the minimum timestamp.
.filter(pl.col(self.timestamp_col_name) == pl.col(self.timestamp_col_name).min())
# Several rows may share the minimum timestamp; take the first of them.
.first()
.alias(self.new_col_name(column_name))
)


# Aggregator that picks the value whose timestamp is the largest, so the result
# does not depend on the row order of the input.
@dataclass(frozen=True)
class LatestAggregator(Aggregator):
# Name of the column holding the timestamps that define "latest".
timestamp_col_name: str
name: str = "latest"

# Returns a polars expression selecting the value of `column_name` at the
# maximum timestamp, aliased to self.new_col_name(column_name).
def __call__(self, column_name: str) -> pl.Expr:
# Diff view: the single-line return below is the pre-commit implementation
# (it took the last row and so relied on the input order); the
# parenthesised return after it is the replacement.
return pl.col(column_name).last().alias(self.new_col_name(column_name))
return (
pl.col(column_name)
# Keep only rows whose timestamp equals the maximum timestamp.
.filter(pl.col(self.timestamp_col_name) == pl.col(self.timestamp_col_name).max())
# NOTE(review): if several rows share the maximum timestamp this takes the
# first of them in frame order -- confirm that tie-break is intended.
.first()
.alias(self.new_col_name(column_name))
)


class SumAggregator(Aggregator):
Expand Down
36 changes: 30 additions & 6 deletions src/timeseriesflattenerv2/test_aggregators.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ def expected_output(self) -> pl.DataFrame:
SingleVarAggregatorExample(
aggregator=CountAggregator(), input_values=[1, 2], expected_output_values=[2]
),
SingleVarAggregatorExample(
aggregator=EarliestAggregator(), input_values=[1, 2], expected_output_values=[1]
),
SingleVarAggregatorExample(
aggregator=LatestAggregator(), input_values=[1, 2], expected_output_values=[2]
),
SingleVarAggregatorExample(
aggregator=SumAggregator(), input_values=[1, 2], expected_output_values=[3]
),
Expand Down Expand Up @@ -106,6 +100,36 @@ def expected_output(self) -> pl.DataFrame:
"""
),
),
ComplexAggregatorExample(
aggregator=EarliestAggregator(timestamp_col_name="timestamp"),
input=str_to_pl_df(
"""pred_time_uuid,timestamp,value
1,2013-01-01,1, # Kept, first value in 1
1,2013-01-02,2, # Dropped, second value in 1
2,2013-01-04,3, # Dropped, second value in 2
2,2013-01-03,4, # Kept, first value in 2"""
).lazy(),
expected_output=str_to_pl_df(
"""pred_time_uuid,value_earliest_fallback_nan
1,1,
2,4,"""
),
),
ComplexAggregatorExample(
aggregator=LatestAggregator(timestamp_col_name="timestamp"),
input=str_to_pl_df(
"""pred_time_uuid,timestamp,value
1,2013-01-01,1, # Dropped, first value in 1
1,2013-01-02,2, # Kept, second value in 1
2,2013-01-04,3, # Kept, second value in 2
2,2013-01-03,4, # Dropped, first value in 2"""
).lazy(),
expected_output=str_to_pl_df(
"""pred_time_uuid,value_latest_fallback_nan
1,2,
2,3,"""
),
),
],
ids=lambda example: example.aggregator.__class__.__name__,
)
Expand Down

0 comments on commit f88fccb

Please sign in to comment.