-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
batch_sampler.py
344 lines (274 loc) · 12.8 KB
/
batch_sampler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
import numpy as np
import math
from .sampler import Sampler, SequenceSampler, RandomSampler
from .dataset import Dataset, IterableDataset
__all__ = ["BatchSampler", "DistributedBatchSampler"]
class BatchSampler(Sampler):
"""
A base implement of batch sampler used by `paddle.io.DataLoader`
which yield mini-batch indices(a list/tuple with length as
mini-batch size and holds sample indices) iterably.
Batch sampler used by :code:`paddle.io.DataLoader` should be a subclass
of :code:`paddle.io.BatchSampler`, BatchSampler subclasses should
implement following methods:
:code:`__iter__`: return mini-batch indices iterably.
:code:`__len__`: get mini-batch number in an epoch.
Args:
dataset(Dataset): this could be a :code:`paddle.io.Dataset`
implement or other python object which implemented
:code:`__len__` for BatchSampler to get indices as the
range of :attr:`dataset` length. Default None.
sampler (Sampler): this could be a :code:`paddle.io.Dataset`
instance which implemented :code:`__iter__` to yield
sample indices. :attr:`sampler` and :attr:`dataset`
can not be set in the same time. If :attr:`sampler`
is set, :attr:`shuffle` should not be set. Default None.
shuffle(bool): whether to shuffle indices order before genrating
batch indices. Default False.
batch_size(int): sample indice number in a mini-batch indices.
drop_last(bool): whether drop the last incomplete batch dataset size
is not divisible by the batch size. Default False
Returns:
BatchSampler: an iterable object for indices iterating
Examples:
.. code-block:: python
from paddle.io import RandomSampler, BatchSampler, Dataset
# init with dataset
class RandomDataset(Dataset):
def __init__(self, num_samples):
self.num_samples = num_samples
def __getitem__(self, idx):
image = np.random.random([784]).astype('float32')
label = np.random.randint(0, 9, (1, )).astype('int64')
return image, label
def __len__(self):
return self.num_samples
bs = BatchSampler(dataset=RandomDataset(100),
shuffle=False,
batch_size=16,
drop_last=False)
for batch_indices in bs:
print(batch_indices)
# init with sampler
sampler = RandomSampler(RandomDataset(100))
bs = BatchSampler(sampler=sampler,
batch_size=8,
drop_last=True)
for batch_indices in bs:
print(batch_indices)
see `paddle.io.DataLoader`
"""
def __init__(self,
dataset=None,
sampler=None,
shuffle=False,
batch_size=1,
drop_last=False):
if dataset is None:
assert sampler is not None, \
"either dataset or sampler should be set"
assert isinstance(sampler, Sampler), \
"sampler should be a paddle.io.Sampler, but got {}".format(type(sampler))
assert not shuffle, "shuffle should be False when sampler is set"
self.sampler = sampler
else:
assert not isinstance(dataset, IterableDataset), \
"dataset should not be a paddle.io.IterableDataset"
assert sampler is None, \
"should not set both dataset and sampler"
assert isinstance(shuffle, bool), \
"shuffle should be a boolean value, but got {}".format(type(shuffle))
if shuffle:
self.sampler = RandomSampler(dataset)
else:
self.sampler = SequenceSampler(dataset)
assert isinstance(batch_size, int) and batch_size > 0, \
"batch_size should be a positive integer, but got {}".format(batch_size)
self.batch_size = batch_size
assert isinstance(drop_last, bool), \
"drop_last should be a boolean value, but got {}".format(type(drop_last))
self.drop_last = drop_last
def __iter__(self):
batch_indices = []
for idx in self.sampler:
batch_indices.append(idx)
if len(batch_indices) == self.batch_size:
yield batch_indices
batch_indices = []
if not self.drop_last and len(batch_indices) > 0:
yield batch_indices
def __len__(self):
num_samples = len(self.sampler)
num_samples += int(not self.drop_last) * (self.batch_size - 1)
return num_samples // self.batch_size
class _InfiniteIterableSampler(object):
def __init__(self, dataset, batch_size=1):
assert isinstance(
dataset, IterableDataset
), "dataset should be an instance of paddle.io.IterableDataset"
self.dataset = dataset
self.batch_size = batch_size
def __iter__(self):
while True:
yield [None] * self.batch_size
class DistributedBatchSampler(BatchSampler):
"""Sampler that restricts data loading to a subset of the dataset.
In such case, each process can pass a DistributedBatchSampler instance
as a DataLoader sampler, and load a subset of the original dataset that
is exclusive to it.
.. note::
Dataset is assumed to be of constant size.
Args:
dataset(paddle.io.Dataset): this could be a `paddle.io.Dataset` implement
or other python object which implemented
`__len__` for BatchSampler to get sample
number of data source.
batch_size(int): sample indice number in a mini-batch indices.
num_replicas(int, optional): porcess number in distributed training.
If :attr:`num_replicas` is None, :attr:`num_replicas` will be
retrieved from :code:`paddle.distributed.ParallenEnv`.
Default None.
rank(int, optional): the rank of the current process among :attr:`num_replicas`
processes. If :attr:`rank` is None, :attr:`rank` is retrieved from
:code:`paddle.distributed.ParallenEnv`. Default None.
shuffle(bool): whther to shuffle indices order before genrating
batch indices. Default False.
drop_last(bool): whether drop the last incomplete batch dataset size
is not divisible by the batch size. Default False
Examples:
.. code-block:: python
import numpy as np
from paddle.io import Dataset, DistributedBatchSampler
# init with dataset
class RandomDataset(Dataset):
def __init__(self, num_samples):
self.num_samples = num_samples
def __getitem__(self, idx):
image = np.random.random([784]).astype('float32')
label = np.random.randint(0, 9, (1, )).astype('int64')
return image, label
def __len__(self):
return self.num_samples
dataset = RandomDataset(100)
sampler = DistributedBatchSampler(dataset, batch_size=64)
for data in sampler:
# do something
break
"""
def __init__(self,
dataset,
batch_size,
num_replicas=None,
rank=None,
shuffle=False,
drop_last=False):
self.dataset = dataset
assert isinstance(batch_size, int) and batch_size > 0, \
"batch_size should be a positive integer"
self.batch_size = batch_size
assert isinstance(shuffle, bool), \
"shuffle should be a boolean value"
self.shuffle = shuffle
assert isinstance(drop_last, bool), \
"drop_last should be a boolean number"
from paddle.fluid.dygraph.parallel import ParallelEnv
if num_replicas is not None:
assert isinstance(num_replicas, int) and num_replicas > 0, \
"num_replicas should be a positive integer"
self.nranks = num_replicas
else:
self.nranks = ParallelEnv().nranks
if rank is not None:
assert isinstance(rank, int) and rank >= 0, \
"rank should be a non-negative integer"
self.local_rank = rank
else:
self.local_rank = ParallelEnv().local_rank
self.drop_last = drop_last
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.nranks))
self.total_size = self.num_samples * self.nranks
def __iter__(self):
num_samples = len(self.dataset)
indices = np.arange(num_samples).tolist()
indices += indices[:(self.total_size - len(indices))]
assert len(indices) == self.total_size
if self.shuffle:
np.random.RandomState(self.epoch).shuffle(indices)
self.epoch += 1
# subsample
def _get_indices_by_batch_size(indices):
subsampled_indices = []
last_batch_size = self.total_size % (self.batch_size * self.nranks)
assert last_batch_size % self.nranks == 0
last_local_batch_size = last_batch_size // self.nranks
for i in range(self.local_rank * self.batch_size,
len(indices) - last_batch_size,
self.batch_size * self.nranks):
subsampled_indices.extend(indices[i:i + self.batch_size])
indices = indices[len(indices) - last_batch_size:]
subsampled_indices.extend(
indices[self.local_rank *
last_local_batch_size:(self.local_rank + 1) *
last_local_batch_size])
return subsampled_indices
if self.nranks > 1:
indices = _get_indices_by_batch_size(indices)
assert len(indices) == self.num_samples
_sample_iter = iter(indices)
batch_indices = []
for idx in _sample_iter:
batch_indices.append(idx)
if len(batch_indices) == self.batch_size:
yield batch_indices
batch_indices = []
if not self.drop_last and len(batch_indices) > 0:
yield batch_indices
def __len__(self):
num_samples = self.num_samples
num_samples += int(not self.drop_last) * (self.batch_size - 1)
return num_samples // self.batch_size
def set_epoch(self, epoch):
"""
Sets the epoch number. When :attr:`shuffle=True`, this number is used
as seeds of random numbers. By default, users may not set this, all
replicas (workers) use a different random ordering for each epoch.
If set same number at each epoch, this sampler will yield the same
ordering at all epoches.
Arguments:
epoch (int): Epoch number.
Examples:
.. code-block:: python
import numpy as np
from paddle.io import Dataset, DistributedBatchSampler
# init with dataset
class RandomDataset(Dataset):
def __init__(self, num_samples):
self.num_samples = num_samples
def __getitem__(self, idx):
image = np.random.random([784]).astype('float32')
label = np.random.randint(0, 9, (1, )).astype('int64')
return image, label
def __len__(self):
return self.num_samples
dataset = RandomDataset(100)
sampler = DistributedBatchSampler(dataset, batch_size=64)
for epoch in range(10):
sampler.set_epoch(epoch)
"""
self.epoch = epoch