# PyTorch를 활용한 이미지 분류 및 배포

## Nvidia-GPU 설정 여부 확인

In [None]:
!nvidia-smi

## 쓰레기 분류 데이터셋을 활용한 이미지 분류

4 종류의 쓰레기 사진을 촬영한 데이터셋으로 이미지 분류 모델을 훈련하고, 이를 활용한 쓰레기 분류 프로그램을 제작합니다.

### 데이터셋 클래스

이미지 분류를 위한 데이터셋을 다운로드 받은 후 이를 사용합니다.

데이터셋 위치: https://github.com/JinFree/Recycle_Classification_Dataset.git

In [None]:
import os
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from PIL import Image

class PyTorch_Classification_Dataset_Class(Dataset):
    def __init__(self
                , dataset_dir = "/content/Recycle_Classification_Dataset"
                , transform = None):
        super().__init__()
        if not os.path.isdir(dataset_dir):
            os.system("git clone https://github.com/JinFree/Recycle_Classification_Dataset.git")
            os.system("rm -rf ./Recycle_Classification_Dataset/.git")
        self.image_abs_path = dataset_dir
        self.transform = transform
        if self.transform is None:
            self.transform = transforms.Compose([
                    transforms.Resize(256)
                    , transforms.RandomCrop(224)
                    , transforms.ToTensor()
                    , transforms.Normalize(mean=[0.485, 0.456, 0.406],
                            std=[0.229, 0.224, 0.225])
                    ])
        self.label_list = os.listdir(self.image_abs_path)
        self.label_list.sort()
        self.x_list = []
        self.y_list = []
        for label_index, label_str in enumerate(self.label_list):
            img_path = os.path.join(self.image_abs_path, label_str)
            img_list = os.listdir(img_path)
            for img in img_list:
                self.x_list.append(os.path.join(img_path, img))
                self.y_list.append(label_index)

    def __len__(self):
        return len(self.x_list)

    def __getitem__(self, idx):
        image = Image.open(self.x_list[idx])
        if image.mode is not "RGB":
            image = image.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, self.y_list[idx]

    def __save_label_map__(self, dst_text_path = "label_map.txt"):
        label_list = self.label_list
        f = open(dst_text_path, 'w')
        for i in range(len(label_list)):
            f.write(label_list[i]+'\n')
        f.close()

    def __num_classes__(self):
        return len(self.label_list)

### 이미지 분류 모델 클래스

빠른 수렴을 위해 MobileNet V2의 pre-trained weights로부터 마지막 Softmax 레이어만 바꿔서 훈련을 수행합니다.

In [None]:
import torch
from torchvision import models
import torch.nn as nn
import torch.nn.functional as F

class Model_Class_Transfer_Learning_MobileNet(nn.Module):
    def __init__(self, num_classes, pretrained=True):
        super().__init__()
        self.network = models.mobilenet_v2(pretrained=pretrained)
        num_ftrs = self.network.classifier[1].in_features
        self.network.classifier[1] = nn.Linear(num_ftrs, num_classes)
        self.classifier = nn.Sequential(nn.Softmax(dim=-1))

    def forward(self, x):
        x = self.network(x)
        x = self.classifier(x)
        return x

## 훈련을 위한 코드 전반적으로 구현한 클래스

In [None]:
import os
import torch
import torch.optim as optim
import torchvision.transforms as transforms
import torch.nn.functional as F
from tqdm import tqdm

class PyTorch_Classification_Training_Class():
    def __init__(self
                , dataset_dir = "/content/Recycle_Classification_Dataset"
                , batch_size = 16
                , train_ratio = 0.75
                ):
        if not os.path.isdir(dataset_dir):
            os.system("git clone https://github.com/JinFree/Recycle_Classification_Dataset.git")
            os.system("rm -rf ./Recycle_Classification_Dataset/.git")
            dataset_dir = os.path.join(os.getcwd(), 'Recycle_Classification_Dataset')
        self.USE_CUDA = torch.cuda.is_available()
        self.DEVICE = torch.device("cuda" if self.USE_CUDA else "cpu")
        self.transform = transforms.Compose([
                transforms.Resize(256)
                , transforms.RandomCrop(224)
                , transforms.ToTensor()
                , transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
                ])
        dataset = PyTorch_Classification_Dataset_Class(dataset_dir = dataset_dir, transform = self.transform)
        dataset.__save_label_map__()
        self.num_classes = dataset.__num_classes__()
        train_size = int(train_ratio * len(dataset))
        test_size = len(dataset) - train_size
        train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
        self.train_loader = torch.utils.data.DataLoader(
            train_dataset
            , batch_size=batch_size
            , shuffle=True
        )
        self.test_loader = torch.utils.data.DataLoader(
            test_dataset
            , batch_size=batch_size
            , shuffle=False
        )
        self.model = None
        self.model_str = None

    def prepare_network(self):
        self.model = Model_Class_Transfer_Learning_MobileNet (self.num_classes)
        self.model_str = "PyTorch_Transfer_Learning_MobileNet"
        self.model.to(self.DEVICE)
        self.model_str += ".pt"

    def training_network(self
            , learning_rate = 0.0001
            , epochs = 10
            , step_size = 3
            , gamma = 0.3):
        if self.model is None:
            self.prepare_network(False)
        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)
        acc = 0.0
        for epoch in range(1, epochs + 1):
            self.model.train()
            for data, target in tqdm(self.train_loader):
                data, target = data.to(self.DEVICE), target.to(self.DEVICE)
                optimizer.zero_grad()
                output = self.model(data)
                loss = F.cross_entropy(output, target)
                loss.backward()
                optimizer.step()
            scheduler.step()
            self.model.eval()
            test_loss = 0
            correct = 0
            with torch.no_grad():
                for data, target in tqdm(self.test_loader):
                    data, target = data.to(self.DEVICE), target.to(self.DEVICE)
                    output = self.model(data)
                    test_loss += F.cross_entropy(output, target, reduction='sum').item()
                    pred = output.max(1, keepdim=True)[1]
                    correct += pred.eq(target.view_as(pred)).sum().item()
            test_loss /= len(self.test_loader.dataset)
            test_accuracy = 100. * correct / len(self.test_loader.dataset)
            print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(epoch, test_loss, test_accuracy))
            if acc < test_accuracy or epoch == epochs:
                acc = test_accuracy
                torch.save(self.model.state_dict(), self.model_str)
                print("model saved!")

### 훈련

훈련을 위한 데이터셋을 다운로드받고 모델을 준비합니다.

약 7분 정도의 시간이 필요합니다.

In [None]:
!wget https://github.com/jetsonai/DeepLearning4Projects/raw/refs/heads/main/Chap5/test_video.mp4

In [None]:
dataset_dir = "/content/Recycle_Classification_Dataset"
batch_size = 64
train_ratio = 0.75
training_class = PyTorch_Classification_Training_Class(dataset_dir = dataset_dir
                                                        , batch_size = batch_size
                                                        , train_ratio = train_ratio)
training_class.prepare_network()

훈련을 수행합니다.

T4 GPU를 사용할 떄 한 epoch에 약 5분이 필요합니다.

In [None]:
training_class.training_network(learning_rate = 0.00001, epochs=10, step_size=3, gamma=0.3)

모든 학습이 끝나면 label_map.txt와 PyTorch_Transfer_Learning_MobileNet.pt 파일을 다운로드할 수 있으며, 배포에는 이 파일이 필요합니다.

### 배포

이미지 분류 모델을 훈련한 후 배포할 때는 모델의 구조와 가중치, 각 출력이 어떤 클래스에 매핑되는지 정보가 필요합니다.

위 코드 중 Model_Class_Transfer_Learning_MobileNet 클래스는 모델의 구조이며, PyTorch_Transfer_Learning_MobileNet.pt 는 가중치입니다. label_map.txt은 모델의 출력이 어떤 클래스에 매핑되는지 정보입니다.

이를 토대로 추론 코드를 작성합니다.

In [None]:
import torch
import cv2
from PIL import Image
import torchvision.transforms as transforms
import numpy as np
import argparse
import cv2

class Inference_Class():
    def __init__(self):
        USE_CUDA = torch.cuda.is_available()
        self.DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
        self.model = None
        self.label_map = None
        self.transform_info = transforms.Compose([
                transforms.Resize(size=(224, 224)),
                transforms.ToTensor()
                ])


    def load_model(self, label_map_file = "label_map.txt"):
        self.label_map = np.loadtxt(label_map_file, str, delimiter='\t')
        num_classes = len(self.label_map)
        self.model = Model_Class_Transfer_Learning_MobileNet(num_classes).to(self.DEVICE)
        model_str = "PyTorch_Transfer_Learning_MobileNet"
        model_str += ".pt"
        self.model.load_state_dict(torch.load(model_str, map_location=self.DEVICE))
        self.model.eval()


    def inference_video(self, video_source="test_video.mp4", save_video="result_video.avi"):
        cap = cv2.VideoCapture(video_source)
        if cap.isOpened():
            print("Video Opened")
        else:
            print("Video Not Opened")
            print("Program Abort")
            exit()
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        out = None
        if save_video is not None:
            fourcc = cv2.VideoWriter_fourcc(*'XVID') #.avi
            out = cv2.VideoWriter(save_video, fourcc, fps, (width, height), True)
        with torch.no_grad():
            while cap.isOpened():
                ret, frame = cap.read()
                if ret:
                    output = self.inference_frame(frame)
                    if out is not None:
                        out.write(output)
                else:
                    break
            cap.release()
            if out is not None:
                out.release()
        return


    def inference_frame(self, opencv_frame):
        opencv_rgb = cv2.cvtColor(opencv_frame, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(opencv_rgb)
        image_tensor = self.transform_info(image)
        image_tensor = image_tensor.unsqueeze(0)
        image_tensor = image_tensor.to(self.DEVICE)
        inference_result = self.model(image_tensor)
        inference_result = inference_result.squeeze()
        inference_result = inference_result.cpu().numpy()
        result_frame = np.copy(opencv_frame)
        label_text = self.label_map[np.argmax(inference_result)]
        label_text += " " + str(inference_result[np.argmax(inference_result)])
        result_frame = cv2.putText(result_frame, label_text, (10, 50), cv2.FONT_HERSHEY_PLAIN, fontScale=2.0, color=(0,0,255), thickness=3)
        return result_frame

추론을 수행합니다.

In [None]:
inferenceClass = Inference_Class()
inferenceClass.load_model()
inferenceClass.inference_video("test_video.mp4", "result_video.avi")

이 코드를 잘 수정하면 카메라에서 실시간으로 프레임을 받아오면서 이미지 분류를 수행하고, 그 결과에 따라 후처리를 수행할 수 있습니다.