In [84]:
# Importing packages
import numpy as np
from numba import njit
from pychange.numba_costs import normal_mean_var_cost

In [73]:
# Creating test series
x = np.concatenate([np.random.normal(-1, 1, (2000,)),
                    np.random.normal(1, 2, (2000,)),
                    np.random.normal(3, 1, (2000,)),
                   np.random.normal(-1, 1, (2000,)),
                   np.random.normal(-3, 2, (2000,)),
                   np.random.normal(4, 1, (2000,))])

In [94]:
@njit(fastmath=True)
def create_summary_stats(x):
    x = np.stack((np.append(0.0, x.cumsum()),
                  np.append(0.0, (x ** 2).cumsum()),
                  np.append(0.0, ((x - x.mean()) ** 2).cumsum())),
                  axis=-1)
    return x

def binary_segmentation(x, min_len, max_cp, penalty, cost_fn):
    """Runs binary segmentation on time series"""

    # Setting up summary statistics and objects
    n = x.shape[0]
    sum_stats = create_summary_stats(x)
    is_candidate = np.ones((n, ), dtype=bool)
    is_candidate = np.arange(min_len, n - min_len)
    cps = np.zeros((n, ), dtype=bool)
    costs = np.zeros((n, ), dtype=np.float64)
    cps[-1] = True
    cps[0] = True
    costs[-1] = cost_fn(sum_stats[n, :], n)
    n_cps = 0

    # Iterating through changepoints until convergence
    while True:

        _cps = np.flatnonzero(cps)
        best_cand, best_cost, best_next_cost, best_next = 0, 0, 0, 0
        best_total_cost = costs.sum() + n_cps * penalty
        
        #print(best_total_cost)

        for c1, c2 in zip(_cps[:-1], _cps[1:]):
            _cands = np.flatnonzero(is_candidate)
            _cands = _cands[(_cands > c1 + min_len) & (_cands < c2 - min_len)]
            if _cands.shape[0] == 0:
                continue
            _costs = np.empty((_cands.shape[0], 3), dtype=np.float64)
            _other_costs = costs[:c1].sum() + costs[(c2 + 1):].sum()
            _costs[:, 0] = np.array([cost_fn(sum_stats[i, :] - sum_stats[c1, :], i - c1) for i in _cands])
            _costs[:, 1] = np.array([cost_fn(sum_stats[c2, :] - sum_stats[i, :], c2 - i) for i in _cands])
            _costs[:, 2] = _costs[:, 0] + _costs[:, 1] + _other_costs + (n_cps + 1) * penalty
            _best_cand = np.argmin(_costs[:, 2])
            if _costs[_best_cand, 2] < best_total_cost:
                best_cand = _cands[_best_cand]
                best_cost = _costs[_best_cand, 0]
                best_next_cost = _costs[_best_cand, 1]
                best_total_cost = _costs[_best_cand, 2]
                best_next = c2

        if best_cand == 0:
            break
        else:
            cps[best_cand] = True
            costs[best_cand] = best_cost
            costs[best_next] = best_next_cost
            is_candidate[(best_cand - min_len): (best_cand + min_len)] = False
            n_cps += 1

            #print(best_cand, best_total_cost, costs.sum() + n_cps * penalty)
            #print(np.flatnonzero(cps))
            #print(costs[costs > 0.0])
            #print('\n')
        
    return np.flatnonzero(cps)

In [95]:
%timeit binary_segmentation(x, min_len=30, max_cp=3, penalty=500, cost_fn=normal_mean_var_cost)

640 ms ± 17.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [96]:
numba_bin_seg = njit(fastmath=True, parallel=True)(binary_segmentation)

In [97]:
numba_bin_seg(x, min_len=30, max_cp=3, penalty=500, cost_fn=normal_mean_var_cost)

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
[1m[1mnon-precise type pyobject[0m
[0m[1mDuring: typing of argument at <ipython-input-94-ebce61226814> (13)[0m
[1m
File "<ipython-input-94-ebce61226814>", line 13:[0m
[1mdef binary_segmentation(x, min_len, max_cp, penalty, cost_fn):
    <source elided>
    # Setting up summary statistics and objects
[1m    n = x.shape[0]
[0m    [1m^[0m[0m

This error may have been caused by the following argument(s):
- argument 4: [1mCannot determine Numba type of <class 'builtin_function_or_method'>[0m
