In [1]:
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import yaml
from icecream import ic
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import random_split
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
import random

DEVICE = 'cuda'
torch.manual_seed(22)

<torch._C.Generator at 0x7f4a00a0ccb0>

In [2]:
class SineDataset(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, data_count):
        self.freq = 10*torch.rand(size=(data_count, 1), device=DEVICE) + 1
        
        random_uniform_shift = 10*torch.rand(size=(data_count, 1), device=DEVICE)
        random_normal_shift = torch.normal(0, 1, size=(1, 1), device=DEVICE)
        
        self.points = torch.arange(0,6,1/40, device=DEVICE).repeat(data_count, 1)  + random_uniform_shift
        self.phase = torch.normal(0, 5, size=(data_count, 1), device=DEVICE)
        self.amplitude = (self.points[0] - random_uniform_shift[0])**2 
        # print(self.amplitude)
        self.data_matrix = torch.sin(self.points * self.freq + self.phase)
        self.data_matrix_without_noise = torch.sin(self.points * self.freq + self.phase)
        for i in range(data_count):
            if random.random() < 0.3:
                self.amplitude = torch.flip(self.amplitude, dims=(-1,))
                random_noise = (self.amplitude+1)*torch.normal(0, 1, size=(1, len(self.data_matrix[0])), device=DEVICE)
                self.data_matrix_without_noise[i] = self.amplitude*self.data_matrix[i]
                self.data_matrix[i] = self.amplitude*self.data_matrix[i] + random_noise
            elif 0.3<=random.random() <0.7:
                random_noise = (self.amplitude+1)*torch.normal(0, 1, size=(1, len(self.data_matrix[0])), device=DEVICE)
                self.data_matrix_without_noise[i] = self.amplitude*self.data_matrix[i]
                self.data_matrix[i] = self.amplitude*self.data_matrix[i] + random_noise               
            else:
                random_noise = torch.normal(0, 1, size=(1, len(self.data_matrix[0])), device=DEVICE)
                self.data_matrix_without_noise[i] = self.data_matrix[i]
                self.data_matrix[i] = self.data_matrix[i] + random_noise
    def __len__(self):
        return len(self.freq)
        
    def __num__(self):
        return
    def __getitem__(self, idx):
        return self.data_matrix[idx], self.freq[idx]

In [3]:
class RNN_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.rnn = nn.LSTM(input_size=240, hidden_size=64, num_layers=1, batch_first = True)
        self.bn = nn.BatchNorm1d(num_features=1)
        self.conv = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=6, stride=2)
        self.lin1 = nn.Linear(30, 1)
        self.lin2 = nn.Linear(240, 30)
    def forward(self, x):
        residual = self.lin2(x)
        out = torch.unsqueeze(x, 1)
        # out, (h0, c0) = self.rnn(out)
        # out = self.bn(out)
        # out = torch.permute(out, (0, 2, 1))
        out = F.relu(self.conv(x))
        # out = self.bn(out)
        out = torch.squeeze(out, 1)
        out = out + residual
        out = F.relu(self.lin1(out))
        return out

In [4]:
"""
Block1:
    conv1d (krnl_sz = 3)
    bn (on out_channels)
    relu after batchnorm 
    conv1d
    bn 
Block2:
    if you have different stride more than 2 or in_channels != out_channels you have to do convolution on your shortcut to maintain dimensionality with kernel_size = 1 so basically dense layer with multiplying channels
    
Add the outputs from 2 blocks together
"""

'\nBlock1:\n    conv1d (krnl_sz = 3)\n    bn (on out_channels)\n    relu after batchnorm \n    conv1d\n    bn \nBlock2:\n    if you have different stride more than 2 or in_channels != out_channels you have to do convolution on your shortcut to maintain dimensionality with kernel_size = 1 so basically dense layer with multiplying channels\n    \nAdd the outputs from 2 blocks together\n'

In [5]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, stride=1):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn = nn.BatchNorm1d(out_channels)
        
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size = 3, stride = 1, padding=1) 
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm1d(out_channels)
            )
        else:
            self.shortcut=nn.Sequential()
            
    def forward(self, x):
        # print(x.shape)
        out = self.conv1(x)
        # print(f"first {out.shape}")
        out = F.relu((self.bn(out)))
        out = self.conv2(out)
        # print(f"second {out.shape}")
        x = self.shortcut(x)
        # print(f"third {x.shape}")
        out += x
        out = F.relu(self.bn(out))
        # print(f"forth {out.shape}")
        return out       
                               
        

In [6]:
"""
so you make stack these Residual blocks successively you choose how many blocks you connect in num_blocks
"""

'\nso you make stack these Residual blocks successively you choose how many blocks you connect in num_blocks\n'

In [7]:
class ResNet(nn.Module):
    def __init__(self, num_blocks: int, out_channels: int):
        super().__init__()
        self.in_channels = 1 # You can adjust the initial number of channels
        self.rnn = nn.LSTM(input_size=240, hidden_size=240, num_layers=2, batch_first=True )
        self.conv = nn.Conv1d(1, self.in_channels, kernel_size=7, padding=3)
        self.bn = nn.BatchNorm1d(self.in_channels)
        self.relu = nn.ReLU()
        self.layer1 = self.make_layer(ResidualBlock, num_blocks, out_channels)
        self.global_avg_pooling = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(out_channels, 1)
        self.attention = nn.Linear(240, 240)
    def make_layer(self, block, num_blocks, out_channels):
        layers = []
        for _ in range(num_blocks):
            layers.append(block(self.in_channels, out_channels))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        x = torch.unsqueeze(x, 1)
        x = F.relu(self.bn(self.conv(x)))
        x = self.bn(x)
        x = self.layer1(x)
        out, (h0,c0) = self.rnn(x)
        attention_weights = F.softmax(self.attention(out), dim=-1)
        out = torch.sum(attention_weights * out, dim=1)
        out = self.global_avg_pooling(out)
        out = torch.squeeze(out, 1)
        out = self.fc(out)
        return out

In [8]:
class CNN_RNN(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.bn1 = nn.BatchNorm1d(118)
        self.conv = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=5, stride=2)
        self.rnn = nn.LSTM(input_size=118, hidden_size=64, num_layers=1, batch_first = True, bidirectional=True)
        self.conv2 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=20, stride=8)
        self.lin1 = nn.Linear(128, 64)
        self.lin2 = nn.Linear(64,1)
    def forward(self, x):
        out = torch.unsqueeze(x, 1)
        out = F.relu(self.conv(out))
        # self.bn1 = nn.BatchNorm1d(118)
        out, (h0, c0) = self.rnn(out)
        out = self.conv2(out)
        out = torch.squeeze(out, 1)
        out = F.relu(self.lin1(out))
        out = F.relu(self.lin2(out))
        return out

In [9]:
class RNN_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.rnn = nn.LSTM(input_size=240, hidden_size=64, num_layers=1, batch_first=True)
        self.bn = nn.BatchNorm1d(num_features=1)
        self.conv = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=6, stride=2)
        self.lin1 = nn.Linear(240, 30)
        self.lin2 = nn.Linear(30, 1)
    def forward(self, x):
        residual = self.lin1(x)
        out = torch.unsqueeze(x, 1)
        out, (h0, c0) = self.rnn(out)
        out = self.bn(out)
        # out = torch.permute(out, (0, 2, 1))
        out = F.relu(self.conv(out))
        # out = self.bn(out)
        out = torch.squeeze(out, 1)
        out = out + residual
        out = F.relu(self.lin2(out))
        return out

In [10]:
class NeuroNet:
    def __init__(self):
        self.model = self.build_model()
        self.model.to('cuda')
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(self.optimizer, 10)
        self.criterion = nn.MSELoss()
        self.history = []
        self.mse_avg = 0
        

    def build_model(self):
            return ResNet(num_blocks=2,out_channels=1)


    def train_model(self, training_data: Dataset, testing_data: Dataset):
        writer = SummaryWriter(comment="_ResNet")
        step = 0
        epoch_num = 200
        train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
        test_dataloader = DataLoader(testing_data, batch_size=64, shuffle=True)

        for epoch in range(epoch_num):
            self.model.train()
            self.history = []
            for batch_id, (inputs, targets) in enumerate(
                    tqdm(train_dataloader, desc=f'epoch {epoch + 1}/ {epoch_num}')):
                inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, targets)
                loss.backward()
                self.optimizer.step()
                writer.add_scalar('Training Loss', loss, global_step=step)
                step += 1

            self.model.eval()
            with torch.no_grad():
                for batch_id, (inputs, targets) in enumerate(test_dataloader):
                    outputs = self.model(inputs)
                    mse = np.mean((outputs.detach().cpu().numpy() - targets.detach().cpu().numpy()) ** 2)
                    self.history.append(mse)
                    self.mse_avg = sum(self.history) / len(self.history)
                    writer.add_scalar('validation loss', self.mse_avg, global_step=epoch)

            ic(self.scheduler.get_last_lr())
            last_lr = self.scheduler.get_last_lr()[0]
            writer.add_scalar('learning rate', last_lr, global_step=epoch)
            self.scheduler.step()
            print(self.mse_avg)
        writer.close()
        # torch.optim.lr_scheduler.print_lr()

    def predict(self, data: torch.Tensor):
        with torch.no_grad():
            points, freq = data
            return print(f'predicted freq: {self.model(points)}, real freq: {freq}')


dataset = SineDataset(64*2)
train_data, test_data = random_split(dataset, [64, 64])

neuro_net = NeuroNet()

neuro_net.train_model(train_data, test_data)


epoch 1/ 200:   0%|                                                                                                                                                                                                                                                 | 0/1 [00:00<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x64 and 1x1)

In [None]:
a = np.random.rand(3, 5)
print(a)
print(a[:, :])

In [None]:
rnn = nn.LSTM(10, 20, num_layers=2, batch_first = True)
input = torch.randn(3, 240, 10)
h0 = torch.randn(2, 3, 20)
c0 = torch.randn(2, 3, 20)
output1, (hn, cn) = rnn(input, (h0, c0))
output1 = torch.permute(output1, (0, 2, 1))
print(output1.shape)

In [None]:
m = nn.Conv1d(20, 20, 20, stride=2)
output2 = m(output1)
print(output2.shape)

In [None]:
x = torch.rand(64,1,1)
y = x.view(x.size(0), -1)
z = torch.squeeze(x , 1)
# print(y==z)
w = z[::2]
h = z[1::2]
print(z.shape)
print(w.shape)
print[]