In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from torch.autograd import Variable
import time

import joblib

In [2]:
cuda = torch.cuda.is_available()

# Dataset

In [3]:
class EQDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.transform = transform
        self.data_dir = data_dir
        train_x, train_y = joblib.load(data_dir)
        self.train_x, self.train_y = torch.tensor(train_x, dtype = torch.float), torch.tensor(train_y, dtype = torch.float)
        self.len = self.train_x.shape[0]
        
    def __len__(self):
        return self.len
    
    def __getitem__(self, idx):
        
        X = self.train_x[idx]
        y = self.train_y[idx]

        if self.transform:
            X = self.transform(X)
            
        return X, y

# Model

In [4]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv1d(3, 5, kernel_size=5)
        self.maxP1 = nn.MaxPool1d(5, stride=2)
        self.lin = nn.Linear(246 * 5, 2, bias=False)
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxP1(x)
        x = F.leaky_relu(x)
        x = x.view(-1, self.num_flat_features(x))
        x = self.lin(x)
        
        return x
    
    def forwardToHidden(self, x):
        x = self.conv1(x)
        x = self.maxP1(x)
        x = F.leaky_relu(x)
        x = x.view(-1, self.num_flat_features(x))
        
        return x
    
    def num_flat_features(self, x):
        size = x.size()[1:]  # all dimensions except the batch dimension
        num_features = 1
        for s in size:
            num_features *= s
        return num_features

# Optimizer

In [5]:
class pseudoInverse(object):
    def __init__(self, params, C = 1e-2, L = 100):
        self.C = C
        self.L = L
        
        self.params = list(params)
        self.w = self.params[-1]
        self.w.data.fill_(0)
        self.cuda = self.w.is_cuda
        self.dimOutput, self.dimInput = self.w.data.size()
        
    def pseudoSparse(self, H, oneHotTarget):
        hth = torch.mm(H.t(), H) #[n_hidden, n_hidden]
        dimHidden = H.size()[1]
        I = Variable(torch.eye(dimHidden), requires_grad = False)
        if self.cuda:
            I = I.cuda()
        
        if self.L > 0.0:
            mu = torch.mean(H, dim = 0, keepdim = True) #[1, n_features]
            S = H - mu
            S = torch.mm(S.t(), S)
            self.B = Variable(torch.inverse(hth.data + self.C * (I.data + self.L * S)), requires_grad = False)
        else:
            self.B = Variable(torch.inverse(hth.data + self.C * I.data), requires_grad = False)
            
        w = torch.mm(self.B, H.t())
        # print('B: ', self.B.shape)
        # print('H: ', H.shape)
        # print('Y: ', oneHotTarget.shape)

        w = torch.mm(w, oneHotTarget)
        self.w.data = w.t().data
    
    def pseudoCompress(self, H, oneHotTarget):
        hht = torch.mm(H, H.t()) #[n_features, n_features]
        numSample = H.size()[0]
        I = Variable(torch.eye(numSample), requires_grad = False)
        if self.cuda:
            I = I.cuda()
            
        self.B = Variable(torch.inverse(hht.data + self.C * I.data), requires_grad = False)
        w = torch.mm(H.t(), self.B)
        w = torch.mm(w, oneHotTarget)
        self.w.data = w.t().data
        
    def get_hidden(self, hidden):
        return {'sparse':self.pseudoSparse, 'compress':self.pseudoCompress}[hidden]
    
    
    def train(self, H, y, oneHot = True, hiddenType = 'sparse'):
        y = y.view(y.size(0),-1)
        
        if oneHot:
            oneHotTarget = self.oneHotVectorize(y)
            
        hidden = self.get_hidden(hiddenType)
        hidden(H, oneHotTarget)

        
    def oneHotVectorize(self,targets):
        oneHotTarget = torch.zeros(targets.size()[0], self.dimOutput)

        for i in range(targets.size()[0]):
            oneHotTarget[i][int(targets[i].item())] = 1

        if self.cuda:
            oneHotTarget=oneHotTarget.cuda()
            
        oneHotTarget = Variable(oneHotTarget, requires_grad=False)

        return oneHotTarget

# Load Data

In [6]:
train_dir = f'../ISMP/dataset/trainset_v2_500_label4.jb'
test_dir = f'../ISMP/dataset/testing_500_label4.jb'

train_datasets = EQDataset(train_dir)
test_datasets = EQDataset(test_dir)

X_train, y_train = train_datasets[:]
X_test, y_test = test_datasets[:]

# Train

In [7]:
model = Net()
if cuda:
    model.cuda()
optimizer= pseudoInverse(params=model.parameters(), C = 0.01, L = 0)

In [8]:
%%time
X_train, y_train = X_train.cuda(), y_train.cuda()
X_train, y_train = Variable(X_train, requires_grad=False), Variable(y_train, requires_grad=False)

hiddenOut = model.forwardToHidden(X_train)
optimizer.train(hiddenOut, y_train)
output = model.forward(X_train)
pred = output.data.max(1)[1]
score_train = pred.eq(y_train.data).cpu().sum()
print(score_train / y_train.shape[0])

tensor(0.9035)
Wall time: 2.44 s


# Test

In [9]:
X_test, y_test = X_test.cuda(), y_test.cuda()
X_train, y_test = Variable(X_test, requires_grad=False), Variable(y_test, requires_grad=False)

output = model.forward(X_test)
pred = output.data.max(1)[1]
score_test = pred.eq(y_test.data).cpu().sum()
print(score_test / y_test.shape[0])

tensor(0.9555)


In [10]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

In [11]:
y_pred = pred.cpu().numpy()
y_test = y_test.cpu().numpy()

In [12]:
print('Precision: %.3f' % precision_score(y_test, y_pred))
print('Recall: %.3f' % recall_score(y_test, y_pred))
print('F1: %.3f' % f1_score(y_test, y_pred))
print('Accuracy: %.3f' % accuracy_score(y_test, y_pred))

Precision: 0.268
Recall: 0.788
F1: 0.400
Accuracy: 0.955
