In [1]:
try:
    import numpy as np
    import pandas as pd
    from dask_ml.preprocessing import StandardScaler
    import gc
    import time
    import dask.dataframe as dd
    from dask.distributed import Client, progress # see the sys load
    
except: ImportError(), 'Some modules have not loaded'

In [2]:
# set our files path
test = 'test.tsv'
train = 'train.tsv'

In [129]:
class LoadBigCsvFile:
    '''load data from tsv, transform, scale, add two columns'''
    def __init__(self, train, test, scaler=StandardScaler(copy=False)):
        self.train = train
        self.test = test
        self.scaler = scaler
    def read_data(self):
        # use dask and read with c
        try:
            data_train = dd.read_csv(self.train, \
                                     dtype={n:'int16' for n in range(1, 300)}, engine='c').reset_index()
            data_test = dd.read_csv(self.test, \
                                    dtype={n:'int16' for n in range(1, 300)}, engine='c').reset_index()
        except: (IOError, OSError), 'can not open file'
            
#         assert data_train.isna()# no NaN
#         assert train.shape[1] == test.shape[1] # check shape of features
        assert len(data_test) != 0 and len(data_train) != 0 #if any data?
        # fit and transform
        self.scaler.fit(data_train.iloc[:,1:])
        test_transformed = self.scaler.transform(data_test.iloc[:,1:])
        # compute and add columns
        test_transformed['max_feature_2_abs_mean_diff'] = abs(test_transformed.mean(axis=1) - test_transformed.max(axis=1))
        test_transformed['max_feature_2_index'] = test_transformed.idxmin(axis=1)
        test_transformed['job_id'] = data_test.iloc[:,0]
        
        return data_train
    
data = LoadBigCsvFile(train, test, scaler=StandardScaler(copy=False)).read_data()
data.head(2)

Unnamed: 0,level_0,level_1,level_2,level_3,level_4,level_5,level_6,level_7,level_8,level_9,...,level_247,level_248,level_249,level_250,level_251,level_252,level_253,level_254,level_255,id_job\tfeatures
0,1864791934054678713\t2,9835,9999,9941,9945,9386,9899,9421,9954,9952,...,8818,9954,9925,9934,8689,9958,9086,9114,9950,9875
1,-7413918695841089440\t2,9082,9999,9700,9669,9981,9729,9822,9667,9526,...,9979,9752,9695,9676,9974,9788,9955,9907,9747,9824


In [122]:
data.i

<class 'dask.dataframe.core.DataFrame'>
Columns: 257 entries, level_0 to id_job	features
dtypes: object(1), int16(1), int64(255)

In [None]:
    
# init class 
start_time = time.time()
data_ = LoadBigCsvFile(train, test).read_data()
gc.collect()
print('class loaded in %s seconds' % (time.time() - start_time)) 

time.sleep(1) # set some time gap

# save to hdf for later use or modification
start_time = time.time()
data_.to_hdf('test_transformed.hdf',  key='df1')
print('file saved in hdf in %s seconds' % (time.time() - start_time))

time.sleep(1) # set some time gap

# check the file and its content 
start_time = time.time()
hdf_read = dd.read_hdf('test_transformed.hdf', key='df1', mode='r', chunksize=10000)
print('file load into system in %s seconds' % (time.time() - start_time))

hdf_read.head(3)

In [96]:
class LoadBigCsvFile:
    
    '''load data from tsv, transform, scale, add two columns'''
    
    def __init__(self, train, test, scaler=StandardScaler(copy=False)):

        self.train = train
        self.test = test
        self.scaler = scaler
    
    def read_data(self):
        # use dask for read with C 
        try:
            data_train = dd.read_csv(self.train, sep=',', header=None, skiprows=1, dtype={n:'int16' for n in range(1, 300)}, engine='c')
            data_test = dd.read_csv(self.test, sep=',', header=None, skiprows=1, dtype={n:'int16' for n in range(1, 300)}, engine='c')
            
        except: MemoryError, 'not enough memory'
            
        assert train.isna().sum().sum() == 0 # no NaN
        assert train.shape[1] == test.shape[1] # check shape of features
        assert len(train) != 0 and len(test) != 0 #if any data?
        
        # fit from train and scale to test
        self.scaler.fit(data_train.iloc[:,1:])
        temp = self.scaler.transform(data_test.iloc[:,1:])
        
        del data_train # del not needed
        
        # index of max element
        temp['max_feature_2_index'] = temp.idxmax(axis=1)
        
        # calculate absolute diviation from max value in row
        temp['max_feature_2_abs_mean_diff'] = abs(temp.max(axis=1) - temp.mean(axis=1))
        
        # set id columnsD
        temp['job_id'] = data_test.iloc[:,0] 
        
        del data_test # del not needed
        
        return temp

### Test with 5G file

In [None]:
# set workers
client = Client(n_workers=2, threads_per_worker=2, memory_limit='3GB')
client

In [None]:
# setting the number of rows for the CSV file
start_time = time.time()

N = 5_000_000
columns = 257

# create DF 
df = pd.DataFrame(np.random.randint(999, 999999, size=(N, columns)), columns=['level_%s' % i for i in range(0, columns)])

print('%s seconds' % (time.time() - start_time))

In [None]:
# save df to csv 

start_time = time.time()

df.to_csv('random.csv', sep=',')

print('%s seconds' % (time.time() - start_time)) # 877.5422155857086 seconds, 10 G!

In [None]:
gib = 'jobble_data/random.csv' # 5 mln records! 

In [None]:
class LoadBigCsvFile_TRAIN:
    
    '''load data from tsv, transform, scale, add two columns'''
    
    def __init__(self, train, scaler=StandardScaler(copy=False)):

        self.train = train
        self.scaler = scaler
    
    def read_data(self):
        # use dask for read with C 
        try:
            data_train = dd.read_csv(self.train, sep=',', header=None, skiprows=1, dtype={n:'int16' for n in range(1, 300)}, engine='c')            
        except: MemoryError, 'not enough memory'
        
        # fit from train and scale to test
        self.scaler.fit(data_train.iloc[:,1:])
        temp = self.scaler.transform(data_train.iloc[:,1:])
                
        # index of max element
        temp['max_feature_2_index'] = temp.idxmax(axis=1)
        
        # calculate absolute diviation from max value in row
        temp['max_feature_2_abs_mean_diff'] = abs(temp.max(axis=1) - temp.mean(axis=1))
        
        # set id columnsD
        temp['job_id'] = data_train.iloc[:,0] 
        
        del data_train # del not needed
        
        return temp

In [None]:
'''init the class,'''

start_time = time.time()

rer = LoadBigCsvFile_TRAIN(gib, StandardScaler(copy=False)).read_data()

print('%s seconds' % (time.time() - start_time)) # 10 ces

In [None]:
gc.collect()

In [None]:
# save to HDF

start_time = time.time()

rer.to_hdf('random.hdf',  key='df1')

print('%s seconds' % (time.time() - start_time)) #152.77526926994324 seconds!!! 10.4 G

In [None]:
# read from HDF

start_time = time.time()

hdf_read = dd.read_hdf('random.hdf', key='df1', mode='r', chunksize=10000)

print('%s seconds' % (time.time() - start_time)) # 0.03516435623168945 seconds!!! 

In [None]:
hdf_read.head(3) # read head

In [None]:
assert len(hdf_read) == 5_000_000 # test 5ml records