In [None]:
#default_exp data.load

In [None]:
#export
from fastai2.torch_basics import *

from torch.utils.data.dataloader import _MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter,_DatasetKind
# PyTorch's two private loader iterators. Order matters: `DataLoader.__iter__` indexes this tuple with
# `num_workers==0`, so index 0 (False) is the multi-process iterator and index 1 (True) the single-process one.
_loaders = (_MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter)

# fastai reuses PyTorch's iterators (and their worker machinery) by handing them a `_FakeLoader`

In [None]:
from nbdev.showdoc import *

In [None]:
# toy dataset used throughout this notebook: the 26 lowercase ASCII letters
bs = 4
letters = [ch for ch in string.ascii_lowercase]

In [None]:
# display the toy dataset as a single string
''.join(letters)

'abcdefghijklmnopqrstuvwxyz'

# DataLoader

In [None]:
#export
def _wif(worker_id):
    "`worker_init_fn` that PyTorch calls at the start of every worker process (`worker_id` itself is unused)."
    set_num_threads(1)  # limit each worker process to a single thread
    worker = get_worker_info()
    # worker.dataset is the `_FakeLoader`; its `.d` is the fastai DataLoader seen by this worker
    dl = worker.dataset.d
    dl.nw = worker.num_workers  # total number of workers
    dl.offs = worker.id         # this worker's index; `DataLoader.sample` uses it to pick its batches
    set_seed(worker.seed)       # per-worker seed supplied by PyTorch
    dl.wif()                    # user hook

class _FakeLoader(GetAttr):
    "Stand-in for a PyTorch `DataLoader`, exposing just the attributes PyTorch's private loader iterators read."
    # These class attributes mimic PyTorch DataLoader internals. The dataset kind is Iterable, so the
    # PyTorch iterator simply pulls from `iter(self.dataset)` (this object, see `__init__`), and
    # `collate_fn` is a no-op: all sampling and batching is done by fastai in `create_batches`.
    _IterableDataset_len_called,_auto_collation,collate_fn,drop_last,dataset_kind,_dataset_kind,_index_sampler = (
        None,False,noops,False,_DatasetKind.Iterable,_DatasetKind.Iterable,Inf.count)
    def __init__(self, d, pin_memory, num_workers, timeout):
        # d: the fastai `DataLoader` that actually builds the batches.
        # `dataset` is self, so PyTorch (and each worker process) iterates this object.
        # NOTE(review): `default` = d presumably makes `GetAttr` forward unknown attributes to the
        # fastai DataLoader (GetAttr's default `_default`) — confirm.
        self.dataset,self.default,self.worker_init_fn = self,d,_wif
        # worker_init_fn: PyTorch calls `_wif` once at the start of every worker process
        store_attr(self, 'd,pin_memory,num_workers,timeout')

    # One epoch: sample this worker's indices, then lazily build the batches from them
    def __iter__(self): return iter(self.d.create_batches(self.d.sample()))

    # Read by PyTorch when starting workers: None in single-process mode, else the `multiprocessing` module in scope
    @property
    def multiprocessing_context(self): return (None,multiprocessing)[self.num_workers>0]

    # Temporarily force single-process loading (num_workers=0). Yields the fastai DataLoader and
    # always restores the previous worker count, even if the body raises.
    @contextmanager
    def no_multiproc(self):
        old_nw = self.num_workers
        try:
            self.num_workers = 0
            yield self.d
        finally: self.num_workers = old_nw

_collate_types = (ndarray, Tensor, typing.Mapping, str)

In [None]:
#export
def fa_collate(t):
    """fastai's replacement for PyTorch `default_collate`: turn a list of samples `t` into one batch.

    Generic `Sequence`s (tuples, lists, `L`, namedtuples, ...) are collated field by field and rebuilt
    with the container type of the first sample, so the batch keeps the samples' structure.
    Leaves in `_collate_types` (arrays, tensors, mappings, strings) and anything that is not a
    `Sequence` are handed to `default_collate`.
    Used when the `DataLoader` batches items itself (`bs` is not None).
    """
    b = t[0]
    if isinstance(b, _collate_types): return default_collate(t)
    if isinstance(b, Sequence):
        # zip(*t) transposes the batch: one tuple per field, holding that field from every sample
        fields = [fa_collate(s) for s in zip(*t)]
        # namedtuples take their fields positionally; other sequence types take a single iterable
        if isinstance(b, tuple) and hasattr(b, '_fields'): return type(b)(*fields)
        return type(b)(fields)
    return default_collate(t)

In [None]:
# each sample is (x, (y1, y2)): x is an int, the second field a nested tuple
t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
# the batch keeps the sample's structure: a tensor for x, a tuple for the nested field
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])

In [None]:
# Collating transposes the batch (a list of samples becomes one entry per field) while keeping the
# nested structure and container types; the ints are converted to tensors
fa_collate(t)

(tensor([1, 1]), (tensor([2, 2]), tensor([3, 3])))

In [None]:
# both the top-level and the nested fields became integer (Long) tensors
fa_collate(t)[0].type(),fa_collate(t)[1][0].type()

('torch.LongTensor', 'torch.LongTensor')

In [None]:
# two levels of nesting: the structure is preserved recursively
t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])

In [None]:
# the nested batch: every leaf is a tensor, every container is still a tuple
fa_collate(t)

(tensor([1, 1]), (tensor([2, 2]), (tensor([3, 3]), tensor([4, 4]))))

In [None]:
#export
def fa_convert(t):
    """fastai's replacement for PyTorch `default_convert`, used when the `DataLoader` does not batch (`bs=None`).

    Leaves are converted by `default_convert` (e.g. numpy arrays become tensors). Every generic
    `Sequence` (including namedtuples) is rebuilt with its original container type, so nesting is
    preserved. `str` is part of `_collate_types`, which also keeps the recursion from descending
    into strings.
    """
    if isinstance(t, _collate_types): return default_convert(t)
    if isinstance(t, Sequence):
        items = [fa_convert(s) for s in t]
        # namedtuples take their fields positionally; other sequence types take a single iterable
        if isinstance(t, tuple) and hasattr(t, '_fields'): return type(t)(*items)
        return type(t)(items)
    return default_convert(t)

In [None]:
# a single (unbatched) sample: an array plus a tuple of arrays
t0 = array([1,2])
t = [t0,(t0,t0)]
t

[array([1, 2]), (array([1, 2]), array([1, 2]))]

In [None]:
# fa_convert only converts the leaves (arrays -> tensors); unlike fa_collate it does not transpose
fa_convert(t)

[tensor([1, 2]), (tensor([1, 2]), tensor([1, 2]))]

In [None]:
# nested lists/tuples of arrays: every array becomes a tensor, the containers keep their types
fa_convert([(array([1]),array([2,3])),(array([1]),array([2,3]))])

[(tensor([1]), tensor([2, 3])), (tensor([1]), tensor([2, 3]))]

In [None]:
# plain Python ints are returned unchanged (see the output): nothing here is turned into a tensor
fa_convert([(1,(2,3)),(1,(2,3))])

[(1, (2, 3)), (1, (2, 3))]

In [None]:
# the converted arrays are integer (Long) tensors
fa_convert(t)[0].type(),fa_convert(t)[1][0].type()

('torch.LongTensor', 'torch.LongTensor')

In [None]:
# fa_convert agrees with PyTorch's default_convert here, and keeps the list/tuple container types
test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])

In [None]:
#export
class SkipItemException(Exception):
    "Raise from `create_item` to make the `DataLoader` drop the current item."

In [None]:
#export
@funcs_kwargs
class DataLoader(GetAttr):
    """Flexible replacement for PyTorch's `DataLoader`, driven by PyTorch's own loader iterators.

    Customization: every name in `_methods` can be replaced on a single instance by passing a function
    with that name as a keyword argument (handled by `funcs_kwargs`, e.g. `DataLoader(create_item=f)`),
    or overridden in a subclass.
    Attributes not found on the DataLoader are delegated to `dataset` (`GetAttr` with `_default='dataset'`).

    One epoch (`__iter__`): `before_iter`; then every sampled index goes through `create_item` ->
    `after_item` (items raising `SkipItemException` are dropped); items are grouped into batches of `bs`;
    each batch goes through `before_batch` -> `create_batch` -> `retain`, is moved to `device` if one is
    set, and is passed to `after_batch`; `after_iter` runs once the epoch is exhausted.
    """
    # Callback hooks. By default each returns its first argument unchanged (None when called without one).
    # `wif` runs at the start of every worker process, `after_item` on every single item,
    # `after_iter` once after a full pass over the data.
    _noop_methods = 'wif before_iter after_item before_batch after_batch after_iter'.split()
    for o in _noop_methods:
        exec(f"def {o}(self, x=None, *args, **kwargs): return x")
    # Remove the loop variable: left in the class namespace it becomes a class attribute
    # (`DataLoader.o`) and stops an attribute named `o` from being delegated to `dataset`.
    del o
    # Everything that can be replaced through keyword arguments (each name listed once)
    _methods = _noop_methods + 'create_batches create_item create_batch retain \
        get_idxs sample shuffle_fn do_batch'.split()
    _default = 'dataset'

    def __init__(self, dataset=None, bs=None, num_workers=0, pin_memory=False, timeout=0, batch_size=None,
                 shuffle=False, drop_last=False, indexed=None, n=None, device=None, **kwargs):
        """Store the loading settings.

        dataset: source of items, read with `dataset[i]` when `indexed`, otherwise with `iter(dataset)`.
        bs: batch size; `None` means no batching (every item is yielded on its own, via `fa_convert`).
        batch_size: PyTorch-compatible alias for `bs`; takes precedence when given.
        num_workers, pin_memory, timeout: handed to PyTorch's loader iterator through `_FakeLoader`.
        shuffle: shuffle the indices at every epoch.
        drop_last: drop a final incomplete batch; requires `bs` (AssertionError otherwise).
        indexed: defaults to True when `dataset` has `__getitem__`.
        n: number of items; defaults to `len(dataset)` when available, otherwise the item stream is
           unbounded and ends only when `create_item` raises `StopIteration` (e.g. via `stop()`).
        device: if set, each batch is moved there with `to_device` before `after_batch`.
        NOTE(review): any other keyword arguments are accepted and silently ignored, so a typo such as
        `create_batc=1` does not raise.
        """
        if batch_size is not None: bs = batch_size # PyTorch compatibility
        assert not (bs is None and drop_last)
        if indexed is None: indexed = dataset is not None and hasattr(dataset,'__getitem__')
        if n is None:
            try: n = len(dataset)
            except TypeError: pass
        store_attr(self, 'dataset,bs,shuffle,drop_last,indexed,n,pin_memory,timeout,device')
        # nw (number of workers) and offs (this worker's id) are overwritten in each worker process by `_wif`
        self.rng,self.nw,self.offs = random.Random(),1,0
        self.fake_l = _FakeLoader(self, pin_memory, num_workers, timeout)

    def __len__(self):
        "Number of batches per epoch (items when `bs` is None); raises `TypeError` when `n` is unknown."
        if self.n is None:
            raise TypeError('DataLoader has no length: `n` is unknown (the dataset has no `__len__` and no `n` was given)')
        if self.bs is None: return self.n
        # a final partial batch counts unless drop_last is set
        return self.n//self.bs + (0 if self.drop_last or self.n%self.bs==0 else 1)

    def get_idxs(self):
        "Indices for one epoch: 0, 1, 2, ... when `indexed`, otherwise `None` placeholders (see `create_item`)."
        # Inf.count / Inf.nones are open-ended streams, cut to the first `n` items when the length is known.
        # NOTE(review): with n=None and shuffle=True, `shuffle_fn` receives the open-ended stream.
        idxs = Inf.count if self.indexed else Inf.nones
        if self.n is not None: idxs = list(itertools.islice(idxs, self.n))
        if self.shuffle: idxs = self.shuffle_fn(idxs)
        return idxs

    def sample(self):
        "Lazily yield the indices this worker loads: whole batches are dealt round-robin to the `nw` workers."
        idxs = self.get_idxs()
        return (b for i,b in enumerate(idxs) if i//(self.bs or 1)%self.nw==self.offs)

    def __iter__(self):
        "Run one epoch, yielding batches (single items when `bs` is None)."
        self.randomize()
        self.before_iter()
        # `_loaders[True]` is PyTorch's single-process iterator, `_loaders[False]` the multi-process one.
        # It iterates `fake_l`, which calls `create_batches(sample())` (inside each worker when num_workers>0).
        for b in _loaders[self.fake_l.num_workers==0](self.fake_l):
            if self.device is not None: b = to_device(b, self.device)
            yield self.after_batch(b)
        self.after_iter()
        # discard the dataset iterator that `create_batches` stored for this epoch
        if hasattr(self, 'it'): delattr(self, 'it')

    def create_batches(self, samps):
        "Lazily turn the index stream `samps` into a stream of batches."
        # fresh dataset iterator, read by `create_item(None)` for non-indexed access
        self.it = iter(self.dataset) if self.dataset is not None else None
        # skipped items come back as None and are filtered out (so are items that are genuinely None)
        res = filter(lambda o:o is not None, map(self.do_item, samps))
        yield from map(self.do_batch, self.chunkify(res))

    def new(self, dataset=None, cls=None, **kwargs):
        "Create a new loader (of class `cls`, default this one's) with the same settings; `kwargs` override them."
        if dataset is None: dataset = self.dataset
        if cls is None: cls = type(self)
        cur_kwargs = dict(dataset=dataset, num_workers=self.fake_l.num_workers, pin_memory=self.pin_memory, timeout=self.timeout,
                          bs=self.bs, shuffle=self.shuffle, drop_last=self.drop_last, indexed=self.indexed, device=self.device)
        # Carry over every (possibly overridden) method so custom callbacks survive the copy.
        # NOTE(review): methods that were not overridden are passed bound to `self`; whether they are
        # rebound to the new instance depends on `funcs_kwargs` — confirm, especially when `cls` differs.
        # `n` is not copied, so it is recomputed from `len(dataset)`.
        for k in self._methods: cur_kwargs[k] = getattr(self, k)
        return cls(**merge(cur_kwargs, kwargs))

    @property
    def prebatched(self):
        "True when `bs` is None, i.e. each dataset item is treated as a ready-made batch."
        return self.bs is None

    def do_item(self, s):
        "Create item `s` and apply `after_item`; returns None when `SkipItemException` is raised."
        try: return self.after_item(self.create_item(s))
        except SkipItemException: return None

    def chunkify(self, b):
        "Group the item stream `b` into batches of `bs` (honouring `drop_last`); no grouping when prebatched."
        return b if self.prebatched else chunked(b, self.bs, self.drop_last)

    def shuffle_fn(self, idxs):
        "Return a shuffled copy of `idxs`, drawn from this loader's own `rng`."
        return self.rng.sample(idxs, len(idxs))

    def randomize(self):
        "Replace `rng` with a generator seeded from the current one; called at the start of every epoch."
        self.rng = random.Random(self.rng.randint(0,2**32-1))

    def retain(self, res, b):
        "Give the collated batch `res` the types of the items in `b` (of `b[0]` when `b` is list-like), via `retain_types`."
        return retain_types(res, b[0] if is_listy(b) else b)

    def create_item(self, s):
        "Fetch one item: `dataset[s]` for indexed access, `next(self.it)` when `s` is None."
        return next(self.it) if s is None else self.dataset[s]

    def create_batch(self, b):
        "Collate a list of items with `fa_collate`, or just convert a single item with `fa_convert` when prebatched."
        return (fa_collate,fa_convert)[self.prebatched](b)

    def do_batch(self, b):
        "`before_batch` -> `create_batch` -> `retain` (keep the item types) for one list of items `b`."
        return self.retain(self.create_batch(self.before_batch(b)), b)

    def to(self, device):
        "Set the device batches are moved to; returns `self` so the call can be chained."
        self.device = device
        return self

    def one_batch(self):
        "Return the first batch, built in the main process (workers temporarily disabled)."
        if self.n is not None and len(self)==0: raise ValueError('This DataLoader does not contain any batches')
        with self.fake_l.no_multiproc(): res = first(self)
        # the epoch is presumably not run to completion by `first`, so `__iter__` never reaches its own
        # cleanup of `it`: do it here
        if hasattr(self, 'it'): delattr(self, 'it')
        return res

In [None]:
# Unknown keyword arguments (here a typo for `create_batch`) are silently accepted instead of raising
# (unlike `DataBunch` in an earlier notebook)
DataLoader(create_batc=1)

<__main__.DataLoader at 0x7f3e94994b90>

Override `create_item` and use the default infinite sampler to get a stream of unknown length (call `stop()` when you want to end the stream).

## A `DataLoader` subclass that overrides `create_item`

In [None]:
class RandDL(DataLoader):
    "DataLoader without a dataset: streams random floats until a draw reaches 0.95."
    def create_item(self, s):
        # `s` is ignored; the default implementation would return `dataset[s]` (or `next(self.it)` when `s` is None)
        draw = random.random()
        if draw < 0.95: return draw
        return stop()  # `stop()` raises StopIteration, which ends the stream

# RandDL yields floats until the first draw >= 0.95, so the length changes from run to run
L(RandDL())

(#19) [0.2448879411063063,0.023115802608246194,0.2516397681417346,0.8689824597369601,0.6202842409612128,0.282738303489663,0.6903038780761714,0.5454319031985198,0.39029153731457067,0.38409829020021446...]

In [None]:
# drop_last=True discards an incomplete final batch, so a short stream can produce no batches at all
L(RandDL(bs=4, drop_last=True))

(#0) []

In [None]:
L(RandDL(bs=4, drop_last=False)) # keep the incomplete final batch when the stream stops mid-batch

(#16) [tensor([0.9397, 0.4274, 0.4689, 0.0060], dtype=torch.float64),tensor([0.7231, 0.0176, 0.4928, 0.4889], dtype=torch.float64),tensor([0.7878, 0.8066, 0.1979, 0.8556], dtype=torch.float64),tensor([0.0549, 0.3201, 0.1436, 0.3277], dtype=torch.float64),tensor([0.5176, 0.6728, 0.0704, 0.2168], dtype=torch.float64),tensor([0.3115, 0.3950, 0.3338, 0.4602], dtype=torch.float64),tensor([0.6882, 0.3257, 0.8847, 0.6711], dtype=torch.float64),tensor([0.2572, 0.3027, 0.8523, 0.8535], dtype=torch.float64),tensor([0.9048, 0.0431, 0.4288, 0.0712], dtype=torch.float64),tensor([0.2874, 0.9125, 0.2183, 0.9168], dtype=torch.float64)...]

In [None]:
dl = RandDL(bs=4, num_workers=4, drop_last=True) 
# 4 worker processes each run their own `create_batches`, so each worker's random stream stops on its own
L(dl).map(len)

(#12) [4,4,4,4,4,4,4,4,4,4...]

In [None]:
# `no_multiproc` temporarily sets num_workers to 0 and restores the previous value afterwards
test_eq(dl.fake_l.num_workers, 4)
with dl.fake_l.no_multiproc(): 
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, 4)

## Overriding `create_item` without subclassing

In [None]:
def _rand_item(s):
    r = random.random()
    return r if r<0.95 else stop()

L(DataLoader(create_item=_rand_item)) 
# `_rand_item` replaces DataLoader.create_item for this instance only, without subclassing:
# `@funcs_kwargs` turns any keyword argument named in `_methods` into a method override

(#31) [0.8686841543357502,0.07343815517933427,0.5577334928343769,0.644018531735975,0.7052030486955067,0.9082687023578571,0.3882997696254147,0.5240111197056229,0.09998823746576968,0.21261268128410926...]

## A few other `DataLoader` examples

If you don't set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a batch.

In [None]:
# the same toy dataset, shown again for reference
''.join(letters)

'abcdefghijklmnopqrstuvwxyz'

In [None]:
ds1 = DataLoader(letters) # bs=None: no batching, each item is yielded on its own
test_eq(L(ds1), letters) # a DataLoader is an iterable (each `iter` starts an epoch); materialize it with L to compare
test_eq(len(ds1), 26)

test_shuffled(L(DataLoader(letters, shuffle=True)), letters) # same items, but in a different order

In [None]:
type(L(ds1)[0]) # without batching, items are returned as-is (strings stay strings)

str

In [None]:
next(iter(ds1)) # each `iter(ds1)` starts a new epoch; `next` returns its first item

'a'

In [None]:
ds1 = DataLoader(letters, indexed=False) # non-indexed access: items come from iter(letters), not letters[i]
# useful e.g. when the dataset is too big to index or is streamed over a network (bs is still None here)
# The order is preserved: with indexed=False the indices are only `None` placeholders, so even
# shuffle=True would just reorder those placeholders (and shuffle defaults to False anyway)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

In [None]:
type(L(ds1)[0]) # non-indexed, unbatched: items are still returned as-is

str

In [None]:
ds1 = DataLoader(letters, bs=2,indexed=False) # non-indexed dataset, batched with bs=2
next(iter(ds1)) # strings are collated into a list

['a', 'b']

In [None]:
# bs=None and the dataset already holds whole tensors: each one is yielded unchanged
t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)
print(L(ds2))
print(L(ds2)[0].type())

(#2) [tensor([0, 1, 2]),tensor([3, 4, 5])]
torch.LongTensor


In [None]:
# numpy arrays are converted to tensors by `fa_convert`, even without batching
t3 = L(array([0,1,2]),array([3,4,5]))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3.map(tensor))
print(L(ds3))
print(L(ds3)[0].type()) # each array became a LongTensor

(#2) [tensor([0, 1, 2]),tensor([3, 4, 5])]
torch.LongTensor


In [None]:
ds4 = DataLoader(t3, create_batch=noop, # create_batch replaced by `noop`: the arrays are NOT converted to tensors
                 after_iter=lambda: setattr(t3, 'f', 1)) # after a full pass over the data, set `t3.f = 1`
# after_item runs on every single item; after_iter runs only once, at the end of each epoch

# Example of `after_item` usage (not run here):
# tfms = [[PILImage.create], [labeller, Categorize()]]
# dsrc = DataSource(items, tfms)
# tdl = TfmdDL(dsrc, bs=1, after_item=[ImageResizer(128), ToTensor(), IntToFloatTensor()])
# (the `DataLoader` class docstring lists the full order of the callbacks)

In [None]:
print(next(iter(ds4)))
# `after_iter` only runs once an epoch is finished, and the epoch above was never finished,
# so `t3.f` does not exist yet. Catch the error so the notebook still runs top to bottom.
try: print(t3.f)
except AttributeError as e: print(f'AttributeError: {e}')

[0 1 2]


AttributeError: 'L' object has no attribute 'f'

In [None]:
# iterating ds4 to the end triggers after_iter, which sets t3.f
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)

In [None]:
# Another after_iter example, this time with batching: after the last batch the hook sets `t3.f = 2`
t3 = L(array([0,1,2]),array([3,4,5]))


ds1 = DataLoader([str(i) for i in range(11)], bs=4, 
                 after_iter=lambda: setattr(t3, 'f', 2)) 
# the hook can modify any object, not just the dataset (here the unrelated `t3`)
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)

In [None]:
# 12 indexed items in batches of 4 over 3 workers: the batches still come back in order
ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))

If you do set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a single item of a batch.

In [None]:
def twoepochs(d):
    "Iterate `d` twice (two epochs), join the items of each batch into a string, and separate batches with spaces."
    chunks = []
    for _epoch in range(2):
        for batch in d: chunks.append(''.join(batch))
    return ' '.join(chunks)

In [None]:
# what twoepochs computes for a single epoch: one joined string per batch
[''.join(o) for _ in range(1) for o in [['a','b'],['c','d','e']]]

['ab', 'cde']

In [None]:
twoepochs([['a','b'],['c','d','e']]) # the same batches, twice, separated by spaces

'ab cde ab cde'

In [None]:
# drop_last=True: the incomplete final batch ('yz') is dropped, in both epochs
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')

# bs passed positionally; with drop_last=False the final 'yz' batch is kept, and 2 workers still preserve order
ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')

In [None]:
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0, n=4) # n limits the loader to the first n items
print(twoepochs(ds1))

ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0, n=3) # n < bs with drop_last=True: nothing is returned,
# because drop_last=True only keeps full batches of size bs and there are only 3 items
print(twoepochs(ds1))

ds1 = DataLoader(letters, bs=4, drop_last=False, num_workers=0, n=3)
print(twoepochs(ds1))

ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0, n=8)
print(twoepochs(ds1))

abcd abcd

abc abc
abcd efgh abcd efgh


## Initializing a `DataLoader` with a generator

In [None]:
L(map(noop,range(20))) # a `map` object: a one-shot iterator with no `len` and no `__getitem__`

(#20) [0,1,2,3,4,5,6,7,8,9...]

In [None]:
it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1)) # pass a one-shot iterator (a `map` object) as the dataset
# it has no len and no __getitem__, so it is treated as non-indexed and read sequentially
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])

## Indexed datasets (with `__getitem__`)

In [None]:
class SleepyDL(list):
    "A `list` whose item lookups sleep for a random 0-20ms, to simulate a slow indexed dataset."
    def __getitem__(self, i):
        delay = random.random() / 50
        time.sleep(delay)
        return super().__getitem__(i)

t = SleepyDL(letters)

# check that multiple workers (loading concurrently) still return the items in the correct order
%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)

CPU times: user 3.47 ms, sys: 536 µs, total: 4.01 ms
Wall time: 292 ms
CPU times: user 15.3 ms, sys: 13.6 ms, total: 28.9 ms
Wall time: 173 ms
CPU times: user 6.89 ms, sys: 32.8 ms, total: 39.6 ms
Wall time: 109 ms


In [None]:
L(DataLoader(t, num_workers=0)) # single process, indexed dataset: items in order

(#26) ['a','b','c','d','e','f','g','h','i','j'...]

In [None]:
dl = DataLoader(t, shuffle=True, num_workers=2) # shuffling also works with multiple workers
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl)) # every epoch is reshuffled (`randomize` runs at the start of each epoch)

In [None]:
L(dl) # one shuffled epoch

(#26) ['p','a','c','j','u','g','i','x','t','l'...]

## Non-indexed datasets (only `__iter__`, no `__getitem__`)

This behavior can also be forced on an indexed dataset by passing `indexed=False` (see above).

In [None]:
class SleepyQueue():
    "Iterable wrapper around queue `q` that sleeps 0-10ms before each read, to simulate varying latency."
    def __init__(self, q): self.q = q
    def __iter__(self):
        while True:
            time.sleep(random.random()/100)
            try: item = self.q.get_nowait()
            except queues.Empty: break  # queue drained: end the iteration
            yield item

In [None]:
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)
L(it) # iterating the queue directly returns the items in insertion order

(#30) [0,1,2,3,4,5,6,7,8,9...]

In [None]:
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)
L(DataLoader(it, num_workers=1)) # a single worker also keeps the order

(#30) [0,1,2,3,4,5,6,7,8,9...]

In [None]:
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)
L(DataLoader(it, num_workers=2)) # non-indexed dataset read by >1 workers: the order is no longer guaranteed
# (the workers pull from the same queue concurrently), whatever the value of shuffle

(#30) [0,2,1,4,3,6,5,8,7,10...]

In [None]:
# The DataLoader above consumed every item in `q`, so a fresh iterator over `it` is already exhausted:
# with a non-indexed dataset (only `__iter__`) no more data can be fetched once the source is used up.
# Passing a default to `next` shows this without raising `StopIteration`.
next(iter(it), 'exhausted')

In [None]:
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)

%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30)) # with 4 workers the items arrive out of order (none are lost)
# NOTE(review): test_shuffled fails if the workers happen to deliver in order, so this check can be flaky

# Why >1 workers can't keep the order of a non-indexed dataset:
# we can't control which worker fetches an item first or finishes first.
# With an indexed dataset, on the other hand, each worker is assigned its own batch indices, so the
# order is preserved when shuffle=False

CPU times: user 7.51 ms, sys: 23.1 ms, total: 30.6 ms
Wall time: 97.7 ms


In [None]:
q = Queue()
for o in letters: q.put(o) # the same experiment with letters: out of order with 4 workers
it = SleepyQueue(q)
L(DataLoader(it, num_workers=4))

(#26) ['e','a','b','d','f','c','g','i','h','j'...]

In [None]:
q = Queue()
for o in letters: q.put(o)
it = SleepyQueue(q)
temp_dl = DataLoader(it, num_workers=4,bs=4) # same, but batched (bs=4)
for temp in temp_dl:
    print(temp) # each worker builds its own batches, so contents and order vary, and each worker can end with its own partial batch (drop_last=False)

['a', 'c', 'g', 'j']
['b', 'd']
['e', 'h', 'k', 'n']
['f', 'i', 'l', 'o']
['m', 'p', 's', 't']
['q', 'w']
['r', 'u', 'v', 'y']
['x']
['z']


## Surprising behavior: multiple workers iterating a stateful non-indexed dataset asynchronously

In [None]:
class TempClass():
    "Stateful non-indexed dataset (only `__iter__`) that prints its cursor position before yielding each item."
    def __init__(self, letters): self.letters, self.i = letters, 0
    def __iter__(self):
        # the cursor lives on the instance, so a second pass over the same object yields nothing
        while self.i < len(self.letters):
            print(self.i)
            yield self.letters[self.i]
            self.i += 1
    

In [None]:
L(DataLoader(TempClass(letters[:5]))) # single process: the dataset is iterated once, printing positions 0..4

0
1
2
3
4


(#5) [a,b,c,d,e]

In [None]:
L(DataLoader(TempClass(letters[:5]),num_workers=2)) # every item appears twice!
# Each worker process works on its own copy of the dataset (with its own cursor `i`) and iterates all of it,
# unlike SleepyQueue above, whose workers drain one shared queue

0
1
2
0
3
1
2
4
3
4


(#10) [a,a,b,b,c,c,d,d,e,e]

In [None]:
L(DataLoader(TempClass(letters[:5]),num_workers=2)) # same cell again: the workers' print order differs between runs, the result does not

0
0
1
2
1
2
3
4
3
4


(#10) [a,a,b,b,c,c,d,d,e,e]

## Batches keep custom tensor types

In [None]:
class A(TensorBase):
    "Minimal `TensorBase` subclass, used to check that batching keeps custom tensor types."

# Batching must keep custom tensor subclasses, with and without worker processes
for nw in (0,2):
    t = A(tensor([1,2]))
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = next(iter(dl))
    test_eq(type(b), A)  # a batch of A items is still an A

    t = (A(tensor([1,2])),)
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = next(iter(dl))
    test_eq(type(b[0]), A)  # also when the A is nested inside a tuple

In [None]:
# reuses `A` (the `TensorBase` subclass defined in the previous cell) instead of redefining it
t = A(tensor(1,2))

# `after_batch=to_device` applies `to_device` to every batch; the custom type must survive that too
tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = next(iter(tdl))
test_eq(type(b), A)

# Unknown attributes are delegated to `dataset`: `tdl.pop()` is the dataset list's `pop`
test_eq(tdl.pop(), tensor(1,2))

## Export -