In [2]:
import urllib.request
import os.path
import gzip
import pickle
import os
import numpy as np
import sys,os

# Gzipped MNIST IDX files, in the order: train images, train labels,
# test images, test labels (the loading code below indexes this list 0-3).
file_path = [
  './data/train-images-idx3-ubyte.gz',
  './data/train-labels-idx1-ubyte.gz',
  './data/t10k-images-idx3-ubyte.gz',
  './data/t10k-labels-idx1-ubyte.gz',
]

def file_load(path):
  """Read one gzipped MNIST IDX file into a NumPy uint8 array.

  A path containing the substring 'images' is treated as an image file:
  the first 16 bytes are skipped and the remaining bytes are reshaped into
  rows of 784 values. Any other path is treated as a label file: the first
  8 bytes are skipped and the remaining bytes are returned as a flat array.
  """
  is_image_file = 'images' in path
  header_bytes = 16 if is_image_file else 8

  with gzip.open(path, 'rb') as stream:
    raw = stream.read()

  values = np.frombuffer(raw, np.uint8, offset=header_bytes)
  if is_image_file:
    return values.reshape(-1, 784)
  return values

# Load the four MNIST arrays (file_path order: train images, train labels,
# test images, test labels).
dataset = {
  'train_img': file_load(file_path[0]),
  'train_label': file_load(file_path[1]),
  'test_img': file_load(file_path[2]),
  'test_label': file_load(file_path[3]),
}

# Normalize: cast the uint8 pixels to float32 and scale by 1/255.
for img_key in ('train_img', 'test_img'):
  scaled = dataset[img_key].astype(np.float32)
  scaled /= 255.0
  dataset[img_key] = scaled

# One-hot encode the labels: one row per sample, 10 columns, with a single 1
# in the column given by the label value.
for label_key in ('train_label', 'test_label'):
  class_ids = dataset[label_key]
  one_hot = np.zeros((class_ids.size, 10))
  one_hot[np.arange(class_ids.size), class_ids] = 1
  dataset[label_key] = one_hot

# Keep the images unflattened: (N, 784) -> (N, 1, 28, 28).
for img_key in ('train_img', 'test_img'):
  dataset[img_key] = dataset[img_key].reshape(-1, 1, 28, 28)

X_train, Y_train = dataset['train_img'], dataset['train_label']
X_test, Y_test = dataset['test_img'], dataset['test_label']
#(X_train, Y_train), (X_test, Y_test) = load_mnist(normalize=True, one_hot_label=True)
for arr in (X_train, Y_train, X_test):
  print(arr.shape)

(60000, 1, 28, 28)
(60000, 10)
(10000, 1, 28, 28)


In [3]:
import random
def split_validation(X_train, n=0.4, Y=None):
  """Split samples and labels into a training part and a validation part.

  The first ``n`` fraction of the rows becomes the validation set and the
  remaining rows the training set. No shuffling is performed.

  Args:
    X_train: array of samples, split along axis 0.
    n: fraction of the samples placed in the validation set. (Previously
      this argument was ignored and 0.4 was always used.)
    Y: labels aligned with ``X_train`` along axis 0. When omitted, the
      notebook-level ``Y_train`` is used, as before.

  Returns:
    Tuple ``(X_fit, X_valid, Y_fit, Y_valid)``.
  """
  if Y is None:
    Y = Y_train  # fall back to the notebook global for existing callers
  num = int(X_train.shape[0] * n)
  print(num)
  return X_train[num:], X_train[:num], Y[num:], Y[:num] #train, validation

# Hold out the leading part of the training data as a validation set.
holdout = split_validation(X_train, n=0.2)
X_train1, X_train_valid, Y_train1, Y_train_valid = holdout
for fit_part, valid_part in ((X_train1, X_train_valid), (Y_train1, Y_train_valid)):
  print(fit_part.shape, valid_part.shape)

12000
(48000, 1, 28, 28) (12000, 1, 28, 28)
(48000, 10) (12000, 10)


In [4]:
# Shape of a single training sample.
print(X_train1[0].shape)

# Draw a random mini-batch. Despite its name, batch_size holds the array of
# sampled row indices returned by np.random.choice, not a count.
batch = 32
batch_size = np.random.choice(len(X_train1), size=batch)
X_train_b, Y_train_b = X_train1[batch_size], Y_train1[batch_size]
print(X_train_b.shape)

(1, 28, 28)
(32, 1, 28, 28)


In [5]:
class ReLU:
  """Rectified linear unit layer: keeps positive inputs and zeroes the rest."""

  def __init__(self):
    # Boolean mask of the positions where the last forward input was <= 0.
    self.x_bool = None

  def forward(self, x):
    """Return a copy of ``x`` in which every element <= 0 is replaced by 0.

    The mask of replaced positions is cached for ``backward``.
    """
    self.x_bool = (x <= 0)
    relu = x.copy()
    relu[self.x_bool] = 0
    return relu

  def backward(self, back):
    """Return the upstream gradient with the masked positions set to 0.

    The masking is applied to a copy, so the array passed in by the caller
    is left unmodified (the previous version overwrote it in place).
    """
    grad = back.copy()
    grad[self.x_bool] = 0
    return grad

class LReLU:
  """Leaky ReLU layer: inputs <= 0 are scaled by a small slope."""

  def __init__(self, slope=0.1):
    # Boolean mask of the positions where the last forward input was <= 0.
    self.x_bool = None
    # Multiplier applied to non-positive inputs (and to their gradients).
    self.slope = slope

  def forward(self, x):
    """Return a copy of ``x`` with every element <= 0 multiplied by the slope."""
    self.x_bool = (x <= 0)
    lrelu = x.copy()
    lrelu[self.x_bool] *= self.slope
    return lrelu

  def backward(self, back):
    """Return the upstream gradient with masked positions scaled by the slope.

    The scaling is applied to a copy, so the array passed in by the caller
    is left unmodified (the previous version overwrote it in place).
    """
    grad = back.copy()
    grad[self.x_bool] *= self.slope
    return grad
   
class Softmax_Cross_Entropy_Error:
  """Output layer that applies softmax and then the cross-entropy loss."""

  def __init__(self):
    self.loss = 0  # loss value from the most recent forward call
    self.p = 0     # softmax output from the most recent forward call
    self.y = 0     # targets passed to the most recent forward call

  def forward(self, p, y):
    """Compute, cache and return the cross-entropy of softmax(p) against y."""
    probabilities = softmax(p)
    self.p = probabilities
    self.y = y
    self.loss = cross_entropy_error(probabilities, y)
    return self.loss

  def backward(self, back = 1):
    """Return (softmax output - targets) divided by the number of rows in y.

    The ``back`` argument is accepted for interface symmetry with the other
    layers but is not used in the computation.
    """
    n_rows = self.y.shape[0]
    difference = self.p - self.y
    return difference / n_rows
class Linear: # TODO (original author's note): this class still needs to be changed
  """Fully connected (affine) layer computing ``dot(x, W) + b``."""

  def __init__(self, W, b):
    self.W, self.b = W, b

    # Cached by forward for use in backward.
    self.x = None
    self.original_x_shape = None
    # Gradients with respect to the weight and bias parameters.
    self.dW = None
    self.db = None

  def forward(self, x):
    """Flatten each sample of ``x`` to a row and apply the affine transform."""
    # Tensor support: remember the incoming shape so backward can restore it.
    self.original_x_shape = x.shape
    self.x = x.reshape(x.shape[0], -1)
    return np.dot(self.x, self.W) + self.b

  def backward(self, dout):
    """Store dW/db and return the gradient reshaped to the forward input's shape."""
    self.dW = np.dot(self.x.T, dout)
    self.db = np.sum(dout, axis=0)
    flat_dx = np.dot(dout, self.W.T)
    # Restore the shape the input had before it was flattened in forward.
    return flat_dx.reshape(*self.original_x_shape)
# Helper functions
def softmax(x):
  """Softmax of a 1-D vector, or of each row of a 2-D array.

  The maximum is subtracted before exponentiating so that large inputs do
  not overflow; this does not change the result.
  """
  shifted = x.T
  shifted = shifted - np.max(shifted, axis=0)
  exps = np.exp(shifted)
  return (exps / np.sum(exps, axis=0)).T
def cross_entropy_error(p,y):
  """Mean cross-entropy of the predicted probabilities ``p`` against ``y``.

  ``y`` may be one-hot (same number of elements as ``p``) or hold class
  indices. A 1-D ``p`` is treated as a batch containing a single sample.
  A constant 1e-7 is added inside the logarithm to avoid log(0).
  """
  if p.ndim == 1:
    p = p.reshape(1, p.size)
    y = y.reshape(1, y.size)

  # One-hot targets are converted to the index of the correct class.
  labels = y.argmax(axis=1) if y.size == p.size else y

  n_rows = p.shape[0]
  picked = p[np.arange(n_rows), labels]
  return -np.sum(np.log(picked + 1e-7)) / n_rows

In [6]:
# Re-draw the example mini-batch, this time with 100 samples.
batch = 100
batch_size = np.random.choice(len(X_train1), size=batch)  # array of sampled row indices
X_train_b, Y_train_b = X_train1[batch_size], Y_train1[batch_size]
print(X_train_b.shape)

(100, 1, 28, 28)


In [7]:
def im2col(input_image, filter, stride=1, pad=0):
  """Unfold sliding windows of a 4-D image batch into a 2-D matrix.

  Args:
    input_image: array indexed as (image, channel, row, column).
    filter: either a 4-D weight array whose axes 2 and 3 give the window
      height and width, or a ``(height, width)`` tuple (pooling case).
    stride: step between consecutive windows.
    pad: zero padding added on every side of the two spatial axes.

  Returns:
    Array with one row per (image, output row, output column) and one
    column per (channel, window row, window column).
  """
  output_height, output_width = cal_output(input_image, filter, stride, pad)

  if type(filter) is tuple:
    fh, fw = filter[0], filter[1]
  else:
    fh, fw = int(filter.shape[2]), int(filter.shape[3])

  n_img = int(input_image.shape[0])
  n_ch = int(input_image.shape[1])
  patches = np.zeros((n_img, n_ch, int(fh), int(fw), int(output_height), int(output_width)))
  padded = np.pad(input_image, [(0,0),(0,0),(pad,pad),(pad,pad)], 'constant')

  # For each position inside the window, gather the pixels that this
  # position touches across all output locations.
  for i in range(fh):
    rows = slice(i, i + stride * output_height, stride)
    for j in range(fw):
      cols = slice(j, j + stride * output_width, stride)
      patches[:, :, i, j, :, :] = padded[:, :, rows, cols]

  n_rows = input_image.shape[0] * output_height * output_width
  return patches.transpose(0, 4, 5, 1, 2, 3).reshape(n_rows, -1)

#https://cding.tistory.com/112
def col2im(col, input_image, filter, stride=1, pad=0):
  """Fold a column matrix back into image layout (inverse layout of im2col).

  Values that map to the same pixel (overlapping windows) are summed.

  Args:
    col: 2-D array with one row per (image, output row, output column) and
      one column per (channel, window row, window column).
    input_image: array whose 4-D shape (image, channel, row, column) is the
      shape of the result; only its shape is used.
    filter: either a 4-D weight array whose axes 2 and 3 give the window
      height and width, or a ``(height, width)`` tuple (pooling case).
    stride: step between consecutive windows.
    pad: zero padding that was applied on every side of the spatial axes.

  Returns:
    Array with the same shape as ``input_image``.
  """
  output_height, output_width = cal_output(input_image, filter, stride, pad)

  # Window height/width. The tuple branch previously used filter[1] for both
  # dimensions, which broke non-square pooling windows.
  if type(filter) != tuple:
    fh, fw = int(filter.shape[2]), int(filter.shape[3])
  else:
    fh, fw = int(filter[0]), int(filter[1])

  n_img = int(input_image.shape[0])
  n_ch = int(input_image.shape[1])
  in_h = int(input_image.shape[2])
  in_w = int(input_image.shape[3])

  # (image, out_row, out_col, channel, win_row, win_col)
  #   -> (image, channel, win_row, win_col, out_row, out_col)
  col = col.reshape(n_img, int(output_height), int(output_width), n_ch, fh, fw).transpose(0, 3, 4, 5, 1, 2)

  # Padded canvas; the extra (stride - 1) keeps the strided slices in range.
  img = np.zeros((n_img, n_ch, in_h + 2 * pad + stride - 1, in_w + 2 * pad + stride - 1))

  # Rows iterate over the window height and columns over the window width
  # (these two ranges were previously swapped).
  for y in range(fh):
    y_max = y + stride * output_height
    for x in range(fw):
      x_max = x + stride * output_width
      img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

  # Strip the padding again.
  return img[:, :, pad:in_h + pad, pad:in_w + pad]


In [8]:
def cal_output(input_img, filter, stride, pad):
  """Return ``(output_height, output_width)`` of a sliding-window operation.

  ``filter`` is either a ``(height, width)`` tuple (pooling layer) or an
  array whose axes 2 and 3 give the window height and width. Floor division
  keeps the results integral.
  """
  if type(filter) is tuple:
    kernel_h, kernel_w = filter[0], filter[1]
  else:
    kernel_h, kernel_w = filter.shape[2], filter.shape[3]

  in_h, in_w = input_img.shape[2], input_img.shape[3]
  output_height = (in_h + 2*pad - kernel_h)//stride + 1
  output_width = (in_w + 2*pad - kernel_w)//stride + 1
  return output_height, output_width

In [9]:
import numpy

class Maxpool:
  """Max-pooling layer built on im2col / col2im."""

  def __init__(self, pool_size ,stride,pad): # stride and pad have no defaults
    self.pool = pool_size # (height, width) tuple, e.g. (2,2)
    self.stride = stride
    self.pad = pad
    self.col_max = None  # per-window argmax from the last forward call
    self.x = None        # input of the last forward call

  def forward(self, x):
    """Return the maximum of every pooling window of ``x``.

    ``x`` is indexed as (image, channel, row, column); the result uses the
    same axis order with the pooled height and width.
    """
    self.x = x
    # Use the layer's own padding here: im2col below pads with self.pad, so
    # computing the output size with pad=0 (as before) made the final
    # reshape fail whenever the layer was created with non-zero padding.
    output_height, output_width = cal_output(x, self.pool, self.stride, self.pad)
    # One row per (image, out_row, out_col, channel) holding that window's values.
    _col = im2col(x, self.pool, self.stride, self.pad).reshape(-1, self.pool[0]*self.pool[1])

    col_max = np.max(_col, axis = 1)         # maximum of each window
    self.col_max = np.argmax(_col, axis = 1) # position of that maximum inside the window
    return col_max.reshape(x.shape[0], output_height, output_width, x.shape[1]).transpose(0,3,1,2)

  def backward(self, back): # original author's note: adapted by renaming only, may need revision
    """Route each upstream gradient value to the position that held the maximum."""
    back = back.transpose(0, 2, 3, 1)

    pool_size = self.pool[0] * self.pool[1]
    # One row per pooled output value; only the argmax column receives gradient.
    dmax = np.zeros((back.size, pool_size))
    dmax[np.arange(self.col_max.size), self.col_max.flatten()] = back.flatten()
    dmax = dmax.reshape(back.shape + (pool_size,))

    # Back to the im2col layout: one row per (image, out_row, out_col).
    dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
    dx = col2im(dcol, self.x, self.pool, self.stride, self.pad)
    return dx


class Conv: # the filter may be square or rectangular
  """Convolution layer implemented with im2col / col2im.

  ``filter_weights`` is a 4-D array; its axis 0 indexes the output filters
  and axes 2 and 3 give the kernel height and width.
  """

  def __init__(self, filter_weights, bias, stride = 1, pad = 0):
    self.filter = filter_weights
    self.bias = bias
    self.stride = stride
    self.pad = pad
    self.d_filter = None  # gradient w.r.t. the filter, set by backward
    self.db = None        # gradient w.r.t. the bias, set by backward
    self.x = None         # input of the last forward call
    self.input = None     # im2col matrix of the last input
    self.output = None    # filter reshaped to a 2-D matrix (despite the name)

  def forward(self, x):
    """Convolve ``x`` (image, channel, row, column) with the filters."""
    self.x = x
    output_height = (x.shape[2] + 2*self.pad - self.filter.shape[2])//self.stride + 1
    output_width = (x.shape[3] + 2*self.pad - self.filter.shape[3])//self.stride + 1

    image_col = im2col(x, self.filter, self.stride, self.pad)
    self.input = image_col
    # One column per filter, so the convolution becomes a matrix product.
    filter_col = self.filter.reshape(self.filter.shape[0],-1).T
    self.output = filter_col
    out_col = np.dot(image_col,filter_col) + self.bias

    # Rows of out_col are (image, out_row, out_col); move the filter axis to position 1.
    return out_col.reshape(int(x.shape[0]),int(output_height), output_width, -1).transpose(0,3,1,2)

  def backward(self, back):
    """Store d_filter/db and return the gradient with respect to the input."""
    # (image, filter, row, col) -> one row per (image, row, col), one column per filter.
    back = back.transpose(0,2,3,1).reshape(-1,self.filter.shape[0])
    self.db = np.sum(back, axis=0)
    self.d_filter = np.dot(self.input.T, back).transpose(1,0).reshape(self.filter.shape[0],self.filter.shape[1] , self.filter.shape[2], self.filter.shape[3])
    back_propagation = np.dot(back, self.output.T)
    return col2im(back_propagation, self.x, self.filter, self.stride, self.pad)

In [10]:
class Two_layer_CNN:
  """Conv-ReLU-MaxPool -> Conv-ReLU-MaxPool -> Linear -> Softmax network.

  Built for 28x28 single-channel inputs and 10 output classes. Both
  convolutions share the same stride and padding; the second one uses
  ``2 * filter_num`` filters whose kernel is twice ``filter_size`` in each
  dimension (the original author flagged this doubling as an experiment).
  Pooling is fixed to a (2, 2) window with stride 1 and no padding.

  NOTE(review): all weights and biases are drawn from an unscaled standard
  normal (np.random.randn) - confirm this initialisation is intended.
  """

  @staticmethod
  def _out_size(size, kernel, pad, stride):
    """Output length of a sliding window along one spatial axis."""
    return ((size + 2*pad - kernel) // stride) + 1

  def __init__(self, filter_num = 20, filter_size=(5,5), pad = 0, stride = 1):
    self.input_dim = 28 # the pooling padding can be adjusted inside this class
    self.filter_num = filter_num
    self.filter_size = filter_size
    self.pad = pad
    self.stride = stride
    self.pool_pad = 0
    self.pool_size = (2,2) # fixed square window
    self.pool_stride = 1
    print(self.filter_size)

    # Spatial sizes after the first convolution and the first pooling.
    self.output_height_first_layer = self._out_size(self.input_dim, self.filter_size[0], self.pad, self.stride)
    self.output_width_first_layer = self._out_size(self.input_dim, self.filter_size[1], self.pad, self.stride)
    self.pool_output_first_layer_h = self._out_size(self.output_height_first_layer, self.pool_size[0], self.pool_pad, self.pool_stride)
    self.pool_output_first_layer_w = self._out_size(self.output_width_first_layer, self.pool_size[1], self.pool_pad, self.pool_stride)

    # Spatial sizes after the second convolution (kernel doubled) and pooling.
    self.output_height_second_layer = self._out_size(self.pool_output_first_layer_h, self.filter_size[0]*2, self.pad, self.stride)
    self.output_width_second_layer = self._out_size(self.pool_output_first_layer_w, self.filter_size[1]*2, self.pad, self.stride)
    # pool_output_second_layer keeps its original meaning (the pooled height);
    # the pooled width is tracked separately so that rectangular filters
    # produce a correctly sized linear layer.
    self.pool_output_second_layer = self._out_size(self.output_height_second_layer, self.pool_size[0], self.pool_pad, self.pool_stride)
    self.pool_output_second_layer_w = self._out_size(self.output_width_second_layer, self.pool_size[1], self.pool_pad, self.pool_stride)

    self.weights = {}
    self.weights['w1'] = np.random.randn(filter_num, 1, self.filter_size[0], self.filter_size[1]) # rectangular filters are allowed
    self.weights['b1'] = np.random.randn(filter_num)
    self.weights['w2'] = np.random.randn(self.filter_num*2,self.filter_num, self.filter_size[0] * 2, self.filter_size[1] *2)
    self.weights['b2'] = np.random.randn(self.filter_num*2)
    print('b2: ',self.weights['b2'].shape)
    print('self.pool_output: ',self.pool_output_second_layer)
    # Input features of the linear layer = pooled height * pooled width * channels.
    # (Previously height * height, which mismatched for rectangular filters.)
    linear_inputs = self.pool_output_second_layer * self.pool_output_second_layer_w * self.filter_num*2
    self.weights['w3'] = np.random.randn(linear_inputs, 10)
    self.weights['b3'] = np.random.randn(10)

    # The layers hold references to the arrays in self.weights, so in-place
    # updates of those arrays are seen by the layers.
    self.layers = {}
    self.layers['CL1'] = Conv(self.weights['w1'], self.weights['b1'], self.stride, self.pad) # both conv layers use the same stride and pad
    self.layers['RL1'] = ReLU()
    self.layers['MP1'] = Maxpool(self.pool_size,self.pool_stride,self.pool_pad)
    self.layers['CL2'] = Conv(self.weights['w2'], self.weights['b2'], self.stride, self.pad)
    self.layers['RL2'] = ReLU()
    self.layers['MP2'] = Maxpool(self.pool_size,self.pool_stride,self.pool_pad)
    self.layers['L1'] = Linear(self.weights['w3'], self.weights['b3'])
    self.layers['softmax_cross'] = Softmax_Cross_Entropy_Error()

    # Forward order of the layers.
    self.l = ['CL1','RL1','MP1','CL2','RL2','MP2','L1','softmax_cross']

  def forward(self, x):
    """Run ``x`` through every layer except the final loss layer."""
    for layer in self.l:
      if layer != 'softmax_cross':
        x = self.layers[layer].forward(x)
    return x

  def loss(self, x, y):
    """Return the softmax cross-entropy loss of the network output against ``y``."""
    f = self.forward(x)
    return self.layers['softmax_cross'].forward(f, y)

  def accuracy(self, x, y):
    """Fraction of samples whose arg-max prediction equals the arg-max of ``y``."""
    f = self.forward(x)
    p = np.argmax(f, axis = 1) # index of the largest score per sample
    y = np.argmax(y, axis=1)   # targets are one-hot, so this is the class index
    return np.sum(y == p) / float(x.shape[0])

  def back_propagate_train(self,x,y):
    """Run one forward/backward pass and return the parameter gradients.

    Returns:
      Dict with keys 'w1', 'b1', 'w2', 'b2', 'w3', 'b3'.
    """
    # Forward pass; every layer caches what its backward step needs.
    self.loss(x,y)

    # Walk the layers in reverse exactly once, starting at the loss layer
    # with an upstream gradient of 1. (Previously the loss layer's backward
    # was computed twice: once explicitly and once more inside the loop.)
    back = 1
    for reversed_layer in reversed(self.l):
      back = self.layers[reversed_layer].backward(back)

    gradients = {}
    gradients['w1'] = self.layers['CL1'].d_filter
    gradients['b1'] = self.layers['CL1'].db
    gradients['w2'] = self.layers['CL2'].d_filter
    gradients['b2'] = self.layers['CL2'].db
    gradients['w3'] = self.layers['L1'].dW
    gradients['b3'] = self.layers['L1'].db

    return gradients

In [11]:
# Training setup: build the model and define the hyper-parameters.
model = Two_layer_CNN(20,(5,5),0,1)

batch = 100
iteration = X_train1.shape[0]//batch 
epoch = 20
w = ['w1','b1','w2','b2','w3','b3']
lr = 0.1
# iteration is the number of mini-batch steps that make up one epoch

# Containers for per-epoch metrics; they are only filled by the evaluation
# block that is currently disabled at the bottom of the loop.
train_loss =[]
valid_loss= []
train_acc = []
valid_acc = []
x_plot = []
n = 0


# Training loop: iteration*epoch mini-batch gradient steps.
for i in range(iteration*epoch):
  batch_size = np.random.choice(X_train1.shape[0], batch) # returns an array of sampled row indices
  X_train_b = X_train1[batch_size]
  Y_train_b = Y_train1[batch_size]
  print(X_train_b.shape)
  print(Y_train_b.shape)
  # one forward/backward pass over this mini-batch
  gradients = model.back_propagate_train(X_train_b, Y_train_b)

  # Parameter update (plain gradient descent). The in-place `-=` matters:
  # the layers were constructed with these same array objects, so updating
  # in place is what makes the layers see the new weights.
  for j in w:
    model.weights[j] -= lr * gradients[j]
  
  # The evaluation block below is disabled by being wrapped in a string literal.
  # NOTE(review): before re-enabling it, confirm two things:
  #  - `i % epoch == 0` fires every `epoch` iterations; once per epoch would be
  #    `i % iteration == 0`, since `iteration` is the number of steps per epoch.
  #  - it passes all of X_train1 through the model in one call; the recorded
  #    MemoryError (array of shape (48000, 24, 24, 1, 5, 5)) is consistent with
  #    that, so evaluation presumably needs to be done in smaller chunks.
  '''if i % epoch == 0:
    print('-')
    x_plot.append(n) #count epoch
    n+=1
    train_l = model.loss(X_train1,Y_train1) #전체 trainset loss계산
    valid_l = model.loss(X_train_valid, Y_train_valid)
    train_loss.append(train_l)
    valid_loss.append(valid_l)
    print("train loss: ",train_l,"valid loss: ", valid_l)
    train_a = model.accuracy(X_train1,Y_train1)
    valid_a = model.accuracy(X_train_valid, Y_train_valid)
    train_acc.append(train_a)
    valid_acc.append(valid_a)
    print("train accuracy: ",train_a,"valid accuracy: ", valid_a)'''

(5, 5)
b2:  (40,)
self.pool_output:  13
__init__COnv
__init__COnv
(100, 1, 28, 28)
(100, 10)
back_propagate_train
loss
forward
conv_forward
conv_x.shape:  (100, 1, 28, 28)
conv_filter.type:  <class 'numpy.ndarray'>
conv_filter.shape:  (20, 1, 5, 5)
conv_forward..image_col:  (57600, 25)
conv_forward..filter_col:  (25, 20)
RELU_forward
RELU_forward.x:  (100, 20, 24, 24)
Maxpool_forward
maxpool_forward_output:  (100, 20, 23, 23)
conv_forward
conv_x.shape:  (100, 20, 23, 23)
conv_filter.type:  <class 'numpy.ndarray'>
conv_filter.shape:  (40, 20, 10, 10)
conv_forward..image_col:  (19600, 2000)
conv_forward..filter_col:  (2000, 40)
RELU_forward
RELU_forward.x:  (100, 40, 14, 14)
Maxpool_forward
maxpool_forward_output:  (100, 40, 13, 13)
Linear.forward
Linear.forward.x:  (100, 40, 13, 13)
Linear.forward.w:  (6760, 10)
after:  (100, 6760)
self.b:  (10,)
Softmax_Cross_Entropy_Error_forward
_softmax
cross_entropy_error
cross_entropy_error.p:  (100, 10)
cross_entropy_error.y:  (100, 10)
back_prop

MemoryError: Unable to allocate 5.15 GiB for an array with shape (48000, 24, 24, 1, 5, 5) and data type float64