In [1]:
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.functional as F
from torch.autograd  import Variable
import torch.optim as optim




In [2]:
class error_module(nn.Module):
    def __init__(self,size):
        super(error_module,self).__init__()
        self.error_linear = nn.Linear(size,1)
        self.Var_e = Variable(torch.ones(1, 1), requires_grad=True)
    def forward(self,x,prev_error):
        x = self.error_linear(x) + self.Var_e * prev_error
        
        return x



class classifier_module(nn.Module):
    def __init__(self,size):
        super(classifier_module,self).__init__()
        self.classifier_linear = nn.Linear(size,1)
        self.Var_w = Variable(torch.ones(1, 1), requires_grad=True)
    def forward(self,x, prev_error):
        x = self.classifier_linear(x) + self.Var_w * prev_error 
        
        return x
        


In [3]:
dataset_names = ["german", "ionosphere", "spambase", "magic", "a8a"]
root_path, extension = "./datasets/", "_numeric"


def get_path(name):
    '''returns a path pair to the preprocessed datasets
    X and y csv files.'''
    path = root_path + name + extension
    return path + "_X.csv", path + "_y.csv"


def read_dataset(X_path, y_path):
    '''reads and returns numpy arrays in a given pair of paths for 
    X and y.'''
    X = pd.read_csv(X_path).values
    y = pd.read_csv(y_path)['0'].values
    return X, y


def simulate_varying(X):  # multivariate normal distribution
    '''Get the data and generate a varying feature space pattern.
    Possible concerns: thresholding messing up the distribution?'''
    
    # create a covariance matrix
    cov = np.random.rand(num_features, num_features)
    cov = np.dot(cov, cov.transpose())  # to have a positive semi-definite matrix
    
    # create a mean vector
    mean = np.random.rand(len(X[0]))
    
    # sample from multivariate gaussian w/ given mean and cov
    spaces = np.random.multivariate_normal(mean, cov, len(X))
    
    # threshold samples for 1-hot encoding
    spaces[spaces < 0] = 0
    spaces[spaces != 0] = 1

    return spaces

def simulate_random_varying(X): # discrete uniform distribution
    matrix = np.random.randint(2, size=(len(X), len(X[0])))  
    return matrix


def quant(x, l):  # l: num_layers, x:input
    one_hot = []
    for i in x:
        if i != 0:
            one_hot.append(1)
        else:
            one_hot.append(0)
    one_hot = np.array(one_hot)
    
    qt = (one_hot-x)/l
    qts = []
    qts.append(one_hot)
    
    for i in range(l):
        qts.append(x + qt * (l-i+1))
        
    qts.append(x)    
    
    return np.array(qts)

In [4]:
X_path, y_path = get_path("german")
X, y = read_dataset(X_path, y_path)
print(np.unique(y,return_counts=True))
num_features = len(X[0])
folds = 20
learning_rate = 0.001

(array([-1.,  1.]), array([300, 700]))


In [5]:
class OPNet(nn.Module):
    def __init__(self,number_layers,size):
        super(OPNet,self).__init__()
        self.classifier_module = classifier_module(size)
        self.number_layers = number_layers
        self.error_modules = nn.ModuleList([error_module(size) for i in range(number_layers-1)])
            
    def forward(self,x):
        predict= torch.zeros(1, 1).double()
        errors = []
        errors.append(torch.zeros(1, 1).double())
        for i in range (self.number_layers - 1):
            predict = self.error_modules[i](x[i], predict) 
            errors.append(predict - errors[-1])
        
            
        pred = self.classifier_module(x[-1], predict) 
        errors.append(pred - errors[-1])
        
        return pred, errors
        

In [6]:

l = 5
net = OPNet(l,X.shape[1])
net = net.to(torch.double)
print(net)
parameter = list(net.parameters())
criterion = nn.HingeEmbeddingLoss()
optimizer = optim.SGD(net.parameters(), lr=0.1)

OPNet(
  (classifier_module): classifier_module(
    (classifier_linear): Linear(in_features=24, out_features=1, bias=True)
  )
  (error_modules): ModuleList(
    (0): error_module(
      (error_linear): Linear(in_features=24, out_features=1, bias=True)
    )
    (1): error_module(
      (error_linear): Linear(in_features=24, out_features=1, bias=True)
    )
    (2): error_module(
      (error_linear): Linear(in_features=24, out_features=1, bias=True)
    )
    (3): error_module(
      (error_linear): Linear(in_features=24, out_features=1, bias=True)
    )
  )
)


In [7]:

error=0
for i in range(len(X)):
    x= quant(X[i],l)
    x = torch.from_numpy(x)
    y_ = torch.from_numpy(y[i].reshape(1,1))
    pred = net(x)
    if torch.sign(pred[0]).detach().numpy()[0][0]!= y_:
        error +=1
    loss = criterion(torch.sign(pred[0]), y_)
    loss.backward()
    optimizer.step()
print("error_rate : ",error/len(X))
    
    
    
    
    

error_rate :  0.3
