In [80]:
import numpy as np
import scipy.integrate as integrate
import scipy.special as special

import random

import torch
import torch.nn as nn
import torch.nn.functional as func
import torch.optim as optim

from matplotlib import pyplot as plt

from numpy import exp,arange
from pylab import meshgrid,cm,imshow,contour,clabel,colorbar,axis,title,show

import csv

In [2]:
# the analytical representation of exact solution
def heat_equ_analytical_solu(x, t):
    return np.sin(np.pi * x) * np.exp(-np.power(np.pi, 2) * t)

In [3]:
# ==========
# helping methods
# ==========

# generate a list from lower and upper bound
def gen_list(p0, pn, delta, dig=5):
    ret = []
    i = p0
    while i < pn:
        ret.append(float(i))
        i += delta
        i = round(i, dig)
    return ret

# padding and zero padding
def padding(origin, a_list, b_list):
    return np.hstack((a_list, origin, b_list))

def zero_padding(origin, num):
    zero_list = [0 for i in range(num)]
    return padding(origin, zero_list, zero_list)

In [4]:
# ==============
# getting trainning data
# ==============

# trainning pairs
def gen_pair(u, x, t, length=3, num=1000):
    pairs = []
    for i in range(num):
        r = random.randint(0, t-2)
        current_t = u[r]
        next_t = u[r+1]
        p = random.randint(length, x-1-length)
        train = current_t[p-length:p+length+1]
        solu = next_t[p]
        pair = {'input': train, 'solu': solu}
        pairs.append(pair)
    return pairs

# trainning pairs (in average)
def gen_pair_ave(u, x, t, length=3, num=1000):
    pairs = []
    for i in range(num):
        r = random.randint(0, t-2)
        current_t = u[r]
        next_t = u[r+1]
        p = random.randint(length, x-2-length)
        train0 = current_t[p-length:p+length+2]
        solu1 = next_t[p]
        solu2 = next_t[p+1]
        train = []
        for j in range(len(train0)-1):
                train.append(0.5 * (train0[j] + train0[j+1]))
        solu = 0.5 * (solu1 + solu2)
        pair = {'input': train, 'solu': solu}
        pairs.append(pair)
    return pairs

# trainning pairs (in cell average)
def gen_pair_cell_ave(u, x, t, delta_x, delta_y, length=3, num=1000):
    pairs = []
    for i in range(num):
        t_index = random.randint(0, len(t)-2)
        x_s = random.randint(0, len(x)-2*length-3)
        x_e = x_s+2*length+2
        starting = x[x_s]
        ending = x[x_e]
        input_data = []
        for i in range(x_s, x_e-1):
            value = integrate.quad(lambda x: heat_equ_analytical_solu(x, t[t_index]), x[i], x[i+1])
            value = value[0] * (1/delta_x)
            input_data.append(value)
        target_data = integrate.quad(lambda x: heat_equ_analytical_solu(x, t[t_index+1]), x[x_s+length], x[x_s+length+1])
        target_data = target_data[0] * (1/delta_x)
        pair = {'input': input_data, 'solu': target_data}
        pairs.append(pair)
    return pairs

In [5]:
# restnet
class ResNet(nn.Module):
    def __init__(self, i, h1, h2, o, twolayers):
        super(ResNet, self).__init__()
        self.linear1 = nn.Linear(i, h1)
        self.relu1 = nn.ReLU()
        self.linear21 = nn.Linear(h1, h2)
        self.relu21 = nn.ReLU()
        self.linear22 = nn.Linear(h2, o)
        self.relu22 = nn.ReLU()
        self.linear23 = nn.Linear(h1, o)
        self.relu23 = nn.ReLU()
        self.twolayers = twolayers
        
    def forward(self, x):
        if self.twolayers == True:
            out = self.linear1(x)
            out = self.relu1(out)
            out = self.linear21(out)
            out = self.relu21(out)
            out = self.linear22(out)
            out = self.relu22(out)
        else:
            out = self.linear1(x)
            out = self.relu1(out)
            out = self.linear23(out)
            out = self.relu23(out)
        return out + torch.mean(x)

    def load_model(self, save_path):
        self.load_state_dict(torch.load(save_path))

    def save_model(self, save_path):
        torch.save(self.state_dict(), save_path)

In [83]:
def calc_next_time(x, t, t_index, delta_x, delta_t, current, model, length=3):
    list_of_error = []
    list_of_predict = []
    padding = np.hstack(([0 for i in range(length)], current, [0 for i in range(length)])).tolist()
    padding_x = np.hstack(([0 for i in range(length)], x, [0 for i in range(length)])).tolist()
    padding_t = np.hstack(([0 for i in range(length)], t, [0 for i in range(length)])).tolist() # not used
    for index in range(len(current)-1):
        j = index + length
        input_data = []
        for i in range(j-length, j+length+1):
            value = integrate.quad(lambda x: heat_equ_analytical_solu(x, t[t_index]), padding_x[i], padding_x[i+1])
            value = value[0] * (1/delta_x)
            input_data.append(value)
        target_data = integrate.quad(lambda x: heat_equ_analytical_solu(x, t[t_index+1]), padding_x[j], padding_x[j+1])
        target_data = target_data[0] * (1/delta_x)
        # print(input_data, target_data)
        tensor_input_data = torch.FloatTensor(input_data)
        out = model(tensor_input_data)
        list_of_predict.append(out.item())
        list_of_error.append(np.abs(out.item()-target_data))
    return list_of_predict, list_of_error

In [90]:
def testing_the_model(delta_x=1/20, delta_t=1/20, xmin=0, ymin=0, xmax=2 * np.pi, ymax=2 * np.pi,
                      padding_num=3, hidden=6, twolayers=False, lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False,
                      model_name="model_xx delta x=1 20 delta t=1 20", error_file='list_of_error_11 xx.txt', prediction_file='list_of_prediction_11 xx.txt'):
    x = arange(xmin, xmax, delta_x)
    t = arange(ymin, ymax, delta_t)
    print("Generating meshes...")
    X,T = meshgrid(x, t) # grid of point
    print("Generating analytical solution...")
    Z = heat_equ_analytical_solu(X, T) # evaluation of the function on the grid
    print("Preparing the model and the optimizer...")
    model = ResNet(7, hidden, hidden, 1, twolayers=twolayers)
    optimizer = optim.Adam(model.parameters(), lr=lr, betas=betas, eps=eps, weight_decay=weight_decay, amsgrad=amsgrad)
    criterion = nn.MSELoss()
    model.train()
    print("Updating data...")
    model.load_model(model_name)
    prediction = []
    error = []
    prediction.append(torch.FloatTensor(Z[0]).tolist())
    for i in range(len(Z)-1):
        list_of_predict, list_of_error = calc_next_time(x, t, i, delta_x, delta_t, Z[i], model)
        prediction.append(list_of_predict)
        error.append(list_of_error)
        print("time", i, "is done", '", the average error is:"', np.average(list_of_error))
    print("Saving the prediction result...")
    with open(prediction_file, "w") as f:
        writer = csv.writer(f)
        writer.writerows(prediction)
    print("Saving the training error...")
    with open(error_file, "w") as f:
        writer = csv.writer(f)
        writer.writerows(error)

In [93]:
testing_the_model(delta_x=1/20, delta_t=1/20, xmin=0, ymin=0, xmax=2 * np.pi, ymax=2 * np.pi,
                      model_name="./model/model pairs_20_20 iteration=1 layers=1 xx", error_file='./prediction/error pairs_20_20 iteration=1 layers=1 xx', prediction_file='./prediction/prediction pairs_20_20 iteration=1 layers=1 xx')
testing_the_model(delta_x=1/20, delta_t=1/40, xmin=0, ymin=0, xmax=2 * np.pi, ymax=2 * np.pi,
                      model_name="./model/model pairs_20_20 iteration=1 layers=1 xx", error_file='./prediction/error pairs_20_20 iteration=1 layers=1 xx', prediction_file='./prediction/prediction pairs_20_20 iteration=1 layers=1 xx')
testing_the_model(delta_x=1/20, delta_t=1/80, xmin=0, ymin=0, xmax=2 * np.pi, ymax=2 * np.pi,
                      model_name="./model/model pairs_20_20 iteration=1 layers=1 xx", error_file='./prediction/error pairs_20_20 iteration=1 layers=1 xx', prediction_file='./prediction/prediction pairs_20_20 iteration=1 layers=1 xx')
testing_the_model(delta_x=1/20, delta_t=1/160, xmin=0, ymin=0, xmax=2 * np.pi, ymax=2 * np.pi,
                      model_name="./model/model pairs_20_20 iteration=1 layers=1 xx", error_file='./prediction/error pairs_20_20 iteration=1 layers=1 xx', prediction_file='./prediction/prediction pairs_20_20 iteration=1 layers=1 xx')

Generating meshes...
Generating analytical solution...
Preparing the model and the optimizer...
Updating data...
time 0 is done ", the average error is:" 0.21817045333769033
time 1 is done ", the average error is:" 0.13319262599937787
time 2 is done ", the average error is:" 0.08131383028285435
time 3 is done ", the average error is:" 0.04964193492277228
time 4 is done ", the average error is:" 0.03030630403318991
time 5 is done ", the average error is:" 0.0185019382441407
time 6 is done ", the average error is:" 0.01129539641203453
time 7 is done ", the average error is:" 0.006895817382401729
time 8 is done ", the average error is:" 0.004209882948294138
time 9 is done ", the average error is:" 0.002570125272639946
time 10 is done ", the average error is:" 0.0015690564059091688
time 11 is done ", the average error is:" 0.0009579058386086275
time 12 is done ", the average error is:" 0.0005847996118801145
time 13 is done ", the average error is:" 0.00035701900102048614
time 14 is done ",