<a href="https://colab.research.google.com/github/Papa-Panda/random_thoughts/blob/main/TCN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim

# MNIST preprocessing: convert each image to a tensor, then normalise it with
# mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# Training split first (the only one that requests a download), then the
# held-out split; both live under ./data and share the same preprocessing.
train_dataset, val_dataset = (
    datasets.MNIST(root='./data', train=is_train, transform=transform, download=is_train)
    for is_train in (True, False)
)

# Both loaders use the same batch size and reshuffle their split every epoch.
batch_size = 16
train_loader, val_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=True)
    for split in (train_dataset, val_dataset)
)


In [None]:

# Define TCN model
class TCN(nn.Module):
    """Stack of dilated 1-D convolutions followed by a 1x1 projection.

    The network is five ``Conv1d -> ReLU -> Dropout`` stages with dilations
    1, 2, 4, 8 and 16, then a kernel-size-1 convolution that maps the hidden
    channels to ``output_size``.

    Every stage is padded with ``dilation * (kernel_size - 1) // 2`` on both
    sides, so for odd ``kernel_size`` the output has the same length as the
    input. The padding is symmetric, so the output at a given position also
    depends on later positions (the convolutions are not causal).

    Args:
        input_size: Number of input channels.
        output_size: Number of output channels.
        num_channels: Number of channels in every hidden stage.
        kernel_size: Kernel size shared by all hidden stages.
        dropout: Dropout probability applied after every hidden stage.
    """

    # Dilation of each hidden stage, in order.
    DILATIONS = (1, 2, 4, 8, 16)

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super(TCN, self).__init__()
        layers = []
        in_channels = input_size
        for dilation in self.DILATIONS:
            layers += [
                nn.Conv1d(
                    in_channels,
                    num_channels,
                    kernel_size=kernel_size,
                    dilation=dilation,
                    # A dilated kernel spans dilation * (kernel_size - 1) + 1
                    # positions; padding half of the overhang on each side
                    # keeps the sequence length unchanged.
                    padding=dilation * (kernel_size - 1) // 2,
                ),
                nn.ReLU(),
                nn.Dropout(dropout),
            ]
            in_channels = num_channels
        layers.append(nn.Conv1d(num_channels, output_size, kernel_size=1))
        self.tcn = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the network to ``x`` of shape (batch, input_size, length).

        Returns:
            Tensor of shape (batch, output_size, length') where ``length'``
            equals ``length`` for odd kernel sizes.
        """
        return self.tcn(x)

# Define hyperparameters and instantiate model
# The training loop squeezes the channel axis out of each MNIST batch and
# transposes the two remaining axes, handing the model a (batch, 28, 28)
# tensor. Conv1d reads that as 28 channels over 28 time steps, so the first
# convolution must accept 28 input channels.
input_size = 28
output_size = 10  # one score per digit class
num_channels = 64
kernel_size = 3
dropout = 0.2
model = TCN(input_size, output_size, num_channels, kernel_size, dropout)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train the model
num_epochs = 50
for epoch in range(num_epochs):
    # ---- Training pass (dropout enabled) ----
    model.train()
    train_loss = 0.0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        # Drop the size-1 axis at dim 1, then swap the two remaining axes.
        # The result is (batch, 28, 28), which Conv1d reads as
        # (batch, channels, time); the model's first convolution therefore
        # has to accept 28 input channels.
        inputs = inputs.squeeze(1).transpose(1, 2)
        outputs = model(inputs)[:, :, -1]  # Extract output at last time step
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        # Weight the batch loss by the batch size so that dividing by the
        # dataset size below gives a per-sample average (this assumes the
        # criterion uses mean reduction).
        train_loss += loss.item() * inputs.size(0)
    train_loss /= len(train_loader.dataset)

    # ---- Validation pass (dropout disabled, no gradients) ----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.squeeze(1).transpose(1, 2)
            # Same last-time-step readout as in training.
            outputs = model(inputs)[:, :, -1]
            val_loss += criterion(outputs, targets).item() * inputs.size(0)
            val_correct += (outputs.argmax(dim=1) == targets).sum().item()
    val_loss /= len(val_loader.dataset)
    val_accuracy = val_correct / len(val_loader.dataset)

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

RuntimeError: ignored

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

# Define the CNN model
class CNN(nn.Module):
    """1-D CNN that classifies a flattened 28x28 image into 10 classes.

    The image is treated as a single-channel sequence of 784 values, passed
    through two unpadded ``Conv1d`` layers (64 channels, kernel size 3) and
    two fully connected layers. ``forward`` returns log-probabilities.
    """

    def __init__(self):
        super(CNN, self).__init__()
        seq_len = 28 * 28
        channels = 64
        kernel_size = 3
        self.conv1 = nn.Conv1d(1, channels, kernel_size=kernel_size)
        self.conv2 = nn.Conv1d(channels, channels, kernel_size=kernel_size)
        # Each unpadded convolution shortens the sequence by kernel_size - 1,
        # so 784 positions become 780 after the two layers. fc1 must take
        # every channel at every remaining position.
        conv_out_len = seq_len - 2 * (kernel_size - 1)
        self.fc1 = nn.Linear(channels * conv_out_len, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return log-probabilities of shape (batch, 10).

        Args:
            x: Tensor with 784 values per sample, e.g. (batch, 1, 28, 28).
        """
        x = x.reshape(-1, 1, 28 * 28)
        x = nn.functional.relu(self.conv1(x))
        x = nn.functional.relu(self.conv2(x))
        # Flatten per sample. A fixed-width view here would silently move
        # elements into the batch dimension if the width were wrong.
        x = x.flatten(start_dim=1)
        x = nn.functional.relu(self.fc1(x))
        x = self.fc2(x)
        return nn.functional.log_softmax(x, dim=1)

# Load the MNIST dataset
train_dataset = datasets.MNIST('./data', train=True, download=True,
                               transform=transforms.Compose([
                                   transforms.ToTensor(),
                                   transforms.Normalize((0.1307,), (0.3081,))
                               ]))
test_dataset = datasets.MNIST('./data', train=False, transform=transforms.Compose([
                                   transforms.ToTensor(),
                                   transforms.Normalize((0.1307,), (0.3081,))
                               ]))

# Define the data loaders
# The training loader drops a trailing partial batch; the test loader keeps
# every sample.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16)

# Instantiate the model, loss function, and optimizer
model = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
for epoch in range(10):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    # Evaluate the model on the test set
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data)
            # The criterion returns the mean over the batch; weight it by
            # the batch size so the division by the dataset size below
            # yields a true per-sample average.
            test_loss += criterion(output, target).item() * data.size(0)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print('Epoch: {}, Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        epoch, test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


ValueError: ignored

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader

class LSTNet(nn.Module):
    """Convolutional feature extractor followed by an LSTM and a linear head.

    Pipeline implemented by ``forward``:

    1. One ``Conv2d`` branch per entry of ``cnn_ksize``; each kernel spans
       ``cnn_ksize[i]`` time steps and the full feature width.
    2. The branches are combined per time step with softmax weights computed
       across the branch axis.
    3. The first ``window_size`` combined steps are fed to a single-layer LSTM.
    4. The final hidden state is concatenated with the last
       ``skip_size * num_cnn_layers`` entries of the final cell state and
       projected to ``output_size``.

    Args:
        input_size: Number of features per time step.
        hidden_size: Channels per CNN branch and LSTM hidden size.
        skip_size: Cell-state entries forwarded to the head per CNN branch.
        output_size: Size of the prediction vector.
        window_size: Number of time steps fed to the LSTM.
        cnn_ksize: Kernel height(s) in time steps; a single int or a sequence
            of ints.

    Raises:
        ValueError: If ``cnn_ksize`` is empty, or if
            ``skip_size * len(cnn_ksize)`` exceeds ``hidden_size`` (the skip
            slice could not be taken from the cell state).
    """

    def __init__(self, input_size, hidden_size, skip_size, output_size, window_size, cnn_ksize):
        super(LSTNet, self).__init__()
        # Accept a bare int for the common single-branch case.
        if isinstance(cnn_ksize, int):
            cnn_ksize = [cnn_ksize]
        cnn_ksize = list(cnn_ksize)
        if not cnn_ksize:
            raise ValueError("cnn_ksize must contain at least one kernel size")
        if skip_size * len(cnn_ksize) > hidden_size:
            raise ValueError(
                "skip_size * len(cnn_ksize) must not exceed hidden_size"
            )

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.skip_size = skip_size
        self.output_size = output_size
        self.window_size = window_size
        self.cnn_ksize = cnn_ksize
        self.num_cnn_layers = len(cnn_ksize)

        self.cnn_layers = nn.ModuleList()
        for ksize in self.cnn_ksize:
            conv = nn.Conv2d(1, self.hidden_size, kernel_size=(ksize, self.input_size))
            nn.init.normal_(conv.weight, mean=0, std=0.01)
            nn.init.constant_(conv.bias, 0)
            self.cnn_layers.append(conv)

        # batch_first so the LSTM consumes (batch, time, hidden) directly.
        self.lst_layers = nn.LSTM(self.hidden_size, self.hidden_size, batch_first=True)
        self.fc_layers = nn.Linear(self.hidden_size + self.skip_size * self.num_cnn_layers, self.output_size)
        nn.init.xavier_uniform_(self.fc_layers.weight)

    def forward(self, x):
        """Map ``x`` of shape (batch, time, input_size) to (batch, output_size).

        Raises:
            ValueError: If the convolutions leave fewer than ``window_size``
                time steps.
        """
        # CNN: add a channel axis -> (batch, 1, time, input_size). Each branch
        # collapses the feature axis to width 1, which is then squeezed away,
        # leaving (batch, hidden, time - ksize + 1).
        x = x.unsqueeze(1)
        branches = [
            nn.functional.relu(conv(x)).squeeze(3) for conv in self.cnn_layers
        ]
        # Different kernel heights give different lengths; keep the common
        # prefix so the branches can be stacked.
        steps = min(branch.size(2) for branch in branches)
        if steps < self.window_size:
            raise ValueError(
                f"only {steps} time steps remain after the convolutions, "
                f"but window_size is {self.window_size}"
            )
        stacked = torch.stack([branch[:, :, :steps] for branch in branches], dim=3)

        # Attention: softmax across the branch axis, then use the weights to
        # blend the branch features -> (batch, hidden, steps).
        weights = nn.functional.softmax(stacked, dim=3)
        fused = (weights * stacked).sum(dim=3)

        # LSTM over the first window_size steps; the initial state defaults
        # to zeros on the input's device.
        seq = fused.transpose(1, 2)[:, :self.window_size, :]
        _, (h_n, c_n) = self.lst_layers(seq)
        h = h_n.squeeze(0)  # (batch, hidden)
        c = c_n.squeeze(0)  # (batch, hidden)

        # Skip-connection: trailing entries of the final cell state. Indexing
        # from the front keeps the slice empty when skip_size is 0.
        skip_start = self.hidden_size - self.skip_size * self.num_cnn_layers
        skip_out = c[:, skip_start:]

        # FC
        return self.fc_layers(torch.cat([h, skip_out], dim=1))

class ElectricityDataset(Dataset):
    """Index-aligned pairing of samples with their targets.

    ``data`` and ``target`` are stored exactly as passed in. Item ``i`` is
    the tuple ``(data[i], target[i])`` and the dataset length is
    ``len(data)``.
    """

    def __init__(self, data, target):
        self.target = target
        self.data = data

    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.target[idx]
        return sample, label

    def __len__(self):
        return len(self.data)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

def train(model, train_loader, valid_loader, criterion, optimizer, num_epochs,
          *, checkpoint_path='best_model.pt'):
    """Train ``model`` and validate it after every epoch.

    Each epoch runs one optimisation pass over ``train_loader`` and one
    gradient-free pass over ``valid_loader``. Whenever the validation loss
    improves on the best value seen so far, the model's ``state_dict`` is
    written to ``checkpoint_path``.

    Args:
        model: Module to optimise; called as ``model(inputs)``.
        train_loader: Loader yielding ``(inputs, targets)`` batches; must
            expose ``.dataset`` so the loss can be averaged per sample.
        valid_loader: Loader used for validation, same requirements.
        criterion: Loss called as ``criterion(outputs, targets)``. The
            per-sample averages reported here assume it returns the batch
            mean.
        optimizer: Optimiser bound to ``model``'s parameters.
        num_epochs: Number of epochs to run.
        checkpoint_path: Where the best weights are saved. Pass ``None`` to
            disable checkpointing.

    Returns:
        ``(train_loss_history, valid_loss_history)``: two lists with one
        per-sample average loss per epoch.
    """
    train_loss_history = []
    valid_loss_history = []
    best_valid_loss = float('inf')
    for epoch in range(num_epochs):
        # train the model
        model.train()
        train_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            # Weight the batch loss by its size so a smaller final batch
            # does not skew the epoch average.
            train_loss += loss.item() * inputs.size(0)
        train_loss /= len(train_loader.dataset)
        train_loss_history.append(train_loss)

        # validate the model
        model.eval()
        valid_loss = 0.0
        with torch.no_grad():
            for inputs, targets in valid_loader:
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                valid_loss += loss.item() * inputs.size(0)
        valid_loss /= len(valid_loader.dataset)
        valid_loss_history.append(valid_loss)

        # save the model with the best validation loss
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            if checkpoint_path is not None:
                torch.save(model.state_dict(), checkpoint_path)

        # print the epoch loss
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}")

    return train_loss_history, valid_loss_history

hidden_size = 10
skip_size = 2
ar_window = 10
cnn_kernel_size = 5
# Each MNIST image is read as a sequence of 28 rows with 28 pixels per row
# and classified into one of 10 digits.
input_size = 28
output_size = 10

# set up the dataset and data loaders
mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
    # Drop the leading channel axis so a batch is (batch, 28, 28); LSTNet
    # adds its own channel axis before the convolutions.
    transforms.Lambda(lambda image: image.squeeze(0)),
])
train_dataset = datasets.MNIST('./data', train=True, download=True,
                               transform=mnist_transform)
test_dataset = datasets.MNIST('./data', train=False, transform=mnist_transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# set up the model, loss function, and optimizer
# Keyword arguments keep each value bound to the intended LSTNet parameter;
# the kernel size is wrapped in a list because LSTNet takes one entry per
# CNN branch.
model = LSTNet(
    input_size=input_size,
    hidden_size=hidden_size,
    skip_size=skip_size,
    output_size=output_size,
    window_size=ar_window,
    cnn_ksize=[cnn_kernel_size],
)
# The targets are integer class labels, so use a classification loss.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# train the model
num_epochs = 10
train_loss_history, valid_loss_history = train(model, train_loader, valid_loader, criterion, optimizer, num_epochs)


TypeError: ignored