In [3]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset

from sklearn.model_selection import train_test_split
import os
from PIL import Image
import glob
from sklearn.preprocessing import LabelEncoder  # label 수로 변환

In [4]:
# Root folder of the dataset; BDataset looks for the `Normal` and `Broken`
# sub-folders directly below it.
# Set the BDATA_DIR environment variable to point the notebook at another copy
# of the data without editing this cell; the original location stays the default.
path = os.environ.get(
    'BDATA_DIR',
    'C:\\Users\\JungHyeona\\Documents\\Project\\AI_deeplearning\\image\\Convex\\V8\\CA1-7S',
)

In [5]:
class BDataset(Dataset):
    """Binary image dataset read from ``<path>/Normal`` and ``<path>/Broken``.

    Every ``*.png`` file under ``Normal`` gets label ``0`` and every ``*.png``
    file under ``Broken`` gets label ``1``. Normal images come first in the
    index, followed by the broken ones.

    Args:
        path: Root folder that holds the ``Normal`` and ``Broken`` sub-folders.
        transform: Optional callable applied to each PIL image before it is
            returned.
    """

    def __init__(self, path, transform=None):
        self.path = path

        # os.path.join instead of a hard-coded '\\' so the lookup also works
        # outside Windows (on Windows the resulting strings are unchanged).
        self.br = os.path.join(path, 'Broken')   # defective images
        self.nor = os.path.join(path, 'Normal')  # normal images

        # Sorted so the index -> file mapping is the same on every run;
        # glob itself makes no ordering promise.
        self.normal_list = sorted(glob.glob(os.path.join(self.nor, '*.png')))
        self.broken_list = sorted(glob.glob(os.path.join(self.br, '*.png')))

        # Single flat index over both classes.
        self.img_list = self.normal_list + self.broken_list
        self.label_list = [0] * len(self.normal_list) + [1] * len(self.broken_list)

        self.transform = transform

    def __len__(self):
        """Return the total number of images (normal + broken)."""
        return len(self.img_list)

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index``; the image is transformed if a transform was given."""
        img_path = self.img_list[index]
        label = self.label_list[index]

        # Image.open is lazy and keeps the file handle open. convert() forces
        # the pixel data to load, so the handle can be released immediately.
        # Converting to RGB guarantees three channels even for grayscale,
        # palette or RGBA PNGs, which a three-channel Normalize requires.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        return image, label

In [None]:
# Disabled draft, kept as a bare string literal so none of it executes.
# The draft walks every sub-folder of `path`, opens each file with PIL and uses
# the name of the containing folder as the label, collecting both into lists.
'''
images = []
labels = []

for for_name in os.listdir(path):  # i = ['BackgroundNoise', 'LoopNoise', 'Normal(Air)']
    forder = os.path.join(path, for_name)  # ex) path/Normal(Air)

    for j in os.listdir(forder):
        file = os.path.join(forder, j)  # ex) path/Normal(Air)/a.png
        
        image = Image.open(file)
        label = os.path.basename(os.path.dirname(file))
        
        images.append(image)
        labels.append(label)
'''

## 데이터 전처리

### normal : 1000개
shape: (1000, 3, 808, 1182)<br>
min: (0.0, 0.05882353, 0.019607844)<br>
max: (0.92156863, 0.92156863, 0.92941177)<br>
mean: (0.11181334, 0.111890435, 0.1135668)<br>
std: (0.026530448, 0.02637402, 0.02658573)

### broken : 523개
shape: (523, 3, 808, 1182)<br>
min: (0.0, 0.05882353, 0.019607844)<br>
max: (0.92156863, 0.92156863, 0.92941177)<br>
mean: (0.11188801, 0.111966185, 0.11256382)<br>
std: (0.03245135, 0.032322615, 0.03192493)

In [None]:
# Preprocessing pipeline: resize to 512x512, random horizontal flip, conversion
# to a tensor and per-channel normalisation with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.RandomHorizontalFlip(),   # left-right flip
    # transforms.RandomResizedCrop((60, 60)),   # crops a random region and resizes it to the given size
    # transforms.RandomErasing(),      # erases a random box; cannot be applied to a PIL image, needs a tensor
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [12]:
# Builds the training and hold-out DataLoaders for the Normal/Broken image folder.
class Dataloader():
    """Create a training and a hold-out ``DataLoader`` over one image folder.

    The images found by ``BDataset(path)`` are split once into a training part
    and a hold-out part. The training part is augmented with a random
    horizontal flip and shuffled; the hold-out part is only resized and
    normalised and is iterated in a fixed order.

    Args:
        path: Root folder passed to ``BDataset``.
        batch_size: Batch size of both loaders.
        num_workers: Worker processes per loader.
            NOTE(review): on Windows, worker processes may be unable to import
            a Dataset class that is defined inside a notebook; pass
            ``num_workers=0`` if iterating the loaders fails or hangs.
        test_size: Fraction of the images reserved for the hold-out loader.
        seed: Random seed of the split, so the same images are held out on
            every run.

    Attributes:
        train_loader: Shuffled loader over the training part.
        test_loader: Unshuffled loader over the hold-out part.

    Raises:
        ValueError: If no image is found under ``path`` or the split cannot be
            made (raised by ``train_test_split`` for very small datasets).
    """

    def __init__(self, path, batch_size=64, num_workers=4, test_size=0.2, seed=42):
        # Per-channel mean / std: the average of the `normal` and `broken`
        # statistics listed in the notebook's preprocessing notes.
        normalize = transforms.Normalize(
            (0.111850675, 0.11192831, 0.11306531),
            (0.029490899, 0.0293483175, 0.029255329999999996))

        transform_train = transforms.Compose([
            transforms.Resize((512, 512)),
            transforms.RandomHorizontalFlip(),   # left-right flip
            transforms.ToTensor(),
            normalize,
        ])

        # No augmentation for evaluation.
        transform_test = transforms.Compose([
            transforms.Resize((512, 512)),
            transforms.ToTensor(),
            normalize,
        ])

        trainset = BDataset(path, transform_train)
        testset = BDataset(path, transform_test)
        if len(trainset) == 0:
            raise ValueError('no *.png images found under {!r}'.format(path))

        # Both views must index exactly the same files, otherwise the index
        # split below could leak training images into the hold-out part.
        testset.img_list = trainset.img_list
        testset.label_list = trainset.label_list

        labels = trainset.label_list
        # Stratify (keep the normal/broken ratio in both parts) whenever every
        # class has at least two samples, which train_test_split requires.
        can_stratify = all(labels.count(c) >= 2 for c in set(labels))
        train_idx, test_idx = train_test_split(
            list(range(len(labels))),
            test_size=test_size,
            random_state=seed,
            stratify=labels if can_stratify else None)

        self.train_loader = DataLoader(
            torch.utils.data.Subset(trainset, train_idx),
            batch_size=batch_size, shuffle=True, num_workers=num_workers)
        # Hold-out data: separate images, test-time transform, stable order.
        self.test_loader = DataLoader(
            torch.utils.data.Subset(testset, test_idx),
            batch_size=batch_size, shuffle=False, num_workers=num_workers)

    def getloader(self):
        """Return ``(train_loader, test_loader)``."""
        return self.train_loader, self.test_loader

In [14]:
# Build the wrapper once and pull out the two DataLoader objects. The training
# and evaluation cells call len() on them and iterate over them, which the
# wrapper object itself does not support.
loaders = Dataloader(path)
train_loader = loaders.train_loader
test_loader = loaders.test_loader

## 모델

In [None]:
class CNN(nn.Module):
    """Convolutional classifier built from stacked 3x3 conv / batch-norm / ReLU blocks.

    Layout: three conv blocks, 2x2 max-pool, three conv blocks, 2x2 max-pool,
    one conv block widening to 128 channels, adaptive average pooling to 4x4,
    then two linear layers (``4*4*128 -> 18 -> num_classes``).

    Args:
        num_classes: Number of output logits. Defaults to 2, matching the
            0 (normal) / 1 (broken) labels produced by ``BDataset``.

    Input is a float tensor of shape ``(batch, 3, H, W)``; the output has shape
    ``(batch, num_classes)`` and holds raw logits (no softmax), which is what
    ``nn.CrossEntropyLoss`` expects.
    """

    def __init__(self, num_classes=2):
        super(CNN, self).__init__()

        def conv_block(in_channels, out_channels):
            """3x3 convolution that keeps the spatial size, followed by batch norm and ReLU."""
            return [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]

        self.model = nn.Sequential(
            *conv_block(3, 64),
            *conv_block(64, 64),
            *conv_block(64, 64),

            nn.MaxPool2d(2),    # halves height and width

            *conv_block(64, 64),
            *conv_block(64, 64),
            *conv_block(64, 64),

            nn.MaxPool2d(2),    # halves height and width

            *conv_block(64, 128),

            # Pool to a fixed 4x4 grid so the first Linear layer always sees
            # 4*4*128 features, whatever the input resolution is.
            nn.AdaptiveAvgPool2d((4, 4)),
            nn.Flatten(),
            nn.Linear(4 * 4 * 128, 18),
            nn.ReLU(inplace=True),
            nn.Linear(18, num_classes),
        )

    def forward(self, x):
        """Run the layer stack in ``self.model`` and return the class logits."""
        out = self.model(x)
        return out

In [None]:
# Create the network on the best available device before building the
# optimizer, so the optimizer holds the parameters that are actually trained.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)

loss_func = nn.CrossEntropyLoss()
learning_rate = 0.0002
optimizer = torch.optim.RMSprop(model.parameters(), lr=learning_rate)

In [None]:
total_batch = len(train_loader)
print('총 배치의 수 : {}'.format(total_batch))

# Train on whatever device the model's parameters already live on, so inputs
# and weights are guaranteed to be on the same device.
device = next(model.parameters()).device

for epoch in range(30):
    # Make sure layers with train/eval-specific behaviour (e.g. batch norm)
    # are in training mode at the start of every epoch.
    model.train()
    avg_cost = 0.0

    for X, Y in train_loader:  # X: mini-batch of images, Y: integer class labels (not one-hot)
        X = X.to(device)
        Y = Y.to(device)

        optimizer.zero_grad()
        hypothesis = model(X)
        cost = loss_func(hypothesis, Y)
        cost.backward()
        optimizer.step()

        # .item() extracts a plain float; accumulating the tensor itself would
        # keep every batch's autograd graph alive for the whole epoch.
        avg_cost += cost.item() / total_batch

    print('[Epoch: {:>4}] cost = {:>.9}'.format(epoch + 1, avg_cost))

In [None]:
# Model evaluation helper
def evaluate(model, dataloader, device):
    """Return the classification accuracy of ``model`` on ``dataloader`` in percent.

    The model is switched to evaluation mode and is left in that mode. Gradients
    are disabled while the batches are processed.

    Args:
        model: Module mapping a batch of inputs to per-class scores of shape
            ``(batch, num_classes)``.
        dataloader: Iterable yielding ``(inputs, labels)`` tensor pairs.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Accuracy in the range 0-100. ``0.0`` is returned when the dataloader
        yields no samples (instead of failing with a division by zero).
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # Index of the highest score along the class dimension.
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    if total == 0:
        return 0.0
    accuracy = 100 * correct / total
    return accuracy

In [None]:
# Model evaluation on the evaluation loader created in the data-loading cell
# (`test_loader`); no `val_loader` exists anywhere in this notebook.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
val_accuracy = evaluate(model, test_loader, device)
print(f'Validation Accuracy: {val_accuracy:.2f}%')

In [None]:
# Save the trained model inside the dataset folder. os.path.join inserts the
# path separator; plain concatenation would glue 'B_cnn.pt' onto the name of
# the last folder and write the file next to it instead.
# NOTE(review): torch.save(model, ...) pickles the whole module, so loading it
# needs the CNN class to be importable; saving model.state_dict() is the more
# portable option if nothing depends on the current format.
torch.save(model, os.path.join(path, 'B_cnn.pt'))