## Transform train & test.csv into numpy arrays.

Before starting, train.csv is split into parts so that the parts can be read in parallel.

- `split -l 10000000 train.csv train_part`
- `for i in $(ls -1 train_part*); do sed -i '1s;^;ip,app,device,os,channel,click_time,attributed_time,is_attributed\n;' $i; done`


In [1]:
import glob
import hashlib
import traceback 
import math
import pickle
from csv import DictReader
import multiprocessing as mp
from functools import lru_cache
from collections import Counter
from pyhashxx import hashxx

import numpy as np
from tqdm import tqdm

# Size of the hashed feature space: every feature value is hashed modulo D.
D = 1 << 23

# Columns hashed on their own ('click_hour' is derived from 'click_time'
# in get_x; the others are raw CSV columns).
_SINGLE_FEATURES = [
    ('ip',),
    ('app',),
    ('device',),
    ('os',),
    ('channel',),
    ('click_hour',),
]

# Two-column interactions.
_PAIR_FEATURES = [
    ('device', 'app'),
    ('channel', 'app'),
    ('channel', 'device'),
    ('channel', 'os'),
    ('ip', 'channel'),
    ('ip', 'device'),
    ('ip', 'app'),
    ('ip', 'click_hour'),
]

# Three-column interactions.
_TRIPLE_FEATURES = [
    ('ip', 'device', 'os'),
]

# Column order of the hashed feature matrix: singles, pairs, then triples.
FEATURES = _SINGLE_FEATURES + _PAIR_FEATURES + _TRIPLE_FEATURES

# Features whose hashed values are additionally counted: every interaction.
COUNT_FEATURES = _PAIR_FEATURES + _TRIPLE_FEATURES

# The cache is unbounded (maxsize=None); pick a finite maxsize when memory
# is limited.
@lru_cache(maxsize=None)
def hashed(value, D):
    """Return a bucket index for the string *value*, reduced modulo *D*.

    The built-in ``hash`` is not stable across runs since Python 3.3 unless
    PYTHONHASHSEED is set, so ``hashxx`` is used instead: the result has to
    be stable (and low-collision) for the hashed data to be pickled and
    reused.  An md5-based alternative would be
    ``int(hashlib.md5(value.encode('utf8')).hexdigest(), 16) % D``.
    """
    encoded = value.encode('utf8')
    digest = hashxx(encoded)
    return digest % D

        
def get_x(csv_row):
    """Hash every feature in FEATURES for a single CSV row.

    Args:
        csv_row: mutable mapping of column name to value (e.g. a row from
            ``csv.DictReader``).  It must contain ``click_time`` and every
            raw column referenced by FEATURES.  Side effect: an integer
            ``click_hour`` entry is added to the mapping.

    Returns:
        dict mapping each feature tuple in FEATURES to ``hashed(text, D)``,
        where ``text`` is the space-joined string values of its columns.

    Raises:
        ValueError: if a required column is missing or the hour cannot be
            parsed from ``click_time``.  The underlying exception is chained
            as the cause.
    """
    try:
        # Assumes click_time ends in "HH:MM:SS", so [-8:-6] is the hour.
        csv_row['click_hour'] = int(csv_row['click_time'][-8:-6])
        return {
            k: hashed(' '.join([str(csv_row[c]) for c in k]), D)
            for k in FEATURES
        }
    except (KeyError, TypeError, ValueError) as e:
        # Fail here with the offending row instead of printing a traceback
        # and returning None, which made callers crash later on ``h[k]``.
        raise ValueError(
            'cannot build features from row {!r}'.format(csv_row)
        ) from e


def load_part(fname, max_size=10000000):
    """Read one CSV part into hashed feature/label arrays plus value counts.

    All rows of the file are returned; no train/validation split is done
    here.

    Args:
        fname: path of a CSV file whose header names the raw feature
            columns, ``click_time`` and ``is_attributed``.
        max_size: upper bound on the number of data rows in the file; the
            output arrays are pre-allocated with this many rows.

    Returns:
        Tuple ``(x, y, counters)``:
        - ``x``: uint32 array of shape (rows, len(FEATURES)) holding the
          hashed value of every feature, in FEATURES order.
        - ``y``: uint8 array of shape (rows,), 1 where ``is_attributed``
          equals the string '1' and 0 otherwise.
        - ``counters``: dict mapping each tuple in COUNT_FEATURES to a
          Counter of how often each hashed value occurs in this file.

    Raises:
        IndexError: if the file contains more than ``max_size`` data rows.
    """
    # Hashed values are taken modulo D and must fit into the uint32 matrix.
    assert(D < 2**32)

    train_x = np.zeros((max_size, len(FEATURES)), dtype=np.uint32)
    train_y = np.zeros((max_size), dtype=np.uint8)
    partial_counters = {k: Counter() for k in COUNT_FEATURES}

    rows = 0
    # newline='' is what the csv module expects for files it parses.
    with open(fname, newline='') as f:
        for row in tqdm(DictReader(f), total=max_size, mininterval=30):
            if rows >= max_size:
                raise IndexError(
                    '{} has more than max_size={} data rows'.format(fname, max_size)
                )

            h = get_x(row)
            train_x[rows, :] = [h[k] for k in FEATURES]
            train_y[rows] = 1 if row['is_attributed'] == '1' else 0

            # update partial counters
            for k in COUNT_FEATURES:
                partial_counters[k][h[k]] += 1

            rows += 1

    # Drop the unused, zero-filled tail of the pre-allocated arrays.
    return train_x[:rows], train_y[:rows], partial_counters

def prepare_train(glob_path, processes=8):
    """Hash all training parts in parallel, merge them and save the result.

    Args:
        glob_path: glob pattern matching the CSV part files; each file is
            processed by ``load_part``.
        processes: number of worker processes in the pool (default 8).

    Returns:
        Tuple ``(X, y, counters)``: the row-wise concatenation of the
        per-part feature and label arrays, and a dict mapping each tuple in
        COUNT_FEATURES to a Counter merged over all parts.

    Side effects:
        Writes ``tmp/train-hashxx-D{d}.npz`` (arrays ``x`` and ``y``) and
        ``tmp/aux-hashxx-D{d}.pkl`` (the pickled list
        ``[counters, FEATURES, COUNT_FEATURES]``), where ``d`` is
        ``floor(log2(D))``.  Prints the shapes of the merged arrays.

    Raises:
        ValueError: if ``glob_path`` matches no files.
    """
    # glob returns files in arbitrary order; sort so that the row order of
    # X and y is reproducible between runs.
    fnames = sorted(glob.glob(glob_path))
    if not fnames:
        raise ValueError('no files match {!r}'.format(glob_path))

    # The context manager shuts the workers down even if a part fails.
    with mp.Pool(processes) as pool:
        parts = pool.map(load_part, fnames)

    X = np.concatenate([part[0] for part in parts])
    y = np.concatenate([part[1] for part in parts])

    # Merge in place instead of sum(..., Counter()), which copies the
    # growing accumulator once per part.
    counters = {}
    for k in COUNT_FEATURES:
        merged = Counter()
        for part in parts:
            merged.update(part[2][k])
        counters[k] = merged

    print(X.shape, y.shape, type(counters))

    # Exact integer floor(log2(D)); avoids float rounding in math.log(D, 2).
    d = D.bit_length() - 1
    np.savez_compressed('tmp/train-hashxx-D{}.npz'.format(d), x=X, y=y)
    with open('tmp/aux-hashxx-D{}.pkl'.format(d), 'wb') as f:
        pickle.dump([counters, FEATURES, COUNT_FEATURES], f)
    return X, y, counters

def prepare_test(path='input/test.csv', size=18790470):
    """Hash the test CSV into a feature matrix and save it with click ids.

    Args:
        path: CSV file to read; its header must name the raw feature
            columns, ``click_time`` and ``click_id``.
        size: upper bound on the number of data rows; the arrays are
            pre-allocated with this many rows and trimmed afterwards.

    Returns:
        uint32 array of shape (rows, len(FEATURES)) with the hashed features
        of the rows actually read.

    Side effects:
        Writes ``tmp/test-hashxx-D{d}.npz`` with arrays ``x`` and
        ``click_id``, where ``d`` is ``floor(log2(D))``.

    Raises:
        IndexError: if the file contains more than ``size`` data rows.
    """
    X = np.zeros((size, len(FEATURES)), dtype=np.uint32)
    click_id = np.zeros((size), dtype=np.uint32)

    rows = 0
    # newline='' is what the csv module expects for files it parses.
    with open(path, newline='') as f:
        for row in tqdm(DictReader(f), total=size, mininterval=30):
            if rows >= size:
                raise IndexError(
                    '{} has more than size={} data rows'.format(path, size)
                )
            h = get_x(row)
            X[rows, :] = [h[k] for k in FEATURES]
            click_id[rows] = int(row['click_id'])
            rows += 1

    # Trim the pre-allocated arrays so that no zero-filled phantom rows are
    # saved when the file holds fewer than `size` rows.
    X = X[:rows]
    click_id = click_id[:rows]

    # Exact integer floor(log2(D)); avoids float rounding in math.log(D, 2).
    d = D.bit_length() - 1
    np.savez_compressed('tmp/test-hashxx-D{}.npz'.format(d), x=X, click_id=click_id)
    return X

In [2]:
# One-off preprocessing driver.  The calls are commented out; uncomment them
# to regenerate the train/test .npz files and the aux pickle under tmp/.
#X, y, counters = prepare_train('input/train_parta*')
#print('finished train.')

#X = prepare_test()
#print('finished test.')

print('.')

.


In [5]:

import gc
# Optionally drop the large arrays (commented out) and force a collection
# pass to release memory.
#del X
#del y
gc.collect()

168

In [6]:
# Reload the artefacts written by prepare_train: the .npz holds arrays 'x'
# and 'y'; the pickle holds the list [counters, FEATURES, COUNT_FEATURES].
# NOTE: pickle.load executes arbitrary code; only load trusted files.
trainz = np.load('tmp/train-hashxx-D23.npz')
with open('tmp/aux-hashxx-D23.pkl', 'rb') as f: 
    aux = pickle.load(f)

In [7]:
trainz.keys(), len(aux)

(['x', 'y'], 3)

In [14]:
# Pair every count feature with its column index in the feature matrix.
# NOTE: relies on `features` and `count_features` having been unpacked from
# `aux` in another cell.
pos_features = [(k, features.index(k)) for k in count_features]

In [15]:
pos_features

[(('device', 'app'), 6),
 (('channel', 'app'), 7),
 (('channel', 'device'), 8),
 (('channel', 'os'), 9),
 (('ip', 'channel'), 10),
 (('ip', 'device'), 11),
 (('ip', 'app'), 12),
 (('ip', 'click_hour'), 13),
 (('ip', 'device', 'os'), 14)]

In [9]:
X.shape, y.shape, counters.keys(), type(features), len(features), type(count_features), len(count_features)

((184903890, 15),
 (184903890,),
 dict_keys([('device', 'app'), ('channel', 'app'), ('channel', 'device'), ('channel', 'os'), ('ip', 'channel'), ('ip', 'device'), ('ip', 'app'), ('ip', 'click_hour'), ('ip', 'device', 'os')]),
 list,
 15,
 list,
 9)

In [12]:
import gc

if False:
    # temporary hack to parallelize: handle one count feature per call
    def augment_single(feature_no):
        """Write the occurrence counts for one count feature to disk.

        Reads the module-level ``trainz`` (npz with array 'x') and ``aux``
        (list ``[counters, features, count_features]``).

        Args:
            feature_no: index into ``count_features`` selecting the feature
                to process.

        Side effects:
            Writes ``tmp/extra-{feature_no}.npz`` containing one uint32
            array ``extra`` with, for every row of ``x``, the count of that
            row's hashed value for the selected feature.
        """
        counters, features, count_features = aux[0], aux[1], aux[2]

        # Derive the column from the saved feature lists instead of a
        # hard-coded (name, position) table.
        feature_name = count_features[feature_no]
        feature_pos = features.index(feature_name)
        counter = counters[feature_name]

        # Keep only the needed column; the copy lets the full matrix be
        # released before the counts are built.
        X = trainz['x']
        X_part = X[:, feature_pos].copy()
        del X
        gc.collect()

        # Fill the uint32 array directly rather than building a Python list
        # of ints first.
        extra_part = np.fromiter(
            (counter[x] for x in tqdm(X_part)),
            dtype=np.uint32,
            count=X_part.shape[0],
        )
        np.savez_compressed('tmp/extra-{}.npz'.format(feature_no), extra=extra_part)

    
    
    
def augment(X, counters, features, count_features):
    """Build a matrix of occurrence counts for the count features.

    Args:
        X: 2-D array of hashed feature values, one column per entry of
            ``features``.
        counters: mapping from count-feature name to a Counter keyed by
            hashed value.
        features: list of feature names giving the column order of ``X``.
        count_features: feature names to produce count columns for; each
            must appear in ``features``.

    Returns:
        uint32 array of shape (X.shape[0], len(count_features)); column i
        holds, per row, the count of that row's hashed value for
        ``count_features[i]``.

    Raises:
        ValueError: if a count feature is not present in ``features``.
    """
    names = list(count_features)
    # Resolve every source column first so a missing feature fails early.
    source_columns = [features.index(name) for name in names]

    n_rows = X.shape[0]
    extra = np.zeros((n_rows, len(names)), dtype=np.uint32)

    for out_col, (name, src_col) in enumerate(zip(names, source_columns)):
        lookup = counters[name]
        hashed_values = tqdm(X[:, src_col], mininterval=30)
        extra[:, out_col] = [lookup[value] for value in hashed_values]

    return extra
    

In [4]:
# Single-process augmentation, left commented out.
#train_extra = augment(X, counters, features, count_features)

# Reload the hashed training arrays ('x', 'y') saved by prepare_train.
trainz = np.load('tmp/train-hashxx-D23.npz')

# aux is the pickled list [counters, FEATURES, COUNT_FEATURES].
# NOTE: pickle.load executes arbitrary code; only load trusted files.
with open('tmp/aux-hashxx-D23.pkl', 'rb') as f: 
    aux = pickle.load(f)
    
    
    

In [5]:
# Unpack the loaded artefacts: feature matrix and labels from the npz,
# then counters, feature list and count-feature list from the pickle.
X, y, counters, features, count_features = trainz['x'], trainz['y'], aux[0], aux[1], aux[2]    

In [15]:
# Assemble the per-feature count columns written to tmp/extra-{n}.npz.
# NOTE(review): only files 0..7 are loaded, while there are 9 count
# features; the column for index 8, ('ip', 'device', 'os'), is not
# included -- confirm whether this is intentional.
extra = np.zeros((X.shape[0], 8), dtype=np.uint32) 
for feature_no in range(8):
    extra[:, feature_no] = np.load('tmp/extra-{}.npz'.format(feature_no))['extra']
                                    


In [16]:
extra.shape

(184903890, 8)

In [17]:
X.shape

(184903890, 15)

In [18]:
# Append the count columns to the right of the hashed feature columns.
X2 = np.hstack([X, extra])

In [19]:
X2.shape

(184903890, 23)

In [21]:
# Save the augmented matrix only; the labels `y` are not written to this file.
np.savez_compressed('tmp/train-hashxx-D23-extra.npz', x=X2)

In [20]:
# Drop the inputs of the hstack (X2 holds the combined data) and force a
# collection pass to release memory.
del X
del extra
gc.collect()

218

In [None]:
# NOTE(review): this cell was never executed (In [None]).  `d` and
# `click_id` are locals of prepare_test and are not defined in the visible
# notebook cells, and `X` is deleted in an earlier cell, so running this
# line as written would presumably raise NameError -- confirm or remove.
np.savez_compressed('tmp/test-hashxx-D{}.npz'.format(d), x=X, click_id=click_id)

In [4]:
if False:
    # Disabled: reload the training arrays from 'tmp/train.npz'.
    train = np.load('tmp/train.npz')
    # The label variable was misspelled (y_trqain), which would have made
    # the print below fail with NameError once this branch is enabled.
    X_train, y_train = train['x'], train['y']
    print(X_train.shape, y_train.shape)

In [5]:
if False:
    # Disabled: reload the test feature matrix from 'tmp/test.npz' and
    # print its shape.
    test = np.load('tmp/test.npz')
    X_test = test['x']
    print(X_test.shape)

In [34]:
# Scratch check: two Counters with disjoint keys, used in the next cell to
# confirm that sum(..., Counter()) merges Counters (as prepare_train does).
a=Counter()
b=Counter()
a['a'] += 1
b['b'] += 1

In [35]:
sum([a, b], Counter())

Counter({'a': 1, 'b': 1})