From 9c923ecd38c701c0defbfe283ef0dc80542057b0 Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 17 Feb 2020 10:19:23 +0300 Subject: [PATCH 1/6] Optimize dseries.rolling.mean() --- .../hpat_pandas_series_rolling_functions.py | 87 ++++++++++++++++++- sdc/tests/test_rolling.py | 4 +- 2 files changed, 85 insertions(+), 6 deletions(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 57d9a549e..9ffbaff74 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -298,8 +298,6 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_impl(arr_kurt)) hpat_pandas_rolling_series_max_impl = register_jitable( gen_hpat_pandas_series_rolling_impl(arr_max)) -hpat_pandas_rolling_series_mean_impl = register_jitable( - gen_hpat_pandas_series_rolling_impl(arr_mean)) hpat_pandas_rolling_series_median_impl = register_jitable( gen_hpat_pandas_series_rolling_impl(arr_median)) hpat_pandas_rolling_series_min_impl = register_jitable( @@ -482,7 +480,30 @@ def _impl(self, other=None, pairwise=None, ddof=1): bias_adj = count / (count - ddof) def mean(series): - return series.rolling(win, min_periods=minp).mean() + # cannot call return series.rolling(win, min_periods=minp).mean() + # due to different float rounding in new and old implementations + # TODO: fix this during optimizing of covariance + input_arr = series._data + length = len(input_arr) + output_arr = numpy.empty(length, dtype=float64) + + def apply_minp(arr, minp): + finite_arr = arr[numpy.isfinite(arr)] + if len(finite_arr) < minp: + return numpy.nan + else: + return arr_mean(finite_arr) + + boundary = min(win, length) + for i in prange(boundary): + arr_range = input_arr[:i + 1] + output_arr[i] = apply_minp(arr_range, minp) + + for i in prange(boundary, length): + arr_range = input_arr[i + 1 - win:i + 1] + output_arr[i] = apply_minp(arr_range, minp) + + return pandas.Series(output_arr, series._index, name=series._name) return (mean(main_aligned * other_aligned) - mean(main_aligned) * mean(other_aligned)) * bias_adj @@ -529,7 +550,65 @@ def hpat_pandas_series_rolling_mean(self): ty_checker = TypeChecker('Method rolling.mean().') ty_checker.check(self, SeriesRollingType) - return hpat_pandas_rolling_series_mean_impl + def _sdc_pandas_series_rolling_mean_impl(self): + win = self._window + minp = self._min_periods + + input_series = self._data + input_arr = input_series._data + length = len(input_arr) + output_arr = numpy.empty(length, dtype=float64) + + nfinite = 0 + current_result = numpy.nan + boundary = min(win, length) + for i in range(boundary): + value = input_arr[i] + if numpy.isfinite(value): + nfinite += 1 + if numpy.isnan(current_result): + current_result = value / nfinite + else: + current_result = ((nfinite - 1) * current_result + value) / nfinite + + if nfinite < minp: + output_arr[i] = numpy.nan + else: + output_arr[i] = current_result + + start_indices = range(length - boundary) + end_indices = range(boundary, length) + for start_idx, end_idx in zip(start_indices, end_indices): + if start_idx == end_idx: + # case when window == 0 + output_arr[end_idx] = current_result + continue + + first_val = input_arr[start_idx] + last_val = input_arr[end_idx] + + if numpy.isfinite(first_val): + nfinite -= 1 + if nfinite: + current_result = ((nfinite + 1) * current_result - first_val) / nfinite + else: + current_result = numpy.nan + + if numpy.isfinite(last_val): + nfinite += 1 + if numpy.isnan(current_result): + current_result = last_val / nfinite + else: + current_result = ((nfinite - 1) * current_result + last_val) / nfinite + + if nfinite < minp: + output_arr[end_idx] = numpy.nan + else: + output_arr[end_idx] = current_result + + return pandas.Series(output_arr, input_series._index, name=input_series._name) + + return _sdc_pandas_series_rolling_mean_impl @sdc_rolling_overload(SeriesRollingType, 'median') diff --git a/sdc/tests/test_rolling.py b/sdc/tests/test_rolling.py index 9d3acd92e..85044deff 100644 --- a/sdc/tests/test_rolling.py +++ b/sdc/tests/test_rolling.py @@ -715,8 +715,8 @@ def test_impl(obj, window, min_periods): hpat_func = self.jit(test_impl) assert_equal = self._get_assert_equal(obj) - for window in range(0, len(obj) + 3, 2): - for min_periods in range(0, window + 1, 2): + for window in range(len(obj) + 2): + for min_periods in range(window): with self.subTest(obj=obj, window=window, min_periods=min_periods): jit_result = hpat_func(obj, window, min_periods) From 6aebd1208b68df69f44195c1b66765f16c97b56a Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Feb 2020 16:23:37 +0300 Subject: [PATCH 2/6] Enable scalability for series.rolling.mean() --- .../hpat_pandas_series_rolling_functions.py | 61 +++----------- .../tests_perf/test_perf_series_rolling.py | 62 ++++++++++++-- sdc/utilities/window_utils.py | 83 +++++++++++++++++++ 3 files changed, 150 insertions(+), 56 deletions(-) create mode 100644 sdc/utilities/window_utils.py diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 9ffbaff74..73c059205 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -37,8 +37,10 @@ from sdc.datatypes.common_functions import _sdc_pandas_series_align from sdc.datatypes.hpat_pandas_series_rolling_types import SeriesRollingType from sdc.hiframes.pd_series_type import SeriesType +from sdc.utilities.prange_utils import get_chunks from sdc.utilities.sdc_typing_utils import TypeChecker from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable +from sdc.utilities.window_utils import WindowMean # disabling parallel execution for rolling due to numba issue https://github.com/numba/numba/issues/5098 @@ -559,54 +561,17 @@ def _sdc_pandas_series_rolling_mean_impl(self): length = len(input_arr) output_arr = numpy.empty(length, dtype=float64) - nfinite = 0 - current_result = numpy.nan - boundary = min(win, length) - for i in range(boundary): - value = input_arr[i] - if numpy.isfinite(value): - nfinite += 1 - if numpy.isnan(current_result): - current_result = value / nfinite - else: - current_result = ((nfinite - 1) * current_result + value) / nfinite - - if nfinite < minp: - output_arr[i] = numpy.nan - else: - output_arr[i] = current_result - - start_indices = range(length - boundary) - end_indices = range(boundary, length) - for start_idx, end_idx in zip(start_indices, end_indices): - if start_idx == end_idx: - # case when window == 0 - output_arr[end_idx] = current_result - continue - - first_val = input_arr[start_idx] - last_val = input_arr[end_idx] - - if numpy.isfinite(first_val): - nfinite -= 1 - if nfinite: - current_result = ((nfinite + 1) * current_result - first_val) / nfinite - else: - current_result = numpy.nan - - if numpy.isfinite(last_val): - nfinite += 1 - if numpy.isnan(current_result): - current_result = last_val / nfinite - else: - current_result = ((nfinite - 1) * current_result + last_val) / nfinite - - if nfinite < minp: - output_arr[end_idx] = numpy.nan - else: - output_arr[end_idx] = current_result - - return pandas.Series(output_arr, input_series._index, name=input_series._name) + chunks = get_chunks(length) + for i in prange(len(chunks)): + chunk = chunks[i] + window = WindowMean(win, minp) + for idx in range(chunk.start, chunk.stop): + window.roll(input_arr, idx) + output_arr[idx] = window.result + window.free() + + return pandas.Series(output_arr, input_series._index, + name=input_series._name) return _sdc_pandas_series_rolling_mean_impl diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index a6c988882..6995f7af9 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -24,21 +24,31 @@ # OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # ***************************************************************************** -import string + import time -import numba import pandas import numpy as np from sdc.tests.test_utils import test_global_input_data_float64 from sdc.tests.tests_perf.test_perf_base import TestBase -from sdc.tests.tests_perf.test_perf_utils import (calc_compilation, get_times, - perf_data_gen_fixed_len) +from sdc.tests.tests_perf.test_perf_utils import perf_data_gen_fixed_len from .generator import generate_test_cases from .generator import TestCase as TC +rolling_usecase_tmpl = """ +def series_rolling_{method_name}_usecase(data, {extra_usecase_params}): + start_time = time.time() + results = [] + for i in range({ncalls}): + res = data.rolling({rolling_params}).{method_name}({method_params}) + results.append(res) + end_time = time.time() + return end_time - start_time, results +""" + + def get_rolling_params(window=100, min_periods=None): """Generate supported rolling parameters""" rolling_params = [f'{window}'] @@ -48,14 +58,37 @@ def get_rolling_params(window=100, min_periods=None): return ', '.join(rolling_params) +def gen_series_rolling_usecase(method_name, rolling_params=None, + extra_usecase_params='', + method_params='', ncalls=1): + """Generate series rolling method use case""" + if not rolling_params: + rolling_params = get_rolling_params() + + func_text = rolling_usecase_tmpl.format(**{ + 'method_name': method_name, + 'extra_usecase_params': extra_usecase_params, + 'rolling_params': rolling_params, + 'method_params': method_params, + 'ncalls': ncalls + }) + + global_vars = {'np': np, 'time': time} + loc_vars = {} + exec(func_text, global_vars, loc_vars) + _series_rolling_usecase = loc_vars[f'series_rolling_{method_name}_usecase'] + + return _series_rolling_usecase + + # python -m sdc.runtests sdc.tests.tests_perf.test_perf_series_rolling.TestSeriesRollingMethods class TestSeriesRollingMethods(TestBase): - # more than 19 columns raise SystemError: CPUDispatcher() returned a result with an error set - max_columns_num = 19 - @classmethod def setUpClass(cls): super().setUpClass() + cls.map_ncalls_dlength = { + 'mean': (100, [10 ** 5]), + } def _test_case(self, pyfunc, name, total_data_length, data_num=1, input_data=test_global_input_data_float64): @@ -82,6 +115,20 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1, self._test_jit(pyfunc, base, *args) self._test_py(pyfunc, base, *args) + def _test_series_rolling_method(self, name, rolling_params=None, + extra_usecase_params='', method_params=''): + ncalls, total_data_length = self.map_ncalls_dlength[name] + usecase = gen_series_rolling_usecase(name, rolling_params=rolling_params, + extra_usecase_params=extra_usecase_params, + method_params=method_params, ncalls=ncalls) + data_num = 1 + if extra_usecase_params: + data_num += len(extra_usecase_params.split(', ')) + self._test_case(usecase, name, total_data_length, data_num=data_num) + + def test_series_rolling_mean(self): + self._test_series_rolling_method('mean') + cases = [ TC(name='apply', size=[10 ** 7], params='func=lambda x: np.nan if len(x) == 0 else x.mean()'), @@ -90,7 +137,6 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1, TC(name='cov', size=[10 ** 7]), TC(name='kurt', size=[10 ** 7]), TC(name='max', size=[10 ** 7]), - TC(name='mean', size=[10 ** 7]), TC(name='median', size=[10 ** 7]), TC(name='min', size=[10 ** 7]), TC(name='quantile', size=[10 ** 7], params='0.2'), diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py new file mode 100644 index 000000000..605200590 --- /dev/null +++ b/sdc/utilities/window_utils.py @@ -0,0 +1,83 @@ +# ***************************************************************************** +# Copyright (c) 2020, Intel Corporation All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# ***************************************************************************** + +import numpy as np +from numba import jitclass, types + + +spec = [ + ('size', types.int64), + ('minp', types.int64), + ('_nfinite', types.int64), + ('_nroll', types.int64), + ('_result', types.float64) +] + + +@jitclass(spec) +class WindowMean: + def __init__(self, size, minp): + self.size = size + self.minp = minp + + self._nfinite = 0 + self._nroll = 0 + self._result = np.nan + + @property + def result(self): + """Get the latest result taking into account min periods.""" + if self._nfinite < self.minp: + return np.nan + + return self._result + + def roll(self, data, idx): + """Calculate the window mean.""" + if self._nroll >= self.size: + excluded_value = data[idx - self.size] + if np.isfinite(excluded_value): + self._nfinite -= 1 + if self._nfinite: + self._result = ((self._nfinite + 1) * self._result - excluded_value) / self._nfinite + else: + self._result = np.nan + + value = data[idx] + if np.isfinite(value): + self._nfinite += 1 + if np.isnan(self._result): + self._result = value / self._nfinite + else: + self._result = ((self._nfinite - 1) * self._result + value) / self._nfinite + + self._nroll += 1 + + def free(self): + """Free the window.""" + self._nfinite = 0 + self._nroll = 0 + self._result = 0. From 04e473174e1551a22d69439b5fab8afd8b86b7a8 Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Feb 2020 18:30:49 +0300 Subject: [PATCH 3/6] Fix issue in case of multithreading --- sdc/utilities/window_utils.py | 40 ++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py index 605200590..13bcc4278 100644 --- a/sdc/utilities/window_utils.py +++ b/sdc/utilities/window_utils.py @@ -55,18 +55,8 @@ def result(self): return self._result - def roll(self, data, idx): - """Calculate the window mean.""" - if self._nroll >= self.size: - excluded_value = data[idx - self.size] - if np.isfinite(excluded_value): - self._nfinite -= 1 - if self._nfinite: - self._result = ((self._nfinite + 1) * self._result - excluded_value) / self._nfinite - else: - self._result = np.nan - - value = data[idx] + def _include(self, value): + """Calculate the window mean with new value.""" if np.isfinite(value): self._nfinite += 1 if np.isnan(self._result): @@ -74,6 +64,32 @@ def roll(self, data, idx): else: self._result = ((self._nfinite - 1) * self._result + value) / self._nfinite + def _exclude(self, value): + """Calculate the window mean without old value.""" + if np.isfinite(value): + self._nfinite -= 1 + if self._nfinite: + self._result = ((self._nfinite + 1) * self._result - value) / self._nfinite + else: + self._result = np.nan + + def roll(self, data, idx): + """Calculate the window mean.""" + if self._nroll == 0: + start = max(idx + 1 - self.size, 0) + for i in range(start, idx): + value = data[i] + self._include(value) + + self._nroll += 1 + + if self._nroll >= self.size: + value = data[idx - self.size] + self._exclude(value) + + value = data[idx] + self._include(value) + self._nroll += 1 def free(self): From fd0917c0e66d2ceabc659f979db0e5810d0d4708 Mon Sep 17 00:00:00 2001 From: Denis Date: Wed, 19 Feb 2020 14:10:14 +0300 Subject: [PATCH 4/6] Enable scalability for series.rolling.mean() --- .../hpat_pandas_series_rolling_functions.py | 56 +++++++++++-------- .../tests_perf/test_perf_series_rolling.py | 6 +- sdc/utilities/window_utils.py | 42 +++++++------- 3 files changed, 55 insertions(+), 49 deletions(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 73c059205..3f219a1e9 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -37,7 +37,7 @@ from sdc.datatypes.common_functions import _sdc_pandas_series_align from sdc.datatypes.hpat_pandas_series_rolling_types import SeriesRollingType from sdc.hiframes.pd_series_type import SeriesType -from sdc.utilities.prange_utils import get_chunks +from sdc.utilities.prange_utils import parallel_chunks from sdc.utilities.sdc_typing_utils import TypeChecker from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable from sdc.utilities.window_utils import WindowMean @@ -314,6 +314,35 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_ddof_impl(arr_var)) +def gen_sdc_pandas_series_rolling_impl(window_cls): + """Generate series rolling methods implementations based on window class""" + def impl(self): + win = self._window + minp = self._min_periods + + input_series = self._data + input_arr = input_series._data + length = len(input_arr) + output_arr = numpy.empty(length, dtype=float64) + + chunks = parallel_chunks(length) + for i in prange(len(chunks)): + chunk = chunks[i] + window = window_cls(win, minp) + for idx in range(chunk.start, chunk.stop): + window.roll(input_arr, idx) + output_arr[idx] = window.result + window.free() + + return pandas.Series(output_arr, input_series._index, + name=input_series._name) + return impl + + +sdc_pandas_rolling_series_mean_impl = register_jitable( + gen_sdc_pandas_series_rolling_impl(WindowMean)) + + @sdc_rolling_overload(SeriesRollingType, 'apply') def hpat_pandas_series_rolling_apply(self, func, raw=None): @@ -546,34 +575,13 @@ def hpat_pandas_series_rolling_max(self): return hpat_pandas_rolling_series_max_impl -@sdc_rolling_overload(SeriesRollingType, 'mean') +@sdc_overload_method(SeriesRollingType, 'mean') def hpat_pandas_series_rolling_mean(self): ty_checker = TypeChecker('Method rolling.mean().') ty_checker.check(self, SeriesRollingType) - def _sdc_pandas_series_rolling_mean_impl(self): - win = self._window - minp = self._min_periods - - input_series = self._data - input_arr = input_series._data - length = len(input_arr) - output_arr = numpy.empty(length, dtype=float64) - - chunks = get_chunks(length) - for i in prange(len(chunks)): - chunk = chunks[i] - window = WindowMean(win, minp) - for idx in range(chunk.start, chunk.stop): - window.roll(input_arr, idx) - output_arr[idx] = window.result - window.free() - - return pandas.Series(output_arr, input_series._index, - name=input_series._name) - - return _sdc_pandas_series_rolling_mean_impl + return sdc_pandas_rolling_series_mean_impl @sdc_rolling_overload(SeriesRollingType, 'median') diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index 6995f7af9..a31f76e69 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -40,12 +40,10 @@ rolling_usecase_tmpl = """ def series_rolling_{method_name}_usecase(data, {extra_usecase_params}): start_time = time.time() - results = [] for i in range({ncalls}): res = data.rolling({rolling_params}).{method_name}({method_params}) - results.append(res) end_time = time.time() - return end_time - start_time, results + return end_time - start_time, res """ @@ -87,7 +85,7 @@ class TestSeriesRollingMethods(TestBase): def setUpClass(cls): super().setUpClass() cls.map_ncalls_dlength = { - 'mean': (100, [10 ** 5]), + 'mean': (100, [2 * 10 ** 5]), } def _test_case(self, pyfunc, name, total_data_length, data_num=1, diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py index 13bcc4278..17039e7f2 100644 --- a/sdc/utilities/window_utils.py +++ b/sdc/utilities/window_utils.py @@ -55,40 +55,40 @@ def result(self): return self._result - def _include(self, value): - """Calculate the window mean with new value.""" - if np.isfinite(value): - self._nfinite += 1 - if np.isnan(self._result): - self._result = value / self._nfinite - else: - self._result = ((self._nfinite - 1) * self._result + value) / self._nfinite - - def _exclude(self, value): - """Calculate the window mean without old value.""" - if np.isfinite(value): - self._nfinite -= 1 - if self._nfinite: - self._result = ((self._nfinite + 1) * self._result - value) / self._nfinite - else: - self._result = np.nan - def roll(self, data, idx): """Calculate the window mean.""" if self._nroll == 0: start = max(idx + 1 - self.size, 0) for i in range(start, idx): value = data[i] - self._include(value) + # calculate the window mean with new value + if np.isfinite(value): + self._nfinite += 1 + if np.isnan(self._result): + self._result = value / self._nfinite + else: + self._result = ((self._nfinite - 1) * self._result + value) / self._nfinite self._nroll += 1 if self._nroll >= self.size: + # calculate the window mean without old value. value = data[idx - self.size] - self._exclude(value) + if np.isfinite(value): + self._nfinite -= 1 + if self._nfinite: + self._result = ((self._nfinite + 1) * self._result - value) / self._nfinite + else: + self._result = np.nan value = data[idx] - self._include(value) + # calculate the window mean with new value + if np.isfinite(value): + self._nfinite += 1 + if np.isnan(self._result): + self._result = value / self._nfinite + else: + self._result = ((self._nfinite - 1) * self._result + value) / self._nfinite self._nroll += 1 From aa54aa594d72b89eae5803b2d67f3a1ec251cb93 Mon Sep 17 00:00:00 2001 From: Denis Date: Thu, 20 Feb 2020 10:35:45 +0300 Subject: [PATCH 5/6] Enable scalability for series.rolling.mean() --- .../hpat_pandas_series_rolling_functions.py | 31 +++++- .../tests_perf/test_perf_series_rolling.py | 2 +- sdc/utilities/window_utils.py | 99 ------------------- 3 files changed, 30 insertions(+), 102 deletions(-) delete mode 100644 sdc/utilities/window_utils.py diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index b36239e07..daf7b47f9 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -40,7 +40,6 @@ from sdc.utilities.prange_utils import parallel_chunks from sdc.utilities.sdc_typing_utils import TypeChecker from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable -from sdc.utilities.window_utils import WindowMean # disabling parallel execution for rolling due to numba issue https://github.com/numba/numba/issues/5098 @@ -306,6 +305,32 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_ddof_impl(arr_var)) +@sdc_register_jitable +def pop_mean(value, nfinite, result): + """Calculate the window mean without old value.""" + if numpy.isfinite(value): + nfinite -= 1 + if nfinite: + result = ((nfinite + 1) * result - value) / nfinite + else: + result = numpy.nan + + return nfinite, result + + +@sdc_register_jitable +def put_mean(value, nfinite, result): + """Calculate the window mean with new value.""" + if numpy.isfinite(value): + nfinite += 1 + if numpy.isnan(result): + result = value / nfinite + else: + result = ((nfinite - 1) * result + value) / nfinite + + return nfinite, result + + @sdc_register_jitable def pop_sum(value, nfinite, result): """Calculate the window sum without old value.""" @@ -379,6 +404,8 @@ def impl(self): return impl +sdc_pandas_series_rolling_mean_impl = register_jitable( + gen_sdc_pandas_series_rolling_impl(pop_mean, put_mean)) sdc_pandas_series_rolling_sum_impl = register_jitable( gen_sdc_pandas_series_rolling_impl(pop_sum, put_sum, init_result=0.)) @@ -621,7 +648,7 @@ def hpat_pandas_series_rolling_mean(self): ty_checker = TypeChecker('Method rolling.mean().') ty_checker.check(self, SeriesRollingType) - return sdc_pandas_rolling_series_mean_impl + return sdc_pandas_series_rolling_mean_impl @sdc_rolling_overload(SeriesRollingType, 'median') diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index 82c1dfc90..08aab460f 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -85,7 +85,7 @@ class TestSeriesRollingMethods(TestBase): def setUpClass(cls): super().setUpClass() cls.map_ncalls_dlength = { - 'mean': (100, [2 * 10 ** 5]), + 'mean': (100, [8 * 10 ** 5]), 'sum': (100, [8 * 10 ** 5]), } diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py deleted file mode 100644 index 17039e7f2..000000000 --- a/sdc/utilities/window_utils.py +++ /dev/null @@ -1,99 +0,0 @@ -# ***************************************************************************** -# Copyright (c) 2020, Intel Corporation All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# -# Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; -# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, -# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# ***************************************************************************** - -import numpy as np -from numba import jitclass, types - - -spec = [ - ('size', types.int64), - ('minp', types.int64), - ('_nfinite', types.int64), - ('_nroll', types.int64), - ('_result', types.float64) -] - - -@jitclass(spec) -class WindowMean: - def __init__(self, size, minp): - self.size = size - self.minp = minp - - self._nfinite = 0 - self._nroll = 0 - self._result = np.nan - - @property - def result(self): - """Get the latest result taking into account min periods.""" - if self._nfinite < self.minp: - return np.nan - - return self._result - - def roll(self, data, idx): - """Calculate the window mean.""" - if self._nroll == 0: - start = max(idx + 1 - self.size, 0) - for i in range(start, idx): - value = data[i] - # calculate the window mean with new value - if np.isfinite(value): - self._nfinite += 1 - if np.isnan(self._result): - self._result = value / self._nfinite - else: - self._result = ((self._nfinite - 1) * self._result + value) / self._nfinite - - self._nroll += 1 - - if self._nroll >= self.size: - # calculate the window mean without old value. - value = data[idx - self.size] - if np.isfinite(value): - self._nfinite -= 1 - if self._nfinite: - self._result = ((self._nfinite + 1) * self._result - value) / self._nfinite - else: - self._result = np.nan - - value = data[idx] - # calculate the window mean with new value - if np.isfinite(value): - self._nfinite += 1 - if np.isnan(self._result): - self._result = value / self._nfinite - else: - self._result = ((self._nfinite - 1) * self._result + value) / self._nfinite - - self._nroll += 1 - - def free(self): - """Free the window.""" - self._nfinite = 0 - self._nroll = 0 - self._result = 0. From 63fc3016a959df35e58e3cef75b323d6a3d519d7 Mon Sep 17 00:00:00 2001 From: Denis Date: Thu, 20 Feb 2020 18:31:02 +0300 Subject: [PATCH 6/6] Accelerate algorithm of series.rolling.mean() --- .../hpat_pandas_series_rolling_functions.py | 50 +++++++------------ 1 file changed, 17 insertions(+), 33 deletions(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index daf7b47f9..8c7a4680a 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -305,32 +305,6 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_ddof_impl(arr_var)) -@sdc_register_jitable -def pop_mean(value, nfinite, result): - """Calculate the window mean without old value.""" - if numpy.isfinite(value): - nfinite -= 1 - if nfinite: - result = ((nfinite + 1) * result - value) / nfinite - else: - result = numpy.nan - - return nfinite, result - - -@sdc_register_jitable -def put_mean(value, nfinite, result): - """Calculate the window mean with new value.""" - if numpy.isfinite(value): - nfinite += 1 - if numpy.isnan(result): - result = value / nfinite - else: - result = ((nfinite - 1) * result + value) / nfinite - - return nfinite, result - - @sdc_register_jitable def pop_sum(value, nfinite, result): """Calculate the window sum without old value.""" @@ -360,7 +334,17 @@ def result_or_nan(nfinite, minp, result): return result -def gen_sdc_pandas_series_rolling_impl(pop, put, init_result=numpy.nan): +@sdc_register_jitable +def mean_result_or_nan(nfinite, minp, result): + """Get result mean taking into account min periods.""" + if nfinite == 0 or nfinite < minp: + return numpy.nan + + return result / nfinite + + +def gen_sdc_pandas_series_rolling_impl(pop, put, get_result=result_or_nan, + init_result=numpy.nan): """Generate series rolling methods implementations based on pop/put funcs""" def impl(self): win = self._window @@ -390,24 +374,24 @@ def impl(self): for idx in range(interlude_start, interlude_stop): value = input_arr[idx] nfinite, result = put(value, nfinite, result) - output_arr[idx] = result_or_nan(nfinite, minp, result) + output_arr[idx] = get_result(nfinite, minp, result) for idx in range(interlude_stop, chunk.stop): put_value = input_arr[idx] pop_value = input_arr[idx - win] nfinite, result = put(put_value, nfinite, result) nfinite, result = pop(pop_value, nfinite, result) - output_arr[idx] = result_or_nan(nfinite, minp, result) + output_arr[idx] = get_result(nfinite, minp, result) return pandas.Series(output_arr, input_series._index, name=input_series._name) return impl -sdc_pandas_series_rolling_mean_impl = register_jitable( - gen_sdc_pandas_series_rolling_impl(pop_mean, put_mean)) -sdc_pandas_series_rolling_sum_impl = register_jitable( - gen_sdc_pandas_series_rolling_impl(pop_sum, put_sum, init_result=0.)) +sdc_pandas_series_rolling_mean_impl = gen_sdc_pandas_series_rolling_impl( + pop_sum, put_sum, get_result=mean_result_or_nan, init_result=0.) +sdc_pandas_series_rolling_sum_impl = gen_sdc_pandas_series_rolling_impl( + pop_sum, put_sum, init_result=0.) @sdc_rolling_overload(SeriesRollingType, 'apply')