-
Notifications
You must be signed in to change notification settings - Fork 1
/
imbalanced_sampler.py
73 lines (61 loc) · 2.76 KB
/
imbalanced_sampler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import torch
import torch.utils.data
import torchvision
from change_dataset import ChangeDataset, ChangeDataset_synthetic
class ImbalancedDatasetSampler(torch.utils.data.sampler.Sampler):
    """Sample indices with probability inversely proportional to class frequency.

    Every candidate index gets the weight ``1 / count(label)``, where the
    count is taken over the candidate indices, and ``num_samples`` indices
    are drawn with replacement according to those weights on each iteration.

    Arguments:
        dataset: the dataset to draw from; its labels are read through
            ``callback_get_label`` or, failing that, the built-in rules in
            ``_get_label``
        indices (list, optional): candidate indices into ``dataset``;
            defaults to ``range(len(dataset))``
        num_samples (int, optional): number of samples to draw per
            iteration; defaults to ``len(indices)``
        callback_get_label (callable, optional): function taking
            ``(dataset, idx)`` and returning the (hashable) label of that item
    """

    def __init__(self, dataset, indices=None, num_samples=None,
                 callback_get_label=None):
        self.dataset = dataset

        # if indices is not provided,
        # all elements in the dataset will be considered
        self.indices = list(range(len(dataset))) if indices is None else indices

        # define custom callback
        self.callback_get_label = callback_get_label

        # if num_samples is not provided,
        # draw `len(indices)` samples in each iteration
        self.num_samples = (
            len(self.indices) if num_samples is None else num_samples
        )

        # Look every label up exactly once and keep the result: fetching a
        # label may require loading the whole item from the dataset, and the
        # same labels are needed again when the weights are computed.
        self._labels = [self._get_label(dataset, idx) for idx in self.indices]

        # distribution of classes in the dataset
        label_to_count = {}
        for label in self._labels:
            label_to_count[label] = label_to_count.get(label, 0) + 1
        self.label_to_count = label_to_count

        self.cal_weights()

    def cal_weights(self):
        """Recompute ``self.weights`` from the current ``self.label_to_count``.

        Raises:
            KeyError: if a label of this sampler is missing from
                ``self.label_to_count``.
        """
        labels = getattr(self, "_labels", None)
        if labels is None:
            # Instance was built without going through __init__.
            labels = [self._get_label(self.dataset, idx) for idx in self.indices]
            self._labels = labels

        # weight for each sample
        weights = [1.0 / self.label_to_count[label] for label in labels]
        self.weights = torch.DoubleTensor(weights)

    def set_sampler_like(self, sampler):
        """Adopt the class counts of ``sampler`` and recompute the weights."""
        self.label_to_count = sampler.label_to_count
        self.cal_weights()

    def _get_label(self, dataset, idx):
        """Return the label of item ``idx`` of ``dataset``.

        Raises:
            NotImplementedError: if no callback was given and the dataset
                type is not one of the supported ones.
        """
        if self.callback_get_label:
            return self.callback_get_label(dataset, idx)
        if isinstance(dataset, torch.utils.data.Subset):
            # `idx` is a position inside the subset; translate it to the
            # index of the underlying dataset before reading its label.
            return dataset.dataset.imgs[dataset.indices[idx]][1]
        if isinstance(dataset, torchvision.datasets.MNIST):
            # Prefer `targets`; fall back to the legacy `train_labels`
            # attribute when `targets` is not available.
            if hasattr(dataset, "targets"):
                labels = dataset.targets
            else:
                labels = dataset.train_labels
            return labels[idx].item()
        if isinstance(dataset, torchvision.datasets.ImageFolder):
            return dataset.imgs[idx][1]
        if isinstance(dataset, (ChangeDataset, ChangeDataset_synthetic)):
            # The label is the first entry of the item's `y`.
            return dataset[idx].y.tolist()[0]
        raise NotImplementedError

    def __iter__(self):
        # The draw happens when iteration starts; converting to a list of
        # plain ints avoids indexing `self.indices` with 0-d tensors.
        drawn = torch.multinomial(
            self.weights, self.num_samples, replacement=True).tolist()
        return (self.indices[i] for i in drawn)

    def __len__(self):
        return self.num_samples