In [53]:
import numpy as np
import pandas as pd
import math

from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')

### Подготовка данных

In [45]:
# Load the Titanic training set; the path is relative to the current working directory.
titanic_df = pd.read_csv('data/titanic_train.csv')
# Print column names, non-null counts and dtypes to see which columns have missing values.
titanic_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 12 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   PassengerId  891 non-null    int64  
 1   Survived     891 non-null    int64  
 2   Pclass       891 non-null    int64  
 3   Name         891 non-null    object 
 4   Sex          891 non-null    object 
 5   Age          714 non-null    float64
 6   SibSp        891 non-null    int64  
 7   Parch        891 non-null    int64  
 8   Ticket       891 non-null    object 
 9   Fare         891 non-null    float64
 10  Cabin        204 non-null    object 
 11  Embarked     889 non-null    object 
dtypes: float64(2), int64(5), object(5)
memory usage: 83.7+ KB


In [46]:
# Human-readable names for the two values of the 'Survived' target.
live = { 
    0: 'Perished',
    1: 'Survived'
}

# Numeric encoding for the 'Sex' column.
sex = {
    'male': 0,
    'female': 1
}

# Encode sex as a 0/1 column and build the model inputs from passenger class and sex.
titanic_df['Bsex'] = titanic_df['Sex'].apply(lambda x: sex[x])
features = titanic_df[['Pclass', 'Bsex']].to_numpy()
labels = titanic_df['Survived'].to_numpy()

# Fill missing numeric values with the column mean. numeric_only=True is required:
# the frame still contains text columns here, and recent pandas versions raise a
# TypeError when asked to average them instead of silently skipping them.
# NOTE: this cell is not idempotent - it drops 'Sex', which the encoding step above
# reads, so re-running it without reloading titanic_df fails.
titanic_df = titanic_df.fillna(titanic_df.mean(numeric_only=True)).drop(['Sex', 'Cabin', 'Embarked'], axis = 1)
titanic_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 10 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   PassengerId  891 non-null    int64  
 1   Survived     891 non-null    int64  
 2   Pclass       891 non-null    int64  
 3   Name         891 non-null    object 
 4   Age          891 non-null    float64
 5   SibSp        891 non-null    int64  
 6   Parch        891 non-null    int64  
 7   Ticket       891 non-null    object 
 8   Fare         891 non-null    float64
 9   Bsex         891 non-null    int64  
dtypes: float64(2), int64(6), object(2)
memory usage: 69.7+ KB


In [47]:
# Hold out 30% of the samples for evaluation. A fixed random_state makes the split
# (and everything trained on it) reproducible after Restart Kernel -> Run All.
X_train, X_test, Y_train, Y_test = train_test_split(features, labels, test_size=0.30, random_state=10)

### Функции активации

In [29]:
def get_layer(p = 4, activation = 'ReLU'):
    """Build a layer specification.

    Args:
        p: first element of the returned tuple (used by NN as the number of units).
        activation: second element of the returned tuple (activation name).

    Returns:
        The tuple ``(p, activation)``.
    """
    layer_spec = p, activation
    return layer_spec

In [30]:
def sigmoid_act(x, der = False):
    """Logistic sigmoid, or its derivative with respect to ``x``.

    Args:
        x: scalar or NumPy array.
        der: when equal to True, return sigmoid(x) * (1 - sigmoid(x))
            instead of sigmoid(x).

    Returns:
        Value(s) of the sigmoid or of its derivative, evaluated at ``x``.
    """
    s = 1 / (1 + np.exp(-x))
    want_derivative = (der == True)
    return s * (1 - s) if want_derivative else s

In [31]:
# Rectified Linear Unit
def ReLU_act(x, der = False):
    """ReLU activation, or its derivative with respect to ``x``.

    Args:
        x: scalar or NumPy array.
        der: when equal to True, return the step function of ``x``
            (0 for negative inputs, 1 for zero and positive inputs).

    Returns:
        ``max(x, 0)`` element-wise, or the step function when ``der`` is True.
    """
    if der == True:
        return np.heaviside(x, 1)
    return np.maximum(x, 0)
    
def list_act():
    """Return the names of the supported activation functions as a new list."""
    supported = ('sigmoid', 'ReLU')
    return list(supported)
    
def get_act(string = 'ReLU'):
    """Look up an activation function by name.

    Args:
        string: activation name.

    Returns:
        ``ReLU_act`` for ``'ReLU'``; ``sigmoid_act`` for ``'sigmoid'`` and for
        every other value (sigmoid is the fallback).
    """
    return ReLU_act if string == 'ReLU' else sigmoid_act

### Класс нейронной сети

In [48]:
class NN:
    """Small fully connected feed-forward network trained one sample at a time.

    Layers are declared with ``add`` as ``(units, activation_name)`` tuples.
    ``Fit`` creates the parameters and makes a single pass over the training
    samples, updating the parameters after every sample from the gradient of
    the squared error. ``predict`` thresholds the first output unit at 0.5.
    """

    # Seeds NumPy's global generator once, when the class body is executed
    # (not on every instantiation).
    np.random.seed(10)


    def __init__(self):
        self.HiddenLayer = []   # layer specs (units, activation name), in order
        self.w = []             # weight matrices, one per layer
        self.b = []             # bias vectors, one per layer
        self.phi = []           # activation callables, one per layer
        self.mu = []            # squared-error loss of every training sample seen by Fit
        self.eta = 1            # learning rate


    # Add a layer
    def add(self, lay = (4, 'ReLU')):
        """Append a layer specification ``(units, activation_name)``."""
        self.HiddenLayer.append(lay)


    @staticmethod
    def FeedForward(w, b, phi, x):
        """Return the activation ``phi(w . x + b)`` of a single layer."""
        return phi(np.dot(w, x) + b)


    def _forward(self, sample):
        """Return the list of activations of every layer for one input sample."""
        activations = [self.FeedForward(self.w[0], self.b[0], self.phi[0], sample)]
        for i in range(1, len(self.HiddenLayer)):
            activations.append(self.FeedForward(self.w[i], self.b[i], self.phi[i], activations[i - 1]))
        return activations


    # Gradient descent
    def BackPropagation(self, x, z, Y, w, b, phi):
        """Perform one gradient-descent step for a single sample.

        Args:
            x: input sample (1-D array).
            z: activations of every layer for ``x``, computed with ``w`` and ``b``.
            Y: target value for ``x``.
            w: current weight matrices, one per layer.
            b: current bias vectors, one per layer.
            phi: activation callables accepting ``(values, der=...)``.

        Returns:
            ``(W, B)``: lists with the updated weight matrices and bias vectors,
            ordered from the first layer to the last. The inputs are not modified.
        """
        depth = len(z)

        # Input seen by each layer: the sample itself, then the previous activations.
        layer_inputs = [x] + [z[i] for i in range(depth - 1)]

        # The derivative of an activation must be evaluated at the layer's
        # pre-activation (w . input + b), not at its output, so recompute it here.
        pre = [np.dot(w[i], layer_inputs[i]) + b[i] for i in range(depth)]

        # Error of the last layer, then propagate it back towards the first layer.
        deltas = [(z[depth - 1] - Y) * phi[depth - 1](pre[depth - 1], der = True)]
        for i in range(depth - 1, 0, -1):
            deltas.append(np.dot(deltas[-1], w[i]) * phi[i - 1](pre[i - 1], der = True))

        # Order the errors from the first layer to the last. Plain lists are used
        # because layers generally differ in size and cannot be stacked in one array.
        deltas.reverse()

        # Scale the error by the number of training samples.
        n_samples = self.X.shape[0]
        self.delta = [d / n_samples for d in deltas]

        # Gradient of a layer's weights is the outer product of its error and its input.
        self.W = [w[i] - self.eta * np.outer(self.delta[i], layer_inputs[i]) for i in range(depth)]
        self.B = [b[i] - self.eta * self.delta[i] for i in range(depth)]

        return self.W, self.B


    def Fit(self, X_train, Y_train):
        """Initialise the parameters and train with one pass over the data.

        Args:
            X_train: 2-D array, one sample per row.
            Y_train: targets, indexable by sample position.

        Raises:
            ValueError: if a layer names an activation not listed by ``list_act()``.
        """
        self.X = X_train
        self.Y = Y_train

        # Start from a clean state so that Fit can be called more than once.
        self.w, self.b, self.phi, self.mu = [], [], [], []

        known_activations = list_act()
        fan_in = self.X.shape[1]
        for layer in self.HiddenLayer:
            units, act_name = layer[0], layer[1]
            if act_name not in known_activations:
                raise ValueError('Unknown activation %r; expected one of %r' % (act_name, known_activations))

            # NOTE(review): He initialisation multiplies by sqrt(2 / fan_in); this
            # divides by it. Kept as in the original - confirm which is intended.
            scale = np.sqrt(2 / fan_in)
            self.w.append(np.random.randn(units, fan_in) / scale)
            self.b.append(np.random.randn(units) / scale)
            self.phi.append(get_act(act_name))
            fan_in = units

        # Loop over the training samples
        for I in range(0, self.X.shape[0]):
            self.z = self._forward(self.X[I])
            self.w, self.b = self.BackPropagation(self.X[I], self.z, self.Y[I], self.w, self.b, self.phi)

            # Squared-error loss of this sample (computed from the pre-update output).
            residual = self.z[-1] - self.Y[I]
            self.mu.append((1/2) * np.dot(residual, residual))


    def predict(self, X_test):
        """Return a 0/1 prediction for every row of ``X_test`` as a NumPy array."""
        self.pred = []
        self.XX = X_test

        # Loop over the samples to classify
        for I in range(0, self.XX.shape[0]):
            self.z = self._forward(self.XX[I])

            # Binary classifier with threshold 0.5:
            # if y < 0.5 the output is zero, otherwise it is one.
            self.pred.append(np.heaviside(self.z[-1] - 0.5, 1)[0])
        return np.array(self.pred)


    def get_accuracy(self):
        """Return the per-sample training loss recorded by ``Fit``.

        Despite the name, the values are squared-error losses, not accuracies.
        """
        return np.array(self.mu)


    def get_avg_accuracy(self):
        """Return the mean training loss of ten consecutive, equally sized batches.

        The batch size is ``ceil((n_samples - 10) / 10)``, so the ten batches
        always fit inside the recorded losses; trailing samples are ignored.
        A batch with no samples contributes 0.
        """
        batch = int(math.ceil((self.X.shape[0] - 10) / 10.0))
        self.batch_loss = []

        for i in range(0, 10):
            chunk = self.mu[batch * i : batch * (i + 1)] if batch > 0 else []
            # Average over every sample of the batch (sum and divisor now agree).
            self.loss_avg = sum(chunk) / len(chunk) if len(chunk) > 0 else 0
            self.batch_loss.append(self.loss_avg)
        return np.array(self.batch_loss)


    def set_learning_rate(self, et=1):
        """Set the learning rate used by ``BackPropagation``."""
        self.eta = et

In [51]:
# Build the network: two ReLU layers (8 and 4 units) followed by a single sigmoid output unit.
titanic_model = NN()

titanic_model.add(get_layer(8, 'ReLU'))
titanic_model.add(get_layer(4, 'ReLU'))
titanic_model.add(get_layer(1, 'sigmoid'))

# Set the learning rate before training, then train on the training split.
titanic_model.set_learning_rate(0.8)
titanic_model.Fit(X_train, Y_train)

# Despite the names, both values are training losses: get_accuracy() returns the
# per-sample loss recorded during Fit and get_avg_accuracy() returns ten batch averages of it.
acc_val = titanic_model.get_accuracy()
acc_avg_val = titanic_model.get_avg_accuracy()

# 0/1 predictions for the held-out samples.
predictions = titanic_model.predict(X_test)