In [1]:
import torch
import torch.nn as nn

import numpy as np
from scipy.io import loadmat, savemat
import math
import os
import h5py

from functools import partial
from models.models import MWT2d
from models.utils import train, test, LpLoss, get_filter, UnitGaussianNormalizer

In [2]:
torch.manual_seed(0)
np.random.seed(0)

In [3]:
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
def get_initializer(name):
    
    if name == 'xavier_normal':
        init_ = partial(nn.init.xavier_normal_)
    elif name == 'kaiming_uniform':
        init_ = partial(nn.init.kaiming_uniform_)
    elif name == 'kaiming_normal':
        init_ = partial(nn.init.kaiming_normal_)
    return init_

In [6]:
ntrain = 1000
ntest = 200

r = 1
h = int(((512 - 1)/r) + 1)
s = h

In [7]:
# laod data
dataloader = h5py.File('../../data/all_train_test_Darcy.mat','r')

a_data = dataloader['a_train']
p_data = dataloader['p_train']

a_numpy = []
for i in range(len(a_data)):
    obj = dataloader[a_data[i][0]]
    a_numpy.append(obj[:])
a_tensor = torch.from_numpy(np.array(a_numpy).astype(np.float32))

p_numpy = []
for i in range(len(p_data)):
    obj = dataloader[p_data[i][0]]
    p_numpy.append(obj[:])
p_tensor = torch.from_numpy(np.array(p_numpy).astype(np.float32))

In [8]:
x_train = a_tensor[:ntrain,::r,::r][:,:s,:s]
y_train = p_tensor[:ntrain,::r,::r][:,:s,:s]

x_test = a_tensor[-ntest:,::r,::r][:,:s,:s]
y_test = p_tensor[-ntest:,::r,::r][:,:s,:s]

In [9]:
x_normalizer = UnitGaussianNormalizer(x_train)
x_train = x_normalizer.encode(x_train)
x_test = x_normalizer.encode(x_test)

y_normalizer = UnitGaussianNormalizer(y_train)
y_train = y_normalizer.encode(y_train)

grids = []
grids.append(np.linspace(0, 1, s))
grids.append(np.linspace(0, 1, s))
grid = np.vstack([xx.ravel() for xx in np.meshgrid(*grids)]).T
grid = grid.reshape(1,s,s,2)
grid = torch.tensor(grid, dtype=torch.float)
x_train = torch.cat([x_train.reshape(ntrain,s,s,1), grid.repeat(ntrain,1,1,1)], dim=3)
x_test = torch.cat([x_test.reshape(ntest,s,s,1), grid.repeat(ntest,1,1,1)], dim=3)

In [10]:
batch_size = 10
train_loader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(x_train, y_train), batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(x_test, y_test), batch_size=batch_size, shuffle=False)

In [11]:
ich = 3
initializer = get_initializer('xavier_normal') # xavier_normal, kaiming_normal, kaiming_uniform

torch.manual_seed(0)
np.random.seed(0)

model = MWT2d(ich, 
            alpha = 12,
            c = 4,
            k = 4, 
            base = 'legendre', # 'chebyshev'
            nCZ = 4,
            L = 0,
            initializer = initializer,
            ).to(device)
learning_rate = 0.001

epochs = 500
step_size = 100
gamma = 0.5

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

myloss = LpLoss(size_average=False)
y_normalizer.cuda()

for epoch in range(1, epochs+1):
    train_l2 = train(model, train_loader, optimizer, epoch, device,
        lossFn = myloss, lr_schedule = scheduler,
        post_proc = y_normalizer.decode)
    
    test_l2 = test(model, test_loader, device, lossFn=myloss, post_proc=y_normalizer.decode)
    print(f'epoch: {epoch}, train l2 = {train_l2}, test l2 = {test_l2}')