In [1]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LeakyReLU, Input, Normalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import Model
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

### Loading & Processing input data after extraction from the .mlx script

In [3]:
df_train = pd.read_csv('train.txt')
df_val = pd.read_csv('val.txt')
df_test = pd.read_csv('test.txt')

In [4]:
# train
df_train['X_real'] = df_train['Var1_2'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)').astype(float)
df_train['X_img'] = df_train['Var1_2'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)i').astype(float)
df_train['y_real'] = df_train['Var1_1'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)').astype(float)
df_train['y_img'] = df_train['Var1_1'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)i').astype(float)

# test
df_test['X_real'] = df_test['Var1_2'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)').astype(float)
df_test['X_img'] = df_test['Var1_2'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)i').astype(float)
df_test['y_real'] = df_test['Var1_1'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)').astype(float)
df_test['y_img'] = df_test['Var1_1'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)i').astype(float)

# val
df_val['X_real'] = df_val['Var1_2'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)').astype(float)
df_val['X_img'] = df_val['Var1_2'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)i').astype(float)
df_val['y_real'] = df_val['Var1_1'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)').astype(float)
df_val['y_img'] = df_val['Var1_1'].str.extract(r'([+-]?\d+\.\d+(?:[eE][+-]?\d+)?)i').astype(float)

In [5]:
X_train = df_train[['X_real','X_img']].to_numpy()
y_train = df_train[['y_real','y_img']].to_numpy()

X_test = df_test[['X_real','X_img']].to_numpy()
y_test = df_test[['y_real','y_img']].to_numpy()

X_val = df_val[['X_real','X_img']].to_numpy()
y_val = df_val[['y_real','y_img']].to_numpy()

In [6]:
def preprocess(arr, n_previous = 4):
    """
    Preprocesses the input array by computing the L1 norm, adding padding, 
    and creating sliding windows.

    Parameters:
    -----------
    arr : numpy.ndarray
        A 2D array of shape n*2 where each row represents a data sample, two columns for real & imaginary parts of the signal.
    n_previous : int, optional
        The number of previous samples to include in each window (default is 4).

    Returns:
    --------
    windows : numpy.ndarray
        A 2D array where each row contains a flattened window of `n_previous + 1` 
        consecutive samples from the padded array, including the computed L1 norm 
        for each original sample.
    """
    array_with_l1_norm  = np.hstack((arr, np.sum(np.abs(arr),axis=1)[:, np.newaxis]))

    padding = np.zeros((n_previous, array_with_l1_norm.shape[1]))
    padded_array = np.vstack((padding, array_with_l1_norm ))

    windows = sliding_window_view(padded_array, window_shape=(n_previous + 1, array_with_l1_norm.shape[1]))
    windows = windows.reshape(windows.shape[0], -1)
    
    return windows

In [7]:
# applying the additional processing
X_train = preprocess(X_train)
X_test = preprocess(X_test)
X_val = preprocess(X_val)


In [8]:
X_train.shape

(131520, 15)

In [12]:
def helperNMSE(y_true, y_pred):
    
    diff = y_pred - y_true
    mse = tf.reduce_mean(tf.norm(diff,axis=1)**2) # NOTE THIS IS NOT A GOOD PRACICE AS TF.NORM ALREADY GETS THE SQUARED ERROR THEN SQRT IT
    factor = tf.reduce_mean(tf.norm(y_true,axis=1)**2)
    # nmse = 10 * tf.math.log(mse / factor) / tf.math.log(tf.constant(10,dtype=tf.float32))
    
    return mse

def custom_loss(y_true, y_pred):
    # Compute the L1 norms
    norm_true = tf.reduce_sum(tf.abs(y_true), axis=1)
    norm_pred = tf.reduce_sum(tf.abs(y_pred), axis=1)
    
    # Reshape to be compatible with the helperNMSE function
    norm_true = tf.reshape(norm_true, (-1, 1))
    norm_pred = tf.reshape(norm_pred, (-1, 1))
    
    # Compute the NMSE
    loss = helperNMSE(norm_true, norm_pred)
    return loss

### Neural Network Architecture

In [None]:
# defining layers
n_nerurons = 30
factor = 0.8

input_layer = Input(shape=(X_train.shape[1],))
dense_layer_1 = Dense(units = n_nerurons, activation = LeakyReLU(alpha=0.01))(input_layer) 
dense_layer_2 = Dense(units = n_nerurons*factor, activation = LeakyReLU(alpha=0.01))(dense_layer_1)
dense_layer_3 = Dense(units = n_nerurons*(factor**2), activation = LeakyReLU(alpha=0.01))(dense_layer_2)


#Y1 output (real part of the signal)
y1_output = Dense(units = 1, activation = "linear", name = "y1_output")(dense_layer_3)

#Y2 output (imaginary part of the signal)
y2_output = Dense(units = 1, activation = "linear", name = "y2_output")(dense_layer_3)

#Define the model with the input layer and a list of outputs
model = Model(inputs = input_layer, outputs = [y1_output, y2_output])


reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.95, patience=5, verbose=1, mode='auto')
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='auto')
checkpoint_callback = ModelCheckpoint(
    filepath='bestmodel.keras',
    monitor='val_loss',       # Metric to monitor
    save_best_only=True,      # Only save the model if it is the best
    verbose=0                 # Verbosity mode
)



#specify the optimizer and compile with the loss function for both outputs
optimizer = tf.keras.optimizers.Adam(learning_rate=4e-4)

model.compile(optimizer = optimizer,
              loss = {'y1_output':'mse','y2_output':'mse'},
             )


### Model Training

In [18]:
# Define training parameters
maxEpochs = 700 
miniBatchSize = 1024
iterPerEpoch = len(X_train) // miniBatchSize
validation_freq = 2 * iterPerEpoch


history = model.fit(X_train, (y_train.y_real, y_train.y_img), epochs = maxEpochs, batch_size = 10,
                    validation_data = (X_val, (y_val.y_real, y_val.y_img)))


# Train the model
history = model.fit(
    X_train, (y_train[:,0], y_train[:,1]),
    epochs=maxEpochs,
    batch_size=miniBatchSize,
    callbacks=[checkpoint_callback,reduce_lr],
    validation_data = (X_val, (y_val[:,0], y_val[:,1])),
    shuffle=True,
)

In [None]:
# saving the model
model.save('tf_model.h5')

In [19]:
# after many training attemps, the best model achieved -33.55dB accuracy, thus loaded here
model.load_weights('tf_model_33_5dB_nprev_4.h5')

### Applying the model to the y_test data and saving the output for input into the simulated PA in the .mlx script, as showing in the following test bench


![model test bench](tb.png)


In [None]:
# output processing
tf_output = model.predict(preprocess(y_test))
tf_output = np.concatenate((tf_output[0],tf_output[1]),axis=1)

In [27]:
arr = []

for i in range(len(tf_output)):
    arr.append(f'{tf_output[i][0]} + {tf_output[i][1]}i')


In [28]:
pd.Series(arr).to_csv('tf_model_output_test.csv',index=False,header=False)


Exporting the model output to .mat

In [29]:
import numpy as np
import scipy.io

# Function to parse complex numbers from the given format
def parse_complex_number(s):
    real, imag = s.split(' + ')
    real = float(real)
    imag = float(imag.replace('i', ''))
    return np.complex64(real + 1j * imag)

# Read the CSV file
filename = './tf_model_output_test.csv'  # Replace with your CSV filename
with open(filename, 'r') as file:
    lines = file.readlines()

# Parse the lines into complex numbers
complex_numbers = np.array([parse_complex_number(line.strip()) for line in lines], dtype=np.complex64)

# Save the array to a .mat file
output_filename = 'complex_data.mat'
scipy.io.savemat(output_filename, {'complex_data': complex_numbers})
