In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os
import numpy as np
from tensorflow.keras.preprocessing.image import img_to_array
from torch.utils.data import random_split
import torchvision.models as models
import torch.optim as optim
print(torch.__version__)
print(torch.cuda.is_available())

1.9.0+cu111
True


In [3]:
# 데이터셋 및 라벨 파일 경로 설정
data_folder = 'C:/Users/A/Desktop/224_Data'
train_data_folder = data_folder + '/Train'
test_data_folder = data_folder + '/Test'
train_label_file = train_data_folder + '/train_labels.txt'
test_label_file = test_data_folder + '/test_labels.txt'

In [4]:
#calculate for normalize(mean,std)
def load_and_combine_data(train_data_folder):
    file_paths = [os.path.join(train_data_folder, f) for f in os.listdir(train_data_folder) if f.endswith('9.txt')]
    all_data = []

    for file_path in file_paths:
        #Loading arrary data from each txt file
        data = np.loadtxt(file_path)
        all_data.append(data)

    #combine all data to one file
    combined_data = np.concatenate(all_data, axis=0)
    return combined_data

def calculate_statistics(data):
    mean = np.mean(data)
    std = np.std(data)
    return mean, std

combined_data = load_and_combine_data(train_data_folder)
matrix_mean, matrix_std = calculate_statistics(combined_data)

In [5]:
#check the mean and std 
print(matrix_mean,matrix_std)

-0.019175920669329275 0.09508734685529101


In [6]:
class CustomDataset(Dataset):
    def __init__(self, data_folder, label_file, mean, std, label_mean=0, label_std=1):
        self.data_folder = data_folder
        self.parameters = self.read_parameters(label_file)  # 라벨 파일 경로를 인자로 받음
        self.transform = transforms.Compose([
            transforms.Normalize(mean, std)
        ])
        self.data_numbers = list(self.parameters.keys())
        all_params = np.array(list(self.parameters.values()))
        self.label_min = np.min(all_params, axis=0)
        self.label_max = np.max(all_params, axis=0)

    def read_parameters(self, file_path):
        parameters = {}
        with open(file_path, 'r') as file:
            for index, line in enumerate(file):
                if index == 0:  # 첫 번째 줄(헤더) 건너뛰기
                    continue
                parts = line.strip().split(',')
                data_number = parts[0]
                params = np.array(parts[1:4], dtype=np.float32)
                parameters[data_number] = params
        return parameters

    def __len__(self):
        return len(self.data_numbers)

    def __getitem__(self, idx):
        data_number = self.data_numbers[idx]
        matrix_path = os.path.join(self.data_folder, f'{data_number}.txt')
        matrix = np.loadtxt(matrix_path)  # Load matrix from a txt file
        matrix = torch.from_numpy(matrix).float().unsqueeze(0)  # 2D -> 3D tensor
        matrix = self.transform(matrix)
        params = self.parameters[data_number]
        
        # label normalize
        params = (params - self.label_min) / (self.label_max - self.label_min)
        
        return matrix, torch.from_numpy(params).float()

# 훈련 및 테스트 데이터셋 생성
train_dataset = CustomDataset(data_folder=train_data_folder, label_file=train_label_file, mean=0, std=1)
test_dataset = CustomDataset(data_folder=test_data_folder, label_file=test_label_file, mean=0, std=1)


In [7]:
#check the custom_dataset
print(train_dataset[0])
print(test_dataset[0])

(tensor([[[-2.0674e-01, -2.0678e-01, -2.0681e-01,  ...,  4.4643e-04,
           4.3100e-04,  4.1558e-04],
         [-2.0681e-01, -2.0672e-01, -2.0673e-01,  ...,  4.4543e-04,
           4.3001e-04,  4.1458e-04],
         [-2.0679e-01, -2.0671e-01, -2.0662e-01,  ...,  4.4443e-04,
           4.2901e-04,  4.1359e-04],
         ...,
         [ 4.4546e-04,  4.4463e-04,  4.4381e-04,  ..., -1.8422e-04,
          -1.8639e-04, -1.8865e-04],
         [ 4.2972e-04,  4.2889e-04,  4.2807e-04,  ..., -1.8644e-04,
          -1.8812e-04, -1.9030e-04],
         [ 4.1398e-04,  4.1315e-04,  4.1232e-04,  ..., -1.8866e-04,
          -1.9035e-04, -1.9203e-04]]]), tensor([0.0000, 0.0000, 0.3000]))
(tensor([[[-0.1882, -0.1882, -0.1883,  ...,  0.0003,  0.0003,  0.0003],
         [-0.1883, -0.1882, -0.1882,  ...,  0.0003,  0.0003,  0.0003],
         [-0.1882, -0.1882, -0.1881,  ...,  0.0003,  0.0003,  0.0003],
         ...,
         [ 0.0003,  0.0003,  0.0003,  ..., -0.0002, -0.0002, -0.0002],
         [ 0.0003, 

In [8]:
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [9]:
#Resnet50
def resnet50(pretrained=False, progress=True, **kwargs):
    return _resnet('resnet50', Bottleneck, [3, 4, 6, 3], pretrained, progress, **kwargs)

In [10]:
def _resnet(resnet50, Bottleneck, layers, pretrained, progress, **kwargs):
    r"""
    - pretrained: pretrained된 모델 가중치를 불러오기 (saved by caffe)
    - arch: ResNet모델 이름
    - block: 어떤 block 형태 사용할지 ("Basic or Bottleneck")
    - layers: 해당 block이 몇번 사용되는지를 list형태로 넘겨주는 부분
    """
    model = ResNet(Bottleneck, layers, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls[resnet50], progress=progress)
        model.load_state_dict(state_dict)
    return model

In [11]:
#convolution layer
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    r"""
    3x3 convolution with padding
    - in_planes: in_channels
    - out_channels: out_channels
    - bias=False: BatchNorm에 bias가 포함되어 있으므로, conv2d는 bias=False로 설정.
    """
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

In [12]:
#bottleneck architecture
class Bottleneck(nn.Module):
    # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
    # while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
    # This variant is also known as ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion = 4 # 블록 내에서 차원을 증가시키는 3번째 conv layer에서의 확장계수
    
    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(Bottleneck, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        # ResNext나 WideResNet의 경우 사용
        width = int(planes * (base_width / 64.)) * groups
        
        # Bottleneck Block의 구조
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation) # conv2에서 downsample
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        # 1x1 convolution layer
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        # 3x3 convolution layer
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        # 1x1 convolution layer
        out = self.conv3(out)
        out = self.bn3(out)
        # skip connection
        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

In [13]:
class ResNet(nn.Module):

    def __init__(self, block, layers, num_classes=1000, zero_init_residual=False,
                 groups=1, width_per_group=64, replace_stride_with_dilation=None,
                 norm_layer=None):
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer
        # default values
        self.inplanes = 64 # input feature map
        self.dilation = 1
        # stride를 dilation으로 대체할지 선택
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group
        
        r"""
        - 처음 입력에 적용되는 self.conv1과 self.bn1, self.relu는 모든 ResNet에서 동일 
        - 3: 입력으로 RGB 이미지를 사용하기 때문에 convolution layer에 들어오는 input의 channel 수는 3 => matrix 사용할거라 1로 변경
        """
        self.conv1 = nn.Conv2d(1, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        r"""
        - 아래부터 block 형태와 갯수가 ResNet층마다 변화
        - self.layer1 ~ 4: 필터의 개수는 각 block들을 거치면서 증가(64->128->256->512)
        - self.avgpool: 모든 block을 거친 후에는 Adaptive AvgPool2d를 적용하여 (n, 512, 1, 1)의 텐서로
        - self.fc: 이후 fc layer를 연결
        """
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, # 여기서부터 downsampling적용
                                       dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2,
                                       dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2,
                                       dilate=replace_stride_with_dilation[2])
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
        r"""
        convolution layer 생성 함수
        - block: block종류 지정
        - planes: feature map size (input shape)
        - blocks: layers[0]와 같이, 해당 블록이 몇개 생성돼야하는지, 블록의 갯수 (layer 반복해서 쌓는 개수)
        - stride와 dilate은 고정
        """
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        
        # the number of filters is doubled: self.inplanes와 planes 사이즈를 맞춰주기 위한 projection shortcut
        # the feature map size is halved: stride=2로 downsampling
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        # 블록 내 시작 layer, downsampling 필요
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion # inplanes 업데이트
        # 동일 블록 반복
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def _forward_impl(self, x):
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x):
        return self._forward_impl(x)

In [14]:
label_size = 3
model_instance = resnet50(pretrained=False)
model_instance.fc = nn.Linear(model_instance.fc.in_features, label_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model_instance.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5) #scheduler
device = torch.device("cuda:0")
model_instance.to(device)

ResNet(
  (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [15]:
from torchsummary import summary
summary(model_instance, (1, 128, 128))

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 64, 64]           3,136
       BatchNorm2d-2           [-1, 64, 64, 64]             128
              ReLU-3           [-1, 64, 64, 64]               0
         MaxPool2d-4           [-1, 64, 32, 32]               0
            Conv2d-5           [-1, 64, 32, 32]           4,096
       BatchNorm2d-6           [-1, 64, 32, 32]             128
              ReLU-7           [-1, 64, 32, 32]               0
            Conv2d-8           [-1, 64, 32, 32]          36,864
       BatchNorm2d-9           [-1, 64, 32, 32]             128
             ReLU-10           [-1, 64, 32, 32]               0
           Conv2d-11          [-1, 256, 32, 32]          16,384
      BatchNorm2d-12          [-1, 256, 32, 32]             512
           Conv2d-13          [-1, 256, 32, 32]          16,384
      BatchNorm2d-14          [-1, 256,

In [16]:
num_epochs = 10 #batch=32
losses = [] 

for epoch in range(num_epochs):
    model_instance.train()
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model_instance(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    epoch_loss = running_loss / len(train_loader)  #epoch당 평균 loss 계산
    losses.append(epoch_loss)  #add list
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}')
print('Finished Training')

Epoch 1/10, Loss: 1.4509913950766389
Epoch 2/10, Loss: 0.05164829974896029
Epoch 3/10, Loss: 0.04778801806663212
Epoch 4/10, Loss: 0.04012734462556086
Epoch 5/10, Loss: 0.03962411819712112
Epoch 6/10, Loss: 0.03334951871319821
Epoch 7/10, Loss: 0.037603983263436114
Epoch 8/10, Loss: 0.03737572766840458
Epoch 9/10, Loss: 0.0324052474216411
Epoch 10/10, Loss: 0.03233631563029791
Finished Training


In [22]:
for inputs, labels in train_loader:
    inputs, labels = inputs.to(device), labels.to(device)
    outputs = model_instance(inputs)
    print("Outputs Shape:", outputs.shape)  # 모델 출력 차원 확인
    print("Labels Shape:", labels.shape)  # 라벨 차원 확인
    break  # 첫 번째 배치만 확인


Outputs Shape: torch.Size([64, 3])
Labels Shape: torch.Size([64, 3])


In [17]:
print(model_instance.fc)

Linear(in_features=2048, out_features=3, bias=True)


In [20]:
model_instance.eval()  # 평가 모드로 전환
test_loss = 0.0
correct = 0
total = 0

with torch.no_grad():  # 기울기 계산 비활성화
    for data in test_loader:
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model_instance(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item()  # 테스트 손실 누적
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

avg_test_loss = test_loss / len(test_loader)
test_accuracy = 100 * correct / total
print(f'Test Loss: {avg_test_loss}, Test Accuracy: {test_accuracy}%')

RuntimeError: The size of tensor a (64) must match the size of tensor b (3) at non-singleton dimension 1