In [1]:
"""
Noisy data set generation.

Author: Natalie Klein

TODO: Add other noise generation models.
"""

import numpy as np

from splitml.signal import VoigtSignal

def sig_gen(t, df):
    """
    Generate collection of Voigt time series signals based on Pandas data frame of parameter values.
    Args:
        t: vector of time points
        df: Pandas data frame containing signal parameter columns A, w, T2, phi, sigma, C, with N rows
    Returns:
        np.ndarray of signals, shape (N, len(t))
    """
    sigs = np.zeros((df.shape[0], len(t)), dtype=complex)
    for i in range(df.shape[0]):
        sigs[i, :] = VoigtSignal(df.w.iloc[i], df.T2.iloc[i], df.A.iloc[i], df.phi.iloc[i], df.sigma.iloc[i], df.C.iloc[i]).time_signal(t)
    return sigs
    
def wn_gen(t, N, sigma=1.0):
    """
    Generate complex Gaussian white noise time series of shape (N, t) with total variance sigma.
    """
    return np.random.normal(0, scale=sigma/np.sqrt(2), size=(N, len(t))) + 1j* np.random.normal(0, scale=sigma/np.sqrt(2), size=(N, len(t)))

def split_noise(noise, n_timesteps):
    """
    Split noise data array along time axis to increase number of examples (with shorter duration).
    """
    nt = noise.shape[1]
    nseg = nt // n_timesteps
    noise = noise[:, :(nseg*n_timesteps)]
    snoise = np.stack(np.hsplit(noise, nseg), axis=1)
    rsnoise = np.reshape(snoise, (noise.shape[0]*nseg, -1))
    return rsnoise

In [2]:
"""
Custom Pytorch layers for complex-valued tensors.


ComplexiSTFT based on code from torchaudio.

BSD 2-Clause License

Copyright (c) 2017 Facebook Inc. (Soumith Chintala), 
Copyright (c) 2022, Triad National Security, LLC
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""

import torch
from torch.nn import Module, Conv1d, ConvTranspose1d

class ComplexConv1d(Module):
    """ Pytorch Conv1d adapted to complex-valued
    """

    def __init__(self,in_channels, out_channels, kernel_size=3, stride=1, padding = 0,
                 dilation=1, groups=1, bias=True):
        super(ComplexConv1d, self).__init__()
        self.conv_r = Conv1d(in_channels, out_channels, kernel_size, stride, padding, dilation, groups, bias)
        self.conv_i = Conv1d(in_channels, out_channels, kernel_size, stride, padding, dilation, groups, bias)
        
    def forward(self, input):    
        fwd_rr = self.conv_r(input.real)
        fwd_ri = self.conv_r(input.imag)
        fwd_ir = self.conv_i(input.real)
        fwd_ii = self.conv_i(input.imag)
        output = torch.zeros_like(fwd_rr.type(input.dtype))
        output.real = fwd_rr - fwd_ii
        output.imag = fwd_ri + fwd_ir
        return output

class ComplexConvTranspose1d(Module):
    """ Pytorch ConvTranspose1d adapted to complex-valued
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0,
                 output_padding=0, groups=1, bias=True, dilation=1, padding_mode='zeros'):

        super(ComplexConvTranspose1d, self).__init__()

        self.conv_tran_r = ConvTranspose1d(in_channels, out_channels, kernel_size, stride, padding,
                                       output_padding, groups, bias, dilation, padding_mode)
        self.conv_tran_i = ConvTranspose1d(in_channels, out_channels, kernel_size, stride, padding,
                                       output_padding, groups, bias, dilation, padding_mode)

    def forward(self,input):
        fwd_rr = self.conv_tran_r(input.real)
        fwd_ri = self.conv_tran_r(input.imag)
        fwd_ir = self.conv_tran_i(input.real)
        fwd_ii = self.conv_tran_i(input.imag)
        output = torch.zeros_like(fwd_rr.type(input.dtype))
        output.real = fwd_rr - fwd_ii
        output.imag = fwd_ri + fwd_ir
        return output

class ComplexiSTFT(Module):
    """ torchaudio.functional.inverse_spectrogram, adapted to give complex-valued output
    """

    def __init__(self, n_fft, hop_length, win_length, pad=0):
        super(ComplexiSTFT, self).__init__()
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.win_length = win_length
        self.pad = pad
        self.window = torch.hann_window(self.win_length)
        self.center = True
        self.length = None

    def forward(self, input):
        shape = input.size()
        spectrogram = input.reshape(-1, shape[-2], shape[-1])
        waveform = torch.istft(
            input=spectrogram,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            win_length=self.win_length,
            window=self.window.to(input.get_device()),
            center=self.center,
            normalized=False,
            onesided=False,
            length=self.length + 2 * self.pad if self.length is not None else None,
            return_complex=True
        )
        if self.length is not None and self.pad > 0:
            # remove padding from front and back
            waveform = waveform[:, self.pad:-self.pad]

        # unpack batch
        waveform = waveform.reshape(shape[:-2] + waveform.shape[-1:])
        return waveform

__Data Generation__

Generated and saved clean and noisy data separately for both training and validation sets.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
import random
from sklearn.decomposition import PCA

#### Initialize Parameters ###
t = np.arange(0, 20, 0.1) # equally spaced time points
nrow = 100 # observations of sine waves
ncol = len(t) # time points
per_Lbound = 2 # lower bound for period
per_Ubound = 5 # upper bound for period
mu = 0 # noise parameter
sig_sq = 0.5 # noise parameter
A = 1 # initial amplitude 
d = 0.2 # amplitude decay constant
amp = np.exp(-d*t)*A # amplitude

### Generate Training Data ###
w = np.random.randint(per_Lbound,per_Ubound, nrow) # random period of the sine wave
p = np.random.uniform(0,2*math.pi, nrow) # random starting point in the cycle
noise = np.random.normal(mu,sig_sq,(nrow,ncol)) # epsilon follows N(mu, sig_sq)

y_clean_training = np.zeros((nrow,ncol)) 
y_noisy_training = np.zeros((nrow,ncol))
for i in range(nrow): 
    y_clean_training[i,:] = (np.sin(t*w[i]+p[i]))*amp # y = np.sin(t*w+p) 
    y_noisy_training[i,:] = y_clean_training[i,:] + noise[i,:] # y = np.sin(t*w+p) + ϵ

### Generate Validation Data ###
w = np.random.randint(per_Lbound,per_Ubound, nrow)
p = np.random.uniform(0,2*math.pi, nrow) 
noise = np.random.normal(mu,sig_sq,(nrow,ncol))

y_clean_validation = np.zeros((nrow,ncol)) 
y_noisy_validation = np.zeros((nrow,ncol))
for i in range(nrow): 
    y_clean_validation[i,:] = (np.sin(t*w[i]+p[i]))*amp 
    y_noisy_validation[i,:] = y_clean_validation[i,:] + noise[i,:]


__Compare Noisy and Clean Data__

In [None]:
plt.plot(y_clean_training[1,:]) # clean data
plt.plot(y_noisy_training[1,:]) # noisy data
plt.show()

__Autoencoder__

Currently has two linear layers of input size $M$ and $H$ respectively with `hardtanh` activation function.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


### Neural Network ###
class simpleNet(nn.Module):

    def __init__(self, t_input = len(t), M=10, H=5): 
        super(simpleNet, self).__init__()
        self.transform_lin_layer = nn.Linear(t_input, M) 
        self.transform_hidden_layer = nn.Linear(M, H)
        self.inverse_hidden_layer = nn.Linear(H, M)
        self.inverse_lin_layer = nn.Linear(M, t_input)
        
        self.activation = F.hardtanh
        
    def forward(self, t): 
        m = self.activation(self.transform_lin_layer(t))
        h = self.activation(self.transform_hidden_layer(m))
        m = self.activation(self.inverse_hidden_layer(h))
        t = self.inverse_lin_layer(m)
        
        return t

    def embed(self, t): 
        m = self.activation(self.transform_lin_layer(t))
        h = self.activation(self.transform_hidden_layer(m))
        
        return h

net = simpleNet()
print(net)

__Training__

In [None]:
### Preparing for training ###
device = torch.device('cpu')
model = simpleNet().to(device) 

noisy_training_data = torch.tensor(y_noisy_training).float().to(device)
clean_training_data = torch.tensor(y_clean_training).float().to(device)

noisy_validation_data = torch.tensor(y_noisy_validation).float().to(device)
clean_validation_data = torch.tensor(y_clean_validation).float().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

model.train() 
criterion = nn.MSELoss()

use_clean_training_data = False
use_clean_validation_data = False
patience = 5
trigger_times = 0
loss_training = []
loss_validation = []


### Training ###
for epoch in range(10000):
    optimizer.zero_grad() 
    out = model(noisy_training_data)
    if use_clean_training_data is False:
        loss = criterion(out, noisy_training_data)
    else:
        loss = criterion(out, clean_training_data)
    loss.backward() 
    optimizer.step()
    loss_training.append(loss.item())
    if epoch % 100 == 0:
        print(loss.item())
    with torch.no_grad(): 
        out = model(noisy_validation_data)
        if use_clean_validation_data is False:
            loss = criterion(out, noisy_validation_data)
        else: 
            loss = criterion(out, clean_validation_data)
        loss_validation.append(loss.item())
    if epoch > 200:
        if loss_training[-1] > loss_training[-2]:
            trigger_times += 1
            

            if trigger_times >= patience:
                print('Early stopping!\nStopped at epoch')
                print(epoch)
                break
        else:
            trigger_times = 0

model.eval() 

__Plot of Loss__

In [None]:
fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1) 
plt.plot(loss_training)
plt.plot(loss_validation)
plt.title("Loss")

plt.subplot(1, 2, 2) 
begin = round(0.3*len(loss_training))
end = len(loss_training)
plt.plot(loss_training[begin:end])
plt.plot(loss_validation[begin:end])
plt.title("Loss Zoomed In")
plt.show()

__Plot of Original and Reproduction__

__Observation 1__

In [None]:
out = model(noisy_validation_data)
row = 1

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1) 
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_validation[row,:])
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation with Noise")

plt.show()

out = model(noisy_training_data)

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1)
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_training[row,:])
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training with Noise")

plt.show()

__Observation 2__

In [None]:
out = model(noisy_validation_data)
row = 2

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1)
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_validation[row,:])
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation with Noise")

plt.show()

out = model(noisy_training_data)

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1) 
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_training[row,:])
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training with Noise")

plt.show()

__Observation 3__

In [None]:
out = model(noisy_validation_data)
row = 3

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1)
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_validation[row,:])
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation with Noise")

plt.show()

out = model(noisy_training_data)

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1) 
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_training[row,:])
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training with Noise")

plt.show()

__Observation 4__

In [None]:
out = model(noisy_validation_data)
row = 4

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1)
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_validation[row,:])
plt.plot(t, y_clean_validation[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Validation with Noise")

plt.show()

out = model(noisy_training_data)

fig = plt.figure() 
fig.set_size_inches(14,6) 

plt.subplot(1, 2, 1) 
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training")

plt.subplot(1, 2, 2) 
plt.plot(t, y_noisy_training[row,:])
plt.plot(t, y_clean_training[row,:])
plt.plot(t, out.detach().numpy()[row,:])
plt.title("Training with Noise")

plt.show()