# Newtonian Physics Informed Neural Network (`NwPiNN`) for Spatio-Temporal Forecast of Visual Data

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Libraries

In [2]:
pip install hampel



In [3]:
from keras.models import Sequential
from keras.layers import Dropout
from keras.layers import Dense
from keras.layers import LSTM
import keras
import tensorflow as tf
from hampel import hampel
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

Importing Pixel Coordinates and Gray Values of 200 lag frames for a single pixel point

In [4]:
data = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/data_1.csv")
# pixel_coor = data.iloc[:, 0] # Pixel Coordinates
training_set = data.iloc[:, 1] # Gray Values

In [5]:
data = np.genfromtxt('/content/drive/MyDrive/Colab Notebooks/newtonian_intermittency.dat')
training_set = pd.DataFrame(data).reset_index(drop=True)
training_set = training_set.iloc[:,1]

$\delta t = 1$

In [6]:
t_diff = 1
# print(training_set.max())
gradient_t = (training_set.diff()/t_diff).iloc[1:]
# print(gradient_t)
gradient_tt = (gradient_t.diff()/t_diff).iloc[1:]
# print(gradient_tt)

In [7]:
training_set = training_set.reset_index(drop=True)
gradient_t = gradient_t.reset_index(drop=True)
gradient_tt = gradient_tt.reset_index(drop=True)
# print(gradient_t)
# print(gradient_tt)

In [8]:
# print(gradient_t.shape)
# print(training_set.shape[:-1])
df = pd.concat((training_set[:-1],gradient_t), axis = 1)
gradient_tt.columns= ["grad_tt"]
df = pd.concat((df[:-1],gradient_tt),axis = 1)
df.columns = ['y_t','grad_t','grad_tt'] # Zeroth, First, and Second Derivatives

Data Preprocessing (Sequence to Supervised)



In [9]:
def Supervised(data, n_in=1):
    n_vars = 1 if type(data) is list else data.shape[1]
    df = pd.DataFrame(data)
    cols, names = list(), list()
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
    cols.append(df)
    names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
    sup = pd.concat(cols, axis=1)
    sup.columns = names
    sup.dropna(inplace=True)
    return sup

data = Supervised(df.values, n_in = 10)
data.drop(['var2(t-10)', 'var3(t-10)', 'var2(t-9)', 'var3(t-9)', 'var2(t-8)',
       'var3(t-8)', 'var2(t-7)', 'var3(t-7)', 'var2(t-6)', 'var3(t-6)',
       'var2(t-5)', 'var3(t-5)', 'var2(t-4)', 'var3(t-4)', 'var2(t-2)',
       'var3(t-2)', 'var2(t-1)', 'var3(t-1)','var2(t-3)', 'var3(t-3)'], axis = 1, inplace = True)
# print(data.head())
# print(data.columns)
scaler = MinMaxScaler(feature_range=(0, 1))
data = scaler.fit_transform(data)
# print(data.shape)
y = data[:,-3:]
x = data[:,:-3]
# print(x.shape)


80-20 Split

In [10]:
train_size = int(len(y) * 0.8)
test_size = len(y) - train_size
dataX = np.array(x)
dataY = np.array(y)
trainX = np.array(x[0:train_size])
trainY = np.array(y[0:train_size])
testX = np.array(x[train_size:len(x)])
testY = np.array(y[train_size:len(y)])
trainX = trainX.reshape((trainX.shape[0], 1, trainX.shape[1]))
testX = testX.reshape((testX.shape[0], 1, testX.shape[1]))
# print(trainX.shape, trainY.shape, testX.shape, testY.shape)

In [11]:
def phys(y_pred, y_true):
    return mean_absolute_error((y_true[:,2]*2*y_true[:,0] - y_true[:,1]*y_true[:,1]), (y_pred[:,2]*2*y_pred[:,0] - y_pred[:,1]*y_pred[:,1]))

---
---

$\mathfrak{L}_{\text{phy}}=\text{RMSE}\left(\mathfrak{Y}_{\text{pred}},\ \mathfrak{Y}_{\text{real}}\right)$

where,

$\mathfrak{Y}_{\text{pred}} = 2 \cdot y_{\text{pred}} \cdot \frac{\text{d}^2y_{\text{pred}}}{\text{d}t^2} - \left(\frac{\text{d}y_{\text{pred}}}{\text{d}t}\right)^2$

and

$\mathfrak{Y}_{\text{real}} = 2 \cdot y_{\text{real}} \cdot \frac{\text{d}^2y_{\text{real}}}{\text{d}t^2} - \left(\frac{\text{d}y_{\text{real}}}{\text{d}t}\right)^2$

---

$\mathfrak{L}_{\text{data}}=\text{RMSE}\left(\mathfrak{X}_{\text{pred}},\ \mathfrak{X}_{\text{real}}\right)$

where,

$\mathfrak{X}_{\text{pred}}=\frac{\text{d}^0\left(x_{\text{pred}}(t+1)\right)}{\text{d}t^0}, \frac{\text{d}^1\left(x_{\text{pred}}(t+1)\right)}{\text{d}t^1}, \frac{\text{d}^2\left(x_{\text{pred}}(t+1)\right)}{\text{d}t^2}$

and

$\mathfrak{X}_{\text{real}}=\frac{\text{d}^0\left(x_{\text{real}}(t+1)\right)}{\text{d}t^0}, \frac{\text{d}^1\left(x_{\text{real}}(t+1)\right)}{\text{d}t^1}, \frac{\text{d}^2\left(x_{\text{real}}(t+1)\right)}{\text{d}t^2}$

---
---

For both the Pre-Training as well as Training Phase, the Loss Functions were combined with Data Losses as $\mathfrak{L}=\mathfrak{L}_{\text{data}}+\mathcal{C}_1\times\mathfrak{L}_{\text{phy}}$ and $\mathfrak{L}=\mathfrak{L}_{\text{data}}+\mathcal{C}_2\times\mathfrak{L}_{\text{phy}}$, respectively, where $\mathcal{C}_1$ and $\mathcal{C}_2$ are the hyperparameters. For the Newtonian Physics Informed Neural Network (`NwPiNN`), we considered both these hyperparameters to be 0.2. The value is chosen by means of Trial and Error.

In [12]:
def loss_fn(y_true, y_pred):
    squared_difference = tf.square(y_true[:,0] - y_pred[:,0])
    squared_difference2 = tf.square(y_true[:,2]-y_pred[:,2])
    squared_difference1 = tf.square(y_true[:,1]-y_pred[:,1])
    squared_difference3 = tf.square(y_pred[:,2]*2*y_pred[:,0] - y_pred[:,1]*y_pred[:,1])
    return tf.reduce_mean(squared_difference, axis=-1) + 0.2*tf.reduce_mean(squared_difference3, axis=-1)
model = Sequential()
model.add(LSTM(50, input_shape=(trainX.shape[1], trainX.shape[2])))
model.add(Dense(3))
model.compile(loss=loss_fn, optimizer='adam')
history = model.fit(trainX, trainY, epochs=500, batch_size=64, validation_data=(testX, testY), verbose=2, shuffle=False)

Epoch 1/500
125/125 - 6s - loss: 8.1665e-05 - val_loss: 4.9553e-04 - 6s/epoch - 47ms/step
Epoch 2/500
125/125 - 1s - loss: 6.4723e-04 - val_loss: 1.7023e-04 - 752ms/epoch - 6ms/step
Epoch 3/500
125/125 - 1s - loss: 6.1927e-05 - val_loss: 9.0390e-05 - 789ms/epoch - 6ms/step
Epoch 4/500
125/125 - 1s - loss: 4.4033e-06 - val_loss: 8.2976e-05 - 960ms/epoch - 8ms/step
Epoch 5/500
125/125 - 1s - loss: 1.2511e-06 - val_loss: 7.7000e-05 - 997ms/epoch - 8ms/step
Epoch 6/500
125/125 - 1s - loss: 9.2698e-07 - val_loss: 7.5503e-05 - 948ms/epoch - 8ms/step
Epoch 7/500
125/125 - 1s - loss: 8.3643e-07 - val_loss: 7.7030e-05 - 854ms/epoch - 7ms/step
Epoch 8/500
125/125 - 1s - loss: 8.2343e-07 - val_loss: 1.3987e-04 - 948ms/epoch - 8ms/step
Epoch 9/500
125/125 - 1s - loss: 1.0270e-06 - val_loss: 9.9334e-05 - 1s/epoch - 10ms/step
Epoch 10/500
125/125 - 1s - loss: 1.4019e-06 - val_loss: 1.6713e-04 - 1s/epoch - 9ms/step
Epoch 11/500
125/125 - 1s - loss: 8.9845e-07 - val_loss: 1.2366e-04 - 1s/epoch - 10ms/

Metrics

In [13]:
from sklearn.metrics import mean_squared_error, mean_absolute_error
yhat = model.predict(testX)
# print(yhat.shape)
testX = testX.reshape((testX.shape[0], testX.shape[2]))
# print(testX.shape)

inv_yhat = np.concatenate((testX,yhat), axis=1)
inv_yhat = scaler.inverse_transform(inv_yhat)
inv_yhat1 = inv_yhat[:,-3:]
inv_yhat = inv_yhat[:,-3]

inv_y = np.concatenate((testX,testY), axis=1)
inv_y = scaler.inverse_transform(inv_y)
inv_y1 = inv_y[:,-3:]
inv_y = inv_y[:,-3]

rmse = np.sqrt(mean_squared_error(inv_y, inv_yhat))
mae = mean_absolute_error(inv_y, inv_yhat)

print('Test RMSE: %.3f' % rmse)
print('Test MAE: %.3f' % mae)

Test RMSE: 4.711
Test MAE: 4.021
