In [1]:
from dask import dataframe as dd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
sns.set_style('whitegrid')

In [2]:
import pandas as pd
from pandas import DataFrame, Series
import pprint
import pickle
import gc


class DtypesConvert:

    def __init__(self, data):
        self.data = data
        self.memory_diff = None
        self.dtype_diff = None
        self.column_types = None

    def mem_usage(self, pandas_obj):
        if isinstance(pandas_obj, pd.DataFrame):
            usage_b = pandas_obj.memory_usage(deep=True).sum()
        else:  # we assume if not a df it's a series
            usage_b = pandas_obj.memory_usage(deep=True)
        usage_mb = round(usage_b/1024**2, 3)
        return usage_mb

    def check_object(self, object_df):
        converted_obj = DataFrame()
        for col in object_df.columns:
            num_unique_values = len(object_df[col].unique())
            num_total_values = len(object_df[col])
            if num_unique_values/num_total_values < 0.5:
                converted_obj.loc[:, col] = object_df.astype('category')
            else:
                converted_obj.loc[:, col] = object_df[col]
        return converted_obj

    def dtype_memory(self):
        optimized_df = self.data
        origin_memory_all = 0
        converted_memory_all = 0
        decline_ratio_all = 0
        memory_diff = DataFrame(columns=['before', 'after'])
        dtype_diff = []
        for dtype in ['int', 'float', 'object']:
            selected_dtype = self.data.select_dtypes(include=[dtype])
            if not selected_dtype.empty:
                mean_usage_mb = self.mem_usage(selected_dtype)
                print('Average memory usage for {} columns:{:03.2f}MB'.format(
                    dtype, mean_usage_mb))
                if dtype == 'int':
                    converted_df = selected_dtype.apply(
                        pd.to_numeric, downcast='unsigned')
                elif dtype == 'float':
                    converted_df = selected_dtype.apply(
                        pd.to_numeric, downcast='float')
                elif dtype == 'object':
                    converted_df = self.check_object(selected_dtype)
                optimized_df[converted_df.columns] = converted_df
                origin_memory = self.mem_usage(selected_dtype)
                converted_memory = self.mem_usage(converted_df)
                memory_diff.loc[dtype, 'before'] = origin_memory
                memory_diff.loc[dtype, 'after'] = converted_memory
                memory_diff.loc[dtype, 'decline_ratio(%)'] = round(
                    (origin_memory-converted_memory)/origin_memory*100, 2)
                decline_ratio = (
                    origin_memory - converted_memory)/origin_memory*100
                origin_memory_all += origin_memory
                converted_memory_all += converted_memory
                decline_ratio_all += decline_ratio
                print('--------------------------------------------------')
                print('Origin memory usage for {} columns:{:03.2f}'.format(
                    dtype, origin_memory))
                print('Converted memory usage for {} columns: {:03.2f}'.format(
                    dtype, converted_memory))
                print('Decline ratio:{:03.2f}%'.format(decline_ratio))
                compare_df = pd.concat(
                    [selected_dtype.dtypes, converted_df.dtypes], axis=1)
                compare_df.columns = ['before', 'after']
                compare_df = compare_df.apply(Series.value_counts)
                dtype_diff.append(compare_df)
                print(compare_df)
        dtype_diff = pd.concat(dtype_diff)
        self.memory_diff = memory_diff
        self.dtype_diff = dtype_diff
        print('---------------------------------------------------')
        print('Total origin memoty usage:{:03.2f}'.format(origin_memory_all))
        print('Total convert memotyusage:{:03.2f}'.format(
            converted_memory_all))
        print('Total decline ratio:{:03.2f}%'.format(decline_ratio_all))
        print('-------------------------------------------------------')
        print(memory_diff)
        print(dtype_diff)
        return optimized_df

    def dump_and_load_file(self, file_path, columns_types=None):
        if columns_types:
            with open(file_path, 'wb') as file:
                columns_types = pickle.dump(columns_types, file)
        else:
            with open(file_path, 'rb') as file:
                columns_types = pickle.load(file)
                return columns_types

    def dataframe_dtype_converted(self, optimized_df, file_path):
        dtypes = optimized_df.dtypes
        dtypes_col = dtypes.index
        dtypes_type = [i.name for i in dtypes.values]
        column_types = dict(zip(dtypes_col, dtypes_type))
        self.column_types = column_types
        self.dump_and_load_file('column_types.pkl', column_types)
        preview = {key: value for key, value in list(
            column_types.items())[:10]}
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(preview)
        read_and_optimized = pd.read_table(file_path, dtype=column_types)
        print(self.mem_usage(read_and_optimized))
        print(read_and_optimized.info())
        column_types_load = self.dump_and_load_file('column_types.pkl')
        print(len(column_types_load))
        return read_and_optimized
file_path = '../data/train/train_1.txt'
reader = pd.read_table(file_path, chunksize=100)
data = reader.get_chunk()
C = DtypesConvert(data)
optimized_df = C.dtype_memory()
optimized_data = C.dataframe_dtype_converted(optimized_df, file_path)
C.column_types

Average memory usage for float columns:4.77MB
--------------------------------------------------
Origin memory usage for float columns:4.77
Converted memory usage for float columns: 2.39
Decline ratio:49.99%
         before   after
float32     NaN  6254.0
float64  6254.0     NaN
Average memory usage for object columns:0.01MB
--------------------------------------------------
Origin memory usage for object columns:0.01
Converted memory usage for object columns: 0.01
Decline ratio:0.00%
        before  after
object       1      1
---------------------------------------------------
Total origin memoty usage:4.78
Total convert memotyusage:2.39
Total decline ratio:49.99%
-------------------------------------------------------
       before  after  decline_ratio(%)
float   4.771  2.386             49.99
object  0.006  0.006              0.00
         before   after
float32     NaN  6254.0
float64  6254.0     NaN
object      1.0     1.0
{   'f1': 'float32',
    'f2': 'float32',
    'f3': 'flo

{'id': 'int64',
 'loan_dt': 'object',
 'label': 'float32',
 'tag': 'int64',
 'f1': 'float32',
 'f2': 'float32',
 'f3': 'float32',
 'f4': 'float32',
 'f5': 'float32',
 'f6': 'float32',
 'f7': 'float32',
 'f8': 'float32',
 'f9': 'int64',
 'f10': 'float32',
 'f11': 'int64',
 'f12': 'float32',
 'f13': 'int64',
 'f14': 'float32',
 'f15': 'float32',
 'f16': 'float32',
 'f17': 'float32',
 'f18': 'int64',
 'f19': 'float32',
 'f20': 'float32',
 'f21': 'float32',
 'f22': 'float32',
 'f23': 'float32',
 'f24': 'float32',
 'f25': 'float32',
 'f26': 'float32',
 'f27': 'float32',
 'f28': 'float32',
 'f29': 'float32',
 'f30': 'float32',
 'f31': 'float32',
 'f32': 'float32',
 'f33': 'float32',
 'f34': 'float32',
 'f35': 'float32',
 'f36': 'float32',
 'f37': 'float32',
 'f38': 'int64',
 'f39': 'float32',
 'f40': 'float32',
 'f41': 'float32',
 'f42': 'float32',
 'f43': 'float32',
 'f44': 'float32',
 'f45': 'float32',
 'f46': 'float32',
 'f47': 'float32',
 'f48': 'float32',
 'f49': 'float32',
 'f50': 'flo

In [3]:
from numba import jit
@jit
def gpu_max(a):
    return np.max(a, axis=1)

@jit
def gpu_min(a):
    return np.min(a, axis=1)

dd_sample= dd.from_pandas(optimized_df.transpose(), npartitions=4)
max_vals= dd_sample.map_partitions(gpu_max).compute()
min_vals= dd_sample.map_partitions(gpu_min).compute()
df_desc = pd.DataFrame({'min':min_vals, 'max':max_vals})
na_cnt = np.sum(df_desc['max'].isnull())
print('null columns: %d' % na_cnt)

num_cols = df_desc.loc[(df_desc.index!='loan_dt') & (~df_desc['max'].isnull()),'max'].values
pd.cut(num_cols,bins=[-1000,0,1,2,100,1e9],labels=['<0','0','1','2-100','>100']).value_counts()

null columns: 30


<0       1090
0        2181
1         335
2-100    1560
>100     1551
dtype: int64

In [4]:
df_desc = df_desc[~df_desc['max'].isna()]
df_desc[df_desc['min']==df_desc['max']]

Unnamed: 0,min,max
f1000,0,0
f1002,0,0
f1015,0,0
f1019,0,0
f102,0,0
f1020,0,0
f1022,0,0
f1026,0,0
f1030,0,0
f1034,0,0


In [5]:
train = dd.read_table('../data/train/train*.txt')

In [79]:
df_desc.index

Index(['f1', 'f10', 'f100', 'f1000', 'f1001', 'f1002', 'f1003', 'f1004',
       'f1005', 'f1006',
       ...
       'f993', 'f994', 'f995', 'f996', 'f997', 'f998', 'f999', 'id', 'label',
       'tag'],
      dtype='object', length=6718)

In [82]:
train[list(df_desc.index[-126:])].max().compute()

f889     7.140000e-02
f89      5.840000e+02
f890     1.000000e+00
f891     4.500000e+01
f892     1.000000e+00
f893     1.325100e+04
f894     2.000000e+00
f895     0.000000e+00
f896     1.000000e+00
f897     1.950000e+02
f898     6.200000e+01
f899     1.000000e+00
f9       4.800000e+01
f90      1.367131e+10
f900     3.000000e+00
f901     2.550000e+02
f902     1.390000e+02
f903     8.700000e-03
f904     1.000000e+00
f905     1.000000e+00
f906     3.785967e+03
f907     1.000000e+00
f908     5.690000e+02
f909     6.557454e+00
f91      1.000000e+00
f910     4.714045e-01
f911     8.976000e+03
f912     3.244000e+03
f913     2.873028e+01
f914     1.840000e+02
             ...     
f975     1.000000e+00
f976     1.700000e+01
f977     1.500000e+01
f978     1.000000e+00
f979     6.875000e-01
f98      2.408484e+03
f980     1.414214e+00
f981     5.000000e-01
f982     8.528685e+06
f983     1.000000e+00
f984     2.800000e+01
f985     1.000000e+00
f986     8.296131e+04
f987     3.000000e+04
f988     3

In [None]:
train.to_parquet('../data/train/')