In [1]:
# Mount Google Drive inside the Colab runtime and switch the working directory
# to the project folder, so the relative file names used by later cells
# (e.g. "X_test.npy") resolve against it.
from google.colab import drive
import os
drive.mount('/content/drive')
path = "/content/drive/MyDrive/247/project"
os.chdir(path)

Mounted at /content/drive


In [2]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
from torchvision import models
from torchsummary import summary

In [3]:
# Scratch checks of the output shapes produced by transposed convolutions and
# an LSTM. The probe tensors deliberately avoid the name `input`, so the Python
# builtin stays usable in every later cell.
probe = torch.randn(10, 6, 6)
upsample = nn.ConvTranspose2d(10, 6, 3, stride=2, padding=1) # Cin = 10, Cout = 6
output = upsample(probe)   # probe is read as an unbatched (C=10, H=6, W=6) image
print(output.size())
layer = nn.ConvTranspose1d(6, 10, 3, stride=2, padding=1) # Cin = 6, Cout = 10
output = layer(probe)      # the same probe is read as (N=10, C=6, L=6) here
print(output.size())

# Same stack shape as the generator: sequence length grows 1 -> 4 -> 10 -> 22 -> 49 -> 100.
noise = torch.randn((1, 100, 1))
layer = nn.Sequential(
    nn.ConvTranspose1d(100, 220, 4, 2, 0),
    nn.BatchNorm1d(220),
    nn.ReLU(True),
    nn.ConvTranspose1d(220, 154, 4, 2, 0),
    nn.BatchNorm1d(154),
    nn.ReLU(True),
    nn.ConvTranspose1d(154, 88, 4, 2, 0),
    nn.BatchNorm1d(88),
    nn.ReLU(True),
    nn.ConvTranspose1d(88, 44, 7, 2, 0),
    nn.BatchNorm1d(44),
    nn.ReLU(True),
    nn.ConvTranspose1d(44, 10, 4, 2, 0),
    nn.ReLU(True),
)
output = layer(noise)
print(output.size())

# batch_first LSTM expects (batch, seq, features), hence the transpose.
sequence = output.transpose(2, 1)
lstm = nn.LSTM(10, 22, 2, batch_first = True)
lstm_result = lstm(sequence)   # tuple: (per-step outputs, (h_n, c_n))
lstm_out = lstm_result[0]
print(len(lstm_result), *lstm_out.shape)

torch.Size([6, 11, 11])
torch.Size([10, 10, 11])
torch.Size([1, 10, 100])
2 1 100 22


In [3]:
# Load the six arrays from the current working directory.

# Pool that is later split into training and validation sets.
X_train_valid = np.load("X_train_valid.npy")
y_train_valid = np.load("y_train_valid.npy")
person_train_valid = np.load("person_train_valid.npy")  # presumably one subject id per trial

# Held-out test set, same three kinds of array.
X_test = np.load("X_test.npy")
y_test = np.load("y_test.npy")
person_test = np.load("person_test.npy")

### Shape of data

In [4]:
# Print the shape of every loaded array, one line per array.
for caption, array in (
    ('Training/Valid data shape: {}', X_train_valid),
    ('Test data shape: {}', X_test),
    ('Training/Valid target shape: {}', y_train_valid),
    ('Test target shape: {}', y_test),
    ('Person train/valid shape: {}', person_train_valid),
    ('Person test shape: {}', person_test),
):
    print(caption.format(array.shape))

Training/Valid data shape: (2115, 22, 1000)
Test data shape: (443, 22, 1000)
Training/Valid target shape: (2115,)
Test target shape: (443,)
Person train/valid shape: (2115, 1)
Person test shape: (443, 1)


In [4]:
class Generator(nn.Module):
  """
  Transposed-convolution generator that expands a latent vector into a
  multi-channel sequence.

  Five ConvTranspose1d stages (stride 2, no padding, kernel sizes 4, 4, 4, 7, 4)
  are applied; the first four are followed by BatchNorm1d + ReLU, the last one
  by ReLU only. For an input of length 1 the sequence length grows
  1 -> 4 -> 10 -> 22 -> 49 -> 100.

  input is (batch, input_dim, 1)
  output is (batch, output_dim, 100); values are >= 0 because of the final ReLU

  :input input_dim: number of channels of the latent input
  :input hidden_dims: channel widths of the four intermediate stages
                      (entries 0..3 are used)
  :input output_dim: number of output channels, defaults to 22
  """

  def __init__(self,input_dim, hidden_dims, output_dim=22):
    super(Generator,self).__init__()
    self.input_dim = input_dim
    self.output_dim = output_dim

    # Explicit indexing keeps the IndexError for a too-short hidden_dims.
    widths = [input_dim] + [hidden_dims[i] for i in range(4)]
    kernel_sizes = [4, 4, 4, 7]

    # Modules are created in the same order as before, so the state_dict keys
    # (layer.0 ... layer.13) and seeded initialisation are unchanged.
    stages = []
    for c_in, c_out, kernel in zip(widths[:-1], widths[1:], kernel_sizes):
      stages.append(nn.ConvTranspose1d(c_in, c_out, kernel, 2, 0, bias = False))
      stages.append(nn.BatchNorm1d(c_out))
      stages.append(nn.ReLU(True))

    # Output stage: no batch norm, ReLU keeps the generated signal non-negative.
    stages.append(nn.ConvTranspose1d(widths[-1], output_dim, 4, 2, 0, bias = False))
    stages.append(nn.ReLU(True))

    self.layer = nn.Sequential(*stages)

  def forward(self,input):
    """Run the latent tensor through the transposed-convolution stack."""
    return self.layer(input)


In [5]:
class Discriminator(nn.Module):
  """
  Convolutional discriminator in the style of the PyTorch DCGAN tutorial
  https://pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html

  Five stride-2 Conv1d stages (kernel 3, padding 1) shrink the sequence, each
  followed by LeakyReLU(0.2); every stage except the first also has a
  BatchNorm1d. A final kernel-4 Conv1d and a Sigmoid produce the score.

  input is (batch, in_dim, seq)
  output is (batch, 1, k); for seq = 100 the lengths go
  100 -> 50 -> 25 -> 13 -> 7 -> 4 -> 1, i.e. one score per sample
  """
  def __init__(self, input_dim, hidden_dims):
    super(Discriminator, self).__init__()
    # Channel width before/after each of the five strided stages.
    widths = [input_dim, hidden_dims[0], hidden_dims[1], hidden_dims[2], hidden_dims[3], 1]

    stack = []
    for stage, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
      stack.append(nn.Conv1d(c_in, c_out, 3, 2, 1, bias=False))
      if stage > 0:
        # the first stage has no batch norm
        stack.append(nn.BatchNorm1d(c_out))
      stack.append(nn.LeakyReLU(0.2, inplace=True))

    # Collapse the remaining positions (4 when seq = 100) into the score.
    stack.append(nn.Conv1d(1, 1, 4, 1, 0, bias=False))
    stack.append(nn.Sigmoid())

    self.layer = nn.Sequential(*stack)

  def forward(self, x):
    """Score a batch of sequences; values lie in [0, 1] (Sigmoid output)."""
    return self.layer(x)

In [6]:
class GAN(object):
  """
  Class object to control and abstract the training and logging of vanilla GANs.

  The generator input z is (batch_size, generator.input_dim, 1): the first
  input_dim - 1 channels are uniform noise and the last channel holds the
  class label of the paired real sample.

  :input generator: module exposing an `input_dim` attribute
  :input discriminator: module returning scores in [0, 1] (BCELoss is applied)
  :input dataset: indexable dataset whose samples are dicts with the keys
                  'data' and 'label'
  :input kwargs:
      'learn_rate': Adam learning rate for both networks (default 0.0002)
      'weight_decay': Adam weight decay for both networks (default 0.00001)
      any other key is ignored
  """
  def __init__(self, generator, discriminator, dataset, **kwargs):
    self.generator = generator
    self.discriminator = discriminator
    self.loss = nn.BCELoss()
    self.dataset = dataset

    # setting up optimizers
    lr = kwargs.get('learn_rate', 0.0002)
    w_decay = kwargs.get('weight_decay', 0.00001)
    self.discriminator_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr=lr, weight_decay=w_decay)
    self.generator_optimizer = torch.optim.Adam(self.generator.parameters(), lr=lr, weight_decay=w_decay)


  def _model_device(self):
    """Device the generator's parameters live on (CPU if it has none)."""
    first_param = next(self.generator.parameters(), None)
    return first_param.device if first_param is not None else torch.device('cpu')


  def _sample_z(self, labels, device):
    """Build a generator input: uniform noise channels plus one label channel."""
    # Cast explicitly: dataset labels are integer tensors, the noise is float.
    label_channel = labels.reshape(-1, 1, 1).to(device=device, dtype=torch.float)
    noise = torch.rand((label_channel.shape[0], self.generator.input_dim - 1, 1), device=device)
    return torch.cat([noise, label_channel], dim=1)


  def train(self, epochs, batch_size, verbose=True, print_every=100):
    """
    Alternate discriminator and generator updates over the dataset.

    :input epochs: the loop runs over range(epochs + 1), i.e. epochs + 1 passes
    :input batch_size: samples per batch; a trailing partial batch is dropped
    :input verbose: print losses and scores every `print_every` iterations
    :input print_every: logging interval in iterations

    Fills self.generator_loss / self.discriminator_loss with one value per
    iteration, and self.generated_test with one array per epoch: the output
    for a fixed z (label 0) averaged over batch and channels.
    """
    self.epochs = epochs
    self.batch_size = batch_size
    self.generator_loss = list()
    self.discriminator_loss = list()
    self.generated_test = list()

    # Every tensor created here follows the generator, so training also works
    # after the models have been moved to a GPU.
    device = self._model_device()

    # drop_last discards the trailing partial batch
    data_loader = torch.utils.data.DataLoader(
        self.dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    iteration = 0

    # Fixed probe (label 0) used to track the generator across epochs.
    fixed_z = self._sample_z(torch.zeros(1), device)

    for epoch in range(self.epochs + 1):
      for i, sample in enumerate(data_loader):
        self.generator.train()

        eeg_data = sample['data'].to(device)
        z = self._sample_z(sample['label'], device)

        ##############
        # Training the Discriminator
        ##############
        self.discriminator_optimizer.zero_grad() # remove previous gradients

        # real data should be scored as 1
        real_score = self.discriminator(eeg_data)
        discriminator_loss_real = self.loss(real_score, torch.ones_like(real_score))
        discriminator_loss_real.backward()
        real_output_score = real_score.mean().item()

        # generated data should be scored as 0; detach() keeps this step from
        # back-propagating into the generator
        fake_data = self.generator(z)
        fake_score = self.discriminator(fake_data.detach())
        discriminator_loss_fake = self.loss(fake_score, torch.zeros_like(fake_score))
        discriminator_loss_fake.backward()
        G_output_score1 = fake_score.mean().item()

        # optimize the discriminator
        discriminator_loss = discriminator_loss_real + discriminator_loss_fake
        self.discriminator_optimizer.step()

        ##############
        # Training the Generator
        ##############
        self.generator_optimizer.zero_grad() # remove previous gradients

        # the generator is rewarded when the updated discriminator says "real"
        fake_score2 = self.discriminator(fake_data)
        generator_output_score2 = fake_score2.mean().item()
        generator_loss = self.loss(fake_score2, torch.ones_like(fake_score2))
        generator_loss.backward()
        self.generator_optimizer.step()

        iteration += 1

        if iteration % print_every == 0 and verbose:
          # output the loss and the scores
          print("Iteration : ", iteration)
          print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tReal Score: %.4f\t Fake Scores: %.4f / %.4f'
                  % (epoch, self.epochs, i, len(data_loader),
                     discriminator_loss.item(), generator_loss.item(), real_output_score, G_output_score1, generator_output_score2))

        self.generator_loss.append(generator_loss.item())
        self.discriminator_loss.append(discriminator_loss.item())

      # check how the Generator is doing
      self.generator.eval()
      with torch.no_grad():
        fake_data = self.generator(fixed_z).cpu()
      self.generated_test.append(np.mean(fake_data.numpy(), axis=(0, 1)))

      # No checkpoint is written during training; call save_models explicitly.


  def save_models(self, path):
    """Write the generator's state_dict to `path` and return True."""
    torch.save(self.generator.state_dict(), path)
    print("Saved generator at " + path)
    return True
     

In [7]:
def moving_average(x, w):
    """
    Smooth a 1d signal with a box filter of length w
    :input x: 1d np array
    :input w: number of samples in each averaging window
    returns np array of window means; only fully overlapping windows are kept,
    so the result has len(x) - w + 1 entries when w <= len(x)
    """
    box_filter = np.ones(w)
    window_sums = np.convolve(x, box_filter, mode='valid')
    return window_sums / w

class eegData():
    """
    PyTorch style dataset to be loaded into torch data loader for training.
    Each item is a dict with the keys 'data' (one trial, float tensor) and
    'label' (its class index, long tensor).
    """
    # Raw labels are stored as codes starting at 769; subtracting the offset
    # turns them into class indices starting at 0.
    LABEL_OFFSET = 769

    def __init__(self, data_file_name, label_file_name, device=torch.device('cpu'), preprocessing_params=None):
        """
        :input data_file_name: file path of the data (.npy, indexed as trial x channel x time)
        :input label_file_name: file path of the labels (.npy, one code per trial)
        :input device: torch device the tensors are moved to (CPU by default)
        :input preprocessing_params: optional dict, missing keys use the defaults
            'subsample': int step of the subsampling (default 1); every trial is
                         split into that many interleaved copies, so the number of
                         samples is multiplied by it. The trimmed length must be
                         divisible by it for the copies to stack.
            'mov_avg': int size of the moving average window (default 1 = none);
                       shortens the time axis by window - 1
            'trim': how many of the last time steps are trimmed off (default 0)
        """
        params = {} if preprocessing_params is None else preprocessing_params
        subsample = params.get('subsample', 1)
        mov_avg_window = params.get('mov_avg', 1)
        trimming = params.get('trim', 0)

        eeg_data = np.load(data_file_name)
        label_data = np.load(label_file_name) - self.LABEL_OFFSET
        self.device = device

        # remove the last `trimming` time steps
        kept_steps = eeg_data.shape[2] - trimming
        eeg_data = eeg_data[:, :, :kept_steps]

        # interleaved subsampling: copy k takes time steps k, k + subsample, ...
        # and reuses the original labels
        eeg_data = np.vstack([eeg_data[:, :, offset::subsample] for offset in range(subsample)])
        label_data = np.concatenate([label_data] * subsample)

        if mov_avg_window != 1:
            eeg_data = np.apply_along_axis(func1d=moving_average, axis=2, arr=eeg_data, w=mov_avg_window)
        else:
            # A window of 1 leaves every value unchanged, so skip the per-row
            # Python pass and only match the float64 dtype it would produce.
            eeg_data = eeg_data.astype(np.float64, copy=False)

        self.eeg_data = torch.from_numpy(eeg_data).float().to(self.device)
        self.label_data = torch.from_numpy(label_data).long().to(self.device)
        self.mov_avg_window = mov_avg_window
        self.trim = trimming
        self.sampling = subsample

    def __len__(self):
        """Number of samples after subsampling."""
        assert self.eeg_data.shape[0] == self.label_data.shape[0]
        return self.eeg_data.shape[0]

    def __getitem__(self, idx):
        """Return {'data': ..., 'label': ...} for an int, slice, list or tensor index."""
        if torch.is_tensor(idx):
            idx = idx.tolist()

        return {'data': self.eeg_data[idx], 'label': self.label_data[idx]}

In [8]:
def data_prep(X,y,sub_sample,average,noise):
    """
    Augment a set of trials by stacking several down-sampled views of them.

    The first 500 time steps of X (trial x channel x time) are kept, then these
    views are stacked along the trial axis, in this order:
      1. max over consecutive groups of `sub_sample` steps
      2. mean over consecutive groups of `average` steps, plus N(0, 0.5) noise
         (this noise is always added, independent of `noise`)
      3. for each offset in range(sub_sample): every `sub_sample`-th step
         starting at that offset, plus N(0, 0.5) noise when `noise` is truthy

    The views only stack if they have equal length, which needs
    sub_sample == average and 500 divisible by both. Labels `y` are repeated
    once per view. Noise is drawn from numpy's global random state.

    returns (total_X, total_y)
    """
    # Trimming the data (sample,22,1000) -> (sample,22,500)
    trimmed = X[:, :, :500]
    print('Shape of X after trimming:',trimmed.shape)

    n_trials, n_channels = trimmed.shape[0], trimmed.shape[1]

    # Maxpooling the data (sample,22,500) -> (sample,22,500/sub_sample)
    pooled = trimmed.reshape(n_trials, n_channels, -1, sub_sample).max(axis=3)
    total_X = pooled
    print('Shape of X after maxpooling:',total_X.shape)

    # Averaging + noise
    averaged = trimmed.reshape(n_trials, n_channels, -1, average).mean(axis=3)
    averaged = averaged + np.random.normal(0.0, 0.5, averaged.shape)
    total_X = np.vstack((pooled, averaged))
    print('Shape of X after averaging+noise and concatenating:',total_X.shape)

    # Subsampling: one interleaved view per offset, noise drawn in offset order
    strided_views = []
    for offset in range(sub_sample):
        strided = trimmed[:, :, offset::sub_sample]
        jitter = np.random.normal(0.0, 0.5, strided.shape) if noise else 0.0
        strided_views.append(strided + jitter)

    total_X = np.vstack([total_X] + strided_views)
    # one copy of the labels per stacked view: maxpool, average, each offset
    total_y = np.hstack([y] * (2 + sub_sample))

    print('Shape of X after subsampling and concatenating:',total_X.shape)
    print('Shape of y after subsampling and concatenating:',total_y.shape)
    return total_X,total_y

In [9]:
## Adjusting the labels so that the classes are numbered 0-3

# Cue onset left - 0
# Cue onset right - 1
# Cue onset foot - 2
# Cue onset tongue - 3

LABEL_OFFSET = 769

# Shift only while the raw codes are still present, so re-running this cell
# does not subtract the offset a second time.
if y_train_valid.min() >= LABEL_OFFSET:
    y_train_valid -= LABEL_OFFSET
if y_test.min() >= LABEL_OFFSET:
    y_test -= LABEL_OFFSET


## Random splitting and reshaping the data
# First generating the training and validation indices using random splitting

n_trials = X_train_valid.shape[0]   # taken from the data instead of a literal
n_valid = 375
ind_valid = np.random.choice(n_trials, n_valid, replace=False)
# every index not drawn for validation, in ascending order
ind_train = np.setdiff1d(np.arange(n_trials), ind_valid)

# Creating the training and validation sets using the generated indices
(X_train, X_valid) = X_train_valid[ind_train], X_train_valid[ind_valid]
(y_train, y_valid) = y_train_valid[ind_train], y_train_valid[ind_valid]


## Preprocessing the dataset
x_train,y_train = data_prep(X_train,y_train,2,2,True)
x_valid,y_valid = data_prep(X_valid,y_valid,2,2,True)
X_test_prep,y_test_prep = data_prep(X_test,y_test,2,2,True)

Shape of X after trimming: (1740, 22, 500)
Shape of X after maxpooling: (1740, 22, 250)
Shape of X after averaging+noise and concatenating: (3480, 22, 250)
Shape of X after subsampling and concatenating: (6960, 22, 250)
Shape of y after subsampling and concatenating: (6960,)
Shape of X after trimming: (375, 22, 500)
Shape of X after maxpooling: (375, 22, 250)
Shape of X after averaging+noise and concatenating: (750, 22, 250)
Shape of X after subsampling and concatenating: (1500, 22, 250)
Shape of y after subsampling and concatenating: (1500,)
Shape of X after trimming: (443, 22, 500)
Shape of X after maxpooling: (443, 22, 250)
Shape of X after averaging+noise and concatenating: (886, 22, 250)
Shape of X after subsampling and concatenating: (1772, 22, 250)
Shape of y after subsampling and concatenating: (1772,)


In [10]:
from torch.utils.data import DataLoader, TensorDataset
from torch import Tensor

# Smoke-test input for the generator: 99 uniform-noise channels followed by
# one all-zero label channel.
testinp = torch.rand((1,99,1))
test_label = torch.zeros((1,1,1))
inp = torch.cat([testinp, test_label], dim = 1) # (1, 100, 1)
generator = Generator(100, [220, 154, 88, 44])
x = generator(inp)
discriminator = Discriminator(22,[18, 14, 10, 6])
# Drop the last 400 time steps of every trial, then split each trial into
# 6 interleaved subsamples.
dataset = eegData('X_train_valid.npy', 'y_train_valid.npy', preprocessing_params={'subsample':6, 'trim':400})
# NOTE(review): GAN.__init__ only reads 'learn_rate' and 'weight_decay' from its
# keyword arguments, so `cuda = True` has no effect here.
gan = GAN(generator,discriminator,dataset, cuda = True)

In [11]:
# Train with batches of 250 samples. GAN.train loops over range(epochs + 1),
# so passing 50 runs 51 passes over the data.
gan.train(50,250)

Iteration :  100
[1/50][49/51]	Loss_D: 1.6890	Loss_G: 0.4324	Real Score: 0.5330	 Fake Scores: 0.6491 / 0.6489
Iteration :  200
[3/50][49/51]	Loss_D: 1.6132	Loss_G: 0.4481	Real Score: 0.5568	 Fake Scores: 0.6389 / 0.6388
Iteration :  300
[5/50][49/51]	Loss_D: 1.5529	Loss_G: 0.4628	Real Score: 0.5760	 Fake Scores: 0.6296 / 0.6295
Iteration :  400
[7/50][49/51]	Loss_D: 1.5175	Loss_G: 0.4767	Real Score: 0.5819	 Fake Scores: 0.6209 / 0.6208
Iteration :  500
[9/50][49/51]	Loss_D: 1.4986	Loss_G: 0.4899	Real Score: 0.5802	 Fake Scores: 0.6128 / 0.6127
Iteration :  600
[11/50][49/51]	Loss_D: 1.4758	Loss_G: 0.5023	Real Score: 0.5811	 Fake Scores: 0.6052 / 0.6051
Iteration :  700
[13/50][49/51]	Loss_D: 1.4631	Loss_G: 0.5141	Real Score: 0.5776	 Fake Scores: 0.5981 / 0.5980
Iteration :  800
[15/50][49/51]	Loss_D: 1.4474	Loss_G: 0.5254	Real Score: 0.5767	 Fake Scores: 0.5914 / 0.5913
Iteration :  900
[17/50][49/51]	Loss_D: 1.4412	Loss_G: 0.5362	Real Score: 0.5713	 Fake Scores: 0.5850 / 0.5850
Iterat

In [8]:
!pip install torchinfo --quiet
import torchinfo

In [38]:
# Print layer-by-layer summaries of both networks for one training-sized batch.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch = 250

# Gtest / Dtest are aliases, not copies: .to(device) moves the original
# `generator` and `discriminator` objects as well.
Gtest = generator
Gtest.to(device)
print(torchinfo.summary(Gtest, input_size=(batch, 100, 1)))
Dtest = discriminator
Dtest.to(device)
print(torchinfo.summary(Dtest, input_size=(batch, 22, 100)))

Layer (type:depth-idx)                   Output Shape              Param #
Generator                                [250, 22, 100]            --
├─Sequential: 1-1                        [250, 22, 100]            --
│    └─ConvTranspose1d: 2-1              [250, 220, 4]             88,000
│    └─BatchNorm1d: 2-2                  [250, 220, 4]             440
│    └─ReLU: 2-3                         [250, 220, 4]             --
│    └─ConvTranspose1d: 2-4              [250, 154, 10]            135,520
│    └─BatchNorm1d: 2-5                  [250, 154, 10]            308
│    └─ReLU: 2-6                         [250, 154, 10]            --
│    └─ConvTranspose1d: 2-7              [250, 88, 22]             54,208
│    └─BatchNorm1d: 2-8                  [250, 88, 22]             176
│    └─ReLU: 2-9                         [250, 88, 22]             --
│    └─ConvTranspose1d: 2-10             [250, 44, 49]             27,104
│    └─BatchNorm1d: 2-11                 [250, 44, 49]           

In [13]:
## Testing the model
# NOTE(review): the GAN class defined in this notebook only has __init__, train
# and save_models, so gan.evaluate raises AttributeError (see the cell output).
# The printed label also refers to a "hybrid CNN-LSTM model" rather than the
# GAN; this cell looks carried over from a different notebook.
# TODO: replace it with a GAN-specific check or remove it.
GAN_score = gan.evaluate(X_test, y_test, verbose=0)
print('Test accuracy of the hybrid CNN-LSTM model:', GAN_score[1])

AttributeError: ignored