Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

380: Initial implementation of moving_product operation #383

Merged
merged 11 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 57 additions & 0 deletions temporian/core/event_set_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -3171,6 +3171,63 @@ def moving_standard_deviation(
self, window_length=window_length, sampling=sampling
)

def moving_product(
self: EventSetOrNode,
window_length: WindowLength,
sampling: Optional[EventSetOrNode] = None,
) -> EventSetOrNode:
"""Computes the product of values in a sliding window over an
[`EventSet`][temporian.EventSet].

For each t in sampling, and for each feature independently, returns at
time t the product of non-zero and non-NaN values for the feature in the window
(t - window_length, t].

`sampling` can't be specified if a variable `window_length` is
specified (i.e., if `window_length` is an EventSet).

If `sampling` is specified or `window_length` is an EventSet, the moving
window is sampled at each timestamp in them, else it is sampled on the
input's.

Zeros or missing values (such as NaNs) result in the accumulator's result being 0 for the window.
If the window does not contain any non-missing values (e.g., all values are
missing or zero, or the window does not contain any sampling), outputs missing
values.

Example:
```python
>>> a = tp.event_set(
... timestamps=[0, 1, 2, 5, 6, 7],
... features={"value": [np.nan, 1, 5, 10, 15, 20]},
... )

>>> b = a.moving_product(tp.duration.seconds(4))
>>> b
indexes: ...
(6 events):
timestamps: [0. 1. 2. 5. 6. 7.]
'value': [1. 1. 5. 50. 150. 3000.]
...

```

See [`EventSet.moving_count()`][temporian.EventSet.moving_count] for
examples of moving window operations with external sampling and indices.

Args:
window_length: Sliding window's length.
sampling: Timestamps to sample the sliding window's value at. If not
provided, timestamps in the input are used.

Returns:
EventSet containing the moving product of each feature in the input,
considering non-zero and non-NaN values only.
"""
from temporian.core.operators.window.moving_product import moving_product

return moving_product(self, window_length=window_length, sampling=sampling)

def moving_sum(
self: EventSetOrNode,
window_length: WindowLength,
Expand Down
16 changes: 16 additions & 0 deletions temporian/core/operators/window/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ py_library(
":moving_standard_deviation",
":moving_sum",
":simple_moving_average",
":moving_product",
],
)

Expand Down Expand Up @@ -126,3 +127,18 @@ py_library(
"//temporian/core/data:schema",
],
)

py_library(
name = "moving_product",
srcs = ["moving_product.py"],
srcs_version = "PY3",
deps = [
":base",
"//temporian/core:compilation",
"//temporian/core:operator_lib",
"//temporian/core:typing",
"//temporian/core/data:dtype",
"//temporian/core/data:node",
"//temporian/core/data:schema",
],
)
1 change: 1 addition & 0 deletions temporian/core/operators/window/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@
from temporian.core.operators.window.moving_count import moving_count
from temporian.core.operators.window.moving_min import moving_min
from temporian.core.operators.window.moving_max import moving_max
from temporian.core.operators.window.moving_product import moving_product
69 changes: 69 additions & 0 deletions temporian/core/operators/window/moving_product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Moving Product operator class and public API function definition.."""

from typing import Optional

from temporian.core import operator_lib
from temporian.core.compilation import compile
from temporian.core.data.dtype import DType
from temporian.core.data.node import EventSetNode
from temporian.core.data.schema import FeatureSchema
from temporian.core.operators.window.base import BaseWindowOperator
from temporian.utils.typecheck import typecheck
from temporian.core.typing import EventSetOrNode, WindowLength


class MovingProductOperator(BaseWindowOperator):
"""
Window operator to compute the moving product.
"""

@classmethod
def operator_def_key(cls) -> str:
return "MOVING_PRODUCT"

def get_feature_dtype(self, feature: FeatureSchema) -> DType:
if not feature.dtype.is_float:
raise ValueError(
"moving_product requires the input EventSet to contain"
" floating point features only, but received feature"
f" {feature.name!r} with type {feature.dtype}. Note: You can"
" cast features e.g. `.cast(tp.float32)`"
)
return (
DType.FLOAT32 if feature.dtype == DType.FLOAT32 else DType.FLOAT64
)


operator_lib.register_operator(MovingProductOperator)


@typecheck
@compile
def moving_product(
input: EventSetOrNode,
window_length: WindowLength,
sampling: Optional[EventSetOrNode] = None,
) -> EventSetOrNode:
assert isinstance(input, EventSetNode)
if sampling is not None:
assert isinstance(sampling, EventSetNode)

return MovingProductOperator(
input=input,
window_length=window_length,
sampling=sampling,
).outputs["output"]
1 change: 1 addition & 0 deletions temporian/implementation/numpy/operators/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ py_library(
"//temporian/implementation/numpy/operators/window:moving_count",
"//temporian/implementation/numpy/operators/window:moving_max",
"//temporian/implementation/numpy/operators/window:moving_min",
"//temporian/implementation/numpy/operators/window:moving_product",
"//temporian/implementation/numpy/operators/window:moving_standard_deviation",
"//temporian/implementation/numpy/operators/window:moving_sum",
"//temporian/implementation/numpy/operators/window:simple_moving_average",
Expand Down
1 change: 1 addition & 0 deletions temporian/implementation/numpy/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from temporian.implementation.numpy.operators.scalar import relational_scalar

from temporian.implementation.numpy.operators.window import simple_moving_average
from temporian.implementation.numpy.operators.window import moving_product
from temporian.implementation.numpy.operators.window import moving_standard_deviation
from temporian.implementation.numpy.operators.window import moving_sum
from temporian.implementation.numpy.operators.window import moving_count
Expand Down
12 changes: 12 additions & 0 deletions temporian/implementation/numpy/operators/window/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ py_library(
],
)

py_library(
name = "moving_product",
srcs = ["moving_product.py"],
srcs_version = "PY3",
deps = [
":base",
"//temporian/core/operators/window:moving_product",
"//temporian/implementation/numpy:implementation_lib",
"//temporian/implementation/numpy_cc/operators:operators_cc",
],
)

py_library(
name = "moving_sum",
srcs = ["moving_sum.py"],
Expand Down
35 changes: 35 additions & 0 deletions temporian/implementation/numpy/operators/window/moving_product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from temporian.core.operators.window.moving_product import (
MovingProductOperator,
)
from temporian.implementation.numpy import implementation_lib
from temporian.implementation.numpy.operators.window.base import (
BaseWindowNumpyImplementation,
)
from temporian.implementation.numpy_cc.operators import operators_cc


class MovingProductNumpyImplementation(BaseWindowNumpyImplementation):
"""Numpy implementation of the moving product operator."""

def _implementation(self):
return operators_cc.moving_product


implementation_lib.register_operator_implementation(
MovingProductOperator, MovingProductNumpyImplementation
)
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def test_base(self):
"MOVING_COUNT",
"MOVING_MAX",
"MOVING_MIN",
"MOVING_PRODUCT",
"MOVING_STANDARD_DEVIATION",
"MOVING_SUM",
"MULTIPLICATION",
Expand Down
54 changes: 54 additions & 0 deletions temporian/implementation/numpy_cc/operators/window.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,53 @@ struct MovingMaxAccumulator : MovingExtremumAccumulator<INPUT, OUTPUT> {
bool Compare(INPUT a, INPUT b) { return a > b; }
};

// TODO: Revisit the MovingProductAccumulator for potential optimization to improve calculation efficiency while maintaining accuracy.
// Especially consider optimizing the Result method which recalculates the product on-demand.
template <typename INPUT, typename OUTPUT>
struct MovingProductAccumulator : public Accumulator<INPUT, OUTPUT> {
int start_idx = 0;
int end_idx = -1; // Initialize to -1 to indicate an empty window initially

MovingProductAccumulator(const ArrayRef<INPUT>& values)
: Accumulator<INPUT, OUTPUT>(values) {}

void Add(Idx idx) override {
// Simply move the end to the given index
end_idx = idx;
}

void Remove(Idx idx) override {
// Adjust the start index to exclude the removed value, signaling a window shift.
start_idx = idx + 1;
}

OUTPUT Result() override {
double product = 1.0;
bool has_zero = false;

// Calculate the product of all values inside the window
for (int idx = start_idx; idx <= end_idx; ++idx) {
const INPUT value = Accumulator<INPUT, OUTPUT>::values[idx];
if (value == 0) {
has_zero = true;
akshatvishu marked this conversation as resolved.
Show resolved Hide resolved
break; // Exit early if a zero is found
} else if (!std::isnan(value)) {
product *= value;
}
// NaN values are skipped
}

return has_zero ? 0 : product;
}

void Reset() {
start_idx = 0;
end_idx = -1;
}
};



// Instantiate the "accumulate" function with and without sampling,
// and with and without variable window length.
//
Expand Down Expand Up @@ -614,6 +661,9 @@ REGISTER_CC_FUNC(moving_max, int32_t, int32_t, MovingMaxAccumulator);
REGISTER_CC_FUNC(moving_max, int64_t, int64_t, MovingMaxAccumulator);

REGISTER_CC_FUNC_NO_INPUT(moving_count, int32_t, MovingCountAccumulator);

REGISTER_CC_FUNC(moving_product, float, float, MovingProductAccumulator);
REGISTER_CC_FUNC(moving_product, double, double, MovingProductAccumulator);
} // namespace

// Register c++ functions to pybind with and without sampling,
Expand Down Expand Up @@ -693,4 +743,8 @@ void init_window(py::module &m) {
ADD_PY_DEF(moving_max, int64_t, int64_t)

ADD_PY_DEF_NO_INPUT(moving_count, int32_t)

ADD_PY_DEF(moving_product, float, float)
ADD_PY_DEF(moving_product, double, double)

}