In [None]:
import os
import numpy as np
import pandas as pd
import torch
import seaborn as sns

In [None]:
# check available GPUs

if torch.cuda.is_available() is True:
    for i in range(torch.cuda.device_count()):
        print('Device {}:'.format(i), torch.cuda.get_device_name(i))

device = 'cuda:0'   # use Nvidia RTX A4500 GPU

In [None]:
n_bit = 12
# scaler = 2 ** 12
scaler = 1000

# 1. Define and instantiate simulation dataset

The file to be loaded should be saved with '.npy' format. And the data and labels are supposed to be saved seperately.

In [None]:
from torch.utils.data import Dataset

class SimulationDataset(Dataset):
    def __init__(self, data_path, label_path, data_raw_path='', noises_path='', device='cuda') -> None:
        super().__init__()
        self.data_path = data_path
        self.label_path = label_path
        self.data_raw_path = data_raw_path
        self.noises_path = noises_path

        self.device = device

        self.data = np.load(self.data_path).astype(np.float32)
        self.labels = np.load(self.label_path).astype(np.float32)

        self.data = torch.tensor(self.data)
        self.labels = torch.tensor(self.labels)

        if noises_path != '':
            self.noises = np.load(noises_path)
            # self.noises = torch.tensor(self.noises) / (2 ** n_bit)
            self.noises = torch.tensor(self.noises) / scaler

        # self.data = torch.tensor(self.data, device=device)
        # self.labels = torch.tensor(self.labels, device=device)


        # self.data_max = torch.max(self.data, dim=1).values
        # print(self.data_max.shape)
        # self.data = self.data / self.data_max.unsqueeze(1)

        # self.data = self.data / (2 ** n_bit)
        self.data = self.data / scaler
        self.labels = self.labels
    def __len__(self):
        assert len(self.data) == len(self.labels)
        return len(self.data)

    def __getitem__(self, index):
        # sorted_data, _ = torch.sort(self.data[index])
        # return sorted_data, self.labels[index][-2]
        # return self.data[index].to(self.device), self.labels[index][-2].to(self.device)
        num_timestamps = 1024
        if self.noises_path != '':
            return self.data[index, 0:num_timestamps].to(self.device), self.labels[index].to(self.device), self.noises[index, 0:num_timestamps].to(self.device)
        else:
            return self.data[index, 0:num_timestamps].to(self.device), self.labels[index].to(self.device)
        # return self.data[index], self.labels[index][-2]


Install the googledrivedonwloader and download [datasets](https://drive.google.com/drive/folders/1FdoF63YvHkfgpiSFmD5dKMZdPIP13ojs?usp=sharing).

In [None]:
dataset_path = 'dataset'

simulation_dataset = SimulationDataset(
    data_path = dataset_path + 'data.npy',
    label_path = dataset_path + 'label.npy',
    device = device
)


In [None]:
data_sample = simulation_dataset[np.random.randint(0, len(simulation_dataset))]
print(data_sample[0])
print(data_sample[0].shape, data_sample[1].shape)
sns.histplot(data_sample[0].detach().cpu().numpy())

## 2.3 Split dataset and define dataloader

In [None]:
from torch.utils.data import random_split
train_set_size = int(0.8 * len(simulation_dataset))
dev_set_size = int(0.1 * len(simulation_dataset))
test_set_size = len(simulation_dataset) - train_set_size - dev_set_size
print('train/dev/test size is {}/{}/{}.'.format(
    train_set_size,
    dev_set_size,
    test_set_size
))
train_set, dev_set, test_set = \
    random_split(simulation_dataset, [train_set_size, dev_set_size, test_set_size], generator=torch.Generator().manual_seed(42))

In [None]:
from torch.utils.data import DataLoader

batch_size = 128

train_loader = DataLoader(train_set, batch_size=batch_size)
dev_loader = DataLoader(dev_set, batch_size=batch_size)
test_loader = DataLoader(test_set, batch_size=batch_size)

# 2. Define model

In [None]:
import torch.nn as nn
from torch.nn import GRU, LSTM, RNN

class RNNModel(nn.Module):
    def __init__(self, rnn='gru', hidden_size=32, num_layers=1, keep_state=True):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.keep_state = keep_state

        self.rnn_type = rnn

        # print(self.rnn_type)

        if rnn == 'gru':
            self.rnn = GRU(
                input_size=1,
                hidden_size=hidden_size,
                num_layers=num_layers,
                batch_first=True
            )
        elif rnn == 'rnn':
            self.rnn = RNN(
                input_size=1,
                hidden_size=hidden_size,
                num_layers=num_layers,
                batch_first=True
            )
        else:
            self.rnn = LSTM(
                input_size=1,
                hidden_size=hidden_size,
                num_layers=num_layers,
                batch_first=True
            )
    
        self.linear1 = nn.Linear(hidden_size, hidden_size//2)
        self.linear2 = nn.Linear(hidden_size//2, 1)


    def forward(self, x, hn=None):
        if self.keep_state is False or hn is None:
            output, hn = self.rnn(x)
        else:
            output, hn = self.rnn(x, hn)

        output = torch.relu(self.linear1(output))
        output = self.linear2(output)
        output = output.squeeze(2)

        # print(self.rnn)
        if (self.rnn_type == 'rnn') or (self.rnn_type == 'gru'):
            return output, hn.detach()
        elif self.rnn_type == 'lstm':
            return output, (hn[0].detach(), hn[1].detach())


    def _reinitialize(self):
        """
        Tensorflow/Keras-like initialization
        """
        for name, p in self.named_parameters():
            if 'rnn' in name:
                if 'weight_ih' in name:
                    nn.init.xavier_uniform_(p.data)
                elif 'weight_hh' in name:
                    nn.init.orthogonal_(p.data)
                elif 'bias_ih' in name:
                    p.data.fill_(0)
                    # Set forget-gate bias to 1
                    n = p.size(0)
                    p.data[(n // 4):(n // 2)].fill_(1)
                elif 'bias_hh' in name:
                    p.data.fill_(0)
            elif 'fc' in name:
                if 'weight' in name:
                    nn.init.xavier_uniform_(p.data)
                elif 'bias' in name:
                    p.data.fill_(0)

In [None]:
model = RNNModel(rnn='gru', hidden_size=12, num_layers=1)

In [None]:
model._reinitialize()

In [None]:
def get_n_params(model):
    pp=0
    for p in list(model.parameters()):
        nn=1
        for s in list(p.size()):
            nn = nn*s
        pp += nn
    return pp


n_paras = get_n_params(model.rnn)
print('Parameter number of model:', n_paras)

# 3. Train the model

In [None]:
if device == 'cpu':
    pass
else:
    model.to(device)
    print("Training with CUDA.")

In [None]:
import torch.optim as optim

learning_rate = 0.001

optimizer = optim.AdamW(lr=learning_rate, params=model.parameters())

scheduler = optim.lr_scheduler.StepLR(
    optimizer=optimizer,
    step_size=5,
    gamma=0.9
)

In [None]:
import torch.nn as nn

weights = torch.sigmoid(torch.tensor((np.arange(1024) - 256) / 256)).reshape(1, -1).to(device)


def loss_func(pred, label):
  
    # print(pred.shape, label.shape)
    label = label.repeat(1024, 1).T
    # print(label.shape)
    error = (pred - label) ** 2 / (label ** 2)
    error = error * weights
    error = torch.mean(error)
    return error


criterion = loss_func

In [None]:
from datetime import datetime

now = datetime.now()
date = now.strftime("%Y%m%d_%H%M%S")

model_type = 'gru'

model_para = str(model.rnn.hidden_size)
filename = '_'.join([date, model_type, model_para])

print(filename)

In [None]:
from tqdm import tqdm

epoch = 100

hn = None

for i in range(epoch):
    loss_total = 0
    for j, (data, label) in tqdm(enumerate(train_loader)):

        data = data.unsqueeze(2)

        output, hn = model(data, None)

        loss = criterion(output, label)

        loss_total += loss.item()

        optimizer.zero_grad()
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), 5)
        optimizer.step()

    scheduler.step()

    torch.save(model.state_dict(), os.path.join('checkpoints', filename + '.pt'))

    print('Finished training epoch {}. Average loss is {:.8f}'.format(i + 1, loss_total / len(train_loader)))
        

In [None]:
import os
import pandas as pd

record_path = 'checkpoints/record.csv'

record = {
    'filename': [filename],
    'dataset_path': [dataset_path]
}

record = pd.DataFrame(record)

if os.path.exists(record_path):
    df = pd.read_csv(record_path)
    df = pd.concat([df, record], ignore_index=True)
    df.to_csv(record_path, index=False)
else:
    record.to_csv(record_path, index=False)
    

In [None]:
results = []
labels = []
for data, label in dev_loader:
# for data, label, _ in dev_loader:
# for data, label, _, _ in dev_loader:

    data = data.unsqueeze(2)
    # output = model(data)[:, -1]
    output, _ = model(data)
    # output = torch.mean(model(data), dim=1)

    results.append(output.detach().cpu().numpy())
    labels.append(label.detach().cpu().numpy())

    # break


results = np.concatenate(results, axis=0)
labels = np.concatenate(labels, axis=0)
print(results.shape)



In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

# results_to_plot = [
#     results[:, -1],
#     labels,
#     results[:, -1].squeeze() - labels,
#     np.abs(results[:, -1].squeeze() - labels) / labels
# ]

idx = 1023

results_to_plot = [
    results[:, idx],
    labels,
    results[:, idx].squeeze() - labels,
    np.abs(results[:, idx].squeeze() - labels) / labels
]

print('Average absolute error is:', np.mean(np.abs(results_to_plot[2])))
print('Root mean squared error is:', np.sqrt(mean_squared_error(labels, results[:, -1])))
print('Mean absolute percentage error is:', np.mean(np.abs((results[:, idx] - labels) / labels)))

titles = [
    'results',
    'labels',
    'errors',
    'errors %'
]

fig, ax = plt.subplots(ncols=4)
fig.set_size_inches((20, 6))
for col, to_plot, title in zip(ax, results_to_plot, titles):
    sns.histplot(to_plot, ax=col)
    col.set_title(title)
    col.set_xlabel('(ns)')

plt.show()

In [None]:
df = pd.DataFrame({'pred': results[:, -1], 'truth': labels})

plt.figure(figsize=(6, 6))
sns.scatterplot(x=results[:, -1], y=labels, alpha=0.05)
plt.plot([0.2, 5], [0.2, 5], '-r')
plt.xlabel('Prediction (ns)')
plt.ylabel('Ground Truth (ns)')

## Save the weights

In [None]:
torch.cat([model.rnn.bias_ih_l0[0:2*model.hidden_size] + model.rnn.bias_hh_l0[0:2*model.hidden_size], model.rnn.bias_ih_l0[2*model.hidden_size:], model.rnn.bias_hh_l0[2*model.hidden_size:]]).shape

In [None]:
import os

def save_weights(model, path, lstm_size=12):
    if not os.path.exists(path):
        os.mkdir(path)
    Wih = model.rnn.weight_ih_l0.T.detach().cpu().numpy().reshape(1, -1)
    Whh = model.rnn.weight_hh_l0.T.detach().cpu().numpy().reshape(1, -1)

    B = torch.cat([
        model.rnn.bias_ih_l0[0:2*model.hidden_size] + model.rnn.bias_hh_l0[0:2*model.hidden_size],
        model.rnn.bias_ih_l0[2*model.hidden_size:],
        model.rnn.bias_hh_l0[2*model.hidden_size:]]).T.detach().cpu().numpy().reshape(1, -1)

    np.savetxt(os.path.join(path, "rnn.weight_ih_l0.txt"), Wih, delimiter=",")
    np.savetxt(os.path.join(path, "rnn.weight_hh_l0.txt"), Whh, delimiter=",")
    np.savetxt(os.path.join(path, "rnn.bias.txt"), B, delimiter=",")
    np.savetxt(os.path.join(path, "linear1.weight.txt"), model.linear1.weight.T.detach().cpu().numpy().reshape(1, -1), delimiter=",")
    np.savetxt(os.path.join(path, "linear1.bias.txt"), model.linear1.bias.T.detach().cpu().numpy().reshape(1, -1), delimiter=",")
    np.savetxt(os.path.join(path, "linear2.weight.txt"), model.linear2.weight.T.detach().cpu().numpy().reshape(1, -1), delimiter=",")
    np.savetxt(os.path.join(path, "linear2.bias.txt"), model.linear2.bias.T.detach().cpu().numpy().reshape(1, -1), delimiter=",")

save_weights(model, 'weights')