# Objective

In this notebook, I have optimised some of the preprocessing methods that you can find [here](https://www.kaggle.com/code/raimondomelis/preprocessing-of-data-in-chunks-right-way). In the previous notebook I explained the theory, but now we will see how to perform preprocessing with the GPU accelerator. In particular, we are going to compare GPU P100 vs GPU T4 x2. 

**TIP:** never perform all preprocessing on the GPU (in Kaggle it is limited), but it is convenient to also use the CPU simultaneously.

# Calculations performed

I performed some of the standard feature engineering calculations: **count, last, nunique**

# Importing Libraries

In [1]:
import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/talkingdata-adtracking-fraud-detection/sample_submission.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/train_sample.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/test_supplement.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/train.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/test.csv


In [2]:
import gc
import operator as op
import numpy as np
import cupy as cp
import pandas as pd
from tqdm.auto import tqdm
import cudf
import time


import warnings 
warnings.filterwarnings('ignore')

# GPU P100 

In the right-hand panel, set P100 as GPU accelerator

In [3]:
#Dividing our dataset into N° parts
num_parts = 4
def read_preprocess_divide(num_parts):
    #wanted columns
    columns = ['ip', 'channel', 'click_time']
    dtypes = {
             'ip'      : 'int32',
             'channel' : 'int16',
             'click_time' : 'datetime64[us]',
             }
    df = cudf.read_csv('../input/talkingdata-adtracking-fraud-detection/train.csv', usecols=columns, dtype=dtypes)
    all_rows = len(df)
    chunk = all_rows//num_parts
    #sort the dataset by ip and reset the index
    df = df.sort_values(by=['ip', 'click_time']).reset_index(drop = True)
    return df, all_rows, chunk 
    

def window(df):
    #calculate the most common value with the "mode", and the "window"
    most_common = df['ip'].mode().values.tolist()[0]
    window = len(df[df['ip'] == most_common])+1
    return window

def feature_engineering(df,start,new_end):
    if new_end is not None:
        end = new_end+1
    else:
        end = None
    features = [c for c in list(df.columns) if c not in ['ip','click_time']]
    cat_function = ['count', 'last', 'nunique']    
    new_chunk = df[start:end].groupby('ip')[features].agg(cat_function)
    new_chunk.columns = ['_'.join(x) for x in new_chunk.columns]
    new_chunk.reset_index(inplace = True)
    diff_num_features = [f'diff_{col}' for col in features]
    df = df.to_pandas()
    ips = df[start:end]['ip'].values
    new_chunk_diff = df[start:end].groupby('ip')[features].diff().add_prefix('diff_')
    new_chunk_diff.insert(0,'ip',ips)
    new_chunk_diff = cudf.DataFrame(new_chunk_diff)
    new_chunk_diff = new_chunk_diff.groupby('ip')[diff_num_features].agg(cat_function)
    new_chunk_diff.columns = ['_'.join(x) for x in new_chunk_diff.columns]
    new_chunk_diff.reset_index(inplace = True)
    new_chunk = new_chunk.merge(new_chunk_diff, how = 'inner', on = 'ip')
    new_chunk = new_chunk.sort_values(by=['ip']).reset_index(drop = True)
    return new_chunk

In [4]:
%%time
df, all_rows, chunk = read_preprocess_divide(num_parts)

CPU times: user 5.1 s, sys: 3.75 s, total: 8.85 s
Wall time: 1min 8s


In [5]:
%%time
#function to select a safe window of rows
window = window(df)
#new dataframe to append the results of the for loop
new_df=cudf.DataFrame()
#set start = 0
start = 0
for p in range(0,num_parts):
    end = p*chunk + chunk
    if end < all_rows:
        chunk_window = df[start:end].tail(window)
        second_last_unique = chunk_window['ip'].unique().values.tolist()[-2]
        new_end = chunk_window[chunk_window['ip'] == second_last_unique].tail(1).index[0]
        print(f"Processing {(new_end+1)-start} rows of chunk N° {p+1}")
        new_chunk = feature_engineering(df,start,new_end)
    else:
        print(f"Processing {all_rows-(new_end+1)} rows of chunk N° {p+1}")
        new_chunk = feature_engineering(df,start,None)
    start = new_end+1
    new_df = new_df.append(new_chunk, ignore_index=True)

Processing 46220468 rows of chunk N° 1
Processing 46231465 rows of chunk N° 2
Processing 46223993 rows of chunk N° 3
Processing 46227948 rows of chunk N° 4
CPU times: user 1min 56s, sys: 19.8 s, total: 2min 16s
Wall time: 2min 17s


In [6]:
new_df

Unnamed: 0,ip,channel_count,channel_last,channel_nunique,diff_channel_count,diff_channel_last,diff_channel_nunique
0,1,47,113,15,46,0.0,28
1,5,24,205,13,23,104.0,20
2,6,1454,127,88,1453,0.0,467
3,9,4029,21,106,4028,-114.0,667
4,10,1180,466,97,1179,221.0,422
...,...,...,...,...,...,...,...
277390,364773,15,113,9,14,0.0,11
277391,364774,3,213,1,2,0.0,1
277392,364775,24,330,15,23,223.0,16
277393,364776,309,280,75,308,179.0,169


# GPU T4 x2

In the right-hand panel, set T4 x2 as GPU accelerator

# Importing Libraries

In [1]:
import gc
import operator as op
import numpy as np
import cupy as cp
import pandas as pd
from tqdm.auto import tqdm
import cudf
import time


import warnings 
warnings.filterwarnings('ignore')

In [2]:
#Dividing our dataset into N° parts
num_parts = 4
def read_preprocess_divide(num_parts):
    #wanted columns
    columns = ['ip', 'channel', 'click_time']
    dtypes = {
             'ip'      : 'int32',
             'channel' : 'int16',
             'click_time' : 'datetime64[us]',
             }
    df = cudf.read_csv('../input/talkingdata-adtracking-fraud-detection/train.csv', usecols=columns, dtype=dtypes)
    all_rows = len(df)
    chunk = all_rows//num_parts
    #sort the dataset by ip and reset the index
    df = df.sort_values(by=['ip', 'click_time']).reset_index(drop = True)
    return df, all_rows, chunk 
    

def window(df):
    #calculate the most common value with the "mode", and the "window"
    most_common = df['ip'].mode().values.tolist()[0]
    window = len(df[df['ip'] == most_common])+1
    return window

def feature_engineering(df,start,new_end):
    if new_end is not None:
        end = new_end+1
    else:
        end = None
    features = [c for c in list(df.columns) if c not in ['ip','click_time']]
    cat_function = ['count', 'last', 'nunique']    
    new_chunk = df[start:end].groupby('ip')[features].agg(cat_function)
    new_chunk.columns = ['_'.join(x) for x in new_chunk.columns]
    new_chunk.reset_index(inplace = True)
    diff_num_features = [f'diff_{col}' for col in features]
    df = df.to_pandas()
    ips = df[start:end]['ip'].values
    new_chunk_diff = df[start:end].groupby('ip')[features].diff().add_prefix('diff_')
    new_chunk_diff.insert(0,'ip',ips)
    new_chunk_diff = cudf.DataFrame(new_chunk_diff)
    new_chunk_diff = new_chunk_diff.groupby('ip')[diff_num_features].agg(cat_function)
    new_chunk_diff.columns = ['_'.join(x) for x in new_chunk_diff.columns]
    new_chunk_diff.reset_index(inplace = True)
    new_chunk = new_chunk.merge(new_chunk_diff, how = 'inner', on = 'ip')
    new_chunk = new_chunk.sort_values(by=['ip']).reset_index(drop = True)
    return new_chunk

In [3]:
%%time
df, all_rows, chunk = read_preprocess_divide(num_parts)

CPU times: user 7.44 s, sys: 4.11 s, total: 11.5 s
Wall time: 1min 34s


In [4]:
%%time
#function to select a safe window of rows
window = window(df)
#new dataframe to append the results of the for loop
new_df=cudf.DataFrame()
#set start = 0
start = 0
for p in range(0,num_parts):
    end = p*chunk + chunk
    if end < all_rows:
        chunk_window = df[start:end].tail(window)
        second_last_unique = chunk_window['ip'].unique().values.tolist()[-2]
        new_end = chunk_window[chunk_window['ip'] == second_last_unique].tail(1).index[0]
        print(f"Processing {(new_end+1)-start} rows of chunk N° {p+1}")
        new_chunk = feature_engineering(df,start,new_end)
    else:
        print(f"Processing {all_rows-(new_end+1)} rows of chunk N° {p+1}")
        new_chunk = feature_engineering(df,start,None)
    start = new_end+1
    new_df = new_df.append(new_chunk, ignore_index=True)

Processing 46220468 rows of chunk N° 1
Processing 46231465 rows of chunk N° 2
Processing 46223993 rows of chunk N° 3
Processing 46227948 rows of chunk N° 4
CPU times: user 2min 4s, sys: 22.6 s, total: 2min 27s
Wall time: 2min 28s


In [5]:
new_df

Unnamed: 0,ip,channel_count,channel_last,channel_nunique,diff_channel_count,diff_channel_last,diff_channel_nunique
0,1,47,113,15,46,0.0,28
1,5,24,205,13,23,104.0,20
2,6,1454,127,88,1453,0.0,467
3,9,4029,21,106,4028,-114.0,667
4,10,1180,466,97,1179,221.0,422
...,...,...,...,...,...,...,...
277390,364773,15,113,9,14,0.0,11
277391,364774,3,213,1,2,0.0,1
277392,364775,24,330,15,23,223.0,16
277393,364776,309,280,75,308,179.0,169


# RESULTS

**GPU P100**

- Read, preprocess and divide took **1min 8s**
- The processing with feature engineering took **2min 17s**

**GPU T4 x2**

- Read, preprocess and divide took **1min 34s**
- The processing with feature engineering took **2min 28s**


GPU P100 apparently seems more powerful, but let us remember that GPU T4 x2 was designed for image processing and neural networks. My advice for novices is to use only CPU and GPU P100 for tabular datasets, and GPU T4 x2 for images