In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.autograd import Variable

from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [2]:
# https://discuss.pytorch.org/t/how-to-implement-keras-layers-core-lambda-in-pytorch/5903
class BestMeanPooling(nn.Module):
    
  def __init__(self, n_topN):
    super(BestMeanPooling, self).__init__()
    self.topN = n_topN
        
  def forward(self, aX_tr):
    aX_tr_sorted, _ = torch.sort(aX_tr, axis=1) # also return index
    return torch.mean(aX_tr_sorted[:, -self.topN:,:], axis=1)

# Test
aA = torch.randn(1, 200).reshape(4, 10, 5)
mean_pool = BestMeanPooling(2) 
aA = mean_pool(aA)
print(aA)

tensor([[1.8978, 0.5370, 0.8774, 1.3456, 0.8028],
        [0.9870, 1.4182, 1.0391, 0.3232, 1.0637],
        [0.7647, 1.4796, 0.9319, 2.3831, 1.6316],
        [1.1999, 1.7689, 1.4049, 1.7123, 1.4961]])


In [3]:
class MassCNN(nn.Module):

  def __init__(self, n_in, n_out, topN, prob=0.00):
    super(MassCNN,self).__init__()
    self.n_filter = n_out

    #first layers
    self.conv_1        = nn.Conv1d(n_in, n_out, kernel_size=1)
    
    # output of the layer is # of filters
    self.bmpool_1      = BestMeanPooling(topN)
    self.dropout_L1    = nn.Dropout(p=prob)
    
    # Dense layer, no flattening is required 
    self.fcn_1 = nn.Linear(n_out, 2)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, aX_tr):
    aX_tr = self.conv_1(aX_tr)
    aX_tr = torch.relu(aX_tr)
    aX_tr = self.bmpool_1(aX_tr)
    
    if self.n_filter > 4: 
      aX_tr = self.dropout_L1(aX_tr)
    print("Coding: after view ",  aX_tr.size())
    aX_tr = aX_tr.view(self.n_filter, -1)
    print("Coding: after view 1 ",  aX_tr.size())
    aX_tr = self.fcn_1(aX_tr)
    print("Coding: fcn_1 ", aX_tr.size())
    aX_tr = self.softmax(aX_tr)
    return aX_tr

In [4]:
class SyntheticData(Dataset):
    
    def __init__(self, upper, x, y, z):
      self.aX_tr_sy = np.random.random_sample(upper).reshape(x, y, z)
      self.ay_tr_sy = np.random.randint(0, 2, x)

    def __len__(self):
        return len(self.aX_tr_sy)

    def __getitem__(self, idx):
        return self.aX_tr_sy[idx], self.ay_tr_sy[idx] #.squeeze(1)

In [5]:
def train(n_epochs=10, lr=0.1, filter=5):
    """
      Aim:

      params:

      returns:
    """
    
    d_trains = SyntheticData(5000000, 10000, 50, 10)
    print(len(d_trains))
    d_valids = SyntheticData(50000, 100, 50, 10)
    d_tests  = SyntheticData(50000, 100, 50, 10)
    
    massModel = MassCNN(50, filter, 5)
    massModel = massModel.float()
    #print("Coding: ", massModel.parameters())
    optimizer = torch.optim.SGD(massModel.parameters(), lr=0.0001)
    fcost     = torch.nn.CrossEntropyLoss()
  
    cost_per_epoch = []
    #print("k", n_epochs)
    
    # aX_tr, ay_tr 
    aTrs = DataLoader(d_trains, batch_size=50, shuffle=True)
    aVas = DataLoader(d_valids, batch_size=10, shuffle=True)
    for epoch in range(n_epochs):
      fgr_cost = 0
      for x, y in aTrs:
        ## Back propagation the loss
        optimizer.zero_grad()
        y_pr  = massModel(x.float())
        loss  = fcost(y_pr, y)
        loss.backward()
        optimizer.step() # back propagate
        fgr_cost += loss.data
      cost_per_epoch.append(fgr_cost)
    
      correct=0
      for x_va, y_va in aVas:
        z  = massModel(x_va.float()).data
        temp, yhat = torch.max(z.data, 1)
        correct += (yhat == y_va).sum().item()
      accuracy = correct / 1000
      print(epoch, accuracy)
            
      # get accuracy list 
    #return f_ac_va



In [6]:
train()

10000
Coding: after view  torch.Size([50, 10])
Coding: after view 1  torch.Size([5, 100])


RuntimeError: size mismatch, m1: [5 x 100], m2: [5 x 2] at ../aten/src/TH/generic/THTensorMath.cpp:197