In [None]:
# imports

import itertools
import os
if os.getcwd().split("/")[-1] == "notebooks":
    os.chdir(os.pardir)
import time

import jax
import jax.numpy as jnp
import jax.nn as nn
from jax import vmap, jit, grad
from jax.experimental import optimizers
import numpy as np
import numpy.random as npr
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from scipy.interpolate import interp1d
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tqdm.notebook import trange

from src.constants import eps_0, mu_0, c, pi
from src.utils.viz import fig_config

In [None]:
print(f'platform: {jax.lib.xla_bridge.get_backend().platform}')

In [None]:
# jax.config.update('jax_enable_x64', True)

In [None]:
sns.set(style='ticks', palette='colorblind')

In [None]:
%config InlineBackend.figure_format = 'retina'

In [None]:
# constants

Z_0 = np.sqrt(mu_0 / eps_0)  # free space impedance
a = 2e-3  # radius of the wire
N_elem = 61  # number of elements of the antenna
L = 1  # antenna length
dx = L / N_elem  # distance between the two elements of the antenna
x = np.arange(0, L+dx/2, dx)  # spatial coordinates

`sct_gnd_025_current.str` - holds the current distribution along the straight thin wire scatterer of a finite length of 1 m and the radius of 2 mm located at the height of 0.25 m above a lossy half-space (σ = 10 mS/m, ε = 10) that is illuminated by a normally incident electromagnetic field operating on 150 and 300 MHz, respectively.

The first column represents spatial points on the antenna. The second column holds operating frequencies, while the third and forth columns are real and imaginary current parts, respectively. Finally, the fifth column is the absolute current value. The data were obtained by solving the Pocklington equation numerically by means of indirect boundary element method with the in-house software - `SuzANA` .

In [None]:
I = np.loadtxt(os.path.join('data', 'source-model', 'dipole', 'sct_gnd_025_current.str'))

In [None]:
# load current over a half-wave dipole antenna for 0.3 GHz frequency

f = I[N_elem+1:, 1][0]
omega = 2 * pi * f
k = omega * np.sqrt(mu_0 * eps_0)
I_an = I[N_elem+1:, 2] + 1j * I[N_elem+1:, 3]
y = I_an * 1000  # in mA

In [None]:
# visualize spatial current distribution

fig_config(latex=True, scaler=1.5)
fig = plt.figure()
ax = fig.add_subplot()
ax.plot(x, np.abs(y), 'b-', lw=3, label='$|I(x)|$')
ax.plot(x, np.real(y), 'r-', lw=3, label='$\Re{[I(x)]}$')
ax.plot(x, np.imag(y), 'g-', lw=3, label='$\Im{[I(x)]}$')
ax.set(xlabel='$x$ [m]', ylabel='$I$ [mA]')
ax.legend();

# Finite-step derivative approximation on interpolated function

In [None]:
def forward_diff(fn, h=1e-6):
    """Forward difference y = np.abs(I_an) * 1000  # in mAapproximation."""
    def dfn_dx(x):
        return (fn(x + h) - fn(x)) / h
    return dfn_dx
    
    
def backward_diff(fn, h=1e-6):
    """Backward difference approximation."""
    def dfn_dx(x):
        return (fn(x) - fn(x - h)) / h
    return dfn_dx

    
def central_diff(fn, h=1e-6):
    """Central difference approximation."""
    def dfn_dx(x):
        return (fn(x + h) - fn(x - h)) / (2 * h)
    return dfn_dx


def complex_step_diff(fn, h=1e-6):
    """Complex-step difference approximation.
    
    Note: Incompatible with SciPy interpolation module.
    """
    def dfn_dx(x):
        return np.imag(fn(x + 1j * h)) / h
    return dfn_dx

In [None]:
# quadratic interpolation of current distribution function

f = interp1d(x, np.abs(y), kind='quadratic')

x_new = np.linspace(x.min(), x.max(), 1001)
y_new = f(x_new)

In [None]:
# mean relative percentage error

error = np.mean(np.abs(np.abs(y)[1:-1] - f(x)[1:-1]) / np.abs(y)[1:-1]) * 100
print(f'relative error = {error:.2e} %')

In [None]:
# visualize spatial current distribution as calculated and interpolated

fig_config(latex=True, scaler=1.5)
fig = plt.figure()
ax = fig.add_subplot()
ax.plot(x, np.abs(y), 'b-', lw=4, label='computed')
ax.plot(x_new, y_new, 'r--', lw=4, label='interpolated')
ax.set(xlabel='$x$ [m]', ylabel='$I$ [mA]')
ax.legend();

In [None]:
# finite difference on computed and interpolated data

dfdx = np.r_[(forward_diff(f)(x[0]), central_diff(f)(x[1:-1]), backward_diff(f)(x[-1]))]
dfdx_new = np.r_[(forward_diff(f)(x_new[0]), central_diff(f)(x_new[1:-1]), backward_diff(f)(x_new[-1]))]

In [None]:
# visualize current gradient distribution as calculated and interpolated

fig_config(latex=True, scaler=1.5)
fig = plt.figure()
ax = fig.add_subplot()
ax.plot(x, dfdx, 'b-', lw=4, label='computed')
ax.plot(x_new, dfdx_new, 'r--', lw=4, label='interpolated')
ax.plot([x[2], x[-2]], [dfdx[2], dfdx[-2]], 'ko', fillstyle='none', markersize=40, markeredgewidth=2.5)
ax.annotate('', xy=(x[2], dfdx[2] - 0.5), xytext=(x[11], -dfdx[2] - 2), 
            arrowprops={'facecolor': 'black'})
ax.annotate('', xy=(x[-5], dfdx[-2]), xytext=(x[11], -dfdx[2] - 2), 
            arrowprops={'facecolor': 'black'})
ax.text(x[0], -dfdx[0] - 2, 'numeric artifacts',
        bbox={'facecolor': 'wheat',
              'edgecolor': 'black',
              'alpha': 0.5,
              'pad': 5})
ax.set(xlabel='$x$ [m]', ylabel=r'$\mathrm{d}I$/$\mathrm{d}x$ [mA/m]')
fig.legend();

# Automatic differentiation on neural network-based interpolation

In [None]:
x_train = jnp.asarray(x_new).reshape(-1, 1)
y_train = jnp.asarray(y_new).reshape(-1, 1)

In [None]:
rng = jax.random.PRNGKey(0)

def init_network_params(sizes, key):
    """Initialize network parameters."""
    keys = jax.random.split(key, len(sizes))
    def random_layer_params(m, n, key, scale=1e-2):
        w_key, b_key = jax.random.split(key)
        return (scale * jax.random.normal(w_key, (n, m)),
                scale * jax.random.normal(b_key, (n, )))
    return [random_layer_params(m, n, key)
            for m, n, key in zip(sizes[:-1], sizes[1:], keys)]


def forward(params, X):
    """Forward pass."""
    output = X
    for w, b in params[:-1]:
        output = nn.tanh(w @ output + b)
    w, b = params[-1]
    output = w @ output + b
    return output


# vectorized mapping of network input, `X`, on `forward` function
batch_forward = vmap(forward, in_axes=(None, 0))


@jit
def loss_fn(params, batch):
    """Summed square error loss function."""
    X, y = batch
    y_pred = batch_forward(params, X)
    return jnp.sum(jnp.square(y_pred - y))


# derivative of the loss function
grad_fn = jit(grad(loss_fn))


@jit
def update(step, optim_state, batch):
    """Return current optimal state of the network."""
    params = optim_params(optim_state)
    grads = grad_fn(params, batch)
    optim_state = optim_update(step, grads, optim_state)
    return optim_state


def data_stream(num_train, num_batches):
    """Training data random generator."""
    rng = npr.RandomState(0)
    while True:
        perm = rng.permutation(num_train)
        for i in range(num_batches):
            batch_idx = perm[i * batch_size:(i + 1) * batch_size]
            yield x_train[batch_idx], y_train[batch_idx]

In [None]:
# set network hyperparameter and train

step_size = 1e-3
n_epochs = 25_000
printout = int(n_epochs / 100.)
epochs = np.arange(0, n_epochs+1, step=printout)
batch_size = 64
momentum_mass = 0.9  # for momentum and adagrad
sizes = [1, 128, 256, 128, 1]

num_train = x_train.shape[0]
num_complete_batches, leftover = divmod(num_train, batch_size)
num_batches = num_complete_batches + bool(leftover)

batches = data_stream(num_train, num_batches)

optim_init, optim_update, optim_params = optimizers.adam(step_size)
init_params = init_network_params(sizes, rng)
optim_state = optim_init(init_params)
itercount = itertools.count()

loss_train = []
start_time = time.time()
for epoch in trange(n_epochs):
    start_epoch_time = time.time()
    for _ in range(num_batches):
        optim_state = update(next(itercount), optim_state, next(batches))
    epoch_duration = time.time() - start_epoch_time
    
    params = optim_params(optim_state)
    if (epoch == 0) or (epoch % printout == (printout - 1)):
        loss_train.append(loss_fn(params, (x_train, y_train)))
training_duration = time.time() - start_time
print(f'Training time: {training_duration:.2f} s')

In [None]:
# visualize training loss dynamics

fig_config(latex=True, scaler=1.5)
fig = plt.figure()
ax = fig.add_subplot()
ax.plot(epochs, loss_train, 'b-', lw=4)
ax.set(xlabel='epoch', ylabel='loss', yscale='log');

In [None]:
# visualize spatial current distribution as calculated and fitted

fig_config(latex=True, scaler=1.5)
fig = plt.figure()
ax = fig.add_subplot()
ax.plot(x_train.flatten(), y_train.flatten(), 'b-', lw=4, label='computed')
ax.plot(x_train.flatten(), batch_forward(params, x_train).flatten(), 'r--', lw=4, label='fitted')
ax.set(xlabel='$x$ [m]', ylabel='$I$ [mA]')
fig.legend();

In [None]:
def I_an_nn(x):
    """Current value at specific location, `x`.
    
    Note: This is single-value wrapper for the forward pass function.
    """
    return forward(params, x)[0]


# derivative of the current approximation function
grad_I_an_nn = jit(vmap(grad(I_an_nn)))

In [None]:
# visualize current gradient distribution as calculated and fitted

fig_config(latex=True, scaler=1.5)
fig = plt.figure()
ax = fig.add_subplot()
ax.plot(x_new, dfdx_new, 'b-', lw=4, label='computed')
ax.plot(x_train, grad_I_an_nn(x_train), 'r--', lw=4, label='fitted')
ax.plot([x[2], x[-2]], [dfdx[2], dfdx[-2]], 'ko', fillstyle='none', markersize=40, markeredgewidth=2.5)
ax.annotate('', xy=(x[2], dfdx[2] - 0.5), xytext=(x[11], -dfdx[2] - 1), 
            arrowprops={'facecolor': 'black'})
ax.annotate('', xy=(x[-5], dfdx[-2]), xytext=(x[11], -dfdx[2] - 1), 
            arrowprops={'facecolor': 'black'})
ax.text(x[0], -dfdx[0] - 2, fontsize=15,
        s='smooth gradients on\nedge elements with\nthe network interpolant',        
        bbox={'facecolor': 'wheat',
              'edgecolor': 'black',
              'alpha': 0.5,
              'pad': 5})
ax.set(xlabel='$x$ [m]', ylabel=r'$\mathrm{d}I$ / $\mathrm{d}x$ [mA/m]')
fig.legend();