In [1]:
import numpy as np
from numpy.linalg import inv
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
class Preprocessing:
    """Helpers that turn a column of category labels into a one-hot matrix."""

    @classmethod
    def unique_value(cls, X):
        """Return the sorted distinct values found anywhere in ``X``."""
        return np.unique(X)

    @classmethod
    def one_hot_matrix(cls, X, unique_value):
        """One-hot encode the first column of ``X`` against ``unique_value``.

        Parameters
        ----------
        X : 2-D array; only column 0 is read.
        unique_value : sequence of category values; its order fixes the
            column order of the result.

        Returns
        -------
        Float array of shape ``(X.shape[0], len(unique_value))`` holding a
        single 1 per row.

        Raises
        ------
        IndexError
            If a value of ``X[:, 0]`` does not occur in ``unique_value``.
        """
        categories = unique_value
        encoded = np.zeros([X.shape[0], len(categories)])
        for row, cell in enumerate(X[:, 0:1]):
            # Position of the first category equal to this row's value.
            col = np.argwhere(categories == cell[0])[0, 0]
            encoded[row, col] = 1
        return encoded

    @classmethod
    def create_one_hot_matrix(cls, X):
        """Derive the categories from ``X`` itself, then one-hot encode it."""
        return cls.one_hot_matrix(X, cls.unique_value(X))

In [3]:
class ErrorScore:
    """Collection of error / loss metrics between targets ``Y`` and predictions ``Yhat``.

    All methods are classmethods; ``Y`` and ``Yhat`` are NumPy arrays of the
    same shape whose first axis indexes samples.
    """

    @classmethod
    def find_error(cls, Y, Yhat, error_type):
        """Compute the metric selected by ``error_type``.

        Parameters
        ----------
        Y, Yhat : arrays of targets and predictions.
        error_type : one of ``'SSE'``, ``'MSE'``, ``'MAE'``, ``'MAPE'``,
            ``'entropy'``, ``'binary'``, ``'multiclass'``.

        Raises
        ------
        ValueError
            If ``error_type`` is not one of the names above (previously this
            surfaced as an ``UnboundLocalError``).
        """
        metrics = {
            'SSE': cls.find_SSE,
            'MSE': cls.find_MSE,
            'MAE': cls.find_MAE,
            'MAPE': cls.find_MAPE,
            'entropy': cls.find_entropy,
            'binary': cls.find_error_bin_class,
            'multiclass': cls.find_error_mul_class,
        }
        try:
            metric = metrics[error_type]
        except (KeyError, TypeError):
            raise ValueError(
                'unknown error_type %r; expected one of %s'
                % (error_type, sorted(metrics))
            ) from None
        return metric(Y, Yhat)

    @classmethod
    def find_SSE(cls, Y, Yhat):
        """Sum of squared errors."""
        return ((Y - Yhat)**2).sum()

    @classmethod
    def find_MSE(cls, Y, Yhat):
        """Sum of squared errors divided by the number of rows of ``Y``."""
        N = Y.shape[0]
        return ((Y - Yhat)**2).sum()/N

    @classmethod
    def find_MAE(cls, Y, Yhat):
        """Sum of absolute errors divided by the number of rows of ``Y``."""
        N = Y.shape[0]
        return (np.abs(Y - Yhat)).sum()/N

    @classmethod
    def find_MAPE(cls, Y, Yhat):
        """Mean absolute percentage error (in percent).

        Divides by ``Y`` element-wise, so zeros in ``Y`` yield inf/nan.
        """
        N = Y.shape[0]
        return np.abs((Y - Yhat)/Y).sum()*100/N

    @classmethod
    def find_entropy(cls, Y, Yhat):
        """Cross-entropy ``-sum(Y * log(Yhat))`` summed over all entries."""
        return (-Y*np.log(Yhat)).sum()

    @classmethod
    def find_error_bin_class(cls, Y, Yhat):
        """Percentage of entries whose rounded prediction differs from the rounded target."""
        N = Y.shape[0]
        rounded_Y = np.round(Y, 0)
        rounded_Yhat = np.round(Yhat, 0)
        return 100*(rounded_Y != rounded_Yhat).sum()/N

    @classmethod
    def find_error_mul_class(cls, Y, Yhat):
        """Percentage of rows whose arg-max column differs between ``Y`` and ``Yhat``."""
        N = Y.shape[0]
        Y_argmax = np.argmax(Y, axis=1)
        Yhat_argmax = np.argmax(Yhat, axis=1)
        return 100*(Y_argmax != Yhat_argmax).sum()/N

In [4]:
class LinearRegression:
    """Least-squares line ``Y = a*X + b`` for one feature and one target.

    ``X`` and ``Y`` are 2-D arrays of shape ``(n_samples, 1)``. After
    :meth:`fit`, ``self.a`` is the slope and ``self.b`` the intercept.
    """

    def __init__(self, n_jobs=None):
        # Stored only; nothing in this class reads it.
        self.n_jobs = n_jobs

    @staticmethod
    def _check_shapes(X, Y):
        """Raise ``Exception`` unless X and Y are (n, 1) with the same n."""
        if X.shape[0] != Y.shape[0]:
            raise Exception('n_samples must be same')
        if X.shape[1] != 1:
            raise Exception('n_features must be 1')
        if Y.shape[1] != 1:
            raise Exception('n_targets must be 1')

    @staticmethod
    def _weights(sample_weight, shape):
        """Return ``sample_weight`` broadcast to ``shape`` (or None if not given)."""
        if sample_weight is None:
            return None
        w = np.asarray(sample_weight, dtype=float)
        if w.ndim == 1:
            # A flat vector of per-sample weights would otherwise broadcast
            # against the (n, 1) column into an (n, n) matrix.
            w = w.reshape(-1, 1)
        return np.broadcast_to(w, shape)

    @staticmethod
    def _total(values, w):
        """(Weighted) sum of ``values``."""
        if w is None:
            return values.sum()
        return (w*values).sum()

    @classmethod
    def _mean(cls, values, w):
        """(Weighted) mean of ``values``; weights are normalised by their sum."""
        if w is None:
            return values.mean()
        return cls._total(values, w)/w.sum()

    @classmethod
    def r(cls, X, Y, sample_weight=None):
        """Pearson correlation coefficient between ``X`` and ``Y``.

        With ``sample_weight`` the weighted correlation is returned, i.e.
        integer weights behave like repeating the corresponding samples.

        Raises
        ------
        Exception
            If the shapes are not ``(n, 1)`` with matching ``n``.
        """
        cls._check_shapes(X, Y)
        w = cls._weights(sample_weight, X.shape)
        dx = X - cls._mean(X, w)
        dy = Y - cls._mean(Y, w)
        fraction = cls._total(dx*dy, w)
        denominator = np.sqrt(cls._total(dx**2, w)*cls._total(dy**2, w))
        return fraction/denominator

    def fit(self, X, Y, sample_weight=None):
        """Estimate slope ``self.a`` and intercept ``self.b`` by least squares.

        With ``sample_weight`` the weighted least-squares solution is
        computed. All moments are normalised by the sum of the weights, so
        the result does not depend on the overall scale of the weights.

        Raises
        ------
        Exception
            If the shapes are not ``(n, 1)`` with matching ``n``.
        """
        self._check_shapes(X, Y)
        w = self._weights(sample_weight, X.shape)
        X_mean = self._mean(X, w)
        Y_mean = self._mean(Y, w)
        XY_mean = self._mean(X*Y, w)
        X2_mean = self._mean(X**2, w)
        # Variance of X; zero when every X is identical.
        denominator = X2_mean - X_mean**2
        self.a = (XY_mean - X_mean*Y_mean)/denominator
        self.b = (X2_mean*Y_mean - X_mean*XY_mean)/denominator

    def predict(self, X):
        """Return ``a*X + b`` using the fitted coefficients."""
        return self.a*X + self.b

    def scatter(self, X, Y, line=False):
        """Scatter-plot the data and draw the fitted line in red.

        ``line`` is accepted but not consulted: the fitted line is always drawn.
        """
        plt.scatter(X, Y)
        plt.plot(X, self.a*X + self.b, color='red')

In [5]:
class MultipleRegression(ErrorScore):
    """Linear regression on several features with an intercept term.

    ``approach='Global'`` solves the normal equations in closed form;
    ``approach='Local'`` runs batch gradient descent and records the error
    of every epoch in ``self.error_list``. The fitted coefficients are stored
    in ``self.W`` with the intercept in row 0.
    """

    def __init__(self, approach='Global', n_jobs=None):
        self.approach = approach
        # Stored only; nothing in this class reads it.
        self.n_jobs = n_jobs

    def fit(self, X, Y, epoch=1000, learning_rate=0.01, error_type='SSE', sample_weight=None):
        """Fit the coefficients ``self.W``.

        Parameters
        ----------
        X : array of shape (n_samples, n_features).
        Y : array of shape (n_samples, 1).
        epoch, learning_rate, error_type : used by the 'Local' approach only;
            ``error_type`` is passed to ``ErrorScore.find_error``.
        sample_weight : accepted but not used by this implementation.

        Raises
        ------
        Exception
            If the sample counts differ or ``Y`` has more than one column.
        ValueError
            If ``self.approach`` is neither 'Global' nor 'Local' (previously
            the call returned silently without fitting anything).
        """
        if X.shape[0] != Y.shape[0]:
            raise Exception('n_samples must be same')
        if Y.shape[1] != 1:
            raise Exception('n_targets must be 1')
        if self.approach not in ('Global', 'Local'):
            raise ValueError(
                "approach must be 'Global' or 'Local', got %r" % (self.approach,)
            )
        N = X.shape[0]
        # Leading column of ones so that W[0] acts as the intercept.
        Xb = np.hstack([np.ones([N, 1]), X])
        if self.approach == 'Global':
            # Normal equations: W = (Xb^T Xb)^-1 Xb^T Y.
            front = inv(np.dot(Xb.T, Xb))
            back = np.dot(Xb.T, Y)
            self.W = np.dot(front, back)
        else:
            error_list = []
            D = Xb.shape[1]
            W = np.random.randn(D, 1)
            for _ in range(epoch):
                Yhat = np.dot(Xb, W)
                error_list.append(ErrorScore.find_error(Y, Yhat, error_type))
                # Step along Xb^T (Y - Yhat), scaled by learning_rate / N.
                W = W + (learning_rate/N)*np.dot(Xb.T, Y-Yhat)
            self.error_list = error_list
            self.W = W

    def predict(self, X):
        """Return ``[1, X] @ self.W`` for every row of ``X``."""
        N = X.shape[0]
        Xb = np.hstack([np.ones([N, 1]), X])
        return np.dot(Xb, self.W)

In [6]:
class ActivationFunction:
    """Element-wise / row-wise activation functions on NumPy arrays."""

    @classmethod
    def sigmoid(cls, Z):
        """Logistic function ``1 / (1 + exp(-Z))`` applied element-wise."""
        return 1/(1 + np.exp(-Z))

    @classmethod
    def softmax(cls, Z):
        """Row-wise softmax of a 2-D array ``Z``.

        The row maximum is subtracted before exponentiating. This leaves the
        result mathematically unchanged but keeps ``exp`` from overflowing to
        ``inf`` (and the quotient from becoming ``nan``) for large logits.
        """
        shifted = Z - np.max(Z, axis=1, keepdims=True)
        exp_shifted = np.exp(shifted)
        return exp_shifted/exp_shifted.sum(axis=1, keepdims=True)

In [7]:
class LogisticRegression(ActivationFunction, ErrorScore):
    """Logistic / softmax regression trained with batch gradient updates.

    A single target column (``Y.shape[1] == 1``) uses the sigmoid; several
    columns use the row-wise softmax. The learned coefficients are stored in
    ``self.W`` (intercept in row 0) and the per-epoch error in
    ``self.error_list``.
    """

    def __init__(self, n_jobs=None):
        # Stored only; nothing in this class reads it.
        self.n_jobs = n_jobs

    def fit(self, X, Y, epoch=1000, learning_rate=0.01, error_type='entropy', sample_weight=None):
        """Learn ``self.W`` from ``X`` (n_samples, n_features) and ``Y`` (n_samples, K).

        ``error_type`` is handed to ``ErrorScore.find_error`` once per epoch;
        ``sample_weight`` is accepted but not used.

        Raises ``Exception`` if ``X`` and ``Y`` disagree on the sample count.
        """
        if X.shape[0] != Y.shape[0]:
            raise Exception('n_samples must be same')
        n_rows = X.shape[0]
        n_params = X.shape[1] + 1
        # Leading column of ones so that row 0 of the weights is the intercept.
        design = np.hstack([np.ones([n_rows, 1]), X])
        n_outputs = Y.shape[1]
        weights = np.random.randn(n_params, n_outputs)

        # The activation depends only on the number of target columns.
        if n_outputs == 1:
            activate = ActivationFunction.sigmoid
        elif n_outputs > 1:
            activate = ActivationFunction.softmax

        history = []
        for _ in range(epoch):
            predicted = activate(design.dot(weights))
            history.append(ErrorScore.find_error(Y, predicted, error_type))
            residual = Y - predicted
            weights = weights + (learning_rate/n_rows)*design.T.dot(residual)
        self.error_list = history
        self.W = weights

    def predict(self, X):
        """Return the sigmoid / softmax outputs of the fitted model for ``X``."""
        design = np.hstack([np.ones([X.shape[0], 1]), X])
        scores = design.dot(self.W)
        n_outputs = self.W.shape[1]
        if n_outputs == 1:
            activate = ActivationFunction.sigmoid
        elif n_outputs > 1:
            activate = ActivationFunction.softmax
        return activate(scores)

In [8]:
class tensor:
    """One scalar node of a global computation graph used for differentiation.

    Every node created is registered in the class-level list ``tensor.adj``;
    the entry at index ``id`` is a dict with the keys

    * ``'id'``, ``'data'``, ``'represent'``, ``'torch_type'`` -- the
      constructor arguments (``represent`` is the array the local
      derivatives are evaluated on),
    * ``'created_from'`` -- ``'assigned'`` for directly constructed nodes,
      otherwise a tuple ``(op, operand, ...)`` describing the operation,
    * ``'send_to'`` -- ids of the nodes computed from this one,
    * ``'diff'`` -- local derivatives; ``diff[i]`` is the derivative of node
      ``send_to[i]`` with respect to this node,
    * ``'family'`` -- ids of all ancestors of this node,
    * ``'storage_chain_rule'`` -- slot initialised to ``None``; this class
      never writes it.

    Operators build a new node, link it to its operand(s) and record the
    local derivative(s).
    """

    # Global node table, indexed by node id.
    adj = []
    # Id given to the next node.
    total_id = 0
    e = 2.718281828459045
    # Row count used for the constant derivative columns; taken from the
    # ``represent`` of the first 'sample' node created.
    n_samples = 'wait for assigned'

    def __init__(self, data, represent, torch_type='sample'):
        """Register a new node holding ``data`` and its ``represent`` array."""
        node = {
            'id': tensor.total_id,
            'data': data,
            'represent': represent,
            'torch_type': torch_type,
            'created_from': 'assigned',
            'send_to': [],
            'diff': [],
            'family': set(),
            'storage_chain_rule': None,
        }
        if torch_type == 'sample' and tensor.n_samples == 'wait for assigned':
            tensor.n_samples = represent.shape[0]
        tensor.adj.append(node)
        self.id = tensor.total_id
        tensor.total_id += 1
        self.data = data

    # ------------------------------------------------------------------ helpers
    @staticmethod
    def _rep(node_id):
        """Return the ``represent`` array stored for ``node_id``."""
        return tensor.adj[node_id]['represent']

    @staticmethod
    def _ones():
        """Column of ones with ``tensor.n_samples`` rows."""
        return np.ones([tensor.n_samples, 1])

    @staticmethod
    def _push_diff(node_id, local_derivative):
        """Append a local derivative to ``node_id`` (pairs with its newest ``send_to``)."""
        tensor.adj[node_id]['diff'].append(local_derivative)

    def update_so(self, self_id, other_id, new_var_id, operation):
        """Link ``new_var_id`` as the child of the two parents ``self_id`` and ``other_id``."""
        tensor.adj[self_id]['send_to'].append(new_var_id)
        tensor.adj[other_id]['send_to'].append(new_var_id)
        child = tensor.adj[new_var_id]
        child['created_from'] = operation
        # Ancestors of the child: both parents plus everything above them.
        child['family'] = (child['family'] | {self_id, other_id}
                           | tensor.adj[self_id]['family']
                           | tensor.adj[other_id]['family'])

    def update_s(self, self_id, new_var_id, operation):
        """Link ``new_var_id`` as the child of the single parent ``self_id``."""
        tensor.adj[self_id]['send_to'].append(new_var_id)
        child = tensor.adj[new_var_id]
        child['created_from'] = operation
        child['family'] = child['family'] | {self_id} | tensor.adj[self_id]['family']

    # --------------------------------------------------------------- arithmetic
    def __add__(self, other):
        """``self + other``; the derivative is 1 for every tensor operand."""
        if isinstance(other, tensor):
            represent = self._rep(self.id) + self._rep(other.id)
            new_var = tensor(self.data + other.data, represent)
            self.update_so(self.id, other.id, new_var.id, ('+', self.id, other.id))
            self._push_diff(self.id, self._ones())
            self._push_diff(other.id, self._ones())
        else:
            represent = self._rep(self.id) + other
            new_var = tensor(self.data + other, represent)
            self.update_s(self.id, new_var.id, ('+', self.id, str(other)))
            self._push_diff(self.id, self._ones())
        return new_var

    def __radd__(self, other):
        """``other + self`` for a non-tensor ``other``."""
        if isinstance(other, tensor):
            # Tensor operands are handled by __add__.
            return NotImplemented
        represent = other + self._rep(self.id)
        new_var = tensor(other + self.data, represent)
        self.update_s(self.id, new_var.id, ('+', str(other), self.id))
        self._push_diff(self.id, self._ones())
        return new_var

    def __sub__(self, other):
        """``self - other``; derivative +1 for ``self`` and -1 for ``other``."""
        if isinstance(other, tensor):
            represent = self._rep(self.id) - self._rep(other.id)
            new_var = tensor(self.data - other.data, represent)
            self.update_so(self.id, other.id, new_var.id, ('-', self.id, other.id))
            self._push_diff(self.id, self._ones())
            self._push_diff(other.id, -self._ones())
        else:
            represent = self._rep(self.id) - other
            new_var = tensor(self.data - other, represent)
            self.update_s(self.id, new_var.id, ('-', self.id, str(other)))
            self._push_diff(self.id, self._ones())
        return new_var

    def __rsub__(self, other):
        """``other - self`` for a non-tensor ``other``; derivative -1."""
        if isinstance(other, tensor):
            return NotImplemented
        represent = other - self._rep(self.id)
        new_var = tensor(other - self.data, represent)
        self.update_s(self.id, new_var.id, ('-', str(other), self.id))
        self._push_diff(self.id, -self._ones())
        return new_var

    def __mul__(self, other):
        """``self * other``; each operand's derivative is the other factor."""
        if isinstance(other, tensor):
            self_rep = self._rep(self.id)
            other_rep = self._rep(other.id)
            new_var = tensor(self.data * other.data, self_rep*other_rep)
            self.update_so(self.id, other.id, new_var.id, ('*', self.id, other.id))
            self._push_diff(self.id, other_rep)
            self._push_diff(other.id, self_rep)
        else:
            represent = self._rep(self.id)*other
            new_var = tensor(self.data * other, represent)
            self.update_s(self.id, new_var.id, ('*', self.id, str(other)))
            self._push_diff(self.id, other * self._ones())
        return new_var

    def __rmul__(self, other):
        """``other * self`` for a non-tensor ``other``."""
        if isinstance(other, tensor):
            return NotImplemented
        represent = other*self._rep(self.id)
        new_var = tensor(other * self.data, represent)
        self.update_s(self.id, new_var.id, ('*', str(other), self.id))
        self._push_diff(self.id, other * self._ones())
        return new_var

    def __truediv__(self, other):
        """``self / other``; derivatives ``1/other`` and ``-self/other**2``."""
        if isinstance(other, tensor):
            self_rep = self._rep(self.id)
            other_rep = self._rep(other.id)
            new_var = tensor(self.data / other.data, self_rep/other_rep)
            self.update_so(self.id, other.id, new_var.id, ('/', self.id, other.id))
            self._push_diff(self.id, 1/other_rep)
            self._push_diff(other.id, -self_rep/other_rep**2)
        else:
            represent = self._rep(self.id)/other
            new_var = tensor(self.data / other, represent)
            self.update_s(self.id, new_var.id, ('/', self.id, str(other)))
            self._push_diff(self.id, (1/other) * self._ones())
        return new_var

    def __rtruediv__(self, other):
        """``other / self`` for a non-tensor ``other``; derivative ``-other/self**2``."""
        if isinstance(other, tensor):
            return NotImplemented
        self_rep = self._rep(self.id)
        new_var = tensor(other / self.data, other / self_rep)
        self.update_s(self.id, new_var.id, ('/', str(other), self.id))
        self._push_diff(self.id, -other/self_rep**2)
        return new_var

    def __pow__(self, other):
        """``self ** other`` with the power-rule / exponential-rule derivatives."""
        if isinstance(other, tensor):
            self_rep = self._rep(self.id)
            other_rep = self._rep(other.id)
            new_var = tensor(self.data ** other.data, self_rep ** other_rep)
            self.update_so(self.id, other.id, new_var.id, ('**', self.id, other.id))
            # d/d(base) = exponent * base**(exponent - 1)
            self._push_diff(self.id, other_rep*self_rep**(other_rep-1))
            # d/d(exponent) = base**exponent * log(base)
            self._push_diff(other.id, (self_rep**other_rep)*np.log(self_rep))
        else:
            self_rep = self._rep(self.id)
            new_var = tensor(self.data ** other, self_rep ** other)
            self.update_s(self.id, new_var.id, ('**', self.id, str(other)))
            self._push_diff(self.id, other*self_rep**(other-1))
        return new_var

    def __rpow__(self, other):
        """``other ** self`` for a non-tensor base; derivative ``other**self * log(other)``."""
        if isinstance(other, tensor):
            return NotImplemented
        self_rep = self._rep(self.id)
        new_var = tensor(other ** self.data, other ** self_rep)
        self.update_s(self.id, new_var.id, ('**', str(other), self.id))
        self._push_diff(self.id, (other**self_rep)*np.log(other))
        return new_var

    def __neg__(self):
        """``-self``; derivative -1."""
        new_var = tensor(-self.data, -self._rep(self.id))
        self.update_s(self.id, new_var.id, ('-1*', self.id))
        self._push_diff(self.id, -self._ones())
        return new_var

    def __abs__(self):
        """``abs(self)``; derivative is the sign of the input (0 where the input is 0)."""
        self_rep = self._rep(self.id)
        new_var = tensor(abs(self.data), np.abs(self_rep))
        self.update_s(self.id, new_var.id, ('abs', self.id))
        diff_abs = (self_rep > 0) -1*(self_rep < 0)
        self._push_diff(self.id, diff_abs * self._ones())
        return new_var

    def log(self):
        """Natural logarithm; derivative ``1/self``."""
        self_rep = self._rep(self.id)
        new_var = tensor(np.log(self.data), np.log(self_rep))
        self.update_s(self.id, new_var.id, ('log', self.id))
        self._push_diff(self.id, 1/(self_rep))
        return new_var

    # ------------------------------------------------------------ max / minimum
    def _select(self, other, np_op, op_name):
        """Shared body of :meth:`maximum` and :meth:`minimum`.

        ``np_op`` is ``np.maximum`` or ``np.minimum``. The local derivative of
        an operand is the boolean mask of the positions where that operand
        equals the result (so ties mark both operands).
        """
        self_rep = self._rep(self.id)
        if isinstance(other, tensor):
            other_rep = self._rep(other.id)
            represent = np_op(self_rep, other_rep)
            new_var = tensor(np_op(self.data, other.data), represent)
            self.update_so(self.id, other.id, new_var.id, (op_name, self.id, other.id))
            self._push_diff(self.id, represent == self_rep)
            self._push_diff(other.id, represent == other_rep)
        else:
            represent = np_op(self_rep, other)
            new_var = tensor(np_op(self.data, other), represent)
            self.update_s(self.id, new_var.id, (op_name, self.id, str(other)))
            self._push_diff(self.id, represent == self_rep)
        return new_var

    def maximum(self, other):
        """Element-wise maximum of ``self`` and ``other`` (tensor or plain value)."""
        return self._select(other, np.maximum, 'maximum')

    def minimum(self, other):
        """Element-wise minimum of ``self`` and ``other`` (tensor or plain value)."""
        return self._select(other, np.minimum, 'minimum')

In [9]:
class torch(tensor):
    """Matrix-level wrapper around a grid of ``tensor`` graph nodes.

    ``self.data`` holds the numeric value; ``self.rep`` is either the string
    ``'wait for assigned'`` or an object array whose entries are ``tensor``
    nodes. The operators apply the same NumPy operation to ``data`` and to
    ``rep``; on ``rep`` NumPy dispatches to the ``tensor`` operators
    element-wise, which is what records the graph.
    """

    e = 2.718281828459045
    # Number of samples, set from the first assigned 'sample' matrix.
    n_samples = 'wait for assigned'

    def __init__(self, data, torch_type='sample', assigned=False):
        """Wrap ``data``; with ``assigned=True`` also create the graph nodes.

        ``torch_type`` is 'sample', 'weight' or 'bias'. Without ``assigned``
        the ``rep`` attribute is left as the placeholder string and is
        expected to be filled in by whichever operation created the object.
        """
        self.data = data
        self.torch_type = torch_type
        if assigned == True:
            if torch_type in ('sample', 'weight', 'bias'):
                self.rep = self.ele_to_tensor(data, torch_type)
            if torch_type == 'sample' and torch.n_samples == 'wait for assigned':
                torch.n_samples = data.shape[0]
        else:
            self.rep = 'wait for assigned'

    def ele_to_tensor(self, matrix, torch_type):
        """Build the object array of ``tensor`` nodes for ``matrix``.

        For 'sample' matrices a single row of nodes is created, one per
        column, each carrying the whole column ``matrix[:, c:c+1]`` as its
        ``represent``. For every other type one node per entry is created
        whose ``represent`` is that entry repeated ``torch.n_samples`` times.
        """
        is_sample = torch_type == 'sample'
        if is_sample:
            o_matrix = np.array(matrix[:1, :], dtype='object')
        else:
            o_matrix = np.array(matrix, dtype='object')
        n_rows, n_cols = o_matrix.shape
        for r in range(n_rows):
            for c in range(n_cols):
                if is_sample:
                    o_matrix[r,c] = tensor(o_matrix[r,c], matrix[:,c:c+1], torch_type)
                else:
                    o_matrix[r,c] = tensor(o_matrix[r,c],
                                           o_matrix[r,c]*np.ones([torch.n_samples, 1]), torch_type)
        return o_matrix

    @classmethod
    def ele_to_numeric(cls, matrix):
        """Return an object array holding the ``.data`` of every node in ``matrix``."""
        matrix = np.array(matrix, dtype='object')
        n_rows, n_cols = matrix.shape
        for r in range(n_rows):
            for c in range(n_cols):
                matrix[r,c] = matrix[r,c].data
        return matrix

    @classmethod
    def clear_adj(cls):
        """Reset the global graph: node table, id counter and both sample counts."""
        tensor.adj = []
        tensor.total_id = 0
        tensor.n_samples = 'wait for assigned'
        torch.n_samples = 'wait for assigned'

    # --------------------------------------------------------------- arithmetic
    def dot(self, other):
        """Matrix product of ``self`` with ``other`` (wrapper or plain array)."""
        if isinstance(other, tensor):
            new_var = torch(np.dot(self.data, other.data))
            new_var.rep = np.dot(self.rep, other.rep)
        else:
            new_var = torch(np.dot(self.data, other))
            new_var.rep = np.dot(self.rep, other)
        return new_var

    def __add__(self, other):
        """Element-wise ``self + other``."""
        if isinstance(other, torch):
            new_var = torch(self.data + other.data)
            new_var.rep = self.rep + other.rep
        else:
            new_var = torch(self.data + other)
            new_var.rep = self.rep + other
        return new_var

    def __radd__(self, other):
        """Element-wise ``other + self`` for a non-torch ``other``."""
        if isinstance(other, torch):
            # torch operands are handled by __add__.
            return NotImplemented
        new_var = torch(other + self.data)
        new_var.rep = other + self.rep
        return new_var

    def __sub__(self, other):
        """Element-wise ``self - other``."""
        if isinstance(other, torch):
            new_var = torch(self.data - other.data)
            new_var.rep = self.rep - other.rep
        else:
            new_var = torch(self.data - other)
            new_var.rep = self.rep - other
        return new_var

    def __rsub__(self, other):
        """Element-wise ``other - self`` for a non-torch ``other``."""
        if isinstance(other, torch):
            return NotImplemented
        new_var = torch(other - self.data)
        new_var.rep = other - self.rep
        return new_var

    def __mul__(self, other):
        """Element-wise ``self * other``."""
        if isinstance(other, torch):
            new_var = torch(self.data * other.data)
            new_var.rep = self.rep * other.rep
        else:
            new_var = torch(self.data * other)
            new_var.rep = self.rep * other
        return new_var

    def __rmul__(self, other):
        """Element-wise ``other * self`` for a non-torch ``other``."""
        if isinstance(other, torch):
            return NotImplemented
        new_var = torch(other * self.data)
        new_var.rep = other * self.rep
        return new_var

    def __truediv__(self, other):
        """Element-wise ``self / other``."""
        if isinstance(other, torch):
            new_var = torch(self.data / other.data)
            new_var.rep = self.rep / other.rep
        else:
            new_var = torch(self.data / other)
            new_var.rep = self.rep / other
        return new_var

    def __rtruediv__(self, other):
        """Element-wise ``other / self`` for a non-torch ``other``."""
        if isinstance(other, torch):
            return NotImplemented
        new_var = torch(other / self.data)
        new_var.rep = other / self.rep
        return new_var

    def __pow__(self, other):
        """Element-wise ``self ** other``."""
        if isinstance(other, torch):
            new_var = torch(self.data ** other.data)
            new_var.rep = self.rep ** other.rep
        else:
            new_var = torch(self.data ** other)
            new_var.rep = self.rep ** other
        return new_var

    def __rpow__(self, other):
        """Element-wise ``other ** self`` for a non-torch ``other``."""
        if isinstance(other, torch):
            return NotImplemented
        new_var = torch(other ** self.data)
        new_var.rep = other ** self.rep
        return new_var

    def __neg__(self):
        """Element-wise negation."""
        new_var = torch(-self.data)
        new_var.rep = -self.rep
        return new_var

    def abs(self):
        """Element-wise absolute value."""
        new_var = torch(np.abs(self.data))
        new_var.rep = np.abs(self.rep)
        return new_var

    def log(self):
        """Element-wise natural logarithm."""
        new_var = torch(np.log(self.data))
        new_var.rep = np.log(self.rep)
        return new_var

    # ------------------------------------------------------------ max / minimum
    @staticmethod
    def _extreme(left, right, np_op, node_op):
        """Shared body of :meth:`maximum` and :meth:`minimum`.

        Either operand may be a ``torch`` wrapper or a plain value, because
        the public methods are also called unbound, e.g. ``torch.maximum(x, 0)``.
        ``np_op`` acts on the numeric data; ``node_op`` (``tensor.maximum`` or
        ``tensor.minimum``) is applied entry by entry to build ``rep``. When
        neither operand is a wrapper, ``rep`` keeps its placeholder value.
        """
        if isinstance(left, torch):
            right_is_torch = isinstance(right, torch)
            new_var = torch(np_op(left.data, right.data if right_is_torch else right))
            n_rows, n_cols = left.rep.shape
            nodes = np.zeros([n_rows, n_cols], dtype='object')
            for r in range(n_rows):
                for c in range(n_cols):
                    partner = right.rep[r,c] if right_is_torch else right
                    nodes[r,c] = node_op(left.rep[r,c], partner)
            new_var.rep = nodes
        elif isinstance(right, torch):
            new_var = torch(np_op(left, right.data))
            n_rows, n_cols = right.rep.shape
            nodes = np.zeros([n_rows, n_cols], dtype='object')
            for r in range(n_rows):
                for c in range(n_cols):
                    # The graph node must be the receiver of node_op.
                    nodes[r,c] = node_op(right.rep[r,c], left)
            new_var.rep = nodes
        else:
            new_var = torch(np_op(left, right))
        return new_var

    def maximum(self, other):
        """Element-wise maximum of ``self`` and ``other``."""
        return torch._extreme(self, other, np.maximum, tensor.maximum)

    def minimum(self, other):
        """Element-wise minimum of ``self`` and ``other``.

        The plain-value / plain-value case previously returned the maximum.
        """
        return torch._extreme(self, other, np.minimum, tensor.minimum)

    # -------------------------------------------------------------- activations
    def relu(self):
        """``max(self, 0)``."""
        return torch.maximum(self, 0)

    def prelu(self, alpha):
        """``max(self, 0) + alpha * min(self, 0)``."""
        return torch.maximum(self, 0) + alpha*torch.minimum(self, 0)

    def sigmoid(self):
        """``1 / (1 + e**(-self))``."""
        return 1/(1 + torch.e**(-self))

    def tanh(self):
        """``(e**x - e**-x) / (e**x + e**-x)``."""
        return (torch.e**self - torch.e**(-self))/(torch.e**self + torch.e**(-self))

    def softmax(self):
        """``e**self`` divided by its row sums."""
        eZ = torch.e**self
        denominator = torch.col_sum(eZ)
        return eZ/denominator

    # --------------------------------------------------------------- reductions
    def col_sum(self):
        """Sum along axis 1 (over the columns of each row), keeping dimensions."""
        new_var = torch(np.sum(self.data, axis=1, keepdims=True))
        new_var.rep = np.sum(self.rep, axis=1, keepdims=True)
        return new_var

    def sum(self):
        """Sum over all entries."""
        new_var = torch(np.sum(self.data))
        new_var.rep = np.sum(self.rep)
        return new_var

    def mean(self):
        """Mean of ``data`` over all entries.

        ``rep`` is not reduced here: every node is only divided by
        ``tensor.n_samples``.
        NOTE(review): presumably the reduction over samples is left to
        ``gradient``, which sums each derivative column -- confirm.
        """
        new_var = torch(np.mean(self.data))
        new_var.rep = self.rep/tensor.n_samples
        return new_var

    # ------------------------------------------------------------------- losses
    def sse(self, other):
        """Sum of squared differences between ``self`` and ``other``."""
        error = (self - other)**2
        return torch.sum(error)

    def mse(self, other):
        """Mean of squared differences between ``self`` and ``other``."""
        error = (self - other)**2
        return torch.mean(error)

    def m2_entropy(self, other):
        """Binary cross-entropy of predictions ``other`` against targets ``self``.

        The float32 machine epsilon is added inside both logarithms to avoid
        ``log(0)``.
        """
        error = -(self*np.log(other+np.finfo(np.float32).eps) + (1-self)*np.log(1-other+np.finfo(np.float32).eps))
        return torch.mean(error)

    def m_entropy(self, other):
        """Multi-class cross-entropy of predictions ``other`` against targets ``self``."""
        error = -self*np.log(other+np.finfo(np.float32).eps)
        col_error = torch.col_sum(error)
        return torch.mean(col_error)

    # ---------------------------------------------------------- differentiation
    @classmethod
    def forward_diff(cls, y_id, x_id):
        """Derivative of node ``y_id`` with respect to node ``x_id`` via the chain rule.

        Returns 0 when ``x_id`` is neither ``y_id`` nor one of its ancestors,
        and 1 when ``x_id`` has no outgoing edges. Otherwise the local
        derivatives stored on ``x_id`` are multiplied with the recursively
        computed derivatives of its children and summed; the result is cached
        in ``'storage_chain_rule'``.
        """
        if (x_id in tensor.adj[y_id]['family']) or (x_id == y_id):
            if tensor.adj[x_id]['storage_chain_rule'] is not None:
                return tensor.adj[x_id]['storage_chain_rule']
            else:
                send_to = tensor.adj[x_id]['send_to']
                n_send = len(send_to)
                if n_send == 1:
                    target_id = send_to[0]
                    dy_dx = tensor.adj[x_id]['diff'][0] * cls.forward_diff(y_id, target_id)
                    tensor.adj[x_id]['storage_chain_rule'] = dy_dx
                    return dy_dx
                elif n_send > 1:
                    dy_dx = 0
                    # NOTE(review): only the last five outgoing edges are
                    # accumulated; the reason is not evident from this file.
                    start_index = n_send - 5
                    if start_index <= 0:
                        start_index = 0
                    for i in range(start_index, n_send):
                        target_id = send_to[i]
                        dy_dx += tensor.adj[x_id]['diff'][i] * cls.forward_diff(y_id, target_id)
                    tensor.adj[x_id]['storage_chain_rule'] = dy_dx
                    return dy_dx
                elif n_send == 0:
                    return 1
        else:
            return 0

    def gradient(self):
        """Derivative of the most recently created node w.r.t. every entry of ``self``.

        The last entry of ``tensor.adj`` is taken as the output; each
        derivative is summed with ``np.sum`` into one number per entry of
        ``self.rep``.
        """
        n_rows, n_cols = self.rep.shape
        diff_matrix = np.zeros([n_rows, n_cols])
        for r in range(n_rows):
            for c in range(n_cols):
                diff_matrix[r,c] = np.sum(torch.forward_diff(tensor.adj[-1]['id'], self.rep[r,c].id))
        return diff_matrix

In [10]:
class NN(torch):
    """Fully connected feed-forward network trained by plain gradient descent.

    Each epoch wraps the NumPy parameters in fresh ``torch`` objects, runs the
    forward pass, computes the loss, asks ``torch.gradient`` for the derivative
    with respect to every parameter, applies one update scaled by
    ``learning_rate`` and finally clears the global graph with
    ``torch.clear_adj()``.

    After ``fit`` the instance exposes ``W`` and ``B`` (lists with one NumPy
    array per layer) and ``loss_list`` (the ``data`` attribute of the loss
    object, one entry per epoch).
    """

    def __init__(self, HL, AF, n_jobs=None):
        """Store the architecture.

        Parameters
        ----------
        HL : sequence of int
            Number of units of each layer, in order.
        AF : sequence
            Activation of each layer: one of ``'sigmoid'``, ``'tanh'``,
            ``'relu'``, ``'softmax'`` or a pair ``['prelu', parameter]``.
        n_jobs : optional
            Stored on the instance; not used anywhere in this class.

        Raises
        ------
        Exception
            If ``HL`` and ``AF`` differ in length.
        """
        self.n_jobs = n_jobs
        if len(HL) != len(AF):
            raise Exception('n_layers must be same')
        self.HL = HL
        self.AF = AF

    def fit(self, X, Y, loss_function, epoch=1000, learning_rate=0.01, sample_weight=None):
        """Train the network for ``epoch`` full-batch gradient steps.

        Parameters
        ----------
        X : 2-D array
            Samples in rows; ``X.shape[1]`` is the input width.
        Y : array
            Targets; must have as many rows as ``X``.
        loss_function : str
            ``'sse'``, ``'mse'``, ``'m2_entropy'`` or ``'m_entropy'``.
        epoch : int
            Number of gradient steps.
        learning_rate : float
            Step size of each update.
        sample_weight : optional
            Accepted for interface compatibility; currently unused.

        Raises
        ------
        Exception
            If ``X`` and ``Y`` have a different number of rows.
        ValueError
            If the loss or an activation name is not recognised.
        """
        if X.shape[0] != Y.shape[0]:
            raise Exception('n_sample must be same')
        W, B = self.create_WB(X.shape[1], self.HL)
        loss_list = []
        for _ in range(epoch):
            try:
                t_X0 = torch(X, torch_type='sample', assigned=True)
                t_Y = torch(Y, torch_type='sample', assigned=True)
                t_W = self.to_torch(W, torch_type='weight')
                t_B = self.to_torch(B, torch_type='bias')
                t_X = self.forward(t_X0, t_W, t_B, self.AF)
                loss = self.compute_loss(t_Y, t_X[-1], loss_function)
                loss_list.append(loss.data)
                dW, dB = self.compute_gradient(t_W, t_B)
                W, B = self.update_WB(W, B, learning_rate, dW, dB)
            finally:
                # Always reset the global graph, even when the epoch fails, so
                # a later fit/predict does not start from leftover nodes.
                torch.clear_adj()
        self.loss_list = loss_list
        self.W = W
        self.B = B

    @classmethod
    def create_WB(cls, D, HL):
        """Draw random weights and biases for layers of widths ``HL``.

        Layer ``i`` gets a weight of shape ``(previous width, HL[i])`` (the
        previous width of the first layer is ``D``) and a bias of shape
        ``(1, HL[i])``. Both are standard-normal draws divided by
        ``sqrt(HL[i])``. Returns ``(W, B)``, two lists of arrays.
        """
        W = []
        B = []
        n_in = D
        for n_out in HL:
            scale = np.sqrt(n_out)
            # Weight first, then bias: keeps the random stream order stable.
            W.append(np.random.randn(n_in, n_out)/scale)
            B.append(np.random.randn(1, n_out)/scale)
            n_in = n_out
        return W, B

    @classmethod
    def to_torch(cls, tensor_list, torch_type, assigned=True):
        """Wrap every array of ``tensor_list`` in a ``torch`` object and return the list."""
        return [torch(array, torch_type, assigned) for array in tensor_list]

    @classmethod
    def forward(cls, t_X0, t_W, t_B, AF):
        """Run the forward pass and return the activations of every layer.

        Layer ``i`` computes ``compute_X(dot(input, t_W[i]) + t_B[i], AF[i])``
        where the input is ``t_X0`` for the first layer and the previous
        activation afterwards. The returned list has one entry per layer; the
        last entry is the network output.
        """
        X = []
        layer_input = t_X0
        for i in range(len(t_W)):
            Z_i = torch.dot(layer_input, t_W[i]) + t_B[i]
            layer_input = cls.compute_X(Z_i, AF[i])
            X.append(layer_input)
        return X

    @classmethod
    def compute_X(cls, Z_i, af):
        """Apply the activation described by ``af`` to ``Z_i``.

        ``af`` is either one of the strings ``'sigmoid'``, ``'tanh'``,
        ``'relu'``, ``'softmax'`` or a list/tuple ``('prelu', parameter)``.

        Raises
        ------
        ValueError
            If ``af`` does not name a supported activation.
        """
        if isinstance(af, str):
            if af == 'sigmoid':
                return torch.sigmoid(Z_i)
            if af == 'tanh':
                return torch.tanh(Z_i)
            if af == 'relu':
                return torch.relu(Z_i)
            if af == 'softmax':
                return torch.softmax(Z_i)
        elif isinstance(af, (list, tuple)):
            if len(af) >= 2 and af[0] == 'prelu':
                return torch.prelu(Z_i, af[1])
        # Fail loudly instead of falling through to an unbound local.
        raise ValueError('unknown activation function: {!r}'.format(af))

    @classmethod
    def compute_loss(cls, t_Y, t_Yhat, loss_function):
        """Evaluate the loss named ``loss_function`` on targets and predictions.

        Raises
        ------
        ValueError
            If ``loss_function`` is not ``'sse'``, ``'mse'``, ``'m2_entropy'``
            or ``'m_entropy'``.
        """
        if loss_function == 'sse':
            return torch.sse(t_Y, t_Yhat)
        if loss_function == 'mse':
            return torch.mse(t_Y, t_Yhat)
        if loss_function == 'm2_entropy':
            return torch.m2_entropy(t_Y, t_Yhat)
        if loss_function == 'm_entropy':
            return torch.m_entropy(t_Y, t_Yhat)
        raise ValueError('unknown loss function: {!r}'.format(loss_function))

    @classmethod
    def compute_gradient(cls, t_W, t_B):
        """Return ``(dW, dB)``: ``torch.gradient`` of every weight and bias, per layer."""
        dW = []
        dB = []
        for i in range(len(t_W)):
            dW.append(torch.gradient(t_W[i]))
            dB.append(torch.gradient(t_B[i]))
        return dW, dB

    @classmethod
    def update_WB(cls, W, B, learning_rate, dW, dB):
        """Apply one gradient-descent step and return ``(W, B)``.

        The entries of ``W`` and ``B`` are updated with ``-=``, so the lists
        passed in are the ones returned.
        """
        for i in range(len(W)):
            W[i] -= learning_rate*dW[i]
            B[i] -= learning_rate*dB[i]
        return W, B

    def predict(self, X):
        """Return the ``data`` of the last layer's activation for samples ``X``.

        Uses the ``W`` and ``B`` stored by ``fit``; the global graph is cleared
        afterwards, also when the forward pass raises.
        """
        try:
            t_X0 = torch(X, torch_type='sample', assigned=True)
            t_W = self.to_torch(self.W, torch_type='weight')
            t_B = self.to_torch(self.B, torch_type='bias')
            t_X = self.forward(t_X0, t_W, t_B, self.AF)
        finally:
            torch.clear_adj()
        return t_X[-1].data

In [11]:
class RNN(NN):
    """Stacked tanh recurrent layers followed by a dense head applied per time step.

    Inputs and targets are flat 2-D arrays whose columns are the concatenation
    of ``n_ts_feature`` (respectively ``n_ts_target``) equally wide time-step
    blocks. The dense head is applied to the hidden states of the last
    ``n_ts_target`` time steps of the top recurrent layer.

    After ``fit`` the instance exposes ``W``, ``U``, ``B`` (recurrent layers),
    ``Wout``, ``Bout`` (dense head) and ``loss_list``.
    """

    # torch_type of each parameter group, in the order W, U, B, Wout, Bout.
    _PARAM_KINDS = ('weight', 'weight', 'bias', 'weight', 'bias')

    def __init__(self, n_ts_feature, n_ts_target, RNN_HL, ANN_HL, ANN_AF, n_jobs=None):
        """Store the architecture.

        ``RNN_HL`` holds the widths of the recurrent layers, ``ANN_HL`` the
        widths of the dense head and ``ANN_AF`` its activations (same length
        as ``ANN_HL``). ``n_jobs`` is stored but not used in this class.

        Raises ``Exception`` when ``n_ts_feature`` is smaller than
        ``n_ts_target`` or when ``ANN_HL`` and ``ANN_AF`` differ in length.
        """
        self.n_jobs = n_jobs
        if n_ts_target > n_ts_feature:
            raise Exception('timestep of feature must be greater than timestep of target')
        if len(ANN_HL) != len(ANN_AF):
            raise Exception('n_layers must be same')
        self.n_ts_feature, self.n_ts_target = n_ts_feature, n_ts_target
        self.RNN_HL, self.ANN_HL, self.ANN_AF = RNN_HL, ANN_HL, ANN_AF

    def fit(self, X, Y, loss_function, epoch=1000, learning_rate=0.01, sample_weight=None):
        """Train for ``epoch`` full-batch gradient steps.

        The loss is the sum over the ``n_ts_target`` output steps of
        ``NN.compute_loss``. ``sample_weight`` is accepted but unused.
        Raises ``Exception`` when ``X`` and ``Y`` differ in row count.
        """
        if X.shape[0] != Y.shape[0]:
            raise Exception('n_samples must be same')
        n_features = int(X.shape[1]/self.n_ts_feature)
        X0_TS = self.seperate_timestep(X, self.n_ts_feature)
        Y_TS = self.seperate_timestep(Y, self.n_ts_target)
        # Recurrent parameters are drawn before the dense head.
        params = tuple(self.create_WUB(n_features, self.RNN_HL))
        params += tuple(NN.create_WB(self.RNN_HL[-1], self.ANN_HL))
        loss_list = []
        for _ in range(epoch):
            t_X0_TS = NN.to_torch(X0_TS, torch_type='sample')
            t_Y_TS = NN.to_torch(Y_TS, torch_type='sample')
            t_params = [NN.to_torch(group, torch_type=kind)
                        for group, kind in zip(params, RNN._PARAM_KINDS)]
            t_X = self.forward(t_X0_TS, *t_params)
            loss = 0
            for j in range(self.n_ts_target):
                loss += NN.compute_loss(t_Y_TS[j], t_X[-1][j], loss_function)
            loss_list.append(loss.data)
            grads = self.compute_gradient(*t_params)
            params = self.update_all_weight(*params, learning_rate, *grads)
            torch.clear_adj()
        self.loss_list = loss_list
        self.W, self.U, self.B, self.Wout, self.Bout = params

    @classmethod
    def seperate_timestep(cls, Data, n_ts):
        """Split the columns of ``Data`` into ``n_ts`` consecutive blocks.

        Each block is ``int(n_columns / n_ts)`` columns wide; columns beyond
        ``n_ts`` full blocks are not included. Returns the list of blocks.
        """
        width = int(Data.shape[1]/n_ts)
        return [Data[:, step*width:(step + 1)*width] for step in range(n_ts)]

    def create_WUB(self, n_features, HL):
        """Draw input weights ``W``, recurrent weights ``U`` and biases ``B``.

        Layer ``i`` gets ``W`` of shape ``(previous width, HL[i])`` (first
        layer: ``n_features``), ``U`` of shape ``(HL[i], HL[i])`` and ``B`` of
        shape ``(1, HL[i])``; all are standard-normal draws divided by
        ``sqrt(HL[i])``. Returns ``(W, U, B)`` as lists.
        """
        W, U, B = [], [], []
        n_in = n_features
        for n_out in HL:
            scale = np.sqrt(n_out)
            W.append(np.random.randn(n_in, n_out)/scale)
            U.append(np.random.randn(n_out, n_out)/scale)
            B.append(np.random.randn(1, n_out)/scale)
            n_in = n_out
        return W, U, B

    def forward(self, t_X0_TS, t_W, t_U, t_B, t_Wout, t_Bout):
        """Run the recurrent stack and the dense head.

        Returns a list with one entry per dense layer; each entry is a list of
        ``n_ts_target`` outputs, ordered from the earliest to the latest of
        the selected time steps.
        """
        H = []
        for i in range(len(t_W)):
            # The first layer reads the samples, deeper layers read the
            # hidden states of the layer below at the same time step.
            layer_inputs = t_X0_TS if i == 0 else H[-1]
            H_i = []
            for j in range(self.n_ts_feature):
                Z_ij = torch.dot(layer_inputs[j], t_W[i])
                if j > 0:
                    # Recurrent contribution of the previous time step.
                    Z_ij = Z_ij + torch.dot(H_i[-1], t_U[i])
                Z_ij = Z_ij + t_B[i]
                H_i.append(torch.tanh(Z_ij))
            H.append(H_i)

        X = []
        for i in range(len(t_Wout)):
            if i == 0:
                # Last n_ts_target hidden states of the top recurrent layer.
                sources = (H[-1][-j] for j in range(self.n_ts_target, 0, -1))
            else:
                sources = (X[-1][j] for j in range(self.n_ts_target))
            X_i = []
            for source in sources:
                S_ij = torch.dot(source, t_Wout[i]) + t_Bout[i]
                X_i.append(NN.compute_X(S_ij, self.ANN_AF[i]))
            X.append(X_i)
        return X

    def compute_gradient(self, t_W, t_U, t_B, t_Wout, t_Bout):
        """Return ``(dW, dU, dB, dWout, dBout)`` via ``torch.gradient``, one array per layer."""
        dW, dU, dB = [], [], []
        for i in range(len(t_W)):
            dW.append(torch.gradient(t_W[i]))
            dU.append(torch.gradient(t_U[i]))
            dB.append(torch.gradient(t_B[i]))
        dWout, dBout = [], []
        for i in range(len(t_Wout)):
            dWout.append(torch.gradient(t_Wout[i]))
            dBout.append(torch.gradient(t_Bout[i]))
        return dW, dU, dB, dWout, dBout

    def update_all_weight(self, W, U, B, Wout, Bout, learning_rate, dW, dU, dB, dWout, dBout):
        """Apply one gradient-descent step to every parameter and return them.

        Entries are updated with ``-=``; the lists passed in are returned.
        """
        recurrent_pairs = ((W, dW), (U, dU), (B, dB))
        for i in range(len(W)):
            for values, grads in recurrent_pairs:
                values[i] -= learning_rate*grads[i]
        for i in range(len(Wout)):
            Wout[i] -= learning_rate*dWout[i]
            Bout[i] -= learning_rate*dBout[i]
        return W, U, B, Wout, Bout

    def predict(self, X):
        """Return the outputs of the last dense layer for samples ``X``.

        The result is the list of per-time-step output objects produced by
        ``forward`` (not their ``data``). The global graph is cleared before
        returning.
        """
        X0_TS = self.seperate_timestep(X, self.n_ts_feature)
        t_X0_TS = NN.to_torch(X0_TS, torch_type='sample')
        stored = (self.W, self.U, self.B, self.Wout, self.Bout)
        t_params = [NN.to_torch(group, torch_type=kind)
                    for group, kind in zip(stored, RNN._PARAM_KINDS)]
        outputs = self.forward(t_X0_TS, *t_params)
        torch.clear_adj()
        return outputs[-1]

In [14]:
class GRU(RNN, NN):
    """Stacked gated recurrent unit layers followed by a dense head per time step.

    Data layout, head wiring and training loop mirror ``RNN``. Each recurrent
    layer owns reset (``r``), update (``z``) and candidate (``ht``)
    parameters. After ``fit`` the instance exposes ``Wr, Wz, Wht, Ur, Uz, Uht,
    Br, Bz, Bht, Wout, Bout`` and ``loss_list``.
    """

    # torch_type of each parameter group, in the order used by every method:
    # Wr, Wz, Wht, Ur, Uz, Uht, Br, Bz, Bht, Wout, Bout.
    _PARAM_KINDS = ('weight',)*6 + ('bias',)*3 + ('weight', 'bias')

    def __init__(self, n_ts_feature, n_ts_target, RNN_HL, ANN_HL, ANN_AF, n_jobs=None):
        """Store the architecture (same arguments and checks as ``RNN``).

        Raises ``Exception`` when ``n_ts_feature`` is smaller than
        ``n_ts_target`` or when ``ANN_HL`` and ``ANN_AF`` differ in length.
        """
        self.n_jobs = n_jobs
        if n_ts_target > n_ts_feature:
            raise Exception('timestep of feature must be greater than timestep of target')
        if len(ANN_HL) != len(ANN_AF):
            raise Exception('n_layers must be same')
        self.n_ts_feature, self.n_ts_target = n_ts_feature, n_ts_target
        self.RNN_HL, self.ANN_HL, self.ANN_AF = RNN_HL, ANN_HL, ANN_AF

    def fit(self, X, Y, loss_function, epoch=1000, learning_rate=0.01, sample_weight=None):
        """Train for ``epoch`` full-batch gradient steps.

        The loss is the sum over the ``n_ts_target`` output steps of
        ``NN.compute_loss``. ``sample_weight`` is accepted but unused.
        Raises ``Exception`` when ``X`` and ``Y`` differ in row count.
        """
        if X.shape[0] != Y.shape[0]:
            raise Exception('n_samples must be same')
        n_features = int(X.shape[1]/self.n_ts_feature)
        X0_TS = RNN.seperate_timestep(X, self.n_ts_feature)
        Y_TS = RNN.seperate_timestep(Y, self.n_ts_target)
        # Recurrent parameters are drawn before the dense head.
        params = tuple(self.create_GRU_weight(n_features, self.RNN_HL))
        params += tuple(NN.create_WB(self.RNN_HL[-1], self.ANN_HL))
        loss_list = []
        for _ in range(epoch):
            t_X0_TS = NN.to_torch(X0_TS, torch_type='sample')
            t_Y_TS = NN.to_torch(Y_TS, torch_type='sample')
            t_params = [NN.to_torch(group, torch_type=kind)
                        for group, kind in zip(params, GRU._PARAM_KINDS)]
            t_X = self.forward(t_X0_TS, *t_params)
            loss = 0
            for j in range(self.n_ts_target):
                loss += NN.compute_loss(t_Y_TS[j], t_X[-1][j], loss_function)
            loss_list.append(loss.data)
            grads = self.compute_gradient(*t_params)
            params = self.update_all_weight(*params, learning_rate, *grads)
            torch.clear_adj()
        self.loss_list = loss_list
        (self.Wr, self.Wz, self.Wht,
         self.Ur, self.Uz, self.Uht,
         self.Br, self.Bz, self.Bht,
         self.Wout, self.Bout) = params

    def create_GRU_weight(self, n_features, HL):
        """Draw the nine parameter groups of every GRU layer.

        Per layer, in drawing order: three input matrices of shape
        ``(previous width, HL[i])`` (first layer: ``n_features``), three
        recurrent matrices ``(HL[i], HL[i])`` and three biases ``(1, HL[i])``,
        each a standard-normal draw divided by ``sqrt(HL[i])``.

        Returns ``(Wr, Wz, Wht, Ur, Uz, Uht, Br, Bz, Bht)`` as lists.
        """
        groups = tuple([] for _ in range(9))
        n_in = n_features
        for n_out in HL:
            scale = np.sqrt(n_out)
            shapes = [(n_in, n_out)]*3 + [(n_out, n_out)]*3 + [(1, n_out)]*3
            for group, (rows, cols) in zip(groups, shapes):
                group.append(np.random.randn(rows, cols)/scale)
            n_in = n_out
        return groups

    def forward(self, t_X0_TS, t_Wr, t_Wz, t_Wht, t_Ur, t_Uz, t_Uht, t_Br, t_Bz, t_Bht, t_Wout, t_Bout):
        """Run the GRU stack and the dense head.

        For time steps after the first, with previous hidden state ``h``:
        ``r = sigmoid(x.Wr + h.Ur + Br)``, ``z = sigmoid(x.Wz + h.Uz + Bz)``,
        ``ht = tanh(x.Wht + (h*r).Uht + Bht)`` and the new state is
        ``h*(1 - z) + z*ht``. The first step has no recurrent terms and its
        state is ``z*ht``.

        Returns a list with one entry per dense layer; each entry is a list of
        ``n_ts_target`` outputs.
        """
        H = []
        for i in range(len(t_Wr)):
            # The first layer reads the samples, deeper layers read the
            # hidden states of the layer below at the same time step.
            layer_inputs = t_X0_TS if i == 0 else H[-1]
            H_i = []
            for j in range(self.n_ts_feature):
                x_j = layer_inputs[j]
                if j == 0:
                    # The reset gate is not used at the first step but is still
                    # evaluated: doing so registers its nodes in the graph.
                    R_ij = torch.sigmoid(torch.dot(x_j, t_Wr[i]) + t_Br[i])
                    Z_ij = torch.sigmoid(torch.dot(x_j, t_Wz[i]) + t_Bz[i])
                    HT_ij = torch.tanh(torch.dot(x_j, t_Wht[i]) + t_Bht[i])
                    H_ij = Z_ij*HT_ij
                else:
                    R_ij = torch.sigmoid(torch.dot(x_j, t_Wr[i]) + torch.dot(H_ij, t_Ur[i]) + t_Br[i])
                    Z_ij = torch.sigmoid(torch.dot(x_j, t_Wz[i]) + torch.dot(H_ij, t_Uz[i]) + t_Bz[i])
                    HT_ij = torch.tanh(torch.dot(x_j, t_Wht[i]) + torch.dot(H_ij*R_ij, t_Uht[i]) + t_Bht[i])
                    H_ij = H_ij*(1-Z_ij) + Z_ij*HT_ij
                H_i.append(H_ij)
            H.append(H_i)

        X = []
        for i in range(len(t_Wout)):
            if i == 0:
                # Last n_ts_target hidden states of the top recurrent layer.
                sources = (H[-1][-j] for j in range(self.n_ts_target, 0, -1))
            else:
                sources = (X[-1][j] for j in range(self.n_ts_target))
            X_i = []
            for source in sources:
                S_ij = torch.dot(source, t_Wout[i]) + t_Bout[i]
                X_i.append(NN.compute_X(S_ij, self.ANN_AF[i]))
            X.append(X_i)
        return X

    def compute_gradient(self, t_Wr, t_Wz, t_Wht, t_Ur, t_Uz, t_Uht, t_Br, t_Bz, t_Bht, t_Wout, t_Bout):
        """Return the ``torch.gradient`` of every parameter, grouped like the inputs.

        The result is ``(dWr, dWz, dWht, dUr, dUz, dUht, dBr, dBz, dBht,
        dWout, dBout)``, each a list with one array per layer.
        """
        recurrent = (t_Wr, t_Wz, t_Wht, t_Ur, t_Uz, t_Uht, t_Br, t_Bz, t_Bht)
        grads = tuple([] for _ in recurrent)
        for i in range(len(t_Wr)):
            for group, collected in zip(recurrent, grads):
                collected.append(torch.gradient(group[i]))
        dWout, dBout = [], []
        for i in range(len(t_Wout)):
            dWout.append(torch.gradient(t_Wout[i]))
            dBout.append(torch.gradient(t_Bout[i]))
        return grads + (dWout, dBout)

    def update_all_weight(self, Wr, Wz, Wht, Ur, Uz, Uht, Br, Bz, Bht, Wout, Bout, learning_rate, dWr, dWz, dWht, dUr, dUz, dUht, dBr, dBz, dBht, dWout, dBout):
        """Apply one gradient-descent step to every parameter and return them.

        Entries are updated with ``-=``; the lists passed in are returned in
        the order ``Wr, Wz, Wht, Ur, Uz, Uht, Br, Bz, Bht, Wout, Bout``.
        """
        recurrent_pairs = (
            (Wr, dWr), (Wz, dWz), (Wht, dWht),
            (Ur, dUr), (Uz, dUz), (Uht, dUht),
            (Br, dBr), (Bz, dBz), (Bht, dBht),
        )
        for i in range(len(Wr)):
            for values, grads in recurrent_pairs:
                values[i] -= learning_rate*grads[i]
        for i in range(len(Wout)):
            Wout[i] -= learning_rate*dWout[i]
            Bout[i] -= learning_rate*dBout[i]
        return Wr, Wz, Wht, Ur, Uz, Uht, Br, Bz, Bht, Wout, Bout

    def predict(self, X):
        """Return the outputs of the last dense layer for samples ``X``.

        The result is the list of per-time-step output objects produced by
        ``forward`` (not their ``data``). The global graph is cleared before
        returning.
        """
        X0_TS = RNN.seperate_timestep(X, self.n_ts_feature)
        t_X0_TS = NN.to_torch(X0_TS, torch_type='sample')
        stored = (self.Wr, self.Wz, self.Wht,
                  self.Ur, self.Uz, self.Uht,
                  self.Br, self.Bz, self.Bht,
                  self.Wout, self.Bout)
        t_params = [NN.to_torch(group, torch_type=kind)
                    for group, kind in zip(stored, GRU._PARAM_KINDS)]
        outputs = self.forward(t_X0_TS, *t_params)
        torch.clear_adj()
        return outputs[-1]

In [None]:
class LSTM(RNN, NN):
    """Stacked LSTM layers followed by a dense head applied per time step.

    Data layout, head wiring and training loop mirror ``RNN``. Each recurrent
    layer owns forget (``f``), input (``i``), candidate (``ct``) and output
    (``o``) parameters. After ``fit`` the instance exposes ``Wf, Wi, Wct, Wo,
    Uf, Ui, Uct, Uo, Bf, Bi, Bct, Bo, Wout, Bout`` and ``loss_list``.
    """

    # torch_type of each parameter group, in the order used by every method:
    # Wf, Wi, Wct, Wo, Uf, Ui, Uct, Uo, Bf, Bi, Bct, Bo, Wout, Bout.
    _PARAM_KINDS = ('weight',)*8 + ('bias',)*4 + ('weight', 'bias')

    def __init__(self, n_ts_feature, n_ts_target, RNN_HL, ANN_HL, ANN_AF, n_jobs=None):
        """Store the architecture (same arguments and checks as ``RNN``).

        ``RNN_HL`` holds the widths of the LSTM layers, ``ANN_HL`` the widths
        of the dense head and ``ANN_AF`` its activations. ``n_jobs`` is stored
        but not used in this class.

        Raises ``Exception`` when ``n_ts_feature`` is smaller than
        ``n_ts_target`` or when ``ANN_HL`` and ``ANN_AF`` differ in length.
        """
        self.n_jobs = n_jobs
        if n_ts_feature < n_ts_target:
            raise Exception('timestep of feature must be greater than timestep of target')
        if len(ANN_HL) != len(ANN_AF):
            raise Exception('n_layers must be same')
        self.n_ts_feature = n_ts_feature
        self.n_ts_target = n_ts_target
        self.RNN_HL = RNN_HL
        self.ANN_HL = ANN_HL
        self.ANN_AF = ANN_AF

    def fit(self, X, Y, loss_function, epoch=1000, learning_rate=0.01, sample_weight=None):
        """Train for ``epoch`` full-batch gradient steps.

        The loss is the sum over the ``n_ts_target`` output steps of
        ``NN.compute_loss``. ``sample_weight`` is accepted but unused.
        Raises ``Exception`` when ``X`` and ``Y`` differ in row count.
        """
        if X.shape[0] != Y.shape[0]:
            raise Exception('n_samples must be same')
        n_features = int(X.shape[1]/self.n_ts_feature)
        X0_TS = RNN.seperate_timestep(X, self.n_ts_feature)
        Y_TS = RNN.seperate_timestep(Y, self.n_ts_target)
        # Recurrent parameters are drawn before the dense head.
        params = tuple(self.create_LSTM_weight(n_features, self.RNN_HL))
        params += tuple(NN.create_WB(self.RNN_HL[-1], self.ANN_HL))
        loss_list = []
        for _ in range(epoch):
            t_X0_TS = NN.to_torch(X0_TS, torch_type='sample')
            t_Y_TS = NN.to_torch(Y_TS, torch_type='sample')
            t_params = [NN.to_torch(group, torch_type=kind)
                        for group, kind in zip(params, LSTM._PARAM_KINDS)]
            t_X = self.forward(t_X0_TS, *t_params)
            loss = 0
            for j in range(self.n_ts_target):
                loss += NN.compute_loss(t_Y_TS[j], t_X[-1][j], loss_function)
            loss_list.append(loss.data)
            grads = self.compute_gradient(*t_params)
            params = self.update_all_weight(*params, learning_rate, *grads)
            torch.clear_adj()
        self.loss_list = loss_list
        (self.Wf, self.Wi, self.Wct, self.Wo,
         self.Uf, self.Ui, self.Uct, self.Uo,
         self.Bf, self.Bi, self.Bct, self.Bo,
         self.Wout, self.Bout) = params

    def create_LSTM_weight(self, n_features, HL):
        """Draw the twelve parameter groups of every LSTM layer.

        Per layer, in drawing order: four input matrices of shape
        ``(previous width, HL[i])`` (first layer: ``n_features``), four
        recurrent matrices ``(HL[i], HL[i])`` and four biases ``(1, HL[i])``,
        each a standard-normal draw divided by ``sqrt(HL[i])``.

        Returns ``(Wf, Wi, Wct, Wo, Uf, Ui, Uct, Uo, Bf, Bi, Bct, Bo)`` as lists.
        """
        groups = tuple([] for _ in range(12))
        n_in = n_features
        for n_out in HL:
            scale = np.sqrt(n_out)
            shapes = [(n_in, n_out)]*4 + [(n_out, n_out)]*4 + [(1, n_out)]*4
            for group, (rows, cols) in zip(groups, shapes):
                group.append(np.random.randn(rows, cols)/scale)
            n_in = n_out
        return groups

    def forward(self, t_X0_TS, t_Wf, t_Wi, t_Wct, t_Wo, t_Uf, t_Ui, t_Uct, t_Uo, t_Bf, t_Bi, t_Bct, t_Bo, t_Wout, t_Bout):
        """Run the LSTM stack and the dense head.

        For time steps after the first, with previous hidden state ``h`` and
        cell state ``c``:

        * ``f  = sigmoid(x.Wf  + h.Uf  + Bf)``
        * ``i  = sigmoid(x.Wi  + h.Ui  + Bi)``
        * ``ct = tanh(x.Wct + h.Uct + Bct)``
        * ``o  = sigmoid(x.Wo  + h.Uo  + Bo)``
        * ``c  = c*f + i*ct``
        * ``h  = o * tanh(c)``

        The first step has no recurrent terms and its cell state is ``i*ct``.

        Returns a list with one entry per dense layer; each entry is a list of
        ``n_ts_target`` outputs.
        """
        H = []
        for i in range(len(t_Wf)):
            # The first layer reads the samples, deeper layers read the
            # hidden states of the layer below at the same time step.
            layer_inputs = t_X0_TS if i == 0 else H[-1]
            H_i = []
            for j in range(self.n_ts_feature):
                x_j = layer_inputs[j]
                if j == 0:
                    # The forget gate has nothing to gate at the first step; it
                    # is still evaluated, which registers its nodes in the graph.
                    F_ij = torch.sigmoid(torch.dot(x_j, t_Wf[i]) + t_Bf[i])
                    I_ij = torch.sigmoid(torch.dot(x_j, t_Wi[i]) + t_Bi[i])
                    CT_ij = torch.tanh(torch.dot(x_j, t_Wct[i]) + t_Bct[i])
                    O_ij = torch.sigmoid(torch.dot(x_j, t_Wo[i]) + t_Bo[i])
                    C_ij = I_ij*CT_ij
                else:
                    F_ij = torch.sigmoid(torch.dot(x_j, t_Wf[i]) + torch.dot(H_ij, t_Uf[i]) + t_Bf[i])
                    I_ij = torch.sigmoid(torch.dot(x_j, t_Wi[i]) + torch.dot(H_ij, t_Ui[i]) + t_Bi[i])
                    CT_ij = torch.tanh(torch.dot(x_j, t_Wct[i]) + torch.dot(H_ij, t_Uct[i]) + t_Bct[i])
                    O_ij = torch.sigmoid(torch.dot(x_j, t_Wo[i]) + torch.dot(H_ij, t_Uo[i]) + t_Bo[i])
                    C_ij = C_ij*F_ij + I_ij*CT_ij
                # Standard LSTM output: the output gate scales the squashed
                # cell state, i.e. o * tanh(c), not tanh(o * c).
                H_ij = O_ij*torch.tanh(C_ij)
                H_i.append(H_ij)
            H.append(H_i)

        X = []
        for i in range(len(t_Wout)):
            if i == 0:
                # Last n_ts_target hidden states of the top recurrent layer.
                sources = (H[-1][-j] for j in range(self.n_ts_target, 0, -1))
            else:
                sources = (X[-1][j] for j in range(self.n_ts_target))
            X_i = []
            for source in sources:
                S_ij = torch.dot(source, t_Wout[i]) + t_Bout[i]
                X_i.append(NN.compute_X(S_ij, self.ANN_AF[i]))
            X.append(X_i)
        return X

    def compute_gradient(self, t_Wf, t_Wi, t_Wct, t_Wo, t_Uf, t_Ui, t_Uct, t_Uo, t_Bf, t_Bi, t_Bct, t_Bo, t_Wout, t_Bout):
        """Return the ``torch.gradient`` of every parameter, grouped like the inputs.

        The result is ``(dWf, dWi, dWct, dWo, dUf, dUi, dUct, dUo, dBf, dBi,
        dBct, dBo, dWout, dBout)``, each a list with one array per layer.
        """
        recurrent = (t_Wf, t_Wi, t_Wct, t_Wo,
                     t_Uf, t_Ui, t_Uct, t_Uo,
                     t_Bf, t_Bi, t_Bct, t_Bo)
        grads = tuple([] for _ in recurrent)
        for i in range(len(t_Wf)):
            for group, collected in zip(recurrent, grads):
                collected.append(torch.gradient(group[i]))
        dWout, dBout = [], []
        for i in range(len(t_Wout)):
            dWout.append(torch.gradient(t_Wout[i]))
            dBout.append(torch.gradient(t_Bout[i]))
        return grads + (dWout, dBout)

    def update_all_weight(self, Wf, Wi, Wct, Wo, Uf, Ui, Uct, Uo, Bf, Bi, Bct, Bo, Wout, Bout, learning_rate, dWf, dWi, dWct, dWo, dUf, dUi, dUct, dUo, dBf, dBi, dBct, dBo, dWout, dBout):
        """Apply one gradient-descent step to every parameter and return them.

        Entries are updated with ``-=``; the lists passed in are returned in
        the order ``Wf, Wi, Wct, Wo, Uf, Ui, Uct, Uo, Bf, Bi, Bct, Bo, Wout, Bout``.
        """
        recurrent_pairs = (
            (Wf, dWf), (Wi, dWi), (Wct, dWct), (Wo, dWo),
            (Uf, dUf), (Ui, dUi), (Uct, dUct), (Uo, dUo),
            (Bf, dBf), (Bi, dBi), (Bct, dBct), (Bo, dBo),
        )
        for i in range(len(Wf)):
            for values, grads in recurrent_pairs:
                values[i] -= learning_rate*grads[i]
        for i in range(len(Wout)):
            Wout[i] -= learning_rate*dWout[i]
            Bout[i] -= learning_rate*dBout[i]
        return Wf, Wi, Wct, Wo, Uf, Ui, Uct, Uo, Bf, Bi, Bct, Bo, Wout, Bout

    def predict(self, X):
        """Return the outputs of the last dense layer for samples ``X``.

        The result is the list of per-time-step output objects produced by
        ``forward`` (not their ``data``). The global graph is cleared before
        returning.
        """
        X0_TS = RNN.seperate_timestep(X, self.n_ts_feature)
        t_X0_TS = NN.to_torch(X0_TS, torch_type='sample')
        stored = (self.Wf, self.Wi, self.Wct, self.Wo,
                  self.Uf, self.Ui, self.Uct, self.Uo,
                  self.Bf, self.Bi, self.Bct, self.Bo,
                  self.Wout, self.Bout)
        t_params = [NN.to_torch(group, torch_type=kind)
                    for group, kind in zip(stored, LSTM._PARAM_KINDS)]
        outputs = self.forward(t_X0_TS, *t_params)
        torch.clear_adj()
        return outputs[-1]

In [12]:
class Normalize:
    """Column-wise scaling of a 2-D array, together with its inverse.

    The statistics needed by the chosen ``norm_type`` are computed once, over
    axis 0 of the ``Data`` passed to the constructor, and stored as ``(1, n)``
    row vectors so they broadcast against later inputs.

    Supported ``norm_type`` values:

    * ``'minmaxNorm'``      -- ``(x - min) / (max - min)``
    * ``'rescale'``         -- min-max mapped onto ``[min_norm, max_norm]``
    * ``'meanNorm'``        -- ``(x - mean) / (max - min)``
    * ``'standardization'`` -- ``(x - mean) / std``
    * ``'SUL'``             -- ``x / sqrt(sum(x**2))`` per column

    Naming note: ``fit`` *applies* the normalization and ``transform``
    *reverses* it (maps normalized values back to the original scale).
    """

    # Every value accepted for ``norm_type``.
    _NORM_TYPES = ('minmaxNorm', 'rescale', 'meanNorm', 'standardization', 'SUL')

    def __init__(self, Data, norm_type, min_norm=-1, max_norm=1):
        """Store the statistics of ``Data`` required by ``norm_type``.

        Parameters
        ----------
        Data : array exposing ``min``/``max``/``mean``/``std`` with ``axis=0``
            Reference data the statistics are taken from.
        norm_type : str
            One of the values listed in the class docstring.
        min_norm, max_norm : number
            Target range; only stored and used when ``norm_type == 'rescale'``.

        Raises
        ------
        ValueError
            If ``norm_type`` is not a supported value.
        """
        # Fail fast: without this check an unknown type stored no statistics
        # and only surfaced later as an UnboundLocalError in fit/transform.
        if norm_type not in self._NORM_TYPES:
            raise ValueError('unknown norm_type {!r}; expected one of: {}'.format(
                norm_type, ', '.join(self._NORM_TYPES)))
        self.norm_type = norm_type

        if self.norm_type == 'minmaxNorm':
            self.Data_min = self.min4norm(Data)
            self.Data_max = self.max4norm(Data)
        elif self.norm_type == 'rescale':
            self.Data_min_norm = min_norm
            self.Data_max_norm = max_norm
            self.Data_min = self.min4norm(Data)
            self.Data_max = self.max4norm(Data)
        elif self.norm_type == 'meanNorm':
            self.Data_min = self.min4norm(Data)
            self.Data_max = self.max4norm(Data)
            self.Data_mean = self.mean4norm(Data)
        elif self.norm_type == 'standardization':
            self.Data_mean = self.mean4norm(Data)
            self.Data_std = self.std4norm(Data)
        elif self.norm_type == 'SUL':
            self.Data_ed = self.ed4norm(Data)

    @staticmethod
    def _raise_if_any_zero(values, message):
        """Raise ``Exception(message)`` when any entry of ``values`` equals 0.

        Used to guard every division below against a zero denominator.
        """
        if np.any(np.asarray(values) == 0):
            raise Exception(message)

    # ---- per-column statistics, each returned as a (1, n) row vector ----

    def min4norm(self, Data):
        """Return the minimum over axis 0, reshaped to ``(1, -1)``."""
        _min = Data.min(axis=0)
        return _min.reshape(1, -1)

    def max4norm(self, Data):
        """Return the maximum over axis 0, reshaped to ``(1, -1)``."""
        _max = Data.max(axis=0)
        return _max.reshape(1, -1)

    def mean4norm(self, Data):
        """Return the mean over axis 0, reshaped to ``(1, -1)``."""
        _mean = Data.mean(axis=0)
        return _mean.reshape(1, -1)

    def std4norm(self, Data):
        """Return ``Data.std(axis=0)``, reshaped to ``(1, -1)``."""
        _std = Data.std(axis=0)
        return _std.reshape(1, -1)

    def ed4norm(self, Data):
        """Return ``sqrt(sum(Data**2))`` over axis 0, reshaped to ``(1, -1)``."""
        _ed = np.sqrt((Data**2).sum(axis=0))
        return _ed.reshape(1, -1)

    # ---- forward scalings ----

    def minmaxNorm(self, Data, _min, _max):
        """Return ``(Data - _min) / (_max - _min)``.

        Raises ``Exception`` if any entry of ``_max - _min`` is zero.
        """
        span = _max - _min
        self._raise_if_any_zero(span, 'max and min are equal')
        Data_norm = (Data - _min)/span
        return Data_norm

    def rescale(self, Data, _min, _max, min_norm, max_norm):
        """Min-max scale ``Data`` onto the range ``[min_norm, max_norm]``.

        Raises ``Exception`` if ``max_norm <= min_norm`` or if any entry of
        ``_max - _min`` is zero.
        """
        if max_norm <= min_norm:
            raise Exception('max_norm has to greater than min_norm')
        self._raise_if_any_zero(_max - _min, 'max and min are equal')
        Data_norm = (max_norm - min_norm)*(Data - _min)/(_max - _min) + min_norm
        return Data_norm

    def meanNorm(self, Data, _min, _max, _mean):
        """Return ``(Data - _mean) / (_max - _min)``.

        Raises ``Exception`` if any entry of ``_max - _min`` is zero.
        """
        span = _max - _min
        self._raise_if_any_zero(span, 'max and min are equal')
        Data_norm = (Data - _mean)/span
        return Data_norm

    def standardization(self, Data, _mean, _std):
        """Return ``(Data - _mean) / _std``.

        Raises ``Exception`` if any entry of ``_std`` is zero.
        """
        self._raise_if_any_zero(_std, 'std is equal 0')
        Data_norm = (Data - _mean)/_std
        return Data_norm

    def SUL(self, Data, _ed):
        """Return ``Data / _ed``.

        Raises ``Exception`` if any entry of ``_ed`` is zero.
        """
        self._raise_if_any_zero(_ed, 'ed is equal 0')
        Data_norm = Data/_ed
        return Data_norm

    def fit(self, Data):
        """Normalize ``Data`` with the statistics stored by the constructor.

        Despite the name, nothing is re-estimated here; the stored statistics
        are simply applied.

        Raises
        ------
        ValueError
            If ``self.norm_type`` is not a supported value (for example when
            it was reassigned after construction).
        Exception
            Propagated from the scaling method on a zero denominator or an
            invalid rescale range.
        """
        if self.norm_type == 'minmaxNorm':
            Data_norm = self.minmaxNorm(Data, self.Data_min, self.Data_max)
        elif self.norm_type == 'rescale':
            Data_norm = self.rescale(Data, self.Data_min, self.Data_max, self.Data_min_norm, self.Data_max_norm)
        elif self.norm_type == 'meanNorm':
            Data_norm = self.meanNorm(Data, self.Data_min, self.Data_max, self.Data_mean)
        elif self.norm_type == 'standardization':
            Data_norm = self.standardization(Data, self.Data_mean, self.Data_std)
        elif self.norm_type == 'SUL':
            Data_norm = self.SUL(Data, self.Data_ed)
        else:
            raise ValueError('unknown norm_type {!r}'.format(self.norm_type))
        return Data_norm

    # ---- inverse scalings ----

    def de_minmaxNorm(self, Data_norm, _min, _max):
        """Invert ``minmaxNorm``: ``Data_norm * (_max - _min) + _min``."""
        Data = Data_norm*(_max - _min) + _min
        return Data

    def de_rescale(self, Data_norm, _min, _max, min_norm, max_norm):
        """Invert ``rescale``.

        Raises ``Exception`` if ``max_norm <= min_norm``.
        """
        if max_norm <= min_norm:
            raise Exception('max_norm has to greater than min_norm')
        Data = (_max - _min)*(Data_norm - min_norm)/(max_norm - min_norm) + _min
        return Data

    def de_meanNorm(self, Data_norm, _min, _max, _mean):
        """Invert ``meanNorm``: ``Data_norm * (_max - _min) + _mean``."""
        Data = Data_norm*(_max - _min) + _mean
        return Data

    def de_standardization(self, Data_norm, _mean, _std):
        """Invert ``standardization``: ``Data_norm * _std + _mean``."""
        Data = Data_norm*_std + _mean
        return Data

    def de_SUL(self, Data_norm, _ed):
        """Invert ``SUL``: ``Data_norm * _ed``."""
        Data = Data_norm*_ed
        return Data

    def transform(self, Data_norm):
        """Map normalized values back to the original scale.

        This is the inverse of ``fit`` and uses the same stored statistics.

        Raises
        ------
        ValueError
            If ``self.norm_type`` is not a supported value.
        Exception
            Propagated from ``de_rescale`` on an invalid rescale range.
        """
        if self.norm_type == 'minmaxNorm':
            Data = self.de_minmaxNorm(Data_norm, self.Data_min, self.Data_max)
        elif self.norm_type == 'rescale':
            Data = self.de_rescale(Data_norm, self.Data_min, self.Data_max, self.Data_min_norm, self.Data_max_norm)
        elif self.norm_type == 'meanNorm':
            Data = self.de_meanNorm(Data_norm, self.Data_min, self.Data_max, self.Data_mean)
        elif self.norm_type == 'standardization':
            Data = self.de_standardization(Data_norm, self.Data_mean, self.Data_std)
        elif self.norm_type == 'SUL':
            Data = self.de_SUL(Data_norm, self.Data_ed)
        else:
            raise ValueError('unknown norm_type {!r}'.format(self.norm_type))
        return Data