In [2]:
import numpy as np
import matplotlib.pyplot as plt

import os
import torch
from torch import nn, optim
from torch.utils.data import Dataset
from torchvision import models, transforms
from PIL import Image
from pathlib import Path

### PyTorch custom 데이터셋 클래스

In [3]:
%%writefile Dataset_Class.py

import os
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from PIL import Image
import shutil

class PyTorch_Classification_Dataset_Class(Dataset):
    """Image-classification dataset read from a ``<root>/<label>/<image>`` tree.

    Every sub-directory of ``dataset_dir`` is one class. The class index of a
    sample is the position of its directory name in the alphabetically sorted
    ``label_list``.
    """

    def __init__(self
                , dataset_dir
                , transform = None):
        """Index every image file found below ``dataset_dir``.

        Args:
            dataset_dir: Root directory holding one sub-directory per class.
            transform: Optional callable applied to each RGB ``PIL.Image`` in
                ``__getitem__``. When ``None`` the PIL image is returned as is.
        """
        super().__init__()

        self.image_abs_path = dataset_dir
        self.transform = transform

        # Only directories are class labels. A stray file (README, LICENSE, ...)
        # in the root would otherwise make the nested os.listdir() raise.
        self.label_list = sorted(
            entry for entry in os.listdir(self.image_abs_path)
            if os.path.isdir(os.path.join(self.image_abs_path, entry))
        )
        self.x_list = []  # image file paths
        self.y_list = []  # class index of the image at the same position

        for label_index, label_str in enumerate(self.label_list):
            img_path = os.path.join(self.image_abs_path, label_str)
            # Sorted so the sample order does not depend on filesystem order.
            for img in sorted(os.listdir(img_path)):
                img_file = os.path.join(img_path, img)
                if not os.path.isfile(img_file):
                    # Nested directories cannot be opened as images.
                    continue
                self.x_list.append(img_file)
                self.y_list.append(label_index)

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.x_list)

    def __getitem__(self, idx):
        """Return ``(image, label_index)`` for the sample at position ``idx``."""
        # The context manager closes the file handle; convert() yields a new,
        # loaded image object, so it stays usable after the file is closed.
        with Image.open(self.x_list[idx]) as source:
            image = source.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, self.y_list[idx]

    def __save_label_map__(self, dst_text_path = "label_map.txt"):
        """Write one label per line, in class-index order, to ``dst_text_path``."""
        with open(dst_text_path, 'w') as f:
            for label in self.label_list:
                f.write(label + '\n')

    def __num_classes__(self):
        """Return the number of class labels."""
        return len(self.label_list)

Writing Dataset_Class.py


### Model from scratch

In [4]:
%%writefile Model_Class_From_the_Scratch.py

import torch
import torch.nn as nn
import torch.nn.functional as F

class MODEL_From_Scratch(nn.Module):
    """Small CNN classifier: three strided conv stages, global pooling, MLP head.

    ``forward`` returns the raw output of the last linear layer (no softmax is
    applied), with ``num_classes`` values per input sample.
    """

    def __init__(self, num_classes):
        super().__init__()

        # (in_channels, out_channels) of each Conv -> BatchNorm -> ReLU stage.
        conv_channels = [(3, 32), (32, 64), (64, 128)]
        # (in_features, out_features) of each Linear -> ReLU -> Dropout stage.
        hidden_sizes = [(128, 512), (512, 64)]

        layers = []
        for c_in, c_out in conv_channels:
            layers += [
                nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]
        # Collapse the spatial dimensions to 1x1, then flatten for the MLP head.
        layers += [nn.AdaptiveAvgPool2d(1), nn.Flatten()]
        for f_in, f_out in hidden_sizes:
            layers += [nn.Linear(f_in, f_out), nn.ReLU(), nn.Dropout()]
        layers.append(nn.Linear(hidden_sizes[-1][1], num_classes))

        # The position of each layer in the Sequential determines its
        # state_dict key (classifier.0 ... classifier.17).
        self.classifier = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.classifier(x)
        return logits

Writing Model_Class_From_the_Scratch.py


### MobileNet class

In [5]:
%%writefile Model_Class_Transfer_Learning_MobileNet.py

import torch
from torchvision import models
import torch.nn as nn
import torch.nn.functional as F

class MobileNet(nn.Module):
    """torchvision MobileNetV2 (IMAGENET1K_V2 weights) with a resized output layer."""

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.mobilenet_v2(
            weights=models.MobileNet_V2_Weights.IMAGENET1K_V2
        )
        # Replace the layer at classifier[1] with a fresh Linear that keeps the
        # same input width but emits `num_classes` outputs.
        head_in = backbone.classifier[1].in_features
        backbone.classifier[1] = nn.Linear(head_in, num_classes)
        self.network = backbone

    def forward(self, x):
        return self.network(x)

Writing Model_Class_Transfer_Learning_MobileNet.py


### Training class

In [None]:
# %%writefile PyTorch_Classification_Training_Class.py

import os
import torch
import torch.optim as optim
import torchvision.transforms as transforms
import torch.nn.functional as F
from tqdm import tqdm
import shutil

from Model_Class_From_the_Scratch import MODEL_From_Scratch
from Model_Class_Transfer_Learning_MobileNet import MobileNet
from Dataset_Class import PyTorch_Classification_Dataset_Class as Dataset

class PyTorch_Classification_Training_Class():
    """Builds data loaders for a directory-per-class dataset and trains a classifier."""

    def __init__(self
                 , dataset_dir = os.path.join(os.getcwd(), "Recycle_Classification_Dataset")
                , batch_size = 16
                , train_ratio = 0.75
                ):
        """Prepare the dataset, the train/test split and both data loaders.

        Args:
            dataset_dir: Root of the dataset (one sub-directory per class). When
                the directory does not exist, the Recycle_Classification_Dataset
                repository is cloned into it.
            batch_size: Batch size used by both data loaders.
            train_ratio: Fraction of the samples assigned to the training split.

        Raises:
            subprocess.CalledProcessError: If the dataset has to be cloned and
                ``git clone`` fails.
        """
        if not os.path.isdir(dataset_dir):
            import subprocess
            # Clone into `dataset_dir` itself (not blindly into the cwd) and
            # fail loudly instead of continuing with a missing dataset.
            subprocess.run(
                ["git", "clone", "--depth", "1",
                 "https://github.com/JinFree/Recycle_Classification_Dataset.git",
                 dataset_dir],
                check=True,
            )
            # Drop the repository metadata so it is not listed as a class folder.
            shutil.rmtree(os.path.join(dataset_dir, ".git"))
        self.USE_CUDA = torch.cuda.is_available()
        self.DEVICE = torch.device("cuda" if self.USE_CUDA else "cpu")
        self.transform = transforms.Compose([
                transforms.Resize(256)
                , transforms.RandomCrop(224)
                , transforms.ToTensor()
                # Per-channel mean/std; presumably the ImageNet statistics that
                # torchvision's pretrained models expect.
                , transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
                ])

        dataset = Dataset(dataset_dir = dataset_dir, transform = self.transform)
        dataset.__save_label_map__()
        self.num_classes = dataset.__num_classes__()
        train_size = int(train_ratio * len(dataset))
        test_size = len(dataset) - train_size
        # NOTE(review): both splits share `self.transform`, so the test split is
        # also randomly cropped, and the split itself is unseeded.
        train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

        self.train_loader = torch.utils.data.DataLoader(
            train_dataset
            , batch_size=batch_size
            , shuffle=True
        )
        self.test_loader = torch.utils.data.DataLoader(
            test_dataset
            , batch_size=batch_size
            , shuffle=False
        )
        self.model = None
        self.model_str = None

    def prepare_network(self, is_scratch = True):
        """Create the model on ``self.DEVICE`` and set the checkpoint file name.

        Args:
            is_scratch: ``True`` builds ``MODEL_From_Scratch``; ``False`` builds
                the ``MobileNet`` transfer-learning model.
        """
        if is_scratch:
            self.model = MODEL_From_Scratch(self.num_classes)
            self.model_str = "PyTorch_Training_From_Scratch"
        else:
            self.model = MobileNet(self.num_classes)
            self.model_str = "PyTorch_Transfer_Learning_MobileNet"
        self.model.to(self.DEVICE)
        self.model_str += ".pt"

    @staticmethod
    def _count_correct(output, target):
        """Return how many rows of ``output`` have their arg-max equal to ``target``."""
        # argmax(dim=1) has the same shape as `target`, so the comparison is
        # element-wise. Comparing a (B, 1) tensor with a (B,) tensor would
        # broadcast to (B, B) and miscount.
        return int((output.argmax(dim=1) == target).sum().item())

    def _evaluate(self):
        """Return ``(mean cross-entropy loss, accuracy in percent)`` on the test loader."""
        self.model.eval()
        num_samples = len(self.test_loader.dataset)
        loss_sum = 0.0
        correct = 0
        with torch.no_grad():
            for data, target in tqdm(self.test_loader):
                data, target = data.to(self.DEVICE), target.to(self.DEVICE)
                output = self.model(data)
                loss_sum += F.cross_entropy(output, target, reduction='sum').item()
                correct += self._count_correct(output, target)
        return loss_sum / num_samples, 100. * correct / num_samples

    def training_network(self
            , learning_rate = 0.0001
            , epochs = 10
            , step_size = 3
            , gamma = 0.3):
        """Train with Adam + StepLR, evaluating after every epoch.

        The model's ``state_dict`` is saved to ``self.model_str`` whenever the
        test accuracy improves on the best value seen so far.

        Args:
            learning_rate: Initial Adam learning rate.
            epochs: Number of passes over the training loader.
            step_size: Number of epochs between learning-rate decays.
            gamma: Multiplicative learning-rate decay factor.

        Returns:
            The best test accuracy (percent) reached during this call.
        """
        if self.model is None:
            # No explicit prepare_network() call: fall back to MobileNet.
            self.prepare_network(False)

        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)
        acc = 0.0

        for epoch in range(1, epochs + 1):
            print(f"Use CUDA = {torch.cuda.is_available()}")
            self.model.train()
            for data, target in tqdm(self.train_loader):
                data, target = data.to(self.DEVICE), target.to(self.DEVICE)
                optimizer.zero_grad()
                output = self.model(data)
                loss = F.cross_entropy(output, target)
                loss.backward()
                optimizer.step()
            scheduler.step()

            test_loss, test_accuracy = self._evaluate()
            print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(epoch, test_loss, test_accuracy))
            if acc < test_accuracy:
                acc = test_accuracy
                torch.save(self.model.state_dict(), self.model_str)
                print("model saved!")

        return acc

if __name__ == "__main__":
    # Build the data pipeline, pick a network, then train it.
    training_class = PyTorch_Classification_Training_Class()
    # is_scratch=True  -> CNN trained from scratch
    # is_scratch=False -> MobileNet transfer learning
    training_class.prepare_network(is_scratch=True)
    training_class.training_network(
        learning_rate=1e-5,
        epochs=10,
        step_size=3,
        gamma=0.3,
    )


In [1]:
%%writefile Inference_Cam.py

import torch
import cv2
from PIL import Image
from torchvision import transforms
import numpy as np
from Model_Class_From_the_Scratch import MODEL_From_Scratch
from Model_Class_Transfer_Learning_MobileNet import MobileNet

class Inference_Class():
    """Loads a trained classifier and runs it on OpenCV images or video streams."""

    def __init__(self):
        """Select the compute device and build the preprocessing pipeline."""
        USE_CUDA = torch.cuda.is_available()
        self.DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
        self.model = None
        self.label_map = None
        self.transform_info = transforms.Compose(
                [
                transforms.Resize(size=(224, 224)),
                transforms.ToTensor(),
                # Same per-channel normalization the training pipeline applies;
                # without it the model sees inputs on a different scale than
                # the one it was trained on.
                transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
                ])

    def load_model(self, is_train_from_scratch, label_map_file = "label_map.txt"):
        """Load the label map and the matching checkpoint, then switch to eval mode.

        Args:
            is_train_from_scratch: ``True`` loads ``PyTorch_Training_From_Scratch.pt``
                into ``MODEL_From_Scratch``; ``False`` loads
                ``PyTorch_Transfer_Learning_MobileNet.pt`` into ``MobileNet``.
            label_map_file: Text file with one class label per line.
        """
        # atleast_1d: a file with a single label makes loadtxt return a 0-d
        # array, which supports neither len() nor indexing.
        self.label_map = np.atleast_1d(np.loadtxt(label_map_file, str, delimiter='\t'))
        num_classes = len(self.label_map)
        model_str = None
        if is_train_from_scratch:
            self.model = MODEL_From_Scratch(num_classes).to(self.DEVICE)
            model_str = "PyTorch_Training_From_Scratch"
        else:
            self.model = MobileNet(num_classes).to(self.DEVICE)
            model_str = "PyTorch_Transfer_Learning_MobileNet"
        model_str += ".pt"
        self.model.load_state_dict(torch.load(model_str, map_location=self.DEVICE))
        self.model.eval()

    def inference_image(self, opencv_image):
        """Classify one BGR image and draw the prediction onto a copy of it.

        Args:
            opencv_image: Image array in OpenCV's BGR channel order.

        Returns:
            ``(result_frame, label_text, class_prob)``: the annotated copy of the
            input, the predicted label, and its softmax probability as a float
            in ``[0, 1]``.

        Raises:
            RuntimeError: If ``load_model`` has not been called yet.
        """
        if self.model is None or self.label_map is None:
            raise RuntimeError("load_model() must be called before inference")

        # The transform pipeline works on RGB PIL images.
        rgb_image = cv2.cvtColor(opencv_image, cv2.COLOR_BGR2RGB)
        batch = self.transform_info(Image.fromarray(rgb_image)).unsqueeze(0).to(self.DEVICE)
        with torch.no_grad():
            probabilities = torch.softmax(self.model(batch), dim=1).squeeze(0)
        best_prob, best_index = probabilities.max(dim=0)
        class_prob = best_prob.item()
        label_text = str(self.label_map[best_index.item()])

        result_frame = np.copy(opencv_image)
        cv2.putText(result_frame
                , "{} {:.2f}".format(label_text, class_prob)
                , (10, 50)
                , cv2.FONT_HERSHEY_PLAIN
                , 2.0
                , (0, 0, 255)
                , 3)
        return result_frame, label_text, class_prob

    def inference_frame(self, opencv_frame):
        """Return ``opencv_frame`` annotated with its predicted label."""
        return self.inference_image(opencv_frame)[0]

    def inference_video(self, video_source="test_video.mp4"):
        """Classify every frame of ``video_source`` and show it until 'q' is pressed.

        Raises:
            SystemExit: If the video source cannot be opened.
        """
        cap = cv2.VideoCapture(video_source)
        if cap.isOpened():
            print("Video Opened")
        else:
            print("Video Not Opened")
            print("Program Abort")
            cap.release()
            # Same exception as exit(), without relying on the `site` builtin.
            raise SystemExit
        cv2.namedWindow("Output", cv2.WINDOW_GUI_EXPANDED)
        try:
            with torch.no_grad():
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                    cv2.imshow("Output", self.inference_frame(frame))
                    # ~30 fps playback; 'q' stops early.
                    if cv2.waitKey(33) & 0xFF == ord('q'):
                        break
        finally:
            # Release the capture and the window even if inference raises.
            cap.release()
            cv2.destroyAllWindows()
        return
    
# if __name__ == "__main__":
#     parser = argparse.ArgumentParser()
#     parser.add_argument("-s", "--is_scratch",
#             required=False,
#             action="store_true",
#             help="inference with model trained from the scratch")
#     parser.add_argument("-src", "--source",
#             required=False,
#             type=str,
#             default="./test_video.mp4",
#             help="OpenCV Video source")
#     args = parser.parse_args()
#     is_train_from_scratch = False
#     source = args.source
#     if args.is_scratch:
#         is_train_from_scratch = True
#     inferenceClass = Inference_Class()
#     inferenceClass.load_model(is_train_from_scratch)
#     inferenceClass.inference_video(source)

Writing Inference_Cam.py


### 추론 클래스

In [None]:
from Inference_Cam import Inference_Class

# Initialize the inference class and load the model.
inferenceClass = Inference_Class()
# False selects the MobileNet transfer-learning checkpoint in load_model().
is_train_from_scratch = False
inferenceClass.load_model(is_train_from_scratch)

In [None]:
from google.colab.patches import cv2_imshow
import cv2

def inference(input_image):
    """Classify one image with the global ``inferenceClass`` and display the result.

    Args:
        input_image: Path to an image file (``str`` or path-like), or an image
            array that is copied before inference.

    Returns:
        The annotated frame returned by ``inferenceClass.inference_image``.

    Raises:
        FileNotFoundError: If ``input_image`` is a path that OpenCV cannot read.
    """
    if isinstance(input_image, (str, os.PathLike)):
        image_path = os.fspath(input_image)
        cv_image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if cv_image is None:
            # cv2.imread reports a missing or unreadable file by returning None.
            raise FileNotFoundError("Could not read image: {}".format(image_path))
    else:
        cv_image = np.copy(input_image)
    result_frame, label_text, class_prob = inferenceClass.inference_image(cv_image)
    print("입력 이미지는 {} % 확률로 {}으로 분류됩니다.".format(float(class_prob) * 100, label_text))
    cv2_imshow(result_frame)

    return result_frame

In [None]:
# Classify test_image_1.jpg, looked up in the current working directory.
input_image_path = os.path.join(os.getcwd(), "test_image_1.jpg")
result = inference(input_image_path)