In [1]:
import numpy as np
import os
import torch
from torchvision import transforms
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import CIFAR100

---
### Create the pipeline

In [2]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

cifar_train = CIFAR100(os.getcwd(), download=True, transform=transform)
cifar_test = CIFAR100(os.getcwd(), train = False, transform=transform)


"""
-------------------------------------------------------
Map fine label names to coarse label names
"""
def unpickle(file):
    import pickle
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

meta = unpickle('cifar-100-python/meta')
training_set = unpickle('cifar-100-python/train')

# CIFAR-100 label names
fine_label_names = meta[b'fine_label_names']
coarse_label_names = meta[b'coarse_label_names']

label_mappings = []   # mappings from fine label to coarse label
for fine_label, coarse_label in zip(training_set[b'fine_labels'],training_set[b'coarse_labels']):
    if (fine_label, coarse_label) not in label_mappings:
        label_mappings.append((fine_label, coarse_label))
label_mappings.sort()

fine_to_coarse = [label_mapping[1] for label_mapping in label_mappings]   # Also mappings from fine to coarse label but in a list
"""
--------------------------------------------------------
"""


class CustomDataset(Dataset):
    """
    A custom Dataset Class for CIFAR  
    """
    def __init__(self, data, labels, transform = None):
        self.data = torch.tensor(data)
        self.labels = torch.tensor(labels)
        self.transform = transform
    
    def __getitem__(self, index: int):
        return self.data[index], self.labels[index]
    
    def __len__(self):
        return len(self.labels)


def getLabeledDataset(dataset: CIFAR100, label):
    """
    Return CustomDataset instance with labels that match the specified label value
    """ 
    labels = np.array(dataset.targets)
    matched_data = dataset.data[labels == label]
    return CustomDataset(matched_data, [label]*len(matched_data))
    
def getFineLabeledDataset(dataset: CIFAR100, fine_label_name):
    """
    Return CustomDataset instance with labels that match the specified fine label name
    """
    return getLabeledDataset(dataset, fine_label_names.index(fine_label_name))

def mergeDataset(datasets: list):
    """
    Merge CustomDataset instances (to later be used for binary classification)
    """
    merged_data = np.vstack(tuple(dataset.data for dataset in datasets))
    merged_labels = []
    for dataset in datasets:
        merged_labels += dataset.labels
    
    return CustomDataset(merged_data, merged_labels)

def getCoarseLabeledDataset(dataset: CIFAR100, coarse_label_name):
    """
    Return CustomDataset instance with labels that match the specified fine label name
    Fine all the fine labels (classes) within coarse label (superclass) and merge all of the former datasets
    """
    coarse_label_index = coarse_label_names.index(coarse_label_name)
    
    # list of matched fine label values(int)
    matched_labels  = \
    [fine_label_name for fine_label_name, coarse_label_number in enumerate(fine_to_coarse) if coarse_label_number ==  coarse_label_index]
    return mergeDataset(list(getLabeledDataset(dataset, matched_label) for matched_label in matched_labels))

Files already downloaded and verified


In [3]:
# Test
worm_dataset = getFineLabeledDataset(cifar_train,b'worm')
aquarium_fish_dataset = getFineLabeledDataset(cifar_train,b'aquarium_fish')
worm_aquarium_fish_dataset = mergeDataset([worm_dataset, aquarium_fish_dataset])

In [4]:
print(worm_aquarium_fish_dataset[0])
print(len(worm_aquarium_fish_dataset))

(tensor([[[ 48,  48,  48],
         [ 36,  36,  36],
         [ 45,  45,  45],
         ...,
         [  0,   0,   0],
         [  1,   1,   1],
         [  1,   1,   1]],

        [[ 14,  14,  14],
         [  4,   4,   4],
         [  4,   4,   4],
         ...,
         [  0,   0,   0],
         [  0,   0,   0],
         [  1,   1,   1]],

        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0]],

        ...,

        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [ 99,  99,  99],
         [206, 206, 206],
         [125, 125, 125]],

        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [ 21,  21,  21],
         [115, 115, 115],
         [ 91,  91,  91]],

        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [  0,   0,   0],
       

In [5]:
coarse_label_names[0]

b'aquatic_mammals'

In [6]:
# Coarse label (superclass) 'aquatic mammals' dataset 
aquatic_mammals = getCoarseLabeledDataset(cifar_test, b'aquatic_mammals')

In [7]:
# set of all fine label value (int: 0->99) belongs to 'aquatic mammals' coarse label
print(aquatic_mammals.labels.unique())

# name of all fine labels in 'aquatic mammals'
print('\nClasses inside superclass \'aquatic mammals\'')
print(list(fine_label_names[int(i)] for i in aquatic_mammals.labels.unique()))

tensor([ 4, 30, 55, 72, 95])

Classes inside superclass 'aquatic mammals'
[b'beaver', b'dolphin', b'otter', b'seal', b'whale']


----
### Building Household Model  

In [8]:
# Get the training set
furniture_train = getCoarseLabeledDataset(cifar_train, b'household_furniture')
furniture_train.labels = torch.ones(len(furniture_train))
electrical_train = getCoarseLabeledDataset(cifar_train, b'household_electrical_devices')
electrical_train.labels = torch.zeros(len(electrical_train))
#print(len(furniture_train))
#print(len(electrical_train))

In [9]:
# For simplicity sake, we take the first of 125 (5%) of furniture and 2375 (95%) of electrical devices
household_train = mergeDataset([\
    CustomDataset(furniture_train.data[:125], furniture_train.labels[:125]),
    CustomDataset(electrical_train.data[:2375], electrical_train.labels[:2375])
])

  self.data = torch.tensor(data)
  self.labels = torch.tensor(labels)


In [10]:
household_train.labels.unique()

tensor([0., 1.])

In [11]:
# testing set
furniture_test = getCoarseLabeledDataset(cifar_test, b'household_furniture')
furniture_test.labels = torch.ones(len(furniture_test))
electrical_test = getCoarseLabeledDataset(cifar_test, b'household_electrical_devices')
electrical_test.labels = torch.zeros(len(electrical_test))
household_test = mergeDataset([furniture_test, electrical_test])

In [12]:
household_test

<__main__.CustomDataset at 0x7fb6fc7d0640>

In [13]:
household_train.labels.count_nonzero()

tensor(125)

In [58]:
dataset_loader=torch.utils.data.DataLoader(household_train,batch_size=100,shuffle=False,num_workers=4)

In [59]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

Using cuda device


In [82]:
class CIFARtrainer(nn.Module):
    def __init__(self):
        super(CIFARtrainer,self).__init__()
        self.flatten=nn.Flatten()
        self.dnn1=nn.Sequential(
            nn.Linear(in_features=3072,out_features=1000),
            nn.Linear(in_features=1000,out_features=300),
            nn.Linear(in_features=300,out_features=1),
            nn.Sigmoid()
        )
    def forward(self,x):
        x=self.flatten(x)
        x=self.dnn1(x)
        return x

In [88]:
def mfe(output1,target1):
    out1=[]
    tar1=[]
    out2=[]
    tar2=[]
    for i in range(len(target1)): 
        if (coarse_label_names[output1[i]]==0 and coarse_label_names[target1[i]]==0):
            out1.append(output1[i])
            tar1.append(target1[i])
        if (coarse_label_names[output1[i]]==1 and coarse_label_names[target1[i]]==1):
            out2.append(output1[i])
            tar2.append(target1[i])
    out1=torch.totensor(out1)
    tar1=torch.totensor(tar1)
    out2=torch.totensor(out2)
    tar2=torch.totensor(tar2)
    fne = torch.mean((out1 - tar1)**2)
    fpe = torch.mean((out2 - tar2)**2)
    return fpe+fne
def msfe(output,target):
    out1=[]
    tar1=[]
    out2=[]
    tar2=[]
    for i in range(len(target)): 
        if (coarse_label_names[output1[i]]==0 and coarse_label_names[target1[i]]==0):
            out1.append(output1[i])
            tar1.append(target1[i])
        if (coarse_label_names[output1[i]]==1 and coarse_label_names[target1[i]]==1):
            out2.append(output1[i])
            tar2.append(target1[i])
    out1=torch.totensor(out1)
    tar1=torch.totensor(tar1)
    out2=torch.totensor(out2)
    tar2=torch.totensor(tar2)
    fne = torch.mean((out1 - tar1)**2)
    fpe = torch.mean((out2 - tar2)**2)
    return fpe**2+fne**2

In [89]:
TRAINER=CIFARtrainer().float().to(device="cuda")
#loss_criterion = mfe()

optimizer= torch.optim.Adam(TRAINER.parameters(),lr=0.0008)
#start training the model
for epoch in range(50):    
    for X,y in dataset_loader:
        #print(X.shape)
        #move data to cuda for operating
        X=X.to(device="cuda")
        y=y.to(device="cuda")
        predict=TRAINER(X.float())
        loss=mfe(predict,y.float())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Loss for current epoch",epoch,":",loss)
    print("pred:{},y:{}".format(predict.shape,y.shape))

TypeError: only integer tensors of a single element can be converted to an index