# Define TCN components

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils import weight_norm


class Chomp1d(nn.Module):
    """Drop the last ``chomp_size`` entries along the third axis of the input.

    Args:
        chomp_size: Number of trailing entries to remove. ``0`` leaves the
            input length untouched.
    """

    def __init__(self, chomp_size):
        super().__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """Return a contiguous copy of ``x`` without its trailing entries."""
        # ``x[..., :-0]`` would be an empty slice, so a zero chomp (e.g. when
        # kernel_size == 1 yields padding == 0) must bypass the slicing.
        if self.chomp_size == 0:
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):
    """Residual block made of two weight-normalised dilated 1-D convolutions.

    Each convolution pads by ``padding``, after which ``Chomp1d(padding)``
    removes the trailing ``padding`` steps, followed by ReLU and dropout.
    The block output is ``relu(net(x) + shortcut(x))`` where the shortcut is
    the identity, or a 1x1 convolution when the channel counts differ.

    Args:
        n_inputs: Number of input channels.
        n_outputs: Number of output channels.
        kernel_size: Convolution kernel width.
        stride: Convolution stride.
        dilation: Convolution dilation.
        padding: Padding passed to the convolutions and chomped afterwards.
        dropout: Dropout probability applied after each activation.
    """

    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super().__init__()
        conv_kwargs = dict(stride=stride, padding=padding, dilation=dilation)

        # First conv -> chomp -> ReLU -> dropout stage.
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size, **conv_kwargs))
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        # Second stage, operating on the block's output width.
        self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size, **conv_kwargs))
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        stages = [self.conv1, self.chomp1, self.relu1, self.dropout1,
                  self.conv2, self.chomp2, self.relu2, self.dropout2]
        self.net = nn.Sequential(*stages)

        # A 1x1 convolution matches channel counts on the residual path.
        if n_inputs == n_outputs:
            self.downsample = None
        else:
            self.downsample = nn.Conv1d(n_inputs, n_outputs, 1)
        self.relu = nn.ReLU()
        self.init_weights()

    def init_weights(self):
        """Draw the convolution weights from N(0, 0.01).

        NOTE(review): conv1/conv2 are wrapped in ``weight_norm``; writing to
        ``.weight.data`` may be overwritten by the re-parametrisation on the
        next forward pass -- confirm this initialisation has the intended effect.
        """
        for conv in (self.conv1, self.conv2, self.downsample):
            if conv is not None:
                conv.weight.data.normal_(0, 0.01)

    def forward(self, x):
        """Apply both conv stages and add the (possibly projected) input."""
        branch = self.net(x)
        shortcut = self.downsample(x) if self.downsample is not None else x
        return self.relu(branch + shortcut)


class TemporalConvNet(nn.Module):
    """Stack of ``TemporalBlock`` layers with dilation doubling at each level.

    Args:
        num_inputs: Number of channels of the network input.
        num_channels: Output channel count of each level, in order.
        kernel_size: Kernel width shared by every block.
        dropout: Dropout probability shared by every block.
    """

    def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
        super().__init__()
        blocks = []
        prev_channels = num_inputs
        for level, width in enumerate(num_channels):
            dilation = 2 ** level
            # Padding of (k - 1) * d is chomped again inside the block, so the
            # sequence length is unchanged from level to level.
            blocks.append(
                TemporalBlock(prev_channels, width, kernel_size, stride=1, dilation=dilation,
                              padding=(kernel_size - 1) * dilation, dropout=dropout)
            )
            prev_channels = width

        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        """Run ``x`` through every block in sequence."""
        return self.network(x)


# Example: Adding problem

## The Adding Problem

### Overview

In this task, each input consists of a length-T sequence of depth 2, with all values in
dimension 1 chosen randomly in [0, 1]. The second dimension consists of all zeros except for
two elements, which are marked by 1. The objective is to sum the two random values whose second 
dimensions are marked by 1. One can think of this as computing the dot product of the two dimensions.

Simply predicting the sum to be 1 should give an MSE of about 0.1767. 

### Note

Because a TCN's receptive field depends on the depth of the network and the filter size, we need
to make sure the model we use can cover the sequence length T. 

## Data generator for adding problem

In [None]:
import torch
import numpy as np
from torch.autograd import Variable


def data_generator(N, seq_length):
    """Generate a data set for the adding problem.

    Args:
        N: Number of samples to generate.
        seq_length: Length of each sequence (two distinct positions are drawn
            per sample, so ``np.random.choice`` raises for lengths below 2).

    Returns:
        A pair ``(X, Y)``. ``X`` has shape ``[N, 2, seq_length]``: channel 0
        holds uniform random values, channel 1 is a mask with exactly two
        ones. ``Y`` has shape ``[N, 1]`` and holds the sum of the two values
        selected by the mask.
    """
    X_num = torch.rand([N, 1, seq_length])
    X_mask = torch.zeros([N, 1, seq_length])
    Y = torch.zeros([N, 1])
    for i in range(N):
        first, second = np.random.choice(seq_length, size=2, replace=False)
        X_mask[i, 0, first] = 1
        X_mask[i, 0, second] = 1
        Y[i, 0] = X_num[i, 0, first] + X_num[i, 0, second]
    X = torch.cat((X_num, X_mask), dim=1)
    # Plain tensors are returned: the Variable wrapper is a deprecated no-op.
    return X, Y

## TCN for adding problem

In [None]:
from torch import nn

class TCN(nn.Module):
    """TCN regressor for the adding problem.

    A ``TemporalConvNet`` feeds a linear layer that reads only the features
    at the final position of the sequence.

    Args:
        input_size: Number of input channels.
        output_size: Size of the prediction.
        num_channels: Channel count of each TCN level.
        kernel_size: Convolution kernel width.
        dropout: Dropout probability inside the TCN.
    """

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super().__init__()
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size=kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)
        self.init_weights()

    def init_weights(self):
        """Draw the read-out weights from N(0, 0.01)."""
        self.linear.weight.data.normal_(0, 0.01)

    def forward(self, x):
        """Return the linear read-out of the last time step's features."""
        features = self.tcn(x)
        last_step = features[:, :, -1]
        return self.linear(last_step)

## Evaluate TCN on Adding problem

In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import sys

# Manually define configuration variables
import torch

# Pick the best available accelerator: CUDA first, then Apple-Silicon MPS,
# falling back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")  # For Apple Silicon Macs
else:
    device = torch.device("cpu")

# Kept for code that still checks this flag.
use_cuda = device.type == "cuda"

print(f"Using device: {device}")

batch_size = 32
dropout = 0.0
clip = -1           # gradient-norm clipping threshold; values <= 0 disable clipping
epochs = 10
ksize = 7
levels = 8
seq_len = 400
log_interval = 100  # number of batches between training log lines
lr = 4e-3
optim_choice = 'Adam'
nhid = 30
seed = 1111

# Set random seeds for reproducibility. data_generator draws the marker
# positions with np.random, so NumPy has to be seeded as well as torch.
torch.manual_seed(seed)
np.random.seed(seed)

input_channels = 2
n_classes = 1

print("Producing data...")
X_train, Y_train = data_generator(50000, seq_len)
X_test, Y_test = data_generator(1000, seq_len)

# Define model architecture
channel_sizes = [nhid] * levels
kernel_size = ksize
model = TCN(input_channels, n_classes, channel_sizes, kernel_size=kernel_size, dropout=dropout)

# Move the model and data to the selected device, so the device printed above
# is the one actually used (previously only CUDA was honoured).
model.to(device)
X_train = X_train.to(device)
Y_train = Y_train.to(device)
X_test = X_test.to(device)
Y_test = Y_test.to(device)

# Initialize optimizer
optimizer = getattr(optim, optim_choice)(model.parameters(), lr=lr)

def train(epoch):
    """Run one training epoch over ``X_train`` / ``Y_train`` in mini-batches.

    Reads the module-level ``model``, ``optimizer``, ``batch_size``, ``clip``,
    ``log_interval`` and ``lr``. Prints the mean MSE of every completed group
    of ``log_interval`` batches.

    Args:
        epoch: Epoch number, used only in the log line.
    """
    model.train()
    n_samples = X_train.size(0)
    total_loss = 0.0
    batches_in_window = 0
    for start in range(0, n_samples, batch_size):
        # Slicing past the end simply yields the shorter final batch.
        x = X_train[start:start + batch_size]
        y = Y_train[start:start + batch_size]
        optimizer.zero_grad()
        output = model(x)
        loss = F.mse_loss(output, y)
        loss.backward()
        if clip > 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        total_loss += loss.item()
        batches_in_window += 1

        # Log once exactly ``log_interval`` batches have been accumulated, so
        # the divisor always matches the number of summed losses.
        if batches_in_window == log_interval:
            cur_loss = total_loss / log_interval
            processed = min(start + batch_size, n_samples)
            print('Train Epoch: {:2d} [{:6d}/{:6d} ({:.0f}%)]\tLearning rate: {:.4f}\tLoss: {:.6f}'.format(
                epoch, processed, n_samples, 100.*processed/n_samples, lr, cur_loss))
            total_loss = 0.0
            batches_in_window = 0

def evaluate():
    """Compute, print and return the MSE of ``model`` on the test set.

    Uses the module-level ``model``, ``X_test`` and ``Y_test``; the whole
    test set is evaluated in a single forward pass without gradients.
    """
    model.eval()
    with torch.no_grad():
        predictions = model(X_test)
        mse = F.mse_loss(predictions, Y_test).item()
        print('\nTest set: Average loss: {:.6f}\n'.format(mse))
        return mse


# Train for the configured number of epochs, evaluating after each one.
for ep in range(1, epochs + 1):
    train(ep)
    tloss = evaluate()


# Example: Char problem

## Character-level Language Modeling

### Overview

In character-level language modeling tasks, each sequence is broken into elements by characters. 
Therefore, in a character-level language model, at each time step the model is expected to predict
the next coming character. We evaluate the temporal convolutional network as a character-level
language model on the PennTreebank dataset and the text8 dataset.

### Data

- **PennTreebank**: When used as a character-level language
corpus, PTB contains 5,059K characters for training,
396K for validation, and 446K for testing, with an alphabet
size of 50. PennTreebank is a well-studied (but relatively
small) language dataset.

- **text8**: text8 is about 20 times larger than PTB, with 
about 100M characters from Wikipedia (90M for training, 5M 
for validation, and 5M for testing). The corpus contains 27 
unique characters.

See `data_generator` in `utils.py`. We download the language corpus using [observations](#) package 
in python.


### Note

- Just like in a recurrent network implementation where it is common to repackage 
hidden units when a new sequence begins, we pass into TCN a sequence `T` consisting 
of two parts: 1) effective history `L1`, and 2) valid sequence `L2`:

```
Sequence [---------T---------] = [--L1-- -----L2-----]
```

In the forward pass, the whole sequence is passed into TCN, but only the `L2` portion is used for 
training. This ensures that the training data are also provided with sufficient history. The size
of `T` and `L2` can be adjusted via flags `seq_len` and `validseqlen`.

- The choice of dataset to use can be specified via the `--dataset` flag. For instance, running

```
python char_cnn_test.py --dataset ptb
```

would (download if no data found, and) train on the PennTreebank (PTB) dataset.

- Empirically, we found that Adam works better than SGD on the text8 dataset.

## Data loader / generator

In [None]:
import unidecode
import torch
from torch.autograd import Variable
from collections import Counter
import observations
import os
import pickle


cuda = torch.cuda.is_available()


def data_generator(args):
    """Load the corpus named by ``args["dataset"]`` and build its vocabulary.

    The loader is looked up by name on the ``observations`` package and
    called with the ``'data/'`` directory.

    Returns:
        ``(train, len(train), valid, len(valid), test, len(test), corpus)``
        where ``corpus`` is a ``Corpus`` built over all three splits joined
        by single spaces.
    """
    loader = getattr(observations, args["dataset"])
    # The loader's result is unpacked in the order train, test, validation.
    train_text, test_text, val_text = loader('data/')
    corpus = Corpus(train_text + " " + val_text + " " + test_text)

    #############################################################
    # Use the following if you want to pickle the loaded data
    #
    # pickle_name = "{0}.corpus".format(args["dataset"])
    # if os.path.exists(pickle_name):
    #     corpus = pickle.load(open(pickle_name, 'rb'))
    # else:
    #     corpus = Corpus(train_text + " " + val_text + " " + test_text)
    #     pickle.dump(corpus, open(pickle_name, 'wb'))
    #############################################################

    return (train_text, len(train_text),
            val_text, len(val_text),
            test_text, len(test_text),
            corpus)


def read_file(filename):
    """Read ``filename`` and transliterate its text with ``unidecode``.

    Returns:
        A pair ``(text, length)`` where ``length`` is ``len(text)``.
    """
    # The context manager closes the handle; the bare ``open(...).read()``
    # left that to the garbage collector.
    with open(filename) as handle:
        text = unidecode.unidecode(handle.read())
    return text, len(text)


class Dictionary(object):
    """Character vocabulary with frequency counts.

    Characters are first counted with ``add_word``; ``prep_dict`` then
    assigns each counted character a consecutive integer index.
    """

    def __init__(self):
        self.char2idx = {}        # character -> index
        self.idx2char = []        # index -> character
        self.counter = Counter()  # character -> number of occurrences

    def add_word(self, char):
        """Record one more occurrence of ``char``."""
        self.counter.update([char])

    def prep_dict(self):
        """Give every counted character without an index the next free one."""
        for symbol in self.counter:
            if symbol in self.char2idx:
                continue
            self.char2idx[symbol] = len(self.idx2char)
            self.idx2char.append(symbol)

    def __len__(self):
        """Return the number of indexed characters."""
        return len(self.idx2char)


class Corpus(object):
    """Holds the character ``Dictionary`` built from a text.

    Args:
        string: Text whose characters are counted and indexed.
    """

    def __init__(self, string):
        self.dict = Dictionary()
        vocab = self.dict
        for ch in string:
            vocab.add_word(ch)
        vocab.prep_dict()


def char_tensor(corpus, string):
    """Encode ``string`` as a 1-D ``long`` tensor of character indices.

    Args:
        corpus: Object whose ``dict.char2idx`` maps characters to indices.
        string: Text to encode.

    Returns:
        A tensor of length ``len(string)``, moved to CUDA when the
        module-level ``cuda`` flag is true.

    Raises:
        KeyError: If ``string`` contains a character missing from the mapping.
    """
    char2idx = corpus.dict.char2idx
    # Build the tensor in one call; writing one element at a time from Python
    # is needlessly slow on multi-million-character corpora.
    tensor = torch.tensor([char2idx[ch] for ch in string], dtype=torch.long)
    return tensor.cuda() if cuda else tensor


def batchify(data, batch_size, args):
    """Reshape a 1-D tensor into ``batch_size`` contiguous rows.

    Trailing elements that do not fill a complete column are dropped, so the
    result has shape ``[batch_size, data.size(0) // batch_size]``. The result
    is moved to CUDA when ``args["cuda"]`` is true.
    """
    # Largest prefix length that splits evenly into batch_size rows.
    usable = (data.size(0) // batch_size) * batch_size
    trimmed = data.narrow(0, 0, usable)
    batched = trimmed.view(batch_size, -1)
    return batched.cuda() if args["cuda"] else batched


def get_batch(source, start_index, args):
    """Slice an input window and its one-step-shifted target from ``source``.

    The window starts at column ``start_index`` and is ``args["seq_len"]``
    columns long, shortened so that the shifted target stays inside ``source``.

    Returns:
        ``(inp, target)`` where ``target`` holds the successors of ``inp``.
    """
    remaining = source.size(1) - 1 - start_index
    span = min(args["seq_len"], remaining)
    stop = start_index + span
    inp = source[:, start_index:stop].contiguous()
    target = source[:, start_index + 1:stop + 1].contiguous()
    return inp, target


def save(model):
    """Serialise ``model`` to ``model.pt`` in the working directory."""
    checkpoint_path = 'model.pt'
    torch.save(model, checkpoint_path)
    print('Saved as %s' % checkpoint_path)




## TCN for Char problem

In [None]:
from torch import nn
import sys

class TCN(nn.Module):
    """Character-level language model: embedding -> TCN -> tied linear decoder.

    The decoder shares its weight matrix with the embedding, so the TCN must
    emit ``input_size`` channels (i.e. ``num_channels[-1] == input_size``).

    Args:
        input_size: Embedding dimension.
        output_size: Vocabulary size.
        num_channels: Channel count of each TCN level.
        kernel_size: Convolution kernel width.
        dropout: Dropout probability inside the TCN.
        emb_dropout: Dropout probability applied to the embeddings.
    """

    def __init__(self, input_size, output_size, num_channels, kernel_size=2, dropout=0.2, emb_dropout=0.2):
        super().__init__()
        self.encoder = nn.Embedding(output_size, input_size)
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size=kernel_size, dropout=dropout)
        self.decoder = nn.Linear(input_size, output_size)
        # Weight tying: the decoder reuses the embedding matrix.
        self.decoder.weight = self.encoder.weight
        self.drop = nn.Dropout(emb_dropout)
        self.init_weights()

    def init_weights(self):
        """Uniformly initialise the (shared) weights and zero the decoder bias."""
        bound = 0.1
        self.encoder.weight.data.uniform_(-bound, bound)
        self.decoder.bias.data.fill_(0)
        self.decoder.weight.data.uniform_(-bound, bound)

    def forward(self, x):
        """Map indices of shape (N, L_in) to per-position vocabulary scores."""
        # Embeddings are (N, L_in, C_in); the TCN consumes (N, C_in, L_in).
        embedded = self.drop(self.encoder(x))
        features = self.tcn(embedded.transpose(1, 2))
        scores = self.decoder(features.transpose(1, 2))
        return scores.contiguous()

## Evaluate TCN on Char problem

In [None]:

import torch.nn as nn
import torch.optim as optim
import sys
import time
import math
import torch
import warnings

# Silence every warning for the rest of the run.
warnings.filterwarnings("ignore")   # Suppress the RunTimeWarning on unicode

# Configuration for the character-level experiment (replaces the
# command-line parser of the original script).
args = {
    'batch_size': 32,
    'cuda': False,
    'dropout': 0.1,
    'emb_dropout': 0.1,
    'clip': 0.15,
    'epochs': 10,
    'ksize': 3,
    'levels': 3,
    'log_interval': 100,
    'lr': 4.0,
    'emsize': 100,
    'optim': 'SGD',
    'nhid': 450,
    'validseqlen': 320,
    'seq_len': 400,
    'seed': 1111,
    'dataset': 'ptb'
}

# Set the random seed manually for reproducibility.
torch.manual_seed(args['seed'])

# Check for available accelerators
device = torch.device('cpu')  # Default to CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
    # batchify() reads this flag to move the batched data to the GPU.
    args['cuda'] = True
    print("CUDA is available. Running with CUDA.")
elif torch.backends.mps.is_available():
    device = torch.device('mps')
    print("MPS is available. Running with MPS.")
else:
    print("No GPU accelerator is available. Running with CPU.")

print(args)
file, file_len, valfile, valfile_len, testfile, testfile_len, corpus = data_generator(args)

n_characters = len(corpus.dict)
# Training data is split into batch_size parallel streams; validation and
# test data are each kept as a single stream (batch size 1).
train_data = batchify(char_tensor(corpus, file), args['batch_size'], args)
val_data = batchify(char_tensor(corpus, valfile), 1, args)
test_data = batchify(char_tensor(corpus, testfile), 1, args)
print("Corpus size: ", n_characters)

# All levels use nhid channels except the last, which uses emsize channels
# because the model's decoder is a Linear layer with emsize input features.
num_chans = [args['nhid']] * (args['levels'] - 1) + [args['emsize']]
k_size = args['ksize']
dropout = args['dropout']
emb_dropout = args['emb_dropout']
model = TCN(args['emsize'], n_characters, num_chans, kernel_size=k_size, dropout=dropout, emb_dropout=emb_dropout)

model.to(device)  # Move model to the appropriate device

criterion = nn.CrossEntropyLoss()
lr = args['lr']
# The optimizer class is looked up by name on torch.optim.
optimizer = getattr(optim, args['optim'])(model.parameters(), lr=lr)

def evaluate(source):
    """Return the mean per-character cross-entropy of ``model`` on ``source``.

    ``source`` is walked in windows that start every ``validseqlen`` columns.
    In each window the first ``seq_len - validseqlen`` positions only provide
    history; the loss is taken over the remaining positions.

    Args:
        source: 2-D tensor of character indices, as produced by ``batchify``.

    Raises:
        ValueError: If ``source`` is too short to yield any scored position.
    """
    model.eval()
    eff_history = args['seq_len'] - args['validseqlen']
    source_len = source.size(1)
    total_loss = 0.0
    count = 0
    with torch.no_grad():
        for i in range(0, source_len - 1, args['validseqlen']):
            # get_batch yields at most source_len - 1 - i positions. Skip the
            # window unless at least one lies past the history prefix, since
            # the mean loss over an empty slice is NaN.
            if i + eff_history >= source_len - 1:
                continue
            inp, target = get_batch(source, i, args)
            inp, target = inp.to(device), target.to(device)  # Move to device
            output = model(inp)
            final_output = output[:, eff_history:].contiguous().view(-1, n_characters)
            final_target = target[:, eff_history:].contiguous().view(-1)
            loss = criterion(final_output, final_target)

            # Weight each window's mean loss by its number of scored positions.
            total_loss += loss.item() * final_output.size(0)
            count += final_output.size(0)

    if count == 0:
        raise ValueError(
            "source is too short to evaluate: no position lies beyond the "
            "effective history of {} steps".format(eff_history))
    return total_loss / count

def train(epoch):
    """Train ``model`` for one epoch on ``train_data``.

    Windows start every ``validseqlen`` columns; the first
    ``seq_len - validseqlen`` positions of each window only provide history
    and are excluded from the loss.

    Args:
        epoch: Epoch number, used only in the log line.

    Returns:
        The mean of the logged interval losses. If no log interval completed,
        the mean loss of the batches that did run, or ``nan`` if none ran.
    """
    model.train()
    eff_history = args['seq_len'] - args['validseqlen']
    source = train_data
    source_len = source.size(1)
    total_loss = 0.0
    window_batches = 0  # batches accumulated since the last log line
    losses = []
    start_time = time.time()
    for batch_idx, i in enumerate(range(0, source_len - 1, args['validseqlen'])):
        # Skip windows with no position past the history prefix: the mean
        # loss over an empty slice is NaN and would corrupt the gradients.
        if i + eff_history >= source_len - 1:
            continue
        inp, target = get_batch(source, i, args)
        inp, target = inp.to(device), target.to(device)  # Move to device
        optimizer.zero_grad()
        output = model(inp)
        final_output = output[:, eff_history:].contiguous().view(-1, n_characters)
        final_target = target[:, eff_history:].contiguous().view(-1)
        loss = criterion(final_output, final_target)
        loss.backward()

        if args['clip'] > 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), args['clip'])
        optimizer.step()
        total_loss += loss.item()
        window_batches += 1

        if batch_idx % args['log_interval'] == 0 and batch_idx > 0:
            # Divide by the number of batches actually accumulated, so the
            # first window (which includes batch 0) is averaged correctly.
            cur_loss = total_loss / window_batches
            losses.append(cur_loss)
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.5f} | ms/batch {:5.2f} | '
                  'loss {:5.3f} | bpc {:5.3f}'.format(
                epoch, batch_idx, int((source_len-0.5) / args['validseqlen']), lr,
                              elapsed * 1000 / window_batches, cur_loss, cur_loss / math.log(2)))
            total_loss = 0.0
            window_batches = 0
            start_time = time.time()

    if losses:
        return sum(losses) * 1.0 / len(losses)
    # Fewer batches than one log interval: report what was accumulated
    # instead of dividing by zero.
    if window_batches:
        return total_loss / window_batches
    return float('nan')

def main():
    """Train for ``args['epochs']`` epochs, then report the final test loss.

    After every epoch the validation and test losses are printed. From
    epoch 6 onward the learning rate is divided by 10 whenever the validation
    loss exceeds the maximum of the previous three. The model is saved each
    time the validation loss improves, and also on ``KeyboardInterrupt``.
    """
    global lr
    dash_rule = '-' * 89
    bar_rule = '=' * 89
    try:
        print("Training for %d epochs..." % args['epochs'])
        val_history = []
        best_vloss = 1e7
        for epoch in range(1, args['epochs'] + 1):
            train(epoch)

            vloss = evaluate(val_data)
            print(dash_rule)
            print('| End of epoch {:3d} | valid loss {:5.3f} | valid bpc {:8.3f}'.format(
                epoch, vloss, vloss / math.log(2)))

            test_loss = evaluate(test_data)
            print(bar_rule)
            print('| End of epoch {:3d} | test loss {:5.3f} | test bpc {:8.3f}'.format(
                epoch, test_loss, test_loss / math.log(2)))
            print(bar_rule)

            # Anneal when validation loss is worse than all of the last three.
            plateaued = epoch > 5 and vloss > max(val_history[-3:])
            if plateaued:
                lr = lr / 10.
                for group in optimizer.param_groups:
                    group['lr'] = lr
            val_history.append(vloss)

            if vloss < best_vloss:
                print("Saving...")
                save(model)
                best_vloss = vloss

    except KeyboardInterrupt:
        print(dash_rule)
        print("Saving before quit...")
        save(model)

    # Run on test data.
    final_test_loss = evaluate(test_data)
    print(bar_rule)
    print('| End of training | test loss {:5.3f} | test bpc {:8.3f}'.format(
        final_test_loss, final_test_loss / math.log(2)))
    print(bar_rule)

# train_by_random_chunk()
if __name__ == "__main__":
    main()


# Example: MNIST Pixel problem

## Sequential MNIST & Permuted Sequential MNIST

### Overview

MNIST is a handwritten digit classification dataset (Lecun et al., 1998) that is frequently used to 
test deep learning models. In particular, sequential MNIST is frequently used to test a recurrent 
network’s ability to retain information from the distant past (see paper for references). In 
this task, each MNIST image (28 x 28) is presented to the model as a 784 × 1 sequence 
for digit classification. In the more challenging permuted MNIST (P-MNIST) setting, the order of 
the sequence is permuted at a (fixed) random order.

### Note

- Because a TCN's receptive field depends on the depth of the network and the filter size, we need
to make sure the model we use can cover the sequence length 784. 

- While this is a sequence model task, we only use the very last output (i.e. at time T=784) for 
the eventual classification.

## data generator

In [None]:
import torch
from torchvision import datasets, transforms


def data_generator(root, batch_size):
    """Build MNIST train and test data loaders.

    Both splits are downloaded to ``root`` if missing, converted to tensors
    and normalised with mean 0.1307 and standard deviation 0.3081.

    Returns:
        ``(train_loader, test_loader)``, each yielding batches of
        ``batch_size`` samples in dataset order.
    """
    def build_transform():
        # A fresh pipeline per split, as in the two inline definitions.
        return transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ])

    train_set = datasets.MNIST(root=root, train=True, download=True,
                               transform=build_transform())
    test_set = datasets.MNIST(root=root, train=False, download=True,
                              transform=build_transform())

    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(train_set, batch_size=batch_size)
    test_loader = make_loader(test_set, batch_size=batch_size)
    return train_loader, test_loader


## TCN for MNIST Pixel problem

In [None]:
import torch.nn.functional as F
from torch import nn

class TCN(nn.Module):
    """TCN classifier that reads only the final time step.

    Args:
        input_size: Number of input channels.
        output_size: Number of classes.
        num_channels: Channel count of each TCN level.
        kernel_size: Convolution kernel width.
        dropout: Dropout probability inside the TCN.
    """

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super().__init__()
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size=kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, inputs):
        """Return class log-probabilities for inputs of dimension (N, C_in, L_in)."""
        features = self.tcn(inputs)  # (N, C, L)
        last_step = features[:, :, -1]
        logits = self.linear(last_step)
        return F.log_softmax(logits, dim=1)

## Evaluate TCN on MNIST Pixel problem

In [None]:
import torch
from torch.autograd import Variable
import torch.optim as optim
import torch.nn.functional as F
import sys
import numpy as np

# Configuration for the sequential-MNIST experiment (replaces the
# command-line parser of the original script).
args = {
    'batch_size': 64,
    'dropout': 0.05,
    'clip': -1,
    'epochs': 10,
    'ksize': 7,
    'levels': 8,
    'log_interval': 100,
    'lr': 2e-3,
    'optim': 'Adam',
    'nhid': 25,
    'seed': 1111,
    'permute': False
}

# Seed torch's RNG before the model is constructed.
torch.manual_seed(args['seed'])

# Check for available accelerators
device = torch.device('cpu')  # Default to CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
    print("CUDA is available. Running with CUDA.")
elif torch.backends.mps.is_available():
    device = torch.device('mps')
    print("MPS is available. Running with MPS.")
else:
    print("No GPU accelerator is available. Running with CPU.")

root = './data/mnist'
batch_size = args['batch_size']
n_classes = 10
input_channels = 1
# Each 28 x 28 image is flattened into 784 / input_channels time steps.
seq_length = int(784 / input_channels)
epochs = args['epochs']
# Running total of sequence steps seen; train() adds seq_length per batch.
steps = 0

print(args)
train_loader, test_loader = data_generator(root, batch_size)

# Fixed pixel permutation, applied by train()/test() only when
# args['permute'] is true.
# NOTE(review): only torch's RNG is seeded above; this NumPy permutation may
# differ between runs -- confirm whether that is intended.
permute = torch.Tensor(np.random.permutation(784).astype(np.float64)).long().to(device)
channel_sizes = [args['nhid']] * args['levels']
kernel_size = args['ksize']
model = TCN(input_channels, n_classes, channel_sizes, kernel_size=kernel_size, dropout=args['dropout'])
model.to(device)  # Move model to the appropriate device

lr = args['lr']
# The optimizer class is looked up by name on torch.optim.
optimizer = getattr(optim, args['optim'])(model.parameters(), lr=lr)

def train(ep):
    """Train ``model`` for one epoch over ``train_loader``.

    Each batch is reshaped to ``(-1, input_channels, seq_length)`` and, when
    ``args['permute']`` is set, reordered with the fixed ``permute`` index.
    The module-level ``steps`` counter grows by ``seq_length`` per batch.

    Args:
        ep: Epoch number, used only in the log line.
    """
    global steps
    train_loss = 0.0
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)  # Move to device
        data = data.view(-1, input_channels, seq_length)
        if args['permute']:
            data = data[:, :, permute]
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        if args['clip'] > 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), args['clip'])
        optimizer.step()
        # Accumulate a Python float: summing the loss tensors themselves
        # keeps every batch's autograd history alive until the next reset.
        train_loss += loss.item()
        steps += seq_length
        if batch_idx > 0 and batch_idx % args['log_interval'] == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tSteps: {}'.format(
                ep, batch_idx * batch_size, len(train_loader.dataset),
                100. * batch_idx / len(train_loader), train_loss / args['log_interval'], steps))
            train_loss = 0.0

def test():
    """Evaluate ``model`` on ``test_loader``; print and return the mean NLL.

    Batches are reshaped (and optionally permuted) exactly as in training.
    The summed negative log-likelihood is divided by the dataset size, and
    the number of correct top-1 predictions is reported alongside it.
    """
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)
            data = data.view(-1, input_channels, seq_length)
            if args['permute']:
                data = data[:, :, permute]
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # Index of the highest log-probability per sample.
            pred = output.max(1, keepdim=True)[1]
            hits = pred.eq(target.view_as(pred))
            correct += hits.cpu().sum()

        n_examples = len(test_loader.dataset)
        test_loss /= n_examples
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, correct, n_examples,
            100. * correct / n_examples))
        return test_loss

if __name__ == "__main__":
    # Train and test once per epoch; every tenth epoch the learning rate is
    # divided by 10 and written into each optimizer parameter group.
    for epoch in range(1, epochs + 1):
        train(epoch)
        test()
        if epoch % 10 != 0:
            continue
        lr /= 10
        for group in optimizer.param_groups:
            group['lr'] = lr
                
import matplotlib.pyplot as plt  # Import matplotlib for plotting
def plot_samples(data_loader, num_samples=5):
    """Show the first ``num_samples`` images of the loader's first batch.

    Each image is reshaped to 28 x 28 and drawn in grayscale in its own
    column of a single-row figure, titled with its label.
    """
    images, labels = next(iter(data_loader))

    for col in range(num_samples):
        picture = images[col].view(28, 28)  # Reshape to 28x28 for visualization
        plt.subplot(1, num_samples, col + 1)
        plt.imshow(picture, cmap='gray')
        plt.title(f"Label: {labels[col].item()}")
        plt.axis('off')
    plt.show()
plot_samples(train_loader, num_samples=5)



# Example: Poly Music

## Polyphonic Music Dataset

### Overview

We evaluate the temporal convolutional network (TCN) on two popular polyphonic music datasets, described below.

- **JSB Chorales** dataset (Allan & Williams, 2005) is a polyphonic music dataset consisting
of the entire corpus of 382 four-part harmonized chorales by J. S. Bach. In a polyphonic
music dataset, each input is a sequence of elements having 88 dimensions, representing the 88 keys
on a piano. Therefore, each element `x_t` is a chord written as a binary vector, in which a “1” indicates
a key pressed.

- **Nottingham** dataset is a collection of 1200 British and American folk tunes. Nottingham
is a much larger dataset than JSB Chorales. Along with JSB Chorales, Nottingham has
been used in a number of works that investigated recurrent models’ applicability in polyphonic music,
and the performance on both tasks is measured in terms
of negative log-likelihood (NLL) loss.

The goal here is to predict the next note given some history of the notes played.

### Note

- Each sequence can have a different length. In the current implementation, we simply train each
sequence separately (i.e. batch size is 1), but one can zero-pad all sequences to the same length
and train by batch.

- One can use different datasets by specifying through the `--data` flag on the command line. The
default is `Nott`, for Nottingham.

- While each data point is binary, the fact that there are 88 dimensions (for 88 keys) means there are
essentially `2^88` "classes". Therefore, instead of predicting each key directly, we
follow the standard practice so that a sigmoid is added at the end of the network. This ensures
that every entry is converted to a value between 0 and 1 to compute the NLL loss.

## Data generator

In [None]:
from scipy.io import loadmat
import torch
import numpy as np


def data_generator(dataset):
    """Load one of the polyphonic-music ``.mat`` files as tensors.

    Args:
        dataset: One of ``"JSB"``, ``"Muse"``, ``"Nott"`` or ``"Piano"``.

    Returns:
        ``(X_train, X_valid, X_test)``: the first rows of the file's
        ``traindata``, ``validdata`` and ``testdata`` entries, with every
        element replaced by a ``torch.Tensor``.

    Raises:
        ValueError: If ``dataset`` is not one of the supported names.
    """
    mat_files = {
        "JSB": './TCN/poly_music/mdata/JSB_Chorales.mat',
        "Muse": './TCN/poly_music/mdata/MuseData.mat',
        "Nott": './TCN/poly_music/mdata/Nottingham.mat',
        "Piano": './TCN/poly_music/mdata/Piano_midi.mat',
    }
    # Fail with a clear message instead of an UnboundLocalError further down.
    if dataset not in mat_files:
        raise ValueError("Unknown dataset {!r}; expected one of {}".format(
            dataset, sorted(mat_files)))

    print('loading {} data...'.format(dataset))
    data = loadmat(mat_files[dataset])

    X_train = data['traindata'][0]
    X_valid = data['validdata'][0]
    X_test = data['testdata'][0]

    # Convert each sequence in place; a separate loop name avoids shadowing
    # the loaded ``data`` dict.
    for split in (X_train, X_valid, X_test):
        for i in range(len(split)):
            split[i] = torch.Tensor(split[i].astype(np.float64))

    return X_train, X_valid, X_test

## TCN for Poly Music

In [None]:
from torch import nn
import torch.nn.functional as F


class TCN(nn.Module):
    """TCN with a per-step linear layer followed by a sigmoid.

    Args:
        input_size: Number of features per time step.
        output_size: Number of outputs per time step.
        num_channels: Channel count of each TCN level.
        kernel_size: Convolution kernel width.
        dropout: Dropout probability inside the TCN.
    """

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super().__init__()
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)
        self.sig = nn.Sigmoid()

    def forward(self, x):
        """Return double-precision sigmoid outputs for every time step."""
        # The CNN needs (N, C, L); axes 1 and 2 are swapped on the way in
        # and swapped back on the way out.
        channels_first = x.transpose(1, 2)
        features = self.tcn(channels_first).transpose(1, 2)
        logits = self.linear(features).double()
        return self.sig(logits)


## Evaluate TCN on Poly Music

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import sys
import numpy as np

# Configuration for the polyphonic-music experiment (replaces argparse).
args = {
    'dropout': 0.25,
    'clip': 0.2,
    'epochs': 10,
    'ksize': 5,
    'levels': 4,
    'log_interval': 100,
    'lr': 1e-3,
    'optim': 'Adam',
    'nhid': 150,
    'data': 'Nott',
    'seed': 1111
}

# Set the random seed manually for reproducibility.
torch.manual_seed(args['seed'])

# Check for available accelerators
device = torch.device('cpu')  # Default to CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
    print("CUDA is available. Running with CUDA.")
elif torch.backends.mps.is_available():
    device = torch.device('mps')
    print("MPS is available. Running with MPS.")
else:
    print("No GPU accelerator is available. Running with CPU.")

print(args)
# Used as both the model's input and output size below.
input_size = 88
X_train, X_valid, X_test = data_generator(args['data'])

n_channels = [args['nhid']] * args['levels']
kernel_size = args['ksize']
dropout = args['dropout']

model = TCN(input_size, input_size, n_channels, kernel_size, dropout=args['dropout'])
model.to(device)  # Move model to the appropriate device

# NOTE(review): train() and evaluate() compute their loss by hand and do not
# call this criterion -- confirm whether it is still needed.
criterion = nn.CrossEntropyLoss()
lr = args['lr']
# The optimizer class is looked up by name on torch.optim.
optimizer = getattr(optim, args['optim'])(model.parameters(), lr=lr)

def evaluate(X_data, name='Eval'):
    """Print and return the average per-step loss of ``model`` on ``X_data``.

    Each sequence is scored on its own: the model sees all but the last step
    and is compared against all but the first. The per-sequence loss is the
    negated trace of ``y @ log(p)^T + (1 - y) @ log(1 - p)^T``; the total is
    divided by the total number of predicted steps.

    Args:
        X_data: Indexable collection of sequences.
        name: Label printed in front of the loss.
    """
    model.eval()
    total_loss = 0.0
    count = 0
    with torch.no_grad():
        for idx in range(len(X_data)):
            sequence = X_data[idx]
            x = sequence[:-1].to(device)
            y = sequence[1:].to(device)
            output = model(x.unsqueeze(0)).squeeze(0)
            log_on = torch.log(output).float().t()
            log_off = torch.log(1 - output).float().t()
            loss = -torch.trace(torch.matmul(y, log_on) +
                                torch.matmul((1 - y), log_off))
            total_loss += loss.item()
            count += output.size(0)
        eval_loss = total_loss / count
        print(name + " loss: {:.5f}".format(eval_loss))
        return eval_loss

def train(ep):
    """Train ``model`` for one epoch, one sequence at a time in random order.

    For every sequence the model sees all but the last step and is trained
    against all but the first. The running average loss per predicted step is
    printed every ``args['log_interval']`` sequences.

    Args:
        ep: Epoch number, used only in the log line.
    """
    model.train()
    total_loss = 0
    count = 0
    train_idx_list = np.arange(len(X_train), dtype="int32")
    np.random.shuffle(train_idx_list)
    for step, idx in enumerate(train_idx_list, start=1):
        data_line = X_train[idx]
        x, y = data_line[:-1].to(device), data_line[1:].to(device)

        optimizer.zero_grad()
        output = model(x.unsqueeze(0)).squeeze(0)
        loss = -torch.trace(torch.matmul(y, torch.log(output).float().t()) +
                            torch.matmul((1 - y), torch.log(1 - output).float().t()))
        total_loss += loss.item()
        count += output.size(0)

        # Gradients must exist before they can be clipped: clipping ahead of
        # backward() only touched the gradients zero_grad() had just cleared.
        loss.backward()
        if args['clip'] > 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), args['clip'])
        optimizer.step()
        # Log on the number of sequences processed; the shuffled sample index
        # gave an irregular cadence.
        if step % args['log_interval'] == 0:
            cur_loss = total_loss / count
            print("Epoch {:2d} | lr {:.5f} | loss {:.5f}".format(ep, lr, cur_loss))
            total_loss = 0.0
            count = 0

if __name__ == "__main__":
    # Train for the configured number of epochs, checkpointing whenever the
    # validation loss improves, then reload the best checkpoint and report
    # its test loss.
    best_vloss = 1e8
    vloss_list = []
    model_name = "poly_music_{0}.pt".format(args['data'])
    for ep in range(1, args['epochs']+1):
        train(ep)
        vloss = evaluate(X_valid, name='Validation')
        tloss = evaluate(X_test, name='Test')
        if vloss < best_vloss:
            with open(model_name, "wb") as f:
                torch.save(model, f)
                print("Saved model!\n")
            best_vloss = vloss
        # After epoch 10, divide the learning rate by 10 whenever validation
        # loss is worse than each of the previous three epochs.
        if ep > 10 and vloss > max(vloss_list[-3:]):
            lr /= 10
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr

        vloss_list.append(vloss)

    print('-' * 89)
    # Close the checkpoint file deterministically instead of leaking the handle.
    # NOTE(review): recent PyTorch releases default torch.load to
    # weights_only=True, which rejects fully pickled modules -- confirm against
    # the installed version.
    with open(model_name, "rb") as f:
        model = torch.load(f)
    tloss = evaluate(X_test)
