Train Hammerstein-, Wiener- and WienerHammerstein model on any type of data, with or without linear preprocessing.

In [None]:
import tensorflow as tf
from tensorflow import keras
import scipy.io as sio
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from models import NeuralNetGlobalHammer, NeuralNetGlobalWiener, NeuralNetGlobalHammerWiener, cmplxMeanSquaredError
from util import load_signal_data, SIestimationLinear
from config import *

import pickle
import fullduplex as fd

### Script Config

In [None]:
DATA_TYPE = 'H' # R/H/W
MODEL_TYPE = 'WH' # H/W/WH
LIN_PREPROCESSING = False

CHANNEL_LEN = 13
TRAINING_RATIO = 0.9

EXPORT_RESULTS = False
PATH_EXPORT_RESULTS = 'comparison_results/new/'

### Load Signal Data

In [None]:
if DATA_TYPE == 'R':

    matFile = sio.loadmat(PATH_DATA_REAL)

    x_train_batched = matFile['txSamples']
    x_train_batched = np.expand_dims(x_train_batched, 0)
    x_train_batched = np.expand_dims(x_train_batched, 0)
    x_train_batched = x_train_batched[:,:,:TOTAL_SIGNAL_LENGTH,:]
    x_train_batched = x_train_batched/np.std(x_train_batched)
    x_train_batched = tf.convert_to_tensor(x_train_batched)

    y_train_batched = matFile['analogResidual']
    y_train_batched = np.expand_dims(y_train_batched, 0)
    y_train_batched = np.expand_dims(y_train_batched, 0)
    y_train_batched = y_train_batched[:,:,:TOTAL_SIGNAL_LENGTH,:]
    y_train_batched = y_train_batched[:,:,CHANNEL_LEN-1:,:]
    y_train_batched = tf.convert_to_tensor(y_train_batched)

    noise = np.squeeze(matFile['noiseSamples'], axis=1)

elif DATA_TYPE == 'H' or DATA_TYPE == 'W':

    DATA_DIR_ROOT = PATH_DATA_SYNTH_HAMMERSTEIN if DATA_TYPE == 'H' else PATH_DATA_SYNTH_WIENER
    x_train, x_train_batched, y_train, y_train_batched, noise_train = load_signal_data(DATA_DIR_ROOT, noSignals=1, norm_y='none', norm_x='var', seq_len=TOTAL_SIGNAL_LENGTH, filter_len=CHANNEL_LEN, with_noise=True)

    noise = noise_train
    noise = np.squeeze(noise)

else:
    raise ValueError('DATA_TYPE must be either "R", "H", or "W".')

##### Split into training- and test-set

In [None]:
trainingSamples = int(np.floor(y_train_batched.shape[2]*TRAINING_RATIO))
x_test_batched = x_train_batched[:,:,trainingSamples:,:]
y_test_batched = y_train_batched[:,:,trainingSamples:,:]
x_train_batched = x_train_batched[:,:,:trainingSamples+CHANNEL_LEN-1,:] # indexing is unintuitive at first glance due to the different lengths of x and y signals
y_train_batched = y_train_batched[:,:,:trainingSamples,:]


# consider additional offset according to balatsoukas
if DATA_TYPE == 'R':    
    offset = np.maximum(DATA_OFFSET-int(np.ceil(CHANNEL_LEN/2)),1) # according to Balatsoukas, exact logic is unclear...
    x_train_batched = x_train_batched[:,:,:-offset,:]
    x_test_batched = x_test_batched[:,:,:-offset,:]
    y_train_batched = y_train_batched[:,:,offset:,:]
    y_test_batched = y_test_batched[:,:,offset:,:]

    # additional mean-removal in receive signal
    y_train_batched -= np.mean(y_train_batched)
    y_test_batched -= np.mean(y_test_batched)


y_test_batched_orig = tf.identity(y_test_batched) # needed later for eval, untouched by lin preproc

### Optional linear preprocessing

In [None]:
if LIN_PREPROCESSING:

    # For train data
    h = SIestimationLinear(x_train_batched[0,0,:,0].numpy(), y_train_batched[0,0,:,0].numpy(), CHANNEL_LEN)
    y_train_lin = fd.SIcancellationLinear(x_train_batched[0,0,:,0].numpy(), h, {})[CHANNEL_LEN-1:]
    y_train_batched = y_train_batched[0,0,:,0].numpy() - y_train_lin
    y_train_batched = np.expand_dims(y_train_batched, 0)
    y_train_batched = np.expand_dims(y_train_batched, 0)
    y_train_batched = tf.convert_to_tensor(np.expand_dims(y_train_batched, 3))

    # For test data
    y_test_lin = fd.SIcancellationLinear(x_test_batched[0,0,:,0].numpy(), h, {})[CHANNEL_LEN-1:]
    y_test_batched = y_test_batched[0,0,:,0].numpy() - y_test_lin
    y_test_batched = np.expand_dims(y_test_batched, 0)
    y_test_batched = np.expand_dims(y_test_batched, 0)
    y_test_batched = tf.convert_to_tensor(np.expand_dims(y_test_batched, 3))

### Define Model and Train

In [None]:
if MODEL_TYPE == 'H':
   neuralNet = NeuralNetGlobalHammer(filterLen=CHANNEL_LEN, expected_SI_power_dB=10*np.log10(np.var(y_train_batched)))
elif MODEL_TYPE == 'W':
   neuralNet = NeuralNetGlobalWiener(filterLen=CHANNEL_LEN, expected_SI_power_dB=10*np.log10(np.var(y_train_batched)))
elif MODEL_TYPE == 'WH':
   neuralNet = NeuralNetGlobalHammerWiener(filterLen=CHANNEL_LEN, expected_SI_power_dB=10*np.log10(np.var(y_train_batched)))
else:
   raise ValueError('MODEL_TYPE must be "H", "W", or "WH"')


optim = keras.optimizers.Adam(learning_rate=0.003, amsgrad=False, beta_1=0.9) 
neuralNet.compile(loss=cmplxMeanSquaredError(),    
              optimizer=optim, # Einzelsignal 0.01,  Zehn Signale lr=0.001 bei batchsize=1
              metrics=['MeanSquaredError'], run_eagerly=True)


In [None]:
history = neuralNet.fit(x_train_batched, y_train_batched,                    
          batch_size=100,
          epochs=8000,
          verbose=1,  
          validation_split=0.0,
          shuffle=False,
)

### Results and Plots

In [None]:
_, mse_test = neuralNet.evaluate(x_test_batched, y_test_batched)
mse_test_dB = np.round((10*np.log10(mse_test))*10)/10

plt.rcParams.update({'font.size': 16})

plt.figure(figsize=(10, 3))
plt.tight_layout()

finalMSE = history.history['mean_squared_error'][-1]
plt.plot(10*np.log10(history.history['mean_squared_error']), label=f'MSE, Final: {np.round((10*np.log10(finalMSE))*10)/10}dB')
plt.axhline(10*np.log10(mse_test), label=f'MSE (Test): {mse_test_dB}dB', linestyle='--', color='tab:orange')
plt.grid()
plt.legend()
plt.ylabel('MSE [dB]')

##### Balatsoukas-Style PSD Plot

In [None]:
font = {'family' : 'normal',
        'weight' : 'regular',
        'size'   : 14}

mpl.rc('font', **font)


# Plot PSD and get signal powers
scalingConst = np.array([PSD_SCALING_CONST]) # as per balatsoukas measurements, set equally for comparability
yVarTest = np.var(y_test_batched[0, 0, CHANNEL_LEN-1:, 0])
yPred = neuralNet(x_test_batched)[0, 0, CHANNEL_LEN-1:, 0] 

if not LIN_PREPROCESSING:
    y_test_lin = 0*y_test_batched[0,0,:,0]


fig, noisePower, yTestPower, yTestLinCancPower, yTestNonLinCancPower = fd.plotPSD(y_test_batched_orig[0, 0, CHANNEL_LEN-1:, 0]/np.sqrt(scalingConst), 
                                                                        y_test_lin[(CHANNEL_LEN-1):]/np.sqrt(scalingConst), 
                                                                        yPred/(np.sqrt(scalingConst)*np.sqrt(yVarTest)), # anticipate that the function subtracts prediction from normalized SI signal
                                                                        noise/np.sqrt(scalingConst), 
                                                                        {'hSILen': CHANNEL_LEN, 'samplingFreqMHz': 20}, 
                                                                        'NN', 
                                                                        yVarTest)


# Print cancellation performance
print('')
print('The linear SI cancellation is: {:.2f} dB'.format(yTestPower-yTestLinCancPower))
print('The non-linear SI cancellation is: {:.2f} dB'.format(yTestLinCancPower-yTestNonLinCancPower))
print('The noise floor is: {:.2f} dBm'.format(noisePower))
print('The distance from noise floor is: {:.2f} dB'.format(yTestNonLinCancPower-noisePower))


### Export Results

In [None]:
if EXPORT_RESULTS:
    file_name = PATH_EXPORT_RESULTS + 'data_' + DATA_TYPE + '_model_' + MODEL_TYPE + '_linSIC_' + ('yes' if LIN_PREPROCESSING else 'no') + '.pkl'
    confirm = input(f'Export as {file_name}? (yes/no)')
    if confirm == 'yes':
        with open(file_name, 'wb') as f:
            pickle.dump({'y_test': y_test_batched_orig[0, 0, CHANNEL_LEN-1:, 0], 'y_test_lin': y_test_lin[CHANNEL_LEN-1:], 'y_test_nl': yPred/np.sqrt(yVarTest), 'noise': noise, 'yVar': yVarTest, 'chanLen': CHANNEL_LEN}, f)
        print('File saved.')
    else:
        print('File not saved.')