In [1]:
import numpy as np
import torch
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# Microsoft Data

In [2]:
def read_data(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    input_data = []
    target_data = []
    
    for i, line in enumerate(lines):
        if i % 2 == 0:  # Input data (rows 1, 3, 5, 7, ...)
            input_data.append(float(line.strip()))
        else:          # Target data (rows 2, 4, 6, 8, ...)
            target_data.append([float(x) for x in line.strip().split()])

    # Convert to NumPy arrays and reshape
    input_array = np.array(input_data).reshape(-1, 1)
    target_array = np.array(target_data).reshape(-1, 2)
    
    return input_array, target_array

def min_max_normalize(data, data_min=None, data_max=None):
    if data_min is None:
        data_min = np.min(data, axis=0)
    if data_max is None:
        data_max = np.max(data, axis=0)
    return (data - data_min) / (data_max - data_min), data_min, data_max

train_in, train_target = read_data('fftdata5/aggregated_train.txt')
print("input:", train_in.shape, "target:", train_target.shape)
train_in = torch.from_numpy(train_in).float()
train_target = torch.from_numpy(train_target).float()

test_in, test_target = read_data('fftdata5/aggregated_test.txt')
print("input:", test_in.shape, "target:", test_target.shape)
test_in = torch.from_numpy(test_in).float()
test_target = torch.from_numpy(test_target).float()

input: (2027, 1) target: (2027, 2)
input: (225, 1) target: (225, 2)


# custom data

In [30]:
import numpy as np

# Define the parameters of the signal
N = 1000  # number of sample points
T = 1.0 / 1000.0  # sample spacing
x = np.linspace(0.0, N*T, N, endpoint=False)

# Generate a sine wave signal with amplitude less than 1
y = 0.5 * np.sin(2.0 * np.pi * x) + 0.25 * np.sin(4.0 * np.pi * x)

# Compute the DFT of the signal
yf = np.fft.fft(y)

# Compute the corresponding frequencies
xf = np.fft.fftfreq(N, T)[:N//2]

# Now, let's create the input and target data
input_data = xf.reshape(-1, 1)
target_data = np.stack((np.abs(yf[:N//2].real), np.abs(yf[:N//2].imag)), axis=-1)

# normalize the input data and target data in rage [0, 1]
input_data = (input_data - np.min(input_data)) / (np.max(input_data) - np.min(input_data))
target_data = (target_data - np.min(target_data)) / (np.max(target_data) - np.min(target_data))

# check the min and max of input_data and target_data
print('input_data min: ', np.min(input_data), 'max: ', np.max(input_data), 'shape: ', input_data.shape)

# split train and test data use traintetst split, then make them to torch tensor
from sklearn.model_selection import train_test_split
train_in, test_in, train_target, test_target = train_test_split(input_data, target_data, test_size=0.1, random_state=42)
train_in, test_in = torch.from_numpy(train_in).float(), torch.from_numpy(test_in).float()
train_target, test_target = torch.from_numpy(train_target).float(), torch.from_numpy(test_target).float()

input_data min:  0.0 max:  1.0 shape:  (500, 1)


# Clamp -1,1

In [54]:
# create a NN, all FCs, 1 input, 2 output, layer nodes is (1,4,4,2), using tanh() as activation function
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math

def calculate_error(orig_lines, nn_lines):
    # python version of average relative error between two sets of complex numbers
    abs_error = 0
    for i in range(len(orig_lines)):
        orig_real, orig_imag = orig_lines[i]
        nn_real, nn_imag = nn_lines[i]

        diff_real = orig_real - nn_real
        diff_imag = orig_imag - nn_imag

        nominator = math.sqrt(diff_real * diff_real + diff_imag * diff_imag)
        denominator = math.sqrt(orig_real * orig_real + orig_imag * orig_imag)

        if denominator == 0 or math.isnan(nominator) or math.isnan(denominator):
            e = 1.0
        else:
            e = min(nominator / denominator, 1.0)

        abs_error += e

    return abs_error / float(len(orig_lines))

class dianet(nn.Module):
    def __init__(self):
        super(dianet, self).__init__()
        self.fc1 = nn.Linear(1, 2)
        self.fc2 = nn.Linear(2, 3)
        self.fc3 = nn.Linear(3, 4)
        self.fc4 = nn.Linear(4, 3)
        self.fc5 = nn.Linear(3, 2)

        self.msk2 = np.array([[1,0],[1,1],[0,1]])
        self.msk3 = np.array([[1,0,0],[1,1,0],[0,1,1],[0,0,1]])
        self.msk4 = np.array([[1,1,0,0],[0,1,1,0],[0,0,1,1]])
        self.msk5 = np.array([[1,1,0],[0,1,1]])

    def forward(self, x, dwn=-1, up=1):
        self.fc1.weight.data = torch.clamp(self.fc1.weight.data, dwn, up)
        self.fc1.bias.data = torch.clamp(self.fc1.bias.data, dwn, up)
        x1 = torch.tanh(self.fc1(x))
        # x1 = F.leaky_relu(self.fc1(x), negative_slope=0.125)

        self.fc2.weight.data *= torch.from_numpy(self.msk2).float()
        self.fc2.weight.data = torch.clamp(self.fc2.weight.data, dwn, up)
        self.fc2.bias.data = torch.clamp(self.fc2.bias.data, dwn, up)
        x2 = torch.tanh(self.fc2(x1))
        # x2 = F.leaky_relu(self.fc2(x1), negative_slope=0.125)

        self.fc3.weight.data *= torch.from_numpy(self.msk3).float()
        self.fc3.weight.data = torch.clamp(self.fc3.weight.data, dwn, up)
        self.fc3.bias.data = torch.clamp(self.fc3.bias.data, dwn, up)
        x3 = torch.tanh(self.fc3(x2))
        x3c = torch.cat((x3[:,0:1], x3[:,1:-1]+x1, x3[:,-1:]), 1) # (n, 4)

        self.fc4.weight.data *= torch.from_numpy(self.msk4).float()
        self.fc4.weight.data = torch.clamp(self.fc4.weight.data, dwn, up)
        self.fc4.bias.data = torch.clamp(self.fc4.bias.data, dwn, up)
        x4 = torch.tanh(self.fc4(x3c))
        x4c = x4 + x2 # (n, 3)

        self.fc5.weight.data *= torch.from_numpy(self.msk5).float()
        self.fc5.weight.data = torch.clamp(self.fc5.weight.data, dwn, up)
        self.fc5.bias.data = torch.clamp(self.fc5.bias.data, dwn, up)
        x5 = self.fc5(x4c)
        x5c = x5 + x3[:,1:-1] # (n, 2)

        return x5c

# train the NN
import torch.optim as optim

# model = baseline() # test_loss 0.0001585180580150336; error:  0.015001012789398081
model = dianet() # test_loss 0.001; error:  0.0401
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()

minloss = 999
for epoch in range(1000):
    epoch_loss = 0
    count = 0
    batchsize = 16
    for i in range(0, len(train_in), batchsize):
        optimizer.zero_grad()
        input = train_in[i:i+batchsize]
        tar = train_target[i:i+batchsize]
        output = model(input, -1, 1)
        loss = loss_fn(output, tar)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        count += 1
    epoch_loss /= count
    
    # test the model
    model.eval()
    test_output = model(test_in, -1, 1)
    test_loss = loss_fn(test_output, test_target).item()
    if test_loss < minloss:
        minloss = test_loss
        torch.save(model.state_dict(), 'fftdata5/dianet-11.pt')
        print('saved model at epoch: ', epoch, 'test_loss: ', test_loss)


saved model at epoch:  0 test_loss:  0.3514707088470459
saved model at epoch:  1 test_loss:  0.32352688908576965
saved model at epoch:  2 test_loss:  0.31682127714157104
saved model at epoch:  3 test_loss:  0.3063015937805176
saved model at epoch:  4 test_loss:  0.29004767537117004
saved model at epoch:  5 test_loss:  0.2653801739215851
saved model at epoch:  6 test_loss:  0.2294348180294037
saved model at epoch:  7 test_loss:  0.18161407113075256
saved model at epoch:  8 test_loss:  0.13233450055122375
saved model at epoch:  9 test_loss:  0.09806369245052338
saved model at epoch:  10 test_loss:  0.08126118034124374
saved model at epoch:  11 test_loss:  0.07129648327827454
saved model at epoch:  12 test_loss:  0.06704793870449066
saved model at epoch:  13 test_loss:  0.06515732407569885
saved model at epoch:  14 test_loss:  0.0642089769244194
saved model at epoch:  15 test_loss:  0.06351149082183838
saved model at epoch:  16 test_loss:  0.06301552057266235
saved model at epoch:  17 tes

In [55]:
def calculate_error_rate(test_output, test_target):
    # Calculate the absolute error
    absolute_error = np.abs(test_output - test_target)

    # Calculate the mean absolute percentage error
    mape = np.mean(absolute_error / np.abs(test_target)) * 100

    return mape

model = dianet()
model.load_state_dict(torch.load('fftdata5/dianet-11.pt'))
model.eval()
test_output = model(test_in)
test_loss = loss_fn(test_output, test_target).item()
print('test_loss', test_loss)

test_output_np = test_output.detach().numpy()
test_target_np = test_target.detach().numpy()

err = calculate_error(test_target_np, test_output_np)
print('error: ', err)

test_loss 0.0022081208880990744
error:  0.060688020657458604


# Clamp 0,1

In [16]:
# create a NN, all FCs, 1 input, 2 output, layer nodes is (1,4,4,2), using tanh() as activation function
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math

def calculate_error(orig_lines, nn_lines):
    abs_error = 0
    for i in range(len(orig_lines)):
        orig_real, orig_imag = orig_lines[i]
        nn_real, nn_imag = nn_lines[i]

        diff_real = orig_real - nn_real
        diff_imag = orig_imag - nn_imag

        nominator = math.sqrt(diff_real * diff_real + diff_imag * diff_imag)
        denominator = math.sqrt(orig_real * orig_real + orig_imag * orig_imag)

        if denominator == 0 or math.isnan(nominator) or math.isnan(denominator):
            e = 1.0
        else:
            e = min(nominator / denominator, 1.0)

        abs_error += e

    return abs_error / float(len(orig_lines))

class dianet(nn.Module):
    def __init__(self):
        super(dianet, self).__init__()
        self.fc1 = nn.Linear(1, 2)
        self.fc2 = nn.Linear(2, 3)
        self.fc3 = nn.Linear(3, 2)

        self.msk2 = np.array([[1,0],[1,1],[0,1]])
        self.msk3 = np.array([[1,1,0],[0,1,1]])

    def forward(self, x):
        self.fc1.weight.data = torch.clamp(self.fc1.weight.data, 0, 1)
        self.fc1.bias.data = torch.clamp(self.fc1.bias.data, 0, 1)
        x1 = torch.tanh(self.fc1(x))
        # x1 = F.leaky_relu(self.fc1(x), negative_slope=0.125)

        self.fc2.weight.data *= torch.from_numpy(self.msk2).float()
        self.fc2.weight.data = torch.clamp(self.fc2.weight.data, 0, 1)
        self.fc2.bias.data = torch.clamp(self.fc2.bias.data, 0, 1)
        x2 = torch.tanh(self.fc2(x1))
        # x2 = F.leaky_relu(self.fc2(x1), negative_slope=0.125)

        self.fc3.weight.data *= torch.from_numpy(self.msk3).float()
        self.fc3.weight.data = torch.clamp(self.fc3.weight.data, 0, 1)
        self.fc3.bias.data = torch.clamp(self.fc3.bias.data, 0, 1)
        x3 = self.fc3(x2)
        x3 = x3+x1

        return x3


import torch.optim as optim

# model = baseline() # test_loss 0.0001585180580150336; error:  0.015001012789398081
model = dianet() # test_loss 0.001; error:  0.0401
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()

minloss = 999
for epoch in range(1000):
    epoch_loss = 0
    count = 0
    batchsize = 16
    for i in range(0, len(train_in), batchsize):
        optimizer.zero_grad()
        input = train_in[i:i+batchsize]
        tar = train_target[i:i+batchsize]
        output = model(input)
        loss1 = loss_fn(output[:,0:1], tar[:,0:1])
        loss2 = loss_fn(output[:,1:2], tar[:,1:2])
        loss = loss1 + loss2
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        count += 1
    epoch_loss /= count
    
    if epoch_loss < minloss:
        minloss = epoch_loss
        torch.save(model.state_dict(), 'fftdata4/dianet_0_1.pth')
        print('saved at epoch', epoch, 'epoch_loss', epoch_loss)

# test using test_in and test_target
model.load_state_dict(torch.load('fftdata4/dianet_0_1.pth'))
model.eval()
test_pred = model(test_in)
test_loss = loss_fn(test_pred, test_target)
print('test_loss: MSE: ', test_loss.item())
# calculate error
test_pred = test_pred.detach().numpy()
test_target = test_target.detach().numpy()
print('error: ', calculate_error(test_target, test_pred))


saved at epoch 0 epoch_loss 0.41202271382013955
saved at epoch 1 epoch_loss 0.3917871614297231
saved at epoch 2 epoch_loss 0.3767745872338613
saved at epoch 3 epoch_loss 0.3647073984146118
saved at epoch 4 epoch_loss 0.3548909107844035
saved at epoch 5 epoch_loss 0.3468308349450429
saved at epoch 6 epoch_loss 0.34014036059379577
saved at epoch 7 epoch_loss 0.3345129152139028
saved at epoch 8 epoch_loss 0.32970499992370605
saved at epoch 9 epoch_loss 0.32552324334780375
saved at epoch 10 epoch_loss 0.32181506355603534
saved at epoch 11 epoch_loss 0.3184611439704895
saved at epoch 12 epoch_loss 0.3153696109851201
saved at epoch 13 epoch_loss 0.31247095863024393
saved at epoch 14 epoch_loss 0.30971385836601256
saved at epoch 15 epoch_loss 0.3070603092511495
saved at epoch 16 epoch_loss 0.3044837127129237
saved at epoch 17 epoch_loss 0.30196021298567455
saved at epoch 18 epoch_loss 0.29946885307629906
saved at epoch 19 epoch_loss 0.297007617354393
saved at epoch 20 epoch_loss 0.29456921915

# no clamp

In [5]:
# create a NN, all FCs, 1 input, 2 output, layer nodes is (1,4,4,2), using tanh() as activation function
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math

def calculate_error(orig_lines, nn_lines):
    # python version of average relative error between two sets of complex numbers
    abs_error = 0
    for i in range(len(orig_lines)):
        orig_real, orig_imag = orig_lines[i]
        nn_real, nn_imag = nn_lines[i]

        diff_real = orig_real - nn_real
        diff_imag = orig_imag - nn_imag

        nominator = math.sqrt(diff_real * diff_real + diff_imag * diff_imag)
        denominator = math.sqrt(orig_real * orig_real + orig_imag * orig_imag)

        if denominator == 0 or math.isnan(nominator) or math.isnan(denominator):
            e = 1.0
        else:
            e = min(nominator / denominator, 1.0)

        abs_error += e

    return abs_error / float(len(orig_lines))

class dianet(nn.Module):
    def __init__(self):
        super(dianet, self).__init__()
        self.fc1 = nn.Linear(1, 2)
        self.fc2 = nn.Linear(2, 3)
        self.fc3 = nn.Linear(3, 4)
        self.fc4 = nn.Linear(4, 3)
        self.fc5 = nn.Linear(3, 2)

        self.msk2 = np.array([[1,0],[1,1],[0,1]])
        self.msk3 = np.array([[1,0,0],[1,1,0],[0,1,1],[0,0,1]])
        self.msk4 = np.array([[1,1,0,0],[0,1,1,0],[0,0,1,1]])
        self.msk5 = np.array([[1,1,0],[0,1,1]])

    def forward(self, x, dwn=-1, up=1):
        self.fc1.weight.data = torch.clamp(self.fc1.weight.data, dwn, up)
        self.fc1.bias.data = torch.clamp(self.fc1.bias.data, dwn, up)
        x1 = torch.tanh(self.fc1(x))
        # x1 = F.leaky_relu(self.fc1(x), negative_slope=0.125)

        self.fc2.weight.data *= torch.from_numpy(self.msk2).float()
        self.fc2.weight.data = torch.clamp(self.fc2.weight.data, dwn, up)
        self.fc2.bias.data = torch.clamp(self.fc2.bias.data, dwn, up)
        x2 = torch.tanh(self.fc2(x1))
        # x2 = F.leaky_relu(self.fc2(x1), negative_slope=0.125)

        self.fc3.weight.data *= torch.from_numpy(self.msk3).float()
        self.fc3.weight.data = torch.clamp(self.fc3.weight.data, dwn, up)
        self.fc3.bias.data = torch.clamp(self.fc3.bias.data, dwn, up)
        x3 = torch.tanh(self.fc3(x2))
        x3c = torch.cat((x3[:,0:1], x3[:,1:-1]+x1, x3[:,-1:]), 1) # (n, 4)

        self.fc4.weight.data *= torch.from_numpy(self.msk4).float()
        self.fc4.weight.data = torch.clamp(self.fc4.weight.data, dwn, up)
        self.fc4.bias.data = torch.clamp(self.fc4.bias.data, dwn, up)
        x4 = torch.tanh(self.fc4(x3c))
        x4c = x4 + x2 # (n, 3)

        self.fc5.weight.data *= torch.from_numpy(self.msk5).float()
        self.fc5.weight.data = torch.clamp(self.fc5.weight.data, dwn, up)
        self.fc5.bias.data = torch.clamp(self.fc5.bias.data, dwn, up)
        x5 = self.fc5(x4c)
        x5c = x5 + x3[:,1:-1] # (n, 2)

        return x5c

# train the NN
import torch.optim as optim

# model = baseline() # test_loss 0.0001585180580150336; error:  0.015001012789398081
model = dianet() # test_loss 0.001; error:  0.0401
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()

minloss = 999
for epoch in range(1000):
    epoch_loss = 0
    count = 0
    batchsize = 16
    for i in range(0, len(train_in), batchsize):
        optimizer.zero_grad()
        input = train_in[i:i+batchsize]
        tar = train_target[i:i+batchsize]
        output = model(input, -1000, 1000)
        loss = loss_fn(output, tar)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        count += 1
    epoch_loss /= count
    
    # test the model
    model.eval()
    test_output = model(test_in, -1000, 1000)
    test_loss = loss_fn(test_output, test_target).item()
    if test_loss < minloss:
        minloss = test_loss
        torch.save(model.state_dict(), 'fftdata5/dianet-nolimit.pt')
        print('saved model at epoch: ', epoch, 'test_loss: ', test_loss)


saved model at epoch:  0 test_loss:  0.41120296716690063
saved model at epoch:  1 test_loss:  0.3376537263393402
saved model at epoch:  2 test_loss:  0.33518171310424805
saved model at epoch:  3 test_loss:  0.33127909898757935
saved model at epoch:  4 test_loss:  0.32454174757003784
saved model at epoch:  5 test_loss:  0.3117678165435791
saved model at epoch:  6 test_loss:  0.28475257754325867
saved model at epoch:  7 test_loss:  0.2257520854473114
saved model at epoch:  8 test_loss:  0.12550994753837585
saved model at epoch:  9 test_loss:  0.06142571195960045
saved model at epoch:  10 test_loss:  0.056578174233436584
saved model at epoch:  11 test_loss:  0.05590280145406723
saved model at epoch:  12 test_loss:  0.055302608758211136
saved model at epoch:  13 test_loss:  0.054705724120140076
saved model at epoch:  14 test_loss:  0.05407021567225456
saved model at epoch:  15 test_loss:  0.05337583273649216
saved model at epoch:  16 test_loss:  0.052610401064157486
saved model at epoch:  

In [6]:
def calculate_error_rate(test_output, test_target):
    # Calculate the absolute error
    absolute_error = np.abs(test_output - test_target)

    # Calculate the mean absolute percentage error
    mape = np.mean(absolute_error / np.abs(test_target)) * 100

    return mape

model = dianet()
model.load_state_dict(torch.load('fftdata5/dianet-nolimit.pt'))
model.eval()
test_output = model(test_in, -1000, 1000)
test_loss = loss_fn(test_output, test_target).item()
print('test_loss', test_loss)

test_output_np = test_output.detach().numpy()
test_target_np = test_target.detach().numpy()

err = calculate_error(test_target_np, test_output_np)
print('error: ', err)

test_loss 5.426608822745038e-06
error:  0.0027790435759565573


# custom data