In [2]:
import torch
import torch.nn as nn
import memcnn


# define a new torch Module with a sequence of operations: Relu o BatchNorm2d o Conv2d
class ExampleOperation(nn.Module):
    """Channel-preserving block: 3x3 convolution, batch normalisation, then ReLU.

    Args:
        channels: Number of channels of the input; the output keeps the same count.
    """

    def __init__(self, channels):
        super().__init__()
        layers = [
            nn.Conv2d(channels, channels, kernel_size=(3, 3), padding=1),
            nn.BatchNorm2d(channels),
            nn.ReLU(inplace=True),
        ]
        # Registered as ``seq`` so parameter names remain ``seq.<index>.*``.
        self.seq = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the conv -> batch-norm -> ReLU pipeline."""
        out = self.seq(x)
        return out

In [3]:

class DoubleConv(nn.Module):
    """Two consecutive (3x3 convolution -> batch norm -> ReLU) stages.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by the second stage.
        mid_channels: Channels between the two stages; falls back to
            ``out_channels`` when omitted (or otherwise falsy).
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super(DoubleConv, self).__init__()
        hidden = mid_channels or out_channels
        stages = []
        # Same layer order as writing the six layers out by hand.
        for n_in, n_out in ((in_channels, hidden), (hidden, out_channels)):
            stages.extend([
                nn.Conv2d(n_in, n_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(n_out),
                nn.ReLU(inplace=True),
            ])
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both convolution stages to ``x``."""
        result = self.double_conv(x)
        return result


class Down(nn.Module):
    """Encoder step: 2x2 max pooling followed by a ``DoubleConv``.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by the double convolution.
    """

    def __init__(self, in_channels, out_channels):
        super(Down, self).__init__()
        pool = nn.MaxPool2d(2)
        convs = DoubleConv(in_channels, out_channels)
        self.maxpool_conv = nn.Sequential(pool, convs)

    def forward(self, x):
        """Pool ``x`` and then run the double convolution on the result."""
        pooled_then_convolved = self.maxpool_conv(x)
        return pooled_then_convolved


class Up(nn.Module):
    """Decoder step: upsample ``x1``, pad it to ``x2``'s size, concatenate, double conv.

    Args:
        in_channels: Channels of the concatenated tensor fed to the double conv.
        out_channels: Channels produced by the double conv.
        bilinear: When true, upsample with bilinear interpolation (which keeps
            the channel count) and let the double conv reduce channels through
            an ``in_channels // 2`` middle layer. Otherwise use a transposed
            convolution that halves the channel count while upsampling.
    """

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        """Upsample ``x1``, align it with the skip tensor ``x2`` and fuse both.

        Args:
            x1: Tensor coming from the deeper (lower-resolution) level.
            x2: Skip-connection tensor whose spatial size is the target.

        Returns:
            Output of the double convolution applied to ``cat([x2, x1], dim=1)``.
        """
        x1 = self.up(x1)
        # Dimensions 2 and 3 are read as height and width of 4-D tensors.
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        # ``nn.functional`` is reachable through the existing ``nn`` import; the
        # previous ``F.pad`` referenced a name that was never imported.
        # Pad order is (left, right, top, bottom); an odd difference puts the
        # extra pixel on the right/bottom side.
        x1 = nn.functional.pad(x1, [diffX // 2, diffX - diffX // 2,
                                    diffY // 2, diffY - diffY // 2])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class OutConv(nn.Module):
    """Final 1x1 convolution mapping feature channels to output channels.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels of the produced map.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Apply the 1x1 convolution to ``x``."""
        projected = self.conv(x)
        return projected

In [4]:
class UNet(nn.Module):
    """U-Net assembled from ``DoubleConv``, ``Down``, ``Up`` and ``OutConv``.

    Args:
        n_channels: Channels of the input tensor.
        n_classes: Channels of the returned logits.
        bilinear: Forwarded to every ``Up`` block; when true the deepest
            encoder level and the first three decoder levels use half as many
            channels.
    """

    def __init__(self, n_channels, n_classes, bilinear=True):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        # Channel divisor applied where the bilinear variant is narrower.
        factor = 2 if bilinear else 1

        # Encoder path.
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024 // factor)

        # Decoder path.
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)

        # Output head.
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        """Encode ``x``, decode with skip connections and return the logits."""
        skip1 = self.inc(x)
        skip2 = self.down1(skip1)
        skip3 = self.down2(skip2)
        skip4 = self.down3(skip3)
        decoded = self.down4(skip4)

        # Each decoder level consumes the matching encoder output, deepest first.
        pairs = (
            (self.up1, skip4),
            (self.up2, skip3),
            (self.up3, skip2),
            (self.up4, skip1),
        )
        for up_block, skip in pairs:
            decoded = up_block(decoded, skip)

        return self.outc(decoded)

In [18]:
# Stand-alone 3x3 convolution used to inspect input/output shapes.
in_channels, mid_channels = 3, 64
exmp_conv2d = nn.Conv2d(
    in_channels=in_channels,
    out_channels=mid_channels,
    kernel_size=3,
    padding=1,
)

In [23]:
# Push a random batch through the example convolution and show both shapes.
input_example = torch.rand(10, in_channels, 100, 100)
output_example = exmp_conv2d(input_example)
print(input_example.shape, "-->conv2d-->", output_example.shape)

torch.Size([10, 3, 100, 100]) -->conv2d--> torch.Size([10, 64, 100, 100])


In [45]:
# Additive coupling whose two branches are plain convolutions.
# NOTE(review): the branch input widths (1 and 2) differ from their output
# width (mid_channels); the recorded output of the next cell shows a
# channel-size RuntimeError for this configuration.
fm_input_channel, gm_input_channel = 1, 2

invertible_module = memcnn.AdditiveCoupling(
    Fm=nn.Conv2d(fm_input_channel, mid_channels, kernel_size=3, padding=1),
    Gm=nn.Conv2d(gm_input_channel, mid_channels, kernel_size=3, padding=1),
)

In [46]:

# `assert_input` was never defined in this notebook; use the 3-channel
# `input_example` batch created above, which matches the recorded error.
# This check is expected to fail: the coupling's branches change the channel
# count, so their outputs cannot be added to the other input partition.
# The failure is the point of the cell, so it is caught and displayed instead
# of aborting a "Run All".
try:
    assert memcnn.is_invertible_module(invertible_module, test_input_shape=input_example.shape)
except RuntimeError as err:
    print(f"RuntimeError: {err}")

RuntimeError: The size of tensor a (2) must match the size of tensor b (64) at non-singleton dimension 1

In [54]:
# Additive coupling whose branches are batch-norm layers; each branch is sized
# for one part of a mid_channels-wide input (first part gets the floor half).
mid_channels = 64
exmp_batchnorm = nn.BatchNorm2d(mid_channels)
fm_input_channel = mid_channels // 2
gm_input_channel = mid_channels - fm_input_channel

invertible_module = memcnn.AdditiveCoupling(
    Fm=nn.BatchNorm2d(fm_input_channel),
    Gm=nn.BatchNorm2d(gm_input_channel),
)

In [64]:
# Verify invertibility of the coupling on a mid_channels-wide input shape.
input_example = torch.rand(10, mid_channels, 100, 100)
assert memcnn.is_invertible_module(
    invertible_module,
    test_input_shape=input_example.shape,
)

In [58]:
# keep_input: whether the input of forward is kept in memory. If False, it is
#   obtained by running the inverse computation during backward.
# keep_input_inverse: whether the input of the inverse operation is kept in
#   memory. If False, it is obtained by the reverse computation during backward.
invertible_module_wrapper = memcnn.InvertibleModuleWrapper(
    fn=invertible_module,
    keep_input=False,
    keep_input_inverse=False,
)

In [67]:
# Round trip: forward then inverse should reproduce the original input.
input_example = torch.rand(10, mid_channels, 100, 100)
# input_example is deleted when it is forwarded, so keep a copy for verification.
copy_input = input_example.clone()
Y_fw = invertible_module_wrapper.forward(input_example)
X_iv = invertible_module_wrapper.inverse(Y_fw)
print("reconstruction error :", torch.sum((copy_input - X_iv) ** 2.))

reconstruction error : tensor(1.4945e-08, grad_fn=<SumBackward0>)


In [69]:
# Rebuild the batch-norm coupling and its wrapper, then prepare a toy
# regression problem (random input -> random target) to exercise backward().
mid_channels = 64
exmp_batchnorm = nn.BatchNorm2d(mid_channels)
fm_input_channel = mid_channels // 2
gm_input_channel = mid_channels - fm_input_channel

invertible_module = memcnn.AdditiveCoupling(
    Fm=nn.BatchNorm2d(fm_input_channel),
    Gm=nn.BatchNorm2d(gm_input_channel),
)

invertible_module_wrapper = memcnn.InvertibleModuleWrapper(
    fn=invertible_module,
    keep_input=False,
    keep_input_inverse=False,
)

input_example = torch.rand(10, mid_channels, 100, 100)
output_example = torch.rand(10, mid_channels, 100, 100)
mse = nn.MSELoss()
# NOTE: despite its name, `criterion` holds the Adam optimizer here; the loss
# function is `mse`.
criterion = torch.optim.Adam(invertible_module_wrapper.parameters(), lr=1e-2)

In [70]:
# One optimisation step on the wrapped coupling.
# NOTE: `criterion` is the optimizer object in this cell (it is stepped and
# zeroed below); the loss is computed by `mse`.
criterion.zero_grad()
Y_fw = invertible_module_wrapper.forward(input_example)
loss = mse(Y_fw, output_example)
loss.backward()
criterion.step()

In [106]:
import torchvision
import torchvision.transforms as transforms
# CIFAR-10 read from a local copy (no download); images are resized to 32x32,
# converted to tensors and normalised per channel with mean 0.5 and std 0.5.
cifar10 = torchvision.datasets.CIFAR10(
    root='../../20210306_gan/dataset',
    download=False,
    transform=transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]),
)

In [107]:
# Shuffled mini-batches of 1024 samples drawn from the CIFAR-10 dataset object.
cifar_loader = torch.utils.data.DataLoader(
    dataset=cifar10,
    batch_size=1024,
    shuffle=True,
)

In [125]:
class Simple(nn.Module):
    """Small classifier: conv -> max pool -> batch norm -> invertible activation -> linear.

    The activation is a memcnn additive coupling with ReLU branches, wrapped so
    that neither the forward nor the inverse input is kept in memory.

    Args:
        n_channels: Channels of the input images.
        n_classes: Number of output scores per sample.
    """

    def __init__(self, n_channels, n_classes):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes

        self.conv = nn.Conv2d(in_channels=n_channels, out_channels=16, kernel_size=(3, 3), padding=1)
        self.pool = nn.MaxPool2d(kernel_size=(11, 11), stride=(3, 3))
        self.batch = nn.BatchNorm2d(num_features=16)

        coupling = memcnn.AdditiveCoupling(Fm=torch.nn.ReLU(), Gm=torch.nn.ReLU())
        self.activation = memcnn.InvertibleModuleWrapper(
            fn=coupling,
            keep_input=False,
            keep_input_inverse=False,
        )

        # The classifier head expects 16 feature maps of size 8x8 after flattening.
        self.fcn = nn.Linear(16*8*8, n_classes)

    def forward(self, x):
        """Return the class scores for the batch ``x``."""
        features = self.batch(self.pool(self.conv(x)))
        activated = self.activation(features)
        flattened = activated.view(-1, 16*8*8)
        return self.fcn(flattened)

In [108]:
# Fetch one mini-batch (images, labels) from the loader.
(data, label) = next(iter(cifar_loader))

In [126]:
# Build the classifier for 3-channel inputs / 10 classes and move it to the GPU.
model = Simple(n_channels=3, n_classes=10)
model = model.to('cuda')
print(model)

Simple(
  (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=(11, 11), stride=(3, 3), padding=0, dilation=1, ceil_mode=False)
  (batch): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (activation): InvertibleModuleWrapper(
    (_fn): AdditiveCoupling(
      (Gm): ReLU()
      (Fm): ReLU()
    )
  )
  (fcn): Linear(in_features=1024, out_features=10, bias=True)
)


In [127]:
# Loss function and optimizer used by the training loop.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [131]:
# Train for up to 1e4 epochs; in practice the loop is stopped by hand.
epoch = 0
while epoch < 1e+4:
    for data, label in cifar_loader:
        # Clear gradients left over from the previous step before the new pass.
        optimizer.zero_grad()
        predict = model(data.to('cuda'))
        loss = criterion(predict, label.to('cuda'))
        loss.backward()
        optimizer.step()
    epoch += 1
    # Only the loss of the epoch's last mini-batch is reported.
    print(epoch, loss.item())

1 1.4968420267105103
2 1.3536299467086792
3 1.3239946365356445
4 1.2239649295806885
5 1.2539869546890259
6 1.1471829414367676
7 1.1970001459121704
8 1.1888761520385742
9 1.2050492763519287
10 1.099938988685608
11 1.1242170333862305
12 1.2335519790649414
13 1.2399179935455322
14 1.2075155973434448
15 1.1155601739883423
16 1.0674936771392822
17 1.0859053134918213
18 1.094795823097229
19 1.0883667469024658
20 1.0919687747955322
21 1.1286555528640747


KeyboardInterrupt: 

In [None]:
class Simple_revrelu(nn.Module):
    """Small classifier: conv -> max pool -> batch norm -> in-place ReLU -> linear.

    Args:
        n_channels: Channels of the input images.
        n_classes: Number of output scores per sample.
    """

    def __init__(self, n_channels, n_classes):
        # Zero-argument super() binds to this class. The previous
        # ``super(Simple, self)`` named a different, unrelated class and
        # failed at instantiation.
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes

        self.conv = nn.Conv2d(in_channels=n_channels, out_channels=16, kernel_size=(3, 3), padding=1)
        self.pool = nn.MaxPool2d(kernel_size=(11,11), stride=(3,3))
        self.batch = nn.BatchNorm2d(num_features=16)
        self.activation = nn.ReLU(inplace=True)
        # The classifier head expects 16 feature maps of size 8x8 after flattening.
        self.fcn = nn.Linear(16*8*8, n_classes)

    def forward(self, x):
        """Return the class scores for the batch ``x``."""
        x1 = self.conv(x)
        x2 = self.pool(x1)
        x3 = self.batch(x2)
        x4 = self.activation(x3)
        x5 = self.fcn(x4.view(-1, 16*8*8))
        #print(x.shape, x1.shape, x2.shape, x3.shape, x4.shape, x5.shape)
        return x5