# Spectral Signal

## Implementation

In [1]:
import numpy as np
from contextlib import contextmanager
from copy import deepcopy
from collections import OrderedDict
from pandas import Series

import colour
from colour import (
    CaseInsensitiveMapping,
    CubicSplineInterpolator,
    Extrapolator,
    LinearInterpolator,
    PchipInterpolator,
    SpragueInterpolator,
    as_numeric,
    is_numeric,
    tsplit,
    tstack,
    warning)

INTERPOLATORS = CaseInsensitiveMapping({
    'Cubic Spline': CubicSplineInterpolator,
    'Linear': LinearInterpolator,
    'Pchip': PchipInterpolator,
    'Sprague': SpragueInterpolator})


def nearest_index(a, b):
    index = np.searchsorted(a, b)
    
    return np.where(np.abs(b - a[index-1]) < np.fabs(b - a[index]),
                   index - 1,
                   index)

def nearest(a, b):
    return a[nearest_index(a, b)]


@contextmanager
def ndarray_write(a):
    a.setflags(write=True)

    yield a

    a.setflags(write=False)


def fill_nan(a, method='Interpolation'):
    mask = np.isnan(a)

    if method.lower() == 'interpolation':
        a[mask] = np.interp(
            np.flatnonzero(mask),
            np.flatnonzero(~mask),
            a[~mask])
    elif method.lower() == 'zeros':
        a[mask] = 0

    return a


def is_pandas_installed():
    try:
        import pandas

        return True
    except ImportError:
        return False


def unpack_data(data=None, index=None):
    index_f, values_f = None, None
    if isinstance(data, ContinuousSignal):
        index_f = data.index
        values_f = data.values
    if (isinstance(data, tuple) or
            isinstance(data, list) or
            isinstance(data, np.ndarray)):
        data = np.asarray(data)
        if data.ndim == 1:
            values_f = data
        elif data.ndim == 2:
            index_f, values_f = tsplit(data)
        else:
            raise ValueError('"data" must be a 1d or 2d array-like variable!')
    elif (isinstance(data, dict) or
              isinstance(data, OrderedDict)):
        index_f, values_f = tsplit(sorted(data.items()))
    elif is_pandas_installed():
        if isinstance(data, Series):
            index_f = data.index.values
            values_f = data.values
    
    index_f = index_f if index is None else index

    return index_f, values_f


class ContinuousSignal(object):
    def __init__(self,
                 data=None,
                 index=None,
                 interpolation_method=None,
                 interpolation_options=None,
                 extrapolation_method=None,
                 extrapolation_options=None):

        self.__values = None
        self.__index = None
        self.__interpolation_method = 'Pchip'
        self.__interpolation_options = {}
        self.__extrapolation_method = 'Constant'
        self.__extrapolation_options = {'left': np.nan, 'right': np.nan}

        index, values = unpack_data(data, index)

        self.values = values
        self.index = index
        self.interpolation_method = interpolation_method
        self.interpolation_options = interpolation_options
        self.extrapolation_method = extrapolation_method
        self.extrapolation_options = extrapolation_options

        self.__create_function()

    @property
    def values(self):
        return self.__values

    @values.setter
    def values(self, value):
        if value is not None:
            if not np.all(np.isfinite(value)):
                warning('"values" variable is not finite, '
                        'unpredictable results may occur!\n{0}'.format(value))

            value = np.asarray(value)

            if self.__index is not None:
                assert value.size == self.__index.size, (
                    '"index" and "values" variables must have same size!')

            value.setflags(write=False)
            self.__values = value
            self.__create_function()

    @property
    def index(self):
        return self.__index

    @index.setter
    def index(self, value):
        if value is not None:
            if not np.all(np.isfinite(value)):
                warning('"index" variable is not finite, '
                        'unpredictable results may occur!\n{0}'.format(value))

            value = np.asarray(value)

            if self.__values is not None:
                assert value.size == self.__values.size, (
                    '"index" and "values" variables must have same size!')

            value.setflags(write=False)
            self.__index = value
            self.__create_function()

    @property
    def interpolation_method(self):
        return self.__interpolation_method

    @interpolation_method.setter
    def interpolation_method(self, value):
        if value is not None:
            assert type(value) in (str, unicode), (  # noqa
                ('"{0}" attribute: "{1}" type is not '
                 '"str" or "unicode"!').format('interpolation_method', value))

            assert value in INTERPOLATORS, (
                '"{0}" attribute: "{1}" interpolation method is not defined! Available methods: "{2}".'.format(
                    'interpolation_method', value, sorted(INTERPOLATORS.keys())))

            self.__interpolation_method = value
            self.__create_function()

    @property
    def interpolation_options(self):
        return self.__interpolation_options

    @interpolation_options.setter
    def interpolation_options(self, value):
        if value is not None:
            assert type(value) in (dict, OrderedDict), (
                ('"{0}" attribute: "{1}" type is not '
                 '"dict" or "OrderedDict"!').format(
                    'interpolation_options', value))

            self.__interpolation_options = value
            self.__create_function()

    @property
    def extrapolation_method(self):
        return self.__extrapolation_method

    @extrapolation_method.setter
    def extrapolation_method(self, value):
        if value is not None:
            assert type(value) in (str, unicode), (  # noqa
                ('"{0}" attribute: "{1}" type is not '
                 '"str" or "unicode"!').format('extrapolation_method', value))

            assert value in ('Constant', 'Linear'), (
                '"{0}" attribute: "{1}" extrapolation method is not defined! Available methods: "[\'Constant\', \'Linear\']".'.format(
                    'interpolation_method', value))

            self.__extrapolation_method = value
            self.__create_function()

    @property
    def extrapolation_options(self):
        return self.__extrapolation_options

    @extrapolation_options.setter
    def extrapolation_options(self, value):
        if value is not None:
            assert type(value) in (dict, OrderedDict), (
                ('"{0}" attribute: "{1}" type is not '
                 '"dict" or "OrderedDict"!').format(
                    'extrapolation_options',value))

            self.__extrapolation_options = value
            self.__create_function()

    @property
    def function(self):
        return self.__function

    @function.setter
    def function(self, value):
        raise AttributeError(
            '"{0}" attribute is read only!'.format('function'))

    def __create_function(self):
        if self.__index is not None and self.__values is not None:
            self.__function = Extrapolator(
                INTERPOLATORS[self.__interpolation_method](
                    self.__index, self.__values,
                    **self.__interpolation_options),
                method=self.__extrapolation_method,
                **self.__extrapolation_options)
        else:
            def __undefined_signal_interpolator_function(*args, **kwargs):
                raise RuntimeError(
                    'Underlying signal interpolator function does not exists, '
                    'please ensure you defined both "index" and "values" variables!')

            self.__function = __undefined_signal_interpolator_function

    def __getitem__(self, x):
        if type(x) is slice:
            return self.__values[x]
        else:
            return self.__function(x)

    def __setitem__(self, x, value):
        if type(x) is slice:
            with ndarray_write(self.__values):
                self.__values[x] = value
        else:
            with ndarray_write(self.__index), ndarray_write(self.__values):
                x = np.atleast_1d(x)
                value = np.atleast_1d(value)

                # Matching index, replacing existing `self.values`.             
                self.__values[np.in1d(self.__index, x)] = value

                # Non matching index, inserting into existing `self.index` and 
                # `self.values`.
                x = x[~np.in1d(x, self.__index)]
                indexes = np.searchsorted(self.__index, x)

                self.__index = np.insert(self.__index, indexes, x)
                self.__values = np.insert(self.__values, indexes, value)

        self.__create_function

    def __iadd__(self, x):
        if isinstance(x, self.__class__):
            x = self.__function(x.index)

        with ndarray_write(self.__values):
            self.values += x

        return self

    def __add__(self, x):
        copy = self.copy()
        copy += x

        return copy

    def __isub__(self, x):
        if isinstance(x, self.__class__):
            x = self.__function(x.index)

        with ndarray_write(self.__values):
            self.values -= x

        return self

    def __sub__(self, x):
        copy = self.copy()
        copy -= x

        return copy

    def __imul__(self, x):
        if isinstance(x, self.__class__):
            x = self.__function(x.index)

        with ndarray_write(self.__values):
            self.values *= x

        return self

    def __mul__(self, x):
        copy = self.copy()
        copy *= x

        return copy

    def __idiv__(self, x):
        if isinstance(x, self.__class__):
            x = self.__function(x.index)

        with ndarray_write(self.__values):
            self.values /= x

        return self

    def __div__(self, x):
        copy = self.copy()
        copy /= x

        return copy

    __itruediv__ = __idiv__
    __truediv__ = __div__

    def copy(self):
        return deepcopy(self)

    def fill_nan(self, method='Interpolation'):
        with ndarray_write(self.__index), ndarray_write(self.__values):
            self.__index = fill_nan(self.__index, method)
            self.__values = fill_nan(self.__values, method)
            self.__create_function()

## Empty Object Initialisation

In [2]:
cs1 = ContinuousSignal()

print('1) cs1[0]')
try:
    print(cs1[0])
except RuntimeError as error:
    print(error)

print('\n')

index = np.arange(0, 1000, 100)
cs1 = ContinuousSignal(index=index)
print('2) cs1[0]')
try:
    print(cs1[0])
except RuntimeError as error:
    print(error)

print('\n')

values = np.linspace(1, 10, index.size)
cs1 = ContinuousSignal(values, index)
print('3) cs1[0]')
print(cs1[0])

print('\n')

print('4) cs1 = ContinuousSignal(values, [])')
try:
    cs1 = ContinuousSignal(values, [])
except AssertionError as error:
    print(error)

1) cs1[0]
Underlying signal interpolator function does not exists, please ensure you defined both "index" and "values" variables!


2) cs1[0]
Underlying signal interpolator function does not exists, please ensure you defined both "index" and "values" variables!


3) cs1[0]
1.0


4) cs1 = ContinuousSignal(values, [])
"index" and "values" variables must have same size!


## Object Initialisation

In [3]:
index = np.arange(0, 1000, 100)
index_a = np.linspace(0, 1, 10)
values = np.linspace(1, 10, index.size)

data = zip(index, values)

print('1) cs1 = ContinuousSignal(values, index)')
cs1 = ContinuousSignal(values, index)

print(cs1.values)
print(cs1.index)

print('\n')

print('2) cs1 = ContinuousSignal(data)')
cs1 = ContinuousSignal(data)

print(cs1.values)
print(cs1.index)

print('\n')

print('3) cs1 = ContinuousSignal(data, index_a)')
cs1 = ContinuousSignal(data, index_a)

print(cs1.values)
print(cs1.index)

print('\n')

print('4) cs1 = ContinuousSignal(ContinuousSignal(data))')
cs1 = ContinuousSignal(ContinuousSignal(data))

print(cs1.values)
print(cs1.index)

print('\n')

print('5) cs1 = ContinuousSignal(Series(data))')
cs1 = ContinuousSignal(Series(values, index))

print(cs1.values)
print(cs1.index)

print('\n')

1) cs1 = ContinuousSignal(values, index)
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[  0 100 200 300 400 500 600 700 800 900]


2) cs1 = ContinuousSignal(data)
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[   0.  100.  200.  300.  400.  500.  600.  700.  800.  900.]


3) cs1 = ContinuousSignal(data, index_a)
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[ 0.          0.11111111  0.22222222  0.33333333  0.44444444  0.55555556
  0.66666667  0.77777778  0.88888889  1.        ]


4) cs1 = ContinuousSignal(ContinuousSignal(data))
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[   0.  100.  200.  300.  400.  500.  600.  700.  800.  900.]


5) cs1 = ContinuousSignal(Series(data))
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[  0 100 200 300 400 500 600 700 800 900]




## Copy Operations

In [4]:
index = np.arange(0, 1000, 100)
values = np.linspace(1, 10, index.size)

cs1 = ContinuousSignal(values, index)

print('id(cs1)')
print(id(cs1))
print(cs1.function)

print('\n')

cs2 = cs1.copy()
print('id(cs2)')
print(id(cs2))
print(cs2.function)

id(cs1)
140007615530128
<colour.algebra.extrapolation.Extrapolator object at 0x7f56107b5f50>


id(cs2)
140007620222672
<colour.algebra.extrapolation.Extrapolator object at 0x7f56107b5310>


## Item Operations

In [5]:
index = np.arange(0, 1000, 100)
values = np.linspace(1, 10, index.size)

cs1 = ContinuousSignal(values, index)

print('cs1')
print(cs1.values)
print(cs1.index)

print('\n')

print('cs1[150.25]')
print(cs1[150.25])

print('\n')

print('cs1[np.linspace(100, 400, 10)]')
print(cs1[np.linspace(100, 400, 10)])

print('\n')

print('cs1[0:3]')
print(cs1[0:3])

print('\n')

print('cs1[10] = np.pi')
cs1[10] = np.pi
print(cs1.values)
print(cs1.index)

print('\n')

print('cs1[(200, 300)] = np.pi')
cs1[(200, 300)] = np.pi
print(cs1.values)
print(cs1.index)

print('\n')

print('cs1[(0, 850)] = np.pi')
cs1[(0, 850)] = np.pi
print(cs1.values)
print(cs1.index)

print('\n')

print('cs1[0:9] = np.pi')
cs1[0:9] = np.pi
print(cs1.values)
print(cs1.index)

cs1
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[  0 100 200 300 400 500 600 700 800 900]


cs1[150.25]
2.5025


cs1[np.linspace(100, 400, 10)]
[ 2.          2.33333333  2.66666667  3.          3.33333333  3.66666667
  4.          4.33333333  4.66666667  5.        ]


cs1[0:3]
[ 1.  2.  3.]


cs1[10] = np.pi
[  1.           3.14159265   2.           3.           4.           5.           6.
   7.           8.           9.          10.        ]
[  0  10 100 200 300 400 500 600 700 800 900]


cs1[(200, 300)] = np.pi
[  1.           3.14159265   2.           3.14159265   3.14159265   5.           6.
   7.           8.           9.          10.        ]
[  0  10 100 200 300 400 500 600 700 800 900]


cs1[(0, 850)] = np.pi
[  3.14159265   3.14159265   2.           3.14159265   3.14159265   5.           6.
   7.           8.           9.           3.14159265  10.        ]
[  0  10 100 200 300 400 500 600 700 800 850 900]


cs1[0:9] = np.pi
[  3.14159265   3.14159265   3.14159265   3.

## Arithmetical Operations with Mismatching Index

In [6]:
index = np.arange(0, 1000, 100)
values = np.linspace(1, 10, index.size)

cs1 = ContinuousSignal(values, index)
cs2 = ContinuousSignal(values, index + 400)

print('cs1')
print(cs1.values)
print(cs1.index)

print('cs2')
print(cs2.values)
print(cs2.index)

print('\n')

print('cs1 += cs2')
cs1 += cs2
print(cs1.values)

print('\n')

print('cs1 += 1')
cs1 += 1
print(cs1.values)

print('\n')

print('cs1 -= np.ones(index.size)')
cs1 -= np.ones(index.size)
print(cs1.values)

print('\n')

print('cs2 = cs1 + cs1')
cs2 = cs1 + cs1
print(cs2.values)

print('\n')

print('Cubic Spline interpolation fails with Nan(s) values.')
cs1.interpolation_method = 'Cubic Spline'
cs2 = cs1 + cs1
print(cs2.values)

print('\n')

print('Cubic Spline interpolation with filled Nan(s) values.')
cs1.fill_nan()
cs2 = cs1 + cs1
print(cs2.values)

cs1
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[  0 100 200 300 400 500 600 700 800 900]
cs2
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
[ 400  500  600  700  800  900 1000 1100 1200 1300]


cs1 += cs2
[  6.   8.  10.  12.  14.  16.  nan  nan  nan  nan]


cs1 += 1
[  7.   9.  11.  13.  15.  17.  nan  nan  nan  nan]


cs1 -= np.ones(index.size)
[  6.   8.  10.  12.  14.  16.  nan  nan  nan  nan]


cs2 = cs1 + cs1
[ 12.  16.  20.  24.  28.  nan  nan  nan  nan  nan]


Cubic Spline interpolation fails with Nan(s) values.
[ nan  nan  nan  nan  nan  nan  nan  nan  nan  nan]


Cubic Spline interpolation with filled Nan(s) values.
[ 12.  16.  20.  24.  28.  32.  32.  32.  32.  32.]


[  6.   8.  10.  12.  14.  16.  nan  nan  nan  nan]
  warn(*args, **kwargs)
[  7.   9.  11.  13.  15.  17.  nan  nan  nan  nan]
  warn(*args, **kwargs)
[ 12.  16.  20.  24.  28.  nan  nan  nan  nan  nan]
  warn(*args, **kwargs)
[ nan  nan  nan  nan  nan  nan  nan  nan  nan  nan]
  warn(*args, **kwargs)


In [7]:
class NullInterpolator(object):
    """

    Parameters
    ----------
    x : ndarray
        Independent :math:`x` variable values corresponding with :math:`y`
        variable.
    y : ndarray
        Dependent and already known :math:`y` variable values to
        interpolate.

    Methods
    -------
    __call__


    Examples
    --------
    Interpolating a single numeric variable:

    >>> y = np.array([5.9200,
    ...               9.3700,
    ...               10.8135,
    ...               4.5100,
    ...               69.5900,
    ...               27.8007,
    ...               86.0500])
    >>> x = np.arange(len(y))
    >>> f = NullInterpolator(x, y)
    >>> # Doctests ellipsis for Python 2.x compatibility.
    >>> f(0.5)  # doctest: +ELLIPSIS
    7.64...

    Interpolating an *array_like* variable:

    >>> f([0.25, 0.75])
    array([ 6.7825,  8.5075])
    """

    def __init__(self, x=None, y=None, tolerance=10e-7):
        self.__x = None
        self.x = x
        self.__y = None
        self.y = y
        self.__tolerance = None
        self.tolerance = tolerance        

        self.__validate_dimensions()

    @property
    def x(self):
        """
        Property for **self.__x** private attribute.

        Returns
        -------
        array_like
            self.__x
        """

        return self.__x

    @x.setter
    def x(self, value):
        """
        Setter for **self.__x** private attribute.

        Parameters
        ----------
        value : array_like
            Attribute value.
        """

        if value is not None:
            value = np.atleast_1d(value).astype(np.float_)

            assert value.ndim == 1, (
                '"x" independent variable must have exactly one dimension!')

        self.__x = value

    @property
    def y(self):
        """
        Property for **self.__y** private attribute.

        Returns
        -------
        array_like
            self.__y
        """

        return self.__y

    @y.setter
    def y(self, value):
        """
        Setter for **self.__y** private attribute.

        Parameters
        ----------
        value : array_like
            Attribute value.
        """

        if value is not None:
            value = np.atleast_1d(value).astype(np.float_)

            assert value.ndim == 1, (
                '"y" dependent variable must have exactly one dimension!')

        self.__y = value

    @property
    def tolerance(self):
        """
        Property for **self.__tolerance** private attribute.

        Returns
        -------
        numeric
            self.__tolerance
        """

        return self.__tolerance

    @tolerance.setter
    def tolerance(self, value):
        """
        Setter for **self.__tolerance** private attribute.

        Parameters
        ----------
        value : numeric
            Attribute value.
        """

        if value is not None:

            assert is_numeric(value), (
                '"tolerance" variable must be a "numeric"!')

        self.__tolerance = value


    def __call__(self, x):
        """
        Evaluates the interpolating polynomial at given point(s).


        Parameters
        ----------
        x : numeric or array_like
            Point(s) to evaluate the interpolant at.

        Returns
        -------
        float or ndarray
            Interpolated value(s).
        """

        x = np.atleast_1d(x).astype(np.float_)

        xi = as_numeric(self.__evaluate(x))

        return xi

    def __evaluate(self, x):
        """
        Performs the interpolating polynomial evaluation at given points.

        Parameters
        ----------
        x : ndarray
            Points to evaluate the interpolant at.

        Returns
        -------
        ndarray
            Interpolated points values.
        """

        self.__validate_dimensions()
        self.__validate_interpolation_range(x)

        index = nearest_index(self.__x, x)
        values = self.__y[index]
        print(self.__x[index], x)
        values[~np.isclose(self.__x[index], x, rtol=0, atol=self.__tolerance)] = 0
        
        return values

    def __validate_dimensions(self):
        """
        Validates variables dimensions to be the same.
        """

        if len(self.__x) != len(self.__y):
            raise ValueError(
                ('"x" independent and "y" dependent variables have different '
                 'dimensions: "{0}", "{1}"').format(len(self.__x),
                                                    len(self.__y)))

    def __validate_interpolation_range(self, x):
        """
        Validates given point to be in interpolation range.
        """

        below_interpolation_range = x < self.__x[0]
        above_interpolation_range = x > self.__x[-1]

        if below_interpolation_range.any():
            raise ValueError('"{0}" is below interpolation range.'.format(x))

        if above_interpolation_range.any():
            raise ValueError('"{0}" is above interpolation range.'.format(x))
            
x = np.array([4.5100, 5.9200, 9.3700, 10.8135, 27.8007, 69.5900, 86.0500])
y = np.array([1, 2, 3, 4, 5, 6, 7])

i = NullInterpolator(x, y)

i((4.510000001, 9, 10))

(array([ 4.51,  9.37,  9.37]), array([  4.51,   9.  ,  10.  ]))


array([ 1.,  0.,  0.])