## Create .gitignore file

In [None]:
%%file .gitignore

/temp

Notebook.ipynb

## Create TimeSeries Class

In [2]:
%%file timeseries.py
import reprlib
import numpy as np

class TimeSeries():
    """
    Class object for storing and manipulating time series data.

    Parameters:
    -----------
        - times: a sequence of time indexes (an empty series when omitted)
        - values: a sequence of corresponding values; when omitted or empty,
          every time is paired with the value 0.0

    Properties:
    -----------
        - times: the stored times (numpy array)
        - values: the stored values (numpy array)
        - items: a list of (time, value) tuples
        - lazy: a LazyOperation wrapping this instance

    Methods:
    --------
        - interpolate: returns a new TimeSeries object sampled at the
          requested times

    """

    def __init__(self, times=None, values=None):
        # None replaces the former mutable `[]` defaults; an omitted argument
        # still means "empty".
        if times is None:
            times = []

        # Length checks are used instead of `== []` so that numpy arrays and
        # ranges can be passed without triggering element-wise comparison.
        if values is None or len(values) == 0:
            values = np.zeros(len(times))

        # Kept as an assert so a length mismatch still surfaces as AssertionError.
        assert len(times) == len(values), "Sequence of times does not match sequences of values."

        self._times = np.array(times)
        self._values = np.array(values)

    def __len__(self):
        """
        Returns the length of time series object
        """
        return len(self._times)

    def __getitem__(self, time):
        """
        Returns the corresponding value for a given time within the time series.

        Raises ValueError when the time is not present. If the time occurs
        more than once, the value of its first occurrence is returned.
        """
        index = np.where(self._times == time)[0]

        if len(index) == 0:
            raise ValueError('Time ({}) not in TimeSeries'.format(time))

        return self._values[index[0]]

    def __setitem__(self, time, value):
        """
        Sets the value of a given time if the time is present within the time series.

        Raises ValueError when the time is not present. Every sample whose
        time equals `time` is updated.
        """
        index = np.where(self._times == time)[0]

        if len(index) == 0:
            raise ValueError('Time ({}) not in TimeSeries'.format(time))

        self._values[index] = value

    def __str__(self):
        """
        Returns a printable representation of the timeseries object

        If the length of the timeseries is greater than five we abbreviate
        the values in between the first and last values.
        """
        name = type(self).__name__

        if len(self) <= 5:
            return "{} {} ({} Samples)".format(name, self.items, len(self))

        first_sample = (self._times[0], self._values[0])
        last_sample = (self._times[-1], self._values[-1])
        return "{} [{}...{}] ({} Samples)".format(name, first_sample, last_sample, len(self))

    def __repr__(self):
        """
        Returns a representation of the timeseries object

        If the length of the timeseries is greater than five we abbreviate
        the values in between the first and last values.
        """
        name = type(self).__name__

        if len(self) <= 5:
            return "{}({})".format(name, self.items)

        first_sample = (self._times[0], self._values[0])
        last_sample = (self._times[-1], self._values[-1])
        return "{}({}...{})".format(name, first_sample, last_sample)

    def __contains__(self, time):
        """
        Returns True when the given time is one of the stored times
        """
        return (time in self._times)

    def __iter__(self):
        """
        Iterates over the stored values (not the times)
        """
        for value in self._values:
            yield value

    def __eq__(self, ts):
        """
        Two time series are equal when their (time, value) pairs are equal.
        """
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing `items` attribute.
        if not isinstance(ts, TimeSeries):
            return NotImplemented
        return self.items == ts.items

    @property
    def times(self):
        """
        Returns a sequence of the times within time series object
        """
        return self._times

    @property
    def values(self):
        """
        Returns a sequence of the values within time series object
        """
        return self._values

    @property
    def items(self):
        """
        Returns a sequence of time, value tuples within time series object
        """
        return list(zip(self._times, self._values))

    def interpolate(self, times):
        """
        Takes a sequence of times and returns a new time series object with
        values that are generated from the values within the current time series object.

        Each requested time between two stored times gets the value on the
        straight line joining its two neighbours. Requested times at or beyond
        the last stored time get the last value; requested times before the
        first stored time get the first value.

        NOTE: the lookup uses np.searchsorted, so the stored times are assumed
        to be in ascending order.
        """
        values = []

        for time in times:

            right_index = np.searchsorted(self._times, time, side='right')

            if right_index == len(self):
                values.append(self._values[-1])
                continue

            if right_index == 0:
                values.append(self._values[0])
                continue

            left_index = right_index - 1

            left_time = self._times[left_index]
            right_time = self._times[right_index]

            left_val = self._values[left_index]
            right_val = self._values[right_index]

            # Divide by the time gap only. The previous form divided by the
            # value gap, which fails when two neighbouring values are equal.
            fraction = (time - left_time) / float(right_time - left_time)

            values.append(left_val + fraction * (right_val - left_val))

        return TimeSeries(times, values)

    @property
    def lazy(self):
        """
        Return a new LazyOperation instance using an identity function and
        self as the only argument. This wraps up the TimeSeries instance
        and a function which does nothing and saves them both for later.
        """
        # LazyOperation is not imported at module level in this file, so it is
        # brought into scope here, at the point of use.
        from lazy import LazyOperation

        def func(self):
            return self

        return LazyOperation(func, self)


Overwriting timeseries.py


In [5]:
a = TimeSeries([0, 5, 10], [1, 2, 3])
b = TimeSeries([2.5, 7.5], [100, -100])
# Simple cases
assert a.interpolate([1]) == TimeSeries([1], [1.2])
# `times` is a property, so it is read as an attribute rather than called.
assert a.interpolate(b.times) == TimeSeries([2.5, 7.5], [1.5, 2.5])
# Boundary conditions: requests outside the stored range take the end values.
assert a.interpolate([-100, 100]) == TimeSeries([-100, 100], [1, 3])



## Create Smoke Test

In [None]:
%%file smoketest.py

from timeseries import TimeSeries

# Series whose time indexes are the multiples of 3 and of 5 below 1000;
# no values are supplied, so only the time indexes matter here.
threes = TimeSeries(range(0, 1000, 3))
fives = TimeSeries(range(0, 1000, 5))

# Add up every integer below 1000 that appears in at least one of the series.
s = 0
for i in range(1000):
    if i not in threes and i not in fives:
        continue
    s += i

print("sum", s)

## Create Lazy Operations

In [None]:
%%file lazy.py

class LazyOperation:
    """
    A deferred function call: stores a function together with its positional
    and keyword arguments and only calls it when eval() is invoked.

    Arguments may themselves be LazyOperation objects, in which case they are
    evaluated first.
    """

    def __init__(self, func, *args, **kwargs):
        self.__func = func
        self.__args = args
        self.__kwargs = kwargs

    def eval(self):
        """
        Recursively evaluate LazyOperation object

        Every positional or keyword argument that is a LazyOperation is
        replaced by its evaluated result (the results are stored back on this
        object), then the stored function is called with the resolved
        arguments and its return value is returned.
        """
        self.__args = tuple(
            arg.eval() if isinstance(arg, LazyOperation) else arg
            for arg in self.__args
        )

        # Only existing keys are reassigned, so the dict keeps its size while
        # it is being iterated.
        for key, value in self.__kwargs.items():
            if isinstance(value, LazyOperation):
                self.__kwargs[key] = value.eval()

        return self.__func(*self.__args, **self.__kwargs)

In [2]:
#%%file lazy_test.py
from timeseries import *
from lazy import *

def lazy(func):
    """
    Decorator that makes `func` lazy: calling the decorated function does not
    run `func` but returns a LazyOperation holding `func` and the call's
    arguments.
    """
    import functools

    # wraps() keeps the decorated function's name and docstring on the wrapper.
    @functools.wraps(func)
    def inner(*args, **kwargs):
        return LazyOperation(func, *args, **kwargs)
    return inner

@lazy
def lazy_add(a, b):
    """Return the sum of a and b (wrapped by @lazy)."""
    return a + b

@lazy
def lazy_mul(a, b):
    """Return the product of a and b (wrapped by @lazy)."""
    return a * b

@lazy
def check_length(a, b):
    """Return whether a and b have the same length (wrapped by @lazy)."""
    return len(a) == len(b)

# Nested lazy calls: the inner addition is resolved when the outer
# operation is evaluated, giving (1 + 2) * 4.
thunk = lazy_mul(lazy_add(1, 2), 4)
assert thunk.eval() == 12

# Lazy calls also accept TimeSeries arguments; both series hold four samples.
thunk = check_length(
    TimeSeries(range(0, 4), range(1, 5)),
    TimeSeries(range(1, 5), range(2, 6)),
)
assert thunk.eval() == True