In [1]:
import scipy.io
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, random_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.nn.utils import prune

import numpy as np
import matplotlib.pyplot as plt
import copy

plt.rcParams['font.family'] = 'Bookman Old Style'


class dataset:
    def __init__(self, x, y, num_training_points=5000, num_memory_levels =3):
        self.num_training_points = num_training_points
        self.num_memory_levels = num_memory_levels # If there are 0 memory taps, this is 1
        self.x = x
        self.y = y

        self.model_training_input, self.model_training_output, self.training_phase, self.model_valid_input, self.model_valid_output, self.valid_phase = self.prepare_data()

    def phase_vector(self, x):    
        """Takes a vector x and returns a vector of phases of each element"""
        Ax = np.abs(x)
        return np.conj(x)/Ax

    def model_expected_output(self, y, phase):
        """Take in data, phase normalised it, and trim. Return as IQ seperately"""
        y_denorm = y*phase
        y_denorm_trim = y_denorm[self.num_memory_levels:]
        return np.array([np.real(y_denorm_trim), np.imag(y_denorm_trim)]).T

    def build_xfc(self, x, num_memory_levels):
        """
        Replicates the MATLAB build_xfc() function.
        """
        num_points = len(x)
        phase = self.phase_vector(x)
        I = np.real(x)
        Q = np.imag(x)

        # Phase-normalized data
        phase_norm_data = np.zeros((num_points, num_memory_levels), dtype=complex)
        for n in range(num_memory_levels, num_points):
            for m in range(num_memory_levels):
                phase_norm_data[n, m] = x[n - m - 1] * phase[n]

        # Ax magnitude feature
        Ax = np.sqrt(I**2 + Q**2)

        # Build A feature matrix (Ax memory taps)
        A_feats = np.zeros((num_points, num_memory_levels))
        for n in range(num_memory_levels, num_points):
            for m in range(num_memory_levels):
                A_feats[n, m] = Ax[n - m]

        # Trim first num_memory_levels samples (as in MATLAB)
        phase_norm_data = phase_norm_data[num_memory_levels:, :]
        A_feats = A_feats[num_memory_levels:, :]
        A3_feats = A_feats ** 3

        # Combine real and imaginary phase-normalized parts with A-features
        xfc = np.hstack([
            np.real(phase_norm_data),
            np.imag(phase_norm_data),
            A_feats,
            A3_feats
        ]).astype(np.float32)

        return xfc

    def build_x_matrix(self, x, num_mem_levels, num_nl_orders):
        """Build Matrix X for find Volterra Model"""
        num_points = len(x)
        X = np.zeros((num_points, num_mem_levels * num_nl_orders), dtype=np.complex128)
        
        for n in range(num_mem_levels - 1, num_points):
            col = 0
            for i in range(num_mem_levels):
                xi = x[n - i]
                for j in range(num_nl_orders):
                    X[n, col] = (abs(xi) ** ((j) * 2)) * xi
                    col += 1

        return X

    def build_y(self, u, A, num_mem_levels, num_nl_orders):
        """Builds y, the output of the volterra Model. Trims Output"""
        num_points = len(u)
        y = np.zeros((num_points, 1), dtype=np.complex128)
        for n in range(num_mem_levels - 1, num_points):
            col = 0 
            for i in range(num_mem_levels):
                ui = u[n-i]
                for j in range(num_nl_orders):
                    y[n]= y[n] + A[col]*(abs(ui)**(j*2)*ui)
                    col += 1
        y = y[self.num_memory_levels:]
        return y
            

    def volterra(self,num_nl_orders):
        """Build component matrix A"""
        X = self.build_x_matrix(self.model_training_input, self.num_memory_levels, num_nl_orders)
        
        X_trim = X[self.num_memory_levels:, :]
        y_trim = self.model_training_output[self.num_memory_levels:]
        return np.linalg.pinv(X_trim.conj().T @ X_trim) @ (X_trim.conj().T @ y_trim);

    def training_data(self):
        """Assign some data for just training"""
        idx_training = range(0, self.num_training_points -1) # training indices

        model_training_input = self.y[idx_training]
        model_training_output = self.x[idx_training]
        return model_training_input, model_training_output
    

    def validation_data(self):
        """Assign some data for just validation"""
        num_validation_points = self.num_training_points 
        validation_end_index = self.num_training_points + num_validation_points
        idx_validation = range(self.num_training_points, validation_end_index -1) # validation indices
        model_valid_input = self.y[idx_validation]
        model_valid_output = self.x[idx_validation]
        return model_valid_input, model_valid_output

    def prepare_data(self):
        """Prepare training and validation data sets"""
        # Training Data
        model_training_input, model_training_output = self.training_data()
        training_phase = self.phase_vector(model_training_input)

        # Validation Data
        model_valid_input, model_valid_output = self.validation_data()
        valid_phase = self.phase_vector(model_valid_input)
        return (model_training_input, model_training_output, training_phase,
                model_valid_input, model_valid_output, valid_phase)

    def get_model_training_xfc(self):
        """Build xfc from model training input"""
        return self.build_xfc(self.model_training_input, self.num_memory_levels)
    
    def get_model_training_expected_output(self):
        """Find what the model should output for training data"""
        return self.model_expected_output(self.model_training_output, self.training_phase)
    
    def get_valid_xfc(self):
        """Build xfc from model validation input"""
        return self.build_xfc(self.model_valid_input, self.num_memory_levels)
    
    def get_model_valid_expected_output(self):
        """Find what the model should output for validation data"""
        return self.model_expected_output(self.model_valid_output, self.valid_phase)
    
    def get_test_data(self):
        """Get test data after validation"""
        num_validation_points = self.num_training_points 
        validation_end_index = self.num_training_points + num_validation_points
        x_data = self.x[validation_end_index:]
        y_data = self.y[validation_end_index:]
        return x_data, y_data


# Load data
data = scipy.io.loadmat("PA_IO.mat")
x = data["x"].squeeze()
y = data["y"].squeeze()

# Create dataset object
data_obj = dataset(x, y)

# Access training data
model_xfc = data_obj.get_model_training_xfc()
model_training_expected_output = data_obj.get_model_training_expected_output()

# Access validation data
valid_xfc = data_obj.get_valid_xfc()
model_valid_expected_output = data_obj.get_model_valid_expected_output()

# Access test data
x_data, y_data = data_obj.get_test_data()

In [2]:
# Get Volterra Model of PA
num_memory_levels = 3
num_nl_orders = 5
A = data_obj.volterra(num_nl_orders)
print(A)

[ 0.63146409-0.11924996j  0.1694142 -0.01547961j -0.29918448+0.23918013j
  1.27680917-1.02310172j -0.87359925+0.71303688j  0.12086561+0.23694819j
  0.01362539+0.09275574j -0.09982292-0.22003663j  0.45094657+0.5524524j
 -0.35963882-0.32360673j -0.06705462-0.13809442j -0.02614158-0.01855645j
  0.16654034-0.04988737j -0.48981135-0.01449885j  0.34387347+0.01718324j]


In [3]:
# Train NN on backprop PA for inv model

class PNTDNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(PNTDNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

class NN:
    def __init__(self, pntdnn):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.pntdnn = pntdnn
        
    def build_dataloaders(self, x , y):
        X = torch.tensor(x, dtype=torch.float32)
        Y = torch.tensor(y, dtype=torch.float32)
        dataset = TensorDataset(X, Y)
        loader = DataLoader(dataset, batch_size=256, shuffle=True)
        return loader
    
    def train(self, train_loader, valid_loader, num_epochs=400, learning_rate=1e-3):
        criterion = nn.MSELoss()
        optimizer = optim.Adam(self.pntdnn.parameters(), lr=learning_rate)
        
        train_losses = []
        valid_losses = []
        
        for epoch in range(num_epochs):
            self.pntdnn.train()
            running_train_loss = 0
            running_valid_loss = 0
            
            for xb, yb in train_loader:
                optimizer.zero_grad()
                preds = self.pntdnn(xb)
                loss = criterion(preds, yb)
                loss.backward()
                optimizer.step()
                running_train_loss += loss.item() * xb.size(0)
                
            train_loss = running_train_loss
            
            self.pntdnn.eval()
            with torch.no_grad():
                for xb, yb in valid_loader:
                    preds = self.pntdnn(xb)
                    loss = criterion(preds, yb)
                    running_valid_loss += loss.item() * xb.size(0)
                
            valid_loss = running_valid_loss
            
            train_losses.append(train_loss)
            valid_losses.append(valid_loss)
            
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch + 1:3d}/{num_epochs}  Loss={train_loss:.4e}")
        
        return train_losses, valid_losses

    def prune_model(self, prune_amount=0.2):
        pruned_model = copy.deepcopy(self.pntdnn)
        parameters_to_prune = (
            (pruned_model.fc1, 'weight'),
            (pruned_model.fc2, 'weight'),
        )
        prune.global_unstructured(
            parameters_to_prune,
            pruning_method=prune.L1Unstructured,
            amount=prune_amount,
        )
        return pruned_model
    
    def calculate_nmse(self, x, y):
        """Return NMSE in dB"""
        self.pntdnn.eval()
        with torch.no_grad():
            inputs = torch.tensor(x, dtype=torch.float32)
            targets = torch.tensor(y, dtype=torch.float32)
            outputs = self.pntdnn(inputs)
            mse_loss = nn.MSELoss()(outputs, targets).item()
            signal_power = torch.mean(targets ** 2).item()
            nmse = mse_loss / signal_power
            nmse = 10 * np.log10(nmse)
        return nmse
    

# Instantiate and train the model
input_size = model_xfc.shape[1]
hidden_size = 12
output_size = 2
pntdnn = PNTDNN(input_size, hidden_size, output_size)
nn_model = NN(pntdnn)
train_loader = nn_model.build_dataloaders(model_xfc, model_training_expected_output)
valid_loader = nn_model.build_dataloaders(valid_xfc, model_valid_expected_output)
train_losses, valid_losses = nn_model.train(train_loader, valid_loader)
print(nn_model.calculate_nmse(model_xfc, model_training_expected_output))

Epoch  10/400  Loss=3.6815e+00
Epoch  20/400  Loss=8.7692e-01
Epoch  20/400  Loss=8.7692e-01
Epoch  30/400  Loss=5.5562e-01
Epoch  30/400  Loss=5.5562e-01
Epoch  40/400  Loss=3.6344e-01
Epoch  40/400  Loss=3.6344e-01
Epoch  50/400  Loss=2.4845e-01
Epoch  50/400  Loss=2.4845e-01
Epoch  60/400  Loss=1.9311e-01
Epoch  60/400  Loss=1.9311e-01
Epoch  70/400  Loss=1.6399e-01
Epoch  70/400  Loss=1.6399e-01
Epoch  80/400  Loss=1.4414e-01
Epoch  80/400  Loss=1.4414e-01
Epoch  90/400  Loss=1.3115e-01
Epoch  90/400  Loss=1.3115e-01
Epoch 100/400  Loss=1.2224e-01
Epoch 100/400  Loss=1.2224e-01
Epoch 110/400  Loss=1.1700e-01
Epoch 110/400  Loss=1.1700e-01
Epoch 120/400  Loss=1.1420e-01
Epoch 120/400  Loss=1.1420e-01
Epoch 130/400  Loss=1.1159e-01
Epoch 130/400  Loss=1.1159e-01
Epoch 140/400  Loss=1.1289e-01
Epoch 140/400  Loss=1.1289e-01
Epoch 150/400  Loss=1.0968e-01
Epoch 150/400  Loss=1.0968e-01
Epoch 160/400  Loss=1.0839e-01
Epoch 160/400  Loss=1.0839e-01
Epoch 170/400  Loss=1.1050e-01
Epoch 17

In [4]:
class evaluation:
    def __init__(self, x, y):
        # Evalulate the performance of this model if this is the input and ouput data
        self.x = x
        self.y = y
        print(self.find_ACLR())

    def find_ACLR(self, fs = 800e6, bw_main_ch = 200e6, n_sub_ch = 10, nperseg = 2560):
        """Calculate the ACLR of the model for this dataset"""

        #Find power spectral density of output
        frequencies, psd = self.power_spectrum(self.y, fs=fs, nperseg=nperseg)
        # Compute the left and right index of the main channel
        index_left = np.min(np.where(frequencies>= -bw_main_ch/2))
        index_right = np.max(np.where(frequencies<= bw_main_ch/2))

        # Compute the length in index of each subchannel
        sub_ch_index_len = int((index_right - index_left) / n_sub_ch)

        # Compute the power of each subchannel and find the maximum power
        sub_ch_power = np.zeros((n_sub_ch))
        for c in range(n_sub_ch):
            sub_ch_power[c] = np.sum(
                psd[index_left + c * sub_ch_index_len:index_left + (c + 1) * sub_ch_index_len])
        max_sub_ch_power = sub_ch_power.max()

        # Compute ACLR for left and right adjacent channels
        left_side_ch_power = np.sum(psd[index_left - sub_ch_index_len:index_left])
        aclr_left = np.mean(10 * np.log10(left_side_ch_power / max_sub_ch_power))
        right_side_channel_power = np.sum(psd[index_right:index_right + sub_ch_index_len])
        aclr_right = np.mean(10 * np.log10(right_side_channel_power / max_sub_ch_power))

        return aclr_left, aclr_right

    def power_spectrum(self, complex_signal, fs=800e6, nperseg=2560):
        """Calculate power spectrum of complex signal"""
        from scipy.signal import welch

        f, Pxx = welch(complex_signal, fs=fs, nperseg=nperseg, return_onesided=False)
        Pxx = np.fft.fftshift(Pxx)
        f = np.fft.fftshift(f)
        return f, Pxx
        
