In [None]:
#default_exp data.load

In [None]:
#export
from local.torch_basics import *
from local.test import *

In [None]:
from local.notebook.showdoc import *

In [None]:
#export
from torch.utils.data.dataloader import _MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter,_DatasetKind
# Iterator classes selected in `DataLoader.__iter__` through `_loaders[num_workers==0]`:
# index 0 (False) is the multi-process iterator, index 1 (True) the single-process one.
_loaders = (_MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter)

# These are private PyTorch dataloader internals, reused here to drive the actual iteration

In [None]:
# Toy fixtures for the examples below: a batch size and the 26 lowercase letters as a dataset
bs = 4
letters = [ch for ch in string.ascii_lowercase]

In [None]:
''.join(letters)

'abcdefghijklmnopqrstuvwxyz'

## DataLoader

In [None]:
#export
def _wif(worker_id):
    "Worker start-up hook: tell the wrapped loader how many workers exist and which one it is, seed it, run its `wif`"
    worker = get_worker_info()     # PyTorch's description of the worker process we are running in
    loader = worker.dataset.d      # the object wrapped by the `_FakeLoader` that PyTorch sees as the dataset
    loader.nw = worker.num_workers
    loader.offs = worker.id
    set_seed(worker.seed)          # seed with the value PyTorch assigned to this worker
    loader.wif()

class _FakeLoader(GetAttr):
    "Stand-in for a PyTorch loader: carries the attributes its iterator classes expect and pulls batches from `d`"
    # Fixed settings: collation is switched off and the loader presents itself as an iterable-style dataset
    _auto_collation = False
    collate_fn = noops
    drop_last = False
    dataset_kind = _dataset_kind = _DatasetKind.Iterable
    _index_sampler = Inf.count

    def __init__(self, d, pin_memory, num_workers, timeout):
        self.dataset = self            # the loader doubles as its own dataset; see `__iter__`
        self.default = d
        self.worker_init_fn = _wif
        store_attr(self, 'd,pin_memory,num_workers,timeout')

    def __iter__(self):
        "Iterate over the batches that `d` builds from its own sampler"
        samples = self.d.sampler()
        return iter(self.d.create_batches(samples))

    @property
    def multiprocessing_context(self):
        "The `multiprocessing` module when worker processes are requested, otherwise `None`"
        return multiprocessing if self.num_workers>0 else None

# Leaf types: `fa_collate`/`fa_convert` hand these to PyTorch's default routines as-is rather than recursing into them as sequences
_collate_types = (ndarray, Tensor, typing.Mapping, str)

In [None]:
#export
def fa_collate(t):
    "Collate the list of samples `t` into a batch like `default_collate`, but keep the type of nested sequences"
    first = t[0]
    # Leaf types and non-sequences are left entirely to PyTorch
    if isinstance(first, _collate_types) or not isinstance(first, Sequence):
        return default_collate(t)
    # Sequence samples: collate position by position and rebuild the same container type
    return type(first)([fa_collate(column) for column in zip(*t)])

In [None]:
# Each sample is (int, tuple): the ints collate into one tensor, the tuple stays a tuple of tensors
t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])

In [None]:
fa_collate(t)

(tensor([1, 1]), (tensor([2, 2]), tensor([3, 3])))

In [None]:
# Same check with one more level of nesting: the tuple type must be kept at every depth
t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])

In [None]:
fa_collate(t)

(tensor([1, 1]), (tensor([2, 2]), (tensor([3, 3]), tensor([4, 4]))))

In [None]:
#export
def fa_convert(t):
    "Convert a single, un-batched item `t` to tensors, keeping the type of any nested sequences"
    # Leaf types go to `default_convert`, not `default_collate`: `t` is one item rather than a list
    # of samples, and `default_collate` would index `t[0]` and try to assemble a batch from it
    # (which raises for mappings, 0-d arrays/tensors and empty strings).
    return (default_convert(t) if isinstance(t, _collate_types)
            else type(t)([fa_convert(s) for s in t]) if isinstance(t, Sequence)
            else default_convert(t))

In [None]:
# A single item that mixes a bare array with a tuple of arrays
t0 = array([1,2])
t = [t0,(t0,t0)]
t

[array([1, 2]), (array([1, 2]), array([1, 2]))]

In [None]:
fa_convert(t)

[tensor([1, 2]), (tensor([1, 2]), tensor([1, 2]))]

In [None]:
# Values match PyTorch's own conversion, and the inner tuple is still a tuple afterwards
test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])

In [None]:
#export
@funcs_kwargs # any name listed in `_methods` may be passed to `__init__` as a keyword argument to replace that method on the instance (see the `create_item=` example further down)
class DataLoader(GetAttr):
    "Iterate over `dataset` item by item (`bs=None`) or in batches of `bs`, optionally shuffled and spread over worker processes"
    # Hooks default to `noops`, i.e. they have no effect unless overridden
    wif=before_iter=after_item=before_batch=after_batch=after_iter = noops
    # Steps that can be overridden by keyword argument; `new` also copies them onto the loader it builds
    _methods = 'wif before_iter create_batches sampler create_item after_item before_batch create_batch retain after_batch after_iter get_idxs'.split()
    # NOTE(review): presumably names the attribute `GetAttr` delegates unknown lookups to (a later test relies on `tdl.pop()` reaching the dataset) -- confirm against `GetAttr`
    _default='dataset'
    def __init__(self, dataset=None, bs=None, shuffle=False, drop_last=False, indexed=None,
                 num_workers=0, pin_memory=False, timeout=0, n=None, **kwargs):
        # dataset:   source of items; may be None when `create_item` is overridden
        # bs:        batch size; None yields items one at a time without batching
        # shuffle:   permute the indices each epoch (see `sampler`)
        # drop_last: discard a final batch shorter than `bs`; requires `bs`
        # indexed:   fetch items with `dataset[i]` rather than by iterating; inferred from `__getitem__` when None
        # num_workers, pin_memory, timeout: stored on the `_FakeLoader` handed to PyTorch's iterators
        # n:         number of items per epoch; defaults to `len(dataset)`, or None if that is undefined
        if indexed is None: indexed = dataset is not None and hasattr(dataset,'__getitem__')
        store_attr(self, 'dataset,bs,drop_last,shuffle,indexed,pin_memory,timeout') # stores each named argument as an attribute of the same name, instead of writing self.dataset,self.bs = dataset,bs ...
        self.fake_l = _FakeLoader(self, pin_memory, num_workers, timeout)
        # `nw`/`offs` are the worker count and this process's worker index; `_wif` overwrites them inside worker processes
        self.lock,self.rng,self.nw,self.offs = Lock(),random.Random(),1,0
        if n is None:
            try: self.n = len(self.dataset)
            except TypeError: self.n = None
        else: self.n = n
        assert not kwargs and not (bs is None and drop_last) # reject unrecognised keyword arguments, and `drop_last` without a batch size

    def __iter__(self):
        # New generator for this pass, seeded from the previous one
        self.rng = random.Random(self.rng.randint(0,2**32-1))
        self.before_iter()
        # `num_workers==0` is True (index 1, single-process iterator) or False (index 0, multi-process iterator)
        for b in _loaders[self.fake_l.num_workers==0](self.fake_l): yield self.after_batch(b) # the PyTorch iterator pulls batches through the `_FakeLoader`
        # Only reached when the loop above runs to completion
        self.after_iter()

    def __len__(self):
        "Number of batches per epoch (number of items when `bs` is None); raises `TypeError` if `n` is unknown"
        if self.n is None: raise TypeError
        if self.bs is None: return self.n
        return self.n//self.bs + (0 if self.drop_last or self.n%self.bs==0 else 1)

    def create_batches(self, samps):
        "Turn the sample stream `samps` into items, grouped into batches when `bs` is set"
        # Iterator consumed by `create_item` whenever a sample index is None
        self.it = iter(self.dataset) if self.dataset is not None else None
        res = map(self.do_item, samps)
        # Without a batch size items are yielded as they are; otherwise `chunked` groups them and `do_batch` builds each batch
        yield from res if self.bs is None else map(self.do_batch, chunked(res, self.bs, self.drop_last))

    # Random permutation of `idxs`, drawn from this loader's own generator
    def shuffle_fn(self, idxs): return self.rng.sample(idxs, len(idxs))
    
    def get_idxs(self): 
        "Sample indices for one epoch"
        # presumably endless streams of 0,1,2,... and of None respectively -- confirm against `Inf`
        idxs = Inf.count if self.indexed else Inf.nones
        if self.n is not None:
            idxs = list(itertools.islice(idxs, self.n)) # cut the stream down to the `n` indices of one epoch
        return idxs
    
    def sampler(self):
        "Indices this process is responsible for, shuffled first when requested"
        idxs = self.get_idxs()
        idxs = self.shuffle_fn(idxs) if self.shuffle else idxs
        # `i//bs` is the batch a position belongs to; batches are dealt round-robin over `nw` workers and `offs` selects ours
        return (b for i,b in enumerate(idxs) if i//(self.bs or 1)%self.nw==self.offs)
    
    def new(self, dataset):
        "Create a loader of the same class over `dataset`, copying this loader's settings and `_methods`"
        # NOTE(review): `n` is not forwarded, so the new loader takes it from `len(dataset)`
        kwargs = dict(bs=self.bs, shuffle=self.shuffle, drop_last=self.drop_last, indexed=self.indexed,
                      num_workers=self.fake_l.num_workers, pin_memory=self.pin_memory, timeout=self.timeout)
        # NOTE(review): `getattr(self, n)` returns methods bound to *this* loader -- confirm that `funcs_kwargs` rebinds them to the new instance
        for n in self._methods: kwargs[n] = getattr(self, n)
        return self.__class__(dataset, **kwargs)

    # `retain` passes the built batch plus the first raw sample (or `b` itself if not list-like) to `retain_types`,
    # presumably to restore subclass types lost while collating (the `A(TensorBase)` tests further down check this)
    def retain(self, res, b):  return retain_types(res, b[0] if is_listy(b) else b)
    # A None sample means "no index": take the next element from the dataset iterator; otherwise index the dataset
    def create_item(self, s):  return next(self.it) if s is None else self.dataset[s]
    # `fa_collate` when a batch size is set, `fa_convert` otherwise
    # NOTE(review): `create_batches` only calls `do_batch` when `bs` is set, so the `fa_convert` choice is not reached through it as written
    def create_batch(self, b): return (fa_collate,fa_convert)[self.bs is None](b)
    # Item pipeline: create_item -> after_item
    def do_item(self, s):  return self.after_item(self.create_item(s))
    # Batch pipeline: before_batch -> create_batch -> retain
    def do_batch(self, b): return self.retain(self.create_batch(self.before_batch(b)), b)
    # First batch of a fresh pass, fetched with worker processes temporarily disabled
    def one_batch(self):   
        with self.no_multiproc(): return next(iter(self))
    
    @contextmanager
    def no_multiproc(self): 
        "Context manager that sets `num_workers` to 0 inside the block and restores the previous value on exit"
        old_nw = self.fake_l.num_workers
        try:
            self.fake_l.num_workers = 0
            yield self
        finally: self.fake_l.num_workers = old_nw

Override `create_item` and use the default infinite sampler to get a stream of unknown length (call `stop()` when you want to end the stream).

In [None]:
class RandDL(DataLoader):
    "Loader without a dataset: emits random floats until one of them reaches 0.95"
    def create_item(self, s):
        # `s` is the sample index that the default implementation would look up in the dataset; it is not needed here
        value = random.random()
        if value < 0.95: return value
        # `stop()` presumably raises the exception that tells the consuming iteration to finish
        return stop()

L(RandDL())

(#21) [0.29186807360416067,0.827473434064214,0.08830081944942036,0.07450531333379884,0.25821327579634146,0.8943945887809045,0.5281855491659303,0.7665970146415215,0.36509391542224623,0.06435276272263946...]

In [None]:
L(RandDL(bs=4, drop_last=True))

(#2) [tensor([0.5825, 0.2775, 0.4218, 0.8515], dtype=torch.float64),tensor([0.3382, 0.5587, 0.3762, 0.4674], dtype=torch.float64)]

In [None]:
dl = RandDL(bs=4, num_workers=4, drop_last=True) # four worker processes build batches at the same time, independently of each other
L(dl).map(len)

(#19) [4,4,4,4,4,4,4,4,4,4...]

In [None]:
# `no_multiproc` switches to single-process iteration inside the block and restores the worker count on exit
test_eq(dl.fake_l.num_workers, 4)
with dl.no_multiproc(): 
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, 4)

In [None]:
def _rand_item(s):
    r = random.random()
    return r if r<0.95 else stop()

L(DataLoader(create_item=_rand_item)) # same stream without subclassing: `create_item` is passed as a keyword argument, which `@funcs_kwargs` makes possible

(#38) [0.21902888639475415,0.8203430721436966,0.04251385627963944,0.007515236527880553,0.8065218221355448,0.5675178000176123,0.7268473156238129,0.3057651844809659,0.29375459515815106,0.03798958589451962...]

If you don't set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a batch.

In [None]:
ds1 = DataLoader(letters) # bs is None: every element of the dataset is yielded as it is, one per iteration
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

test_shuffled(L(DataLoader(letters, shuffle=True)), letters)

In [None]:
ds1 = DataLoader(letters, indexed=False) # treat the dataset as a plain iterable: items are taken with `next` from `iter(dataset)`, never with `dataset[i]`
# This is the mode for data that cannot be indexed, e.g. too large to hold in memory or streamed over a network
# The order is unchanged here because `shuffle` is off and a single process reads the iterator
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

# Without a batch size, tensors come back with their type unchanged
t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)

# ... and so do numpy arrays
t3 = L(array([0,1,2]),array([3,4,5]))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3)

# Hooks can be passed as keyword arguments; `after_iter` runs once iteration is exhausted and here tags `t3` so the test can see it ran
ds4 = DataLoader(t3, create_batch=noops, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)

If you do set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a single item of a batch.

In [None]:
def twoepochs(d):
    "Iterate over `d` twice, joining each batch into a word and all the words with single spaces"
    words = []
    for _epoch in range(2):
        for batch in d: words.append(''.join(batch))
    return ' '.join(words)

In [None]:
# 12 items in batches of 4 over 3 workers: each worker builds one batch and the batches come back in order
ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))

In [None]:
next(iter(ds1))

tensor([0, 1, 2, 3])

In [None]:
# drop_last discards the incomplete final batch ('yz'); a second pass gives the same batches
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')

# n=5 limits an epoch to 5 items, so with drop_last only one full batch is left
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0, n=5)
test_eq(twoepochs(ds1), 'abcd abcd')

# Without drop_last the short final batch is kept; two workers still deliver the batches in order
ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')



print(t3)
ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2)) 
# `after_iter` is a hook that runs once, after the last batch. `t3` has nothing to do with this loader: it is just an existing object on which the hook can leave a flag for the test to check
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)

it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1)) # a lazy `map` iterator (no `__getitem__`) works as a dataset too
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])

(#2) [[0 1 2],[3 4 5]]


In [None]:
# dataset has __getitem__
class SleepyDL(list):
    "A list that pauses for a random interval (under 20ms) on every item access"
    def __getitem__(self, i):
        delay = random.random()/50
        time.sleep(delay)
        return list.__getitem__(self, i)

t = SleepyDL(letters)

# Items must come back in the original order no matter how many workers fetch them concurrently
%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)

CPU times: user 3.66 ms, sys: 0 ns, total: 3.66 ms
Wall time: 310 ms
CPU times: user 22.7 ms, sys: 6.97 ms, total: 29.7 ms
Wall time: 165 ms
CPU times: user 5.29 ms, sys: 23.7 ms, total: 29 ms
Wall time: 95.1 ms


In [None]:
dl = DataLoader(t, shuffle=True, num_workers=2) # shuffling also works when several workers fetch the items
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl)) # and two passes over the same loader give different orders

In [None]:
# dataset does not have __getitem__, only __iter__ (non-indexed dataset)
class SleepyQueue():
    "Iterable over a queue that waits a random interval (under 10ms) before each read and stops once the queue is empty"
    def __init__(self, q):
        self.q = q

    def __iter__(self):
        keep_reading = True
        while keep_reading:
            pause = random.random()/100
            time.sleep(pause)
            try:
                yield self.q.get_nowait()
            except queues.Empty:
                # nothing left to read: leave the loop, which ends the iteration
                keep_reading = False



In [None]:
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)
L(DataLoader(it, num_workers=4)) # a non-indexed dataset has no indices to hand out, so items arrive in whatever order the workers happen to read them

(#30) [1,4,3,0,2,6,5,7,10,8...]

In [None]:
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)

%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30)) # still work with multiple workers. 
# All 30 items are delivered, but their order comes from worker timing rather than from the `shuffle` argument:
# nothing determines which worker reads from the queue first or finishes first.
# For indexed datasets, by contrast, each worker is given specific indices, so order is preserved when shuffle=False.

CPU times: user 8.31 ms, sys: 21.7 ms, total: 30 ms
Wall time: 75.2 ms


In [None]:
q = Queue()
for o in letters: q.put(o)
it = SleepyQueue(q)
L(DataLoader(it, num_workers=4)) # same behaviour with letters: every item is delivered, in an order set by worker timing

(#26) [d,b,a,f,i,c,e,g,n,j...]

An interesting case: what happens when several workers each change the dataset's internal state asynchronously?

In [None]:
class TempClass():
    "Iterable that keeps its read position `i` on the instance and prints each index as it is served"
    def __init__(self, letters):
        self.l = letters
        self.i = 0

    def __iter__(self):
        while True:
            if self.i >= len(self.l): break
            print(self.i)
            yield self.l[self.i]
            # the position only advances once the consumer asks for the next item
            self.i += 1
    

In [None]:
L(DataLoader(TempClass(letters[:5])))

0
1
2
3
4


(#5) [a,b,c,d,e]

In [None]:
L(DataLoader(TempClass(letters[:5]),num_workers=2)) #lol what?

0
1
0
2
3
1
4
2
3
4


(#10) [a,a,b,b,c,c,d,d,e,e]

In [None]:
class A(TensorBase): pass

# Batches must keep the tensor subclass of their items, with and without worker processes
for nw in (0,2):
    t = A(tensor([1,2]))
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = next(iter(dl))
    test_eq(type(b), A)

    # same check when every item is a 1-tuple wrapping the tensor
    t = (A(tensor([1,2])),)
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = next(iter(dl))
    test_eq(type(b[0]), A)

In [None]:
class A(TensorBase): pass # NOTE(review): repeats the definition from the previous cell
t = A(tensor(1,2))

# The subclass must also survive an `after_batch` hook (here `to_device`)
tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = next(iter(tdl))
test_eq(type(b), A)

# Unknown attributes are delegated to `dataset`: `pop` is the underlying list's method
test_eq(tdl.pop(), tensor(1,2))

## Export -

In [None]:
#hide
from local.notebook.export import notebook2script
notebook2script(all_fs=True)

Converted 00_test.ipynb.
Converted 01_core.ipynb.
Converted 01a_utils.ipynb.
Converted 01b_dispatch.ipynb.
Converted 01c_torch_core.ipynb.
Converted 02_script.ipynb.
Converted 03_dataloader.ipynb.
Converted 04_transform.ipynb.
Converted 05_data_core.ipynb.
Converted 06_data_transforms.ipynb.
Converted 07_vision_core.ipynb.
Converted 08_pets_tutorial.ipynb.
Converted 09_vision_augment.ipynb.
Converted 10_data_block.ipynb.
Converted 11_layers.ipynb.
Converted 11a_vision_models_xresnet.ipynb.
Converted 12_optimizer.ipynb.
Converted 13_learner.ipynb.
Converted 14_callback_schedule.ipynb.
Converted 14a_callback_data.ipynb.
Converted 15_callback_hook.ipynb.
Converted 15a_vision_models_unet.ipynb.
Converted 16_callback_progress.ipynb.
Converted 17_callback_tracker.ipynb.
Converted 18_callback_fp16.ipynb.
Converted 19_callback_mixup.ipynb.
Converted 20_metrics.ipynb.
Converted 21_vision_learner.ipynb.
Converted 22_tutorial_imagenette.ipynb.
Converted 23_tutorial_transfer_learning.ipynb.
Conver