In [2]:
import os
import random
import time
import json
import warnings 
warnings.filterwarnings('ignore')

import shutil
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import cv2

import numpy as np
import pandas as pd
from tqdm import tqdm

from pycocotools.coco import COCO

import albumentations as A
from albumentations.pytorch import ToTensorV2

In [3]:
dataset_path = '/opt/ml/input/data'

In [4]:
def get_classname(classID, cats):
    for i in range(len(cats)):
        if cats[i]['id']==classID:
            return cats[i]['name']
    return "None"

In [5]:
category_names = [
    "Background","General trash","Paper","Paper pack","Metal",
    "Glass","Plastic","Styrofoam","Plastic bag","Battery","Clothing",
]

In [6]:
class CustomDataLoader(Dataset):
    """COCO format"""
    def __init__(self, data_dir, mode = 'train', transform = None):
        super().__init__()
        self.mode = mode
        self.transform = transform
        self.coco = COCO(data_dir)

    def __getitem__(self, index: int):
        # dataset이 index되어 list처럼 동작
        image_id = self.coco.getImgIds(imgIds=index)
        image_infos = self.coco.loadImgs(image_id)[0]
        
        # cv2 를 활용하여 image 불러오기
        images = cv2.imread(os.path.join(dataset_path, image_infos['file_name']))
        images = cv2.cvtColor(images, cv2.COLOR_BGR2RGB).astype(np.float32)
        images /= 255.0
        
        if (self.mode in ('train', 'val')):
            ann_ids = self.coco.getAnnIds(imgIds=image_infos['id'])
            anns = self.coco.loadAnns(ann_ids)

            # Load the categories in a variable
            cat_ids = self.coco.getCatIds()
            cats = self.coco.loadCats(cat_ids)

            # masks : size가 (height x width)인 2D
            # 각각의 pixel 값에는 "category id" 할당
            # Background = 0
            masks = np.zeros((image_infos["height"], image_infos["width"]))
            # General trash = 1, ... , Cigarette = 10
            anns = sorted(anns, key=lambda idx : idx['area'], reverse=True)
            for i in range(len(anns)):
                className = get_classname(anns[i]['category_id'], cats)
                pixel_value = category_names.index(className)
                masks[self.coco.annToMask(anns[i]) == 1] = pixel_value
            masks = masks.astype(np.int8)
                        
            # transform -> albumentations 라이브러리 활용
            if self.transform is not None:
                transformed = self.transform(image=images, mask=masks)
                images = transformed["image"]
                masks = transformed["mask"]
            return images, masks, image_infos
        
        if self.mode == 'test':
            # transform -> albumentations 라이브러리 활용
            if self.transform is not None:
                transformed = self.transform(image=images)
                images = transformed["image"]
            return images, image_infos
    
    def __len__(self) -> int:
        # 전체 dataset의 size를 return
        return len(self.coco.getImgIds())

In [7]:
# collate_fn needs for batch
def collate_fn(batch):
    return tuple(zip(*batch))

In [8]:
train_transform = A.Compose([
                            ToTensorV2()
                            ])

val_transform = A.Compose([
                          ToTensorV2()
                          ])

test_transform = A.Compose([
                           ToTensorV2()
                           ])

In [84]:
def dir_check():
    seg_dir = '/opt/ml/input/data/mmseg'
    if not os.path.isdir(seg_dir):
        os.mkdir(seg_dir)

    ann_dir = '/opt/ml/input/data/mmseg/ann_dir'
    if not os.path.isdir(ann_dir):
        os.mkdir(ann_dir)

    img_dir = '/opt/ml/input/data/mmseg/img_dir'
    if not os.path.isdir(img_dir):
        os.mkdir(img_dir)

In [85]:
dir_check()

In [86]:
def convert_annotation(json_file):
    json_path = f'/opt/ml/input/data/{json_file}'
    train_dataset = CustomDataLoader(data_dir=json_path, mode='train', transform=train_transform)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset, 
                                           batch_size=1,
                                           shuffle=False,
                                           num_workers=4,
                                           collate_fn=collate_fn)

    fold_name = json_file.split('.')[0]
    save_dir = f'/opt/ml/input/data/mmseg/ann_dir/{fold_name}'
    if not os.path.isdir(save_dir):
        os.mkdir(save_dir)

    
    for imgs, masks, image_infos in (train_loader):
        image_infos = image_infos[0]
        temp_masks = masks[0].numpy()
        # numpy로 바꿔서 저장

        # print(image_infos['id'])
        cv2.imwrite(os.path.join(save_dir, f"{image_infos['id']:04}.png"), temp_masks)
    print(f"{fold_name} annotation Done")

In [87]:
def copy_image(json_file):
    fold_name = json_file.split('.')[0]

    save_dir = f'/opt/ml/input/data/mmseg/img_dir/{fold_name}'
    if not os.path.isdir(save_dir):
        os.mkdir(save_dir)

    json_path = os.path.join('/opt/ml/input/data', json_file)
    with open(json_path, 'r', encoding='UTF-8') as fold_json:
        data = json.load(fold_json)
        fold_images = data['images']

    for image in fold_images:
        shutil.copyfile(
            os.path.join('/opt/ml/input/data', image['file_name']),
            os.path.join(f'/opt/ml/input/data/mmseg/img_dir/{fold_name}', f"{image['id']:04}.jpg")
        )
    print(f"{fold_name} image Done")

In [88]:
def copy_test_image(json_file):
    fold_name = json_file.split('.')[0]

    save_dir = f'/opt/ml/input/data/mmseg/img_dir/{fold_name}'
    if not os.path.isdir(save_dir):
        os.mkdir(save_dir)

    json_path = os.path.join('/opt/ml/input/data', json_file)
    with open(json_path, 'r', encoding='UTF-8') as fold_json:
        data = json.load(fold_json)
        fold_images = data['images']

    for image in fold_images:
        shutil.copyfile(
            os.path.join('/opt/ml/input/data', image['file_name']),
            os.path.join(f'/opt/ml/input/data/mmseg/img_dir/test', f"{image['id']:04}.jpg")
        )
    print(f"{fold_name} Done")

In [89]:
for filename in os.listdir(os.path.join(dataset_path)):
    
    if filename.startswith('K-fold'):
        print(filename, "시작")
        convert_annotation(filename)
        copy_image(filename)

    elif filename.startswith('test'):
        print(filename, "시작")
        copy_image(filename)

test.json 시작
test image Done
K-fold_train2.json 시작
loading annotations into memory...
Done (t=4.53s)
creating index...
index created!
K-fold_train2 annotation Done
K-fold_train2 image Done
K-fold_val1.json 시작
loading annotations into memory...
Done (t=0.78s)
creating index...
index created!
K-fold_val1 annotation Done
K-fold_val1 image Done
K-fold_train5.json 시작
loading annotations into memory...
Done (t=3.90s)
creating index...
index created!
K-fold_train5 annotation Done
K-fold_train5 image Done
K-fold_val5.json 시작
loading annotations into memory...
Done (t=0.76s)
creating index...
index created!
K-fold_val5 annotation Done
K-fold_val5 image Done
K-fold_val2.json 시작
loading annotations into memory...
Done (t=0.83s)
creating index...
index created!
K-fold_val2 annotation Done
K-fold_val2 image Done
K-fold_val3.json 시작
loading annotations into memory...
Done (t=0.76s)
creating index...
index created!
K-fold_val3 annotation Done
K-fold_val3 image Done
K-fold_train3.json 시작
loading annot