In [43]:
import os
os.chdir('/home/larion/test/')

import pandas as pd
import numpy as np

import concurrent.futures
import concurrent
import multiprocessing

from functools import partial

%matplotlib inline
%config InlineBackend.figure_format = 'retina'
%load_ext autoreload
%autoreload

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [143]:
! python -V

Python 3.7.3


In [112]:
## extract feature 2
def extract_feature(string: str):
    """Pick one integer field out of a comma-separated feature string.

    The leading field is parsed as a position into the split fields
    (0-based over *all* fields, so the leading field itself is position 0);
    the field found at that position is returned as an ``int``.
    Presumably the leading field is the feature-set code (e.g. ``2``) — TODO confirm.
    """
    fields = string.split(',')
    return int(fields[int(fields[0])])

## Sanity check: reference statistics computed directly with pandas

In [95]:
# Reference statistics from a single in-memory pandas pass; the map-reduce
# results further down are compared against these numbers.
data = pd.read_csv('./data/train.tsv', sep='\t')
data['features'].map(extract_feature).describe()

count      799.000000
mean      9867.720901
std        560.638652
min       4573.000000
25%       9998.000000
50%       9999.000000
75%       9999.000000
max      10000.000000
Name: features, dtype: float64

## Calculations

In [91]:
# Input file and rows-per-chunk used in this section.
# NOTE(review): in the visible notebook these globals are not read anywhere;
# the functions below take `file`/`chunksize` as parameters with their own defaults.
file, chunksize = './data/train.tsv', 100

## map functions 
def mean_len(chunk):
    """Map step for the mean: ``(sum of extracted values, row count)`` of one chunk."""
    values = chunk.features.map(extract_feature)
    return values.sum(), len(chunk)
    
def std_len(chunk, X=0):
    """Map step for the std: ``(sum of squared deviations from X, row count)`` of one chunk."""
    deviations = chunk.features.map(extract_feature) - X
    squared_sum = (deviations ** 2).sum()
    return squared_sum, len(chunk)

def chunk_min(chunk):
    """Map step for the minimum: ``(smallest extracted value, its row label)`` in one chunk."""
    values = chunk.features.map(extract_feature)
    return values.min(), values.idxmin()

## map function for the maximum (the reduce functions follow)
def chunk_max(chunk):
    """Map step for the maximum: ``(largest extracted value, its row label)`` in one chunk."""
    values = chunk.features.map(extract_feature)
    return values.max(), values.idxmax()

def min_reducer(step, cache):
    """Fold one ``(chunk_min, chunk_idxmin)`` pair into the running ``[min, index]``.

    ``cache`` is updated in place. Steps arrive in completion order, which
    varies from run to run. Ties on the value are therefore resolved to the
    smallest index, so the result is deterministic. This matches pandas'
    ``idxmin`` (first occurrence).

    Args:
        step: ``(value, index)`` produced by the map step.
        cache: mutable ``[best_value, best_index]`` accumulator.
    """
    c_min, idx_min = step[0], step[1]
    best, best_idx = cache[0], cache[1]
    if c_min < best or (c_min == best and idx_min < best_idx):
        cache[0] = c_min
        cache[1] = idx_min

def max_reducer(step, cache):
    """Fold one ``(chunk_max, chunk_idxmax)`` pair into the running ``[max, index]``.

    ``cache`` is updated in place. Steps arrive in completion order, which
    varies from run to run. Ties on the value are therefore resolved to the
    smallest index, so the result is deterministic. This matches pandas'
    ``idxmax`` (first occurrence).

    Args:
        step: ``(value, index)`` produced by the map step.
        cache: mutable ``[best_value, best_index]`` accumulator.
    """
    c_max, idx_max = step[0], step[1]
    best, best_idx = cache[0], cache[1]
    if c_max > best or (c_max == best and idx_max < best_idx):
        cache[0] = c_max
        cache[1] = idx_max
    
def stat_reducer(step, cache):
    """Add a ``(partial_sum, partial_count)`` pair onto ``cache[0]``/``cache[1]`` in place."""
    partial_sum, partial_count = step
    cache[0] = cache[0] + partial_sum
    cache[1] = cache[1] + partial_count
    
## multiprocessing map-reduce: a general pipeline for large files
def map_reduce(file, chunksize, map_f, reduce_f, init, sep='\t', max_workers=None):
    """Chunked map-reduce over a delimited text file using worker processes.

    The file is read with ``pd.read_csv(..., chunksize=chunksize)``. ``map_f``
    runs on every chunk in a process pool. Each result is folded into an
    accumulator in this process with ``reduce_f(step, cache)``, which must
    update ``cache`` in place.

    Args:
        file: path of the file to read.
        chunksize: rows per chunk handed to one ``map_f`` call.
        map_f: ``map_f(chunk) -> step``; must be picklable (a module-level
            function, a ``functools.partial`` of one, or a builtin).
            NOTE(review): with the 'spawn' start method, functions defined
            interactively in a notebook generally cannot be loaded by workers.
        reduce_f: ``reduce_f(step, cache)``. It is called in completion order,
            which is not deterministic, so it should be order-independent.
        init: initial accumulator values; copied, never mutated.
        sep: field separator passed to ``pd.read_csv`` (default: tab).
        max_workers: pool size; defaults to ``cpu_count() - 2`` but never below 1.

    Returns:
        list: the accumulator after every chunk was reduced (a copy of
        ``init`` if no chunk was produced).
    """
    if max_workers is None:
        # Leave two cores free, but never request < 1 worker:
        # cpu_count() - 2 is 0 or negative on 1-2 core machines (ValueError).
        max_workers = max(1, multiprocessing.cpu_count() - 2)
    cache = list(init)
    # Bound the number of chunks in flight. Otherwise the whole file is read
    # and queued to the workers before any result is reduced.
    max_pending = 2 * max_workers
    pending = set()
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for chunk in pd.read_csv(file, chunksize=chunksize, sep=sep):
            pending.add(executor.submit(map_f, chunk))
            if len(pending) >= max_pending:
                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    reduce_f(future.result(), cache)
        for future in concurrent.futures.as_completed(pending):
            reduce_f(future.result(), cache)
    return cache

## sample statistics computed with map_reduce
def mean(file='./data/train.tsv', chunksize=100):
    """Mean of the extracted feature over ``file``, computed chunk-wise via map_reduce."""
    total, count = map_reduce(file, chunksize, mean_len, stat_reducer, [0, 0])
    return total / count

def std(X, file='./data/train.tsv', chunksize=100):
    """Sample standard deviation (divides by N - 1) of the extracted feature around ``X``.

    ``X`` is the centre to measure deviations from; in this notebook it is the
    value returned by ``mean()``.
    """
    deviation_map = partial(std_len, X=X)
    squared_sum, count = map_reduce(file, chunksize, deviation_map, stat_reducer, [0, 0])
    variance = squared_sum / (count - 1)
    return np.sqrt(variance)

def minimum(file='./data/train.tsv', chunksize=100):
    """Return ``[min_value, row_label]`` of the extracted feature over ``file``.

    The accumulator is seeded with +inf rather than a fixed number such as
    20000. The first chunk's minimum therefore always replaces the seed,
    whatever the data range. The index seed ``-1`` is returned only if no
    chunk is produced.
    """
    return map_reduce(file, chunksize, chunk_min, min_reducer, [np.inf, -1])

def miximum(file='./data/train.tsv', chunksize=100):
    """Return ``[max_value, row_label]`` of the extracted feature over ``file``.

    (Name kept as ``miximum`` because later cells call it by that name.)
    The accumulator is seeded with -inf rather than -1. The first chunk's
    maximum therefore always replaces the seed, even for negative data.
    The index seed ``-1`` is returned only if no chunk is produced.
    """
    return map_reduce(file, chunksize, chunk_max, max_reducer, [-np.inf, -1])

### Compare the results with the pandas check

In [92]:
## Results of the map-reduce pipeline on ./data/train.tsv (each call re-reads the file).
X = mean()
# Sample standard deviation (divides by N - 1), the same convention as describe() above.
S = std(X)
# minimum()/miximum() return [value, row label]; X and S are reused by the cells below.
m, m_indx = minimum()
M, M_indx = miximum()
X, S, (m, m_indx), (M, M_indx)
## These values match the pandas check above.

(9867.720901126408, 560.6386516245803, (4573, 640), (10000, 700))

## Normalization

In [137]:
def normalize(inputs, outs, decrease_factor, scale_facltor, chunksize=100):
    """Write ``(value - decrease_factor) / scale_facltor`` for every row of ``inputs`` to ``outs``.

    With the train mean and std this is a z-score. Other factors (for
    example the minimum and ``max - min``) give min-max scaling.

    Args:
        inputs: tab-separated input file with ``id_job`` and ``features`` columns.
        outs: output path. It is overwritten, and the header row is written once.
        decrease_factor: value subtracted from every extracted feature.
        scale_facltor: divisor applied after the shift (name kept for callers).
        chunksize: rows read per chunk.

    NOTE(review): the output column is named ``feature_<code>_stand``. The code
    comes from the first row of each chunk, but only the first chunk's name
    reaches the header. This assumes every row shares the same code — TODO confirm.
    """
    first_chunk = True
    for chunk in pd.read_csv(inputs, chunksize=chunksize, sep='\t'):
        code = chunk.features.iloc[0].split(',')[0]
        column = f'feature_{code}_stand'
        values = chunk.features.map(extract_feature)
        # Use the arguments, not the notebook globals X/S, so callers can
        # actually choose the normalisation.
        chunk[column] = (values - decrease_factor) / scale_facltor
        # Truncate and write the header only for the first chunk. Later chunks
        # append rows without repeating the header, and re-running the cell
        # does not duplicate the output.
        chunk[['id_job', column]].to_csv(
            outs,
            mode='w' if first_chunk else 'a',
            header=first_chunk,
            sep='\t',
            index=False,
        )
        first_chunk = False

In [138]:
## Z-score standardisation of test.tsv with the train-set mean X and sample std S.
## normalize takes the shift and the scale as arguments, so min-max scaling
## would pass the minimum and (max - min) instead.
normalize(
    './data/test.tsv',
    './data/test_proc.tsv',
    decrease_factor=X,
    scale_facltor=S,
)

## Results

In [140]:
## It is not clear for which file max_feature_2_index and max_feature_2_abs_mean_diff should be estimated,
## so they are computed for both files; X is always the train-set mean from the results cell.

## For test.tsv (overwrites M and M_indx from the results cell above)
M, M_indx = miximum(file='./data/test.tsv')
print ('max_feature_2_index:', M_indx)
# M is the value at row M_indx, so this is |feature value at the max row - train mean|
print ('max_feature_2_abs_mean_diff:', np.abs(M - X))

max_feature_2_index: 52
max_feature_2_abs_mean_diff: 132.27909887359237


In [142]:
## For train.tsv (overwrites M and M_indx again; X is still the train-set mean)
M, M_indx = miximum(file='./data/train.tsv')
print ('max_feature_2_index:', M_indx)
# M is the value at row M_indx, so this is |feature value at the max row - train mean|
print ('max_feature_2_abs_mean_diff:', np.abs(M - X))

max_feature_2_index: 700
max_feature_2_abs_mean_diff: 132.27909887359237
