In [34]:
## The numpydataloader is very slow, compared to the random generate directly sample on the large vector
## This notebook explore the concurrent dataloader to avoid the i/o bottle neck. 
## The old repo was saved on hdd, switching to ssd, the i/o is much faster

In [1]:
import torch.utils.data as data_utils
from torchvision import datasets, transforms
import torch
from transformers import AutoImageProcessor, AutoModel
from PIL import Image
import requests
from tqdm import tqdm
import numpy as np
import torch 
import os
from tqdm import tqdm
import pandas as pd
from time import time 
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor, ALL_COMPLETED
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import wait 

In [2]:
train_df = pd.read_csv("../datasets/mnst_train.csv")
train_df["array_path"] = train_df["array_path"].apply(lambda x: "../" + x)
train_df.head(3)

Unnamed: 0,array_path,bag_length,target
0,../datasets/MNST_train/0/0/bagLength_027_targe...,27,1
1,../datasets/MNST_train/0/0/bagLength_038_targe...,38,0
2,../datasets/MNST_train/0/0/bagLength_022_targe...,22,0


In [3]:
np.random.seed(101)
sample_indices = np.random.randint(len(train_df), size=(100))
sample_indices

array([176991, 214539, 476497, 204614, 661055, 311895, 579581, 471115,
       937967, 753033, 672717, 153128, 349828,  35391, 250792, 323317,
       465212, 340700, 937070, 121861, 348637, 915496,  32817, 456531,
       864879, 615930, 649352, 315763, 755613, 871283, 280635, 251298,
       140844, 552776, 117011, 497278,  15436, 151263, 194155, 336215,
       621440, 874601, 746999,  77257, 858351, 761406, 396964, 676819,
       904291, 757020, 152383, 858631, 312202, 917940, 275896, 290407,
       802086, 252654, 920905, 712116, 169362, 200183,  50503, 838543,
       539948, 987648, 470644, 463210, 857228, 960785, 324344, 248011,
       455667, 468431, 956257, 597981, 276120, 973476, 392060, 659135,
       425235, 608754, 586147, 530974, 781322,  12787, 550844, 276071,
        52123, 855816,  83798, 371066, 173850,  58867, 274263, 274094,
       962607, 789430, 690917, 256214])

In [6]:
io_times = []
total_times = []
for index in tqdm(sample_indices):
    t0 = time()
    np_path = train_df.loc[index, "array_path"]
    label = train_df.loc[index, "target"]
    t1 = time()
    np_array = np.load(np_path)
    t2 = time()
    np.random.shuffle(np_array)
    bag_length = len(np_array)
    attention_mask = torch.ones((40), dtype=(torch.float32))
    if bag_length < 40:
        np_array = np.pad(
            np_array, ((0, 40 - bag_length), (0, 0)), "constant", constant_values=(0, 0)
        )
    else:
        np_array = np_array[: 40, :]
        attention_mask[40 :] = 0
    t3 = time()
    io_times.append(t2 - t1)
    total_times.append(t3 - t0)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:00<00:00, 1124.13it/s]


In [7]:
np.mean(io_times)/np.mean(total_times), np.sum(io_times), np.sum(total_times)

(0.5939801464288706, 0.0523991584777832, 0.08821702003479004)

In [19]:
# I/O takes 99% of the time

In [20]:
def get_one_sample(index):
    np_path = train_df.loc[index, "array_path"]
    label = train_df.loc[index, "target"]
    np_array = np.load(np_path)
    bag_length = len(np_array)
    indices = np.arange(bag_length)
    np.random.shuffle(indices)
    np_array = np_array[indices]
    attention_mask = torch.ones((40), dtype=(torch.float32))
    if bag_length < 40:
        np_array = np.pad(
            np_array, ((0, 40 - bag_length), (0, 0)), "constant", constant_values=(0, 0)
        )
        attention_mask[bag_length:] = 0
    else:
        np_array = np_array[: 40, :]
        
    return torch.tensor(np_array), torch.tensor(label, dtype=torch.float32), attention_mask

In [21]:
def dataloader(batch_size=128, max_threads=100):
    n = 1024 * 4
    n_batch = (n + batch_size - 1) // batch_size
    all_indices = np.random.choice(np.arange(len(train_df)), n)
    with ThreadPoolExecutor(max_threads) as executor:
        # submit tasks and collect futures
        for batch_i in range(n_batch):
            futures = [executor.submit(get_one_sample, i) for i in all_indices[batch_i*batch_size: (batch_i+1) * batch_size]]
            inps = []
            labels = []
            masks = []
            for future in as_completed(futures):
                result = future.result()
                inp_, lab_, msk_ = result
                inps.append(inp_)
                labels.append(lab_)
                masks.append(msk_)
            yield torch.stack(inps), torch.stack(labels), torch.stack(masks)

In [22]:
t1 = time()
total_n = 0
inps1 = []
for inp_, lab_, mask_ in dataloader(batch_size=4096, max_threads=2048):
    total_n += len(inp_)
    inps1.append(inp_)
t2 = time()
t2 - t1, total_n

(39.07833433151245, 4096)

In [115]:
def dataloader_v2(batch_size=128, buffer_size=512, max_threads = 100):
    n = len(train_df.iloc[:1024 * 4])
    n_batch = (n + batch_size - 1) // batch_size
    i = 0
    buffer_i = 0
    with ThreadPoolExecutor(max_threads) as executor:
        # submit tasks and collect future
        futures_buffer = set(executor.submit(get_one_sample, i) for i in range(buffer_size))
        buffer_i = buffer_size 
        inps = []
        labels = []
        masks = []
        while(i < n):
            finished, futures_buffer = wait(futures_buffer, return_when=FIRST_COMPLETED)
            for future in finished:
                inp_, lab_, msk_ = future.result()
                inps.append(inp_)
                labels.append(lab_)
                masks.append(msk_)
                i += 1
                if(i % batch_size == 0):
                    print(i, len(inps))
                    yield torch.stack(inps), torch.stack(labels), torch.stack(masks)
                    inps = []
                    labels = []
                    masks = []
                    futures_buffer.update(executor.submit(get_one_sample, i) for i in range(buffer_i, buffer_i + batch_size))
                    buffer_i += batch_size

In [121]:
t1 = time()
total_n = 0
for inp_, lab_, mask_ in dataloader_v2(batch_size=1024, max_threads=800, buffer_size=2048):
    total_n += len(inp_)
t2 = time()
t2 - t1, total_n

1024 1024
2048 1024
3072 1024
4096 1024


(18.103395462036133, 4096)

In [129]:
def dataloader_v3(batch_size=128, max_threads=100):
    n = len(train_df.iloc[:1024 * 4])
    n_batch = (n + batch_size - 1) // batch_size
    all_indices = np.arange(n)
    with ProcessPoolExecutor(max_threads) as executor:
        # submit tasks and collect futures
        for batch_i in range(n_batch):
            futures = [executor.submit(get_one_sample, i) for i in all_indices[batch_i*batch_size: (batch_i+1) * batch_size]]
            inps = []
            labels = []
            masks = []
            for future in as_completed(futures):
                result = future.result()
                inp_, lab_, msk_ = result
                inps.append(inp_)
                labels.append(lab_)
                masks.append(msk_)
            yield torch.stack(inps), torch.stack(labels), torch.stack(masks)

In [130]:
t1 = time()
total_n = 0
for inp_, lab_, mask_ in dataloader_v3(batch_size=128, max_threads=50):
    total_n += len(inp_)
t2 = time()
t2 - t1, total_n

(11.550877571105957, 4096)

In [16]:
def dataloader_v4(batch_size=128, max_threads=100, max_bag_length=40):
    ## share mem
    def get_one_sample_shared(local_i, index):
        t0 = time()
        np_path = train_df.loc[index, "array_path"]
        label = train_df.loc[index, "target"]
        t1 = time()
        np_array = np.load(np_path)
        bag_length = len(np_array)
        t2 = time()
        indices = np.arange(bag_length)
        np.random.shuffle(indices)
        np_array = np_array[indices]
        t3 = time()
        if bag_length < 40:
            input_tensor[local_i,:bag_length, :] = torch.tensor(np_array)
            attention_mask[local_i, bag_length:] = 0
        else:
            input_tensor[local_i,:, :] = torch.tensor(np_array[: 40, :])
        label_tensor[local_i] = label 
        t4 = time()
        # print("load array takes %0.4f p1 takes %0.4f p3 takes %0.4f p4 takes %0.4f"%(t2 - t1, t1 - t0 , t3 - t2, t4 - t3))
        
    n = 1024 * 24
    n_batch = (n + batch_size - 1) // batch_size
    randint = np.random.randint(len(train_df))
    if randint + n < len(train_df): 
        all_indices = np.arange(randint, randint + n)
    else: 
        all_indices = np.arange(randint - n, randint)
    with ThreadPoolExecutor(max_threads) as executor:
        # submit tasks and collect futures
        for batch_i in range(n_batch):
            t0 = time()
            attention_mask = torch.ones((batch_size, max_bag_length), dtype=(torch.float32))
            input_tensor = torch.zeros(
                (batch_size, max_bag_length, 384),
                dtype=(torch.float32),
            )
            label_tensor = torch.zeros((batch_size), dtype=(torch.float32))
            t1 = time()
            # print("Create empty torch tensor takes %0.4f"%(t1 - t0))
            futures = [executor.submit(get_one_sample_shared, local_i, i) for local_i, i in enumerate(all_indices[batch_i*batch_size: (batch_i+1) * batch_size])]
            finished, remains = wait(futures, return_when=ALL_COMPLETED)
            t2 = time()
            # print("Load and Overwrite empty torch tensor takes %0.4f"%(t2 - t0))
            yield input_tensor, label_tensor, attention_mask

In [17]:
t1 = time()
total_n = 0
inps2 = []
masks2 = []
for inp_, lab_, mask_ in dataloader_v4(batch_size=2048, max_threads=1024):
    total_n += len(inp_)
    inps2.append(inp_)
    masks2.append(mask_)
t2 = time()
t2 - t1, total_n

(14.459701776504517, 24576)

In [18]:
# inps1_tensor = torch.concat(inps1)
inps2_tensor = torch.concat(inps2)
masks2 = torch.concat(masks2)

In [20]:
masks2[10], masks2[1]

(tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0.]),
 tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0.]))

In [23]:
def load_numpy(local_i, index):
    np_path = train_df.loc[index, "array_path"]
    np_array = np.load(np_path)
max_threads = 1024
max_bag_length = 40 
batch_size = 2048
t0 = time()
n = 1024 * 8
n_batch = (n + batch_size - 1) // batch_size
randint = np.random.randint(len(train_df))
if randint + n < len(train_df): 
    all_indices = np.arange(randint, randint + n)
else: 
    all_indices = np.arange(randint - n, randint)
with ThreadPoolExecutor(max_threads) as executor:
    for batch_i in range(n_batch):
        attention_mask = torch.ones((batch_size, max_bag_length), dtype=(torch.float32))
        input_tensor = torch.zeros(
            (batch_size, max_bag_length, 384),
            dtype=(torch.float32),
        )
        label_tensor = torch.zeros((batch_size), dtype=(torch.float32))
        futures = [executor.submit(load_numpy, local_i, i) for local_i, i in enumerate(all_indices[batch_i*batch_size: (batch_i+1) * batch_size])]
        finished, remains = wait(futures, return_when=ALL_COMPLETED)
t1 = time()
print("Load %i numpy takes  %0.4f secs"%(n, t1 - t0))

Load 8192 numpy takes  3.2522 secs


In [11]:
2048 * 4

8192