<a href="https://colab.research.google.com/github/ShaswataJash/LargeDatasetHandling/blob/master/Incremental_min_max_calculation_for_large_dataset_using_Ray.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!uname -a
!python --version

Linux 5f5882c553b5 5.10.147+ #1 SMP Sat Dec 10 16:00:40 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
Python 3.9.16


In [None]:
import torch
print(torch.__version__)

1.13.1+cu116


In [None]:
!df -h /dev/shm

Filesystem      Size  Used Avail Use% Mounted on
shm             5.7G     0  5.7G   0% /dev/shm


In [None]:
#https://stackoverflow.com/questions/7878707/how-to-unmount-a-busy-device
#Python multiprocessing shares data between the main process and child processes through shared memory.
#For a PyTorch DataLoader, the shared-memory requirement can be quite high.
!sudo umount -l /dev/shm/ && sudo mount -t tmpfs -o rw,nosuid,nodev,noexec,relatime,size=9G shm /dev/shm

In [None]:
#refer: https://numpy.org/doc/stable/reference/global_state.html#madvise-hugepage-on-linux
!cat /sys/kernel/mm/transparent_hugepage/enabled
!cat /sys/kernel/mm/transparent_hugepage/defrag
!cat /sys/kernel/mm/transparent_hugepage/use_zero_page
!cat /sys/kernel/mm/transparent_hugepage/hpage_pmd_size

always [madvise] never
always defer defer+madvise [madvise] never
1
2097152
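
In the sysfs output above, the bracketed word is the setting currently in effect. A minimal sketch of reading that value from Python follows; the helper name `active_thp_setting` is hypothetical and not part of the original notebook.

```python
import re

def active_thp_setting(line):
    """Return the bracketed (active) option from a line such as 'always [madvise] never'."""
    match = re.search(r"\[([^\]]+)\]", line)
    return match.group(1) if match else None  # None when the line has no bracketed option

# Lines copied from the cell output above
print(active_thp_setting("always [madvise] never"))
print(active_thp_setting("always defer defer+madvise [madvise] never"))
```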


In [None]:
%env

In [None]:
#https://stackoverflow.com/questions/37890898/how-to-set-env-variable-in-jupyter-notebook
%env NUMPY_MADVISE_HUGEPAGE=1

env: NUMPY_MADVISE_HUGEPAGE=1


# Determine free GPU memory

In [34]:
#ref: https://stackoverflow.com/questions/59567226/how-to-programmatically-determine-available-gpu-memory-with-tensorflow
import subprocess as sp
import os
def get_gpu_memory():
    """Return the free memory of the first GPU in bytes, or -1 if it cannot be determined."""
    command = "nvidia-smi --query-gpu=memory.free --format=csv"
    try:
        # drop the CSV header line and the trailing empty line
        memory_free_info = sp.check_output(command.split()).decode('ascii').split('\n')[:-1][1:]
        memory_free_values = [int(x.split()[0]) for x in memory_free_info]
        return memory_free_values[0] * 1024 * 1024 # nvidia-smi reports MiB, so convert to bytes
    except Exception as e:
        print(e)
        return -1
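
The parsing step in `get_gpu_memory()` can be tried without a GPU. The sketch below applies the same header-skipping logic to a string; the sample text is a hypothetical illustration of the CSV that `nvidia-smi` prints, and `parse_free_memory_mib` is a name introduced here only for the example.

```python
def parse_free_memory_mib(csv_text):
    """Parse the text printed by `nvidia-smi --query-gpu=memory.free --format=csv`.

    Returns one integer (MiB) per GPU; the first non-empty line is treated as the header.
    """
    lines = [ln.strip() for ln in csv_text.splitlines() if ln.strip()]
    return [int(ln.split()[0]) for ln in lines[1:]]

# Hypothetical sample for a single GPU (the value is illustrative only)
sample = "memory.free [MiB]\n15109 MiB\n"
free_mib = parse_free_memory_mib(sample)
print(free_mib, free_mib[0] * 1024 * 1024)  # MiB list, then bytes for the first GPU
```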

# Downloading Kaggle competition files

In [None]:
!pip install kaggle==1.5.12

In [None]:
%%python

import sys
import logging
import os
import subprocess

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(message)s')
logger = logging.getLogger('my_logger')
#handling of kaggle interaction
try:
    os.environ["KAGGLE_CONFIG_DIR"] = '/home' #kaggle.json file should be uploaded to /home location before executing this cell
    kaggle_write_cmd = "kaggle competitions download -c open-problems-multimodal"
    kaggle_write_call = subprocess.run(kaggle_write_cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    logger.info(kaggle_write_call.stdout)
    if kaggle_write_call.returncode != 0:
        logger.error("Error in kaggle download, errorcode=%s", kaggle_write_call.returncode)
        sys.stdout.flush()
        sys.exit("Forceful exit as kaggle download returned error")
except Exception as err: #Exception (not BaseException) so that the sys.exit() above is not caught here
    logger.error("kaggle download related error", exc_info=True)
    sys.stdout.flush()
    sys.exit("Forceful exit as exception encountered while kaggle download")

In [None]:
!mkdir -p /content/drive/MyDrive/colab_exp_result/kaggle_data
!unzip /content/open-problems-multimodal.zip -d /content/drive/MyDrive/colab_exp_result/kaggle_data

We can mount Google Drive in Colab and copy the Kaggle competition files there. This avoids rerunning the Kaggle download every time the notebook starts, which saves a lot of time. Instead, each session can copy the files directly from Drive into the local filesystem of the VM that hosts the notebook.

In [1]:
!nohup cp /content/drive/MyDrive/colab_exp_result/kaggle_data/* /mnt &

nohup: appending output to 'nohup.out'
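
The same copy step can also be written in Python so that files already present on the VM are skipped. This is only a sketch: `copy_missing_files` is a hypothetical helper, and comparing file sizes is an assumed shortcut for "already copied", not a guarantee that the contents match.

```python
import shutil
from pathlib import Path

def copy_missing_files(src_dir, dst_dir):
    """Copy regular files from src_dir to dst_dir, skipping files already present with the same size."""
    src, dst = Path(src_dir), Path(dst_dir)
    dst.mkdir(parents=True, exist_ok=True)
    copied = []
    for item in sorted(src.iterdir()):
        if not item.is_file():
            continue
        target = dst / item.name
        if target.exists() and target.stat().st_size == item.stat().st_size:
            continue  # assumption: same name and size means the copy is already in place
        shutil.copy2(item, target)
        copied.append(item.name)
    return copied
```

In this notebook it would be called as `copy_missing_files('/content/drive/MyDrive/colab_exp_result/kaggle_data', '/mnt')`.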


In [4]:
!ls -l /mnt

total 28181072
-rw------- 1 root root  2418406934 Apr 20 03:14 evaluation_ids.csv
-rw------- 1 root root      551250 Apr 20 03:14 max_cite_inputs.txt
-rw------- 1 root root     5723550 Apr 20 03:14 max_multi_inputs.txt
-rw------- 1 root root      234920 Apr 20 03:14 metadata_cite_day_2_donor_27678.csv
-rw------- 1 root root     9770334 Apr 20 03:14 metadata.csv
-rw------- 1 root root      551250 Apr 20 03:14 min_cite_inputs.txt
-rw------- 1 root root     5723550 Apr 20 03:14 min_multi_inputs.txt
-rw------- 1 root root   843563244 Apr 20 03:15 sample_submission.csv
-rw------- 1 root root   307964530 Apr 20 03:15 test_cite_inputs_day_2_donor_27678.h5
-rw------- 1 root root  1704565845 Apr 20 03:15 test_cite_inputs.h5
-rw------- 1 root root  6473530657 Apr 20 03:16 test_multi_inputs.h5
-rw------- 1 root root  2498128492 Apr 20 03:17 train_cite_inputs.h5
-rw------- 1 root root    38539123 Apr 20 03:17 train_cite_targets.h5
-rw------- 1 root root 11334840656 Apr 20 03:20 train_multi_inputs.

# Installation of required software packages

In [3]:
!pip install h5py==3.8.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
#Ref: https://docs.h5py.org/en/stable/mpi.html
#check whether the parallel version of h5py is available
!h5cc -showconfig

In [5]:
!pip install hdf5plugin~=2.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting hdf5plugin~=2.0
  Downloading hdf5plugin-2.3.2-py2.py3-none-manylinux2014_x86_64.whl (5.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 5.4/5.4 MB 16.1 MB/s eta 0:00:00
Installing collected packages: hdf5plugin
Successfully installed hdf5plugin-2.3.2


In [6]:
!pip install -U "ray[default]"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ray[default]
  Downloading ray-2.3.1-cp39-cp39-manylinux2014_x86_64.whl (58.6 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 58.6/58.6 MB 4.7 MB/s eta 0:00:00
Collecting aiosignal
  Downloading aiosignal-1.3.1-py3-none-any.whl (7.6 kB)
Collecting frozenlist
  Downloading frozenlist-1.3.3-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (158 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 158.8/158.8 kB 11.6 MB/s eta 0:00:00
Collecting virtualenv>=20.0.24
  Downloading virtualenv-20.22.0-py3-none-any.whl (3.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.2/3.2 MB 45.4 MB/s eta 0:00:00
Collecting aiohttp-cors
  Downloading aiohttp_cors-0.7.0-py3-none-any.whl (27 kB)
Collecting gpustat>=1.0.0
  Downloading g

# Common HDF5 handling code

In [7]:
import h5py
import hdf5plugin #without importing this, decompression will not happen by h5py
def get_hdf5_dataset_value_key(hdf5_file, debug = 0):
    groups = []
    def node_visit(name):
        groups.append(name)
    
    hdf5_file.visit(node_visit)
    if debug>0: print(hdf5_file, groups)
    
    for g in groups:
        shape = hdf5_file[g].shape if isinstance(hdf5_file[g], h5py.Dataset) else None
        if debug>0: print(g, type(hdf5_file[g]), shape)
        if (shape is not None) and (len(shape) == 2):
            return g
    
    return None

def get_hdf5_dataset_with_specific_shape(hdf5_file, size, debug = 0):
    groups = []
    def node_visit(name):
        groups.append(name)
    
    hdf5_file.visit(node_visit)
    if debug>0: print(hdf5_file, groups)
    
    for g in groups:
        shape = hdf5_file[g].shape if isinstance(hdf5_file[g], h5py.Dataset) else None
        if debug>0: print(g, type(hdf5_file[g]), shape)
        if (shape is not None) and (len(shape) == 1) and (shape[0] == size):
            return g
    
    return None

def get_hdf5_info(hdf5_file):
    print('root-group file-object name:', hdf5_file.name)
    def print_keys(gr, level):
        keys = list(gr.keys())
        for k in keys:
            if isinstance(gr[k], h5py.Group):
                print('->'*level, k, gr[k])
                print_keys(gr[k], level + 1)
            elif isinstance(gr[k], h5py.Dataset):
                print('->'*level, k, gr[k], 'dtype=', gr[k].dtype , 'size=', gr[k].size, 'nbytes=', gr[k].nbytes,
                      'maxshape=', gr[k].maxshape, 'chunks=', gr[k].chunks)

    print_keys(hdf5_file, 1)



In [8]:
import h5py
import hdf5plugin #without importing this, decompression will not happen by h5py
print('============= TRAIN MULTI INPUT ====================')
train_multi_input_file = h5py.File('/mnt/train_multi_inputs.h5') # HDF5 file
get_hdf5_info(train_multi_input_file)
train_multi_input_file.close()
del train_multi_input_file
print('============= TEST MULTI INPUT ====================')
test_multi_input_file = h5py.File('/mnt/test_multi_inputs.h5') # HDF5 file
get_hdf5_info(test_multi_input_file)
test_multi_input_file.close()
del test_multi_input_file

root-group file-object name: /
-> train_multi_inputs <HDF5 group "/train_multi_inputs" (4 members)>
->-> axis0 <HDF5 dataset "axis0": shape (228942,), type "|S26"> dtype= |S26 size= 228942 nbytes= 5952492 maxshape= (228942,) chunks= (2520,)
->-> axis1 <HDF5 dataset "axis1": shape (105942,), type "|S12"> dtype= |S12 size= 105942 nbytes= 1271304 maxshape= (105942,) chunks= (5461,)
->-> block0_items <HDF5 dataset "block0_items": shape (228942,), type "|S26"> dtype= |S26 size= 228942 nbytes= 5952492 maxshape= (228942,) chunks= (2520,)
->-> block0_values <HDF5 dataset "block0_values": shape (105942, 228942), type "<f4"> dtype= float32 size= 24254573364 nbytes= 97018293456 maxshape= (105942, 228942) chunks= (1, 228942)
root-group file-object name: /
-> test_multi_inputs <HDF5 group "/test_multi_inputs" (4 members)>
->-> axis0 <HDF5 dataset "axis0": shape (228942,), type "|S26"> dtype= |S26 size= 228942 nbytes= 5952492 maxshape= (228942,) chunks= (2520,)
->-> axis1 <HDF5 dataset "axis1": shap

In [9]:
import h5py
import hdf5plugin #without importing this, decompression will not happen by h5py
train_mult_input_file = h5py.File('/mnt/train_multi_inputs.h5') # HDF5 file
hdf5_input_key = get_hdf5_dataset_value_key(train_mult_input_file, debug=1)

<HDF5 file "train_multi_inputs.h5" (mode r)> ['train_multi_inputs', 'train_multi_inputs/axis0', 'train_multi_inputs/axis1', 'train_multi_inputs/block0_items', 'train_multi_inputs/block0_values']
train_multi_inputs <class 'h5py._hl.group.Group'> None
train_multi_inputs/axis0 <class 'h5py._hl.dataset.Dataset'> (228942,)
train_multi_inputs/axis1 <class 'h5py._hl.dataset.Dataset'> (105942,)
train_multi_inputs/block0_items <class 'h5py._hl.dataset.Dataset'> (228942,)
train_multi_inputs/block0_values <class 'h5py._hl.dataset.Dataset'> (105942, 228942)


In [None]:
hdf5_col_name_key = get_hdf5_dataset_with_specific_shape(train_mult_input_file, 228942, debug=1)
cols = train_mult_input_file[hdf5_col_name_key]
print(cols.shape)
from tqdm import tqdm
col_name = []
for c in tqdm(cols[:]): #cols[:] reads all names in one HDF5 call instead of one call per element
    col_name.append(str(c, 'UTF-8'))

In [26]:
%load_ext autoreload
%autoreload 2

*   Ref: https://luis-sena.medium.com/sharing-big-numpy-arrays-across-python-processes-abf0dc2a0ab2 (why Ray with a shared object store is the best solution)
*   Ref: https://towardsdatascience.com/histogram-on-function-space-4a710241f026
*   Ref: https://stackoverflow.com/questions/71844846/is-there-a-faster-way-to-get-correlation-coefficents (fast corr-coef)



# Global min and max determination of the raw-inputs (will be used for min-max normalization of the data)
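
The global minimum and maximum can be computed incrementally because the minimum of per-batch minima equals the minimum over all rows (and likewise for the maximum). The sketch below checks this with NumPy on a small random array; the data and the function name `incremental_min_max` are illustrative assumptions, while the notebook itself performs the same fold with `torch.minimum` and `torch.maximum` over batches returned by Ray actors.

```python
import numpy as np

def incremental_min_max(batches):
    """Fold per-batch column minima and maxima into running global values."""
    global_min, global_max = None, None
    for batch in batches:
        batch_min = batch.min(axis=0)  # per-column minimum of this batch
        batch_max = batch.max(axis=0)  # per-column maximum of this batch
        if global_min is None:
            global_min, global_max = batch_min, batch_max
        else:
            global_min = np.minimum(global_min, batch_min)
            global_max = np.maximum(global_max, batch_max)
    return global_min, global_max

rng = np.random.default_rng(0)
data = rng.random((10, 4), dtype=np.float32)               # toy stand-in for the HDF5 matrix
batches = [data[i:i + 3] for i in range(0, len(data), 3)]  # the last batch is shorter
lo, hi = incremental_min_max(batches)
assert np.array_equal(lo, data.min(axis=0))
assert np.array_equal(hi, data.max(axis=0))
```

Because the fold is order-independent, batch results can be combined in whatever order the workers finish.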

In [42]:
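import os
import traceback
import ray
import h5py
import hdf5plugin #without importing this, decompression will not happen by h5py
import numpy as np
import torch

#imports are kept at module level: names imported inside a class body become class attributes and are not visible as globals inside the methods

#@ray.remote(num_cpus=0.5, num_gpus=0.25)
#@ray.remote(num_cpus=0.5)
@ray.remote
class RawInputDataset: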

    def __init__(self, hdf5_input_path, batch_size=1, debug=0):
        self.inited = False
        if debug>0: print('hdf5_input_path:', hdf5_input_path, 'batch_size:', batch_size, 'debug:', debug)

        assert batch_size >= 1

        self.hdf5_input_path = hdf5_input_path
        self.batch_size=batch_size
        self.debug = debug
        
        self.hdf5_input = h5py.File(self.hdf5_input_path, 'r', driver='stdio')
        hdf5_input_key = get_hdf5_dataset_value_key(self.hdf5_input)
        self.hdf5_dataset = self.hdf5_input[hdf5_input_key]
        self.len = self.hdf5_dataset.shape[0]

        self.cuda_device = torch.device("cuda:0" if torch.cuda.is_available() and (get_gpu_memory() > 0) else "cpu")
        self.input = np.zeros((batch_size,self.hdf5_dataset.shape[1]), dtype=self.hdf5_dataset.dtype)
        self.stat_id_consumed = []
        if self.debug>0: print('hdf5 file:', self.hdf5_input, 'hdf5 group:', hdf5_input_key, 'hdf5 dataset:', self.hdf5_dataset, 'id=', id(self.hdf5_input), flush=True)
        if self.debug>0: print('torch-device:', self.cuda_device, 'self.input:', self.input.shape)
        self.inited =True

    def is_inited(self):
        return self.inited
    
    # without '__reduce__', the instance is unserializable.
    '''
    def __reduce__(self):
        deserializer = RawInputDataset
        serialized_data = (self.hdf5_input_path, self.batch_size, self.debug)
        return deserializer, serialized_data
    '''

    def __len__(self): return self.len   

    def __getitem__(self, row):
        try:
            assert row < self.len
            input = self.hdf5_dataset[row]
            if self.debug>0:
                self.stat_id_consumed.append(row)
            if self.debug >4: print('type of input=', type(input) , 'shape=', input.shape, flush=True)
            return torch.from_numpy(input).detach().to(self.cuda_device)
        except Exception as e:
            print('Exception occurred in __getitem__:', e)
            traceback.print_exc()
            
        return None

    def get_batch(self, starting_row):
        try:
            assert starting_row < self.len
            end_row = min(starting_row + self.batch_size, self.len)            
            #input = self.hdf5_dataset[starting_row:end_row]
            input = self.input
            if input.shape[0] != (end_row - starting_row): #will happen for the last batch
                input = np.zeros(((end_row - starting_row),self.hdf5_dataset.shape[1]), dtype=self.hdf5_dataset.dtype)
            self.hdf5_dataset.read_direct(input, source_sel=np.s_[starting_row:end_row,:], dest_sel=None)
            if self.debug>0:
                self.stat_id_consumed.extend(range(starting_row, end_row))
            if self.debug >4: print('type of input=', type(input) , 'shape=', input.shape, flush=True)
            return torch.from_numpy(input).detach().to(self.cuda_device)
        except Exception as e:
            print('Exception occurred in get_batch():', e)
            traceback.print_exc()
            
        return None

    def find_min_max_on_batch(self, starting_row):
        try:
            data = self.get_batch(starting_row)
            local_min = torch.min(data, dim=0)[0] #we have to find min for each col (so reduction of dim=0)
            local_max = torch.max(data, dim=0)[0] #we have to find max for each col (so reduction of dim=0)    
            return (local_min, local_max)
        except Exception as e:
            print('Exception occurred in find_min_max_on_batch():', e)
            traceback.print_exc()

        return (None, None)

    def reset_stat(self):
        if self.debug <= 0:
            return
        self.stat_id_consumed.clear()

    def __del__(self):
        self.hdf5_input.close()


In [None]:
!python3 -m pip install memray

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting memray
  Downloading memray-1.7.0-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (3.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.2/3.2 MB 50.6 MB/s eta 0:00:00
Installing collected packages: memray
Successfully installed memray-1.7.0


In [None]:
%load_ext memray

In [43]:
#%%memray_flamegraph --trace-python-allocators --follow-fork --native --leaks 
import math
import traceback
import gc
import os
import h5py
from tqdm import tqdm

import ray

def find_min_max():
    try:
        hdf5_input_file_path = '/mnt/train_multi_inputs.h5'
        hdf5_input = h5py.File(hdf5_input_file_path, 'r')
        hdf5_input_key = get_hdf5_dataset_value_key(hdf5_input)
        data_len = hdf5_input[hdf5_input_key].shape[0]
        first_elem = hdf5_input[hdf5_input_key][0]
        elem_size_in_bytes = first_elem.size * first_elem.itemsize
        print('first_elem.size:', first_elem.size, 'first_elem.itemsize:', first_elem.itemsize, 'elem_size_in_bytes:', elem_size_in_bytes)
        hdf5_input.close()
        del hdf5_input

        DEBUG_LEVEL = 1

        #optimal_batch_size = math.floor((1024 * 1024 * 1024) / elem_size_in_bytes) #max 1GB of numpy array 
        optimal_batch_size = math.floor((20 * 1024 * 1024) / elem_size_in_bytes) #max 20MB of numpy array 
        print('optimal_batch_size:', optimal_batch_size)

        print('number of cpu availiable:', os.cpu_count())

        while(gc.collect() > 0): pass #clean the memory as much as possible

        work_arg = []
        for s_row in range(0, data_len, optimal_batch_size):
        #for s_row in range(0, 6 * optimal_batch_size, optimal_batch_size):
            work_arg.append(s_row)
        print("total task required:", len(work_arg))
        
        min_max_actors = [RawInputDataset.remote(hdf5_input_file_path, optimal_batch_size, DEBUG_LEVEL) for _ in range(os.cpu_count() * 4)]
        
        for actor in min_max_actors:
            while(not ray.get(actor.is_inited.remote())): pass
            print(actor, 'is initialized')

        result_ids = []
        for id, w in enumerate(work_arg):
            result_ids.append(min_max_actors[id%len(min_max_actors)].find_min_max_on_batch.remote(w))

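        #named global_min/global_max to avoid shadowing the built-in min() and max()
        global_min = None
        global_max = None
        with tqdm(total=len(result_ids)) as pbar:
            while len(result_ids):
                done_ids, result_ids = ray.wait(result_ids)
                for d in done_ids:
                    result = ray.get(d) #(local_min, local_max) of one batch
                    if global_min is not None:
                        global_min = torch.minimum(global_min, result[0])
                        global_max = torch.maximum(global_max, result[1])
                    else:
                        global_min = result[0]
                        global_max = result[1]
                    pbar.update(1)

        print('max.shape:', global_max.shape, 'min.shape:', global_min.shape)
        return global_max, global_min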
    except Exception as e:
        print(e)
        traceback.print_exc()
    
if __name__ == '__main__':
    print('MAIN pid=', os. getpid())

    ray.shutdown()
    ray.init()
    assert ray.is_initialized()

    max_m, min_m = find_min_max() #keep the results; they are saved to Drive in a later cell
    gc.collect()
    print(gc.get_stats())

    ray.shutdown()
    assert not ray.is_initialized()

MAIN pid= 154


2023-04-20 07:16:27,890	INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


first_elem.size: 228942 first_elem.itemsize: 4 elem_size_in_bytes: 915768
optimal_batch_size: 22
number of cpu availiable: 2
total task required: 4816
(RawInputDataset pid=67780) hdf5_input_path: /mnt/train_multi_inputs.h5 batch_size: 22 debug: 1
(RawInputDataset pid=67781) hdf5_input_path: /mnt/train_multi_inputs.h5 batch_size: 22 debug: 1
(RawInputDataset pid=67839) hdf5_input_path: /mnt/train_multi_inputs.h5 batch_size: 22 debug: 1
(RawInputDataset pid=67839) [Errno 2] No such file or directory: 'nvidia-smi'
(RawInputDataset pid=67839) hdf5 file: <HDF5 file "train_multi_inputs.h5" (mode r)> hdf5 group: train_multi_inputs/block0_values hdf5 dataset: <HDF5 dataset "block0_values": shape (105942, 228942), type "<f4"> id= 140403600037872
(RawInputDataset pid=67839) torch-device: cpu self.input: (22, 228942)
Actor(RawInputDataset, cde97d62c2900db299574ef801000000) is initialized
Actor(RawInputDataset, f15289186

100%|██████████| 4816/4816 [05:51<00:00, 13.71it/s]


max.shape: torch.Size([228942]) min.shape: torch.Size([228942])
[{'collections': 1073, 'collected': 167741, 'uncollectable': 0}, {'collections': 86, 'collected': 16065, 'uncollectable': 0}, {'collections': 48, 'collected': 2973, 'uncollectable': 0}]
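
The batch size and task count printed above follow directly from the dataset shape. A short worked calculation, using only numbers from the output above (228942 float32 columns, 105942 rows, a 20 MB batch budget, 2 CPUs):

```python
import math

row_bytes = 228942 * 4                       # columns * float32 item size = 915768 bytes
budget_bytes = 20 * 1024 * 1024              # 20 MB budget for one batch buffer
batch_rows = budget_bytes // row_bytes       # rows that fit in the budget
num_tasks = math.ceil(105942 / batch_rows)   # batches needed to cover every row
num_actors = 2 * 4                           # os.cpu_count() * 4 on this VM

print(batch_rows, num_tasks, num_tasks / num_actors)
```

With round-robin assignment, each of the 8 actors therefore handles about 602 batches.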


In [None]:
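import numpy as np
#max_m and min_m are the tensors returned by find_min_max(); .cpu() is a no-op for tensors already on the CPU
np.savetxt('/content/drive/MyDrive/colab_exp_result/kaggle_data/max_multi_inputs.txt', max_m.cpu().numpy())
np.savetxt('/content/drive/MyDrive/colab_exp_result/kaggle_data/min_multi_inputs.txt', min_m.cpu().numpy())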

Instead of recalculating the min and max of the input, we can load them from the saved location each time. This avoids rerunning the min-max search, which took roughly six minutes in the run above.

In [None]:
max_multi = np.loadtxt('/content/drive/MyDrive/colab_exp_result/kaggle_data/max_multi_inputs.txt', dtype=np.float32)
min_multi = np.loadtxt('/content/drive/MyDrive/colab_exp_result/kaggle_data/min_multi_inputs.txt', dtype=np.float32)
print(max_multi.shape)
print(min_multi.shape)
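
Once loaded, the per-column min and max can be applied to a batch as min-max normalization, x' = (x - min) / (max - min). The sketch below uses a tiny hand-made array; mapping constant columns (max equal to min) to 0 is an assumption made here to avoid division by zero, not something the notebook specifies.

```python
import numpy as np

def min_max_normalize(batch, col_min, col_max):
    """Scale each column to [0, 1]; constant columns (max == min) map to 0."""
    col_range = col_max - col_min
    # assumption: replace a zero range with 1 so constant columns do not divide by zero
    safe_range = np.where(col_range > 0, col_range, 1.0).astype(batch.dtype)
    return (batch - col_min) / safe_range

batch = np.array([[1.0, 5.0], [3.0, 5.0]], dtype=np.float32)
col_min = np.array([1.0, 5.0], dtype=np.float32)   # stand-in for min_multi
col_max = np.array([3.0, 5.0], dtype=np.float32)   # stand-in for max_multi
print(min_max_normalize(batch, col_min, col_max))
```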