In [1]:
# Shell out to nvidia-smi to list the GPUs visible on this machine (driver/CUDA version, memory, utilisation).
!nvidia-smi

Thu Nov 19 17:52:08 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 455.32.00    Driver Version: 455.32.00    CUDA Version: 11.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  Off  | 00000000:1C:00.0 Off |                    0 |
| N/A   29C    P0    39W / 300W |      0MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  Off  | 00000000:1D:00.0 Off |                    0 |
| N/A   28C    P0    38W / 300W |      0MiB / 16160MiB |      0%      Default |
|       

In [2]:
# Standard Libraries
import os
import glob
import shutil
import nvidia_smi

# External Dependencies
import cupy as cp
import cudf
import dask_cudf
from dask_cuda import LocalCUDACluster
import dask
from dask.distributed import Client
from dask.utils import parse_bytes
from dask.delayed import delayed
import dask.dataframe as dd
import rmm

from pathlib import Path
import pandas as pd
import numpy as np

# NVTabular
import nvtabular as nvt
import nvtabular.ops as ops
from nvtabular.io import Shuffle
from nvtabular.utils import device_mem_size

import warnings
warnings.filterwarnings('ignore')

import logging

In [3]:
# home-credit-default-risk tables
if not Path("data/application_test.csv").is_file():
    %cd data
    ! wget https://www.dropbox.com/s/j9xwcj9ixki5t2l/home-credit-default-risk.zip?dl=0 -O data.zip
    ! unzip -q data.zip
    ! rm data.zip
# default-of-credit-card-clients-dataset
if not Path("data/default_ucr.csv").is_file():
    %cd data
    ! wget https://www.dropbox.com/s/lj0d7qez18ea7dx/UCI_Credit_Card.csv?dl=0 -O default_ucr.csv
    %cd ..

In [4]:
# Load every source table and list, per table, the columns to split off from it
dict_ = {
    'datasets': [
        # default-of-credit-card-clients-datasets
        pd.read_csv('./data/default_ucr.csv'),
    ],
    'name_dropped_columns': [
        # default-of-credit-card-clients-datasets
        ['ID', 'default.payment.next.month'],
        #['default.payment.next.month'] # 'ID' is needed for shuffling
    ],
}

# Set the ID and target columns aside, one frame per table
dict_['dropped_columns'] = [
    table[names]
    for table, names in zip(dict_['datasets'], dict_['name_dropped_columns'])
]

# Keep only the remaining (feature) columns in the tables themselves
dict_['datasets'] = [
    table.drop(names, axis=1)
    for table, names in zip(dict_['datasets'], dict_['name_dropped_columns'])
]

In [5]:
# Position of the table to work with inside dict_['datasets']
N_D = 0
# X: feature columns of that table; X_rest: the columns that were split off from it
X = dict_['datasets'][N_D]
X_rest = dict_['dropped_columns'][N_D]

#from fencoding_CPUs import FEncoding
#f_dict = FEncoding().initialize_types(X, return_dtype=False)
#f_dict

In [6]:
def date_replace_(self, X):
    """Convert date-like columns of ``X`` to midnight-normalised ``datetime64[ns]``.

    A column is converted when
      * it holds text and *every* value parses with one single format from
        ``fmts`` (formats are tried in order, the first full match wins), or
      * the string form of its first value is 10-14 characters long and the
        whole column can be cast to float, in which case the values are read
        as Unix timestamps (seconds, local time zone).
    The time-of-day part is discarded; only the calendar date is kept.

    Parameters
    ----------
    self : unused; kept so the function can also be attached to a class.
    X : pandas.DataFrame
        Modified in place; converted columns keep the original row index.

    Returns
    -------
    pandas.DataFrame
        The same ``X`` object.
    """
    # TODO: dask realization is needed
    # Imported here because the notebook's import cell does not provide `dt`.
    import datetime as dt

    fmts = ('%Y', '%b %d, %Y', '%b %d, %Y', '%B %d, %Y', '%B %d %Y', '%m/%d/%Y', '%m/%d/%y', '%b %Y', '%B%Y', '%b %d,%Y',
            '%d.%m.%Y', '%Y.%m.%d', '%d-%m-%Y', '%Y-%m-%d %H:%M:%S')

    def _as_dates(moments, index):
        # Build the result on the source column's index so that the
        # `X[column] = ...` assignment below cannot misalign rows.
        normalized = pd.to_datetime(moments).normalize()
        return pd.Series(normalized.to_numpy(), index=index).astype('datetime64[ns]')

    def pars_date(x):
        # Returns the converted Series, or None when the column is not date-like.
        if len(x) == 0:
            return None
        if pd.api.types.is_object_dtype(x) or pd.api.types.is_string_dtype(x):
            texts = [str(value) for value in x]
            for fmt in fmts:
                try:
                    return _as_dates([dt.datetime.strptime(text, fmt) for text in texts], x.index)
                except ValueError:
                    # At least one value does not fit this format (or is out
                    # of the datetime64[ns] range): try the next one.
                    continue
        first_repr = str(x.iloc[0])
        if 9 < len(first_repr) <= 14:
            # TODO: better condition on string to identify that it is unix timestep
            try:
                seconds = x.astype('float')
                return _as_dates([dt.datetime.fromtimestamp(value) for value in seconds], x.index)
            except (ValueError, TypeError, OverflowError, OSError):
                # Not castable to float, or not a representable timestamp.
                pass
        return None

    for column in X.columns:
        converted = pars_date(X[column])
        if converted is not None:
            X[column] = converted
    return X

In [7]:
class FEncoding_advanced(object):
    """Feature-encoding helper that runs on a single-machine multi-GPU Dask cluster.

    Creating an instance starts a ``LocalCUDACluster`` plus a Dask ``Client``
    and sets up an RMM memory pool on the workers.  The methods then
      * drop constant columns (``elim_empty_columns``),
      * sort columns into categorical / numerical / time groups
        (``initialize_types``),
      * run an NVTabular preprocessing workflow (``nvtabular``).
    """

    def __init__(self, n_gpus=-1, device_spill_frac=0.8):
        '''
        n_gpus: number of GPUs to place workers on; -1 uses every GPU reported by NVML.
        device_spill_frac: Spill GPU-Worker memory to host at this limit. Reduce if spilling fails to prevent device memory errors.
        '''
        # Cluster and client are deliberately stored as notebook-level globals.
        global client, cluster
        # Deploy a Single-Machine Multi-GPU Cluster
        if n_gpus == -1:
            nvidia_smi.nvmlInit()
            n_gpus_avail = nvidia_smi.nvmlDeviceGetCount()
            print('\n n_gpus_avail: {}'.format(n_gpus_avail))
            n_gpus = n_gpus_avail
        # Select devices to place workers: "0,1,...,n_gpus-1"
        visible_devices = ",".join(str(i) for i in range(n_gpus))

        #TODO: how to reinitialzed cluster
        cluster = LocalCUDACluster(
            protocol = "tcp", # "tcp" or "ucx"
            CUDA_VISIBLE_DEVICES = visible_devices,
            device_memory_limit = device_spill_frac * device_mem_size(kind="total"),
        )

        try:
            # Create the distributed client
            client = Client(cluster)
            display(client)
            print('\n Dashboard avail: http://localhost:8888/proxy/8787/status')
            # Initialize RMM pool on ALL workers
            def _rmm_pool():
                rmm.reinitialize(
                    pool_allocator=True,
                    initial_pool_size=None, # Use default size
                )
            client.run(_rmm_pool)

        except MemoryError:
            print('\n The client is already initialized')

        self.n_gpus = n_gpus
        self.cluster = cluster
        # If the MemoryError came before a client was bound, fall back to a
        # previously created global client (or None) instead of failing on an
        # undefined name.
        self.client = globals().get('client')
        self.output_path="./parquet_data_tmp"

        # Regarding intialization part
        self.categor_types = ['category', 'object', 'bool', 'int32', 'int64', 'int8']
        self.numer_types = ['float', 'float32', 'float64']
        self.time_types = ['datetime64[ns]', 'datetime64[ns, tz]']
        # What else? https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html
        # TODO: check if there are any other time types

    def elim_empty_columns(self, X):
        """Return a copy of ``X`` without the columns that hold fewer than two distinct values.

        The pandas frame is split into ``self.n_gpus`` partitions of a
        ``dask.dataframe`` and every column's distinct values are computed
        through Dask.  The names of the dropped columns are printed.
        """
        ddf = dd.from_pandas(X, npartitions=self.n_gpus)
        cols_to_drop = [
            column
            for column in ddf.columns
            if len(ddf[column].unique().compute().values) < 2
        ]
        print('\n dropped columns:', cols_to_drop)
        return ddf.drop(cols_to_drop, axis=1).compute()

    def initialize_types(self, X, return_dtype=False):
        """Sort the columns of ``X`` into categorical, numerical and time groups.

        * dtype listed in ``self.numer_types``: categorical when every non-NaN
          value is a whole number, numerical otherwise;
        * dtype listed in ``self.categor_types``: categorical;
        * dtype listed in ``self.time_types`` or any ``datetime64[...]`` dtype
          (this covers time-zone aware columns such as
          ``datetime64[ns, UTC]``): time.
        Columns of any other dtype are left out of all three groups.

        The groups are also stored on the instance as ``categor_columns``,
        ``numer_columns`` and ``time_columns``.

        Returns a dict with these three lists and, when ``return_dtype`` is
        true, three matching ``*_dtypes`` lists holding the dtype names.
        """
        self.categor_columns, self.numer_columns, self.time_columns = [], [], []
        for column in X.columns:
            c_type = str(X[column].dtype)
            if c_type in self.numer_types:
                # Sometimes categorical feature can be presented with a float type. Let's check for that
                values = X[column].to_numpy()
                unique_values = np.unique(values[~np.isnan(values)])
                if all(float(value).is_integer() for value in unique_values):
                    self.categor_columns.append(column)
                else:
                    self.numer_columns.append(column)
            if c_type in self.categor_types:
                self.categor_columns.append(column)
            if c_type in self.time_types or c_type.startswith('datetime64'):
                self.time_columns.append(column)
        out_dict = {
            'categor_columns': self.categor_columns,
            'numer_columns': self.numer_columns,
            'time_columns': self.time_columns,
        }
        if return_dtype:
            out_dict.update({
                'categor_columns_dtypes': [str(X[name].dtype) for name in self.categor_columns],
                'numer_columns_dtypes': [str(X[name].dtype) for name in self.numer_columns],
                'time_columns_dtypes': [str(X[name].dtype) for name in self.time_columns],
            })
        return out_dict

    def nvtabular(self, X, filename = None):
        """Run the NVTabular preprocessing workflow on ``X`` and return the result as one cuDF frame.

        Steps: classify the columns with ``initialize_types``, apply
        ``ops.Clip`` to the categorical columns and ``ops.Categorify``, write
        the shuffled output as parquet files under ``self.output_path``, read
        the files back into a single frame and delete the temporary
        directories (also when the workflow fails).

        filename: when given, the result is additionally written to
            ``./data/<filename>`` as CSV.

        Raises FileNotFoundError when the workflow produced no parquet file.
        """
        # Use this instance rather than a notebook-level variable.
        f_dict = self.initialize_types(X, return_dtype=False)
        dataset = nvt.Dataset(X)

        # Initalize our Workflow
        workflow = nvt.Workflow(cat_names=f_dict['categor_columns'],
                                cont_names=f_dict['numer_columns'],
                                label_name=[],
                                client=self.client
                               )

        tmp_output_path = self.output_path

        # Operators: https://nvidia.github.io/NVTabular/main/api/ops/index.html

        workflow.add_preprocess(
            #TODO: change in OutlDetect
            ops.Clip(0, 10, columns=f_dict['categor_columns'])#min_value=None, max_value=None, columns=f_dict['numer_columns'], replace=True)

            #TODO: change in encode_categor
            #ops.TargetEncoding(cat_groups=f_dict['categor_columns'],
             #                  cont_target=None),

            #TODO: chenge in tree-based models, nana will be filled in with max values (or zeros)
            #ops.FillMissing(fill_val=0, columns=f_dict['categor_columns'] + f_dict['numer_columns'], replace=True),
        )

        workflow.add_preprocess(
            # NOTE(review): 10 is passed positionally; presumably the frequency threshold - confirm against the installed NVTabular version
            ops.Categorify(10)
        )

        #workflow.add_preprocess(
        #    ops.FillMedian()#columns=f_dict['categor_columns'], preprocessing=True, replace=True)
        #)

        workflow.finalize()

        try:
            workflow.apply(
                dataset,
                output_format="parquet",
                output_path=tmp_output_path,
                shuffle=Shuffle.PER_WORKER,  # Shuffle algorithm
                out_files_per_proc=8, # Number of output files per worker
            )
            # Sorted so the files are concatenated in a reproducible order.
            files = sorted(glob.glob(tmp_output_path + "/*.parquet"))
            if not files:
                raise FileNotFoundError(
                    'no parquet files were written to {}'.format(tmp_output_path)
                )
            # One concat instead of repeated append, which re-copies the
            # accumulated frame on every iteration.
            X_final = cudf.concat([cudf.read_parquet(path) for path in files])
        finally:
            # Delete temporary files
            shutil.rmtree(tmp_output_path, ignore_errors=True)
            shutil.rmtree('dask-worker-space', ignore_errors=True)

        if filename is not None:
            X_final.to_csv('./data/' + filename, index=False)

        return X_final

In [8]:
# Build the encoder with the defaults: n_gpus=-1 places a worker on every GPU reported by NVML.
fencoding = FEncoding_advanced()


 n_gpus_avail: 2


0,1
Client  Scheduler: tcp://127.0.0.1:44447  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 404.32 GB



 Dashboard avail: http://localhost:8888/proxy/8787/status


In [9]:
# Display X without its constant columns; the result is not assigned, so X itself stays unchanged.
fencoding.elim_empty_columns(X)


 dropped columns: []


Unnamed: 0,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,...,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6
0,20000.0,2,2,1,24,2,2,-1,-1,-2,...,689.0,0.0,0.0,0.0,0.0,689.0,0.0,0.0,0.0,0.0
1,120000.0,2,2,2,26,-1,2,0,0,0,...,2682.0,3272.0,3455.0,3261.0,0.0,1000.0,1000.0,1000.0,0.0,2000.0
2,90000.0,2,2,2,34,0,0,0,0,0,...,13559.0,14331.0,14948.0,15549.0,1518.0,1500.0,1000.0,1000.0,1000.0,5000.0
3,50000.0,2,2,1,37,0,0,0,0,0,...,49291.0,28314.0,28959.0,29547.0,2000.0,2019.0,1200.0,1100.0,1069.0,1000.0
4,50000.0,1,2,1,57,-1,0,-1,0,0,...,35835.0,20940.0,19146.0,19131.0,2000.0,36681.0,10000.0,9000.0,689.0,679.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29995,220000.0,1,3,1,39,0,0,0,0,0,...,208365.0,88004.0,31237.0,15980.0,8500.0,20000.0,5003.0,3047.0,5000.0,1000.0
29996,150000.0,1,3,2,43,-1,-1,-1,-1,0,...,3502.0,8979.0,5190.0,0.0,1837.0,3526.0,8998.0,129.0,0.0,0.0
29997,30000.0,1,2,2,37,4,3,2,-1,0,...,2758.0,20878.0,20582.0,19357.0,0.0,0.0,22000.0,4200.0,2000.0,3100.0
29998,80000.0,1,3,1,41,1,-1,0,0,0,...,76304.0,52774.0,11855.0,48944.0,85900.0,3409.0,1178.0,1926.0,52964.0,1804.0


In [10]:
# Group the columns of X into categorical / numerical / time and also report each column's dtype.
fencoding.initialize_types(X, return_dtype=True)

{'categor_columns': ['LIMIT_BAL',
  'SEX',
  'EDUCATION',
  'MARRIAGE',
  'AGE',
  'PAY_0',
  'PAY_2',
  'PAY_3',
  'PAY_4',
  'PAY_5',
  'PAY_6',
  'BILL_AMT1',
  'BILL_AMT2',
  'BILL_AMT3',
  'BILL_AMT4',
  'BILL_AMT5',
  'BILL_AMT6',
  'PAY_AMT1',
  'PAY_AMT2',
  'PAY_AMT3',
  'PAY_AMT4',
  'PAY_AMT5',
  'PAY_AMT6'],
 'numer_columns': [],
 'time_columns': [],
 'categor_columns_dtypes': ['float64',
  'int64',
  'int64',
  'int64',
  'int64',
  'int64',
  'int64',
  'int64',
  'int64',
  'int64',
  'int64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64',
  'float64'],
 'numer_columns_dtypes': [],
 'time_columns_dtypes': []}

In [10]:
# Run the NVTabular workflow on X; with filename=None the result is returned but not written to CSV.
fencoding.nvtabular(X, filename = None)