In [6]:
from abc import ABC, abstractmethod
from torch.utils.data import Subset
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import ConcatDataset
from sklearn.model_selection import train_test_split

import torch
import scipy.io
import numpy as np

In [9]:
# #########################################################################
# 0. Base Loader
# #########################################################################
class BaseLoader(ABC):
    """Abstract parent for objects that hand out a train/test pair of DataLoaders.

    Concrete subclasses are expected to fill ``train_set`` and ``test_set``
    and to provide ``loaders``.
    """

    def __init__(self):
        super().__init__()
        # Placeholders; each must end up being a torch.utils.data.Dataset.
        self.train_set, self.test_set = None, None

    @abstractmethod
    def loaders(self,
                batch_size: int,
                shuffle_train=True,
                shuffle_test=False,
                num_workers: int = 0) -> (DataLoader, DataLoader):
        """Build torch.utils.data.DataLoader objects for train_set and test_set.

        Must be overridden; this base version does nothing and returns None.
        """

    def __repr__(self):
        # The printable form is simply the name of the concrete class.
        return type(self).__name__

In [17]:
# #########################################################################
# 1. Dataset for Training
# #########################################################################
class SatimageDataset(Dataset):
    """One side (train or test) of a stratified split of a Satimage ``.mat`` file.

    The file is read with ``scipy.io.loadmat`` and must contain the keys
    ``'X'`` (features) and ``'y'`` (labels, flattened to 1-D here).

    Inputs:
        root (str): path of the ``.mat`` file.
        label_abnormal (tuple): leave empty for the unsupervised setting; in
            that case only rows whose label equals 0 are kept before the
            split (0 is presumably the normal class -- confirm against the
            data). Any non-empty value keeps every row.
        train (bool): True keeps the training side of the split, False the
            test side.
        split (float): passed to ``train_test_split`` as ``test_size``.
        random_state (int): seed passed to ``train_test_split``.

    Each item is ``(sample, target, index)`` where ``sample`` is a float32
    tensor row, ``target`` a Python int and ``index`` the requested index.
    """

    def __init__(self,
                 root: str='../data/satimage-2.mat',
                 label_abnormal: tuple=(),  # If unsupervised, do not specify
                 train: bool=True,
                 split: float=0.2,
                 random_state: int=42):
        # Zero-argument super() starts the MRO lookup at this class, so Dataset
        # itself takes part in initialisation; super(Dataset, self) skipped it.
        super().__init__()

        # Initialization
        self.root = root
        self.label_abnormal = label_abnormal

        # Load data
        mat = scipy.io.loadmat(root)
        X = mat['X']
        y = mat['y'].reshape(-1)

        # Without abnormal labels, restrict the data to rows labelled 0.
        if not label_abnormal:
            keep = (y == 0)
            X, y = X[keep], y[keep]

        # Train-test split (stratified on the labels that remain)
        X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                            test_size=split,
                                                            random_state=random_state,
                                                            stratify=y)

        # Keep only the requested side of the split.
        X_part, y_part = (X_train, y_train) if train else (X_test, y_test)
        self.X = torch.tensor(X_part, dtype=torch.float32)
        self.y = torch.tensor(y_part, dtype=torch.float32)

    def __getitem__(self, index):
        """Return ``(sample, target, index)`` for the row at ``index``."""
        # Labels are stored as float32; callers receive a plain int.
        sample, target = self.X[index], int(self.y[index])
        return sample, target, index

    def __len__(self):
        """Number of rows held by this side of the split."""
        return len(self.X)

In [18]:
# Scratch check: build the dataset with every default argument, i.e. the
# training side of the split read from '../data/satimage-2.mat'.
kk = SatimageDataset()

In [11]:
# #########################################################################
# 2. Loader for Training
# #########################################################################

In [None]:
# NOTE(review): same default construction as in an earlier cell; this rebinds
# `kk` and looks like a leftover scratch cell -- confirm whether it is needed.
kk = SatimageDataset()

In [None]:
class SatimageLoader(BaseLoader):
    """
    Build train/test subsets of the Satimage ``.mat`` data and serve them as
    DataLoaders.

    Derives from BaseLoader, which is defined before this cell; the
    previously named BaseDataset only appears further down the notebook, so
    the class statement failed on a top-to-bottom run.

    Both sides of the split come from SatimageDataset. When ``label_abnormal``
    is empty, SatimageDataset already keeps only rows labelled 0, so both
    subsets contain normal samples only. Otherwise the train subset holds
    every row whose label is in ``label_normal`` plus a ``ratio_abnormal``
    fraction of the rows whose label is in ``label_abnormal``, and the test
    subset holds every normal and every abnormal row of the test side.

    Inputs:
        root (str): path of the ``.mat`` file
        label_normal (tuple): e.g. (0,)
        label_abnormal (tuple): e.g. (1,); leave empty if unsupervised
        ratio_abnormal (float): fraction of abnormal training rows to keep
        split (float): test fraction of the train/test split
        random_state (int): seed for the split and for the abnormal shuffle
    """
    def __init__(self,
                 root: str='../data/satimage-2.mat',
                 label_normal: tuple=(0,),
                 label_abnormal: tuple=(),  # If unsupervised, do not specify
                 ratio_abnormal: float=0.1,
                 split: float=0.2,
                 random_state: int=42):
        super().__init__()

        # Initialization
        self.root = root
        self.label_normal = label_normal
        self.label_abnormal = label_abnormal
        self.ratio_abnormal = ratio_abnormal
        self.split = split
        self.random_state = random_state

        # Both datasets use the same split parameters, so they are the two
        # complementary sides of one stratified split.
        train_set = SatimageDataset(root=root,
                                    label_abnormal=label_abnormal,
                                    train=True,
                                    split=split,
                                    random_state=random_state)
        test_set = SatimageDataset(root=root,
                                   label_abnormal=label_abnormal,
                                   train=False,
                                   split=split,
                                   random_state=random_state)

        # Get the labels for classes intended to use
        y_train = train_set.y.numpy()
        y_test = test_set.y.numpy()

        # Get the indices for classes intended to use
        train_idx = self.get_idx(y_train, label_normal, label_abnormal, ratio_abnormal, True)
        test_idx = self.get_idx(y_test, label_normal, label_abnormal, ratio_abnormal, False)

        # Get the subset (plain Python ints keep the per-item index simple)
        self.train_set = Subset(train_set, train_idx.tolist())
        self.test_set = Subset(test_set, test_idx.tolist())

    def get_idx(self, y, label_normal, label_abnormal, ratio_abnormal, train):
        """
        Create a numpy array of the positions in y to keep.
        Inputs:
            y (np.array): 1-D array of labels
            label_normal (tuple): e.g. (0,)
            label_abnormal (tuple): e.g. (1,)
            ratio_abnormal (float): e.g. 0.1
            train (bool): True / False
        """
        idx_normal = np.argwhere(np.isin(y, label_normal)).flatten()
        if not label_abnormal:
            return idx_normal

        idx_abnormal = np.argwhere(np.isin(y, label_abnormal)).flatten()
        # A dedicated, seeded generator makes the kept abnormal rows
        # reproducible and leaves NumPy's global random state untouched.
        np.random.RandomState(self.random_state).shuffle(idx_abnormal)
        if train:
            # Only the training subset is thinned; evaluation sees every anomaly.
            idx_abnormal = idx_abnormal[:int(len(idx_abnormal) * ratio_abnormal)]
        return np.hstack((idx_normal, idx_abnormal))

    def loaders(self,
                batch_size: int,
                shuffle_train=True,
                shuffle_test=False,
                num_workers: int = 0) -> (DataLoader, DataLoader):
        """Return ``(train_loader, test_loader)`` built from the two subsets."""
        # drop_last=True on the training side discards a trailing partial batch.
        train_loader = DataLoader(dataset=self.train_set,
                                  batch_size=batch_size,
                                  shuffle=shuffle_train,
                                  num_workers=num_workers,
                                  drop_last=True)
        test_loader = DataLoader(dataset=self.test_set,
                                 batch_size=batch_size,
                                 shuffle=shuffle_test,
                                 num_workers=num_workers,
                                 drop_last=False)
        return train_loader, test_loader


In [1]:
"""
Title: cifar10_loader.py
Description: The loader classes for the CIFAR10 dataset
Author: Lek'Sai Ye, University of Chicago
"""

from PIL import Image
from abc import ABC, abstractmethod
from torch.utils.data import Subset
from torch.utils.data import DataLoader
from torch.utils.data import ConcatDataset
from torchvision.datasets import FashionMNIST
from torchvision.datasets import CIFAR10

import torch
import torchvision
import numpy as np
import torchvision.transforms as transforms


# #########################################################################
# 1. Base Dataset
# #########################################################################
class BaseDataset(ABC):
    """Abstract holder of a data root, label selections and a train/test pair.

    Subclasses overwrite the defaults set here and implement ``loaders``.
    """

    def __init__(self, root: str):
        super().__init__()

        self.root = root
        # Empty label tuples and missing datasets until a subclass fills them in.
        self.label_normal, self.label_abnormal = (), ()
        self.train_set, self.test_set = None, None

    @abstractmethod
    def loaders(self,
                batch_size: int,
                shuffle_train=True,
                shuffle_test=False,
                num_workers: int = 0):
        """Build the data loader(s); must be overridden, this version returns None."""

    def __repr__(self):
        # The printable form is simply the name of the concrete class.
        return type(self).__name__


# #########################################################################
# 2. CIFAR10 Dataset
# #########################################################################
class CIFAR10Dataset(CIFAR10):
    """
    CIFAR10 whose items also carry their position in the dataset.

    Each item is ``(img, target, index)``. The image is passed through the
    ``transform`` given at construction; if none was given it is converted
    with ``transforms.ToTensor()``, which is what this class always did
    before. A ``target_transform`` given at construction is applied to the
    integer label.
    """

    def __getitem__(self, index):
        img, target = self.data[index], int(self.targets[index])

        # Go through a PIL image first, as the original implementation did.
        img = Image.fromarray(img)

        # Honour the caller's transform instead of silently ignoring it.
        transform = self.transform if self.transform is not None else transforms.ToTensor()
        img = transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target, index


# #########################################################################
# 3. CIFAR10 Loader for Training
# #########################################################################
class CIFAR10Loader(BaseDataset):
    """
    Build train/test subsets of CIFAR10 and serve them as DataLoaders.

    The train subset holds every sample whose label is in ``label_normal``
    plus, when ``label_abnormal`` is given, a ``ratio_abnormal`` fraction of
    the samples whose label is in ``label_abnormal``. The test subset holds
    every normal and every abnormal sample of the test split.

    Inputs:
        root (str): directory handed to CIFAR10Dataset
        label_normal (tuple): e.g. (0,)
        label_abnormal (tuple): e.g. (1,); leave empty if unsupervised
        ratio_abnormal (float): fraction of abnormal training samples to keep
        random_state (int or None): seed for the abnormal shuffle. None (the
            default) keeps the earlier behaviour of shuffling with NumPy's
            global random state, which is not reproducible unless the
            caller seeds it.
    """
    def __init__(self,
                 root: str='/net/leksai/data/CIFAR10',
                 label_normal: tuple=(0,),
                 label_abnormal: tuple=(),  # If unsupervised, do not specify
                 ratio_abnormal: float=0.1,
                 random_state: int=None):
        super().__init__(root)

        # Initialization
        self.root = root
        self.label_normal = label_normal
        self.label_abnormal = label_abnormal
        self.ratio_abnormal = ratio_abnormal
        self.random_state = random_state

        # Read in initial Full Set
        # download=True is always passed to the dataset constructor here.
        print('Loading dataset for you!')
        train_set = CIFAR10Dataset(root=root, train=True, transform=transforms.ToTensor(), download=True)
        test_set = CIFAR10Dataset(root=root, train=False, transform=transforms.ToTensor(), download=True)
        print('Almost loaded!')

        # Get the labels for classes intended to use
        y_train = np.array(train_set.targets)
        y_test = np.array(test_set.targets)

        # Get the indices for classes intended to use
        train_idx = self.get_idx(y_train, label_normal, label_abnormal, ratio_abnormal, True)
        test_idx = self.get_idx(y_test, label_normal, label_abnormal, ratio_abnormal, False)

        # Get the subset
        self.train_set = Subset(train_set, train_idx)
        self.test_set = Subset(test_set, test_idx)

    def get_idx(self, y, label_normal, label_abnormal, ratio_abnormal, train):
        """
        Create a numpy array of the positions in y to keep.
        Inputs:
            y (np.array): 1-D array of labels
            label_normal (tuple): e.g. (0,)
            label_abnormal (tuple): e.g. (1,)
            ratio_abnormal (float): e.g. 0.1
            train (bool): True / False
        """
        idx_normal = np.argwhere(np.isin(y, label_normal)).flatten()

        if not label_abnormal:
            return idx_normal

        idx_abnormal = np.argwhere(np.isin(y, label_abnormal)).flatten()

        # getattr keeps get_idx usable on an instance that never set the seed.
        seed = getattr(self, 'random_state', None)
        if seed is None:
            np.random.shuffle(idx_abnormal)
        else:
            # A dedicated generator gives a reproducible order and leaves
            # NumPy's global random state untouched.
            np.random.RandomState(seed).shuffle(idx_abnormal)

        if train:
            # Only the training subset is thinned; evaluation sees every anomaly.
            idx_abnormal = idx_abnormal[:int(len(idx_abnormal) * ratio_abnormal)]
        return np.hstack((idx_normal, idx_abnormal))

    def loaders(self,
                batch_size: int,
                shuffle_train=True,
                shuffle_test=False,
                num_workers: int = 0) -> (DataLoader, DataLoader):
        """Return ``(train_loader, test_loader)`` built from the two subsets."""
        # drop_last=True on the training side discards a trailing partial batch.
        train_loader = DataLoader(dataset=self.train_set,
                                  batch_size=batch_size,
                                  shuffle=shuffle_train,
                                  num_workers=num_workers,
                                  drop_last=True)
        test_loader = DataLoader(dataset=self.test_set,
                                 batch_size=batch_size,
                                 shuffle=shuffle_test,
                                 num_workers=num_workers,
                                 drop_last=False)
        return train_loader, test_loader


# #########################################################################
# 4. CIFAR10 Loader for Evaluation
# #########################################################################
class CIFAR10LoaderEval(BaseDataset):
    """
    Evaluation-time view of CIFAR10 restricted to the classes in ``label``.

    With ``test_eval`` set, only the filtered test split is exposed;
    otherwise the filtered train and test splits are concatenated. The
    result is stored in ``all_set``.
    """
    def __init__(self,
                 root: str='/net/leksai/data/CIFAR10',
                 label: tuple=(),
                 test_eval: bool=False):
        super().__init__(root)

        # Remember the configuration
        self.root = root
        self.label = label

        # Build the filtered train split first, then the filtered test split.
        filtered = []
        for is_train in (True, False):
            full_set = CIFAR10Dataset(root=root,
                                      train=is_train,
                                      transform=transforms.ToTensor(),
                                      download=True)
            targets = np.array(full_set.targets)
            filtered.append(Subset(full_set, self.get_idx(targets, label)))
        train_part, test_part = filtered

        # Expose either the test part alone or both parts back to back.
        self.all_set = test_part if test_eval else ConcatDataset((train_part, test_part))

    def get_idx(self, y, label):
        """
        Positions in y whose value is one of the entries of label.
        Inputs:
            y (np.array): 1-D array of labels
            label (tuple): e.g. (0,)
        """
        wanted = np.isin(y, label)
        return np.argwhere(wanted).flatten()

    def loaders(self,
                batch_size: int,
                shuffle=False,
                num_workers: int = 0):
        """Return a single DataLoader over ``all_set``; no batch is dropped."""
        return DataLoader(dataset=self.all_set,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers,
                          drop_last=False)
