In [None]:
from itertools import *
from functools import partial, reduce

import streamz   as sz
import cytoolz   as cz

import numpy as np
import pandas as pd

import holoviews as hv
from holoviews.streams import Buffer

# select bokeh as the holoviews rendering backend for this notebook
hv.extension('bokeh')

# numpy output preprocessor

In [None]:
# utility to make np output prettier
import contextlib

@contextlib.contextmanager
def printoptions(*args, **kwargs):
    '''Temporarily override numpy print options for the body of a ``with`` block.

    Takes the same arguments as ``np.set_printoptions``. Whatever options were active
    on entry are put back on exit, including when the block raises.
    '''
    saved_options = np.get_printoptions()
    np.set_printoptions(*args, **kwargs)
    try:
        yield
    finally:
        # always restore, so an exception inside the block cannot leak the override
        np.set_printoptions(**saved_options)

# demo: 9 standard-normal samples followed by an exact 0.0, printed with 3 fixed decimals,
# a blank instead of '+' for non-negative values, and no scientific notation for small values
# (no random seed is set, so the numbers differ on every run)
x = np.hstack( [np.random.normal(size=9),[0.]])
with printoptions(precision=3, floatmode='fixed', sign=' ', suppress=True):
    print(x)

In [None]:
# hv Dataframe buffer for plotting eigenvalues

def eigenvalue_plot():
    '''Build a live plot for an iterative eigenvalue computation.

    Returns:
        (buffer, dmap): a holoviews Buffer stream (length=100) over an initially empty
        frame with columns 'iteration', 'eigenvalue', 'diff', and a DynamicMap that
        lays out two curves from it: eigenvalue vs iteration and diff vs iteration.
        Send one-row DataFrames with those columns to the buffer to extend the plot.
    '''
    column_names = ['iteration', 'eigenvalue', 'diff']
    empty_frame  = pd.DataFrame({name: [] for name in column_names}, columns=column_names)
    buffer       = Buffer(empty_frame, length=100, index=False)

    # the stream passes its contents to the callback as the keyword `data`
    def plot(data):
        eigenvalue_curve = hv.Curve(data, 'iteration', 'eigenvalue', label='Evolution of the Eigenvalue')
        change_curve     = hv.Curve(data, 'iteration', 'diff',       label='Change in the Eigenvector')
        return eigenvalue_curve + change_curve

    dmap = hv.DynamicMap(plot, streams=[buffer]).relabel('Evolution of the Eigenvalue')
    return buffer, dmap

# Example: Power method to find dominant eigenvalue/eigenvector

Let $(\lambda_i, x_i)$ be the eigenvalue/eigenvector pairs of a diagonalizable matrix $A$ (so that every vector can be expanded in the eigenvectors $x_i$), and assume $| \lambda_0 | > | \lambda_i |$ for all $i > 0$,<br>
i.e., $A$ has a 'dominant eigenvalue'.

For any vector $x = \sum_i { \alpha_i x_i }$, we have $A^n x = \sum_i {\lambda_i^n \alpha_i x_i}$. Factoring out $\lambda_0$, we see that
$$
A^n x = \lambda_0^n \left( \alpha_0 x_0 + \sum_{i \ge 1} \left( \lambda_i/\lambda_0 \right)^n \alpha_i x_i \right) \approx \lambda_0^n \alpha_0 x_0
$$
as $n$ grows large.

Computing this expression recursively, starting with some random vector $\hat{b}_0$ that has a component along $x_0$ (i.e., $\alpha_0 \ne 0$), and rescaling the result at each step gives
* $b_{n+1} = A {\hat{b}_n}$, where for large $n$ we have $b_{n+1} \approx \lambda_0 x_0$ (taking $\| x_0 \| = 1$, up to sign)
* $\hat{b}_{n+1} = b_{n+1} / \| b_{n+1} \|$ to ensure good numerical behavior

In [None]:
# straightforward implementation

def power_method(A, x, n=1000, tol=1.e-8, verbose=False):
    '''Power method to obtain the dominant eigenpair of a matrix

    Args:
        A       : a square np.array (assumed to have a dominant eigenvalue)
        x       : a nonzero starting np vector for the iteration (assumed to have a
                  component along the dominant eigenvector); it is normalized first
        n       : maximum number of iterations
        tol     : convergence tolerance on ||x_{k+1} - x_k|| between successive
                  unit-length iterates (signs aligned, see below)
        verbose : if True, print the iterate and the squared change every 10 iterations

    Returns:
        (eigenvalue, unit eigenvector) if converged within n iterations, None otherwise.
        None is also returned if x or an iterate A x is the zero vector.
    '''
    tol2 = tol*tol

    x    = np.asarray(x)
    norm = np.linalg.norm(x)
    if norm == 0: return None
    x = x / norm                        # the Rayleigh quotient below requires ||x|| == 1

    for i in range(n):
        new_x = A @ x
        norm  = np.linalg.norm(new_x)
        if norm == 0: return None       # x lies in the null space of A: no direction to follow
        unit_x = new_x / norm

        # compare successive UNIT vectors (comparing A x with x only works if lambda == 1);
        # align signs so a negative dominant eigenvalue, which flips the iterate's sign
        # every step, can still be detected as converged
        sign       = 1.0 if unit_x @ x >= 0 else -1.0
        diff       = unit_x - sign * x
        diff_norm2 = diff @ diff

        if diff_norm2 < tol2: break
        x = unit_x

        if verbose and i % 10 == 0: print(x, diff_norm2)
    else:
        return None

    e = x @ new_x   # Rayleigh quotient x.Ax with ||x|| == 1
    return e, unit_x
    
# demo: the matrix is upper triangular, so its eigenvalues are the diagonal entries 1.2, 0.7, 0.1
# and the dominant pair should be 1.2 with eigenvector +/-[1, 0, 0]
with printoptions(formatter={'float': '{: 0.4f}'.format}):
    result = power_method( np.array( [1.2, 1., 1., 0., .7, .5, 0., 0., .1 ]).reshape((3,3)), np.array([.1,.2,.8]), tol=1.e-8 )
    if result is None:
        # power_method returns None when it does not converge within its iteration limit
        print('power_method did not converge')
    else:
        eigenvalue,eigenvector = result
        print( 'Eigenvalue'); print('.  ', np.round(eigenvalue,4))
        print( 'Eigenvector');print('.  ', eigenvector)

# Itertools solution

In [None]:
# 'utility Stupid Itertools Tricks' https://github.com/joelgrus/stupid-itertools-tricks-pydata

def tail(it):
    '''Discard the first element of the iterator `it` and return the advanced iterator.

    Raises StopIteration if `it` is already exhausted.
    '''
    _ = next(it)
    return it

def take(n, it):
    '''Return a list holding the first n values of `it` (fewer if it runs out first).'''
    return list(islice(it, n))

def drop(n, it):
    '''Lazily skip the first n values of `it`; returns an iterator over the remainder.'''
    remainder = islice(it, n, None)   # start at index n, no stop
    return remainder

def iterate(f, x):
    """Lazily produce x, f(x), f(f(x)), ...

    Built on itertools.accumulate rather than a generator on purpose: a StopIteration
    raised by f then simply ends the sequence (inside a generator it would become a
    RuntimeError, PEP 479).
    """
    def apply_again(previous, _ignored):
        return f(previous)
    return accumulate(repeat(x), apply_again)

def until_convergence(it):
    """Yield elements of `it` until one equals its predecessor, then stop.

    The repeated element itself is not yielded. Termination relies on the accumulator
    raising StopIteration, which itertools.accumulate treats as end of iteration.
    """
    def stop_on_repeat(last, current):
        if last == current:
            raise StopIteration
        return current

    return accumulate(it, stop_on_repeat)

def within_tolerance(tol, prev, curr):
    '''Accumulator step: pass `curr` through unless it is within `tol` of `prev`.

    Raises StopIteration when |prev - curr| < tol; used inside itertools.accumulate
    this ends the iteration.
    '''
    if not abs(prev - curr) < tol:
        return curr
    raise StopIteration

def until_nearly_converged(it, tolerance=0.001):
    '''Yield values of `it` until two consecutive values differ by less than `tolerance`.

    The second value of the close pair is not yielded.
    '''
    stop_when_close = partial(within_tolerance, tolerance)
    return accumulate(it, stop_when_close)

### Power Method functions

In [None]:
# define a set of problem specific functions to be used
# Design Criterion: keep track of the evolution

# Iteration function designed to be applied repeatedly: step( A, step(A, x))
def step( A, x ):
    '''One power-method iteration: return A x rescaled to unit Euclidean length.

    Only the direction is returned (no eigenvalue estimate), so the output can be fed
    straight back in as the next x. If A x is the zero vector the result is nan.
    '''
    image = A @ x
    return image / np.linalg.norm(image)   # unit length keeps repeated application from over/underflowing
    
# Convergence criterion
def change2( x, y ):
    '''Squared Euclidean distance ||x - y||^2 between two vectors.'''
    return (x - y) @ (x - y)

def check_convergence(c,y,tol):
    '''Stop the iteration once the change c falls below tol; otherwise pass y through.

    Returns y alone (not (c, y)) so the result can be fed straight into step(A, ...).

    Raises:
        StopIteration: when c < tol. Inside an itertools.accumulate based pipeline
        this is what terminates execution.
    '''
    if not c < tol:
        return y
    raise StopIteration

def random_point( xmin,xmax,ymin,ymax, n=2):
    '''Return an np.array of n values, each drawn independently by np.random.uniform(xmin, xmax).

    NOTE: ymin and ymax are accepted but not used; every coordinate uses the x extents.
    '''
    coordinates = []
    for _ in range(n):
        coordinates.append(np.random.uniform(xmin, xmax))
    return np.array(coordinates)

########################################################
def tst_functions():
    '''Smoke-test the power-method helpers by printing their outputs.

    Exercises random_point, change2, step and check_convergence on a fixed 3x3
    upper-triangular matrix. The second convergence check uses a change below the
    tolerance, so it raises StopIteration, which is caught and reported.
    '''
    matrix = np.array([1.2, 1., 1., 0., .7, .5, 0., 0., .1]).reshape((3, 3))
    print('A =')
    print(matrix)

    p = random_point(-1., 1., -1., 1., n=3)
    q = random_point(-1., 1., -1., 1., n=3)
    print('\ncheck point shape: ', p.shape)
    print('check change       : ', change2(p, q))

    twice_stepped = step(matrix, step(matrix, np.array([1., 1., 1.])))
    print('\nstep :           ', twice_stepped)
    print('\nThis succeeds or fails with StopIteration')
    try:
        print('   **** within tolerance:', check_convergence(1., q, 1.e-10))
        print('   **** within_tolerance:', check_convergence(1e-15, p, 1.e-10))
    except StopIteration:
        print('   **** StopIteration')

tst_functions()

In [None]:
# Implement the method: Curry the functions to remove constant problem specific arguments
# Note we want to **make certain we return the LATEST iterate**

# curry step, random point, and check_convergence
# stepA: x -> step(A, x), i.e. A x rescaled to unit length, for a fixed upper-triangular 3x3 A
#        (diagonal 1.2, 0.7, 0.1 are its eigenvalues, so 1.2 is dominant)
stepA            = partial( step, np.array( [[1.2, 1., 1.], [0., .7, .5], [0., 0., .1] ]) )
# randomVec(): a random 3-vector, each component from np.random.uniform(-1, 1)
#              (random_point only uses xmin/xmax; ymin/ymax are passed but ignored)
randomVec        = partial( random_point, xmin=-1,xmax=1,ymin=-1,ymax=1,n=3)
# checkConvergence(c, y): return y, or raise StopIteration once the (squared) change c < 1e-10.
# NOTE: binds the 3-argument check_convergence defined above; that name is redefined with
# different signatures in later cells, so re-running this cell after them binds the wrong one.
checkConvergence = partial( check_convergence, tol= 1e-10)

# implement a step with the curried functions:
#     since we want to compute f(f(f...f(x)...)), we need to make sure inputs and outputs match!
def nxt_step(c_x, step_func, check_func, change_func):
    ''' Advance the iteration by one (change, vector) pair.

    c_x is (change, vector). check_func(change, vector) either returns the vector or
    raises StopIteration; step_func is only applied when it returns. The result is
    (change_func(old_vector, new_vector), new_vector), the same shape as c_x, so
    nxt_step can be applied to its own output.
    '''
    old_vector = c_x[1]
    new_vector = step_func(check_func(*c_x))   # a StopIteration from check_func propagates untouched
    return change_func(old_vector, new_vector), new_vector

# nxtStep((change, x)) -> (change2(x, x_new), x_new) with x_new = stepA(x); raises StopIteration
# (via checkConvergence) once the incoming change is below tolerance, which is what
# terminates the iterate(...) pipelines below
nxtStep = partial( nxt_step, step_func=stepA, check_func=checkConvergence, change_func=change2)

## Algorithm pipeline execution

In [None]:
# The pipeline
# ------------------------------------------------------------------------------------------------
# the algorithm:  starting with a random vector, iterate the nxtStep function up to 500 times
# Note the result of all our work: the pipeline is generic, although the stopping criterion is not explicit:
#     take(500, iterate(  nxtStep, (np.inf,randomVec())) )
# np.inf (not the alias np.Inf, which NumPy 2.0 removed) seeds the change so the first check never stops

with printoptions(formatter={'float': '{: 0.4f}'.format}):
    print( 'POWER METHOD')
    # drop the seed pair; the iteration ends after 500 pairs or when checkConvergence raises StopIteration
    for c,x in drop(1, take(500, iterate(  nxtStep, (np.inf,randomVec())) )):
        print('%.4f'%c, x )

In [None]:
# Keep the iterates, finish by computing the eigenvalue
# np.inf (not the alias np.Inf, which NumPy 2.0 removed) seeds the change so the first check never stops
l=take(500, iterate(  nxtStep, (np.inf,randomVec())) )
with printoptions(formatter={'float': '{: 0.4f}'.format}):
    print('============= Final result ===============')
    print( 'Eigenvector')
    # fewer than 500 pairs means checkConvergence raised StopIteration before the iteration cap
    print('.  ', l[-1][1], 'Converged: ', len(l)< 500)
    print('Eigenvalue Estimate: Oops, all the vectors are normalized to one, and we did not keep the matrix')
    # Rayleigh quotient x.Ax of the unit-length final iterate x
    print('.  ', l[-1][1] @ np.array( [[1.2, 1., 1.], [0., .7, .5], [0., 0., .1] ]) @ l[-1][1])

---
>Summary: Unsatisfactory
* pro: a reusable pipeline for an iterative algorithm
* pro: maintains the evolution history of the algorithm
* pro: could easily be modified to update a plot during computations using hv streams
* con: the `nxt_step` function design requires careful thought to ensure it can be applied to its own output
* con: the stopping criterion is not explicit
* con: changes such as keeping additional information (the eigenvalue estimate) require rethinking the implementation
* con: the problem specific inputs are no longer apparent unless we allow far more verbiage, making the code hard to read

```python
A=np.array([[12.,1.,-1.],[20.,2.,1.],[0.,0.,30.]])
take(     500,
iterate(  partial( nxt_step, step_func  = partial( step, A),
                             check_func = partial( check_convergence, tol=1e-6),
                             change_func= change2 ),
          (np.Inf, randomVec())
))
```

The problem arises from the insistence that the stepA function be applied to its own output;
a better decomposition of the algorithm iterates

-- x -> STEP -- new_x, new_eigenvalue, ||new_x-x|| -> CHECK_CONVERGENCE -> CHECK_ITERATION_LIMIT -> OUTPUT -- x -> LOOP

which invites branching (hard to do with itertools!)

# Toolz Implementation

https://toolz.readthedocs.io/en/latest/index.html

toolz provides all the iterator tools we have defined, and more!
As such, the solution looks very similar

# Streamz: More complex pipelines with loops, feedback, etc...

## Practice exercises

In [None]:
#just try something
def incr(x):
    '''Return x + 1.'''
    return x + 1

def pr(x):
    '''Print x followed by ", " instead of a newline.'''
    print(x, end=', ')

data = []

# set up pipeline: incr -> sliding windows of 2 -> data.append -> pr
# data.append returns None, so pr prints None for every window that reaches it;
# the windows themselves end up in `data`
pipeline = sz.Stream()
pipeline.map(incr).sliding_window(2).map(data.append).sink(pr)
print('append returns None, starts seeing data with the second emit!')
print('.  ',end='')
for n in range(4): pipeline.emit(n)
print()
print('Overall, we accumulated',data)

In [None]:
# now construct a pipeline that feeds itself
def looper(p,lmt=7):
    '''Return a pass-through map function for (a, b) pairs that feeds b+1 back into p while b < lmt.

    The emit happens before the pair is returned, so the fed-back values travel through
    the whole pipeline before the current pair reaches downstream nodes.
    '''
    def f(x):
        _, b = x
        if not b < lmt:
            return x
        p.emit(b + 1)
        return x
    return f

data=[]
pipeline = sz.Stream()
# looper re-emits into `pipeline` from inside the map, before the current window is returned,
# so deeper (later) windows are appended to `data` before the window that triggered them
pipeline.sliding_window(2).map(looper(pipeline,lmt=7)).sink(data.append) #.sink(pr);
pipeline.emit(-1);pipeline.emit(0);
print('The pipeline is recursive: first input is appended last!!!!!',end='\n\n')
print(data)

In [None]:
# 15 >= lmt, so looper does not feed anything back for this window
pipeline.emit(15)
print(data)
# NOTE(review): sink_to_list() presumably attaches a NEW list sink to the source node, which has
# not seen any element yet at this point, so this likely prints an empty list
print(pipeline.sink_to_list())

## streamz version of the power method

In [None]:
def power_method_step(A, x):
    '''implements a single step of the power method

    Args:
        A  : the np.array matrix
        x  : the current np.array eigenvector estimate (nonzero; normally the unit vector
             returned by the previous step, but a non-normalized start vector is allowed)

    Returns
        diff2, eigval_estimate, eigvec_estimate, where
          diff2           is the squared l2 norm of the change eigvec_estimate - x
          eigval_estimate is the Rayleigh quotient x.Ax / x.x
          eigvec_estimate is A x rescaled to unit length
    '''
    print('step', A, x)
    new_x        = A @ x

    normalized_x = new_x / np.linalg.norm(new_x)
    diff         = normalized_x - x

    # dividing by x.x keeps the eigenvalue estimate correct when x is not a unit vector
    # (e.g. the random start vector); for unit x it equals x @ new_x
    return diff @ diff, (x @ new_x) / (x @ x), normalized_x

def check_convergence(cur, n_step, n_max, tol):
    '''power method convergence test

    Args:
        cur:     current output of a power method step; cur[0] (the change) is compared to tol
        n_step:  the current step number
        n_max:   maximum number of steps
        tol:     tolerance

    Returns:
        1 if converged, -1 if too many steps, 0 otherwise (convergence takes precedence)
    '''
    print('conv_checker',cur)
    if cur[0] < tol:
        return 1
    return -1 if n_step >= n_max else 0

# ============================================================
# curry the functions and build the pipeline

def make_convergence_checker(n_max,tol):
    '''Return a stateful check ``checkConvergence(cur) -> (cur, flag)``.

    Each call increments ``checkConvergence.cur_step`` and computes
    flag = check_convergence(cur, cur_step, n_max, tol) using the 4-argument version
    defined in this cell. The counter lives on the function object and is never reset,
    so build a new checker to restart a computation (an accumulator would avoid this).
    '''
    def checkConvergence( cur ):
        checkConvergence.cur_step = checkConvergence.cur_step + 1
        flag = check_convergence(cur, checkConvergence.cur_step, n_max, tol)
        return cur, flag

    checkConvergence.cur_step = 0
    return checkConvergence
# ---------------------------------------------
def make_plot(hv_stream):
    '''Return a sink ``send(cur)`` that pushes one plot row per call into ``hv_stream``.

    cur is indexed as (diff2, eigval_estimate, ...). Each call increments ``send.cur_step``
    and sends a one-row DataFrame with columns iteration (the call count), eigenvalue
    (cur[1]) and diff (sqrt(|cur[0]|), the norm of the change when cur[0] is its square).
    The counter is stored on the function and is never reset.
    '''
    def send( cur ):
        print('plot_stream')
        send.cur_step += 1
        row   = (send.cur_step, cur[1], np.sqrt(np.abs(cur[0])))
        frame = pd.DataFrame([row], columns=['iteration', 'eigenvalue', 'diff'])
        hv_stream.send(frame)

    send.cur_step = 0
    return send
# ---------------------------------------------
def loop(cur_cnv, pipeline):
    '''feedback sink: re-emit the latest eigenvector estimate while the checker says "continue"

    Args:
        cur_cnv  : (cur, flag) from the convergence checker, where
                   cur = (diff2, eigval_estimate, eigvec_estimate) is the output of stepA
                   and flag is 0 while the iteration should go on
        pipeline : the streamz source to feed the next input into
    '''
    print('loop', cur_cnv)
    # stepA consumes a vector, so feed back only the eigenvector estimate (cur[2]),
    # not the whole (diff2, eigval, eigvec) tuple
    if cur_cnv[1] == 0: pipeline.emit(cur_cnv[0][2])
# ---------------------------------------------
def save_result(cur, state):
    '''accumulate intermediate results and pass through: better implemented with a sink in the pipeline'''
    state.append(cur)
    return cur

# ===========================================================
# curried functions and pipeline:
pipeline = sz.Stream()

def makeStep(A):
    '''return ``step(x) = power_method_step(step.A, x)`` with the matrix stored as ``step.A``

    The downstream nodes (make_plot, the convergence checker, loop) index the step output
    as (diff2, eigval_estimate, eigvec_estimate), which is what power_method_step returns;
    a bare ``A @ x`` would hand them a plain vector instead.
    NOTE: power_method_step is looked up when step is called; a later cell redefines it to
    return a dict, which this pipeline cannot consume.
    '''
    def step(x):
        return power_method_step(step.A, x)
    step.A=A
    return step

stepA = makeStep(np.array([[12., 1., -1.],
                           [20., 2.,  1.],
                           [ 0., 0., 30.]]))


# stepA    = partial( step,
#                     np.array([[12., 1., -1.],
#                               [20., 2.,  1.],
#                               [ 0., 0., 30.]]))

# the nodes below index each stepA output as (diff2, eigval_estimate, eigvec_estimate)
evolution = []                                      # every stepA output, appended by the first sink
checkConv = make_convergence_checker( 500, 1e-8 )   # -> (cur, flag): 1 if cur[0] < 1e-8, -1 after 500 calls, else 0
untilDone = partial( loop, pipeline=pipeline )      # feeds the next input back into `pipeline` while flag == 0

eigenvalue_stream,eigenvalue_dmap = eigenvalue_plot()

# PIPELINE construction --------------#  eigenvector estimate ->
p_step = pipeline.map(stepA)          #  ||diff||_2^2, eigenvalue_estimate, eigenvector_estimate  ->
p_step.sink( evolution.append )       #              emit eigenvalue_estimate, accumulate values
p_step.sink( make_plot( eigenvalue_stream ))
# rate_limit(0.50) presumably throttles the feedback loop to at most one step every 0.5 s (streamz rate_limit)
p_step.map(checkConv).rate_limit(0.50).sink(untilDone) #  (||diff||_2^2, eigenvalue_estimate, eigenvector_estimate), cnv indicator -> None

display(pipeline.visualize(source_node=True, rankdir="LR"))         # not too useful with curried functions!

# last expression: display the live plot, with a log y-axis for the change curve
eigenvalue_dmap.options( {'Curve': {'width':500, 'show_grid':True}, 'Curve.Change_in_the_Eigenvector':{'logy':True}})

In [None]:
# start fresh: empty the plot buffer before the new run
eigenvalue_stream.clear()

# kick off the feedback loop with a random start vector.
# NOTE(review): power_method_step documents x as a unit vector, but this start is not normalized;
# the step counters inside checkConv / make_plot are also not reset between runs.
pipeline.emit(np.random.normal(size=3));

In [None]:
# one line per recorded step: change, eigenvalue estimate, then the eigenvector estimate
# (the %-formatted numbers are unaffected by numpy printoptions; only e[2] uses them)
with printoptions(formatter={'float': '{: 0.4f}'.format}):
    for e in evolution:
        print('diff = %.4f, e_est = %10.4f' % (e[0], e[1]), end='  e_vec =' )
        print(e[2])


> Much easier to handle than itertools
* need to figure out accumulate to get a step counter
* need to figure out argument passing options to keep better track of what goes where
* need to figure out a good way to name functions so pipeline.visualize makes more sense...

## Start Over

In [None]:
def power_method_step(A, x):
    '''implements a single step of the power method

    Args:
        A  : the np.array matrix
        x  : the current np.array eigenvector estimate (nonzero; normally the unit vector
             returned by the previous step, but a non-normalized start vector is allowed)

    Returns
        dict with
          'diff2' : squared l2 norm of the change e_vec - x
          'e_val' : Rayleigh quotient estimate x.Ax / x.x of the dominant eigenvalue
          'e_vec' : A x rescaled to unit length
    '''
    new_x        = A @ x

    normalized_x = new_x / np.linalg.norm(new_x)
    diff         = normalized_x - x

    # dividing by x.x keeps the eigenvalue estimate correct when x is not a unit vector
    # (e.g. the random start vector); for unit x it equals x @ new_x
    return {'diff2': diff @ diff, 'e_val': (x @ new_x) / (x @ x), 'e_vec' : normalized_x }

def power_method_convergence_check(diff, n_step, n_max, tol):
    '''power method convergence

    Args:
        diff:    convergence estimate (compared directly against tol)
        n_step:  the current step number
        n_max:   maximum number of steps
        tol:     tolerance

    Returns:
        1 if converged, -1 if too many steps, 0 otherwise (convergence takes precedence)
    '''
    # the original printed an undefined name `cur` here, raising NameError on every call
    print('power_method_convergence_check', diff)
    return 1 if diff < tol else -1 if n_step >= n_max else 0

In [None]:
# Build the pipeline: 1) insert a step counter

# # a decorator to unpack the args
# def unpack_args(func):
#     ''' unpack function args decorator
    
#     Example:
#         @unpack_args
#         def foo(a,b): return a+b

#         foo( (1,2) ) # returns 3
#     '''
#     def f(a):
#         return func(*a)
#     return f

# a running count:
def counter( state, nxt):
    '''Running-count accumulator for ``Stream.accumulate(..., returns_state=True)``.

    Returns (new_state, (new_state, nxt)): the incremented count is kept as the state
    and emitted together with the incoming element.
    '''
    state += 1
    tagged = (state, nxt)
    return state, tagged

pipeline = sz.Stream()

# with returns_state=True, counter returns (new_state, emitted): the first part is kept as the
# running count, the second is passed downstream, so print sees (count, element)
pipeline.accumulate( counter, returns_state=True, start=0).sink(print)

display(pipeline.visualize(source_node=True, rankdir="LR"))

# any element type works: the count is simply paired with it
for _ in range(5):
    pipeline.emit({'aha':(1,2)})
pipeline.emit((7,8));

In [None]:
# Step2: run a power method step, add a step count

# NOTE(review): `foo` is not used by any code visible in this notebook (leftover scratch value)
foo=np.array([1.,2.,3.])
# test matrix used from here on: block upper triangular, eigenvalues 30 and 7 +/- sqrt(45)
# (~13.708, ~0.292), so the dominant eigenvalue is 30
A = np.array([[12., 1., -1.],
              [20., 2.,  1.],
              [ 0., 0., 30.]])

def makeStep(A):
    '''Wrap power_method_step(A, .) as a one-argument function named ``step``.

    The matrix is kept as the attribute ``step.A`` so it can be swapped later without
    rebuilding the pipeline, and ``step`` is the name pipeline.visualize() displays.
    '''
    def step(x):
        return power_method_step(step.A, x)

    step.A = A
    return step

# --------------------------------------------------
def counter( state, nxt):
    '''Running-count accumulator that stamps the count into the incoming dict.

    Sets ``nxt['step_n']`` to the incremented count (mutating nxt in place) and returns
    (new_state, nxt), as expected by ``Stream.accumulate(..., returns_state=True)``.
    '''
    state += 1
    nxt['step_n'] = state
    return (state, nxt)

# --------------------------------------------------
def saveResults():
    '''Return a sink ``save(res)`` that records every value it receives in ``save.results``.

    Reset with ``save.results = []``; the sink itself returns None.
    '''
    def save(res):
        save.results += [res]

    save.results = []
    return save

# ====================================================================================
# step -> step counter; results go to both a list-collecting sink and print
pipeline = sz.Stream()
before_sink = pipeline.map( makeStep(A) )\
                      .accumulate( counter, returns_state=True, start=0)
#results  = [];   before_sink.sink( results.append)
save = saveResults()
before_sink.sink( save)
before_sink.sink(print)


display(pipeline.visualize(source_node=True, rankdir="LR"))
print( 'Sink Print output')
for i in range(3):
    pipeline.emit(np.array([1.+float(i),1.,2.]));
print( 'Sink Result List')
for res in save.results:
    print(res)
    
print('Caller loop back')
# reset the collected results and the step count
# (assumes streamz's accumulate node keeps its running state in `.state`)
save.results=[]; before_sink.state=0
# This shows the power algorithm in operation:
#      No convergence test, run a fixed number of steps and accumulate the results

# the caller closes the loop: each new input is the previous step's unit eigenvector estimate
pipeline.emit(np.array([1.,1.,1.]));
for _ in range(20):
    pipeline.emit( save.results[-1]['e_vec'])
    
# last expression: eigenvalue estimate vs step number
hv.Curve( ([ x['step_n'] for x in save.results], [x['e_val'] for x in save.results]))

In [None]:
# insert a convergence criterion

def check_convergence(diff2, n_step, n_max, tol2):
    '''power method convergence

    Args:
        diff2:   squared norm of the change in the eigenvector estimate
        n_step:  the current step number
        n_max:   maximum number of steps
        tol2:    squared tolerance

    Returns:
        1 if converged, -1 if too many steps, 0 otherwise (convergence takes precedence)
    '''
    if diff2 < tol2:
        return 1
    if n_step >= n_max:
        return -1
    return 0

def has_converged(n_max, tol):
    '''Return a map function ``checkConvergence(cur)`` that adds a 'cnv_flag' to the result dict.

    The flag is check_convergence(cur['diff2'], cur['step_n'], n_max, tol**2)
    (1 converged, -1 step limit reached, 0 continue); cur is mutated and returned.
    The limits are stored as ``checkConvergence.n_max`` and ``checkConvergence.tol2``.
    '''
    def checkConvergence(cur):
        flag = check_convergence(cur['diff2'], cur['step_n'], checkConvergence.n_max, checkConvergence.tol2)
        cur['cnv_flag'] = flag
        return cur

    checkConvergence.n_max = n_max
    checkConvergence.tol2  = tol * tol
    return checkConvergence


# step -> step counter -> convergence flag. With n_max=3 and tol=1e-22 (tol2=1e-44) the flag
# is effectively 0 until step 3 and -1 from then on
pipeline = sz.Stream()
before_sink = pipeline.map( makeStep(A) )\
                      .accumulate( counter, returns_state=True, start=0)\
                      .map( has_converged( 3, 1e-22))

save = saveResults(); before_sink.sink( save)
before_sink.sink(print)


display(pipeline.visualize(source_node=True, rankdir="LR"))

pipeline.emit(np.array([1.,2.,1.]));

In [None]:
# step, add stepcount, check convergence, loop back and print
def loopback_ntimes(n, pipeline):
    '''Return a pass-through sink ``loop(res_dict)`` that feeds ``res_dict['e_vec']`` back into
    ``pipeline`` while res_dict['step_n'] < n and res_dict['cnv_flag'] == 0.

    The limit and target are stored as ``loop.n`` and ``loop.p`` so they can be changed later.
    '''
    def loop( res_dict ):
        under_limit = loop.n > res_dict['step_n']
        if under_limit and res_dict['cnv_flag'] == 0:
            loop.p.emit(res_dict['e_vec'])
        return res_dict

    loop.p = pipeline
    loop.n = n
    return loop

def show_ppresults(x):
    '''Print one power method result dict as a single formatted line.

    Uses the keys 'step_n', 'e_val', 'e_vec' and 'diff2'; the vector is printed with 4 decimals.
    '''
    header = '%5d:  e %10.4f  ' % (x['step_n'], x['e_val'])
    with printoptions(formatter={'float': '{: 0.4f}'.format}):
        print(header, '  v ', x['e_vec'], '  diff2', '%12.6g' % x['diff2'])

# --------------------------------------------------------
# build the new pipeline

# step -> counter -> convergence flag, then two sinks: the loop-back (registered first) and the printer.
# NOTE(review): without rate_limit every loop-back emit runs synchronously inside the previous
# one, so up to 50 nested emits build up on the call stack
pipeline = sz.Stream(loop=None)
pm_out   = pipeline.map(makeStep(A))\
              .accumulate( counter, returns_state=True, start=0)\
              .map(has_converged(50,1e-8))
pm_out.sink(loopback_ntimes(50,pipeline))
pm_out.sink(show_ppresults)
#foo.sink(lambda x: print( 'e %10.4f  '%x['e_val'], '  v ', x['e_vec'].round(4), 'd', x['diff2'], x['step_n']))



display(pipeline.visualize(source_node=True, rankdir="LR"))

pipeline.emit(np.random.uniform(size=3));

# Puzzle: why does the lambda print function print values in reverse order?
# Likely answer: the loop-back sink is registered before the printing sink, and its emit runs the
# whole pipeline for step k+1 (recursively) before the printer gets step k, so the printed
# lines come out last step first.

In [None]:
# Finally, add a ratelimiter and a hv Stream to display the evolution

# fresh plot buffer + DynamicMap for this run (rebinds the eigenvalue_stream created earlier)
eigenvalue_stream, dmap = eigenvalue_plot()
def to_Hv(x):
    '''Sink: push one result dict into the global eigenvalue_stream as a one-row DataFrame.

    Maps step_n -> iteration, e_val -> eigenvalue and diff2 -> diff (the squared change,
    not its square root).
    '''
    row = {'iteration': [x['step_n']], 'eigenvalue': [x['e_val']], 'diff': [x['diff2']]}
    eigenvalue_stream.send( pd.DataFrame(row, columns=['iteration', 'eigenvalue','diff']) )


# same pipeline as before plus rate_limit ahead of the sinks (presumably at most one result
# every 0.5 s, so the plot updates visibly) and a third sink feeding the live plot
pipeline = sz.Stream(loop=None)
pm_out   = pipeline.map(makeStep(A))\
              .accumulate( counter, returns_state=True, start=0)\
              .map(has_converged(50,1e-8))\
              .rate_limit(0.5)
pm_out.sink(loopback_ntimes(50,pipeline))
pm_out.sink(show_ppresults)
pm_out.sink(to_Hv)

display(pipeline.visualize(source_node=True, rankdir="LR"))
# last expression: the live plot; the 'Curve' spec puts a log y-axis on both curves
dmap.options({'Curve':{'width':500, 'show_grid':True, 'logy':True}}).relabel('')

In [None]:
# start the feedback loop from a random (non-normalized) vector; the step counter kept by the
# accumulate node is not reset, so rebuild the pipeline in the previous cell to restart cleanly
pipeline.emit( np.random.uniform(size=3));

> Better, but:
* data representation: move dictionaries or dataframes rather than tuples
* package functions such that their stored data can be reset:
    * to modify A, we need to set `step.A`
    * to restart a computation, we need to reset the counter, clear the hv stream,...
    * to change convergence criteria, we need checkConvergence.tol...

# Stuff

In [None]:
def make_plot_with_stream():
    '''Return (update_pipe, dmap): a holoviews Pipe stream and a DynamicMap driven by it.

    Sending None shows the placeholder text 'x' over a small sample curve labelled 'Data';
    sending a value v shows str(v)+'%' in red over an empty curve, labelled the same way.
    '''
    update_pipe = hv.streams.Pipe( data=None )

    # the Pipe passes its payload to the callback as the keyword `data`
    def plot( data ):
        if data is not None:
            percent = str(data)+'%'
            return (hv.Text(0.5,0.5, percent ).options(color='red',text_alpha=1.)*
                    hv.Curve([])).relabel(percent)
        return (hv.Text(0.5,0.5, 'x' ).options(text_alpha=1.)*
                hv.Curve(([1,3,2]))).relabel('Data')

    return update_pipe, hv.DynamicMap( plot, streams=[update_pipe] )

update_pipe,h = make_plot_with_stream()
# last expression: display the DynamicMap so later sends update it in place
h

In [None]:
import time
# animate 0, 20, ..., 100 percent, half a second apart, then return to the placeholder view
for i in np.arange(0,101,20):
    update_pipe.send(i)
    time.sleep(.5)
update_pipe.send(None)

In [None]:
# inspect the pipe's current payload (the previous cell ended by sending None)
update_pipe.data

# Debug

In [None]:
pipeline = sz.Stream()
data     = []

# NOTE: redefines the `pr` from the practice cell; this version prints the element's type
def pr(x):
    print(type(x))

# data.append returns None, so pr reports NoneType; the arrays themselves land in `data`
pipeline.map(data.append).sink(pr)
for n in range(4):
    pipeline.emit( np.array([1,2,3])+n)
data