In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResNet(nn.Module):
    """CIFAR-style ResNet: a 3x3 stem, three stages of residual blocks, and a
    global-average-pooled linear classifier.

    Each stage holds ``num_layers`` blocks. With a two-convolution block such
    as ``ResidualBlock`` the network has ``6 * num_layers + 2`` weighted layers
    (stem conv + 3 stages * num_layers blocks * 2 convs + final linear).

    Args:
        num_layers: number of blocks per stage (``n``).
        block: block class, called as ``block(in_channels, out_channels,
            stride, down_sample)`` for the first block of a stage and as
            ``block(out_channels, out_channels)`` for the remaining ones.
        num_classes: size of the classifier output.
    """

    def __init__(self, num_layers, block, num_classes=10):
        super(ResNet, self).__init__()
        self.num_layers = num_layers

        # Stem: 3 input channels -> 16 channels, spatial size preserved.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)

        # Every stage after the first halves the feature-map size and doubles
        # the channel count. Sizes below are for a 32x32 input.
        # feature map size = 32x32x16
        self.layers_2n = self.get_layers(block, 16, 16, stride=1)
        # feature map size = 16x16x32
        self.layers_4n = self.get_layers(block, 16, 32, stride=2)
        # feature map size = 8x8x64
        self.layers_6n = self.get_layers(block, 32, 64, stride=2)

        # Output layers. Adaptive pooling reduces any spatial size to 1x1, so
        # the classifier always receives 64 features; on the 8x8 map produced
        # by a 32x32 input it is the same average a fixed 8x8 window computes.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc_out = nn.Linear(64, num_classes)

        # Weight init: Kaiming-normal for convolutions, identity-like
        # (weight=1, bias=0) for batch norm.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out',
                                        nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def get_layers(self, block, in_channels, out_channels, stride):
        """Build one stage: ``self.num_layers`` blocks chained sequentially.

        The first block maps ``in_channels`` to ``out_channels`` with the given
        stride; the rest keep ``out_channels`` and use the block's defaults.
        At least one block is always created.

        Returns:
            ``nn.Sequential`` applying the blocks in order.
        """
        # A stride-2 stage shrinks the feature map and grows the channels, so
        # its first block is asked to reshape the shortcut as well.
        down_sample = stride == 2

        layers_list = [block(in_channels, out_channels, stride, down_sample)]
        layers_list.extend(
            block(out_channels, out_channels)
            for _ in range(self.num_layers - 1)
        )

        return nn.Sequential(*layers_list)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        Expects a 3-channel image batch (``conv1`` has ``in_channels=3``).
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.layers_2n(x)
        x = self.layers_4n(x)
        x = self.layers_6n(x)

        x = self.avg_pool(x)
        x = torch.flatten(x, 1)  # (batch, 64, 1, 1) -> (batch, 64)
        x = self.fc_out(x)
        return x

In [None]:
class ResidualBlock(nn.Module):
    """Basic residual block: two 3x3 conv + batch-norm layers whose result is
    added to a shortcut of the input, followed by ReLU.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution (the second always uses 1).
        down_sample: when true, the shortcut goes through ``IdentityPadding``
            so that it matches the main branch; otherwise the input is added
            as-is.

    Note:
        As a ResNet gets deeper its parameter count grows, so deeper variants
        use a different residual block: a 1x1 convolution shrinks the tensor
        before the 3x3 convolution, which saves parameters. That design is the
        bottleneck block.
    """

    def __init__(self, in_channels, out_channels, stride=1, down_sample=False):
        super(ResidualBlock, self).__init__()
        self.stride = stride

        # Both convolutions share everything except channels and stride.
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)

        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride,
                               **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1,
                               **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.down_sample = (
            IdentityPadding(in_channels, out_channels, stride)
            if down_sample else None
        )

    def forward(self, x):
        """Compute ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        # Plain identity unless a shape-adjusting shortcut was configured.
        identity = x if self.down_sample is None else self.down_sample(x)

        residual += identity
        return self.relu(residual)

In [None]:
class IdentityPadding(nn.Module):
    """Parameter-free shortcut that grows the channel count with zeros and
    shrinks the feature map by strided sub-sampling.

    Args:
        in_channels: channels of the incoming tensor.
        out_channels: channels the shortcut must produce.
        stride: spatial sub-sampling step.
    """

    def __init__(self, in_channels, out_channels, stride):
        super(IdentityPadding, self).__init__()

        # How many all-zero channels get appended after the existing ones.
        self.add_channels = out_channels - in_channels
        # A 1x1 max-pool window just picks every `stride`-th pixel, which
        # reduces the feature-map size.
        self.pooling = nn.MaxPool2d(1, stride=stride)

    def forward(self, x):
        """Zero-pad the channel dimension on one side, then sub-sample."""
        # F.pad lists pairs starting from the last dimension, so for a 4-D
        # (N, C, H, W) input the third pair addresses the channel axis.
        channel_pad = (0, 0, 0, 0, 0, self.add_channels)
        return self.pooling(F.pad(x, channel_pad))

In [None]:
def resnet(num_layers=3, num_classes=10):
    """Build a CIFAR-style ResNet out of ``ResidualBlock``s.

    Args:
        num_layers: residual blocks per stage (``n``). The network has
            ``6 * n + 2`` weighted layers, so the default ``n = 3`` gives
            ResNet-20 (6 * 3 + 2 = 20); 5, 7 and 9 are the other usual choices.
        num_classes: number of output classes.

    Returns:
        A freshly initialised ``ResNet`` instance.

    Raises:
        ValueError: if ``num_layers`` is smaller than 1. ``ResNet`` always
            creates at least one block per stage, so a smaller value would
            silently build a network of a different depth than requested.
    """
    if num_layers < 1:
        raise ValueError(f"num_layers must be >= 1, got {num_layers}")
    return ResNet(num_layers, ResidualBlock, num_classes=num_classes)

In [None]:

# Build the network once to check that it can be constructed and moved to the
# compute device. Fall back to the CPU when no GPU is present so this cell
# does not crash on CPU-only machines.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = resnet().to(device)


In [None]:
import tqdm

from torchvision.datasets.cifar import CIFAR10
from torchvision.transforms import Compose, ToTensor
from torchvision.transforms import RandomHorizontalFlip, RandomCrop
from torchvision.transforms import Normalize
from torch.utils.data.dataloader import DataLoader

from torch.optim.adam import Adam


# Preprocessing pipeline: random augmentation first, then tensor conversion
# and per-channel normalisation.
_augmentations = [
    RandomCrop((32, 32), padding=4),   # pad by 4 px, then crop back to 32x32
    RandomHorizontalFlip(p=0.5),       # mirror half of the images
]
_to_normalized_tensor = [
    ToTensor(),
    # Per-channel (mean, std) used to standardise the RGB channels.
    Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.2010)),
]
transforms = Compose(_augmentations + _to_normalized_tensor)


In [None]:
# Load the CIFAR-10 dataset.

# Evaluation must be deterministic and run on unmodified images, so the test
# split gets only tensor conversion and normalisation, without the random
# crop / flip used for training. The Normalize statistics must stay identical
# to the ones in the training pipeline (`transforms`).
test_transforms = Compose([
    ToTensor(),
    Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.2010)),
])

training_data = CIFAR10(
    root="./",
    train=True,
    download=True,
    transform=transforms)

test_data = CIFAR10(
    root="./",
    train=False,
    download=True,
    transform=test_transforms)

# Shuffle only the training split; evaluation order does not matter.
train_loader = DataLoader(training_data, batch_size=32, shuffle=True)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

# Use the GPU when one is available, otherwise run on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

model = resnet()

# Last expression of the cell: moves the model and displays its structure.
model.to(device)


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./cifar-10-python.tar.gz


  0%|          | 0/170498071 [00:00<?, ?it/s]

Extracting ./cifar-10-python.tar.gz to ./
Files already downloaded and verified


ResNet(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (layers_2n): Sequential(
    (0): ResidualBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): ResidualBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), 

In [None]:
# Adam optimiser over every parameter of the model, with a fixed step size.
lr = 1e-3
optim = Adam(params=model.parameters(), lr=lr)


In [None]:
# Train the model and save its weights.
NUM_EPOCHS = 30
# The loss module holds no per-step state, so build it once instead of on
# every iteration.
criterion = nn.CrossEntropyLoss()

# model_evaluate() leaves the model in eval mode; switch back explicitly so
# BatchNorm uses batch statistics even when this cell is re-run afterwards.
model.train()

for epoch in range(NUM_EPOCHS):
    iterator = tqdm.tqdm(train_loader)
    for data, label in iterator:
        data, label = data.to(device), label.to(device)

        # Reset the gradients accumulated by the previous step.
        optim.zero_grad()

        # Model predictions.
        preds = model(data)

        # Compute the loss and back-propagate.
        loss = criterion(preds, label)
        loss.backward()
        optim.step()

        iterator.set_description(f"epoch:{epoch+1} loss:{loss.item()}")

torch.save(model.state_dict(), "ResNet.pth")

epoch:1 loss:1.1851348876953125: 100%|██████████| 1563/1563 [00:59<00:00, 26.21it/s]
epoch:2 loss:0.9505700469017029: 100%|██████████| 1563/1563 [00:51<00:00, 30.49it/s]
epoch:3 loss:0.47100722789764404: 100%|██████████| 1563/1563 [00:52<00:00, 29.91it/s]
epoch:4 loss:0.823797345161438: 100%|██████████| 1563/1563 [00:52<00:00, 29.86it/s]
epoch:5 loss:0.44031912088394165: 100%|██████████| 1563/1563 [00:52<00:00, 29.77it/s]
epoch:6 loss:0.5805164575576782: 100%|██████████| 1563/1563 [00:51<00:00, 30.29it/s]
epoch:7 loss:0.4214840233325958: 100%|██████████| 1563/1563 [00:51<00:00, 30.22it/s]
epoch:8 loss:0.8060622811317444: 100%|██████████| 1563/1563 [00:51<00:00, 30.59it/s]
epoch:9 loss:1.1175594329833984: 100%|██████████| 1563/1563 [00:51<00:00, 30.60it/s]
epoch:10 loss:0.3912796080112457: 100%|██████████| 1563/1563 [00:50<00:00, 30.83it/s]
epoch:11 loss:0.599243700504303: 100%|██████████| 1563/1563 [00:51<00:00, 30.60it/s]
epoch:12 loss:0.3482469320297241: 100%|██████████| 1563/1563 [0

In [None]:
def model_evaluate(model, data_loader, loss_fn, device):
    """Evaluate ``model`` on every batch of ``data_loader``.

    The model is switched to eval mode (and left in it) and gradients are
    disabled for the whole pass.

    Args:
        model: module mapping a batch of inputs to per-class scores.
        data_loader: iterable of ``(data, label)`` batches.
        loss_fn: callable ``loss_fn(output, label)`` returning a scalar tensor.
            The result is weighted by the batch size, i.e. it is assumed to be
            the batch mean (the default of ``nn.CrossEntropyLoss``).
        device: device the batches are moved to before the forward pass.

    Returns:
        ``(mean_loss, accuracy)`` averaged over the samples that were actually
        iterated. Counting them directly, rather than using
        ``len(data_loader.dataset)``, keeps the result correct when the loader
        drops the last batch or samples only part of the dataset.

    Raises:
        ValueError: if the loader yields no samples.
    """
    model.eval()

    correct = 0
    loss_sum = 0.0
    seen = 0

    with torch.no_grad():
        for data, label in data_loader:
            data, label = data.to(device), label.to(device)
            batch_size = data.size(0)

            output = model(data)

            # Predicted class = index of the highest score per sample.
            _, pred = output.max(dim=1)
            correct += pred.eq(label).sum().item()

            # Undo the per-batch averaging so that batches of different sizes
            # are weighted correctly.
            loss_sum += loss_fn(output, label).item() * batch_size
            seen += batch_size

    if seen == 0:
        raise ValueError("data_loader yielded no samples to evaluate")

    return loss_sum / seen, correct / seen

In [None]:
# Reload the weights saved by the training cell. map_location remaps the
# stored tensors onto the current device, so a checkpoint written on a GPU
# also loads on a CPU-only machine.
model.load_state_dict(torch.load('ResNet.pth', map_location=device))

<All keys matched successfully>

In [None]:
# Score the trained model on the held-out test split.
loss_fn = nn.CrossEntropyLoss()

eval_metrics = model_evaluate(
    model=model,
    data_loader=test_loader,
    loss_fn=loss_fn,
    device=device,
)
final_loss, final_acc = eval_metrics
print(f'evaluation loss: {final_loss:.5f}, evaluation accuracy: {final_acc:.5f}')

evaluation loss: 0.38328, evaluation accuracy: 0.87150


In [None]:
# A ResNet derived from VGG-19 -- this attempt failed; the code is kept commented out for reference only.

# import torch
# import torch.nn as nn

# class BasicBlock(nn.Module):
#   def __init__(self, in_channels, out_channels, kernel_size=3):
#     super(BasicBlock, self).__init__()

#     #합성곱층 정의
#     self.c1 = nn.Conv2d(in_channels, out_channels,
#                          kernel_size=kernel_size, padding=1)
#     self.c2 = nn.Conv2d(out_channels, out_channels,
#                          kernel_size=kernel_size, padding=1)
    
#     self.downsample = nn.Conv2d(in_channels, out_channels,
#                                 kernel_size = 1)
    
#     #배치 정규화층 정의
#     self.bn1 = nn.BatchNorm2d(num_features=out_channels)
#     self.bn2 = nn.BatchNorm2d(num_features=out_channels)

#     self.relu = nn.ReLU()
  
#   def forward(self, x):
#     #스킵 커넥션을 위해 초기 입력을 저장한다.
#     x_ = x

#     #ResNet 기본 블럭에서 F(x) 부분
#     x = self.c1(x)
#     x = self.bn1(x)
#     x = self.relu(x)
#     x = self.c2(x)
#     x = self.bn2(x)

#     #합성곱의 결과와 입력의 채널 수를 맞춘다.
#     x_ = self.downsample(x_)

#     #합성곱층의 결과와 저장해놨던 입력값을 더해준다.(스킵 커넥션)
#     x += x_
#     x = self.relu(x)

#     return x


In [None]:
# class ResNet(nn.Module):
#   def __init__(self, num_classes=10):
#     super(ResNet, self).__init__()

#     #기본 블록
#     self.b1 = BasicBlock(in_channels=3, out_channels=64)
#     self.b2 = BasicBlock(in_channels=64, out_channels=128)
#     self.b3 = BasicBlock(in_channels=128, out_channels=256)
#     self.b4 = BasicBlock(in_channels=256, out_channels=512)


#     self.pool = nn.AvgPool2d(kernel_size=2, stride=2)

#     #분류기
#     self.fc1 = nn.Linear(in_features=1000, out_features=num_classes)
#     self.softmax = nn.Softmax()


#   def forward(self, x):
#   #기본 블록과 풀링층 통과
#     x = self.b1(x)
#     x = self.pool(x)
#     x = self.b2(x)
#     x = self.pool(x)
#     x = self.b3(x)
#     x = self.pool(x)
#     x = self.b4(x)
#     x = self.pool(x)

#     x = torch.flatten(x, start_dim=1)

#     #분류기로 예측값 출력하기
#     x = self.fc1(x)
#     x = self.softmax(x)


#     return x


In [None]:
# from tqdm import tqdm  # Progress Bar 출력

# def model_train(model, data_loader, loss_fn, optimizer, device):
#     model.train()
    
#     running_loss = 0
#     corr = 0
    
#     prograss_bar = tqdm(data_loader)
    
#     # mini-batch 학습을 시작
#     for data, label in prograss_bar:
 
#         data, label = data.to(device), label.to(device)
        
  
#         optimizer.zero_grad()
        
      
#         output = model(data)
        
       
#         loss = loss_fn(output, label)
        
        
#         loss.backward()
        
       
#         optimizer.step()
        
      
       
#         _, pred = output.max(dim=1)
        
     
#         corr += pred.eq(label).sum().item()
        
      
#         running_loss += loss.item() * data.size(0)
        
  
#     acc = corr / len(data_loader.dataset)
    
#     return running_loss / len(data_loader.dataset), acc

In [None]:
# Train the model
# from tqdm import tqdm
# progress_bar = tqdm(train_loader)
# lr = 0.001
# optim = Adam(model.parameters(), lr=lr)

# for epoch in range(10):
#   for data, label in progress_bar:
#     optim.zero_grad() # 기울기 초기화

#     preds = model(data.to(device)) #모델의 예측

#     #오차역전파와 최적화 진행
#     loss = nn.CrossEntropyLoss()(preds, label.to(device))
#     loss.backward()
#     optim.step()

#   if epoch==0 or epoch%10==9:
#     print(f'epoch {epoch+1:02d}, loss: {loss:.5f}')


# torch.save(model.state_dict(), "CIFAR.pth")