# LIBRARY LOADS

In [2]:
import sys
sys.executable

'/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/bin/python3'

In [3]:
import pandas as pd
from pandas import Series, DataFrame
import numpy as np
from scipy import sparse

from lightautoml.reader.cudf_reader import CudfReader
from lightautoml.reader.hybrid_reader import HybridReader
from lightautoml.reader.base import PandasToPandasReader
from lightautoml.tasks import Task

import cudf
import dask_cudf

from numba import cuda
import cupy as cp

from lightautoml.transformers.base import SequentialTransformer, UnionTransformer


from lightautoml.transformers import numeric_gpu, categorical_gpu, datetime_gpu
from lightautoml.transformers import numeric, categorical, datetime

from lightautoml.pipelines.utils import get_columns_by_role

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

import os

In [4]:
from lightautoml.reader.daskcudf_reader import DaskCudfReader

## Synthetic data generation

In [None]:
from time import perf_counter

import numpy as np
import pandas as pd

import random

from numba import jit
import string

# Array of single-character strings: the ASCII letters followed by the digits.
# NOTE(review): not referenced by any cell visible in this notebook — confirm
# it is still needed.
RANDS_CHARS = np.array(list(string.ascii_letters + string.digits),
                       dtype=(np.str_, 1))


def gen_cols(n_cols):
    """Return the column names ``col_0`` ... ``col_<n_cols-1>``.

    Implemented in plain Python on purpose: building a handful of short
    strings gains nothing from numba JIT compilation, and avoiding the
    decorator removes the compile step and the dependency on numba's
    unicode support.

    Args:
        n_cols: Number of names to produce; ``0`` yields an empty list.

    Returns:
        A list of ``n_cols`` strings.
    """
    return ["col_" + str(i) for i in range(n_cols)]

def gen_string_data(n, n_str):
    """Draw an ``(n, n_str)`` object array of words from a fixed vocabulary.

    A single ``np.random.randint`` call picks one vocabulary index per cell;
    the words are then looked up with one NumPy integer-array index instead
    of a per-element Python loop.

    Args:
        n: Number of rows.
        n_str: Number of string columns.

    Returns:
        ``numpy.ndarray`` of dtype ``object`` and shape ``(n, n_str)`` whose
        elements are Python ``str`` values taken from the vocabulary.
    """
    string_db = np.array(
        ["algorithm", "analog", "app", "application", "array",
         "backup", "bandwidth", "binary", "bit", "byte"],
        dtype=object,
    )
    inds = np.random.randint(0, len(string_db), (n, n_str))
    # Fancy indexing with an integer array returns an array shaped like `inds`.
    return string_db[inds]

def generate_data(n, n_num, n_cat, n_date, n_str, max_n_cat, n_classes=4):
    """Build a random pandas DataFrame for benchmarking readers/transformers.

    Column layout, in order:
      * ``n_num`` float32 columns drawn uniformly from [-50, 50), with
        ``round(0.1 * n * n_num)`` randomly chosen cells replaced by NaN;
      * ``n_cat`` float32 columns of integer category codes; all of them share
        one randomly drawn upper bound below ``max_n_cat``;
      * ``n_str`` object columns of words from ``gen_string_data``;
      * ``n_date`` date columns, 0-999 days after 2018-01-01;
      * two all-zero columns ``col_m`` and ``col_n``;
      * an int32 ``TARGET`` column with labels in ``[0, n_classes)``.

    Args:
        n: Number of rows.
        n_num: Number of numeric columns.
        n_cat: Number of category-code columns.
        n_date: Number of date columns.
        n_str: Number of string columns.
        max_n_cat: Exclusive upper bound for the randomly drawn category
            cardinality; must be greater than 1, otherwise
            ``np.random.randint(1, max_n_cat)`` raises ``ValueError``.
        n_classes: Number of distinct target labels. Defaults to 4, the value
            that used to be hard-coded.

    Returns:
        Tuple ``('TARGET', cols, data)`` where ``cols`` is the list of generated
        feature column names and ``data`` is the DataFrame.
    """
    print("Generating dummy data")
    n_cols = n_num+n_cat+n_str+n_date
    cols = gen_cols(n_cols)

    numeric_data = np.random.random((n, n_num))*100-50
    # Knock out 10% of the numeric cells in one vectorised step, instead of
    # materialising every (row, col) pair and writing cell by cell.
    n_missing = int(round(.1*numeric_data.size))
    if n_missing:
        missing = np.random.choice(numeric_data.size, size=n_missing,
                                   replace=False)
        numeric_data.flat[missing] = np.nan

    category_data = np.random.randint(0, np.random.randint(1, max_n_cat),
                                      (n, n_cat))
    string_data = gen_string_data(n, n_str)
    date_data = np.random.randint(0, 1000,
                               (n, n_date)).astype(np.dtype("timedelta64[D]")) \
                               + np.datetime64("2018-01-01")

    data = pd.DataFrame(numeric_data, columns=cols[:n_num]).astype('f')

    # Remaining feature columns take the next names from `cols`, in the order
    # category -> string -> date.
    offset = n_num
    for i in range(n_cat):
        data[cols[offset+i]] = pd.Series(category_data[:, i]).astype('f')
    offset += n_cat
    for i in range(n_str):
        data[cols[offset+i]] = pd.Series(string_data[:, i]).astype(object)
    offset += n_str
    for i in range(n_date):
        data[cols[offset+i]] = pd.Series(date_data[:, i])

    data['col_m'] = pd.Series(np.zeros(n))
    data['col_n'] = pd.Series(np.zeros(n))
    data['TARGET'] = pd.Series(np.random.randint(0, n_classes, n)).astype('i')

    print("Shape of the dummy data:", data.shape)
    print("Size of the dummy data:",
          round(data.memory_usage(deep=True).sum()/1024./1024.,4), "MB.")
    return 'TARGET', cols, data

In [5]:
# Start a local Dask CUDA cluster limited to GPU "0", with RMM managed memory,
# the UCX protocol with NVLink enabled, and memory_limit="8GB".
# NOTE(review): only one device is visible here, so enable_nvlink presumably
# has no peer GPU to talk to — confirm it is wanted for this benchmark.
cluster = LocalCUDACluster(rmm_managed_memory=True,
                           CUDA_VISIBLE_DEVICES="0",
                           protocol="ucx", enable_nvlink=True,
                           memory_limit="8GB")

# Connect a client and call cudf.set_allocator("managed") on the workers; the
# call is the cell's last expression, so its per-worker result is displayed.
client = Client(cluster)
client.run(cudf.set_allocator, "managed")
#client.run(os.getpid)

distributed.preloading - INFO - Import preload module: dask_cuda.initialize


{'ucx://127.0.0.1:53793': None}

# DATA AND READERS PREPARATION

In [6]:
# Seed both random sources used by the generator so a Restart & Run All
# produces the same frame every time.
np.random.seed(42)
random.seed(42)

target, _, data = generate_data(n=40, n_num=4, n_cat=2, n_date=2,
                                n_str=3, max_n_cat=10)

# generate_data fills TARGET with labels 0..3, but the readers in the next
# section are built for Task('binary'); install a two-class target for this
# part of the notebook (the multiclass section re-creates a 0..3 target).
data[target] = pd.Series(np.random.randint(0, 2, len(data))).astype('i')

In [8]:
# Task and settings shared by every reader in the binary benchmarks.
task = Task('binary')
adv_roles = True
parts = 1  # partition count reused for the dask_cudf frame and readers

# CPU baseline reader.
reader = PandasToPandasReader(task, advanced_roles=adv_roles,
                              n_jobs=1)
# cuDF reader.
gpu_reader = CudfReader(task, advanced_roles=adv_roles, n_jobs=1)
# dask_cudf reader.
dd_reader = DaskCudfReader(task, advanced_roles=adv_roles,
                           n_jobs=1, compute=False, npartitions=parts)

# NOTE(review): hy_reader is constructed but not used by any visible cell.
hy_reader = HybridReader(task, num_cpu_readers=1,
                         num_gpu_readers=2, gpu_ratio=0.6,
                         output='mgpu', advanced_roles=adv_roles,
                         npartitions=parts, n_jobs=1)

In [9]:
# Copy the pandas frame to a cuDF frame (nan_as_null=False), then split that
# into `parts` dask_cudf partitions.
gpu_data = cudf.DataFrame.from_pandas(data, nan_as_null=False)
dd_data = dask_cudf.from_cudf(gpu_data, npartitions=parts)

# TIMING READERS

In [10]:
%%time
# CPU baseline: fit the pandas reader; `ds` feeds the CPU transformer cells.
ds = reader.fit_read(data, roles = {'target': 'TARGET'})

CPU times: user 17 s, sys: 613 ms, total: 17.7 s
Wall time: 17.4 s


In [11]:
%%time
# cuDF reader on the cuDF frame; `gpu_ds` feeds the GPU transformer cells.
gpu_ds = gpu_reader.fit_read(gpu_data, roles = {'target': 'TARGET'})

CPU times: user 9.49 s, sys: 1.51 s, total: 11 s
Wall time: 12.4 s


In [12]:
%%time
# dask_cudf reader on the partitioned frame; `dd_ds` feeds the dask cells.
dd_ds = dd_reader.fit_read(dd_data, roles = {'target': 'TARGET'})

CPU times: user 35.2 s, sys: 6.55 s, total: 41.8 s
Wall time: 1min 6s


# TIMING LABEL ENCODER

In [13]:
# One label encoder per backend: CPU, cuDF and dask_cudf (the last two use the
# same GPU class).
trf, gpu_trf, dd_trf = (
    categorical.LabelEncoder(),
    categorical_gpu.LabelEncoder_gpu(),
    categorical_gpu.LabelEncoder_gpu(),
)

# Keep only the columns each reader assigned the 'Category' role.
cats, gpu_cats, dd_cats = (
    dataset[:, get_columns_by_role(dataset, 'Category')]
    for dataset in (ds, gpu_ds, dd_ds)
)

# NOTE(review): the recorded output reports 307511 rows, while the synthetic
# frame generated above has n=40 — the saved outputs look stale; re-run.
print(cats.shape, gpu_cats.shape, dd_cats.shape)

(307511, 17) (307511, 17) (307511, 17)


In [None]:
%%timeit
# LabelEncoder section: CPU transformer on the CPU categorical slice.
enc = trf.fit_transform(cats)

In [None]:
%%timeit
# LabelEncoder section: GPU transformer on the cuDF categorical slice.
enc = gpu_trf.fit_transform(gpu_cats)

In [None]:
%%timeit
# LabelEncoder section: GPU transformer on the dask_cudf categorical slice.
enc = dd_trf.fit_transform(dd_cats)

# TIMING TARGET ENCODER

In [None]:
# Two-step pipelines: label-encode first, then target-encode.
# CPU pipeline.
trf = SequentialTransformer(
    [categorical.LabelEncoder(), categorical.TargetEncoder()]
)
# cuDF pipeline.
gpu_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.TargetEncoder_gpu()]
)
# dask_cudf pipeline (same GPU classes, separate instance).
dd_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.TargetEncoder_gpu()]
)


In [None]:
%%timeit
# TargetEncoder section: CPU pipeline on the CPU categorical slice.
enc = trf.fit_transform(cats)

In [None]:
%%timeit
# TargetEncoder section: GPU pipeline on the cuDF categorical slice.
enc = gpu_trf.fit_transform(gpu_cats)

In [None]:
%%timeit
# TargetEncoder section: GPU pipeline on the dask_cudf categorical slice.
enc = dd_trf.fit_transform(dd_cats)

# TIMING OTHER CATEGORICAL TRANSFORMERS

## FreqEncoder

In [None]:
# Frequency encoders: CPU, cuDF and dask_cudf (the last two use the same GPU
# class, kept as separate instances so each is fitted on its own data).
trf = categorical.FreqEncoder()
gpu_trf = categorical_gpu.FreqEncoder_gpu()
dd_trf = categorical_gpu.FreqEncoder_gpu()

In [None]:
%%timeit
# FreqEncoder section: CPU transformer on the CPU categorical slice.
enc = trf.fit_transform(cats)

In [None]:
%%timeit
# FreqEncoder section: GPU transformer on the cuDF categorical slice.
enc = gpu_trf.fit_transform(gpu_cats)

In [None]:
%%timeit
# FreqEncoder section: GPU transformer on the dask_cudf categorical slice.
enc = dd_trf.fit_transform(dd_cats)

## OrdinalEncoder

In [None]:
# Ordinal encoders: CPU, cuDF and dask_cudf (separate instances of the GPU
# class for the two GPU backends).
trf = categorical.OrdinalEncoder()
gpu_trf = categorical_gpu.OrdinalEncoder_gpu()
dd_trf = categorical_gpu.OrdinalEncoder_gpu()

In [None]:
%%timeit
# OrdinalEncoder section: CPU transformer on the CPU categorical slice.
enc = trf.fit_transform(cats)

In [None]:
%%timeit
# OrdinalEncoder section: GPU transformer on the cuDF categorical slice.
enc = gpu_trf.fit_transform(gpu_cats)

In [None]:
%%timeit
# OrdinalEncoder section: GPU transformer on the dask_cudf categorical slice.
enc = dd_trf.fit_transform(dd_cats)

## OHEEncoder

In [None]:
# Two-step pipelines: label-encode first, then one-hot encode.
# NOTE(review): the CPU pipeline uses make_sparse=True while both GPU
# pipelines use make_sparse=False, so the timings do not compare the same
# output format — confirm this asymmetry is intended.
trf = SequentialTransformer(
    [categorical.LabelEncoder(), categorical.OHEEncoder(make_sparse=True)]
)
gpu_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.OHEEncoder_gpu(make_sparse=False)]
)
dd_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.OHEEncoder_gpu(make_sparse=False)]
)

In [None]:
%%timeit
# OHEEncoder section: CPU pipeline on the CPU categorical slice.
enc = trf.fit_transform(cats)

In [None]:
%%timeit
# OHEEncoder section: GPU pipeline on the cuDF categorical slice.
enc = gpu_trf.fit_transform(gpu_cats)

In [None]:
%%timeit
# OHEEncoder section: GPU pipeline on the dask_cudf categorical slice.
enc = dd_trf.fit_transform(dd_cats)

## CatIntersections

In [15]:
# Two-step pipelines: label-encode first, then build category intersections.
# NOTE(review): the CPU class is spelled `CatIntersectstions` while the GPU
# one is `CatIntersections_gpu`; presumably these are the names the library
# exposes — verify before "fixing" either spelling.
trf = SequentialTransformer(
    [categorical.LabelEncoder(), categorical.CatIntersectstions()]
)
gpu_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.CatIntersections_gpu()]
)
dd_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.CatIntersections_gpu()]
)

In [19]:
%%time
# CatIntersections section: CPU pipeline, timed with a single %%time run
# (the GPU cells in this section use %%timeit instead).
enc = trf.fit_transform(cats)

CPU times: user 1min 35s, sys: 2.8 s, total: 1min 38s
Wall time: 1min 37s


In [17]:
%%timeit
# CatIntersections section: GPU pipeline on the cuDF categorical slice.
enc = gpu_trf.fit_transform(gpu_cats)

5.57 s ± 145 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [18]:
%%timeit
# CatIntersections section: GPU pipeline on the dask_cudf categorical slice.
enc = dd_trf.fit_transform(dd_cats)

13.4 s ± 155 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# TIMING DATETIME TRANSFORMERS

## TimeToNum

In [23]:
# One TimeToNum transformer per backend: CPU, cuDF and dask_cudf.
trf, gpu_trf, dd_trf = (
    datetime.TimeToNum(),
    datetime_gpu.TimeToNum_gpu(),
    datetime_gpu.TimeToNum_gpu(),
)

# Keep only the columns each reader assigned the 'Datetime' role; these slices
# are reused by every datetime section below.
dats, gpu_dats, dd_dats = (
    dataset[:, get_columns_by_role(dataset, 'Datetime')]
    for dataset in (ds, gpu_ds, dd_ds)
)

print(dats.shape, gpu_dats.shape, dd_dats.shape)

(307511, 2) (307511, 2) (307511, 2)


In [24]:
%%time
# TimeToNum section: CPU transformer, single %%time run.
enc = trf.fit_transform(dats)

CPU times: user 12.4 ms, sys: 352 µs, total: 12.8 ms
Wall time: 13.7 ms


In [25]:
%%timeit
# TimeToNum section: GPU transformer on the cuDF datetime slice.
enc = gpu_trf.fit_transform(gpu_dats)

3.77 ms ± 105 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [27]:
%%timeit
# NOTE(review): the recorded run of this cell failed with "Metadata inference
# failed in `standardize_date`"; the traceback ends in cudf's string-column
# code, so the dask_cudf Datetime columns look string-typed when Dask infers
# metadata — confirm and fix before quoting a timing.
enc = dd_trf.fit_transform(dd_dats)

ValueError: Metadata inference failed in `standardize_date`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
ValueError('Could not convert strings to integer type due to presence of non-integer values.')

Traceback:
---------
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/dask/dataframe/utils.py", line 176, in raise_on_meta_error
    yield
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/dask/dataframe/core.py", line 5676, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lightautoml/transformers/datetime_gpu.py", line 75, in standardize_date
    output = (data.astype(int) - mean) / std
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/cudf/core/dataframe.py", line 1191, in astype
    result._data[col] = self._data[col].astype(
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/cudf/core/column/column.py", line 881, in astype
    return self.as_numerical_column(dtype, **kwargs)
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/cudf/core/column/string.py", line 5061, in as_numerical_column
    raise ValueError(


## BaseDiff

In [29]:
# BaseDiff transformers: the first datetime feature is the base and the second
# is diffed against it. All three backends take the feature names from the CPU
# slice `dats`.
# NOTE(review): assumes the GPU/dask slices expose the same feature names in
# the same order — confirm.
trf = datetime.BaseDiff(base_names=[dats.features[0]], diff_names=[dats.features[1]])
gpu_trf = datetime_gpu.BaseDiff_gpu(base_names=[dats.features[0]], diff_names=[dats.features[1]])
dd_trf = datetime_gpu.BaseDiff_gpu(base_names=[dats.features[0]], diff_names=[dats.features[1]])

In [30]:
%%time
# BaseDiff section: CPU transformer, single %%time run.
enc = trf.fit_transform(dats)

CPU times: user 6.53 ms, sys: 965 µs, total: 7.5 ms
Wall time: 6.05 ms


In [31]:
%%timeit
# BaseDiff section: GPU transformer on the cuDF datetime slice.
enc = gpu_trf.fit_transform(gpu_dats)

7.97 ms ± 778 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [32]:
%%timeit
# NOTE(review): the recorded run of this cell failed with "Metadata inference
# failed in `standardize_date_concat`" (strings could not be converted to
# integers) — the dask_cudf Datetime columns look string-typed; confirm and
# fix before quoting a timing.
enc = dd_trf.fit_transform(dd_dats)

ValueError: Metadata inference failed in `standardize_date_concat`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
ValueError('Could not convert strings to integer type due to presence of non-integer values.')

Traceback:
---------
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/dask/dataframe/utils.py", line 176, in raise_on_meta_error
    yield
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/dask/dataframe/core.py", line 5676, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lightautoml/transformers/datetime_gpu.py", line 218, in standardize_date_concat
    output = (data[self.diff_names].astype(int).values.T - data[col].astype(int).values) / std
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/cudf/core/dataframe.py", line 1191, in astype
    result._data[col] = self._data[col].astype(
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/cudf/core/column/column.py", line 881, in astype
    return self.as_numerical_column(dtype, **kwargs)
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/cudf/core/column/string.py", line 5061, in as_numerical_column
    raise ValueError(


## DateSeasons

In [33]:
# DateSeasons transformers: CPU, cuDF and dask_cudf (separate instances of the
# GPU class for the two GPU backends).
trf = datetime.DateSeasons()
gpu_trf = datetime_gpu.DateSeasons_gpu()
dd_trf = datetime_gpu.DateSeasons_gpu()

In [34]:
%%time
# DateSeasons section: CPU transformer, single %%time run.
enc = trf.fit_transform(dats)

CPU times: user 176 ms, sys: 3.93 ms, total: 180 ms
Wall time: 177 ms


In [35]:
%%timeit
# DateSeasons section: GPU transformer on the cuDF datetime slice.
enc = gpu_trf.fit_transform(gpu_dats)

5.66 ms ± 386 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [36]:
%%timeit
# NOTE(review): the recorded run of this cell failed with "Metadata inference
# failed in `datetime_to_seasons`" ("Can only use .dt accessor with
# datetimelike values") — the dask_cudf Datetime columns do not look
# datetime-typed when Dask infers metadata; confirm and fix.
enc = dd_trf.fit_transform(dd_dats)

ValueError: Metadata inference failed in `datetime_to_seasons`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
AttributeError('Can only use .dt accessor with datetimelike values')

Traceback:
---------
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/dask/dataframe/utils.py", line 176, in raise_on_meta_error
    yield
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/dask/dataframe/core.py", line 5676, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lightautoml/transformers/datetime_gpu.py", line 371, in datetime_to_seasons
    vals = getattr(data[col].dt, _date_attrs[seas]).values.astype(cp.int32)
  File "/beegfs/home/r.zagidullin/RAPIDS_VERSIONS/LightAutoML_GPU/lama_venv/lib/python3.8/site-packages/cudf/core/series.py", line 376, in dt
    raise AttributeError(


# TIMING NUMERICAL TRANSFORMERS

## NaNFlags

In [39]:
# One NaNFlags transformer per backend: CPU, cuDF and dask_cudf.
trf, gpu_trf, dd_trf = (
    numeric.NaNFlags(),
    numeric_gpu.NaNFlags_gpu(),
    numeric_gpu.NaNFlags_gpu(),
)

# Keep only the columns each reader assigned the 'Numeric' role; these slices
# are reused by every numeric section below.
nums, gpu_nums, dd_nums = (
    dataset[:, get_columns_by_role(dataset, 'Numeric')]
    for dataset in (ds, gpu_ds, dd_ds)
)

# NOTE(review): the recorded output shows a different number of numeric
# columns per reader (77 / 88 / 82), so the numeric timings are not measured
# on identical inputs — confirm whether the readers should agree.
print(nums.shape, gpu_nums.shape, dd_nums.shape)

(307511, 77) (307511, 88) (307511, 82)


In [40]:
%%time
# NaNFlags section: CPU transformer, single %%time run.
enc = trf.fit_transform(nums)

CPU times: user 56.4 ms, sys: 21.2 ms, total: 77.6 ms
Wall time: 76.1 ms


In [41]:
%%timeit
# NaNFlags section: GPU transformer on the cuDF numeric slice.
enc = gpu_trf.fit_transform(gpu_nums)

78.3 ms ± 7.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [43]:
%%timeit
# NaNFlags section: GPU transformer on the dask_cudf numeric slice.
enc = dd_trf.fit_transform(dd_nums)

231 ms ± 2.98 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## FillnaMedian

In [45]:
# FillnaMedian transformers: CPU, cuDF and dask_cudf (separate instances of
# the GPU class for the two GPU backends).
trf = numeric.FillnaMedian()
gpu_trf = numeric_gpu.FillnaMedian_gpu()
dd_trf = numeric_gpu.FillnaMedian_gpu()

In [46]:
%%time
# FillnaMedian section: CPU transformer, single %%time run.
enc = trf.fit_transform(nums)

CPU times: user 346 ms, sys: 19.6 ms, total: 366 ms
Wall time: 360 ms


In [47]:
%%timeit
# FillnaMedian section: GPU transformer on the cuDF numeric slice.
enc = gpu_trf.fit_transform(gpu_nums)

160 ms ± 8.37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [48]:
%%timeit
# FillnaMedian section: GPU transformer on the dask_cudf numeric slice.
enc = dd_trf.fit_transform(dd_nums)

3.13 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## FillInf

In [45]:
# FillInf transformers: CPU, cuDF and dask_cudf.
# NOTE(review): the execution counts and timings recorded in this section are
# identical to the FillnaMedian section's, so they look copy-pasted — re-run
# before quoting them.
trf = numeric.FillInf()
gpu_trf = numeric_gpu.FillInf_gpu()
dd_trf = numeric_gpu.FillInf_gpu()

In [46]:
%%time
# FillInf section: CPU transformer, single %%time run.
enc = trf.fit_transform(nums)

CPU times: user 346 ms, sys: 19.6 ms, total: 366 ms
Wall time: 360 ms


In [47]:
%%timeit
# FillInf section: GPU transformer on the cuDF numeric slice.
enc = gpu_trf.fit_transform(gpu_nums)

160 ms ± 8.37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [48]:
%%timeit
# FillInf section: GPU transformer on the dask_cudf numeric slice.
enc = dd_trf.fit_transform(dd_nums)

3.13 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## LogOdds

In [45]:
# LogOdds transformers: CPU, cuDF and dask_cudf.
# NOTE(review): the execution counts and timings recorded in this section are
# identical to the FillnaMedian section's, so they look copy-pasted — re-run
# before quoting them.
trf = numeric.LogOdds()
gpu_trf = numeric_gpu.LogOdds_gpu()
dd_trf = numeric_gpu.LogOdds_gpu()

In [46]:
%%time
# LogOdds section: CPU transformer, single %%time run.
enc = trf.fit_transform(nums)

CPU times: user 346 ms, sys: 19.6 ms, total: 366 ms
Wall time: 360 ms


In [47]:
%%timeit
# LogOdds section: GPU transformer on the cuDF numeric slice.
enc = gpu_trf.fit_transform(gpu_nums)

160 ms ± 8.37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [48]:
%%timeit
# LogOdds section: GPU transformer on the dask_cudf numeric slice.
enc = dd_trf.fit_transform(dd_nums)

3.13 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## StandardScaler

In [45]:
# StandardScaler transformers: CPU, cuDF and dask_cudf.
# NOTE(review): the execution counts and timings recorded in this section are
# identical to the FillnaMedian section's, so they look copy-pasted — re-run
# before quoting them.
trf = numeric.StandardScaler()
gpu_trf = numeric_gpu.StandardScaler_gpu()
dd_trf = numeric_gpu.StandardScaler_gpu()

In [46]:
%%time
# StandardScaler section: CPU transformer, single %%time run.
enc = trf.fit_transform(nums)

CPU times: user 346 ms, sys: 19.6 ms, total: 366 ms
Wall time: 360 ms


In [47]:
%%timeit
# StandardScaler section: GPU transformer on the cuDF numeric slice.
enc = gpu_trf.fit_transform(gpu_nums)

160 ms ± 8.37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [48]:
%%timeit
# StandardScaler section: GPU transformer on the dask_cudf numeric slice.
enc = dd_trf.fit_transform(dd_nums)

3.13 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## QuantileBinning

In [45]:
# QuantileBinning transformers: CPU, cuDF and dask_cudf.
# NOTE(review): the execution counts and timings recorded in this section are
# identical to the FillnaMedian section's, so they look copy-pasted — re-run
# before quoting them.
trf = numeric.QuantileBinning()
gpu_trf = numeric_gpu.QuantileBinning_gpu()
dd_trf = numeric_gpu.QuantileBinning_gpu()

In [46]:
%%time
# QuantileBinning section: CPU transformer, single %%time run.
enc = trf.fit_transform(nums)

CPU times: user 346 ms, sys: 19.6 ms, total: 366 ms
Wall time: 360 ms


In [47]:
%%timeit
# QuantileBinning section: GPU transformer on the cuDF numeric slice.
enc = gpu_trf.fit_transform(gpu_nums)

160 ms ± 8.37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [48]:
%%timeit
# QuantileBinning section: GPU transformer on the dask_cudf numeric slice.
enc = dd_trf.fit_transform(dd_nums)

3.13 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# TIMING MULTICLASS TARGET ENCODER

In [None]:
# Rebuild the task and every reader for the multiclass benchmarks; this
# rebinds the names used by the binary sections above.
task = Task('multiclass')
adv_roles = True
parts = 1  # partition count reused for the dask_cudf frame and readers

# CPU baseline reader.
reader = PandasToPandasReader(task, advanced_roles=adv_roles,
                              n_jobs=1)
# cuDF reader.
gpu_reader = CudfReader(task, advanced_roles=adv_roles, n_jobs=1)
# dask_cudf reader.
dd_reader = DaskCudfReader(task, advanced_roles=adv_roles,
                           n_jobs=1, compute=False, npartitions=parts)

# NOTE(review): hy_reader is constructed but not used by any visible cell.
hy_reader = HybridReader(task, num_cpu_readers=1,
                         num_gpu_readers=2, gpu_ratio=0.6,
                         output='mgpu', advanced_roles=adv_roles,
                         npartitions=parts, n_jobs=1)

In [None]:
# Overwrite TARGET in place with random int32 labels in {0, 1, 2, 3} for the
# multiclass task. Unseeded, so the labels differ on every run.
data_len = data.shape[0]
data['TARGET'] = pd.Series(np.random.randint(0, 4, data_len)).astype('i')

In [None]:
# Re-create the cuDF and dask_cudf copies so they pick up the new TARGET.
gpu_data = cudf.DataFrame.from_pandas(data, nan_as_null=False)
dd_data = dask_cudf.from_cudf(gpu_data, npartitions=parts)

In [None]:
%%time
# Multiclass section: CPU reader; rebinds `ds`.
ds = reader.fit_read(data, roles = {'target': 'TARGET'})

In [None]:
%%time
# Multiclass section: cuDF reader; rebinds `gpu_ds`.
gpu_ds = gpu_reader.fit_read(gpu_data, roles = {'target': 'TARGET'})

In [None]:
%%time
# Multiclass section: dask_cudf reader; rebinds `dd_ds`.
dd_ds = dd_reader.fit_read(dd_data, roles = {'target': 'TARGET'})

In [None]:
# Re-select the 'Category' columns from the multiclass reader outputs.
cats, gpu_cats, dd_cats = (
    dataset[:, get_columns_by_role(dataset, 'Category')]
    for dataset in (ds, gpu_ds, dd_ds)
)

# Author's note: the CPU slice ends up with zero category columns here.
# NOTE(review): presumably why this section has no CPU timing cell — confirm.
print(cats.shape, gpu_cats.shape, dd_cats.shape)

In [None]:
# Two-step pipelines: label-encode first, then multiclass target-encode.
# NOTE(review): the CPU pipeline `trf` is built but not timed in the cells
# below.
trf = SequentialTransformer(
    [categorical.LabelEncoder(), categorical.MultiClassTargetEncoder()]
)
gpu_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.MultiClassTargetEncoder_gpu()]
)
dd_trf = SequentialTransformer(
    [categorical_gpu.LabelEncoder_gpu(), categorical_gpu.MultiClassTargetEncoder_gpu()]
)


In [None]:
%%timeit
# MultiClassTargetEncoder section: GPU pipeline on the cuDF categorical slice.
enc = gpu_trf.fit_transform(gpu_cats)

In [None]:
%%timeit
# MultiClassTargetEncoder section: GPU pipeline on the dask_cudf slice.
enc = dd_trf.fit_transform(dd_cats)