In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import *
from numpy import random

## Consider the System Given by the Equations

$\dot{x}_1 = \mu x_1$

$\dot{x}_2 = \lambda (x_2 - x_1^2)$

### Define and Collect Trajectories of the Nonlinear System

In [None]:
def nonlinear_system(t, X, params):
    """Right-hand side of the two-state nonlinear system.

    Evaluates
        dx1/dt = mu * x1
        dx2/dt = lam * (x2 - x1**2)

    Parameters
    ----------
    t : time; accepted as the first argument but not used by the dynamics.
    X : indexable holding the two states, X[0] = x1 and X[1] = x2.
    params : indexable holding the parameters, params[0] = mu and
        params[1] = lam.

    Returns
    -------
    numpy array [dx1/dt, dx2/dt].
    """
    mu, lam = params[0], params[1]
    x1, x2 = X[0], X[1]

    # pack both state derivatives into a single numpy array
    return np.asarray([mu * x1, lam * (x2 - x1 ** 2)])

In [None]:
# define the simulation settings

dt = 0.01 # sampling time
final_time = 100 # time duration of each trajectory
Ntraj = 20 # number of trajectories
nstates = 2 # number of states in the system
tspan = np.asarray([0,final_time])
t_vec = np.arange(0,final_time,dt)
# take the sample count from the time grid itself: int(final_time/dt) can be
# off by one because of floating-point truncation (e.g. int(0.3/0.1) == 2)
Nsim = len(t_vec)

# system parameters

mu = -0.05
lam = -1
params = np.asarray([mu,lam])

traj_tensor = np.empty([Ntraj,nstates,Nsim]) # trajectory tensor: (trajectory, state, sample)

# fix the seed so that Restart & Run All reproduces the same data set
np.random.seed(0)

# initial conditions, uniform in [-1,1) for every state
# (uniform(-1,1) is already symmetric; an extra "2*(...)-1" would skew it to [-3,1))
initial_state = np.random.uniform(-1,1,[nstates,Ntraj])

# collect all trajectories
for i in range(Ntraj):
    states = solve_ivp(nonlinear_system, tspan, initial_state[:,i], t_eval = t_vec, args = (params,))
    if not states.success:
        # a failed integration returns fewer than Nsim samples
        raise RuntimeError("solve_ivp failed for trajectory {}: {}".format(i, states.message))
    traj_tensor[i,:,:] = states.y

In [None]:
# Prepare the snapshot matrices X and Y.
#
# The predictor is iterated once per sampling step, so column k of Y has to be
# the state one sample after column k of X *within the same trajectory*.
# For every trajectory we therefore pair samples 0..Nsim-2 (X) with samples
# 1..Nsim-1 (Y), and append the trajectories one after the other.
# Both matrices end up with shape (nstates, Ntraj*(Nsim-1)).

X = np.hstack([traj_tensor[i,:,:-1] for i in range(Ntraj)])
Y = np.hstack([traj_tensor[i,:,1:] for i in range(Ntraj)])

### Lift

In [None]:
# define the Radial Basis Function that will aid in lifting the states
def rbf(X,C,rbf_type):
    """Evaluate thin-plate-spline radial basis functions.

    Entry (i, j) of the result is r**2 * log(r), where r is the Euclidean
    distance between center i (column i of C) and point j (column j of X).

    Parameters
    ----------
    X : 2-D array, one point per column.
    C : 2-D array with the same number of rows as X, one center per column.
    rbf_type : accepted for interface compatibility but currently ignored;
        the thin-plate kernel is always used.

    Returns
    -------
    2-D array of shape (C.shape[1], X.shape[1]).
    """
    Y = np.zeros([C.shape[1],X.shape[1]])

    for i in range(C.shape[1]):
        # keep the center as a column so it broadcasts against every point
        center = C[:,i:i+1]
        r_squared = np.sum((X-center)**2,axis = 0)

        # r^2*log(r) tends to 0 as r -> 0, but evaluating it directly yields
        # 0*(-inf) = nan when a point coincides with a center. Feeding log()
        # a dummy 1 at those entries makes the product exactly 0 there.
        safe_r_squared = np.where(r_squared > 0, r_squared, 1.0)
        Y[i,:] = r_squared*np.log(np.sqrt(safe_r_squared))

    return Y

In [None]:
Nrbf = 100 # number of RBF centers
# random RBF centers, uniform in [-1,1) for every state
# (uniform(-1,1) is already symmetric; an extra "2*(...)-1" would skew it to [-3,1))
cent = np.random.uniform(-1,1,[nstates,Nrbf])
rbf_type = 'thin_plate' # specify the type of RBF

# obtain the lifted states
def liftFun(xx,cent):
    """Lift states by stacking them on top of their RBF features.

    Parameters
    ----------
    xx : 2-D array of states, one state vector per column.
    cent : 2-D array of RBF centers, one center per column.

    Returns
    -------
    2-D array whose first rows are xx itself, followed by one row per RBF
    center.
    """
    return np.vstack([xx,rbf(xx,cent,rbf_type)])

# update the total dimension of the lifted state vector
Nlift = Nrbf+nstates

In [None]:
# lift both snapshot matrices using the same set of RBF centers
Xlift, Ylift = (liftFun(snapshots, cent) for snapshots in (X, Y))

# report the lifted dimensions as a quick sanity check
print(Xlift.shape,Ylift.shape)

### Build Predictors

In [None]:
# Products of the lifted snapshot matrices with Xlift transposed
Xlift_T = Xlift.T
G = Xlift.dot(Xlift_T)
V = Ylift.dot(Xlift_T)

# Lifted state-transition matrix: Alift = V * pinv(G)
G_pinv = np.linalg.pinv(G)
Alift = V.dot(G_pinv)

# Output matrix mapping lifted states back to the original states
Xlift_pinv = np.linalg.pinv(Xlift)
Clift = X.dot(Xlift_pinv)

In [None]:
# Relative fit error of the lifted predictor in the Frobenius norm:
#   ||Ylift - Alift*Xlift||_F / ||Ylift||_F
prediction_error_norm = np.linalg.norm(Ylift - Alift.dot(Xlift), ord='fro')
target_norm = np.linalg.norm(Ylift, ord='fro')
residual = prediction_error_norm / target_norm
print(residual)

### Check Predictor Performance

In [None]:
# Horizon and time grid for the prediction test
Tmax = 1
tspan_test = np.asarray([0,Tmax])
t_vec_test = np.arange(0,Tmax,dt)
Nsim_test = int(Tmax/dt)

# Initial condition, kept as a list for the ODE solver and as a
# (2,1) column for the lifting function
x0 = [0.5, -0.5]
x0_l = np.asarray(x0).reshape(2,1)

# Lifted initial condition
xlift = liftFun(x0_l,cent)

In [None]:
# Simulate

# True dynamics
states = solve_ivp(nonlinear_system, tspan_test, x0, t_eval = t_vec_test, args = (params,))
x_true = states.y

# Koopman predictor: repeatedly apply Alift to the lifted state.
# The trajectory is preallocated and always restarted from the first column of
# xlift (the lifted initial condition), so re-running this cell yields the same
# Nsim_test columns instead of appending to the previous run.
xlift_traj = np.empty((xlift.shape[0], Nsim_test))
xlift_traj[:,0] = xlift[:,0]
for k in range(Nsim_test-1):
    xlift_traj[:,k+1] = np.dot(Alift, xlift_traj[:,k]) # Lifted dynamics
xlift = xlift_traj

x_koop = np.dot(Clift, xlift) # Koopman predictions

### Plot the Results

In [None]:
# Compare the true trajectory with the Koopman prediction, one figure per state.
# Both figures use x_koop (the lifted trajectory projected back through Clift)
# so that the two states are evaluated in the same way.
for state_idx in range(2):
    plt.plot(t_vec_test,x_true[state_idx,:], label = 'true')
    plt.plot(t_vec_test,x_koop[state_idx,:], label = 'koopman')
    plt.xlabel('Time')
    plt.ylabel('States')
    plt.title("State {} Prediction Comparison".format(state_idx+1))
    plt.legend()
    plt.show()