In [1]:
import torch
import numpy as np
import random

from dataset.utils import DataLoader
from models.DynGraphESN import DynGESNModel
from tqdm import tqdm
from einops import rearrange

seed = 42
random.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

config = {
        'reservoir_size': 800,
        'input_scaling': 0.15,
        'reservoir_layers': 1,
        'leaking_rate': 0.1,
        'spectral_radius': 0.99,
        'density': 0.25,
        'reservoir_activation': 'tanh',
        'lr': 0.05,
        'epochs': 100,
        'alpha_decay': False,
        'add_self_loops': False,
        'b_leaking_rate': True,
        }



# Load & process datasets

Japanese_Vowels:

Multivariate time series classification.
Samples: 640 (270 training, 370 test)
Features: 12
Classes: 9
Time series length: 29

In [2]:
downloader = DataLoader()
Xtr, Ytr, Xte, Yte = downloader.get_data('Japanese_Vowels')

Loaded Japanese_Vowels dataset.
Number of classes: 9
Data shapes:
  Xtr: (270, 29, 12)
  Ytr: (270, 1)
  Xte: (370, 29, 12)
  Yte: (370, 1)


In [3]:
X = torch.from_numpy(np.concatenate((Xtr, Xte), axis=0)).float().to(device)
Y = torch.from_numpy(np.concatenate((Ytr, Yte), axis=0)).float().to(device)

In [4]:
# Define the model
feat_size = X.shape[2]
model = DynGESNModel(input_size=feat_size,
                        reservoir_size=config['reservoir_size'],
                        input_scaling=config['input_scaling'],
                        reservoir_layers=config['reservoir_layers'],
                        leaking_rate=config['leaking_rate'],
                        spectral_radius=config['spectral_radius'],
                        density=config['density'],
                        reservoir_activation=config['reservoir_activation'],
                        alpha_decay=config['alpha_decay'],
                        b_leaking_rate=config['b_leaking_rate']).to(device)

inputs = []
labels = []
states = []

for n in tqdm(range(X.shape[0])):
    node_label = rearrange(X[n], 't f -> t 1 f') # time steps, 1 node, features
    graph_label = Y[n]
    edge_index = torch.tensor([[0], [0]], dtype=torch.long).to(device) # single node graph
    edge_weight = torch.tensor([1], dtype=torch.float32).to(device) # single node graph
    output = model(node_label, edge_index, edge_weight)
    output = rearrange(output, 't n l f -> t n (l f)')
    inputs.append(output.detach().cpu()[-1])
    states.append(output.detach().cpu())
    labels.append(graph_label.squeeze().detach().cpu())

inputs = torch.stack(inputs, dim=0).squeeze()
states = torch.stack(states, dim=0).squeeze()
labels = torch.stack(labels, dim=0)

100%|██████████| 640/640 [00:10<00:00, 62.96it/s]


In [13]:
states.shape

torch.Size([640, 29, 800])

# Train readout

In [5]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import Ridge
from sklearn.metrics import accuracy_score

Xtr, Xte, Ytr, Yte = train_test_split(states, labels, test_size=0.42, random_state=seed)

onehot_encoder = OneHotEncoder(sparse_output=False)
Ytr = onehot_encoder.fit_transform(Ytr.unsqueeze(dim=1))
Yte = onehot_encoder.transform(Yte.unsqueeze(dim=1))

readout = Ridge(alpha=1.0)

readout.fit(Xtr[:,-1,:], Ytr)

logits = readout.predict(Xte[:,-1,:])
pred_class = np.argmax(logits, axis=1)

true_class = np.argmax(Yte, axis=1)
accuracy = accuracy_score(true_class, pred_class)

print('Accuracy: ', accuracy)

Accuracy:  0.966542750929368


# State space model

Dimensionality reduction "TensorPCA"

In [6]:
import numpy.linalg as linalg

X = Xtr.numpy()
Xt = np.swapaxes(X,1,2)  # [N,T,V] --> [N,V,T]
Xm = np.expand_dims(np.mean(X, axis=0), axis=0) # mean sample
Xmt = np.swapaxes(Xm,1,2)

C = np.tensordot(X-Xm,Xt-Xmt,axes=([1,0],[2,0])) / (X.shape[0]-1) # covariance of 0-mode slices

# sort eigenvalues of covariance matrix
eigenValues, eigenVectors = linalg.eig(C)
idx = eigenValues.argsort()[::-1]   
eigenVectors = eigenVectors[:,idx]

n_components = 75
first_eigs = eigenVectors[:,:n_components]

Xtr_dimred = np.einsum('klj,ji->kli', X, first_eigs)
Xte_dimred = np.einsum('klj,ji->kli', Xte.numpy(), first_eigs)

In [20]:
Xte_dimred.shape

(269, 29, 75)

Train a linear model between reduced states

In [7]:
fit_intercept = False
ridge_embedding = Ridge(alpha=1.0, fit_intercept=fit_intercept)

coeff_tr = []
biases_tr = [] 

for i in range(X.shape[0]):
    ridge_embedding.fit(Xtr_dimred[i, 0:-1, :], Xtr_dimred[i, 1:, :])
    coeff_tr.append(ridge_embedding.coef_.ravel())
    if fit_intercept:
        biases_tr.append(ridge_embedding.intercept_.ravel())
if fit_intercept:
    input_repr_tr = np.concatenate((np.vstack(coeff_tr), np.vstack(biases_tr)), axis=1)
else:
    input_repr_tr = np.vstack(coeff_tr)

readout.fit(input_repr_tr, Ytr)

Train readout on state model representation

In [8]:
readout.fit(input_repr_tr, Ytr)

Test it

In [9]:
coeff_te = []
biases_te = []   
         
for i in range(Xte.shape[0]):
    ridge_embedding.fit(Xte_dimred[i, 0:-1, :], Xte_dimred[i, 1:, :])
    coeff_te.append(ridge_embedding.coef_.ravel())
    if fit_intercept:
        biases_te.append(ridge_embedding.intercept_.ravel())

if fit_intercept:
    input_repr_te = np.concatenate((np.vstack(coeff_te), np.vstack(biases_te)), axis=1)
else:
    input_repr_te = np.vstack(coeff_te)

logits = readout.predict(input_repr_te)
pred_class = np.argmax(logits, axis=1)

true_class = np.argmax(Yte, axis=1)
accuracy = accuracy_score(true_class, pred_class)

print('Accuracy: ', accuracy)

Accuracy:  0.9888475836431226


# Same thing, with KANN

Dimensionality reduction with standard PCA

In [33]:
Xtr.shape

torch.Size([371, 29, 800])

In [10]:
from sklearn.decomposition import PCA

pca = PCA(n_components=75)
pca.fit(Xtr.reshape(-1, Xtr.shape[-1]))
pca_proj = pca.components_

# project states
Xtr_dimred_pca = np.einsum('klj,ji->kli', Xtr, pca_proj.T)
Xte_dimred_pca = np.einsum('klj,ji->kli', Xte.numpy(), pca_proj.T)

In [38]:
Xtr_dimred_pca.shape

(371, 29, 75)

Find the linear operator

In [11]:
from DMD.dmd import KANN

Ks = []

dim_red = 0
method = None

# Koperator = Ridge(alpha=0.00001, fit_intercept=False)

for x in Xtr_dimred_pca:
    # Koperator.fit(x[:-1,:], x[1:,:])
    # Ks.append(Koperator.coef_.flatten())
    kann_ = KANN(x[np.newaxis,:,:], k=dim_red, emb=method)
    KL = kann_.compute_KOP()
    Ks.append(KL.flatten())

Ks = np.stack(Ks, axis=0)

# Fit a classifier on linear operators
Kclassifier = Ridge(alpha=1.0)

Kclassifier.fit(Ks, Ytr)

# Test the classifier
Ks_te = []

for x in Xte_dimred_pca:
    # Koperator.fit(x[:-1,:], x[1:,:])
    # Ks_te.append(Koperator.coef_.flatten())
    kann_ = KANN(x[np.newaxis,:,:], k=dim_red, emb=method)
    KL = kann_.compute_KOP()
    Ks_te.append(KL.flatten())

Ks_te = np.stack(Ks_te, axis=0)

y_pred = Kclassifier.predict(Ks_te)
pred_class = np.argmax(y_pred, axis=1)

true_class = np.argmax(Yte, axis=1)
accuracy = accuracy_score(true_class, pred_class)
print(f"Accuracy: {accuracy}")

Accuracy: 0.9888475836431226


In [12]:
KL.shape

(75, 75)