In [2]:
#%%capture
from pyicecake import pyicecake
import types
import collections
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import PIL
from PIL import Image
import cupy as cp

In [None]:
gc = pyicecake.GPUCache(40*1024*1024*1024) # 60GB

In [None]:
labels_dict = dict()
filenames = []
def convert_image_to_dltensor(image_folder):
    files = []
#     with open(image_folder + 'val.txt', 'r') as f:
    with open(image_folder + 'file_list.txt', 'r') as f:
        files = [line.strip() for line in f if line is not '']
    for l in files:
        filename = l.split(' ')[0]
        filenames.append(filename)
        npbuff = np.asarray(PIL.Image.open(image_folder+filename), dtype=np.uint8)
        gc.put_numpy_array(filename, npbuff)

In [None]:
# convert_image_to_dltensor('/mnt/optane-ssd/lipeng/imagenet/')
convert_image_to_dltensor('/home/lwangay/dali-data/images/')

for f in filenames[:10]:
    print(f)

In [None]:
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
%matplotlib inline
def show_all_imgs(imgs):
    _, axs = plt.subplots(5, 4, figsize=(12, 12))
    axs = axs.flatten()
    for img, ax in zip(imgs, axs):
        ax.imshow(img)
    plt.show()

In [None]:
imgs = []
for f in filenames[:20]:
    npbuff = gc.get_numpy_array(f)
    img = PIL.Image.fromarray(npbuff)
    imgs.append(img)
show_all_imgs(imgs)
    



In [6]:
batch_size=64

In [None]:
class ExternalInputIterator(object):
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.files = filenames

    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
            batch.append(filenames[self.i])
            self.i = (self.i + 1) % self.n
        return batch

    next = __next__

In [None]:
eii = ExternalInputIterator(batch_size)
iterator = iter(eii)

In [None]:
def read_from_icecake():
    ret = iterator.next()
    dltensors = []
    for f in ret:
#         print(f)
        dltensors.append(gc.get_dltensor(f, 0))
    return dltensors

class DLTensorPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(DLTensorPipeline, self).__init__(batch_size,num_threads,device_id,exec_async=False,
                                             exec_pipelined=False,seed=12)
        self.read_from_icecake = ops.DLTensorPythonFunction(function=read_from_icecake, device='gpu', 
                                              synchronize_stream=True, batch_processing=True)
#         self.rotate = ops.Rotate(device='gpu')
#         self.rng = ops.Uniform(range = (-30.0, 30.0))
#         self.crop = ops.RandomResizedCrop(device='gpu', size=[200,200])
    def define_graph(self):
        res = self.read_from_icecake()
#         res = self.crop(res)
#         angle = self.rng()
#         res = self.rotate(res, angle=angle)
        return res



In [None]:
pipe = DLTensorPipeline(batch_size=batch_size, num_threads=2, device_id = 0)
pipe.build()

In [None]:
imgs = []
for i in range(0, 5):
    pipe_out = pipe.run()
    im, = pipe_out
    img=im.as_cpu()
    for i in img:
        imgs.append(i)
    
show_all_imgs(imgs)
    

In [None]:
from timeit import default_timer as timer

In [9]:
def speedtest(pipeclass, batch, n_threads, eii = None):
    pipe = None
    if eii is not None:
        pipe = pipeclass(batch, n_threads, 0, eii)
    else:
        pipe = pipeclass(batch, n_threads, 0)
    pipe.build()
    # warmup
    for i in range(5):
        pipe.run()
    # test
    n_test = 1000
    t_start = timer()
    for i in range(n_test):
        pipe.run()
    t = timer() - t_start
    print("Speed: {} imgs/s".format((n_test * batch)/t))

In [None]:
class SimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(SimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = "/home/lwangay/dali-data/images/", file_list="/home/lwangay/dali-data/images/file_list.txt", random_shuffle = False)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)

In [None]:
class MixedPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(MixedPipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = "/home/lwangay/dali-data/images/", file_list="/home/lwangay/dali-data/images/file_list.txt", random_shuffle = False)
        self.decode = ops.ImageDecoder(device = 'mixed', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)

In [None]:
speedtest(SimplePipeline, batch_size, 1)

In [None]:
speedtest(MixedPipeline, batch_size, 1)

In [None]:
speedtest(DLTensorPipeline, batch_size, 1)

In [3]:
class ExternalInputIterator(object):
    def __init__(self, batch_size, device_id, num_gpus):
        self.images_dir = "/home/lwangay/dali-data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line is not '']
        # whole data set size
        self.data_set_len = len(self.files)
        # based on the device_id and total number of GPUs - world size
        # get proper shard
        self.files = self.files[self.data_set_len * device_id // num_gpus:
                                self.data_set_len * (device_id + 1) // num_gpus]
        self.n = len(self.files)

    def __iter__(self):
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []

        if self.i >= self.n:
            raise StopIteration

        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(' ')
            f = open(self.images_dir + jpeg_filename, 'rb')
            npbuf = np.frombuffer(f.read(), dtype = np.uint8)
            cpbuf = cp.asarray(npbuf)
            batch.append(cpbuf.toDlpack())
            labels.append(np.array([label], dtype = np.uint8))
            self.i = (self.i + 1) % self.n
        return (batch, labels)

    @property
    def size(self,):
        return self.data_set_len

    next = __next__

In [14]:
class ExternalSourcePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, external_data):
        super(ExternalSourcePipeline, self).__init__(batch_size,
                                      num_threads,
                                      device_id,
                                      seed=12)
        self.input = ops.ExternalSource()
        self.input_label = ops.ExternalSource()
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)
#         self.res = ops.Resize(device="gpu", resize_x=240, resize_y=240)
#         self.cast = ops.Cast(device = "gpu",
#                              dtype = types.UINT8)
        self.external_data = external_data
        self.iterator = iter(self.external_data)

    def define_graph(self):
        self.jpegs = self.input()
        self.labels = self.input_label()
        images = self.decode(self.jpegs)
#         images = self.res(images)
#         output = self.cast(images)
        return (images, self.labels)

    def iter_setup(self):
        try:
            (images, labels) = self.iterator.next()
            self.feed_input(self.jpegs, images, onGPU=True)
            self.feed_input(self.labels, labels)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise StopIteration

In [15]:
eii = ExternalInputIterator(batch_size, 0, 1)
speedtest(ExternalSourcePipeline, batch_size, 1, eii)

TypeError: nvidia.dali.backend_impl.TensorGPU: No constructor defined!