# Vectorization Design POC

An R&D exploration by Edmond den Dekker.

## Pandas Rolling Sum

The benchmark.

In [21]:
#import rlab_common_numpy_pandas as rlnp

In [22]:
import unittest as ut
data = [2, 2, 3, 4, 5, 5, 5, 5, 6, 7, 8, 5, 4, 3, 4, 5, 3, 2, 1]

In [23]:
import pandas as pd
df = pd.DataFrame(data)
rolling_version = df.rolling(3).sum()
print(list(rolling_version[0].values))

[nan, nan, 7.0, 9.0, 12.0, 14.0, 15.0, 15.0, 16.0, 18.0, 21.0, 20.0, 17.0, 12.0, 11.0, 12.0, 12.0, 10.0, 6.0]


In [24]:
timeit df.rolling(3).sum()

209 µs ± 13.8 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


## Vectorized Rolling Sum

### Pros
* The bigger the data, the faster the speed gains
* Always faster than pandas - between 6 to 80 times faster

### Cons
* Complex code and way of thinking
* Less runtime parameterizations for functions, eg. rolling windows are hard coded
* Attempts to use exec() for parameterization will make it slower by a factor of 10 times

In [25]:
import numpy as np
class VectorizeEngine():
    def __init__(self, **kwargs):
        """ If no specific library is set, then one will be selected during runtime
            in the below order of precedence. It is best to pre-import a gpu library
            and feed it in via this instance super constructor. This will be faster
            if you have many vectorization layers

            INPUT: **kwargs - set specific gpu / cpu library
                            - use_tensorflow_lib=<tf module import object>
                            - use_pytorch_lib=<pytorch module import object>
                            - use_numpy_lib=<numpy module import object>
        """
        self.__shift_direction = None
        self.__TF = None
        self.__PT = None
        self.__NP = None
        self.__DROPNA = False
        if 'use_tensorflow_lib' in kwargs.keys():
            self.__TF = kwargs['use_tensorflow_lib']
        elif 'use_pytorch_lib' in kwargs.keys():
            self.__PT = kwargs['use_pytorch_lib']
        elif 'use_numpy_lib' in kwargs.keys():
            self.__NP = kwargs['use_numpy_lib']
        if 'dropna' in kwargs.keys():
            self.__DROPNA = kwargs['dropna']

    def _shiftr(self, data):
        x = data.copy()
        x.insert(0, None)
        x.pop()
        return x

    def _shiftl(self, data):
        x = data.copy()
        x.append(None)
        x.pop(0)
        return x

    def _shift(self, data, x):
        if x > 0:
            d = 1
            e = x
        else:
            d = -1
            e = abs(x)
        if type(data) == tuple:
            data = list(data)
        new = data.copy()
        for i in range(0, e, d):
            if d == 1:
                new = self._shiftr(new)
            else:
                new = self._shiftl(new)
        return new

    def _vshift(self, data, start, amount):
        """ Abstracted method to vertically shift the given array N times.
            If N is positive it shifts to the right, else to the left.

            Use this method to take an array input and create a rolling value matrix for vectorization.
            
            INPUT: data - a single array
                   start - usually 0
                   amount - +/- integer - shift left or right by this much
            OUTPUT: an array of arrays suitable for rolling vectorizations.
        """
        return tuple([self._shift(data, x) for x in range(start, amount)])

    def _dropna(self, data):
        """ Abstracted method to exclude Nones from an array.
        """
        return [x for x in data if x]

    def _cropna(self, data):
        """ Determines if the array was shifted left or right. Then we make sure the first array has
            no Nones by dropping it. We then cut all the other arrays by the same amount so that it
            is all an evenly cropped square.
            
            We crop the array matrix at either the left or right side.
            
            INPUT: array of arrays with values
            OUTPUT: the same as the input but with cropped and squared values based on whether it was
                    shifted right or left.
        """
        data_size = data.__len__()
        is_shifted_left = 0
        is_shifted_right = 0
        for datum in data:
            first = datum[0]
            last = datum[-1]
            if not first:
                is_shifted_right += 1
            elif not last:
                is_shifted_left += 1
        first_array = data[0]
        first_offset_size = 0
        for i in first_array:
            if not i:
                first_offset_size += 1
        cropped = []
        if is_shifted_right > is_shifted_left:
            self.__shift_direction = 1
            for datum in data:
                new = datum[first_offset_size:].copy()
                for a in range(0, data_size - 1): new.pop(0)
                cropped.append(new)
        elif is_shifted_left > is_shifted_right:
            self.__shift_direction = -1
            for datum in data:
                new = datum[first_offset_size:].copy()
                for a in range(0, data_size - 1): new.pop()
                cropped.append(new)
        elif is_shifted_left == 0 and is_shifted_right == 0:
            return tuple(data)
        else:
            raise ValueError('Bad data structure')
        return tuple(cropped)

    def __square(self, square_size, data):
        """ Helper method for squaring a matrix.
        """
        new_data = []
        for series in data:
            series_size = series.__len__()
            fill_size = square_size - series_size
            if fill_size == 0:
                new_data.append(list(series))
                continue
            new_series = [None for x in range(0, fill_size)]
            new_series.extend(list(series))
            new_data.append(new_series)
        return new_data
    
    def _square_right(self, data):
        """ Pads an array of series so that it is shifted to the right 
            and padded with Nones to the left. The result should be a
            squared matrix of values.
            
            This first array value should have no Nones at the left.

            INPUT:  data     - an array of series data
            OUTPUT: new_data - a squared matrix

        """
        square_size = 0
        for series in data:
            size = series.__len__()
            if size > square_size:
                square_size = size
        # square size is the largest series size
        if square_size == 0:
            raise ValueError('Data needs to be an array of arrays greater than 2.')
        return self.__square(square_size, data)

    def _square_right_by_ref(self, reference, data):
        """ Instead of using the largest series as the reference, we use the specified
            reference for squaring to the right.
        """
        square_size = reference.__len__()
        if self._is_matrices(data):
            return self.__square(square_size, data)
        elif self._is_series(data):
            return self.__square(square_size, [data])[0]
        else:
            raise ValueError('Bad data type')
    
    def _has_accel_lib(self, lib='at-least-one'):
        """ INPUT: lib - str - default - all
                    - specific lib - tensorflow, pytorch, numpy
            OUTPUT: boolean - True or False
        """
        if lib == 'at-least-one':
            if not self.__TF and not self.__PT and not self.__NP:
                has_tf = self._has_accel_lib(lib='tensorflow')
                has_pt = self._has_accel_lib(lib='pytorch')
                has_np = self._has_accel_lib(lib='numpy')
                if has_tf or has_pt or has_np:
                    return True
                return False
            elif self.__TF:
                return True
            elif self.__PT:
                return True
            elif self.__NP:
                return True
        elif lib == 'tensorflow' and self.__TF:
            return True
        elif lib == 'pytorch' and self.__PT:
            return True
        elif lib == 'numpy' and self.__NP:
            return True
        return False

    def _is_series(self, data):
        if type(data) != list or data.__len__() == 0:
            return True
        number_types = [int, float]
        array_types = [list, tuple, np.array]
        if type(data) in array_types and type(data[0]) not in array_types:
            return True
        return False

    def _is_matrices(self, data):
        if type(data) != list or data.__len__() == 0:
            return False
        number_types = [int, float]
        array_types = [list, tuple, np.array]
        if type(data) in array_types and type(data[0]) in array_types:
            return True
        return False

    def _is_a_list_of_series(self, data):
        """ Check if data is a list of series, or a list of matrices.
        """
        number_types = [int, float]
        array_types = [list, tuple, np.array]
        if type(data[0][0]) in number_types or type(data[0][0]) not in array_types:
            return True
        return False

    def _is_a_list_of_matrices(self, data):
        """ Check if data is a list of series, or a list of matrices.
        """
        array_types = [list, tuple]
        if type(data[0][0]) in array_types:
            return True
        return False

    def _multiply(self, data):
        """ Multi library mutliply.
        """
        if self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_matrices(data):
            tf = self.__TF
            new_data = []
            for matrix in data:
                new_matrix = []
                for mrow in matrix:
                    new_matrix.append(tuple(mrow))
                new_data.append(np.array(new_matrix))
            reduction = tf.constant(new_data[0])
            with tf.Session() as sess:
                for matrix in new_data[1:]:
                    reduction = tf.matmul(reduction, tf.constant(matrix))
            new_reduction = []
            for matrix in new_reduction:
                new_reduction.append(list(matrix))
            return new_reduction

        elif self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_series(data):
            tf = self.__TF
            reduction = tf.constant(data[0])
            with tf.Session() as sess:
                for layer in data[1:]:
                    reduction = sess.run(tf.multiply(reduction, tf.constant(layer)))
            return list(reduction)

        elif self._has_accel_lib(lib='pytorch') and self._is_a_list_of_series(data):
            pt = self.__PT
            reduction = pt.tensor(data[0])
            for layer in data[1:]:
                reduction = pt.multiply(reduction, pt.tensor(layer))
            return list(reduction)

        elif self._has_accel_lib(lib='numpy') and self._is_a_list_of_series(data):
            np = self.__NP
            reduction = np.array(data[0])
            for layer in data[1:]:
                reduction = np.multiply(reduction, np.array(layer))
            return list(reduction)
        raise RuntimeError('Accellerator library requried.')

    def _add(self, data):
        """ Multi library add
        """
        if self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_matrices(data):
            tf = self.__TF
            new_data = []
            for matrix in data:
                new_matrix = []
                for mrow in matrix:
                    new_matrix.append(tuple(mrow))
                new_data.append(np.array(new_matrix))
            reduction = tf.constant(new_data[0])
            with tf.Session() as sess:
                for matrix in new_data[1:]:
                    reduction = sess.run(tf.add(reduction, tf.constant(matrix)))
            new_reduction = []
            for matrix in new_reduction:
                new_reduction.append(list(matrix))
            return new_reduction
        
        elif self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_series(data):
            tf = self.__TF
            reduction = tf.constant(data[0])
            with tf.Session() as sess:
                for layer in data[1:]:
                    reduction = sess.run(tf.add(reduction, tf.constant(layer)))
            return list(reduction)

        elif self._has_accel_lib(lib='pytorch') and self._is_a_list_of_series(data):
            pt = self.__PT
            reduction = pt.tensor(data[0])
            for layer in data[1:]:
                reduction = pt.add(reduction, pt.tensor(layer))
            return list(reduction)

        elif self._has_accel_lib(lib='numpy') and self._is_a_list_of_series(data):
            np = self.__NP
            reduction = np.array(data[0])
            for layer in data[1:]:
                reduction = np.add(reduction, np.array(layer))
            return list(reduction)
        raise RuntimeError('Accellerator library requried.')

    def _subtract(self, data):
        """ Multi library subtract
        """
        if self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_matrices(data):
            tf = self.__TF
            new_data = []
            for matrix in data:
                new_matrix = []
                for mrow in matrix:
                    new_matrix.append(tuple(mrow))
                new_data.append(np.array(new_matrix))
            reduction = tf.constant(new_data[0])
            with tf.Session() as sess:
                for matrix in new_data[1:]:
                    reduction = sess.run(tf.subtract(reduction, tf.constant(matrix)))
            new_reduction = []
            for matrix in new_reduction:
                new_reduction.append(list(matrix))
            return new_reduction

        elif self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_series(data):
            tf = self.__TF
            reduction = tf.constant(data[0])
            with tf.Session() as sess:
                for layer in data[1:]:
                    reduction = sess.run(tf.subtract(reduction, tf.constant(layer)))
            return list(reduction)

        elif self._has_accel_lib(lib='pytorch') and self._is_a_list_of_series(data):
            pt = self.__PT
            reduction = pt.tensor(data[0])
            for layer in data[1:]:
                reduction = pt.subtract(reduction, pt.tensor(layer))
            return list(reduction)

        elif self._has_accel_lib(lib='numpy') and self._is_a_list_of_series(data):
            np = self.__NP
            reduction = np.array(data[0])
            for layer in data[1:]:
                reduction = reduction - np.array(layer)
            return list(reduction)
        raise RuntimeError('Accellerator library requried.')

    def _divide(self, data):
        """ Multi library divide
        """
        if self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_matrices(data):
            tf = self.__TF
            new_data = []
            for matrix in data:
                new_matrix = []
                for mrow in matrix:
                    new_matrix.append(tuple(mrow))
                new_data.append(np.array(new_matrix))
            reduction = tf.constant(new_data[0])
            with tf.Session() as sess:
                for matrix in new_data[1:]:
                    reduction = sess.run(tf.divide(reduction, tf.constant(matrix)))
            new_reduction = []
            for matrix in new_reduction:
                new_reduction.append(list(matrix))
            return new_reduction

        elif self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_series(data):
            tf = self.__TF
            reduction = tf.constant(data[0])
            with tf.Session() as sess:
                for layer in data[1:]:
                    reduction = sess.run(tf.divide(reduction, tf.constant(layer)))
            return list(reduction)

        elif self._has_accel_lib(lib='pytorch') and self._is_a_list_of_series(data):
            pt = self.__PT
            reduction = pt.tensor(data[0])
            for layer in data[1:]:
                reduction = pt.divide(reduction, pt.tensor(layer))
            return list(reduction)

        elif self._has_accel_lib(lib='numpy') and self._is_a_list_of_series(data):
            np = self.__NP
            reduction = np.array(data[0])
            for layer in data[1:]:
                reduction = reduction / np.array(layer)
            return list(reduction)
        raise RuntimeError('Accellerator library requried.')
    
    def _power(self, data):
        """ Multi library power
        """
        if self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_matrices(data):
            raise RuntimeError('Power does not currently work with tensorflow')
            tf = self.__TF
            new_data = []
            for matrix in data:
                new_matrix = []
                for mrow in matrix:
                    new_matrix.append(tuple(mrow))
                new_data.append(np.array(new_matrix))
            reduction = tf.constant(new_data[0])
            with tf.Session() as sess:
                for matrix in new_data[1:]:
                    reduction = sess.run(tf.pow(reduction, tf.constant(matrix)))
            new_reduction = []
            for matrix in new_reduction:
                new_reduction.append(list(matrix))
            return new_reduction

        elif self._has_accel_lib(lib='tensorflow') and self._is_a_list_of_series(data):
            raise RuntimeError('Power does not currently work with tensorflow')
            tf = self.__TF
            reduction = tf.constant(data[0])
            with tf.Session() as sess:
                for layer in data[1:]:
                    reduction = sess.run(tf.pow(reduction, tf.constant(layer)))
            return list(reduction)

        elif self._has_accel_lib(lib='pytorch') and self._is_a_list_of_series(data):
            pt = self.__PT
            reduction = pt.tensor(data[0])
            for layer in data[1:]:
                reduction = pt.pow(reduction, pt.tensor(layer))
            return list(reduction)

        elif self._has_accel_lib(lib='numpy') and self._is_a_list_of_series(data):
            np = self.__NP
            reduction = np.array(data[0])
            for layer in data[1:]:
                reduction = np.power(reduction, np.array(layer))
            return list(reduction)
        raise RuntimeError('Accellerator library requried.')

    def _mode_np_helper(self, x, y, m):
        if x == m:
            return y

    def _mode_np(self, data):
        np = self.__NP
        cats = 10
        catsa = data.__len__() / 4
        cats = int(min([cats, catsa]))
        n, bins = np.histogram(data, bins=cats, density=True)
        m = max(n)
        bins = list(bins[1:])
        # print(n)
        # print(bins)
        # print(data)
        # print("bins=%s, np_max=%s" % (cats, m))
        found = [self._mode_np_helper(x, y, m) for x, y in zip(n, bins)]
        found = [x for x in found if x]
        # print(found)
        if found.__len__() > 0:
            max_found = max(found)
            # print("ve_max=%s" % max_found)
            # print()
            return max_found
        else:
            # print('ve_max not found')
            # print()
            return None

    def _mode(self, data):
        """ Multi library mode.
            INPUT: an single array
            OUTPUT: float - the most often recurring value
        """
        if self._has_accel_lib(lib='pytorch') and self._is_a_list_of_series(data):
            pt = self.__PT
            return [float(pt.mode(pt.tensor([*x]))[0]) for x in zip(*data)]

        elif self._has_accel_lib(lib='numpy') and self._is_a_list_of_series(data):
            return [self._mode_np([*x]) for x in zip(*data)]

        raise RuntimeError(
            'Accellerator library, numpy or pytorch, requried. Input must be series.'
        )

    def __round_series(self, x, round_size):
        if x != 0 and not x is None:
            return round(float(x), round_size)
        elif x == 0:
            return 0
        return None

    def round_series(self, data, round_size):
        return [self.__round_series(x, round_size) for x in list(data)]

    def final_func(self):
        """ Overrride Me. Mandatory.
        """
        raise RuntimeError('Override function required.')

    def ignite(self, data, final_func, **kwargs):
        if type(data) == tuple:
            data = list(data)
        round_size = None
        if 'round_size' in kwargs.keys():
            round_size = kwargs['round_size']
        massaged_components = []
        for k in kwargs.keys():
            if k == 'row_funcs':
                for f in kwargs[k]:
                    massaged_components.append([f(x) for x in data])
            if k == 'columns':
                massaged_components.extend(kwargs[k])
        if massaged_components.__len__() == 0:
            massaged_components = data
        result = final_func(massaged_components)
        if round_size:
            if self._is_series(result):
                result = self.round_series(result, round_size)
            elif self._is_matrices(result):
                new_matrix = []
                for row in result:
                    new_matrix.append(self.round_series(row, round_size))
                result = new_matrix
            else:
                raise ValueError('result is not a series or a matrix')
        if self._is_series(result) and not self.__DROPNA:
            # Exploratory result clean ups
            if data.__len__() > 0 and self._is_matrices(data[0]) and type(data[0][0][0]) == list:
                result = self._square_right_by_ref(data[0][0], result)
            elif data.__len__() > 0 and self._is_matrices(data):
                result = self._square_right_by_ref(data[0], result)
            result = self._square_right_by_ref(data, result)
        # Raw output for component mode
        return tuple(result)

In [26]:
#import tensorflow as tf
#import torch as pt
import numpy as np
class VRollingSum(VectorizeEngine):
    """Parameterized rolling sum as a vectorized algorithm.
       6.3 times faster than pandas. Slower than hardcoded vectorization.
       PERFORMANCE -
           121 µs ± 1.55 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

       BENCHMARK - 
           Pandas
           773 µs ± 42.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self.__DATA = data
        self.__WINDOW = window
        chunk1 = ''
        for x in range(0, self.__WINDOW):
            chunk1 += 'x%s, ' % x
        self.__C1 = chunk1[0:-2]
        chunk2 = ''
        for x in range(0, self.__WINDOW):
            chunk2 += 'x%s + ' % x
        self.__C2 = chunk2[0:-2]
        chunk3 = ''
        for x in range(0, self.__WINDOW):
            chunk3 += 'data[%s], ' % x
        self.__C3 = chunk3[0:-2]

    def final_func(self, data):
        if self._has_accel_lib():
            return self._add(data)
        else:
            cmd1 = '[%s for %s in zip(%s)]' % (
                 self.__C2, self.__C1, self.__C3
            )
            rtn = eval(cmd1)
            return rtn

    def get(self):
        data = self.__DATA
        x = [data.copy()]
        chunk3 = ''
        for i in range(1, self.__WINDOW):
            chunk3 += 'x.append(self._shift(x[0], %s)) \n' % i
        c3 = chunk3[0:-2]
        cmd1 = c3
        cmd2 = '%s = self._cropna(x)' % self.__C1
        cmd3 = 'self.ignite(data, self.final_func, columns=[%s], row_funcs=[])' % (
            self.__C1,
        )
        exec(cmd1)
        exec(cmd2)
        rtn = eval(cmd3)
        return rtn

vectorized_version = VRollingSum(data, 3).get()
print(vectorized_version)
print(data.__len__(), vectorized_version.__len__())

(None, None, 7, 9, 12, 14, 15, 15, 16, 18, 21, 20, 17, 12, 11, 12, 12, 10, 6)
19 19


In [27]:
#timeit VRollingSum(data, 3, use_tensorflow_lib=tf).get()

In [28]:
timeit VRollingSum(data, 3).get()

187 µs ± 37.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [29]:
class VRollingSum3(VectorizeEngine):
    """Hardcoded rolling sum as a vectorized algorithm.
       80.3 times faster than pandas. Fastest.
       PERFORMANCE -
           9.63 µs ± 74.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

       BENCHMARK -
           Pandas
           773 µs ± 42.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

       DESIGN: - Final Engine - Default Configured
                   - dupilcate a single row vector array and shift them up to three times
                   - sum all vector arrays to a reduced result
               - Component Engine
                   - for each pre prepared vector array
                       - sum all vector arrays to a reduced result

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, **kwargs):
        super().__init__(**kwargs)
        self.__DATA = data
        self.__ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self.__ROUND_SIZE = kwargs['round_size']
        self.__SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self.__SKIP_POST_CLEANING = True
            self.__ROUND_SIZE = None
        self.__SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self.__SKIP_PRE_PREPARATIONS = True

    def final_func(self, data):
        if self._has_accel_lib():
            return self._add(data)
        else:
            return [
                x0 + x1 + x2 for x0, x1, x2 in zip(data[0], data[1], data[2])
            ]

    def get(self):
        data = self.__DATA
        if self.__SKIP_PRE_PREPARATIONS:
            if data.__len__() != 3 and not self._is_matrices(data):
                raise ValueError('data array must contain exactly 3 arrays')
            x = data.copy()
            x0, x1, x2 = x[0], x[1], x[2]
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list type, and contain either a float or int'
                )
            x0 = data.copy()
            x1 = self._shift(x0, 1)
            x2 = self._shift(x0, 2)
        x0, x1, x2 = self._cropna([x0, x1, x2])
        return self.ignite(data, self.final_func, columns=[x0, x1, x2], row_funcs=[], round_size=self.__ROUND_SIZE)

vectorized_version = VRollingSum3(data, skip_post_cleaning=False, skip_pre_preparations=False).get()
print(vectorized_version)

(None, None, 7.0, 9.0, 12.0, 14.0, 15.0, 15.0, 16.0, 18.0, 21.0, 20.0, 17.0, 12.0, 11.0, 12.0, 12.0, 10.0, 6.0)


In [30]:
v1 = VRollingSum3(data).get()
#v2 = VRollingSum3(data, use_pytorch_lib=pt).get()
#v3 = VRollingSum3(data, use_tensorflow_lib=tf).get()
v4 = VRollingSum3(data, use_numpy_lib=np).get()
print(v1)
#print(v2)
#print(v3)
print(v4)
#print(v1 == v2 == v3 == v4)

(None, None, 7.0, 9.0, 12.0, 14.0, 15.0, 15.0, 16.0, 18.0, 21.0, 20.0, 17.0, 12.0, 11.0, 12.0, 12.0, 10.0, 6.0)
(None, None, 7.0, 9.0, 12.0, 14.0, 15.0, 15.0, 16.0, 18.0, 21.0, 20.0, 17.0, 12.0, 11.0, 12.0, 12.0, 10.0, 6.0)


## XSmall Array Size

In [31]:
data = [x for x in range(0,19)]

In [32]:
#timeit VRollingSum3(data, use_tensorflow_lib=tf).get()

In [33]:
#timeit VRollingSum3(data, use_pytorch_lib=pt).get()

In [34]:
timeit VRollingSum3(data, use_numpy_lib=np).get()

46.6 µs ± 4.19 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


In [35]:
timeit VRollingSum3(data).get()

30.6 µs ± 2.63 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


In [36]:
v = VectorizeEngine()
x0 = data.copy()
x1 = v._shift(x0, 1)
x2 = v._shift(x0, 2)
x0, x1, x2 = v._cropna([x0, x1, x2])
data_preprepared = [x0, x1, x2]
component_engine_on = {'skip_post_cleaning': True, 'skip_pre_preparations': True}

In [37]:
timeit VRollingSum3(data_preprepared, use_numpy_lib=np, **component_engine_on).get()

27.2 µs ± 1.92 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


In [38]:
timeit VRollingSum3(data_preprepared, **component_engine_on).get()

14.2 µs ± 279 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


## Small Array Size

In [39]:
data = [x for x in range(0,1000)]

In [40]:
#timeit VRollingSum3(data, use_tensorflow_lib=tf).get()

In [41]:
#timeit VRollingSum3(data, use_pytorch_lib=pt).get()

In [42]:
timeit VRollingSum3(data, use_numpy_lib=np).get()

1.16 ms ± 36.1 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [43]:
timeit VRollingSum3(data).get()

840 µs ± 7.88 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


## Medium Array Size

In [44]:
data = [x for x in range(0,100000)]

In [45]:
#timeit VRollingSum3(data, use_tensorflow_lib=tf).get()

In [46]:
#timeit VRollingSum3(data, use_pytorch_lib=pt).get()

In [47]:
timeit VRollingSum3(data, use_numpy_lib=np).get()

130 ms ± 7.59 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [48]:
timeit VRollingSum3(data).get()

102 ms ± 4.6 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


## Large Array Size

In [49]:
data = [x for x in range(1,12500000)]  # 12.5M

In [50]:
#timeit VRollingSum3(data, use_tensorflow_lib=tf).get()

In [51]:
#timeit VRollingSum3(data, use_pytorch_lib=pt).get()

In [52]:
timeit VRollingSum3(data, use_numpy_lib=np).get()

16.8 s ± 719 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
timeit VRollingSum3(data).get()

In [None]:
v = VectorizeEngine()
x0 = data.copy()
x1 = v._shift(x0, 1)
x2 = v._shift(x0, 2)
x0, x1, x2 = v._cropna([x0, x1, x2])
data_preprepared = [x0, x1, x2]
component_engine_on = {'skip_post_cleaning': True, 'skip_pre_preparations': True}

In [None]:
timeit VRollingSum3(data_preprepared, use_numpy_lib=np, **component_engine_on).get()

In [None]:
timeit VRollingSum3(data_preprepared, **component_engine_on).get()

# Conclusion

Tensorflow and PyTorch did not add any value when trying to speed up mathematical operations with large vector arrays against each other. <br/>

Using numpy is quicker than gpu libraries, but still slower than pure functional python with generators and iterators. <br/>

We can conlude, that if our data set type is that of single row vector arrays being computed against other single row arrays, then the most performant method will always be a minimal algorithm with pure functional python primitive operations.<br/>

We should see futher if the value of gpu libraries like Tensorflow and PyTorch, shine above everyone, when used for their matrix operations where we have multi-row tensor arrays computed against each other.

# Multiplication, Subtraction and Division Examples

In [None]:
import random
data1 = [
    [random.randint(1, 100) / 100 for x in range(0, 20)],
    [random.randint(1, 100) / 100 for x in range(0, 20)],
    [random.randint(1, 100) / 100 for x in range(0, 20)]
]
data2 = [
    [1 for x in range(1, 20)],
    [2 for x in range(1, 20)],
    [3 for x in range(1, 20)]
]

## Multiplication

In [None]:
class VRollingProd3(VectorizeEngine):
    """Hardcoded rolling prod as a vectorized algorithm.
       Useless example. Only for testing and validation.
    """
    def __init__(self, data, **kwargs):
        super().__init__(**kwargs)
        self.__DATA = data
        self.__ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self.__ROUND_SIZE = kwargs['round_size']
        self.__SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self.__SKIP_POST_CLEANING = True
            self.__ROUND_SIZE = None
        self.__SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self.__SKIP_PRE_PREPARATIONS = True

    def final_func(self, data):
        if self._has_accel_lib():
            return self._multiply(data)
        else:
            return [
                x0 * x1 * x2 for x0, x1, x2 in zip(data[0], data[1], data[2])
            ]

    def get(self):
        data = self.__DATA
        if data.__len__() != 3 and not self._is_matrices(data):
            raise ValueError('data array must contain exactly 3 arrays')
        x = data.copy()
        x0, x1, x2 = x[0], x[1], x[2]
        x0, x1, x2 = self._cropna([x0, x1, x2])
        return self.ignite(
            data, self.final_func, columns=[x0, x1, x2], row_funcs=[], round_size=self.__ROUND_SIZE
        )

vectorized_version = VRollingProd3(data1).get()
print(vectorized_version)
vectorized_version = VRollingProd3(data2).get()
print(vectorized_version)

In [None]:
v1 = VRollingProd3(data1).get()
#v2 = VRollingProd3(data1, use_pytorch_lib=pt).get()
#v3 = VRollingProd3(data1, use_tensorflow_lib=tf).get()
v4 = VRollingProd3(data1, use_numpy_lib=np).get()
print(v1)
#print(v2)
#print(v3)
print(v4)
#print(v1 == v2 == v3 == v4)

In [None]:
#timeit VRollingProd3(data1, use_tensorflow_lib=tf).get()

In [None]:
#timeit VRollingProd3(data1, use_pytorch_lib=pt).get()

In [None]:
timeit VRollingProd3(data1, use_numpy_lib=np).get()

In [None]:
timeit VRollingProd3(data1).get()

In [None]:
data_preprepared = data1
component_engine_on = {'skip_post_cleaning': True, 'skip_pre_preparations': True}

In [None]:
timeit VRollingProd3(data_preprepared, use_numpy_lib=np, **component_engine_on).get()

In [None]:
timeit VRollingProd3(data_preprepared, **component_engine_on).get()

## Subtraction

In [None]:
class VRollingSub3(VectorizeEngine):
    """Hardcoded rolling sub as a vectorized algorithm.
       Useless example. Only for testing and validation.
    """
    def __init__(self, data, **kwargs):
        super().__init__(**kwargs)
        self.__DATA = data
        self.__ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self.__ROUND_SIZE = kwargs['round_size']
        self.__SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self.__SKIP_POST_CLEANING = True
            self.__ROUND_SIZE = None
        self.__SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self.__SKIP_PRE_PREPARATIONS = True

    def final_func(self, data):
        if self._has_accel_lib():
            return self._subtract(data)
        else:
            return [
                x0 - x1 - x2 for x0, x1, x2 in zip(data[0], data[1], data[2])
            ]

    def get(self):
        data = self.__DATA
        if data.__len__() != 3 and not self._is_matrices(data):
            raise ValueError('data array must contain exactly 3 arrays')
        x = data.copy()
        x0, x1, x2 = x[0], x[1], x[2]
        x0, x1, x2 = self._cropna([x0, x1, x2])
        return self.ignite(
            data, self.final_func, columns=[x0, x1, x2], row_funcs=[], round_size=self.__ROUND_SIZE
        )

vectorized_version = VRollingSub3(data1).get()
print(vectorized_version)
vectorized_version = VRollingSub3(data2).get()
print(vectorized_version)

In [None]:
v1 = VRollingSub3(data1).get()
#v2 = VRollingSub3(data1, use_pytorch_lib=pt).get()
#v3 = VRollingSub3(data1, use_tensorflow_lib=tf).get()
v4 = VRollingSub3(data1, use_numpy_lib=np).get()
print(v1)
#print(v2)
#print(v3)
print(v4)
#print(v1 == v2 == v3 == v4)

In [None]:
#timeit VRollingSub3(data1, use_tensorflow_lib=tf).get()

In [None]:
#timeit VRollingSub3(data1, use_pytorch_lib=pt).get()

In [None]:
timeit VRollingSub3(data1, use_numpy_lib=np).get()

In [None]:
timeit VRollingSub3(data1).get()

In [None]:
data_preprepared = data1
component_engine_on = {'skip_post_cleaning': True, 'skip_pre_preparations': True}

In [None]:
timeit VRollingSub3(data_preprepared, use_numpy_lib=np, **component_engine_on).get()

In [None]:
timeit VRollingSub3(data_preprepared, **component_engine_on).get()

## Division

In [None]:
class VRollingDiv3(VectorizeEngine):
    """Hardcoded rolling divide as a vectorized algorithm.
       Useless example. Only for testing and validation.
    """
    def __init__(self, data, **kwargs):
        super().__init__(**kwargs)
        self.__DATA = data
        self.__ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self.__ROUND_SIZE = kwargs['round_size']
        self.__SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self.__SKIP_POST_CLEANING = True
            self.__ROUND_SIZE = None
        self.__SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self.__SKIP_PRE_PREPARATIONS = True

    def final_func(self, data):
        if self._has_accel_lib():
            return self._divide(data)
        else:
            return [
                x0 / x1 / x2 for x0, x1, x2 in zip(data[0], data[1], data[2])
            ]

    def get(self):
        data = self.__DATA
        if data.__len__() != 3 and not self._is_matrices(data):
            raise ValueError('data array must contain exactly 3 arrays')
        x = data.copy()
        x0, x1, x2 = x[0], x[1], x[2]
        # x0, x1, x2 = self._cropna([x0, x1, x2])
        return self.ignite(
            data, self.final_func, columns=[x0, x1, x2], row_funcs=[], round_size=self.__ROUND_SIZE
        )

vectorized_version = VRollingDiv3(data1).get()
print(vectorized_version)
vectorized_version = VRollingDiv3(data2).get()
print(vectorized_version)

In [None]:
v1 = VRollingDiv3(data1).get()
#v2 = VRollingDiv3(data1, use_pytorch_lib=pt).get()
#v3 = VRollingDiv3(data1, use_tensorflow_lib=tf).get()
v4 = VRollingDiv3(data1, use_numpy_lib=np).get()
print(v1)
#print(v2)
#print(v3)
print(v4)
#print(v1 == v2 == v3 == v4)

In [None]:
#timeit VRollingDiv3(data1, use_tensorflow_lib=tf).get()

In [None]:
#timeit VRollingDiv3(data1, use_pytorch_lib=pt).get()

In [None]:
timeit VRollingDiv3(data1, use_numpy_lib=np).get()

In [None]:
timeit VRollingDiv3(data1).get()

In [None]:
data_preprepared = data1
component_engine_on = {'skip_post_cleaning': True, 'skip_pre_preparations': True}

In [None]:
timeit VRollingDiv3(data_preprepared, use_numpy_lib=np, **component_engine_on).get()

In [None]:
timeit VRollingDiv3(data_preprepared, **component_engine_on).get()

## Power

In [None]:
class VRollingPower3(VectorizeEngine):
    """Hardcoded rolling divide as a vectorized algorithm.
       Useless example. Only for testing and validation.
    """
    def __init__(self, data, **kwargs):
        super().__init__(**kwargs)
        self.__DATA = data
        self.__ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self.__ROUND_SIZE = kwargs['round_size']
        self.__SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self.__SKIP_POST_CLEANING = True
            self.__ROUND_SIZE = None
        self.__SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self.__SKIP_PRE_PREPARATIONS = True

    def final_func(self, data):
        if self._has_accel_lib():
            return self._add([
                self._power([data[0], data[1]]), data[2]
            ])
        else:
            return [
                x0**x1+x2 for x0, x1, x2 in zip(data[0], data[1], data[2])
            ]

    def get(self):
        data = self.__DATA
        if data.__len__() != 3 and not self._is_matrices(data):
            raise ValueError('data array must contain exactly 3 arrays')
        x = data.copy()
        x0, x1, x2 = x[0], x[1], x[2]
        # x0, x1, x2 = self._cropna([x0, x1, x2])
        return self.ignite(
            data, self.final_func, columns=[x0, x1, x2], row_funcs=[], round_size=self.__ROUND_SIZE
        )

vectorized_version = VRollingPower3(data1).get()
print(vectorized_version[:5])
vectorized_version = VRollingPower3(data2).get()
print(vectorized_version[:5])

In [None]:
v1 = VRollingPower3(data1).get()[-100:10]
#v2 = VRollingPower3(data1, use_pytorch_lib=pt).get()[-100:10]
# v3 = VRollingPower3(data1, use_tensorflow_lib=tf).get()[:50]
v4 = VRollingPower3(data1, use_numpy_lib=np).get()[-100:10]
print(v1[-10:])
#print(v2[-10:])
# print(v3[-10:])
print(v4[-10:]) 
#print(v1 == v2 == v4)

In [None]:
#timeit VRollingPower3(data1, use_pytorch_lib=pt).get()

In [None]:
timeit VRollingPower3(data1, use_numpy_lib=np).get()

In [None]:
timeit VRollingPower3(data1).get()

In [None]:
data_preprepared = data1
component_engine_on = {'skip_post_cleaning': True, 'skip_pre_preparations': True}

In [None]:
timeit VRollingPower3(data_preprepared, use_numpy_lib=np, **component_engine_on).get()

# 
# 
# 
# 
# Vectorized Stats Tools

We will build a set of tools to use for orchestrating a composite design, that aids in forecasting and performance comparison benchmarking.

We value hardcoding capabilities over parameterization if it means more speed.

We value numpy or pure python for single array computations.

We value gpu pytorch gpu library for multi row array matrix computations.

In [None]:
import random
data = [round(random.randint(1, 1000) / 1000, 6) for x in range(0, 1000)]
datax = [round(random.randint(1, 1000) / 1000, 6) for x in range(0, 1000)]
data_df = pd.DataFrame(data)
datac = [
    data[6:],
    [round(float(x), 6) for x in data_df.shift(1).values][6:],
    [round(float(x), 6) for x in data_df.shift(2).values][6:],
    [round(float(x), 6) for x in data_df.shift(3).values][6:],
    [round(float(x), 6) for x in data_df.shift(4).values][6:],
    [round(float(x), 6) for x in data_df.shift(5).values][6:],
    [round(float(x), 6) for x in data_df.shift(6).values][6:]
]
data1 = [
    [random.randint(1, 100) / 100 for x in range(0, 1000)],
    [random.randint(1, 100) / 100 for x in range(0, 1000)],
    [random.randint(1, 100) / 100 for x in range(0, 1000)]
]
data2 = [
    [1 for x in range(0, 1000)],
    [2 for x in range(0, 1000)],
    [3 for x in range(0, 1000)]
]

### Common Rolling Windows

Mean, Standard Deviation, Min, Max, Median, Mode

#### Mean

In [None]:
class VRollingSumEngine(VectorizeEngine):
    """Rolling sum as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - dupilcate a single row vector array and shift them up to x times
                   - sum all vector arrays to a reduced result
               - Component Engine
                   - for each pre prepared vector array
                       - sum all vector arrays to a reduced result

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__WINDOW = window

    def pure_python_func(self, data):
        f = lambda *args: sum(*args)
        return [
            f(x) for x in zip(*data)
        ]

    def final_func(self, data):
        if self._has_accel_lib():
            return self._add(data)
        else:
            return self.pure_python_func(data)

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != self.__WINDOW or not self._is_matrices(data):
                raise ValueError(
                    'data array must contain exactly %s arrays' % self.__WINDOW
                )
            x = tuple(data.copy())
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list type, ' +
                    'and contain either a float or int'
                )
            x = self._vshift(data.copy(), 0, self.__WINDOW)
        x = self._cropna(list(x))
        return self.ignite(
            data, self.final_func, columns=list(x), row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def cumsum (data, window, **kwargs):
    return VRollingSumEngine(data, window, **kwargs).get()

def sum3(data, **kwargs):
    return VRollingSumEngine(data, 3, **kwargs).get()

def sum7(data, **kwargs):
    return VRollingSumEngine(data, 7, **kwargs).get()

def sum14(data, **kwargs):
    return VRollingSumEngine(data, 14, **kwargs).get()

def sum32(data, **kwargs):
    return VRollingSumEngine(data, 32, **kwargs).get()

def sum64(data, **kwargs):
    return VRollingSumEngine(data, 64, **kwargs).get()

In [None]:
print(data.__len__(), sum3(data).__len__())

In [None]:
timeit sum7(data)

In [None]:
timeit sum7(data, use_numpy_lib=np)

In [None]:
#timeit sum7(data, use_pytorch_lib=pt)

In [None]:
#timeit sum7(data, use_tensorflow_lib=tf)

In [None]:
timeit sum14(data)

In [None]:
timeit sum14(data, use_numpy_lib=np)

In [None]:
#timeit sum14(data, use_pytorch_lib=pt)

In [None]:
#timeit sum14(data, use_tensorflow_lib=tf)

In [None]:
timeit sum32(data)

In [None]:
timeit sum32(data, use_numpy_lib=np)

In [None]:
#timeit sum32(data, use_pytorch_lib=pt)

In [None]:
#timeit sum32(data, use_tensorflow_lib=tf)

In [None]:
timeit sum64(data)

In [None]:
timeit sum64(data, use_numpy_lib=np)

In [None]:
#timeit sum64(data, use_pytorch_lib=pt)

In [None]:
#timeit sum64(data, use_tensorflow_lib=tf)

In [None]:
class VRollingAvgEngine(VectorizeEngine):
    """Rolling avg as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - use a rolling sum engine to perform a configurable computation
                   - use this engine to perform a pure python vectorized rolling average
               - Component Engine
                   - the same as the final engine, except -
                       - you must pass in pre prepared colum series for the rolling sum computation

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__VRSUM = VRollingSumEngine(data, window, **kwargs).get
        self.__AVG_WINDOW = window

    def _avg(self, vrsum, divisor):
        f = lambda a, b: a / b
        return tuple(
            [
                f(rsum, total) for rsum, total in zip(
                    vrsum, divisor
                )
            ]
        )

    def pure_python_func(self, data):
        divisor = [self.__AVG_WINDOW for x in range(0, data.__len__())]
        return self._avg(data, divisor)

    def final_func(self, data):
        if self._has_accel_lib():
            raise RuntimeError('Average component must be pure python.')
        else:
            return self.pure_python_func(data)

    def get(self):
        data = self.__VRSUM()
        data = [x for x in data if x]
        if not self._SKIP_POST_CLEANING:
            return self._square_right_by_ref(
                self._DATA,
                self.ignite(
                    data, self.final_func, columns=[], row_funcs=[],
                    round_size=self._ROUND_SIZE
                )
            )
        return self.ignite(
            data, self.final_func, columns=[], row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def avg3(data, **kwargs):
    return VRollingAvgEngine(data, 3, **kwargs).get()

def avg7(data, **kwargs):
    return VRollingAvgEngine(data, 7, **kwargs).get()

def avg14(data, **kwargs):
    return VRollingAvgEngine(data, 14, **kwargs).get()

def avg32(data, **kwargs):
    return VRollingAvgEngine(data, 32, **kwargs).get()

def avg64(data, **kwargs):
    return VRollingAvgEngine(data, 64, **kwargs).get()

In [None]:
print(data.__len__(), avg3(data).__len__())

In [None]:
timeit avg3(data)

In [None]:
timeit avg7(data)

In [None]:
timeit avg14(data)

In [None]:
timeit avg32(data)

In [None]:
timeit avg64(data)

In [None]:
V = VectorizeEngine()
smas = V._square_right([avg14(data), avg7(data)])
pd.DataFrame({'a': smas[0], 'b': smas[1]}).tail(20).plot.line()

In [None]:
avg7(data).__len__()

In [None]:
avg64(data).__len__()

#### Standard Return

In [None]:
class VRollingReturnEngine(VectorizeEngine):
    """ Rolling return as a vectorized algorithm.
        Must be raw values that are all positive. No +/- rate of change values!

       DESIGN: - Final Engine - Default Configured
                   - 
               - Component Engine
                   - 

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array
                     window - int - rolling period size
                     uom - D,M,Q,Y - unit of measure - daily, monthly, quarterly, yearly

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared
                     window - int - rolling period size
                     uom - D,M,Q,Y - unit of measure - daily, monthly, quarterly, yearly
                     geo - ANN, MON, QTR - geometric basis -
                             - annualization, quarterlization, monthlyization

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, uom, geo, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__WINDOW = window
        self.__UNIT_OF_MEASURE = None
        if uom == 'D' and geo == 'ANN':
            self.__UNIT_OF_MEASURE = 365
        elif uom == 'M' and geo == 'ANN':
            self.__UNIT_OF_MEASURE = 12
        elif uom == 'Q' and geo == 'ANN':
            self.__UNIT_OF_MEASURE = 4
        elif uom == 'Y' and geo == 'ANN':
            self.__UNIT_OF_MEASURE = 1
        elif uom == 'D' and geo == 'MON':
            self.__UNIT_OF_MEASURE = 31
        elif uom == 'M' and geo == 'MON':
            self.__UNIT_OF_MEASURE = 1
        elif uom == 'D' and geo == 'QTR':
            self.__UNIT_OF_MEASURE = 93
        elif uom == 'M' and geo == 'QTR':
            self.__UNIT_OF_MEASURE = 3
        elif uom == 'Q' and geo == 'QTR':
            self.__UNIT_OF_MEASURE = 1
        else:
            raise ValueError(
                'uom must be either D,M,Q, or Y, with geo basis as either ANN, MON or QTR'
            )

    def __pure_python_func_helper(self, *data):
        C = data[0][0]
        P = data[0][-1]
        S = C - P
        if S == 0:
            return C
        R = S / P
        SR = abs(R)**(float(self.__UNIT_OF_MEASURE) / self.__WINDOW)
        if R < 0:
            SR = SR * -1
        return SR

    def pure_python_func(self, data):
        return [self.__pure_python_func_helper(x) for x in zip(*data)]

    def __accel_func_helper(self, R, STD):
        SR = R**STD
        if R < 0:
            SR = SR * -1
        return SR

    def final_func(self, data):
        if self._has_accel_lib():
            P = data[-1]
            C = data[0]
            uoms = [self.__UNIT_OF_MEASURE for x in range(0, data[0].__len__())]
            hold = [self.__WINDOW for x in range(0, data[0].__len__())]
            R = self._divide([self._subtract([C, P]), P])
            STD = self._divide([uoms, hold])
            return [self.__accel_func_helper(rtn, std) for rtn, std in zip(R, STD)]
        else:
            return self.pure_python_func(data)

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != self.__WINDOW or not self._is_matrices(data):
                raise ValueError(
                    'data array must contain exactly %s arrays' % self.__WINDOW
                )
            x = tuple(data.copy())
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list ' +
                    'type, and contain either a float or int'
                )
            x = self._vshift(data.copy(), 0, self.__WINDOW)
        x = self._cropna(list(x))
        return self.ignite(
            data, self.final_func, columns=list(x), row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def rtn(data, window, uom, geo, **kwargs):
    return VRollingReturnEngine(data, window, uom, geo, **kwargs).get()

def rtn3(data, uom, geo, **kwargs):
    return VRollingReturnEngine(data, 3, uom, geo, **kwargs).get()

def rtn7(data, uom, geo, **kwargs):
    return VRollingReturnEngine(data, 7, uom, geo, **kwargs).get()

def rtn14(data, uom, geo, **kwargs):
    return VRollingReturnEngine(data, 14, uom, geo, **kwargs).get()

def rtn32(data, uom, geo, **kwargs):
    return VRollingReturnEngine(data, 32, uom, geo, **kwargs).get()

def rtn64(data, uom, geo, **kwargs):
    return VRollingReturnEngine(data, 64, uom, geo, **kwargs).get()

def rtn_8M_D_MON(data, **kwargs):
    # 8 month hold(248p), daily precision, average monthly realized returns, geometrically normalised
    return rtn(data, 31 * 8, 'D', 'MON', **kwargs)

def rtn_4M_D_MON(data, **kwargs):
    # 4 month hold(124p), daily precision, average monthly realized returns, geometrically normalised
    return rtn(data, 31 * 4, 'D', 'MON', **kwargs)

def rtn_2M_D_MON(data, **kwargs):
    # 2 month hold(62p), daily precision, average monthly realized returns, geometrically normalised
    return rtn(data, 31 * 2, 'D', 'MON', **kwargs)

def rtn_2Q_D_QTR(data, **kwargs):
    # 2 quarter hold(186p), daily precision, average quarterly realized returns
    return rtn(data, 31 * 3 * 2, 'D', 'QTR', **kwargs)

def rtn_2Y_D_ANN(data, **kwargs):
    # 2 year hold(744p), daily precision, average annual realized returns
    return rtn(data, 31 * 12 * 2, 'D', 'ANN', **kwargs)

In [None]:
xdata = [float(x / 100) for x in range(1,6)]
xdata.extend([float((x / 100)) for x in range(6, 1, -1)])
print(xdata[3:])
result = rtn3(xdata, 'M', 'MON')[3:]
print(result)
print(xdata.__len__(), result.__len__())
ut.TestCase().assertTrue(result[0] > result[-1] and result[0] > 0)

In [None]:
xdata = [float((x / 100)) for x in range(64, 1, -1)]
print(xdata[-5:])
result = rtn32(xdata, 'M', 'MON')[-5:]
print(result)
ut.TestCase().assertTrue(result[0] > result[-1])

In [None]:
rtn_2M_D_MON(data)[-5:]

In [None]:
rtn_4M_D_MON(data)[-5:]

In [None]:
rtn_8M_D_MON(data)[-5:]

In [None]:
rtn_2Q_D_QTR(data)[-5:]

In [None]:
rtn_2Y_D_ANN(data)[-5:]

In [None]:
result1 = tuple([x for x in rtn7(data, 'M', 'MON', dropna=True)])
result2 = tuple([round(float(x), 6) for x in rtn7(datac, 'M', 'MON', **component_engine_on)])
print(result1[:20])
print(result2[:20])
ut.TestCase().assertTrue(result1 == result2)

In [None]:
pd.DataFrame(result2[:40]).plot.bar()

#### Min

In [None]:
class VRollingMinEngine(VectorizeEngine):
    """ Rolling min as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - 
               - Component Engine
                   - 

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__WINDOW = window

    def pure_python_func(self, data):
        f = lambda *args: min(*args)
        return [
            f(x) for x in zip(*data)
        ]

    def final_func(self, data):
        if self._has_accel_lib():
            raise RuntimeError('Min component must be pure python.')
        else:
            return self.pure_python_func(data)

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != self.__WINDOW or not self._is_matrices(data):
                raise ValueError(
                    'data array must contain exactly %s arrays' % self.__WINDOW
                )
            x = tuple(data.copy())
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list ' +
                    'type, and contain either a float or int'
                )
            x = self._vshift(data.copy(), 0, self.__WINDOW)
        x = self._cropna(list(x))
        return self.ignite(
            data, self.final_func, columns=list(x), row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def min3(data, **kwargs):
    return VRollingMinEngine(data, 3, **kwargs).get()

def min7(data, **kwargs):
    return VRollingMinEngine(data, 7, **kwargs).get()

def min14(data, **kwargs):
    return VRollingMinEngine(data, 14, **kwargs).get()

def min32(data, **kwargs):
    return VRollingMinEngine(data, 32, **kwargs).get()

def min64(data, **kwargs):
    return VRollingMinEngine(data, 64, **kwargs).get()

In [None]:
result = min3(data)
print(data.__len__(), result.__len__())

In [None]:
timeit min3(data)

In [None]:
timeit min7(data)

In [None]:
timeit min14(data)

In [None]:
timeit min32(data)

In [None]:
timeit min64(data)

In [None]:
a = min7(data, dropna=True)
b = tuple([round(x, 3) for x in min7(datac, **component_engine_on)])
print(a[-5:])
print(b[-5:])
ut.TestCase().assertTrue(a == b)

#### Max

In [None]:
class VRollingMaxEngine(VectorizeEngine):
    """ Rolling max as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - 
               - Component Engine
                   - 

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__WINDOW = window

    def pure_python_func(self, data):
        f = lambda *args: max(*args)
        return [
            f(x) for x in zip(*data)
        ]

    def final_func(self, data):
        if self._has_accel_lib():
            raise RuntimeError('Max component must be pure python.')
        else:
            return self.pure_python_func(data)

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != self.__WINDOW or not self._is_matrices(data):
                raise ValueError(
                    'data array must contain exactly %s arrays' % self.__WINDOW
                )
            x = tuple(data.copy())
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list ' +
                    'type, and contain either a float or int'
                )
            x = self._vshift(data.copy(), 0, self.__WINDOW)
        x = self._cropna(list(x))
        return self.ignite(
            data, self.final_func, columns=list(x), row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def max3(data, **kwargs):
    return VRollingMaxEngine(data, 3, **kwargs).get()

def max7(data, **kwargs):
    return VRollingMaxEngine(data, 7, **kwargs).get()

def max14(data, **kwargs):
    return VRollingMaxEngine(data, 14, **kwargs).get()

def max32(data, **kwargs):
    return VRollingMaxEngine(data, 32, **kwargs).get()

def max64(data, **kwargs):
    return VRollingMaxEngine(data, 64, **kwargs).get()

In [None]:
timeit max3(data)

In [None]:
timeit max7(data)

In [None]:
timeit max14(data)

In [None]:
timeit max32(data)

In [None]:
timeit max64(data)

In [None]:
a = max7(data, dropna=True)
b = tuple([round(x, 3) for x in max7(datac, **component_engine_on)])
print(a[-5:])
print(b[-5:])
ut.TestCase().assertTrue(a == b)

#### Mode

In [None]:
class VRollingModeEngine(VectorizeEngine):
    """Rolling mode as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - use np.histogram or pt.mode functions to determine most often recuring values
                   - both libraries will yeild difference results because they are similar in implementation
                   - we dot not care about exactness
                   - numpy is defaulted to 10 bins if window is equal to or larger than 40
                   - pytorch has its own method to yield mode
                   - numpy is more reactive to rolling fluctuations
               - Component Eine
                   - the same as the final engine, except -
                       - you must pass in pre prepared colum series for the rolling sum computation

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 2
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__WINDOW = window

    def final_func(self, data):
        if self._has_accel_lib(lib='numpy') or self._has_accel_lib(lib='pytorch'):
            return self._mode(data)
        else:
            raise RuntimeError('Mode component must use numpy or pytorch.')

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != self.__WINDOW or not self._is_matrices(data):
                raise ValueError(
                    'data array must contain exactly %s arrays' % self.__WINDOW
                )
            x = tuple(data.copy())
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list ' +
                    'type, and contain either a float or int'
                )
            x = self._vshift(data.copy(), 0, self.__WINDOW)
        x = self._cropna(list(x))
        return self.ignite(
            data, self.final_func, columns=list(x), row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
a = tuple([round(x, 2) for x in VRollingModeEngine(data, 7, use_numpy_lib=np, dropna=True).get()])
b = tuple([round(x, 2) for x in VRollingModeEngine(datac, 7, use_numpy_lib=np, **component_engine_on).get()])
print(a[-100:])
print(b[-100:])
print(a.__len__(), b.__len__())
ok = True
c = 0
v = 0
for q, w in zip(a, b):
    c += 1
    if q != w:
        print(c, q, w)
        v +=  abs(q - w)
        ok = False
v = float(v / c)
print('+/- variance = %s' % v)
ut.TestCase().assertTrue(v < 0.01)

In [None]:
a = tuple([round(x, 2) for x in VRollingModeEngine(data, 7, use_pytorch_lib=pt, dropna=True).get()])
b = tuple([round(x, 2) for x in VRollingModeEngine(datac, 7, use_pytorch_lib=pt, **component_engine_on).get()])
print(a[-100:])
print(b[-100:])
print(a.__len__(), b.__len__())
ok = True
c = 0
v = 0
for q, w in zip(a, b):
    c += 1
    if q != w:
        print(c, q, w)
        v +=  abs(q - w)
        ok = False
v = float(v / c)
print('+/- variance = %s' % v)
ut.TestCase().assertTrue(v < 0.01)

In [None]:
xdata = data.copy()
xdata.extend([0.3 for x in range(0,10)])
xdata.extend(data[:10])
a = tuple([round(float(x), 2) for x in VRollingModeEngine(xdata, 100, use_pytorch_lib=pt, dropna=True).get()])
b = tuple([round(float(x), 2) for x in VRollingModeEngine(xdata, 100, use_numpy_lib=np, dropna=True).get()])
print(a[-100:])
print(b[-100:])
print(a.__len__(), b.__len__())
ok = True
c = 0
v = 0
for q, w in zip(a, b):
    c += 1
    if q != w:
        print(c, q, w)
        v +=  abs(q - w)
        ok = False
v = float(v / c)
print('+/- variance = %s' % v)
ut.TestCase().assertTrue(v < 0.5)

In [None]:
import matplotlib.pyplot as plt
xdata = [0.727, 0.711, 0.579, 0.779, 0.441, 0.123, 0.428, 0.912, 0.902, 0.515, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.988, 0.983, 0.987, 0.219, 0.108, 0.807, 0.239, 0.938, 0.719, 0.49, 0.789, 0.299, 0.823, 0.06, 0.083, 0.095, 0.762, 0.39, 0.614, 0.572, 0.34, 0.277, 0.917, 0.118, 0.644, 0.211, 0.258, 0.929, 0.923, 0.971, 0.18, 0.545, 0.111, 0.831, 0.833, 0.219, 0.586, 0.217, 0.124, 0.32, 0.665, 0.562, 0.543, 0.287, 0.97, 0.163, 0.858, 0.4, 0.175, 0.384, 0.128, 0.334, 0.44, 0.371, 0.151, 0.742, 0.08, 0.641, 0.325, 0.033, 0.199, 0.556, 0.151, 0.125, 0.848, 0.49, 0.878, 0.605, 0.699, 0.624, 0.556, 0.564, 0.557, 0.591, 0.385, 0.393, 0.523, 0.401, 0.504, 0.788]
F = np.histogram(xdata, bins=10, density=False)
plt.plot(F[1][1:], F[0])
plt.show()
print()

In [None]:
b = tuple([round(float(x), 2) for x in VRollingModeEngine(datac, 7, use_pytorch_lib=pt, **component_engine_on).get()])
c = tuple([round(float(x), 2) for x in VRollingModeEngine(datac, 7, use_numpy_lib=np, **component_engine_on).get()])
print(a[-100:])
print(b[-100:])
print(a.__len__(), b.__len__())
ok = True
c = 0
v = 0
for q, w in zip(a, b):
    c += 1
    if q != w:
        print(c, q, w)
        v +=  abs(q - w)
        ok = False
v = float(v / c)
print('+/- variance = %s' % v)
ut.TestCase().assertTrue(v < 0.5)

#### Cumulative Sum

Just use sum3, sum7, etc.

In [None]:
data3 = [
    [5, 5, 5],
    [-3, -3, -3],
    [-2, -2, -2]
]
sum3(data3, **component_engine_on)

In [None]:
data4 = [
    [1, 5, -2],
    [3, 1, 5],
    [-1, 3, 1]
]
sum3(data4, **component_engine_on)

In [None]:
data5 = [-1,3,1,5,-2]
sum3(data5)

In [None]:
a = sum7(data, dropna=True)
b = tuple([round(x, 3) for x in sum7(datac, **component_engine_on)])
print(a[-5:])
print(b[-5:])
ut.TestCase().assertTrue(a == b)

In [None]:
timeit sum3(data)

In [None]:
timeit sum7(data)

In [None]:
timeit sum14(data)

In [None]:
timeit sum32(data)

### Basic Stats

#### Growth Rate

In [None]:
class VGrowthRateEngine(VectorizeEngine):
    """Growth rate as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - 
               - Component Engine
                   - 

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__SHIFTS = 2

    def pure_python_func(self, data):
        f = lambda *args: (args[0][0] - args[0][1]) / args[0][1]
        return [f(x) for x in zip(*data)]

    def final_func(self, data):
        if self._has_accel_lib():
            c, p = data[0], data[1]
            return self._divide(self._subtract(c, p), p)
        else:
            return self.pure_python_func(data)

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != self.__SHIFTS or not self._is_matrices(data):
                raise ValueError(
                    'data array must contain exactly %s arrays' % self.__SHIFTS
                )
            x = tuple(data.copy())
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list ' +
                    'type, and contain either a float or int'
                )
            x = self._vshift(data.copy(), 0, self.__SHIFTS)
            x = self._cropna(list(x))
        return self.ignite(
            data, self.final_func, columns=list(x), row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def gr(data, **kwargs):
    return VGrowthRateEngine(data, **kwargs).get()

In [None]:
# Compare pandas growth rate with vectorized one. Check results.
gr_df = [round(x, 6) for x in data_df[0].pct_change().dropna().values]
g = [x for x in VGrowthRateEngine(data).get() if not x is None]
for a, b in zip(gr_df, g):
    if a != b:
        print(a, b)
gr_df == g

In [None]:
print(data_df[0].count())
print(data.__len__())
print(data1[0].__len__())

In [None]:
a = tuple([round(x, 6) for x in VGrowthRateEngine(data[5:], dropna=True).get()])
b = tuple([round(x, 6) for x in VGrowthRateEngine(datac[:2], **component_engine_on).get()])
print(a[:10])
print(b[:10])
print(a.__len__(), b.__len__())
ut.TestCase().assertTrue(a == b)

In [None]:
timeit pd.DataFrame(data)[0].pct_change()

In [None]:
timeit data_df[0].pct_change()

In [None]:
timeit VGrowthRateEngine(data).get()

In [None]:
timeit VGrowthRateEngine(data1[:2], **component_engine_on).get()

From the above observation, we can speculate that a pandas dataframe auto creates a stats table with pre shifted values. This is the only way pandas can be close to our vectorized performance.

#### Variance

In [None]:
import math
class VVarianceEngine(VectorizeEngine):
    """Rolling variance as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - use a rolling sum engine to perform a configurable computation
                   - use this engine to perform a pure python vectorized rolling average
               - Component Engine
                   - the same as the final engine, except -
                       - you must pass in pre prepared colum series for the rolling sum computation

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__WINDOW = window

    def pure_python_func(self, data):
        fS, fS2 = lambda *x: sum(*x), lambda x: sum([x**2 for x in x])
        fV = lambda *x: (fS2(*x) - (fS(*x)**2 / self.__WINDOW)) / (self.__WINDOW - 1)
        return [fV(x) for x in zip(*data)]

    def final_func(self, data):
        if self._has_accel_lib():
            # raise RuntimeError('Variance component must be pure python.')
            fS2 = lambda x: sum([x**2 for x in x])
            ones = [1 for x in range(0, data[0].__len__())]
            twos = [2 for x in range(0, data[0].__len__())]
            ntotal = [self.__WINDOW for x in range(0, data[0].__len__())]
            return self._divide([
                self._subtract([
                    [fS2(x) for x in zip(*data)],
                    self._divide([
                        self._power([
                            self._add(data),
                            twos
                        ])
                        , ntotal
                    ])
                ])
                ,
                self._subtract([ntotal, ones])
            ])
        else:
            return self.pure_python_func(data)

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != self.__WINDOW or not self._is_matrices(data):
                raise ValueError(
                    'data array must contain exactly %s arrays' % self.__WINDOW
                )
            x = tuple(data.copy())
        else:
            if data.__len__() != 1 and not self._is_series(data):
                raise ValueError(
                    'data arary must be exactly 1 array as a python list ' +
                    'type, and contain either a float or int'
                )
            x = self._vshift(data.copy(), 0, self.__WINDOW)
        x = self._cropna(list(x))
        return self.ignite(
            data, self.final_func, columns=list(x), row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def var3(data, **kwargs):
    return VVarianceEngine(data, 3, **kwargs).get()

def var7(data, **kwargs):
    return VVarianceEngine(data, 7, **kwargs).get()

def var14(data, **kwargs):
    return VVarianceEngine(data, 14, **kwargs).get()

def var32(data, **kwargs):
    return VVarianceEngine(data, 32, **kwargs).get()

def var64(data, **kwargs):
    return VVarianceEngine(data, 64, **kwargs).get()

In [None]:
a = tuple([round(x, 6) for x in VVarianceEngine(data, 7, dropna=True).get()])
b = tuple([round(x, 6) for x in VVarianceEngine(datac, 7, **component_engine_on).get()])
print(a[:10])
print(b[:10])
print(a.__len__(), b.__len__())
ut.TestCase().assertTrue(a == b)

In [None]:
a = tuple([round(x, 4) for x in VVarianceEngine(datac, 7, **component_engine_on).get()])
b = tuple([round(float(x), 4) for x in VVarianceEngine(datac, 7, use_pytorch_lib=pt, **component_engine_on).get()])
c = tuple([round(float(x), 4) for x in VVarianceEngine(datac, 7, use_numpy_lib=np, **component_engine_on).get()])
print(a[:10])
print(b[:10])
print(c[:10])
print(a.__len__(), b.__len__(), c.__len__())
ut.TestCase().assertTrue(a == b == c)

In [None]:
timeit VVarianceEngine(data, 7).get()

In [None]:
timeit VVarianceEngine(datac, 7, use_pytorch_lib=pt, **component_engine_on).get()

In [None]:
timeit VVarianceEngine(datac, 7, use_numpy_lib=np, **component_engine_on).get()

In [None]:
timeit VVarianceEngine(datac, 7, **component_engine_on).get()

In [None]:
import math
class VStandardDeviationEngine(VectorizeEngine):
    """Rolling standard deviation as a vectorized algorithm.
       DESIGN: - Final Engine - Default Configured
                   - use a rolling sum engine to perform a configurable computation
                   - use this engine to perform a pure python vectorized rolling average
               - Component Engine
                   - the same as the final engine, except -
                       - you must pass in pre prepared colum series for the rolling sum computation

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array - single value array

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[array 1..n] - an array of arrays pre prepared

       OUTPUT: array - a reduced vector result
    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
            del kwargs['round_size']
        kwargs.update({'round_size': None})
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__VRVAR = VVarianceEngine(data, window, **kwargs).get
        self.__WINDOW = window

    def _std(self, data):
        V = data
        return [math.sqrt(x) for x in V if x]

    def pure_python_func(self, data):
        return self._std(data)

    def final_func(self, data):
        if self._has_accel_lib():
            raise RuntimeError('Average component must be pure python.')
        else:
            return self.pure_python_func(data)

    def get(self):
        data = [x for x in self.__VRVAR()]
        return self.ignite(
            data, self.final_func, columns=[], row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def std3(data, **kwargs):
    return VStandardDeviationEngine(data, 3, **kwargs).get()

def std7(data, **kwargs):
    return VStandardDeviationEngine(data, 7, **kwargs).get()

def std14(data, **kwargs):
    return VStandardDeviationEngine(data, 14, **kwargs).get()

def std32(data, **kwargs):
    return VStandardDeviationEngine(data, 32, **kwargs).get()

def std64(data, **kwargs):
    return VStandardDeviationEngine(data, 64, **kwargs).get()

In [None]:
a = std3(data, round_size=7)
b = tuple([round(x, 7) for x in list(data_df[0].rolling(3).std().values) if x > 0])
print(a[-10:])
print(b[-10:])
print(a.__len__(), b.__len__())
ut.TestCase().assertTrue(a == b)

In [None]:
a = tuple([x for x in VStandardDeviationEngine(data, 7, round_size=None, dropna=True).get()])
b = tuple([x for x in VStandardDeviationEngine(datac, 7, **component_engine_on).get()])
print(a[:10])
print(b[:10])
print(a.__len__(), b.__len__())
ut.TestCase().assertTrue(a == b)

# Inference Stats Tools

## Bayesian Inference Matrix

In [None]:
import math
from fractions import Fraction as F

class OddsTools():
    """ Methods to help with constructing odds related vectorizations.
    """
    def __get_fall(self, x):
        if x < 0:
            return 1
        return 0

    def __get_rise(self, x):
        if x > 0:
            return 1
        return 0

    def get_growth_rate_as_tagged_tuple(self, series):
        """ Use for vectorisation computations. A pre data format filter.

            INPUT: array of growth rate values.
            OUTPUT: a tag of growth rate values
                    a tuple representing rise and fall for vectorisation computations
                    (
                        rise as value 1 other is 0,
                        fall as value 1 other is 0
                    )
        """
        series_list = list(series)
        series_size = series_list.__len__()
        new = [(self.__get_rise(x), self.__get_fall(x)) for x in series_list if x]
        dropped = series_size - new.__len__()
        if dropped == 0:
            return tuple(new)
        return new


tag = OddsTools().get_growth_rate_as_tagged_tuple


class VDirectionalBayesianMatrixEngine(VectorizeEngine):
    """A 4 by 4 rolling vecotrized bayesian inference matrix ideal for time series inference.
       Use it for deductive lock step, co-incidence, and seasonality.

       DESIGN: - Final Engine - Default Configured
                   -  
               - Component Engine
                   - the same as the final engine, except -
                       - you must pass in pre prepared colum series for the rolling sum computation

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array[[speculation grs], [benchmark grs]]
                          - input series must be raw growth rate value +/- values

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[
                                 [[speculation tagged tuples window],1...n],
                                 [[benchmark tagged tuples window],1...n]
                            ]
                          - pre prepared
                          - input series must be pre converted into a series of tuple tags

              Generic Inference
                    Evidence_tag - Evidence series in tagged form. E1 & E2 - Rise & Fall.
                    Hypotheses_tag - Hypotheses series in tagged form. H1 & H2 - Rise & Fall.

              Generic Growth Rate Inference
                    SPtag - Speculation growth rate - wraps Evidence_tag
                    BMtag - Benchmark growth rate - wraps Hypotheses_tag

       OUTPUT: 
              Tuple of arrays containing the rolling posterior odds, given the sp and bm inputs.
                  - (
                      [P(H1|E1), P(H2|E1), P(H1|E2), P(H2|E2)],
                      ...
                    )

              Generic Inference
                    H1_E1 - P(H1|E1)
                    H2_E2 - P(H2|E2)
                    H2_E1 - P(H2|E1)
                    H1_E2 - P(H1|E2)

              Generic Growth Rate Inference
                    BMrise_SPrise - wraps H1_E1
                    BMfall_SPfall - wraps H2_E2
                    BMfall_SPrise - wraps H2_E1
                    BMrise_SPfall - wraps H1_E2

    """
    def __init__(self, data, window, **kwargs):
        super().__init__(**kwargs)
        self._DATA = data
        self._ROUND_SIZE = 6
        if 'round_size' in kwargs.keys():
            self._ROUND_SIZE = kwargs['round_size']
        self._SKIP_POST_CLEANING = False
        if 'skip_post_cleaning' in kwargs.keys() and kwargs['skip_post_cleaning']:
            self._SKIP_POST_CLEANING = True
            self._ROUND_SIZE = None
        self._SKIP_PRE_PREPARATIONS = False
        if 'skip_pre_preparations' in kwargs.keys() and kwargs['skip_pre_preparations']:
            self._SKIP_PRE_PREPARATIONS = True
        self.__WINDOW = window

    def __min_zero(self, value):
        if value == 0:
            return 1
        return value

    def pure_python_func(self, SP_rolling, BM_rolling):
        # Calculate Prior
        #  - Create components
        count_ones = lambda x: [i for i in x if i == 1].__len__()
        rise = lambda x: [i[0] for i in x]
        fall = lambda x: [i[1] for i in x]
        sp_r = [rise(x) for x in zip(*SP_rolling)]
        sp_f = [fall(x) for x in zip(*SP_rolling)]
        bm_r = [rise(x) for x in zip(*BM_rolling)]
        bm_f = [fall(x) for x in zip(*BM_rolling)]
        intersect = lambda s, b: [x * y for x, y in zip(s, b)]
        #  - Reduce into BM quadrant components
        #  - multiply the rolling rises and falls
        p_h1_e1 = [intersect(s, b) for s, b in zip(sp_r, bm_r)]
        p_h2_e2 = [intersect(s, b) for s, b in zip(sp_f, bm_f)]
        # - count the rolling ones
        bm_p_h1_e1 = [count_ones(x) for x in p_h1_e1]
        bm_p_h2_e2 = [count_ones(x) for x in p_h2_e2]
        #  - multiply the rolling rises and falls
        p_h1_e2 = [intersect(s, b) for s, b in zip(sp_f, bm_r)]
        p_h2_e1 = [intersect(s, b) for s, b in zip(sp_r, bm_f)]
        # - count the rolling ones
        bm_p_h1_e2 = [count_ones(x) for x in p_h1_e2]
        bm_p_h2_e1 = [count_ones(x) for x in p_h2_e1]

        # Calculate Posterior
        f = lambda x: (
            F(self.__min_zero(x[0]) / self.__min_zero(x[1])).limit_denominator(100),  # P(H1|E1) / P(H2|E1)
            F(self.__min_zero(x[1]) / self.__min_zero(x[0])).limit_denominator(100),  # P(H2|E1) / P(H1|E1)
            F(self.__min_zero(x[2]) / self.__min_zero(x[3])).limit_denominator(100),  # P(H1|E2) / P(H2|E2)
            F(self.__min_zero(x[3]) / self.__min_zero(x[2])).limit_denominator(100)   # P(H2|E2) / P(H1|E2)
        )
        # - left to right - top to bottom - matrix
        bm = [
            f(x) for x in zip(
                bm_p_h1_e1, bm_p_h2_e1,
                bm_p_h1_e2, bm_p_h2_e2
            )
        ]
        return bm

    def final_func(self, data):
        SP = data[0]
        BM = data[1]
        if self._has_accel_lib():
            raise RuntimeError('BayesianMatrix component must be pure python.')
        else:
            return self.pure_python_func(SP, BM)

    def get_rolling_data(self):
        data = self._DATA
        if data.__len__() != 2 or not self._is_series(data[0]):
            raise ValueError(
                'data array must contain exactly 2 arrays containing series ' +
                'growth rate values +/-'
            )
        sp = self._vshift(tag(gr(list(data[0]).copy())), 0, self.__WINDOW)
        bm = self._vshift(tag(gr(list(data[1]).copy())), 0, self.__WINDOW)
        return sp, bm

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != 2 or data[0][0][0].__len__() != self.__WINDOW:
                raise ValueError(
                    'data array must contain exactly 2 arrays containing rolling ' +
                    'window arrays of %s values' % self.__WINDOW
                )
            sp = tuple(data[0].copy())
            bm = tuple(data[1].copy())
        else:
            sp, bm = self.get_rolling_data()
        sp, bm = self._cropna(sp), self._cropna(bm)
        if not self._SKIP_POST_CLEANING:
            return self._square_right_by_ref(
                self._DATA[1],
                self.ignite(
                    data, self.final_func, columns=[sp, bm], row_funcs=[],
                    round_size=self._ROUND_SIZE
                )
            )
        return self.ignite(
            data, self.final_func, columns=[sp, bm], row_funcs=[],
            round_size=self._ROUND_SIZE
        )

In [None]:
def bme(data, window, **kwargs):
    return VDirectionalBayesianMatrixEngine(data, window, **kwargs).get()

In [None]:
# Test case from Speculative Statistics III CH4.1
spx = [1,2,3,2,3,2,1]
bmx = [1,2,3,4,3,4,5]
sp = data.copy()
sp.extend(spx)
bm = datax.copy()
bm.extend(bmx)
#print(tag(gr(sp)))
rtn = bme((sp, bm), 6)

In [None]:
last_matrix = rtn[-1:][0]
print(last_matrix)
print(F(last_matrix[0]).limit_denominator(100))
print(F(last_matrix[1]).limit_denominator(100))
print(F(last_matrix[2]).limit_denominator(100))
print(F(last_matrix[3]).limit_denominator(100))

In [None]:
print(bm.__len__())
print(rtn.__len__())

In [None]:
rtn[:10]

## Lock Step Bayesian Inference Matrix

In [None]:
import math
class VLockStepBayesianMatrixEngine(VDirectionalBayesianMatrixEngine):
    """Rolling lock step bayesian matrix as a vectorized algorithm.

       DESIGN: - Final Engine - Default Configured
                   -  
               - Component Engine
                   - the same as the final engine, except -
                       - you must pass in pre prepared colum series for the rolling sum computation
                
               - 

       INPUT: FINAL ENGINE - DEFAULT
              kwargs - skip_pre_preparations=False, skip_post_cleaning=False
                     data - array[[speculation grs], [benchmark grs]]
                          - input series must be raw growth rate value +/- values

              COMPONENT ENGINE
              kwargs - skip_pre_preparations=True, skip_post_cleaning=True
                     data - array[
                                 [[speculation tagged tuples window],1...n],
                                 [[benchmark tagged tuples window],1...n]
                            ]
                          - pre prepared
                          - input series must be pre converted into a series of tuple tags

              Generic Inference
                    Evidence_tag - Evidence series in tagged form. E1 & E2 - Rise & Fall.
                    Hypotheses_tag - Hypotheses series in tagged form. H1 & H2 - Rise & Fall.

              Generic Growth Rate Inference
                    SPtag - Speculation growth rate - wraps Evidence_tag
                    BMtag - Benchmark growth rate - wraps Hypotheses_tag
    """
    def __init__(self, data, window, lag, **kwargs):
        super().__init__(data, window, **kwargs)
        self.__WINDOW = window
        if lag <= 0:
            raise ValueError('Lag must be 1 or more for lock step calculations.')
        self.__LAG = lag

    def get_rolling_data(self):
        data = self._DATA
        if data.__len__() != 2 or not self._is_series(data[0]):
            raise ValueError(
                'data array must contain exactly 2 arrays containing series ' +
                'growth rate values +/-'
            )
        # Co-incide the previous value for bayesian matrix intersection calculations, ie. lock step
        dropna = self._dropna
        if self.__LAG == 1:
            sp = dropna(data[0])
            sp = self._vshift(tag(gr(list(sp).copy())), 0, self.__WINDOW)

        elif self.__LAG > 1:
            sp = self._dropna(data[0])
            sp = self._vshift(
                tag(
                    dropna(
                        cumsum(
                            dropna(
                                gr(sp)
                            ),
                            self.__LAG
                        )
                    )
                ),
                0,
                self.__WINDOW
            )

        bm = self._vshift(tag(gr(list(data[1]).copy())), 0, self.__WINDOW)
        rtn = self._square_right([sp, bm])
        return rtn[0], rtn[1]

    def get(self):
        data = self._DATA
        if self._SKIP_PRE_PREPARATIONS:
            if data.__len__() != 2 or data[0][0][0].__len__() != self.__WINDOW:
                raise ValueError(
                    'data array must contain exactly 2 arrays containing rolling ' +
                    'window arrays of %s values' % self.__WINDOW
                )
            sp = tuple(data[0].copy())
            bm = tuple(data[1].copy())
        else:
            sp, bm = self.get_rolling_data()
        sp, bm = self._cropna(sp), self._cropna(bm)
        if not self._SKIP_POST_CLEANING:
            return self._square_right_by_ref(
                self._DATA[1],
                self.ignite(
                    data, self.final_func, columns=[sp, bm], row_funcs=[],
                    round_size=self._ROUND_SIZE
                )
            )
        return self.ignite(
            data, self.final_func, columns=[sp, bm], row_funcs=[],
            round_size=self._ROUND_SIZE
        )


In [None]:
def bme_lockstep(data, window, lag, **kwargs):
    return VLockStepBayesianMatrixEngine(data, window, lag, **kwargs).get()

In [None]:
# Test case from Speculative Statistics III CH4.1 but results are for lock step
spx = [1,2,3,2,3,2,1]
bmx = [1,2,3,4,3,4,5]
sp = data.copy()
sp.extend(spx)
bm = datax.copy()
bm.extend(bmx)
#print(tag(gr(sp)))
rtn = bme_lockstep((sp, bm), 64, 5)

In [None]:
last_matrix = rtn[-1:][0]
print(last_matrix)
print(F(last_matrix[0]).limit_denominator(100))
print(F(last_matrix[1]).limit_denominator(100))
print(F(last_matrix[2]).limit_denominator(100))
print(F(last_matrix[3]).limit_denominator(100))

In [None]:
print(sp.__len__())
print(rtn.__len__())

## Seasonal Bayesian Inference Matrix

In [None]:
# Just use bme or bme_lockstep but with pre created seasonal tags via rlnp
import math
import rlab_common_numpy_pandas as rlnp

# Validate series as time series



# Get seasonal bme and bme_lockstep






In [None]:
import tensorflow as tf
import torch as pt
tf.__name__
pt.__name__

# 
# 
# 
# 
# 
# Conclusion

Vectorization is fast.<br/>

A slow vectorization is usually caused by unecessary data preparation and cleaning, pre and post computation.<br/>

The ideal solution is to separate a mass computional orchestration into stages so that expensive coding operations are done in bulk and not on every vectorization call. <br/>

**Organisational Orchestration Stages -**
* **Clean** - type conversions
* **Prepare** - series duplication and value shifting
* **Compute** - Component Engines - Fine Grained
 * Multiple vectorization engines reducing and enriching result
 * Must have data preparation and cleaning turned off
* **Compute** - Final Engines - Course Grained
 * Takes all the results from the mass component computations and reduces it to a final result
 * **Clean** - round the reduced result
* **Store**
 * Store output of engine components to flat file
 * Store output of final engine compute to flat file
<br/>

With the above mass orchestration, **we do not duplicate** expensive data cleaning and preparations within each vectorization engine. <br/>

We also are enabled to test cycle the final engine's algorithm by storing the component engine's result. <br/>This enables fast failure cycles for the final compute R&D.