In [None]:
#default_exp data.load

In [None]:
#export
from local.torch_basics import *
from local.test import *
from local.notebook.showdoc import show_doc

In [None]:
#export
from torch.utils.data.dataloader import _MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter,_DatasetKind
_loaders = (_MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter)

In [None]:
bs = 4
letters = list(string.ascii_lowercase)

## DataLoader

In [None]:
#export
def _wif(worker_id):
    info = get_worker_info()
    ds = info.dataset.d
    ds.nw,ds.offs = info.num_workers,info.id
    set_seed(info.seed)
    ds.wif()

class _FakeLoader(GetAttr):
    _auto_collation,collate_fn,drop_last,dataset_kind,_index_sampler = False,noops,False,_DatasetKind.Iterable,Inf.count
    def __init__(self, d, pin_memory, num_workers, timeout):
        self.dataset,self.default,self.worker_init_fn = self,d,_wif
        store_attr(self, 'd,pin_memory,num_workers,timeout')
        self.multiprocessing_context = (None,multiprocessing)[num_workers>0]

    def __iter__(self): return iter(self.d.create_batches(self.d.sampler()))

In [None]:
#export
_collate_types = (ndarray, Tensor, typing.Mapping, str)

In [None]:
#export
def fa_collate(t):
    b = t[0]
    return (default_collate(t) if isinstance(b, _collate_types)
            else type(t[0])([fa_collate(s) for s in zip(*t)]) if isinstance(b, Sequence)
            else default_collate(t))

In [None]:
#e.g. x is int, y is tuple
t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).mapped(type), [Tensor,tuple])

t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).mapped(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).mapped(type), [Tensor,tuple])

In [None]:
#export
def fa_convert(t):
    return (default_collate(t) if isinstance(t, _collate_types)
            else type(t)([fa_convert(s) for s in t]) if isinstance(t, Sequence)
            else default_convert(t))

In [None]:
t0 = array([1,2])
t = [t0,(t0,t0)]

test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).mapped(type), [Tensor,tuple])

In [None]:
#export
@funcs_kwargs
class DataLoader():
    wif=before_iter=after_item=before_batch=after_batch=after_iter = noops
    _methods = 'wif before_iter create_batches sampler create_item after_item before_batch create_batch retain after_batch after_iter'.split()
    def __init__(self, dataset=None, bs=None, shuffle=False, drop_last=False, indexed=None,
                 num_workers=0, pin_memory=False, timeout=0, **kwargs):
        if indexed is None: indexed = dataset is not None and hasattr(dataset,'__getitem__')
        store_attr(self, 'dataset,bs,drop_last,shuffle,indexed')
        self.fake_l = _FakeLoader(self, pin_memory, num_workers, timeout)
        self.lock,self.rng,self.nw,self.offs = Lock(),random.Random(),1,0
        try: self.n = len(self.dataset)
        except TypeError: self.n = None
        assert not kwargs and not (bs is None and drop_last)

    def __iter__(self):
        self.before_iter()
        for b in _loaders[self.fake_l.num_workers==0](self.fake_l): yield self.after_batch(b)
        self.after_iter()

    def __len__(self):
        if self.n is None: raise TypeError
        if self.bs is None: return self.n
        return self.n//self.bs + (0 if self.drop_last or self.n%self.bs==0 else 1)

    def create_batches(self, samps):
        self.it = iter(self.dataset) if self.dataset else None
        res = map(self.do_item, samps)
        yield from res if self.bs is None else map(self.do_batch, chunked(res, self.bs, self.drop_last))

    def shuffle_fn(self, idxs): return self.rng.sample(idxs, len(idxs))
    def sampler(self):
        idxs = Inf.count if self.indexed else Inf.nones
        if self.n is not None:
            idxs = list(itertools.islice(idxs, self.n))
            idxs = self.shuffle_fn(idxs) if self.shuffle else idxs
        return (b for i,b in enumerate(idxs) if i//(self.bs or 1)%self.nw==self.offs)

    def retain(self, res, b):  return retain_types(res, b[0] if is_listy(b) else b)
    def create_item(self, s):  return next(self.it) if s is None else self.dataset[s]
    def create_batch(self, b): return (fa_collate,fa_convert)[self.bs is None](b)
    def one_batch(self):   return next(iter(self))
    def do_item(self, s):  return self.after_item(self.create_item(s))
    def do_batch(self, b): return self.retain(self.create_batch(self.before_batch(b)), b)

Override `item` and use the default infinite sampler to get a stream of unknown length (`stop()` when you want to stop the stream).

In [None]:
class RandDL(DataLoader):
    def create_item(self, s):
        r = random.random()
        return r if r<0.95 else stop()

L(RandDL())

In [None]:
L(RandDL(bs=4, drop_last=True)).mapped(len)

In [None]:
L(RandDL(bs=4, num_workers=4, drop_last=True)).mapped(len)

If you don't set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a batch.

In [None]:
ds1 = DataLoader(letters)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

test_shuffled(L(DataLoader(letters, shuffle=True)), letters)

ds1 = DataLoader(letters, indexed=False)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)

t3 = L(array([0,1,2]),array([3,4,5]))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t2)

ds4 = DataLoader(t3, create_batch=noops, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)

If you do set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a single item of a batch.

In [None]:
def twoepochs(d): return ' '.join(''.join(o) for _ in range(2) for o in d)

In [None]:
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')

ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')

ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))

ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)

it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])

In [None]:
class SleepyDL(list):
    def __getitem__(self,i):
        time.sleep(random.random()/50)
        return super().__getitem__(i)

t = SleepyDL(letters)

%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)
test_shuffled(L(DataLoader(t, shuffle=True, num_workers=4)), letters)

In [None]:
class SleepyQueue():
    "Simulate a queue with varying latency"
    def __init__(self, q): self.q=q
    def __iter__(self):
        while True:
            time.sleep(random.random()/100)
            try: yield self.q.get_nowait()
            except queues.Empty: return

q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)

%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30))

In [None]:
class A(TensorBase): pass
t = A(tensor([1,2]))

for nw in (0,2):
    tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = next(iter(tdl))
    test_eq(type(b[0]), A)

    t = (A(tensor([1,2])),)
    tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = next(iter(tdl))
    test_eq(type(b[0]), A)

In [None]:
class A(TensorBase): pass
t = A(tensor([1,2]))

tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = next(iter(tdl))
test_eq(type(b[0]), A)

## Export -

In [None]:
#hide
from local.notebook.export import notebook2script
notebook2script(all_fs=True)