In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import random
import numpy as np
from torch.nn import Module
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.optim import Adam
from torch.nn import CrossEntropyLoss
from functools import partial
from typing import Any, Callable, List, Optional, Type, Union
from torch import Tensor

In [None]:
class CustomTrainDataset1(Dataset):
    """Single-class image dataset drawn as a random subset of one folder.

    Every image in ``folder_path`` is paired with the same label
    (``index_array``), so one instance represents one class.

    The class deliberately derives from ``Dataset`` only: mixing in
    ``nn.Module`` without running ``Module.__init__`` leaves the object in a
    half-initialised state (``repr()`` and other Module methods raise
    ``AttributeError``) and a dataset holds no parameters anyway.

    Args:
        folder_path: Directory containing the images of one class.
        index_array: Label returned together with every image.
        transform: Optional callable applied to the RGB PIL image.
        num_samples: Maximum number of images to draw at random
            (default 10000). Smaller folders are used in full.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
    """

    # Only these (case-insensitive) extensions count as images, so stray
    # entries such as ``.DS_Store`` never reach ``Image.open``.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')

    def __init__(self, folder_path, index_array, transform=None, num_samples=10000):
        self.folder_path = folder_path
        # os.listdir returns entries in arbitrary order; sorting makes the
        # random subset reproducible once the ``random`` module is seeded.
        self.image_files = [
            os.path.join(folder_path, file)
            for file in sorted(os.listdir(folder_path))
            if file.lower().endswith(self.IMAGE_EXTENSIONS)
        ]
        # random.sample raises ValueError when asked for more items than
        # exist, so cap the request at the number of available images.
        sample_size = min(num_samples, len(self.image_files))
        self.image_paths = random.sample(self.image_files, sample_size)
        self.label = index_array
        self.transform = transform

    def __len__(self):
        """Return the number of sampled images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sampled image at position ``idx``."""
        image_path = self.image_paths[idx]
        # The context manager releases the file handle immediately;
        # convert() returns a fully loaded, independent RGB copy.
        with Image.open(image_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, self.label

# Preprocessing pipeline applied to every image before it reaches the network.
transform = transforms.Compose(
    [
        # Bring every image to a fixed 224x224 resolution.
        transforms.Resize(size=(224, 224)),
        # PIL image -> float tensor.
        transforms.ToTensor(),
        # Per-channel normalisation (these are the mean/std values commonly
        # quoted for ImageNet in the torchvision documentation).
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)



In [None]:
# Label information: folder-name suffix -> integer class id.
LABELS = {
    'pleasure': 0,
    'embarrased': 1,
    'angry': 2,
    'anxiety': 3,
    'sad': 4,
    'neutral': 5,
}

datasets = []

# NOTE(review): this cell builds the *training* datasets, yet it reads folders
# prefixed with 'valid', and its recorded output is a FileNotFoundError.
# Confirm the intended training folder names before relying on it.
for label_name, label in LABELS.items():
    folder_path1 = f'/content/drive/Othercomputers/내MacBookAir/valid{label_name}'
    # One dataset object per class folder.
    datasets.append(
        CustomTrainDataset1(folder_path=folder_path1, index_array=label, transform=transform)
    )

FileNotFoundError: ignored

In [None]:
# Mini-batch size used when the DataLoaders split the data into batches.
batch_size = 128

In [None]:
# Concatenate the per-class datasets and serve them as shuffled mini-batches.
train_loader = DataLoader(
    dataset=torch.utils.data.ConcatDataset(datasets),
    batch_size=batch_size,
    shuffle=True,
    num_workers=8,
)


In [None]:
class CustomTrainDataset2(Dataset):
    """Single-class image dataset drawn as a random subset of one folder.

    Every image in ``folder_path`` is paired with the same label
    (``index_array``), so one instance represents one class.

    The class deliberately derives from ``Dataset`` only: mixing in
    ``nn.Module`` without running ``Module.__init__`` leaves the object in a
    half-initialised state (``repr()`` and other Module methods raise
    ``AttributeError``) and a dataset holds no parameters anyway.

    Args:
        folder_path: Directory containing the images of one class.
        index_array: Label returned together with every image.
        transform: Optional callable applied to the RGB PIL image.
        num_samples: Maximum number of images to draw at random
            (default 5000). Smaller folders are used in full.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
    """

    # Only these (case-insensitive) extensions count as images, so stray
    # entries such as ``.DS_Store`` never reach ``Image.open``.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')

    def __init__(self, folder_path, index_array, transform=None, num_samples=5000):
        self.folder_path = folder_path
        # os.listdir returns entries in arbitrary order; sorting makes the
        # random subset reproducible once the ``random`` module is seeded.
        image_files = [
            os.path.join(folder_path, file)
            for file in sorted(os.listdir(folder_path))
            if file.lower().endswith(self.IMAGE_EXTENSIONS)
        ]
        # random.sample raises ValueError when asked for more items than
        # exist, so cap the request at the number of available images.
        self.image_paths = random.sample(image_files, min(num_samples, len(image_files)))
        self.label = index_array
        self.transform = transform

    def __len__(self):
        """Return the number of sampled images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sampled image at position ``idx``."""
        image_path = self.image_paths[idx]
        # The context manager releases the file handle immediately;
        # convert() returns a fully loaded, independent RGB copy.
        with Image.open(image_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, self.label

# Preprocessing pipeline applied to every image before it reaches the network.
transform = transforms.Compose(
    [
        # Bring every image to a fixed 224x224 resolution.
        transforms.Resize(size=(224, 224)),
        # PIL image -> float tensor.
        transforms.ToTensor(),
        # Per-channel normalisation (these are the mean/std values commonly
        # quoted for ImageNet in the torchvision documentation).
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)


In [None]:
class CustomValidDataset(Dataset):
    """Single-class validation image dataset drawn as a random subset of one folder.

    Every image in ``folder_path`` is paired with the same label
    (``index_array``), so one instance represents one class.

    The class deliberately derives from ``Dataset`` only: mixing in
    ``nn.Module`` without running ``Module.__init__`` leaves the object in a
    half-initialised state (``repr()`` and other Module methods raise
    ``AttributeError``) and a dataset holds no parameters anyway.

    Args:
        folder_path: Directory containing the images of one class.
        index_array: Label returned together with every image.
        transform: Optional callable applied to the RGB PIL image.
        num_samples: Maximum number of images to draw at random
            (default 1000). Smaller folders are used in full.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
    """

    # Only these (case-insensitive) extensions count as images, so stray
    # entries such as ``.DS_Store`` never reach ``Image.open``.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')

    def __init__(self, folder_path, index_array, transform=None, num_samples=1000):
        self.folder_path = folder_path
        # os.listdir returns entries in arbitrary order; sorting makes the
        # random subset reproducible once the ``random`` module is seeded.
        image_files = [
            os.path.join(folder_path, file)
            for file in sorted(os.listdir(folder_path))
            if file.lower().endswith(self.IMAGE_EXTENSIONS)
        ]
        # random.sample raises ValueError when asked for more items than
        # exist, so cap the request at the number of available images.
        self.image_paths = random.sample(image_files, min(num_samples, len(image_files)))
        self.label = index_array
        self.transform = transform

    def __len__(self):
        """Return the number of sampled images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sampled image at position ``idx``."""
        image_path = self.image_paths[idx]
        # The context manager releases the file handle immediately;
        # convert() returns a fully loaded, independent RGB copy.
        with Image.open(image_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, self.label

# Preprocessing pipeline applied to every image before it reaches the network.
transform = transforms.Compose(
    [
        # Bring every image to a fixed 224x224 resolution.
        transforms.Resize(size=(224, 224)),
        # PIL image -> float tensor.
        transforms.ToTensor(),
        # Per-channel normalisation (these are the mean/std values commonly
        # quoted for ImageNet in the torchvision documentation).
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
# Label information: folder-name suffix -> integer class id.
LABELS = {
    '기쁨': 0,  # joy
    '당황': 1,  # embarrassment
    '분노': 2,  # anger
    '불안': 3,  # anxiety
    '슬픔': 4,  # sadness
    '중립': 5,  # neutral
}

# Rebinds ``datasets``: any list built earlier under this name is replaced.
datasets = []

for label_name, label in LABELS.items():
    folder_path = f'/content/drive/Othercomputers/내MacBookAir/valid{label_name}'
    # One validation dataset object per class folder.
    datasets.append(
        CustomValidDataset(folder_path=folder_path, index_array=label, transform=transform)
    )

In [None]:
# Concatenate the per-class validation datasets and serve them as shuffled mini-batches.
valid_loader = DataLoader(
    dataset=torch.utils.data.ConcatDataset(datasets),
    batch_size=batch_size,
    shuffle=True,
    num_workers=8,
)

In [None]:
import pickle

def save_data_loader(dataloader, file_path):
    """Pickle ``dataloader`` and write it to ``file_path``.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten.
    """
    with open(file_path, mode='wb') as sink:
        pickle.dump(dataloader, sink)

def load_data_loader(file_path):
    """Read ``file_path`` and return the object that was pickled into it.

    Unpickling can execute arbitrary code, so only use this on files from a
    trusted source.
    """
    with open(file_path, mode='rb') as source:
        return pickle.load(source)

In [None]:
# Save the DataLoader objects to files (disabled; uncomment to regenerate the pickles)
#save_data_loader(train_loader, '/content/drive/MyDrive/종설/train_Dataloader3.pkl')
#save_data_loader(valid_loader, '/content/drive/MyDrive/종설/valid_Dataloader3.pkl')

# Load the DataLoader objects back from files.
# NOTE(review): the training pickle is 'train_Dataloader3.pkl' but the validation pickle is
# 'valid_Dataloader.pkl' (no '3'), unlike the commented-out save above; confirm this is intended.
train_loader = load_data_loader('/content/drive/MyDrive/종설/train_Dataloader3.pkl')
valid_loader = load_data_loader('/content/drive/MyDrive/종설/valid_Dataloader.pkl')

In [None]:
# Number of samples behind each loader (training first, then validation).
print(len(train_loader.dataset))
print(len(valid_loader.dataset))

In [None]:
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """Build a bias-free 3x3 convolution whose padding equals its dilation.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Convolution stride.
        groups: Number of channel groups.
        dilation: Kernel dilation; also used as the padding.

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    conv_options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=False,
    )
    return nn.Conv2d(in_planes, out_planes, **conv_options)


def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Build a bias-free 1x1 (pointwise) convolution.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Convolution stride.

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    pointwise = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return pointwise

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions and a skip connection.

    Main path: conv3x3 -> norm -> LeakyReLU(0.1) -> conv3x3 -> norm. The
    (optionally down-sampled) input is then added and a final LeakyReLU is
    applied.

    Args:
        inplanes: Number of input channels.
        planes: Number of output channels of both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input so that its shape
            matches the main path before the addition.
        groups: Must be 1. The block's convolutions are not grouped, so any
            other value is rejected instead of being silently ignored.
        norm_layer: Factory for the normalisation layers; defaults to
            ``nn.BatchNorm2d``.

    Raises:
        ValueError: If ``groups`` is not 1.
    """

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(BasicBlock, self).__init__()

        # The convolutions below are built without a groups argument, so a
        # request for grouped convolutions cannot be honoured; fail loudly.
        if groups != 1:
            raise ValueError('BasicBlock only supports groups=1')

        # Normalization layer factory.
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        # One activation module is reused for both non-linearities.
        self.relu = nn.LeakyReLU(0.1)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        """Apply the residual block to ``x`` and return the activated sum."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # When the main path changes the shape, the caller must supply a
        # downsample module so the skip branch can be added to it.
        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity  # residual connection
        out = self.relu(out)

        return out

In [None]:
class ResNet(nn.Module):
    """Five-stage residual network classifier.

    Layout: a stem (7x7 stride-2 convolution, normalisation, LeakyReLU(0.1),
    3x3 stride-2 max-pool), five residual stages with 64, 128, 256, 512 and
    1024 channels (stages 2-5 use stride 2), global average pooling and a
    linear classifier on the 1024 pooled features.

    Args:
        block: Residual block *class* used to build the stages. It is
            called as ``block(inplanes, planes, stride, downsample, groups,
            norm_layer)``.
        layers: Number of blocks in each of the five stages.
        num_classes: Size of the classifier output.
        zero_init_residual: If True, zero the weight of the last
            normalisation layer of every ``BasicBlock`` so each block starts
            as an identity mapping.
        norm_layer: Factory for normalisation layers; defaults to
            ``nn.BatchNorm2d``.
        debug_shapes: If True, ``forward`` prints the tensor shape after
            every step. Off by default so training output is not flooded.

    Raises:
        ValueError: If ``layers`` does not contain exactly five entries.
    """

    # Negative slope shared by the stem's LeakyReLU and the Kaiming
    # initialisation below (the blocks in this file also use 0.1).
    _NEGATIVE_SLOPE = 0.1
    _NUM_STAGES = 5

    def __init__(
        self,
        block: Type[BasicBlock],
        layers: List[int],
        num_classes: int = 1000,
        zero_init_residual: bool = False,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        debug_shapes: bool = False
    ) -> None:
        super(ResNet, self).__init__()

        # Fail with a clear message instead of an IndexError (too few
        # entries) or silently ignoring extra entries (too many).
        if len(layers) != self._NUM_STAGES:
            raise ValueError(
                f'layers must contain {self._NUM_STAGES} entries, got {len(layers)}'
            )

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer  # normalisation layer factory
        self.debug_shapes = debug_shapes

        self.inplanes = 64  # channel count entering the next stage
        self.groups = 1  # groups fixed

        # Input stem.
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3,
                               bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.LeakyReLU(self._NEGATIVE_SLOPE)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.layer5 = self._make_layer(block, 1024, layers[4], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1024, num_classes)

        # Weight initialisation. The gain is computed for the LeakyReLU that
        # the network actually uses rather than for a plain ReLU.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, a=self._NEGATIVE_SLOPE,
                                        mode='fan_out', nonlinearity='leaky_relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)  # type: ignore[arg-type]

    def _make_layer(self, block: Type[BasicBlock], planes: int, blocks: int,
                    stride: int = 1) -> nn.Sequential:
        """Build one stage of ``blocks`` residual blocks with ``planes`` channels.

        Only the first block uses ``stride`` and, when the shape changes, a
        1x1-convolution downsample branch. ``self.inplanes`` is updated to
        ``planes`` for the stage that follows.
        """
        norm_layer = self._norm_layer
        downsample = None

        # A projection on the skip branch is needed whenever the first block
        # changes the spatial size or the channel count.
        if stride != 1 or self.inplanes != planes:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes, stride),
                norm_layer(planes),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            norm_layer))
        self.inplanes = planes
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def _report_shape(self, stage: str, x: Tensor) -> None:
        """Print the shape of ``x`` for ``stage`` when shape debugging is on."""
        if self.debug_shapes:
            print(f'{stage} shape:', x.shape)

    def forward(self, x: Tensor) -> Tensor:
        """Run the network on ``x`` and return the classifier output."""
        self._report_shape('input', x)
        x = self.conv1(x)
        self._report_shape('conv1', x)
        x = self.bn1(x)
        self._report_shape('bn1', x)
        x = self.relu(x)
        self._report_shape('relu', x)
        x = self.maxpool(x)
        self._report_shape('maxpool', x)

        x = self.layer1(x)
        self._report_shape('layer1', x)
        x = self.layer2(x)
        self._report_shape('layer2', x)
        x = self.layer3(x)
        self._report_shape('layer3', x)
        x = self.layer4(x)
        self._report_shape('layer4', x)
        x = self.layer5(x)
        self._report_shape('layer5', x)
        x = self.avgpool(x)
        self._report_shape('avgpool', x)
        x = torch.flatten(x, 1)
        self._report_shape('flatten', x)
        x = self.fc(x)
        self._report_shape('fc', x)

        return x


In [None]:
# Model definition: a custom five-stage ResNet built from BasicBlock
# (3, 4, 4, 4, 3 blocks per stage) with 6 output classes.
# Earlier four-stage variant, kept for reference:
#model = ResNet(BasicBlock,[3,4,8,3],num_classes=6,zero_init_residual=True)
model = ResNet(
    block=BasicBlock,
    layers=[3, 4, 4, 4, 3],
    num_classes=6,
    zero_init_residual=True,
)
# Loss function and optimizer.
criterion = CrossEntropyLoss()
optimizer = Adam(params=model.parameters(), lr=0.001)

In [None]:
# Model training: every epoch runs a training pass, a validation pass and
# writes a checkpoint.
num_epochs = 20

# Batches are moved to whatever device the model's parameters live on, so the
# loop works unchanged on CPU or GPU.
device = next(model.parameters()).device

# Make sure training starts in train mode even if a previous (interrupted)
# run of this cell left the model in eval mode.
model.train()

for epoch in range(num_epochs):
    # To resume from a saved checkpoint instead of continuing with the current
    # weights, restore it before this loop:
    #   checkpoint = torch.load(<checkpoint path>)
    #   model.load_state_dict(checkpoint['model_state_dict'])
    #   optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # Training phase.
    for i, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        print(f'epoch: {epoch}, batch: {i}')

    # Validation phase, run after each epoch.
    model.eval()  # switch to evaluation mode
    val_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for val_inputs, val_targets in valid_loader:
            val_inputs = val_inputs.to(device)
            val_targets = val_targets.to(device)
            val_outputs = model(val_inputs)
            val_loss += criterion(val_outputs, val_targets).item()

            # Accuracy bookkeeping. argmax avoids assigning to `_`, which
            # IPython uses for the last cell output.
            predicted = val_outputs.argmax(dim=1)
            correct_predictions += (predicted == val_targets).sum().item()
            total_samples += val_targets.size(0)

    average_val_loss = val_loss / len(valid_loader)
    accuracy = (correct_predictions / total_samples) * 100

    # End-of-epoch report; epochs are shown 1-based so the last one reads
    # num_epochs/num_epochs.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Val Loss: {average_val_loss:.4f}, Accuracy: {accuracy:.2f}%')

    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'accuracy': accuracy,
        'val_loss': average_val_loss,
    }

    # Checkpoint path; the file name keeps the 0-based epoch index.
    #save_path = '/content/drive/MyDrive/Resnet(3,4,8,3)_checkpoint_epoch{}.pt'.format(epoch)
    save_path = '/content/drive/MyDrive/Resnet(3,4,4,4,3)_checkpoint_epoch{}.pt'.format(epoch)
    # Write the dictionary to disk.
    torch.save(checkpoint, save_path)

    model.train()  # back to training mode for the next epoch


In [None]:
# Restore the model and optimizer state from the checkpoint of `saved_epoch`.
saved_epoch = 0
# map_location='cpu' lets the file be read on a runtime that lacks the device
# it was saved from; load_state_dict then copies the values into the existing
# parameters / optimizer state on whatever device they are on.
checkpoint = torch.load(
    '/content/drive/MyDrive/Resnet(3,4,4,4,3)_checkpoint_epoch{}.pt'.format(saved_epoch),
    map_location='cpu',
)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
#epoch = checkpoint['epoch']
# Validation accuracy recorded when the checkpoint was written.
accuracy = checkpoint['accuracy']
print(accuracy)