In [50]:
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
from efficientnet_pytorch import EfficientNet
from torchsummary import summary

### Reference

https://deep-learning-study.tistory.com/563

In [16]:
path = './train/train_data.csv'

df = pd.read_csv(path)
df

Unnamed: 0,filen_name,label
0,train0001.png,8
1,train0002.png,8
2,train0003.png,8
3,train0004.png,8
4,train0005.png,8
...,...,...
4995,train4996.png,6
4996,train4997.png,6
4997,train4998.png,6
4998,train4999.png,6


In [17]:
train_file_name = df['filen_name']
train_label = df['label']

# image 파일을 불러온뒤 변수에 저장
train_image = []
for file in train_file_name:
    train_image.append(Image.open('./train/' + file))
image_to_number = np.array([np.array(image).flatten() for image in train_image])


In [18]:
all_images = pd.DataFrame(image_to_number)
all_images['labels'] = df['label']

In [88]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torchvision import transforms

class MNIST(Dataset):
    
    def __init__(self, file_path_list, labels = None):
        self.file_path_list = file_path_list
        self.labels = labels
        self.PIL2tensor = transforms.PILToTensor()
    
    def __getitem__(self,idx):
        image = Image.open(self.file_path_list[idx]) # 해당 인덱스에 맞는 image 추출
        image = np.stack((image,) *3,axis=-1)
        image = Image.fromarray(image)
        tensor_image = self.PIL2tensor(image) # PIL로 읽은 이미지를 torch tensor형으로 변환
        flattened_image = tensor_image.float() # 2차원 이미지를 1차원으로 변환
        
        if self.labels is not  None: # 라벨이 존재 하는경우 : 학습에 이용할경우
            label = self.labels[idx] # 해당 인덱스에 맞는 라벨 추출
            return flattened_image, label # 1차원으로 변환한 이미지와 라벨을 return
        
        return flattened_image # test 단계에선 label이 존재하지 않기 때문에 image만을 return
    
    def __len__(self):
        return len(self.file_path_list)

In [89]:
file_path_list  = './train/' + df['filen_name']
labels = df['label']

mnist_dataset = MNIST(file_path_list, labels)

In [90]:
mnist_dataset[0][0].shape

torch.Size([3, 28, 28])

In [91]:
mnist_loader = DataLoader(mnist_dataset, batch_size = 32, shuffle = True)

In [66]:
class Swish(nn.Module):
    def __init__(self):
        super().__init__()
        self.sigmoid = nn.Sigmoid()
            
    def forward(self, x):
        return x * self.sigmoid(x)
if __name__ == '__main__':
    x = torch.randn(1,1,28,28)
    model = Swish()
    output = model(x)
    print('output size : ',output.size())

output size :  torch.Size([1, 1, 28, 28])


In [67]:
class SEBlock(nn.Module):
    def __init__(self,in_channels, r=4):
        super().__init__()

        self.squeeze = nn.AdaptiveAvgPool2d((1,1))
        self.excitation = nn.Sequential(
            nn.Linear(in_channels, in_channels * r),
            Swish(),
            nn.Linear(in_channels * r, in_channels),
            nn.Sigmoid()
        )
    def forward(self,x):
        x = self.squeeze(x)
        x = x.view(x.size(0), -1)
        x = self.excitation(x)
        x = x.view(x.size(0),x.size(1),1,1)
        return x
    
if __name__ == '__main__':
    x = torch.randn(1,1,28,28)
    model = SEBlock(x.size(1))
    output = model(x)
    print('output size : ',output.size())

output size :  torch.Size([1, 1, 1, 1])


In [68]:
class MBConv(nn.Module):
    expand = 6
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, se_scale=4, p=0.5):
        super().__init__()
        # first MBConv is not using stochastic depth
        self.p = torch.tensor(p).float() if (in_channels == out_channels) else torch.tensor(1).float()

        self.residual = nn.Sequential(
            nn.Conv2d(in_channels, in_channels * MBConv.expand, 1, stride=stride, padding=0, bias=False),
            nn.BatchNorm2d(in_channels * MBConv.expand, momentum=0.99, eps=1e-3),
            Swish(),
            nn.Conv2d(in_channels * MBConv.expand, in_channels * MBConv.expand, kernel_size=kernel_size,
                      stride=1, padding=kernel_size//2, bias=False, groups=in_channels*MBConv.expand),
            nn.BatchNorm2d(in_channels * MBConv.expand, momentum=0.99, eps=1e-3),
            Swish()
        )

        self.se = SEBlock(in_channels * MBConv.expand, se_scale)

        self.project = nn.Sequential(
            nn.Conv2d(in_channels*MBConv.expand, out_channels, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channels, momentum=0.99, eps=1e-3)
        )

        self.shortcut = (stride == 1) and (in_channels == out_channels)

    def forward(self, x):
        # stochastic depth
        if self.training:
            if not torch.bernoulli(self.p):
                return x

        x_shortcut = x
        x_residual = self.residual(x)
        x_se = self.se(x_residual)

        x = x_se * x_residual
        x = self.project(x)

        if self.shortcut:
            x= x_shortcut + x

        return x

# check
if __name__ == '__main__':
    x = torch.randn(3, 16, 24, 24)
    model = MBConv(x.size(1), x.size(1), 3, stride=1, p=1)
    model.train()
    output = model(x)
    x = (output == x)
    print('output size:', output.size(), 'Stochastic depth:', x[1,0,0,0])

output size: torch.Size([3, 16, 24, 24]) Stochastic depth: tensor(False)


In [69]:
class SepConv(nn.Module):
    expand = 1
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, se_scale=4, p=0.5):
        super().__init__()
        # first SepConv is not using stochastic depth
        self.p = torch.tensor(p).float() if (in_channels == out_channels) else torch.tensor(1).float()

        self.residual = nn.Sequential(
            nn.Conv2d(in_channels * SepConv.expand, in_channels * SepConv.expand, kernel_size=kernel_size,
                      stride=1, padding=kernel_size//2, bias=False, groups=in_channels*SepConv.expand),
            nn.BatchNorm2d(in_channels * SepConv.expand, momentum=0.99, eps=1e-3),
            Swish()
        )

        self.se = SEBlock(in_channels * SepConv.expand, se_scale)

        self.project = nn.Sequential(
            nn.Conv2d(in_channels*SepConv.expand, out_channels, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channels, momentum=0.99, eps=1e-3)
        )

        self.shortcut = (stride == 1) and (in_channels == out_channels)

    def forward(self, x):
        # stochastic depth
        if self.training:
            if not torch.bernoulli(self.p):
                return x

        x_shortcut = x
        x_residual = self.residual(x)
        x_se = self.se(x_residual)

        x = x_se * x_residual
        x = self.project(x)

        if self.shortcut:
            x= x_shortcut + x

        return x

# check
if __name__ == '__main__':
    x = torch.randn(3, 16, 24, 24)
    model = SepConv(x.size(1), x.size(1), 3, stride=1, p=1)
    model.train()
    output = model(x)
    # stochastic depth check
    x = (output == x)
    print('output size:', output.size(), 'Stochastic depth:', x[1,0,0,0])

output size: torch.Size([3, 16, 24, 24]) Stochastic depth: tensor(False)


In [70]:
class EfficientNet(nn.Module):
    def __init__(self, num_classes=10, width_coef=1., depth_coef=1., scale=1., dropout=0.2, se_scale=4, stochastic_depth=False, p=0.5):
        super().__init__()
        channels = [32, 16, 24, 40, 80, 112, 192, 320, 1280]
        repeats = [1, 2, 2, 3, 3, 4, 1]
        strides = [1, 2, 2, 2, 1, 2, 1]
        kernel_size = [3, 3, 5, 3, 5, 5, 3]
        depth = depth_coef
        width = width_coef

        channels = [int(x*width) for x in channels]
        repeats = [int(x*depth) for x in repeats]

        # stochastic depth
        if stochastic_depth:
            self.p = p
            self.step = (1 - 0.5) / (sum(repeats) - 1)
        else:
            self.p = 1
            self.step = 0


        # efficient net
        self.upsample = nn.Upsample(scale_factor=scale, mode='bilinear', align_corners=False)

        self.stage1 = nn.Sequential(
            nn.Conv2d(3, channels[0],3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(channels[0], momentum=0.99, eps=1e-3)
        )

        self.stage2 = self._make_Block(SepConv, repeats[0], channels[0], channels[1], kernel_size[0], strides[0], se_scale)

        self.stage3 = self._make_Block(MBConv, repeats[1], channels[1], channels[2], kernel_size[1], strides[1], se_scale)

        self.stage4 = self._make_Block(MBConv, repeats[2], channels[2], channels[3], kernel_size[2], strides[2], se_scale)

        self.stage5 = self._make_Block(MBConv, repeats[3], channels[3], channels[4], kernel_size[3], strides[3], se_scale)

        self.stage6 = self._make_Block(MBConv, repeats[4], channels[4], channels[5], kernel_size[4], strides[4], se_scale)

        self.stage7 = self._make_Block(MBConv, repeats[5], channels[5], channels[6], kernel_size[5], strides[5], se_scale)

        self.stage8 = self._make_Block(MBConv, repeats[6], channels[6], channels[7], kernel_size[6], strides[6], se_scale)

        self.stage9 = nn.Sequential(
            nn.Conv2d(channels[7], channels[8], 1, stride=1, bias=False),
            nn.BatchNorm2d(channels[8], momentum=0.99, eps=1e-3),
            Swish()
        ) 

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.dropout = nn.Dropout(p=dropout)
        self.linear = nn.Linear(channels[8], num_classes)

    def forward(self, x):
        x = self.upsample(x)
        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)
        x = self.stage5(x)
        x = self.stage6(x)
        x = self.stage7(x)
        x = self.stage8(x)
        x = self.stage9(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.linear(x)
        return x


    def _make_Block(self, block, repeats, in_channels, out_channels, kernel_size, stride, se_scale):
        strides = [stride] + [1] * (repeats - 1)
        layers = []
        for stride in strides:
            layers.append(block(in_channels, out_channels, kernel_size, stride, se_scale, self.p))
            in_channels = out_channels
            self.p -= self.step

        return nn.Sequential(*layers)


def efficientnet_b0(num_classes=10):
    return EfficientNet( width_coef=1.0, depth_coef=1.0, scale=1.0,dropout=0.2, se_scale=4)

def efficientnet_b1(num_classes=10):
    return EfficientNet(num_classes=num_classes, width_coef=1.0, depth_coef=1.1, scale=240/224, dropout=0.2, se_scale=4)

def efficientnet_b2(num_classes=10):
    return EfficientNet(num_classes=num_classes, width_coef=1.1, depth_coef=1.2, scale=260/224., dropout=0.3, se_scale=4)

def efficientnet_b3(num_classes=10):
    return EfficientNet(num_classes=num_classes, width_coef=1.2, depth_coef=1.4, scale=300/224, dropout=0.3, se_scale=4)

def efficientnet_b4(num_classes=10):
    return EfficientNet(num_classes=num_classes, width_coef=1.4, depth_coef=1.8, scale=380/224, dropout=0.4, se_scale=4)

def efficientnet_b5(num_classes=10):
    return EfficientNet(num_classes=num_classes, width_coef=1.6, depth_coef=2.2, scale=456/224, dropout=0.4, se_scale=4)

def efficientnet_b6(num_classes=10):
    return EfficientNet(num_classes=num_classes, width_coef=1.8, depth_coef=2.6, scale=528/224, dropout=0.5, se_scale=4)

def efficientnet_b7(num_classes=10):
    return EfficientNet(num_classes=num_classes, width_coef=2.0, depth_coef=3.1, scale=600/224, dropout=0.5, se_scale=4)


# check
if __name__ == '__main__':
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    x = torch.randn(3, 3, 224, 224).to(device)
    model = efficientnet_b0().to(device)
    output = model(x)
    print('output size:', output.size())

output size: torch.Size([3, 10])


In [71]:
model = efficientnet_b0().to(device)
summary(model, (3,28,28), device=device.type)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
          Upsample-1            [-1, 3, 28, 28]               0
            Conv2d-2           [-1, 32, 14, 14]             864
       BatchNorm2d-3           [-1, 32, 14, 14]              64
            Conv2d-4           [-1, 32, 14, 14]             288
       BatchNorm2d-5           [-1, 32, 14, 14]              64
           Sigmoid-6           [-1, 32, 14, 14]               0
             Swish-7           [-1, 32, 14, 14]               0
 AdaptiveAvgPool2d-8             [-1, 32, 1, 1]               0
            Linear-9                  [-1, 128]           4,224
          Sigmoid-10                  [-1, 128]               0
            Swish-11                  [-1, 128]               0
           Linear-12                   [-1, 32]           4,128
          Sigmoid-13                   [-1, 32]               0
          SEBlock-14             [-1, 3

In [94]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)
model.to(device)
param = list(model.parameters())


In [96]:
def compute_acc(true, pred):
    return sum(true == pred) / len(true)

In [97]:
from tqdm import tqdm

for Epoch in tqdm(range(30)):
    for batch, labels in mnist_loader:
        batch = batch.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        
        output = model(batch)
        loss = criterion(output,labels)
        loss.backward()
        optimizer.step()
        acc = compute_acc(labels.detach().cpu().numpy(), output.detach().cpu().numpy().argmax(-1))
        
    if Epoch % 10 == 0 or Epoch == 29:
        print(f'Epoch {Epoch}, loss : {loss}, acc : {acc}')

  3%|██▊                                                                                | 1/30 [00:22<10:59, 22.76s/it]

Epoch 0, loss : 2.406503677368164, acc : 0.0


 37%|██████████████████████████████                                                    | 11/30 [04:08<07:12, 22.74s/it]

Epoch 10, loss : 0.8348522186279297, acc : 0.625


 70%|█████████████████████████████████████████████████████████▍                        | 21/30 [07:49<03:21, 22.42s/it]

Epoch 20, loss : 1.3117749691009521, acc : 0.625


100%|██████████████████████████████████████████████████████████████████████████████████| 30/30 [11:20<00:00, 22.67s/it]

Epoch 29, loss : 2.3525431156158447, acc : 0.375





In [98]:
test_df = pd.read_csv('./test/test_data.csv') 
test_file_dir = './test/'

In [99]:
test_mnist_dataset = MNIST(test_file_dir + test_df['file_name'])
test_mnist_loader = DataLoader(test_mnist_dataset, batch_size = 32)
preds = None

for test_batch in tqdm(test_mnist_loader):
    test_batch = test_batch.to(device)
    output = model(test_batch)
    
    digit_pred = output.detach().cpu().numpy().argmax(-1)
    if preds is None:
        preds = digit_pred
    else:
        preds = np.concatenate([preds,digit_pred])
        

100%|████████████████████████████████████████████████████████████████████████████████| 157/157 [00:06<00:00, 24.34it/s]


In [100]:
preds

array([1, 8, 8, ..., 5, 7, 0], dtype=int64)

In [20]:
submission = pd.read_csv('./sample_submission.csv') # sample submission 불러오기

submission['label'] = preds

submission.to_csv('submission.csv', index=False)