In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm
import copy

# Import self-implemented torch dataset
from datasets import BDLibDataset

## Now training and testing on BDLib

In [2]:
window_seconds = 1.0 # The duration per sample in seconds
                     # This affects the sample size (length of the time axis in mel spectrogram),
                     # and also the final accuracy
overlap = 0.5 # The overlap between two samples
n_mels = 32

train_data = BDLibDataset(root='BDLib', 
                          fold_ids=[1,2],
                          download=True,
                          win_secs=window_seconds,
                          overlap=overlap,
                          n_mels=n_mels)
valid_data = BDLibDataset(root='BDLib', 
                          fold_ids=[3],
                          download=False,
                          win_secs=window_seconds,
                          overlap=overlap,
                          n_mels=n_mels)

train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=16, shuffle=True)

print('train data length', len(train_data))
print('valid data length', len(valid_data))
print('Sample shape', train_data.data[0].shape)

Download not needed, files already on disk.
train data length 1080
valid data length 540
Sample shape (1, 32, 87)


In [3]:
class timeseries_Encoder():
    def __init__(self,
                 feat_num,
                 quantization_num,
                 D,
                 P,
                 min=0.0,
                 max=1.0):
        self.feat_num = feat_num
        self.quantization_num = int(quantization_num)
        self.D = int(D)
        self.P = float(P)
        self.min = float(min)
        self.max = float(max)
        self.range = max - min
        self.init_hvs()

    def init_hvs(self):
        # level hvs
        num_flip = int(self.D * self.P)
        self.level_hvs = [np.random.randint(2, size=self.D)]
        for i in range(self.quantization_num-1):
            new = copy.deepcopy(self.level_hvs[-1])
            idx = np.random.choice(self.D,num_flip,replace=False)
            new[idx] = 1-new[idx]
            self.level_hvs.append(new)
        self.level_hvs = np.stack(self.level_hvs)

        #id hvs
        self.id_hvs = []
        for i in range(self.feat_num):
            self.id_hvs.append(np.random.randint(2, size=self.D))
        self.id_hvs = np.stack(self.id_hvs)

    def quantize(self, one_sample):
        quantization = self.level_hvs[((((one_sample - self.min) / self.range) * self.quantization_num) - 1).astype('i')]
        return quantization

    def bind(self,a,b):
        return np.logical_xor(a,b).astype('i')

    def permute(self,a):
        for i in range(len(a)):
            a[i] = np.roll(a[i],i,axis=1)
        return a

    def sequential_bind(self,a):
        return np.sum(a,axis=0) % 2

    def bipolarize(self,a):
        a[a==0] = -1
        return a

    def encode_one_time_series_sample(self, one_sample):
        one_sample = one_sample.cpu().numpy()
        T = len(one_sample)
        out = self.quantize(one_sample)
        out = self.bind(out,np.repeat(np.expand_dims(self.id_hvs,0),T,0))
        out = self.permute(out)
        out = self.sequential_bind(out)
        out = self.bipolarize(out)
        out = np.sum(out,axis=0)
        return torch.from_numpy(out).float()

In [4]:
import torchhd
from torchhd.models import Centroid

class Encoder(nn.Module):
    def __init__(self, input_dim, levels, hd_dim, flipping, device):
        """
        :param input_dim: dimension of the number of sensors
        :param levels: number of quantized levels
        :param hd_dim: HDC dimension
        :param flipping: flipping ratio to generate the next level hypervector
        :param device: cuda or cpu within torch
        """
        super(Encoder, self).__init__()
        self.device = device
        self.timeseries_Encoder = timeseries_Encoder(input_dim, levels, hd_dim, flipping)

    def forward(self, x):
        x = x.squeeze()
        enc = []
        batch_size = x.shape[0]
        for i in range(batch_size):
            enc.append(
                self.timeseries_Encoder.encode_one_time_series_sample(x[i]).to(self.device))
        sample_hv = torch.stack(enc, dim=0)
        return torchhd.hard_quantize(sample_hv)

In [5]:
def hd_train(train_loader, model, encode, epoch, device):
    with torch.no_grad():
        for samples, labels in tqdm(train_loader, desc="Training"):
            samples = samples.to(device)
            labels = labels.to(device)
    
            # After the following steps, the input data shape should be (batch_size, num_of_time_steps, num_of_sensors)
            samples = samples.squeeze()  # First get rid of the dimension with length=1
            samples = samples.swapaxes(1,2)  # Temp fix for BDLib, need to switch the time and frequency axis
            # print('input sample shape', samples.shape)
            
            samples_hv = encode(samples)
            model.add(samples_hv, labels)
            if epoch > 0: # Retraining
                predict = torch.argmax(model(samples_hv, dot=True), dim=-1)
                wrong_pred = (predict != labels)
                model.add(-samples_hv[wrong_pred], predict[wrong_pred])
                model.add(samples_hv[wrong_pred], labels[wrong_pred])

                    
def hd_test(valid_loader, model, encode, device):
    true_labels, pred_labels = [], []
    with torch.no_grad():
        model.normalize()
    
        for samples, labels in tqdm(valid_loader, desc="Testing"):
            samples = samples.to(device)
    
            # After the following steps, the input data shape should be (batch_size, num_of_time_steps, num_of_sensors)
            samples = samples.squeeze()  # First get rid of the dimension with length=1
            samples = samples.swapaxes(1,2)  # Temp fix for BDLib, need to switch the time and frequency axis
            
            samples_hv = encode(samples)
            outputs = model(samples_hv, dot=True)
            
            true_labels.extend(labels.tolist())
            pred_labels.extend(torch.argmax(outputs, dim=1).tolist())
            
    true_labels = np.array(true_labels)
    pred_labels = np.array(pred_labels)
    acc = (true_labels == pred_labels).sum() / true_labels.size
    acc_pc = []
    for c in np.unique(true_labels):
        mask = (true_labels == c)
        acc_pc.append((true_labels[mask] == pred_labels[mask]).sum() / mask.sum())
    print(f"Testing accuracy of {acc}%")
    print("Testing accuracy per class: ", acc_pc)

In [6]:
HD_DIM = 1000
LEVELS = 10
FLIPPING = 0.005
EPOCHS = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [7]:
encode = Encoder(n_mels, LEVELS, HD_DIM, FLIPPING, device)
encode = encode.to(device)

model = Centroid(HD_DIM, train_data.num_classes)
model = model.to(device)

for epoch in range(EPOCHS):
    hd_train(train_loader, model, encode, epoch, device)
    hd_test(valid_loader, copy.deepcopy(model), encode, device)

Training: 100%|██████████| 68/68 [00:16<00:00,  4.24it/s]
Testing: 100%|██████████| 34/34 [00:07<00:00,  4.29it/s]


Testing accuracy of 0.5518518518518518%
Testing accuracy per class:  [0.8518518518518519, 0.6111111111111112, 0.8333333333333334, 0.4074074074074074, 0.7592592592592593, 0.7407407407407407, 0.037037037037037035, 0.46296296296296297, 0.4074074074074074, 0.4074074074074074]


Training: 100%|██████████| 68/68 [00:15<00:00,  4.46it/s]
Testing: 100%|██████████| 34/34 [00:07<00:00,  4.44it/s]


Testing accuracy of 0.5611111111111111%
Testing accuracy per class:  [0.8518518518518519, 0.6851851851851852, 0.7962962962962963, 0.4074074074074074, 0.7592592592592593, 0.6666666666666666, 0.07407407407407407, 0.5185185185185185, 0.42592592592592593, 0.42592592592592593]


Training: 100%|██████████| 68/68 [00:15<00:00,  4.44it/s]
Testing: 100%|██████████| 34/34 [00:07<00:00,  4.51it/s]


Testing accuracy of 0.5648148148148148%
Testing accuracy per class:  [0.8518518518518519, 0.6666666666666666, 0.7962962962962963, 0.4074074074074074, 0.7592592592592593, 0.6666666666666666, 0.05555555555555555, 0.5185185185185185, 0.48148148148148145, 0.4444444444444444]


Training: 100%|██████████| 68/68 [00:15<00:00,  4.45it/s]
Testing: 100%|██████████| 34/34 [00:07<00:00,  4.45it/s]


Testing accuracy of 0.562962962962963%
Testing accuracy per class:  [0.8518518518518519, 0.6851851851851852, 0.7962962962962963, 0.4074074074074074, 0.7592592592592593, 0.6296296296296297, 0.05555555555555555, 0.5, 0.5, 0.4444444444444444]


Training: 100%|██████████| 68/68 [00:15<00:00,  4.50it/s]
Testing: 100%|██████████| 34/34 [00:07<00:00,  4.50it/s]


Testing accuracy of 0.5555555555555556%
Testing accuracy per class:  [0.8518518518518519, 0.6851851851851852, 0.7962962962962963, 0.4074074074074074, 0.7592592592592593, 0.6296296296296297, 0.037037037037037035, 0.46296296296296297, 0.48148148148148145, 0.4444444444444444]


Training: 100%|██████████| 68/68 [00:15<00:00,  4.51it/s]
Testing: 100%|██████████| 34/34 [00:07<00:00,  4.52it/s]


Testing accuracy of 0.5592592592592592%
Testing accuracy per class:  [0.8518518518518519, 0.6851851851851852, 0.7962962962962963, 0.4074074074074074, 0.7592592592592593, 0.6666666666666666, 0.037037037037037035, 0.46296296296296297, 0.48148148148148145, 0.4444444444444444]


Training:  18%|█▊        | 12/68 [00:02<00:12,  4.35it/s]


KeyboardInterrupt: 