In [1]:
import numpy as np

## Свертка

In [2]:
def create_axis_indexes(size_axis, center_w_l):
    coordinates = []
    for i in range(-center_w_l, size_axis-center_w_l):
        coordinates.append(i)
    return coordinates

In [3]:
def create_indexes(size_axis, center_w_l):
    # расчет координат на осях ядра свертки в зависимости от номера центрального элемента ядра
    coordinates_a = create_axis_indexes(size_axis=size_axis[0], center_w_l=center_w_l[0])
    coordinates_b = create_axis_indexes(size_axis=size_axis[1], center_w_l=center_w_l[1])
    return coordinates_a, coordinates_b

In [4]:
def convolution_feed_x_l(y_l_minus_1, w_l, conv_params):
    indexes_a, indexes_b = create_indexes(size_axis=w_l.shape, center_w_l=conv_params['center_w_l'])
    stride = conv_params['stride']
    
    # матрица выхода будет расширяться по мере добавления новых элементов
    x_l = np.zeros((1,1))
    
    # в зависимости от типа операции меняется основная формула функции
    if conv_params['convolution']:
        g = 1 # операция конволюции
    else:
        g = -1 # операция корреляции
    
    # итерация по i и j входной матрицы y_l_minus_1 из предположения, что размерность выходной матрицы x_l будет такой же
    for i in range(y_l_minus_1.shape[0]):
        for j in range(y_l_minus_1.shape[1]):
            demo = np.zeros([y_l_minus_1.shape[0], y_l_minus_1.shape[1]]) # матрица для демонстрации конволюции
            result = 0
            element_exists = False
            
            for a in indexes_a:
                for b in indexes_b:
                    # проверка, чтобы значения индексов не выходили за границы
                    if i*stride - g*a >= 0 and j*stride - g*b >= 0 \
                    and i*stride - g*a < y_l_minus_1.shape[0] and j*stride - g*b < y_l_minus_1.shape[1]:
                        
                        result += y_l_minus_1[i*stride - g*a][j*stride - g*b] * w_l[indexes_a.index(a)][indexes_b.index(b)] # перевод индексов в "нормальные" для извлечения элементов из матрицы w_l
                        demo[i*stride - g*a][j*stride - g*b] = w_l[indexes_a.index(a)][indexes_b.index(b)]
                        element_exists = True
            
            # запись полученных результатов только в том случае, если для данных i и j были произведены вычисления
            if element_exists:
                if i >= x_l.shape[0]:
                    # добавление строки, если не существует
                    x_l = np.vstack((x_l, np.zeros(x_l.shape[1])))
                if j >= x_l.shape[1]:
                    # добавление столбца, если не существует
                    x_l = np.hstack((x_l, np.zeros((x_l.shape[0],1))))
                x_l[i][j] = result
                
                # вывод матрицы demo для отслеживания хода свертки
                print('i=' + str(i) + '; j=' + str(j) + '\n', demo)
    return x_l

In [47]:
#w_l = np.array([
#    [1,2,3,4],
#    [5,6,7,8],
#    [9,10,11,12],
#    [13,14,15,16]])

#w_l = np.array([
#    [1,2,3],
#    [4,5,6],
#    [7,8,9]])

w_l = np.array([
    [1,2],
    [3,4]])

y_l_minus_1 = np.zeros((3,3))

other_parameters={
    'convolution':True,
    'stride':1,
    'center_w_l':(1,1)
}

convolution_feed_x_l(y_l_minus_1, w_l, other_parameters)

In [3]:
def function_activation(matrix, kind, feed):
    output_matrix = np.copy(matrix)
    if feed:
        if kind == 'sigmoid':
            output_matrix = 1 / (1+np.exp(-output_matrix))
        elif kind == 'relu':
            output_matrix[output_matrix < 0] = 0
        elif kind == 'softmax':
            output_matrix = np.exp(output_matrix) / np.exp(output_matrix).sum()
    else:
        if kind == 'sigmoid':
            output_matrix = output_matrix * (1 - output_matrix)
        elif kind == 'relu':
            output_matrix[output_matrix > 0] = 1
        elif kind == 'softmax':
            input_matrix = np.copy(matrix)
            output_matrix = np.zeros((matrix.shape[1], matrix.shape[1]))
            
            for i in range(output_matrix.shape[1]):
                for j in range(output_matrix.shape[1]):
                    if i == j:
                        output_matrix[i][i] = input_matrix[0][i] * (1 - input_matrix[0][i])
                    else:
                        output_matrix[i][j] = - input_matrix[0][i] * input_matrix[0][j]
        
    return output_matrix

In [None]:
def maxpool(y_l, conv_params):
    indexes_a, indexes_b = create_indexes(size_axis=conv_params['window_shape'], center_w_l=conv_params['center_window'])
    stride = conv_params['stride']
    
    # выходные матрицы будут расширяться по мере добавления новых элементов
    y_l_mp = np.zeros((1,1)) # матрица y_l после операции макспулинга
    y_l_mp_to_y_l = np.zeros((1,1), dtype='<U32')# матрица для backprop через слой макспулинга (внутри матрицы будет храниться текст)
    
    # в зависимости от типа операции меняется основная формула функции
    if conv_params['colvolution']:
        g = 1
    else:
        g = -1
        
    # итерация по i и j входной матрицы y_l из предположения, что размерность выходной матрицы будет такой же
    for i in range(y_l.shape[0]):
        for j in range(y_l.shape[1]):
            result = -np.inf
            element_exists = False
            
            for a in indexes_a:
                for b in indexes_b:
                    # проверка, чтобы значения индексов не выходили за границы
                    if i*stride - g*a >= 0 and j*stride - g*b >= 0 \
                    and i*stride - g*a < y_l.shape[0] and j*stride - g*b < y_l.shape[1]:
                        if (y_l[i*stride - g*a][j*stride - g*b] > result):
                            result = y_l[i*stride - g*a][j*stride - g*b]
                            i_back = i*stride - g*a
                            j_back = j*stride - g*b
                        
                        element_exists = True
            # запись полученных результатов только в том случае, если для данных i и j были произведены вычисления            
            if element_exists:
                if i >= y_l_mp.shape[0]:
                    # добавление строки, если не существует
                    y_l_mp = np.vstack((y_l_mp, np.zeros(y_l_mp.shape[1])))
                    # матрица y_l_mp_to_y_l расширяется соответственно матрице y_l_mp
                    y_l_mp_to_y_l = np.vstack((y_l_mp_to_y_l, np.zeros(y_l_mp_to_y_l.shape[1])))
                if j >= y_l_mp.shape[1]:
                    # добавление столбца, если не существует
                    y_l_mp = np.hstack((y_l_mp, np.zeros((y_l_mp.shape[0],1))))
                    y_l_mp_to_y_l = np.hstack((y_l_mp_to_y_l, np.zeros((y_l_mp_to_y_l.shape[0],1))))
                    
                y_l_mp[i][j] = result
                # в матрице y_l_mp_to_y_l хранятся координаты значений,
                # которые соответствуют выбранным в операции максипулинга ячейкам из матрицы y_l
                y_l_mp_to_y_l[i][j] = str(i_back) + ',' + str(j_back)
                
    return y_l_mp, y_l_mp_to_y_l

In [None]:
def maxpool_feed(y_l, conv_params):
    list_of_y_l_mp = []
    list_of_y_l_mp_to_y_l = []
    
    for i in range(len(y_l)):
        y_l_mp, y_l_mp_to_y_l = maxpool(y_l, conv_params)
        
        list_of_y_l_mp.append(y_l_mp)
        list_of_y_l_mp_to_y_l.append(y_l_mp_to_y_l)
        
    return list_of_y_l_mp, list_of_y_l_mp_to_y_l

In [None]:
def maxpool_back(dEdy_l_mp, y_l_mp_to_y_l, y_l_shape):
    list_of_dEdy_l = []
    
    for i in range(len(dEdy_l_mp)): # операция выполняется для каждой из feature map
        dEdy_l = np.zeros(y_l_shape) # матрица dEdy_l будет далее постепенно заполнятся значениями
        # проход по всем элементам матрицы dEdy_l_mp
        for k in range(dEdy_l_mp[i].shape[0]):
            for l in range(dEdy_l_mp[i].shape[1]):
                # каждый элемент матрицы dEdy_l_mp необходимо поставить в матрицу dEdy_l
                # для этого извлекаем необходимые координаты "назначения" из матрицы y_l_mp_to_y_l
                coordinates = y_l_mp_to_y_l[i][k][l] # коордианты выглядят так: 2,4 - то есть 2-ая строка и 4-ый столбец
                coordinates_row = int(coordinates[:coordinates.find(',')])
                coordinates_col = int(coordinates[coordinates.find(',') + 1:])
                # запись по этим коордианатам в матрицу dEdy_l элемента из матрицы dEdy_l_mp
                dEdy_l[coordinates_row][coordinates_col] = dEdy_l_mp[i][k][l]
        list_of_dEdy_l.append(dEdy_l) # добавляем получившуюся dEdy_l в лист с остальными feature map
        
    return list_of_dEdy_l

In [2]:
def fc_weights_init(shape, weights_name, dir_npy):
    try:
        weights_matrix = np.load(dir_npy).get(weights_name)
        print('веса для', weights_name, 'подгружены', weights_matrix)
    except:
        weights_matrix = 2 * np.random.random(shape) - 1
        print('веса для', weights_name, 'созданы', weights_matrix.size)
    return weights_matrix

In [1]:
def fc_layer(y_l_minus_1, w_l, w_l_name, b_l, b_l_name, neurons, act_fn, dir_npy, feed):
    if w_l.size == 0:
        w_l = fc_weights_init(shape=(y_l_minus_1.shape[1], neurons), weights_name=w_l_name, dir_npy=dir_npy)
        b_l = fc_weights_init(shape=(1, neurons), weights_name=b_l_name, dir_npy=dir_npy)
    x_l = np.dot(y_l_minus_1, w_l) + b_l
    y_l = function_activation(x_l, kind=act_fn)
    
    return x_l, y_l, b_l

In [None]:
def function_loss(y_truth, y_l, feed):
    if feed:
        error_matrix = - (y_truth * np.log(y_l))
    else:
        error_matrix = - (y_truth / y_l)
        
    return error_matrix

In [None]:
def fc_backpropagation(y_l_minus_1, dEdy_l, y_l, w_l, b_l, act_fn, alpha):
    # вычисление dE/dx_l, то есть backprop через функцию активации
    if act_fn == 'softmax':
        dEdx_l = np.dot(dEdy_l, activation_fn(y_l, fn_name=act_fn, feed=False))
    else:
        dEdx_l = dEdy_l * activation_fn(y_l, fn_name=act_fn, feed=False)
    # вычисление частных производных
    dEdw_l = np.dot(y_l_minus_1.T, dEdx_l)
    dEdb_l = dEdx_l
    dEdy_l_minus_1 = np.dot(dEdx_l, w_l.T)
    # обновление матриц весов
    w_l = w_l - alpha * dEdw_l
    b_l = b_l - alpha * dEdb_l
    return dEdy_l_minus_1, w_l, b_l