In [None]:
import torch
from torch import nn
from torch import optim
from torch.autograd import Variable

%matplotlib inline
import matplotlib.pyplot as plt

import pysindy as ps

from sklearn.preprocessing import PolynomialFeatures

from scipy import io as sio
from scipy.linalg import lstsq

import numpy as np

from sgolay2 import SGolayFilter2

from tqdm import trange

In [None]:
# class NetL(nn.Module):
#     def __init__(self, n_feature, n_hidden, n_output):                                                                              
#         super(NetL, self).__init__()
#         self.fc1 = nn.Linear(n_feature, n_hidden)
#         self.fc2 = nn.Linear(n_hidden, n_hidden)
#         self.fc3 = nn.Linear(n_hidden, n_hidden)
#         self.fc4 = nn.Linear(n_hidden, n_hidden)
#         self.predict = nn.Linear(n_hidden, n_output)
#     def forward(self, x):
#         out = torch.sin((self.fc1(x)))
#         out = torch.sin((self.fc2(out)))
#         out = torch.sin((self.fc3(out)))
#         out = torch.sin((self.fc4(out)))
#         out = self.predict(out)
#         return out

class Net(nn.Module):
    """Fully connected network with sine activations.

    Layers are created in the order ``fc1`` (n_feature -> n_hidden),
    ``fc2`` (n_hidden -> n_hidden) and ``predict`` (n_hidden -> n_output).

    Args:
        n_feature: size of each input sample.
        n_hidden: width of the hidden layers.
        n_output: size of each output sample.

    NOTE(review): ``fc2`` is applied three times in ``forward``, i.e. the three
    hidden-to-hidden steps share one set of weights. Confirm this sharing is
    intended (the commented-out ``NetL`` uses separate layers instead).
    """

    def __init__(self, n_feature, n_hidden, n_output):
        super().__init__()
        self.fc1 = nn.Linear(n_feature, n_hidden)
        self.fc2 = nn.Linear(n_hidden, n_hidden)
        self.predict = nn.Linear(n_hidden, n_output)

    def forward(self, x):
        """Map inputs ``x`` through fc1, three passes of fc2, then ``predict``."""
        hidden = torch.sin(self.fc1(x))
        # Same layer (same weights) reused for each of the three hidden steps.
        for _ in range(3):
            hidden = torch.sin(self.fc2(hidden))
        return self.predict(hidden)

In [None]:
# data = sio.loadmat('../../SGA-PDE/codes/data/KdV.mat')
# u = data.get("uu"); u_clean = u.copy()
# x = np.squeeze(data.get("x"))
# t = np.squeeze(data.get("tt").reshape(1,201))

# Load the KdV dataset from the .mat file and pull out the solution and axes.
data = sio.loadmat('../Datasets/KdV_rudy.mat')
# Only the real part of the stored solution is used.
u = data['usol'].real
# Untouched copy kept as the reference for later comparisons.
u_clean = u.copy()
x = data['x'][0]
t = data['t'].flatten()

print("Domain shape:", u.shape)

In [None]:
# Grid spacings, taken from the first two samples of each axis.
dt = t[1] - t[0]
dx = x[1] - x[0]
# meshgrid yields (len(t), len(x)) arrays; stacking and transposing gives an
# array of shape (len(x), len(t), 2) whose last axis holds the (x, t) pair.
X, T = np.meshgrid(x, t)
XT = np.stack((X, T)).T

In [None]:
# Finite-difference derivatives of the clean solution.
diff = ps.FiniteDifference(is_uniform=True)
# The time derivative is taken on the transposed array, then transposed back.
u_clean_t = diff._differentiate(u_clean.T, t).T
# Spatial derivatives are obtained by differentiating repeatedly with respect to x.
u_clean_x = diff._differentiate(u_clean, x)
u_clean_xx = diff._differentiate(u_clean_x, x)
u_clean_xxx = diff._differentiate(u_clean_xx, x)

# One flattened column per term, ordered [u_t, u, u_x, u_xx, u_xxx].
clean_terms = (u_clean_t, u_clean, u_clean_x, u_clean_xx, u_clean_xxx)
base_features = np.hstack([term.reshape(-1, 1) for term in clean_terms])

In [None]:
# Add seeded Gaussian noise whose standard deviation is noise_lv percent of std(u).
np.random.seed(0)
noise_lv = float(30)
print("Noise level:", noise_lv)
noise_scale = 0.01*np.abs(noise_lv)*(u.std())
noise = noise_scale*np.random.randn(*u.shape)
u = u + noise

In [None]:
# Optionally smooth the noisy field with a 2-D Savitzky-Golay filter.
denoise = True
if denoise:
    sg_filter = SGolayFilter2(window_size=19, poly_order=3)
    u = sg_filter(u)

In [None]:
# Network mapping an (x, t) pair to a scalar u, kept on the CPU.
device = torch.device('cpu')
net = Net(n_feature=2, n_hidden=50, n_output=1).to(device)
# Flatten the grid and the field so that row i of xt corresponds to row i of u_vec.
xt = XT.reshape(-1, 2)
u_vec = u.reshape(-1, 1)

In [None]:
# Split the flattened samples into a training subset and a held-out subset.
train_size = 50000
n_samples = len(u_vec)
assert train_size < n_samples, "train_size must leave at least one held-out sample"
# Sample WITHOUT replacement so the training set really contains train_size
# distinct points (randint would draw duplicates).
train_indices = np.random.choice(n_samples, size=train_size, replace=False)
# Every index not drawn for training is held out; setdiff1d returns them sorted.
test_indices = np.setdiff1d(np.arange(n_samples), train_indices)

# Plain float32 tensors are sufficient; the deprecated Variable wrapper is not needed.
xt_train = torch.from_numpy(xt[train_indices, :]).float().to(device)
u_train = torch.from_numpy(u_vec[train_indices]).float().to(device)

xt_test = torch.from_numpy(xt[test_indices, :]).float().to(device)
u_test = torch.from_numpy(u_vec[test_indices]).float().to(device)

In [None]:
# Mean-squared-error objective, optimised with Adam at its default settings.
criterion = nn.MSELoss()
optimizer = optim.Adam(params=net.parameters())

In [None]:
import os

# Full-batch training; every 100 epochs the held-out loss is measured and the
# weights with the lowest held-out loss so far are checkpointed.
ckpt_path = f'./nn_files/net{int(noise_lv)}.pth'
# Make sure the checkpoint directory exists before the first torch.save.
os.makedirs(os.path.dirname(ckpt_path), exist_ok=True)

# Start from +inf so the first evaluation always writes a checkpoint; a fixed
# threshold could leave no (or a stale) file for the load below.
min_loss = float('inf')
max_epoch = 1000*100

for epoch in trange(max_epoch):
    # train
    optimizer.zero_grad()
    loss = criterion(net(xt_train), u_train)
    loss.backward()
    optimizer.step()
    # test
    if epoch % 100 == 0:
        net.eval()
        with torch.no_grad():
            test_loss = ((net(xt_test)-u_test)**2).mean().item()
        if test_loss < min_loss:
            min_loss = test_loss
            torch.save(net.state_dict(), ckpt_path)
        net.train()

# Restore the best checkpoint and switch to evaluation mode.
net.load_state_dict(torch.load(ckpt_path))
net.eval()

In [None]:
# Evaluate the trained network on every (x, t) point and reshape back to the (len(x), len(t)) grid.
xt_tensor = torch.from_numpy(xt[:, :]).float()
u_pred = net(xt_tensor).detach()
recon = np.array(u_pred).reshape(len(x), len(t))

In [None]:
# Mean squared error against the clean solution: network input u vs. network reconstruction.
input_mse = ((u - u_clean)**2).mean()
recon_mse = ((recon - u_clean)**2).mean()
input_mse, recon_mse

In [None]:
# Build a fresh float32 leaf tensor of the (x, t) inputs that tracks gradients.
# torch.as_tensor accepts both the NumPy array created earlier and the tensor
# left behind by a previous run of this cell, so the cell can be re-run
# (torch.from_numpy would raise TypeError when handed a tensor).
xt = torch.as_tensor(xt, dtype=torch.float32, device=device).detach().clone().requires_grad_(True)
# Gradient of the summed network output w.r.t. the inputs; column 0 is the
# x-direction and column 1 the t-direction (xt rows are (x, t) pairs).
grad = torch.autograd.grad(net(xt).sum(), xt, create_graph=True)[0]
u_t = grad[:, 1:2].detach().numpy()
# u_x stays a tensor attached to the graph so it can be differentiated again.
u_x = grad[:, 0:1]
u_xx = torch.autograd.grad(u_x.sum(), inputs=xt, create_graph=True)[0][:, 0:1]
u_xxx = torch.autograd.grad(u_xx.sum(), inputs=xt, create_graph=True)[0][:, 0:1]

In [None]:
# Network-derived features, one column per term: [u_t, u, u_x, u_xx, u_xxx].
feature_columns = [
    u_t,
    recon.reshape(-1, 1),
    u_x.detach().numpy(),
    u_xx.detach().numpy(),
    u_xxx.detach().numpy(),
]
base_features = np.hstack(feature_columns)

np.save(f"./nn_files/nn_base_features{int(noise_lv)}.npy", base_features)

#### Author's note: a reload is needed at this point (NumPy is re-imported and the
#### saved features are loaded again in the following cells).
#### There seems to be a conflict between PyTorch 2.2 and NumPy; the root cause is unknown.

In [None]:
import numpy as np
from sklearn.preprocessing import PolynomialFeatures

In [None]:
# Reload the feature matrix written by the np.save call above
# (same path, keyed by the integer noise level).
base_features = np.load(f"./nn_files/nn_base_features{int(noise_lv)}.npy")

In [None]:
# Candidate library built from every column except the first (u_t, the regression
# target): the columns themselves plus their pairwise products, without a bias term.
candidate_lib = PolynomialFeatures(degree=2, interaction_only=True, include_bias=False)
theta = candidate_lib.fit_transform(base_features[:, 1:])
candidate_lib.get_feature_names_out()

In [None]:
# Least-squares fit of u_t (column 0 of base_features) on two library columns.
# NOTE(review): with the input order [u, u_x, u_xx, u_xxx], columns 3 and 4 of
# theta should be u_xxx and u*u_x; verify against get_feature_names_out().
selected_terms = [3, 4]
coefficients, *_ = np.linalg.lstsq(theta[:, selected_terms], base_features[:, 0:1], rcond=None)
coefficients