In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import matplotlib.pyplot as plt

# Custom module to create synthetic dataset
from utils.sine_dataset import signal_dataset_harmonic_creator

In [2]:
# Hyperparameters shared by the model and the synthetic dataset
harmonics = 3  # forwarded to the dataset creator as `harmonics`
tagset_size = harmonics  # width of the model's output layer
n_signals = 1  # used as the LSTM input size when the model is built
hidden_dim = 20  # LSTM hidden-state size
N = 64  # forwarded to the dataset creator; presumably samples per signal (confirm)

# Run on the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

cpu


In [3]:
# Model creation
class LSTMHarmonic(nn.Module):
    """LSTM sequence classifier: last time step -> linear layer -> class scores.

    Args:
        N: Number of features per time step (passed to ``nn.LSTM`` as its
            input size).
        hidden_dim: Size of the LSTM hidden state.
        tagset_size: Number of output classes.
    """

    def __init__(self, N, hidden_dim, tagset_size):
        super().__init__()
        self.lstm = nn.LSTM(N, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, tagset_size)
        # Only used by `predict_proba`; deliberately not applied in `forward`.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return unnormalised class scores (logits).

        The scores are intentionally left unnormalised: ``nn.CrossEntropyLoss``
        applies log-softmax internally, so handing it probabilities would
        apply softmax twice and flatten the gradients. ``argmax`` over the
        logits selects the same class as ``argmax`` over the probabilities.

        Args:
            x: Batch-first input of shape ``(batch, seq_len, N)``.

        Returns:
            Tensor of shape ``(batch, tagset_size)`` with one logit per class.
        """
        lstm_out, _ = self.lstm(x)
        # Classify from the LSTM output at the final time step only.
        return self.fc(lstm_out[:, -1, :])

    def predict_proba(self, x):
        """Return class probabilities (softmax over the logits) for ``x``."""
        return self.softmax(self(x))
        


# Build the network, move it to the selected device, then create its loss and optimiser
model = LSTMHarmonic(N=n_signals, hidden_dim=hidden_dim, tagset_size=tagset_size)
model = model.to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)
print(f"Model structure: {model}\n")

Model structure: LSTMHarmonic(
  (lstm): LSTM(1, 20, batch_first=True)
  (fc): Linear(in_features=20, out_features=3, bias=True)
  (softmax): Softmax(dim=1)
)



In [4]:
# Define Dataset Class
class SineWaveDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs each signal with its label.

    Args:
        signals: Indexable, sized collection of signals.
        labels: Indexable, sized collection of labels aligned with ``signals``.
        transform: Optional callable applied to every signal on access.
        labels_transform: Optional callable applied to every label on access.

    Raises:
        ValueError: If ``signals`` and ``labels`` have different lengths.
    """

    def __init__(self, signals, labels, transform=None, labels_transform=None):
        super().__init__()
        # Fail early: a mismatch would otherwise surface as an IndexError mid-epoch
        # (labels too short) or as silently ignored labels (labels too long).
        if len(signals) != len(labels):
            raise ValueError(
                f"signals and labels must have the same length, "
                f"got {len(signals)} and {len(labels)}"
            )
        self.signals = signals
        self.labels = labels
        self.transform = transform
        self.labels_transform = labels_transform

    def __getitem__(self, index):
        """Return the ``(signal, label)`` pair at ``index``, after any transforms."""
        signal = self.signals[index]
        label = self.labels[index]
        # Compare against None rather than truthiness: a callable that defines
        # __len__ or __bool__ may evaluate as falsy and would be skipped.
        if self.transform is not None:
            signal = self.transform(signal)
        if self.labels_transform is not None:
            label = self.labels_transform(label)
        return signal, label

    def __len__(self):
        """Return the number of signals in the dataset."""
        return len(self.signals)

In [10]:
# Pull a single batch from the test loader and report how many items it holds.
# NOTE: `test_dataloader` is created in the dataset-creation cell further down,
# so that cell has to be executed before this one.
dataiter = iter(test_dataloader)
signal, labels = next(dataiter)
print(len(labels), len(signal), sep="\n")


16
16


In [6]:
# Dataset Parameters (forwarded positionally to the dataset creator)
fs = 3840
N = 64
m = 1200

# Generate the synthetic signals together with their labels
signals, target = signal_dataset_harmonic_creator(
    fs, N, m, harmonics=harmonics, multiclass=True
)
# Insert a new axis at position 2, then convert to a float32 tensor
signals = torch.from_numpy(np.expand_dims(signals, axis=2)).float()
# Labels are kept as int64 values (no one-hot encoding)
target = torch.from_numpy(target).long()

# Wrap the tensors in a Dataset and split it 90% / 9% / 1%
dataset = SineWaveDataset(signals, target)
train_dataset, test_dataset, val_dataset = random_split(
    dataset, lengths=[0.9, 0.09, 0.01]
)

# One shuffled loader per split
train_dataloader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=16, shuffle=True)


In [7]:
# Create Training / Test / Validation Loops

# Training
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass (one epoch) over ``dataloader``.

    Every batch is moved to the device the model's parameters live on, so the
    function does not depend on a module-level ``device`` variable. The loss is
    printed for every 10th batch.

    Args:
        dataloader: Iterable of ``(x, y)`` batches exposing a ``.dataset``
            attribute (used only for the progress read-out).
        model: ``nn.Module`` to optimise; switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimiser that updates ``model``'s parameters.
    """
    size = len(dataloader.dataset)
    # Follow the model's own device; a parameter-free model falls back to the CPU.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    for batch, (x, y) in enumerate(dataloader):
        x, y = x.to(target_device), y.to(target_device)

        # Compute prediction error
        pred = model(x)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 10 == 0:
            # Samples consumed before this batch; `loss` stays a tensor and is
            # only converted to a float for display.
            current = batch * len(x)
            print(f"loss: {loss.item():>f} [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            pred = model(x)
            test_loss += loss_fn(pred, y).item()
            correct += (torch.round(pred.argmax(1)) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n"
    )

In [8]:
# Alternate one training pass with one evaluation pass for each epoch
epochs = 2
for epoch in range(epochs):
    # The header shows 1-based epoch numbers
    print(f"Epoch {epoch+1}\n--------------------------------")
    train(
        dataloader=train_dataloader,
        model=model,
        loss_fn=loss_fn,
        optimizer=optimizer,
    )
    test(dataloader=test_dataloader, model=model, loss_fn=loss_fn)

Epoch 1
--------------------------------
loss: 1.080036 [    0/  721]
loss: 1.098338 [  320/  721]
loss: 1.071649 [  640/  721]


RuntimeError: The size of tensor a (3) must match the size of tensor b (16) at non-singleton dimension 1