# Finite-rank approximation of Koopman operators

In [1]:
%load_ext nb_mypy
%nb_mypy On
    
%matplotlib ipympl

Version 1.0.4


In [4]:
from functools import partial
from ipywidgets import widgets, interactive, IntSlider
from nlsa.abstract_algebra import compose_by, multiply_by
from nlsa.dynamics import orbit
from nptyping import Complex, Double, Int, NDArray, Shape
from more_itertools import take
from scipy.linalg import polar
from scipy.stats import vonmises
from typing import Callable, Generator, Iterator, TypeVar, Tuple
import matplotlib.pyplot as plt
import nlsa.function_algebra as fun
import nlsa.fourier_s1 as f1
import nlsa.fourier_t2 as f2
import nlsa.matrix_algebra as mat
import nlsa.vector_algebra as vec
import numpy as np

In [5]:
R = float
R2 = Tuple[float, float]
I2 = Tuple[int, int]

X = NDArray[Shape["*"], Double]
X2 = NDArray[Shape["*, 2"], Double]
Y = TypeVar("Y", NDArray[Shape["*"], Double], NDArray[Shape["*"], Complex])
F = Callable[[X], Y]
F2 = Callable[[X2], Y]

N = TypeVar("N")
K = NDArray[Shape["N"], Int]
V = TypeVar("V", NDArray[Shape["N"], Double], NDArray[Shape["N"], Complex])
VR = NDArray[Shape["N"], Double]
M = TypeVar("M", NDArray[Shape["N"], Double], NDArray[Shape["N, N"], Complex])

## Approximation of observables

Before studying the approximation of the dynamics by projected Koopman operators, we examine aspects of approximation of observables by orthogonal projection onto the finite-dimensional subspaces $H_L$. As an illustrative example, we consider the von Mises distribution on the circle which we also employed in Notebook   

In [7]:
def vm(kappa: R) -> Callable[[X], Y]:
    """Von Mises probability density."""
    def f(x: X) -> Y:
        y = 2 * np.pi * vonmises.pdf(x, kappa, loc=np.pi) 
        return y
    return f

def vm_fourier(kappa: R, l: int) -> V:
    """Fourier coefficients of von Mises density"""
    vm_hat = f1.von_mises_fourier(kappa, loc=np.pi)
    f_hat: V = vm_hat(f1.dual_group(l))
    return f_hat
    
def vm_proj(kappa: R, l: int) -> Callable[[X], Y]:
    """Projected Von Mises probability density."""
    phi = f1.fourier_basis(l)
    f: Callable[[X], Y] = fun.synthesis(phi, vm_fourier(kappa, l))
    return f

In [8]:
kappa = 10
l_max = 10
xs = np.linspace(0, 2 * np.pi, 200)
g = vm(kappa)
gxs = g(xs)
fig1 = plt.figure(1)

def plotfunc1(l):
    f = vm_proj(kappa, l)
    fxs = np.real(f(xs))
    plt.cla()
    plt.plot(xs, gxs, label='True')
    plt.plot(xs, fxs, label=f'Projected, $L = {l}$')
    plt.xlabel('$\\theta$')
    plt.grid(True)
    plt.title("Approximation of von Mises density by orthogonal projection")
    plt.legend()
    plt.show()
    fig1.canvas.draw()
    
interactive(plotfunc1, l=IntSlider(value=0, min=0, max=l_max - 1)) 

interactive(children=(IntSlider(value=0, description='l', max=9), Output()), _dom_classes=('widget-interact',)…

## Approximation of the Koopman operator

### Circle rotation

In [12]:
def phi_rot(a: R, x: X) -> X:
    """Circle rotation."""
    y = (x + a) % (2 * np.pi) 
    return y


def u_rot(a: R) -> Callable[[F], F]:
    """Composition map induced by circle rotation."""
    phi = partial(phi_rot, a)
    u = compose_by(fun, phi)
    return u


def vm_rot(a: R, kappa: R) -> Generator[F, None, None]:
    """Orbit of the von Mises density under the Koopman operator for the 
    circle rotation.
    
    """
    f = vm(kappa)
    u = u_rot(a)
    f_orb = orbit(f, u)
    return f_orb


def u_rot_fourier(a: R, l: int) -> Callable[[V], V]:
    """Projected Koopman operator for circle rotation in Fourier domain."""
    spec = f1.rotation_koopman_eigs(a)
    u = multiply_by(vec, spec(f1.dual_group(l)))
    return u
    
    
def vm_rot_proj(a: R, kappa: R, l: int) -> Iterator[F]:
    """Orbit of the von Mises density under the projected Koopman operator 
    associated with the circle rotation.
    
    """
    f_hat = vm_fourier(kappa, l)
    u = u_rot_fourier(a, l)
    f_hat_orb = orbit(f_hat, u)
    phi = f1.fourier_basis(l)
    synth = partial(fun.synthesis, phi)
    f_orb = map(synth, f_hat_orb)
    return f_orb

<cell>37: error: Type variable "__main__.V" is unbound
<cell>37: note: (Hint: Use "Generic[V]" or "Protocol[V]" base class to bind "V" inside a class)
<cell>37: note: (Hint: Use "V" in function signature to bind "V" inside a function)
<cell>43: error: Incompatible return value type (got "map[Callable[[X], S_mat]]", expected "Iterator[Callable[[ndarray[Any, dtype[floating[Any]]]], Any]]")


In [9]:
a_2pi = 1 / np.sqrt(20)
kappa = 10
l_max = 10
n_iter = 10

a = 2 * np.pi * a_2pi
gs = take(n_iter, vm_rot(a, kappa)) 
xs = np.linspace(0, 2 * np.pi, 200)
fig2 = plt.figure(2)

def plotfunc2(l, n):
    fs = take(n_iter, vm_rot_proj(a, kappa, l)) 
    f = fs[n]
    g = gs[n]
    fxs = np.real(f(xs))
    gxs = g(xs)
    plt.cla()
    plt.plot(xs, gxs, label='True')
    plt.plot(xs, fxs, label=f'Projected, $L = {l}$')
    plt.xlabel('$x$')
    plt.grid(True)
    plt.title(f'Circle rotation by angle $a={a_2pi:.3f}\\times 2\pi$; iteration $n= {n}$')
    plt.legend()
    plt.show()
    fig2.canvas.draw()
    
    
interactive(plotfunc2, l=IntSlider(value=0, min=0, max=l_max - 1),
            n=IntSlider(value=0, min=0, max=n_iter - 1)) 

interactive(children=(IntSlider(value=0, description='l', max=9), IntSlider(value=0, description='n', max=9), …

### Doubling map

In [10]:
def phi_doubling(x: X) -> X:
    """Doubling map on the circle."""
    y = 2 * x % (2 * np.pi) 
    return y


u_doubling = compose_by(fun, phi_doubling)

def vm_doubling(kappa: R) -> Generator[F, None, None]:
    """Orbit of the von Mises density under the Koopman operator for the 
    doubling map.
    
    """
    f = vm(kappa)
    f_orb = orbit(f, u_doubling)
    return f_orb


def u_doubling_fourier(l: int) -> Callable[[V], V]:
    """Returns the representation of the projected Koopman operator associated
    with the doubling map in the Fourier basis.

    :l: Maximal wavenumber.
    :returns: Projected Koopman operator on Fourier coefficient vectors.

    """
    k = f1.dual_group(l)

    def g(y: Y) -> Y:
        z = np.zeros_like(y)
        z[k % 2 == 0] = y[np.abs(k) <= l // 2]
        return z
    return g


def vm_doubling_proj(kappa: R, l: int) -> Iterator[F]:
    """Orbit of the von Mises density under the projected Koopman operator for
    the doubling map.
    
    """
    f_hat = vm_fourier(kappa, l)
    u = u_doubling_fourier(l)
    f_hat_orb = orbit(f_hat, u)
    phi = f1.fourier_basis(l)
    synth = partial(fun.synthesis, phi)
    f_orb  = map(synth, f_hat_orb)
    return f_orb


def u_doubling_polar(l: int) -> Callable[[V], V]:
    """Returns the unitary part of the projected Koopman operator associated 
    with the doubling map in the Fourier basis.

    :l: Maximal wavenumber.
    :returns: Unitary matrix from polar decomposition of projected Koopman
    operator

    """
    u_mat = np.array(list(map(u_doubling_fourier(l),
                              np.eye(2 * l + 1))))
    u_unitary, p = polar(u_mat) 
    u = multiply_by(mat, u_unitary) 
    return u


def vm_doubling_polar(kappa: R, l: int) -> Iterator[F]:
    """Orbit of the von Mises density under the unitary part of the projected
    Koopman operator for the doubling map.
    
    """
    f_hat = vm_fourier(kappa, l)
    u = u_doubling_polar(l)
    f_hat_orb = orbit(f_hat, u)
    phi = f1.fourier_basis(l)
    synth = partial(fun.synthesis, phi)
    f_orb  = map(synth, f_hat_orb)
    return f_orb

<cell>41: error: Type variable "__main__.V" is unbound
<cell>41: note: (Hint: Use "Generic[V]" or "Protocol[V]" base class to bind "V" inside a class)
<cell>41: note: (Hint: Use "V" in function signature to bind "V" inside a function)
<cell>47: error: Incompatible return value type (got "map[Callable[[X], S_mat]]", expected "Iterator[Callable[[ndarray[Any, dtype[floating[Any]]]], Any]]")
<cell>77: error: Incompatible return value type (got "map[Callable[[X], S_mat]]", expected "Iterator[Callable[[ndarray[Any, dtype[floating[Any]]]], Any]]")


In [11]:
u_mat = np.array(list(map(u_doubling_fourier(l_max),
                          np.eye(2* l_max + 1))))
u_mat, p = polar(u_mat) 
u = multiply_by(mat, u_mat)
f_hat = vm_fourier(kappa, l_max)
g_hat = u(f_hat)
phi = f1.fourier_basis(l_max)
synth = partial(fun.synthesis, phi)
gx = synth(g_hat)
gx(np.array([0., 1.]))


<cell>10: error: Argument 1 has incompatible type "ndarray[Any, dtype[Any]]"; expected "X"


array([7.40084463+0.j        , 1.63069333-0.94556133j])

In [33]:
kappa = 10
l_max = 10
n_iter = 10

gs = take(n_iter, vm_doubling(kappa)) 
xs = np.linspace(0, 2 * np.pi, 200)
fig2 = plt.figure(3)

def plotfunc3(l, n):
    fs = take(n_iter, vm_doubling_proj(kappa, l)) 
    f = fs[n]
    g = gs[n]
    fxs = np.real(f(xs))
    gxs = g(xs)
    plt.cla()
    plt.plot(xs, gxs, label='True')
    plt.plot(xs, fxs, label=f'Projected, $L = {l}$')
    plt.xlabel('$\\theta$')
    plt.grid(True)
    plt.title(f'Doubling map; iteration $n= {n}$')
    plt.legend()
    plt.autoscale(enable=True, axis='x', tight=True)
    plt.show()
    fig2.canvas.draw()
    
    
interactive(plotfunc3, l=IntSlider(value=0, min=0, max=l_max - 1),
            n=IntSlider(value=0, min=0, max=n_iter - 1)) 

interactive(children=(IntSlider(value=0, description='l', max=9), IntSlider(value=0, description='n', max=9), …

In [42]:
kappa = 10
l_max = 10
n_iter = 10

gs = take(n_iter, vm_doubling(kappa)) 
xs = np.linspace(0, 2 * np.pi, 200)
fig2 = plt.figure(3)

def plotfunc4(l, n):
    fs = take(n_iter, vm_doubling_polar(kappa, l)) 
    f = fs[n]
    g = gs[n]
    fxs = np.real(f(xs))
    gxs = g(xs)
    plt.cla()
    plt.plot(xs, gxs, label='True')
    plt.plot(xs, fxs, label=f'Projected, $L = {l}$')
    plt.xlabel('$\\theta$')
    plt.grid(True)
    plt.title(f'Doubling map; iteration $n= {n}$')
    plt.legend()
    plt.autoscale(enable=True, axis='x', tight=True)
    plt.show()
    fig2.canvas.draw()
    
    
interactive(plotfunc4, l=IntSlider(value=0, min=0, max=l_max - 1),
            n=IntSlider(value=0, min=0, max=n_iter - 1)) 

interactive(children=(IntSlider(value=0, description='l', max=9), IntSlider(value=0, description='n', max=9), …

## Torus rotation

In [10]:
def vm2_fourier(kappa: R2, l: I2) -> V:
    """Fourier coefficients of von Mises density on T2"""
    vm_hat = f2.von_mises_fourier(kappa, loc=(np.pi, np.pi))
    f_hat = vm_hat(f2.dual_group(l))
    return f_hat
    
    
def vm2_proj(kappa: R2, l: I2) -> Callable[[X2], Y]:
    """Projected Von Mises probability density."""
    phi = f2.fourier_basis(l)
    f: Callable[[X2], Y] = fun.synthesis(phi, vm2_fourier(kappa, l))
    return f

In [11]:
kappa = (5, 10)
l_max = 10
n_plt = 201

theta = np.linspace(0, 2 * np.pi, n_plt)
x1, x2 = np.meshgrid(theta, theta)
x = np.concatenate((x1[:-1,:-1,np.newaxis], x2[:-1,:-1,np.newaxis]), axis=2)
#x = np.vstack((x1[:-1, :-1].ravel(), x2[:-1, :-1].ravel())).T
fig4 = plt.figure(4)

def plotfunc4(l1, l2):
    f = vm2_proj(kappa, (l1, l2))
    fx = np.real(f(x))
    plt.cla()
    plt.pcolormesh(x1, x2, fx)
    #plt.plot(fx[:,0])
    plt.xlabel('$x_1$')
    plt.ylabel('$x_2$')
    plt.title(f"Projected von Mises density, $(L_1, L_2) = ({l1}, {l2})$")
    plt.show()
    fig4.canvas.draw()
    

interactive(plotfunc4, l1=IntSlider(value=0, min=0, max=l_max - 1), 
            l2=IntSlider(value=0, min=0, max=l_max - 1)) 


interactive(children=(IntSlider(value=0, description='l1', max=9), IntSlider(value=0, description='l2', max=9)…

In [12]:
def u_rot2_fourier(a: R2, l: I2) -> Callable[[V], V]:
    """Projected Koopman operator for torus rotation in Fourier domain."""
    spec = f2.rotation_eigs(a)
    u = multiply_by(vec, spec(f2.dual_group(l)))
    return u
    
    
def vm_rot2_proj(a: R2, kappa: R2, l: I2) -> Iterator[F]:
    """Orbit of the von Mises density under the projected Koopman operator 
    associated with the torus rotation.
    
    """
    f_hat = vm2_fourier(kappa, l)
    u = u_rot2_fourier(a, l)
    f_hat_orb = orbit(f_hat, u)
    phi = f2.fourier_basis(l)
    synth = partial(fun.synthesis, phi)
    f_orb  = map(synth, f_hat_orb)
    return f_orb

<cell>19: error: Incompatible return value type (got "map[Callable[[X], S_mat]]", expected "Iterator[Callable[[ndarray[Any, dtype[floating[Any]]]], ndarray[Any, dtype[floating[Any]]]]]")


In [16]:
a_2pi = np.array([np.sqrt(2) / 10, np.sqrt(7) / 10]) 
kappa = (5, 10)
l_max = 10
n_iter = 10
n_plt = 201

a = tuple(2 * np.pi * a_2pi)
theta = np.linspace(0, 2*np.pi, n_plt)
x1, x2 = np.meshgrid(theta, theta)
x = np.concatenate((x1[:-1, :-1, np.newaxis], x2[:-1, :-1, np.newaxis]), 
                   axis=2)
fig5 = plt.figure(5)

def plotfunc5(l1, l2, n):
    fs = take(n_iter, vm_rot2_proj(a, kappa, (l1, l2))) 
    f = fs[n]
    fx = np.real(f(x))
    plt.cla()
    plt.pcolormesh(x1, x2, fx)
    plt.xlabel('$\\theta_1$')
    plt.ylabel('$\\theta_2$')
    plt.title(f"Projected von Mises density, $(L_1, L_2) = ({l1}, {l2})$, " \
              + f"iteration $n = {n}$")
    plt.show()
    fig4.canvas.draw()
    

interactive(plotfunc5, 
            l1=IntSlider(value=0, min=0, max=l_max - 1), 
            l2=IntSlider(value=0, min=0, max=l_max - 1), 
            n=IntSlider(value=0, min=0, max=n_iter - 1)) 

interactive(children=(IntSlider(value=0, description='l1', max=9), IntSlider(value=0, description='l2', max=9)…