In [2]:
import numpy as np

import torch
import torch.nn.functional as F

from torch import nn

import torchvision

# preparing dataset

In [13]:
batch_size_train = 16
batch_size_test = 16

train_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST('./files/', train=True, download=True,
                              transform = torchvision.transforms.Compose(
                                  [torchvision.transforms.ToTensor(),
                                  ])),
    batch_size = batch_size_train, shuffle=True)

test_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST('./files/', train=False, download=True,
                              transform = torchvision.transforms.Compose(
                                  [torchvision.transforms.ToTensor(),
                                  ])),
    batch_size = batch_size_test, shuffle=True)

##### mutable vs immutable 
- mutable복사는 리스트를 갖고있는 변수 a를 b로 복사하여도 a와 b가 같은 주소를 갖고있는 것.
- immutable은 a를 복사하여 같은 데이터를 b도 같지만 주소가 다른 것.


##### shallow copy vs depp copy
- shallow copy는 mutable한 경우를 말한다.
- deep copy는 immutable한 경우를 말한다.

In [16]:
import copy

In [26]:
def clone(module, N):
    """
    같은 모듈을 계속 복사해주는 메소드.
    그러나, 모두 다른 주소값을 갖는 새로 생성된 모듈이다.
    
    INPUT: module
    OUTPUT: A list of the same modules 
    """
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

In [89]:
class MaskedConv2d(nn.Conv2d):
    def __init__(self, *args, mask='B', **kargs):
        super(MaskedConv2d, self).__init__(*args, **kargs)
        assert mask in {'A', 'B'}
        self.mask_type = mask
        self.register_buffer('mask', self.weight.data.clone())
        self.mask.fill_(1)
        
        
        _, _, H, W = self.mask.size()
    
        self.mask[:, :, H//2, W//2 + (self.mask_type == 'B'):] = 0  
        self.mask[:, :, H//2+1:, :] = 0
        
        """
                                           11, 12번 라인 적용
        tensor([[[[1., 1., 1., 1., 1.],                    tensor([[[[1., 1., 1., 1., 1.],
                  [1., 1., 1., 1., 1.],                              [1., 1., 1., 1., 1.],
                  [1., 1., 1., 1., 1.],                              [1., 1., 1., 0., 0.],
                  [1., 1., 1., 1., 1.],                              [0., 0., 0., 0., 0.],
                  [1., 1., 1., 1., 1.]]]])                           [0., 0., 0., 0., 0.]]]]) 
        
          """
        
    def forward(self, x):
        self.weight.data *= self.mask      # self.weight에 mask 씌어줄려고 mask생성 한거구나
                                           # 그 해당 부분에만 가중치가 생기게 할려고
        #print(self.weight.data)
        
        return super(MaskedConv2d, self).forward(x)

In [94]:
class MaskedConv1d(nn.Conv1d):
    def __init__(self, *args, mask='B', **kargs):
        super(MaskedConv1d, self).__init__(*args, **kargs)
        assert mask in {'A', 'B'}
        self.mask_type = mask
        self.register_buffer('mask', self.weight.data.clone())
        self.mask.fill_(1)
    
        _, _, W = self.mask.size()
    
        self.mask[:, :, W//2 + (self.mask_type == 'B'):] = 0
    
    def forward(self, x):
        self.weight.data *= self.mask
        return super(MaskedConv1d, self).forward(x)

In [90]:
import matplotlib.pyplot as plt

sample = torch.Tensor(np.ones((1,1,5,5)))
a, b, c, d = sample.size()

model = MaskedConv2d(a, b, c, d)
model(sample)

tensor([[[[-0.8702]]]], grad_fn=<MkldnnConvolutionBackward>)

In [91]:
def _padding(i, o, k, s=1, d=1, mode='same'):
    if mode == 'same':
        return ((o-1) * s + (k-1)*(d-1) + k - i) // 2
    else:
        raise RuntimeError('Not implemented')

In [92]:
MaskedConv2d(1,1,(1,3), padding=(0,_padding(5,5,3)))(sample)

tensor([[[[0.8049, 1.0001, 1.0001, 1.0001, 1.0001],
          [0.8049, 1.0001, 1.0001, 1.0001, 1.0001],
          [0.8049, 1.0001, 1.0001, 1.0001, 1.0001],
          [0.8049, 1.0001, 1.0001, 1.0001, 1.0001],
          [0.8049, 1.0001, 1.0001, 1.0001, 1.0001]]]],
       grad_fn=<MkldnnConvolutionBackward>)

In [98]:
class RowLSTMCell(nn.Module):
    def __init__(self, hidden_dims, image_size, channel_in, *args, **kargs):
        super(RowLSTMCell, self).__init__(*args, **kargs)

        self._hidden_dims = hidden_dims
        self._image_size = image_size
        self._channel_in = channel_in
        self._num_units = self._hidden_dims * self._image_size
        self._output_size = self._num_units
        self._state_size = self._num_units * 2

        self.conv_i_s = MaskedConv1d(self._hidden_dims, 4 * self._hidden_dims, 3, mask='B', padding=_padding(image_size, image_size, 3))
        self.conv_s_s = nn.Conv1d(channel_in, 4 * self._hidden_dims, 3, padding=_padding(image_size, image_size, 3))
   
    def forward(self, inputs, states):
        c_prev, h_prev = states



        h_prev = h_prev.view(-1, self._hidden_dims,  self._image_size)
        inputs = inputs.view(-1, self._channel_in, self._image_size)

        s_s = self.conv_s_s(h_prev) #(batch, 4*hidden_dims, width)
        i_s = self.conv_i_s(inputs) #(batch, 4*hidden_dims, width)



        s_s = s_s.view(-1, 4 * self._num_units) #(batch, 4*hidden_dims*width)
        i_s = i_s.view(-1, 4 * self._num_units) #(batch, 4*hidden_dims*width)

        #print(s_s.size(), i_s.size())

        lstm = s_s + i_s

        lstm = torch.sigmoid(lstm)

        i, g, f, o = torch.split(lstm, (4 * self._num_units)//4, dim=1)

        c = f * c_prev + i * g
        h = o * torch.tanh(c)

        new_state = (c, h)
        return h, new_state

In [99]:
class RowLSTM(nn.Module):
    def __init__(self, hidden_dims, input_size, channel_in, *args, init='zero', **kargs):
        super(RowLSTM, self).__init__(*args, **kargs)
        assert init in {'zero', 'noise', 'variable', 'variable noise'}

        self.init = init
        self._hidden_dims = hidden_dims
        #self.return_state = return_state
        if self.init == 'zero':
            self.init_state = (torch.zeros(1, input_size * hidden_dims), torch.zeros(1, input_size * hidden_dims))
        elif self.init == 'noise':
            self.init_state = (torch.Tensor(1, input_size * hidden_dims), torch.Tensor(1, input_size * hidden_dims))
            nn.init.uniform(self.init_state[0])
            nn.init.uniform(self.init_state[1])  
        elif self.init == 'variable':
            hidden0 = torch.zeros(1,input_size * hidden_dims)
            ##if use_cuda:
            ##  hidden0 = hidden0.cuda()
            ##else:
            ##  hidden0 = hidden0
            self._hidden_init_state = torch.nn.Parameter(hidden0, requires_grad=True)
            self._cell_init_state = torch.nn.Parameter(hidden0, requires_grad=True)
            self.init_state = (self._hidden_init_state, self._cell_init_state)
        else:
            hidden0 = torch.Tensor(1, input_size * hidden_dims) # size
            nn.init.uniform(hidden0)
            self._hidden_init_state = torch.nn.Parameter(hidden0, requires_grad=True)
            self._cell_init_state = torch.nn.Parameter(hidden0, requires_grad=True)
            self.init_state = (self._hidden_init_state, self._cell_init_state)

        self.lstm_cell = RowLSTMCell(hidden_dims, input_size, channel_in)
    
    def forward(self, inputs, initial_state=None):
        '''
        states --> (c, h), tuple
        c,h --> (batch, width * hidden_dims)
        inputs --> (batch, seq_length, input_shape)
        input_shape --> width, channel
        '''


        n_batch, channel, n_seq, width = inputs.size()
        #print(n_seq)
        #inputs = inputs.view(n_batch, channel, n_seq, width)
        if initial_state is None:
            hidden_init, cell_init = self.init_state

        else:
            hidden_init, cell_init = initial_state

        states = (hidden_init.repeat(n_batch,1), cell_init.repeat(n_batch, 1))

        steps = [] # --> (batch, width * hidden_dims) --> (batch, 1, width*hidden_dims)
        for seq in range(n_seq):
            #print(inputs[:, :, seq, :].size())
            h, states = self.lstm_cell(inputs[:, :, seq, :], states)
            steps.append(h.unsqueeze(1))

        return torch.cat(steps, dim=1).view(-1, n_seq, width, self._hidden_dims).permute(0,3,1,2) # --> (batch, seq_length(a.k.a height), width * hidden_dims)

In [100]:
for p in lstm.parameters():
    print(p.size())

NameError: name 'lstm' is not defined

In [101]:
batch_sample=torch.Tensor(np.random.random((16,28,28, 1)))
b, c, h, w = batch_sample.size()
print(b,c,h,w)

16 28 28 1


In [104]:
lstm = RowLSTM(32, 28, 1, init='variable')
#lstm(batch_sample, (torch.Tensor(np.random.random((16, 28 * 32))),torch.Tensor(np.random.random((16, 28 * 32))))).size()

In [105]:
class PixelRNN(nn.Module):
    def __init__(self, num_layers, hidden_dims, input_size, *args, **kargs):
        super(PixelRNN, self).__init__(*args, **kargs)

        pad_conv1 = _padding(input_size, input_size, 7)
        pad_conv2 = _padding(input_size, input_size, 1)
        self.conv1 = MaskedConv2d(1, hidden_dims, (7,7), mask='A', padding=(pad_conv1, pad_conv1))
        self.lstm_list = nn.ModuleList([RowLSTM(hidden_dims, input_size, hidden_dims) for _ in range(num_layers)])
        self.conv2 = nn.Conv2d(hidden_dims, 32, (1,1), padding=(pad_conv2, pad_conv2))
        self.conv_last = nn.Conv2d(32, 1, (1,1), padding=(pad_conv2, pad_conv2))
    
    
    def forward(self, inputs):
        x = self.conv1(inputs)
        for lstm in self.lstm_list:
            x = lstm(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.conv_last(x)
        return torch.sigmoid(x)

In [107]:
model = PixelRNN(num_layers=2, hidden_dims=16, input_size=28)

In [108]:
model

PixelRNN(
  (conv1): MaskedConv2d(1, 16, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3))
  (lstm_list): ModuleList(
    (0): RowLSTM(
      (lstm_cell): RowLSTMCell(
        (conv_i_s): MaskedConv1d(16, 64, kernel_size=(3,), stride=(1,), padding=(1,))
        (conv_s_s): Conv1d(16, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      )
    )
    (1): RowLSTM(
      (lstm_cell): RowLSTMCell(
        (conv_i_s): MaskedConv1d(16, 64, kernel_size=(3,), stride=(1,), padding=(1,))
        (conv_s_s): Conv1d(16, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      )
    )
  )
  (conv2): Conv2d(16, 32, kernel_size=(1, 1), stride=(1, 1))
  (conv_last): Conv2d(32, 1, kernel_size=(1, 1), stride=(1, 1))
)

In [109]:
from torch import optim
loss_function = nn.BCELoss()
optimizer = optim.RMSprop(model.parameters())

### 계산에 필요한 파라미터 구하기
- 파라미터 이름과 수

In [110]:
accum = 0
for name, param in model.named_parameters():
    if param.requires_grad:
        print(name, param.size())

conv1.weight torch.Size([16, 1, 7, 7])
conv1.bias torch.Size([16])
lstm_list.0.lstm_cell.conv_i_s.weight torch.Size([64, 16, 3])
lstm_list.0.lstm_cell.conv_i_s.bias torch.Size([64])
lstm_list.0.lstm_cell.conv_s_s.weight torch.Size([64, 16, 3])
lstm_list.0.lstm_cell.conv_s_s.bias torch.Size([64])
lstm_list.1.lstm_cell.conv_i_s.weight torch.Size([64, 16, 3])
lstm_list.1.lstm_cell.conv_i_s.bias torch.Size([64])
lstm_list.1.lstm_cell.conv_s_s.weight torch.Size([64, 16, 3])
lstm_list.1.lstm_cell.conv_s_s.bias torch.Size([64])
conv2.weight torch.Size([32, 16, 1, 1])
conv2.bias torch.Size([32])
conv_last.weight torch.Size([1, 32, 1, 1])
conv_last.bias torch.Size([1])


- 축적된 파라미터 총 수

In [111]:
accum = 0
for name, param in model.named_parameters():
    if param.requires_grad:
        sz = param.size()
        tmp = 1
        for i in sz:
            tmp *= i
        accum += tmp
    
print(accum)

13921


In [116]:
train_loader

<torch.utils.data.dataloader.DataLoader at 0x196da0f52c8>

In [129]:
def binarize(image):
    # 평균 0, 편차 1의 정규분포에서 샘플링한 image.shape에서 image보다 픽셀값이 작으면 True, 크며 False
    return (np.random.uniform(0, 1, image.shape) < image).astype('float32')
    
    
binarize(np.array([0.5,0.2,0.4, 0.6, 0.9]))

array([0., 0., 0., 0., 1.], dtype=float32)

In [130]:
for epoch in range(1, 11): ## run the model for 10 epochs
    train_loss, valid_loss = [], []
    ## training part 
    model.train()
    for data, _ in train_loader:
        
        x = binarize(data.numpy())
        x = torch.Tensor(x)
        y = data.clone()
      
      
      
        optimizer.zero_grad()
        ## 1. forward propagation
        output = model(x)
        
        ## 2. loss calculation
        loss = loss_function(output, y)
        
        ## 3. backward propagation
        loss.backward()
        
        ## 4. weight optimization
        optimizer.step()
        
        train_loss.append(loss.item())
        
    print ("Epoch:", epoch, "Training Loss: ", np.mean(train_loss), "Valid Loss: ", np.mean(valid_loss))

  out=out, **kwargs)
  ret = ret.dtype.type(ret / rcount)


Epoch: 1 Training Loss:  0.10062909040848414 Valid Loss:  nan
Epoch: 2 Training Loss:  0.08875046478907267 Valid Loss:  nan
Epoch: 3 Training Loss:  0.08783200233777363 Valid Loss:  nan
Epoch: 4 Training Loss:  0.08740872054298719 Valid Loss:  nan
Epoch: 5 Training Loss:  0.08718138246734937 Valid Loss:  nan
Epoch: 6 Training Loss:  0.08697619158824285 Valid Loss:  nan
Epoch: 7 Training Loss:  0.08680357954303423 Valid Loss:  nan
Epoch: 8 Training Loss:  0.08674448099335035 Valid Loss:  nan
Epoch: 9 Training Loss:  0.08658883761763572 Valid Loss:  nan
Epoch: 10 Training Loss:  0.0865074438949426 Valid Loss:  nan


### reference

- 파이토치 구현 참고 https://github.com/heechan95/PixelRNN-pytorch/blob/master/PixelRNN%20pytorch.ipynb