In [49]:
import os
import random
from collections import defaultdict
from enum import Enum
from typing import Callable, Tuple, List

import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset, Subset, random_split
from torchvision.transforms import Resize, ToTensor, Normalize, Compose, CenterCrop, ColorJitter

IMG_EXTENSIONS = [
    ".jpg", ".JPG", ".jpeg", ".JPEG", ".png",
    ".PNG", ".ppm", ".PPM", ".bmp", ".BMP",
]


def is_image_file(filename):
    return any(filename.endswith(extension) for extension in IMG_EXTENSIONS)


class BaseAugmentation:
    def __init__(self, resize, mean, std, **args):
        self.transform = Compose([
            Resize(resize, Image.BILINEAR),
            ToTensor(),
            Normalize(mean=mean, std=std),
        ])

    def __call__(self, image):
        return self.transform(image)


class AddGaussianNoise(object):
    """
        transform 에 없는 기능들은 이런식으로 __init__, __call__, __repr__ 부분을
        직접 구현하여 사용할 수 있습니다.
    """

    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)


class CustomAugmentation:
    def __init__(self, resize, mean, std, **args):
        self.transform = Compose([
            CenterCrop((420, 256)),
            Resize(resize, Image.BILINEAR),
            ToTensor(),
            Normalize(mean=mean, std=std),
            AddGaussianNoise()
        ])

    def __call__(self, image):
        return self.transform(image)


class MaskLabels(int, Enum):
    MASK = 0
    INCORRECT = 1
    NORMAL = 2


class GenderLabels(int, Enum):
    MALE = 0
    FEMALE = 1

    @classmethod
    def from_str(cls, value: str) -> int:
        value = value.lower()
        if value == "male":
            return cls.MALE
        elif value == "female":
            return cls.FEMALE
        else:
            raise ValueError(f"Gender value should be either 'male' or 'female', {value}")


class AgeLabels(int, Enum):
    YOUNG = 0
    MIDDLE = 1
    OLD = 2

    @classmethod
    def from_number(cls, value: str) -> int:
        try:
            value = int(value)
        except Exception:
            raise ValueError(f"Age value should be numeric, {value}")

        if value < 30:
            return cls.YOUNG
        elif value < 60:
            return cls.MIDDLE
        else:
            return cls.OLD


class MaskBaseDataset(Dataset):
    num_classes = 3 * 2 * 3

    _file_names = {
        "mask1": MaskLabels.MASK,
        "mask2": MaskLabels.MASK,
        "mask3": MaskLabels.MASK,
        "mask4": MaskLabels.MASK,
        "mask5": MaskLabels.MASK,
        "incorrect_mask": MaskLabels.INCORRECT,
        "normal": MaskLabels.NORMAL
    }

    image_paths = []
    mask_labels = []
    gender_labels = []
    age_labels = []

    def __init__(self, data_dir, mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246), val_ratio=0.2):
        self.data_dir = data_dir
        self.mean = mean
        self.std = std
        self.val_ratio = val_ratio

        self.transform: Callable[[Image.Image], Image.Image] = None
        self.setup()
        self.calc_statistics()

    def setup(self):
        profiles = os.listdir(self.data_dir)
        for profile in profiles:
            if profile.startswith("."):  # "." 로 시작하는 파일은 무시합니다
                continue

            img_folder = os.path.join(self.data_dir, profile)
            for file_name in os.listdir(img_folder):
                _file_name, ext = os.path.splitext(file_name)
                if _file_name not in self._file_names:  # "." 로 시작하는 파일 및 invalid 한 파일들은 무시합니다
                    continue

                img_path = os.path.join(self.data_dir, profile, file_name)  # (resized_data, 000004_male_Asian_54, mask1.jpg)
                mask_label = self._file_names[_file_name]

                id, gender, race, age = profile.split("_")
                gender_label = GenderLabels.from_str(gender)
                age_label = AgeLabels.from_number(age)

                self.image_paths.append(img_path)
                self.mask_labels.append(mask_label)
                self.gender_labels.append(gender_label)
                self.age_labels.append(age_label)

    def calc_statistics(self):
        has_statistics = self.mean is not None and self.std is not None
        if not has_statistics:
            print("[Warning] Calculating statistics... It can take a long time depending on your CPU machine")
            sums = []
            squared = []
            for image_path in self.image_paths[:3000]:
                image = np.array(Image.open(image_path)).astype(np.int32)
                sums.append(image.mean(axis=(0, 1)))
                squared.append((image ** 2).mean(axis=(0, 1)))

            self.mean = np.mean(sums, axis=0) / 255
            self.std = (np.mean(squared, axis=0) - self.mean ** 2) ** 0.5 / 255

    def set_transform(self, transform):
        self.transform = transform

    def __getitem__(self, index):
        assert self.transform is not None, ".set_tranform 메소드를 이용하여 transform 을 주입해주세요"

        image = self.read_image(index)
        mask_label = self.get_mask_label(index)
        gender_label = self.get_gender_label(index)
        age_label = self.get_age_label(index)
        multi_class_label = self.encode_multi_class(mask_label, gender_label, age_label)

        image_transform = self.transform(image)
        return image_transform, multi_class_label

    def __len__(self):
        return len(self.image_paths)

    def get_mask_label(self, index) -> MaskLabels:
        return self.mask_labels[index]

    def get_gender_label(self, index) -> GenderLabels:
        return self.gender_labels[index]

    def get_age_label(self, index) -> AgeLabels:
        return self.age_labels[index]

    def read_image(self, index):
        image_path = self.image_paths[index]
        return Image.open(image_path)

    @staticmethod
    def encode_multi_class(mask_label, gender_label, age_label) -> int:
        return mask_label * 6 + gender_label * 3 + age_label

    @staticmethod
    def decode_multi_class(multi_class_label) -> Tuple[MaskLabels, GenderLabels, AgeLabels]:
        mask_label = (multi_class_label // 6) % 3
        gender_label = (multi_class_label // 3) % 2
        age_label = multi_class_label % 3
        return mask_label, gender_label, age_label

    @staticmethod
    def denormalize_image(image, mean, std):
        img_cp = image.copy()
        img_cp *= std
        img_cp += mean
        img_cp *= 255.0
        img_cp = np.clip(img_cp, 0, 255).astype(np.uint8)
        return img_cp

    def split_dataset(self) -> Tuple[Subset, Subset]:
        """
        데이터셋을 train 과 val 로 나눕니다,
        pytorch 내부의 torch.utils.data.random_split 함수를 사용하여
        torch.utils.data.Subset 클래스 둘로 나눕니다.
        구현이 어렵지 않으니 구글링 혹은 IDE (e.g. pycharm) 의 navigation 기능을 통해 코드를 한 번 읽어보는 것을 추천드립니다^^
        """
        n_val = int(len(self) * self.val_ratio)
        n_train = len(self) - n_val
        train_set, val_set = random_split(self, [n_train, n_val])
        return train_set, val_set


class MaskSplitByProfileDataset(MaskBaseDataset):
    """
        train / val 나누는 기준을 이미지에 대해서 random 이 아닌
        사람(profile)을 기준으로 나눕니다.
        구현은 val_ratio 에 맞게 train / val 나누는 것을 이미지 전체가 아닌 사람(profile)에 대해서 진행하여 indexing 을 합니다
        이후 `split_dataset` 에서 index 에 맞게 Subset 으로 dataset 을 분기합니다.
    """

    def __init__(self, data_dir, mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246), val_ratio=0.2):
        self.indices = defaultdict(list)
        super().__init__(data_dir, mean, std, val_ratio)

    @staticmethod
    def _split_profile(profiles, val_ratio):
        length = len(profiles)
        n_val = int(length * val_ratio)

        val_indices = set(random.choices(range(length), k=n_val))
        train_indices = set(range(length)) - val_indices
        return {
            "train": train_indices,
            "val": val_indices
        }

    def setup(self):
        profiles = os.listdir(self.data_dir)
        profiles = [profile for profile in profiles if not profile.startswith(".")]
        split_profiles = self._split_profile(profiles, self.val_ratio)

        cnt = 0
        for phase, indices in split_profiles.items():
            for _idx in indices:
                profile = profiles[_idx]
                img_folder = os.path.join(self.data_dir, profile)
                for file_name in os.listdir(img_folder):
                    _file_name, ext = os.path.splitext(file_name)
                    if _file_name not in self._file_names:  # "." 로 시작하는 파일 및 invalid 한 파일들은 무시합니다
                        continue

                    img_path = os.path.join(self.data_dir, profile, file_name)  # (resized_data, 000004_male_Asian_54, mask1.jpg)
                    mask_label = self._file_names[_file_name]

                    id, gender, race, age = profile.split("_")
                    gender_label = GenderLabels.from_str(gender)
                    age_label = AgeLabels.from_number(age)

                    self.image_paths.append(img_path)
                    self.mask_labels.append(mask_label)
                    self.gender_labels.append(gender_label)
                    self.age_labels.append(age_label)

                    self.indices[phase].append(cnt)
                    cnt += 1

    def split_dataset(self) -> List[Subset]:
        train_set = Subset(self, self.indices['train'])
        val_set = Subset(self, self.indices['val'])
        return train_set, val_set


class TestDataset(Dataset):
    def __init__(self, img_paths, resize, mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246)):
        self.img_paths = img_paths
        self.transform = Compose([
            Resize(resize, Image.BILINEAR),
            ToTensor(),
            Normalize(mean=mean, std=std),
        ])

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])

        if self.transform:
            image = self.transform(image)
        return image

    def __len__(self):
        return len(self.img_paths)


In [50]:
data_dir = '/opt/ml/input/data/train/images'

In [64]:
dataset = MaskSplitByProfileDataset(
        data_dir=data_dir,
    )

In [55]:
dataset = MaskBaseDataset(
        data_dir=data_dir,
    )

In [74]:
transform = BaseAugmentation(
        resize=[384,512],
        mean=dataset.mean,
        std=dataset.std,
    )



In [75]:
dataset.set_transform(transform)

In [70]:
len(dataset.indices['train'])

15477

In [69]:
len(dataset.indices['val'])

3423

In [78]:
Subset(dataset, dataset.indices['train'])[15476]

(tensor([[[ 1.1957,  1.1957,  1.1957,  ...,  0.1698,  0.1698,  0.1201],
          [ 1.1957,  1.1957,  1.1957,  ...,  0.1698,  0.2029,  0.2194],
          [ 1.1957,  1.1957,  1.1957,  ...,  0.1201,  0.2194,  0.2856],
          ...,
          [-0.9720, -1.5842, -2.0475,  ..., -0.1943, -0.1943, -0.1612],
          [-0.9554, -1.5676, -2.0309,  ..., -0.1943, -0.2108, -0.1943],
          [-0.9720, -1.5842, -2.0475,  ..., -0.1943, -0.2274, -0.2108]],
 
         [[ 1.2778,  1.2778,  1.2778,  ...,  0.3728,  0.3728,  0.3252],
          [ 1.2778,  1.2778,  1.2778,  ...,  0.3728,  0.4045,  0.4204],
          [ 1.2778,  1.2778,  1.2778,  ...,  0.3252,  0.4204,  0.4839],
          ...,
          [-0.6275, -1.2149, -1.6594,  ..., -0.0083, -0.0083,  0.0235],
          [-0.6116, -1.1990, -1.6436,  ..., -0.0083, -0.0241, -0.0083],
          [-0.6275, -1.2149, -1.6436,  ..., -0.0083, -0.0400, -0.0241]],
 
         [[ 1.3049,  1.3049,  1.3049,  ...,  0.2687,  0.2687,  0.2209],
          [ 1.3049,  1.3049,

In [58]:
train_set, val_set = dataset.split_dataset()

In [59]:
dataset.split_dataset()

(<torch.utils.data.dataset.Subset at 0x7f3aad227e50>,
 <torch.utils.data.dataset.Subset at 0x7f3b381673a0>)

In [63]:
import os
from glob import glob
import pandas as pd
import shutil

def isFileChanged(file_path): # check if file changed based on last modified time 
    if os.path.getmtime(file_path) > 1614865992.0: 
        return True 

    return False

def correct_mask_status(data_dir, invalid_list, correct_list):
    print("="*50)
    print("Change Mask Status")

    for folder in invalid_list: 
        image_dir = os.path.join(data_dir, folder)

        incorrect_file = os.path.join(image_dir, correct_list[0])
        normal_file = os.path.join(image_dir, correct_list[1])

        if not isFileChanged(incorrect_file): #last modified time
            temp = os.path.join(image_dir, correct_list[2])

            os.rename(incorrect_file, temp)
            os.rename(normal_file, incorrect_file)
            os.rename(temp, normal_file)  # temp.jpg is not created in this folder

            print("Changed File Names")

        else :
            print("Already Changed")
    
    print("Process Done")



def change_incorrect_gender(incorrect, src, target): 
    changed_path = incorrect.replace(src, target)
    print(f"{incorrect.split('images/')[1]} is changing into {target}")
    shutil.move(incorrect, changed_path)


def correct_gender_status(data_dir, invalid_id_list): 
    df = pd.read_csv('/opt/ml/input/data/train/train.csv')

    path_list = [] # list that contains incorrect file path
    correct_gender_list = [] # list that contains correct gender of incorrect file path

    print("="*50)
    print("Change Gender Status")

    for path in df['path']:
        for pid in invalid_id_list:
            if pid in path:
                path_list.append(path)
                correct_gender_list.append("male" if "female" in path else "female")

    for idx, foldername in enumerate(path_list):
        folder_dir = os.path.join(data_dir, foldername)
        gender = correct_gender_list[idx]

        if gender == "male" and os.path.exists(folder_dir):
            change_incorrect_gender(folder_dir, "female", gender)

        elif gender=="female" and os.path.exists(folder_dir):
            change_incorrect_gender(folder_dir, "male", gender)

    print("Process Done")




def readCurrentFolders(data_dir): # change age task should read current folders
    return sorted(list(filter(lambda p:not p.startswith("."), os.listdir(data_dir))))    


def correct_age_status(data_dir, invalid_age_id):
    current_folders = readCurrentFolders(data_dir)
    idx = 0 # for age dict

    print("="*50)
    print("Change Age Status")

    for folder in current_folders: #sorted folders list
        invalid_path = os.path.join(data_dir, folder)
        invalid_age = folder.split("Asian_")[1]

        if folder.split("_")[0] in invalid_age_id.keys():
            correct_age = str(list(invalid_age_id.values())[idx])
            correct_path = invalid_path.replace(invalid_age, correct_age)

            print(f"{invalid_path.split('images/')[1]} is changing into {correct_path.split('images/')[1]}")
            shutil.move(invalid_path, correct_path)

            idx += 1


    print("Process Done")


if __name__ == "__main__":
    data_dir= "/opt/ml/input/data/train/images"

    mask_status_invalid = ["000020_female_Asian_50", "004418_male_Asian_20", "005227_male_Asian_22"]
    mask_status_name = ["incorrect_mask.jpg", "normal.jpg", "temp.jpg"]

    gender_status_invalid = ["000225", "000664", "000767", "001498-1", "001509", "003113", "003223", "004281", 
    "004432", "005223", "006359", "006360","006361", "006362", "006363", "006364", "006424"]

    age_status_invalid = {"001009" : 20, "001064": 20, "001637":20, "001666":20, "001852":20, "004348": 60}

    correct_mask_status(data_dir, mask_status_invalid, mask_status_name)
    correct_gender_status(data_dir, gender_status_invalid) 
    correct_age_status(data_dir, age_status_invalid)

Change Mask Status
Changed File Names
Already Changed
Already Changed
Process Done
Change Gender Status
Process Done
Change Age Status
001009_female_Asian_20 is changing into 001009_female_Asian_20
001064_female_Asian_20 is changing into 001064_female_Asian_20
001637_female_Asian_20 is changing into 001637_female_Asian_20
001666_female_Asian_20 is changing into 001666_female_Asian_20
001852_male_Asian_20 is changing into 001852_male_Asian_20
004348_male_Asian_60 is changing into 004348_male_Asian_60
Process Done
