<a href="https://colab.research.google.com/github/ArtemJDS/NN-for-population-approximation/blob/main/PINN_for_population.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import h5py
import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model
from google.colab import files
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import normalize
tf.keras.backend.set_floatx('float32')
from google.colab import drive
drive.mount('/content/drive')
!unzip /content/drive/MyDrive/voltage_and_input.zip

In [None]:
N_BINS = 249
INPUT_EPOCH = 2500

In [None]:
dataset = h5py.File('/content/content/voltage_and_input.h5', 'r')
voltage = np.array(dataset['voltage'], dtype = np.float32).T    # population's voltage dynamics               
input = np.array(dataset['input'], dtype = np.float32)     # population's input   

min = np.min(voltage)
max = np.max(voltage)
bins = np.linspace(min, max, N_BINS)     
x = np.apply_along_axis(np.histogram, 1, voltage, bins = bins, density = True).T[0]
x = list(x)
x = np.stack(x)
densities = normalize(x, norm = 'l1') # shape = n,m, where n = # of timesteps, m = N_BINS

In [None]:
# take one epoch for input to be constant over all timesteps

densities_test = densities[INPUT_EPOCH*21:INPUT_EPOCH*22]
derivatives_t = (densities_test[1:] - densities_test[:-1])   # time derivative of density for each datapoint but the last timestep

xs = np.arange(0,N_BINS)    # quasi-spatial coordinate
ts = np.arange(0,1999)    # time coordinate
ps = densities_test[:-1]    # density 

xs = np.tile(xs, (1999,1))
ts = np.tile(ts, (N_BINS, 1)).T

ts = normalize(ts)
xs = normalize(xs)
derivatives_t = normalize(derivatives_t)

ts = ts.reshape(-1)
xs = xs.reshape(-1)
ps = ps.reshape(-1)    
derivatives_t = derivatives_t.reshape(-1)    # reshape all to one tall vector 

input = np.vstack((ts, xs)).T#, ps)).T    # input to the network. time coordinate, spatial coordinate, density for each time-place pair 

DT = ts[1] - ts[0]
DX = xs[1] - xs[0]

In [None]:
batch_size = N_BINS    # batch_size MUST be divisible by N_BINS - 1 because of network loss function design
input = tf.convert_to_tensor(input)
derivatives_t = tf.convert_to_tensor(derivatives_t)
train_dataset = tf.data.Dataset.from_tensor_slices((input, derivatives_t)).shuffle(495752).batch(batch_size) 

This network approximates temporal derivative of population's voltage distribution. 
\begin{align}
\ T = \frac{\partial P} {\partial t} 
\end{align}
Spatial and temporal coordinates are independent, hence

\begin{align}
\ T = \frac{dP} {dt} 
\end{align}

Approximating loss is defined as follows:

\begin{align}
\ \mathcal{L}_{AP} = (T - \hat{T})^2
\end{align}
where $\hat{T}$ is the approximation

Network is presupposed to use some mathematical properties of distributions 
\begin{align}
\ \int_{0}^{X} T \,dx = 1 \text{ for each t }
\end{align}
because total density must be conserved.

So total density loss is defined as
\begin{align}
\ \mathcal{L}_{DC} = ((\int_{0}^{X} \hat{T} \,dx ) - 1 )^2
\end{align}

As $T$ is the temporal derivative of some function defined over 2D  it is a conservative field and its derivative must match $\frac{dT} {dt} = \frac{dT} {dx}$ (written as full derivatives).

Derivative loss is defined as
\begin{align}
\ \mathcal{L}_{DR} = (\frac{d\hat{T}} {dt} - \frac{d\hat{T}} {dx} )^2
\end{align}

The previous loss paves the way for derivative zeroing (both $\frac{dT} {dt}$ and  $\frac{dP} {dx}$ becoming zero). Therefore we add loss that prevents it.
\begin{align}
\ \mathcal{L}_{AZ} = \log \frac {1} {\|\frac{d\hat{T}} {dt}\| + \|\frac{d\hat{T}} {dx}\|}
\end{align}

Total loss is a mere sum of previously defined losses with coefficients

In [None]:
@tf.function
def training_step(inp):
    with tf.GradientTape() as tape:
        loss, total_density_loss, derivative_loss, anti_zeroing_loss = model(inp)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss, total_density_loss, derivative_loss, anti_zeroing_loss


class ProbabilityTimeDerivativeApproximator(Model):
    def __init__(self, ts, xs):
       super(ProbabilityTimeDerivativeApproximator, self).__init__()
       self.layer_list = []
       self.layer_list.append(tf.keras.Sequential([layers.Flatten(),layers.Dense(128, activation='sigmoid')]))
       self.layer_list.append(tf.keras.Sequential([layers.Dense(128, activation='sigmoid')]))
       self.layer_list.append(tf.keras.Sequential([layers.Dense(128, activation='sigmoid')]))
       self.layer_list.append(tf.keras.Sequential([layers.Dense(128, activation='sigmoid'),
                                               layers.Dense(1, activation='sigmoid'),]))
       self.ts = ts
       self.xs = xs

    def call(self, input):
        
        for_network, real_derivatives  =  input

        for n, layer in enumerate(self.layer_list[:-1]):
            if n!= 0: x = layer(x)
            if n == 0: x = layer(for_network)
        approximated_derivatives = self.layer_list[-1](x)
        approximation_loss = tf.math.reduce_sum(tf.math.square(real_derivatives - approximated_derivatives))

        dIdt_dIdx = tf.transpose(tf.gradients(approximated_derivatives, for_network)[0])[:2]
        total_density_loss = tf.math.square(1. - tf.math.reduce_sum(approximated_derivatives))
        int_loss_scale = 1e-0
        derivative_loss = tf.math.reduce_sum(tf.math.square(dIdt_dIdx[0] * self.ts - dIdt_dIdx[1] * self.xs)) * int_loss_scale
        anti_zeroing_loss = tf.math.reduce_sum(tf.math.square(dIdt_dIdx[0])) + tf.math.reduce_sum(tf.math.square(dIdt_dIdx[1]))
        loss = approximation_loss + total_density_loss + tf.math.log(1/anti_zeroing_loss) #+ derivative_loss #+ tf.math.log(1/anti_zeroing_loss)

        return loss, total_density_loss, derivative_loss, anti_zeroing_loss
model = ProbabilityTimeDerivativeApproximator(DT, DX)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [None]:
EPOCHES = 10
for k in range(EPOCHES):
    l = 0.
    i = 0.
    d = 0.
    az = 0.
    for inp in train_dataset: 
        loss, total_density_loss, derivative_loss, anti_zeroing_loss = training_step(inp)
        l += loss
        i += total_density_loss
        d += derivative_loss
        az += anti_zeroing_loss


    print(f'EPOCH {k}', float(l), float(i), float(d), float(az))