# hw4 - NN pyTorch

Во всем задании вы должны работать со входом из строк размером (длиной) 1024

In [185]:
import numpy as np
import pandas as pd
from scipy import signal
import scipy as sc
import matplotlib.pyplot as plt
import random
import itertools
from inspect import signature
from tqdm import tqdm

In [43]:
import torch
from sklearn.model_selection import train_test_split

## Часть 1 Фурье

### Подготовка данных

Сгенерите данные (сигнал) любым пакетом для численного преобразования Фурье, [например](https://numpy.org/doc/stable/reference/generated/numpy.fft.fft.html#numpy.fft.fft)

In [49]:
t = np.arange(1024)

In [55]:
def generate_random_signal(function, parameters_grid, t):
    generated_parameters = {}
    for param, values in parameters_grid.items():
        if values == 'float':
            param_value = random.uniform(-1000.0, 1000.0)
        elif values == 'int':
            param_value = random.randint(-1000, 1000)
        elif values[:4] == 'arr(':
            par_str = values[4:-1]
            start, end, *param_lens = list(map(int, par_str.split(',')))
            if len(param_lens) == 1:
                param_len = param_lens[0]
            else:
                param_len = random.randint(param_len[0], param_len[1])
            if param_len != 1:
                param_value = [random.uniform(start, end) for i in range(param_len)]
            else:
                param_value = random.uniform(start, end)
        elif values == 'int+float':
            rand_float = random.uniform(-1000.0, 1000.0)
            rand_int = random.randint(-1000, 1000)
            param_value = random.choice([rand_int, rand_float])
        elif values[:6] == 'float(':
            # range generator
            val_range = values[6:-1]
            val_range = list(map(float, val_range.split(',')))
            param_value = random.uniform(val_range[0], val_range[1])
        elif values[:4] == 'int(':
            val_range = values[4:-1]
            val_range = list(map(int, val_range.split(',')))
            # range generator
            param_value = random.randint(val_range[0], val_range[1])
        else:
            param_value = random.choice(values)
        generated_parameters[param] = param_value
    return function(t, **generated_parameters)

In [68]:
def generate_signals_by_grid(function, parameters_grid, t):
    params_names = list(parameters_grid.keys())
    params_values = []
    for name in params_names:
        params_values.append(parameters_grid[name])
    signals = []
    for param_pair in list(product(*params_values)):
        params = {params_names[i]: param_pair[i] for i in range(len(param_pair))}
        signals.append(function(t, **params))

    return signals

In [106]:
DATASET_FILENAME = 'dataset.csv'

In [107]:
def generate_random_dataset(t, funcs, size, batch_size, filename):
    with open(filename, 'w') as f:
        sig_h = [f'signal_t{i}' for i in range(1024)]
        fur_h = [f'fourier_t{i}' for i in range(1024)]
        f.write(','.join(sig_h + fur_h) + '\n')
    
    for func_name, func in funcs.items():
        batch = []
        for i in range(size):
            gen_sig = generate_random_signal(func['function'], func['params_grid'], t)
            gen_fur = np.fft.fft(gen_sig)
            line = np.concatenate((gen_sig, gen_fur))
            batch.append(line)
            if len(batch) > batch_size:
                batch = np.array(batch)
                with open(filename, 'ab') as csv_file:
                    np.savetxt(csv_file, batch, delimiter=",")
                batch = []

In [131]:
functions_random = {
    'sin': {
        'function': lambda t, scale, var, bias: np.sin(scale * t + var) + bias,
        'params_grid': {'scale': 'float', 'var': 'float', 'bias': 'float'}
    },
    'line': {
        'function': lambda t, a, b: a*t+b,
        'params_grid': {'a': 'float', 'b': 'float'}
    },
    'exp': {
        'function': lambda t, a, b, c, d: c*np.exp(a*t+b)+d,
        'params_grid': {'a': 'float(-50,50)', 'b': 'float(-50,50)', 'c': 'float', 'd': 'float'}
    },
    'log': {
        'function': lambda t, a, b, c, d: c*np.log(np.abs(a * t + b) + 1)+d,
        'params_grid': {'a': 'float', 'b': 'float', 'c': 'float', 'd': 'float'}
    },
    # 'pow': {
    #     'function': lambda t, power, a, b, c, d: c*((a * t + b) ** power)+d,
    #     'params_grid': {'power': 'int+float', 'a': 'float', 'b': 'float', 'c': 'float', 'd': 'float'}
    # },
    'chirp': {
        'function': lambda t, f0, f1, t1, method, phi: signal.chirp(t, f0, f1, t1, method, phi),
        'params_grid': {'f0': 'float', 'f1': 'float', 't1': 'float', 'method': ['linear', 'quadratic'], 'phi': 'float'}
    },
    'gausspulse': {
        'function': lambda t, fc, bw, bwr, tpr: signal.gausspulse(t, fc, bw, bwr, tpr),
        'params_grid': {'fc': 'float(0,1000)', 'bw': 'float(0,1000)', 'bwr': 'float(-200,0)', 'tpr': 'float'}
    },
    'sawtooth': {
        'function': lambda t, width: signal.sawtooth(t, width),
        'params_grid': {'width': 'float(0,1)'}
    },
    'square': {
        'function': lambda t, duty: signal.square(t, duty),
        'params_grid': {'duty': 'float(0,1)'}
    },
    # 'sweep_poly': {
    #     'function': lambda t, poly, phi: signal.sweep_poly(t, poly, phi),
    #     'params_grid': {'poly': 'arr(-100,100,1,10)', 'phi': 'float'}
    # },
    'random_noise': {
        'function': lambda t, mean, var: np.random.normal(mean, var, len(t)),
        'params_grid': {'mean': 'float', 'var': 'float(0,1000)'}
    },
    'unit_impulse': {
        'function': lambda t, idx: signal.unit_impulse(len(t), idx),
        'params_grid': {'idx': 'int'}
    },

}

In [109]:
functions_grid = {
    'sin': {
        'function': lambda t, scale, var, bias: np.sin(scale * t + var) + bias,
        'params_grid': {'scale': [0.25 * (i+1) * np.pi for i in range(20)], 'var': 'float', 'bias': [-10.0 + (i+1) * 0.5 for i in range(40)]}
    },
    'line': {
        'function': lambda t, a, b: a*t+b,
        'params_grid': {'a': [-10.0 + 0.5 * i for i in range(40)], 'b': [-10.0 + 0.5 * i for i in range(40)]}
    },
    'exp': {
        'function': lambda t, a, b, c, d: c*np.exp(a*t+b)+d,
        'params_grid': {'a': [-10.0 + 0.5 * i for i in range(40)], 'b': [-10.0 + 0.5 * i for i in range(40)], 'c': [-10.0 + 0.5 * i for i in range(40)], 'd': [-10.0 + 0.5 * i for i in range(40)]}
    },
    'log': {
        'function': lambda t, a, b, c, d: c*np.log(np.abs(a * t + b) + 1)+d,
        'params_grid': {'a': [-10.0 + 0.5 * i for i in range(40)], 'b': [-10.0 + 0.5 * i for i in range(40)], 'c': [-10.0 + 0.5 * i for i in range(40)], 'd': [-10.0 + 0.5 * i for i in range(40)]}
    },
    'pow': {
        'function': lambda t, pow, a, b, c, d: c*np.pow(a * t + b, pow)+d,
        'params_grid': {'pow': [-10.0 + 0.5 * i for i in range(40)], 'a': [-10.0 + 0.5 * i for i in range(40)], 'b': [-10.0 + 0.5 * i for i in range(40)], 'c': [-10.0 + 0.5 * i for i in range(40)], 'd': [-10.0 + 0.5 * i for i in range(40)]}
    },
}

In [132]:
generate_random_dataset(t, functions_random, 1000, 100, DATASET_FILENAME)

  'function': lambda t, a, b, c, d: c*np.exp(a*t+b)+d,
  'function': lambda t, a, b, c, d: c*np.exp(a*t+b)+d,


In [145]:
data = pd.read_csv(DATASET_FILENAME, sep=",")
data = data.apply(lambda col: col.apply(lambda val: complex(val.strip())))

In [172]:
X_data = data.iloc[:, :1024].to_numpy()
X_data = torch.tensor(X_data, dtype=torch.cfloat)

In [173]:
y_data = data.iloc[:, 1024:].to_numpy()
y_data = torch.tensor(y_data, dtype=torch.cfloat)

In [174]:
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.1, random_state=42, shuffle=True)

In [177]:
X_test.size()

torch.Size([909, 1024])

### Соберите и обучите нейросетку на pyTroch для преобразования Фурье

In [179]:
D_in = 1024
D_out = 1024

model = torch.nn.Sequential(
    torch.nn.Linear(2 * D_in, 1024),
    torch.nn.BatchNorm1d(1024),
    torch.nn.ReLU(),
    torch.nn.Linear(1024, 2 * D_out),
)

In [180]:
def batch_train(model, loss_fn, optimizer, x, y):
    y_pred = model(x)
    loss = loss_fn(y_pred, y)
    model.zero_grad()
    loss.backward()
    optimizer.step()
    return (loss.item())

In [188]:
def train(model, n_epochs, batch_size, loss_fn, optimizer, X, y, X_test, y_test):
    acc_train_all = []
    loss_train_all = []
    acc_test_all = []
    loss_test_all = []

    for epoch in range(n_epochs):

        permutation = torch.randperm(X.size()[0])

        for i in tqdm(range(0,X.float().size()[0], batch_size)):
            indices = permutation[i:i+batch_size]
            batch_x, batch_y = X[indices], y[indices]
            batch_train(model, loss_fn, optimizer, batch_x, batch_y)

        y_test_pred = model(X_test)
        y_train_pred = model(X)


        acc_train = accuracy_score(y, y_train_pred.argmax(dim=1))
        loss_train = loss_fn(y_train_pred, y).detach()
        acc_test = accuracy_score(y_test, y_test_pred.argmax(dim=1))
        loss_test = loss_fn(y_test_pred, y_test).detach()

        acc_train_all = np.append(acc_train_all, acc_train)
        loss_train_all = np.append(loss_train_all, loss_train)
        acc_test_all = np.append(acc_test_all, acc_test)
        loss_test_all = np.append(loss_test_all, loss_test)


        print(f'Epoch {epoch}: \n Accuracy - train: {acc_train} | test: {acc_test} \n Loss - train: {loss_train} | test: {loss_test}')

    return(acc_train_all, loss_train_all, acc_test_all, loss_test_all)

In [182]:
def init_weights(m):
    if type(m) == torch.nn.Linear:
        torch.nn.init.kaiming_normal_(m.weight)
        m.bias.data.fill_(0.01)

In [183]:
def vis_history(acc_train_all, loss_train_all, acc_test_all, loss_test_all):
    fig = plt.figure(figsize=(16, 4))

    plt.subplot(1, 2, 1)

    plt.plot(loss_train_all, label='loss')
    plt.plot(loss_test_all, label='val_loss')

    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(acc_train_all, label='acc')
    plt.plot(acc_test_all, label='val_acc')

    plt.legend()
    plt.show()

In [184]:
optimizerAdam = torch.optim.Adam(model.parameters(), lr=0.01)
model = model.apply(init_weights)
loss_fn = torch.nn.MSELoss()

In [159]:
n_epochs = 10
batch_size = 1000
learning_rate = 1e-3

In [189]:
acc_train_all, loss_train_all, acc_test_all, loss_test_all = \
          train(model, n_epochs, batch_size, loss_fn, optimizerAdam, X_train, y_train, X_test, y_test)

  0%|                                                     | 0/9 [00:00<?, ?it/s]


RuntimeError: mat1 and mat2 must have the same dtype

In [None]:
vis_history(acc_train_all, loss_train_all, acc_test_all, loss_test_all)

### Сравните свое решение с пакетным методом

In [None]:
signal_1 = np.sin(t)
signal_2 = signal.square(0.05 * np.pi * t)
plt.plot(t, signal_2)
plt.show()

In [None]:
sp = np.fft.fft(signal_2)
freq = np.fft.fftfreq(t.shape[-1])
plt.plot(freq, sp.real, freq, sp.imag)
plt.show()

## Часть 2 Power spectral density (dB)

### Подготовка данных

Используя пакетное решение, [например](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.periodogram.html), подготовьте обучающую выборку.

### Соберите и обучите нейросеть для предсказания спектральной плотности мощности

*Подсказка: для входа 1024 выход будет 512*

### Провалидируйте свое решение

*Примечание: Для сдачи достаточно качественного соответствия по форме спектра с пакетным решением*