# 02 - Parsing and Preparing for training

#### Imports

In [1]:
import numpy as np
import pandas as pd
import json 
from functools import reduce
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_extraction import DictVectorizer
from sklearn.externals import joblib
import pickle
import os.path
from sklearn.model_selection import KFold
from sklearn.decomposition import PCA, IncrementalPCA, SparsePCA, TruncatedSVD
import scipy.sparse
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler
import catboost
from catboost import CatBoostClassifier, Pool
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

sns.set(style="white")

#### Constants

In [2]:
spr_chunk_fn = "sparse_folds/dspr{!s}.npz"

In [3]:
nrows = None
rare_key_threshold = 10

n_components = 1000
nfolds = 100
data_folder = 'data/'
model_folder = "models/"
raw_dataset_filename = model_folder+'raw_dataset.pkl'
train_dataset_filename = model_folder+'train_dataset.pkl'
test_dataset_filename = model_folder+'test_dataset.pkl'

label_encoder_fn = model_folder + "label_encoder.pkl"

target_name = 'target'
test_index_name = 'tst_index'

vk1_fn = model_folder+'vk1.pkl'
vk2_fn = model_folder+'vk2.pkl'
vk3_fn = model_folder+'vk3.pkl'


cross_keys_fn = model_folder+'cross_keys.pkl'
js1_keys_fn = model_folder+'js1_keys.pkl'
js2_keys_fn = model_folder+'js2_keys.pkl'
js3_keys_fn = model_folder+'js3_keys.pkl'
keys_count_fn = model_folder+'keys_count.pkl'
value_keys_fn = model_folder+'value_keys.pkl'
target_fn = model_folder+'target.pkl'

scaler_fn = model_folder+'scaler.pkl'
svd_fn = model_folder+'svd.pkl'

column_names = ['cat1','cat2','cat3','cat4','cat5','dtm','dts','em']

train_data_fn = model_folder+'train_data.pkl'
test_data_fn = model_folder+'test_data.pkl'

In [4]:
train_folds_folder = "folds_train/"
test_folds_folder = "folds_test/"

weight_multiplier_fn = model_folder+"weight_multiplier.pkl"
json_metas_fn = model_folder+"json_metas.pkl"

#### Functions

In [5]:
def Load(filename):
    if os.path.isfile(filename):
        return joblib.load(filename)

In [6]:
def Save(obj, filename):
    joblib.dump(obj, filename)

In [7]:
def kill_thread(thread):
    import ctypes
    
    id = thread.ident
    code = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(id),
        ctypes.py_object(SystemError)
    )
    if code == 0:
        raise ValueError('invalid thread id')
    elif code != 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(id),
            ctypes.c_long(0)
        )
        raise SystemError('PyThreadState_SetAsyncExc failed')

In [8]:
def jobs_manager():
    from IPython.lib.backgroundjobs import BackgroundJobManager
    from IPython.core.magic import register_line_magic
    from IPython import get_ipython

    jobs = BackgroundJobManager()

    @register_line_magic
    def job(line):
        ip = get_ipython()
        jobs.new(line, ip.user_global_ns)

    return jobs

def get_chunks(sequence, count):
    count = min(count, len(sequence))
    chunks = [[] for _ in range(count)]
    for index, item in enumerate(sequence):
        chunks[index % count].append(item) 
    return chunks

def log_progress(sequence, every=None, size=None):
    from ipywidgets import IntProgress, HTML, VBox
    from IPython.display import display

    is_iterator = False
    if size is None:
        try:
            size = len(sequence)
        except TypeError:
            is_iterator = True
    if size is not None:
        if every is None:
            if size <= 200:
                every = 1
            else:
                every = size / 200     # every 0.5%
    else:
        assert every is not None, 'sequence is iterator, set every'

    if is_iterator:
        progress = IntProgress(min=0, max=1, value=1)
        progress.bar_style = 'info'
    else:
        progress = IntProgress(min=0, max=size, value=0)
    label = HTML()
    box = VBox(children=[label, progress])
    display(box)

    index = 0
    try:
        for index, record in enumerate(sequence, 1):
            if index == 1 or index % every == 0:
                if is_iterator:
                    label.value = '{index} / ?'.format(index=index)
                else:
                    progress.value = index
                    label.value = u'{index} / {size}'.format(
                        index=index,
                        size=size
                    )
            yield record
    except:
        progress.bar_style = 'danger'
        raise
    else:
        progress.bar_style = 'success'
        progress.value = index
        label.value = str(index or '?')

jobs = jobs_manager()

# Loading data

In [9]:
json_metas = Load(json_metas_fn)

v1 = json_metas[0]["vectorizer"]
v2 = json_metas[1]["vectorizer"]
v3 = json_metas[2]["vectorizer"]

label_encoder = Load(label_encoder_fn)

## Helpsers for sparse

In [10]:
def stayOnlyKeys(keys, dictionary):
    for key in list(dictionary.keys()):
        if key not in keys:
            del dictionary[key]
    return dictionary

def json_parse(text, keys):
    return stayOnlyKeys(keys, json.loads(text))

In [None]:
import gc

def parse_fold(i, folder):
    print("Loading fold:",i)
    dfi_fn = folder+"dfi{!s}.pkl".format(i)
    df = Load(dfi_fn)
#     print("df.info before i:",i,df.info())
    
    df['cat_1'] = np.where(df['cat_feature'] == 1, 1, 0)
    df['cat_2'] = np.where(df['cat_feature'] == 2, 1, 0)
    df['cat_3'] = np.where(df['cat_feature'] == 3, 1, 0)
    df['cat_4'] = np.where(df['cat_feature'] == 4, 1, 0)
    df['cat_5'] = np.where(df['cat_feature'] == 5, 1, 0)
#     df = df.drop(['cat_feature'], axis=1)
    df['entries'] = df.groupby('id').size()
    
    print("Parsing jsons:",i)
    for meta in json_metas:
        print("Parsing:",meta["column"],"Fold:",i)
        df[meta["column"]] = df[meta["column"]].apply(lambda text: json_parse(text, meta["important_keys"]))
    
    #we need to place id column at the end of dataframe
    col_at_end = ["id"]
    df = df[[c for c in df if c not in col_at_end]+[c for c in col_at_end if c in df]]
    
    print("Rewriting fold:",i)
    dfi_fn = folder+"parsed_dfi{!s}.pkl".format(i)
    
#     print("df.info after i:",i,df.info())
    
    Save(df,dfi_fn)

    del df
    gc.collect()

def parse_train_fold(i):
    global train_folds_folder
    
    parse_fold(i,train_folds_folder)
    
def parse_test_fold(i):
    global test_folds_folder
    parse_fold(i, test_folds_folder)

In [None]:
%%time
for chunk in get_chunks(range(0,nfolds), 8):
    %job [parse_train_fold(index) for index in log_progress(chunk, every=1)]    

for thread in jobs.running:
    thread.join()

Starting job # 0 in a separate thread.
Starting job # 2 in a separate thread.
Starting job # 3 in a separate thread.
Starting job # 4 in a separate thread.
Starting job # 5 in a separate thread.
Starting job # 6 in a separate thread.
Starting job # 7 in a separate thread.
Starting job # 8 in a separate thread.


Loading fold: 0
Loading fold: 1
Loading fold: 3
Loading fold: 2
Loading fold: 4
Loading fold: 6
Loading fold: 5
Loading fold: 7


Defaulting to column, but this will raise an ambiguity error in a future version
  from ipykernel import kernelapp as app


Parsing jsons: 5
Parsing: json3 Fold: 5
Parsing: json1 Fold: 5
Parsing jsons: 7
Parsing: json3 Fold: 7
Parsing: json1 Fold:Parsing jsons: 4
Parsing: json3 Fold: 7
 4
Parsing: json2 Fold: 5
Parsing jsons: 1
Parsing: json3 Fold: 1
Parsing jsons: 6
Parsing: json3 Fold: 6
Parsing: json1 Fold: 4
Parsing jsons: 0
Parsing: json3 Fold: 0
Parsing: json1 Fold: 1
Parsing: json1 Fold: 6
Parsing: json1 Fold: 0
Parsing: json2 Fold: 7
Parsing: json2 Fold: 4
Parsing: json2 Fold: 1
Parsing: json2 Fold: 0
Parsing: json2 Fold: 6
Parsing jsons: 2
Parsing: json3 Fold: 2
Rewriting fold: 5
Loading fold: 13
Parsing: json1 Fold: 2
Rewriting fold: 7
Loading fold: 15
Parsing: json2 Fold: 2
Rewriting fold: 1
Rewriting fold: 4
Loading fold: 9
Rewriting fold: 0
Rewriting fold: 6
Loading fold: 12
Loading fold: 8
Loading fold: 14
Rewriting fold: 2
Parsing jsons: 3
Parsing: json3 Fold: 3
Parsing: json1 Fold: 3
Loading fold: 10
Parsing: json2 Fold: 3
Rewriting fold: 3
Loading fold: 11
Parsing jsons: 13
Parsing: json3 F

In [None]:
import collections

# def add_cuid_column(df):
#     global label_encoder
    
#     df['cuid'] = df.index
#     df = df.reset_index(drop=True)
#     df['cuid'] = label_encoder.transform(df['cuid'])
    
#     return df

def get_mask(df):
    mask = collections.defaultdict(list)

    #last column is the index
    for i,v in enumerate(df.values[:,-1]):
        mask[v].append(i)
    
    return mask

def group_chunk(A,mask):
    mask0 = list(mask.values())[0]
    
#     print("chunk i:",i,",matrix",A[mask0,:])
    cgr_sum = csr_matrix(A[mask0,:].sum(axis=0))
    cgr_mean = csr_matrix(A[mask0,:].mean(axis=0))
    cgr_max = csr_matrix(A[mask0,:].max(axis=0))
    
    t = 0
    for k in mask:
        if t != 0:
#             print('k!=0')
            cgr_sum = vstack([
                cgr_sum,
                csr_matrix(A[mask[k],:].sum(axis=0))
            ])
            cgr_mean = vstack([
                cgr_mean,
                csr_matrix(A[mask[k],:].mean(axis=0))
            ])        
            cgr_max = vstack([
                cgr_max,
                csr_matrix(A[mask[k],:].max(axis=0))
            ])
        t= t+1

    return hstack([cgr_sum,cgr_mean,cgr_max]).tocsr()    
    
def save_sparse_chunk(data,i,folder): 
    scipy.sparse.save_npz(folder+"dspr{!s}.npz".format(i), data)
    
def load_sparse_chunk(i,folder):
    return scipy.sparse.load_npz(folder+"dspr{!s}.npz".format(i))

## Sparse logic

In [None]:
from scipy.sparse import vstack, hstack, csr_matrix
from scipy import sparse
import gc

def sparse(i, folder):
    global v1
    global v2
    global v3
    
    print("Loading chunk:",i)
    df = Load(folder+"parsed_dfi{!s}.pkl".format(i))
#     print("df.shape",df.info())
#     df = add_cuid_column(df)
    
    print('Counting mask dictionary:',i)
    mask = get_mask(df)
       
    print("Sparsing and transforming chunk:",i)
    
    j1 = v1.transform(df['json1'])
    j2 = v2.transform(df['json2'])
    j3 = v3.transform(df['json3'])
    
    df_c = df.drop(["json1","json2","json3"], axis=1)
#     df_c = df[['cat_1','cat_2','cat_3','cat_4','cat_5','dt_diff','entries','id']]
    df_cs = csr_matrix(df_c.to_sparse().to_coo())

    print("Concant chunk:",i)
    A = hstack([j1,j2,j3,df_cs]).tocsr()

    print("Groupby chunk:",i)
    sparse_data = group_chunk(A, mask)
    
    print("Saving sparsed chunk:",i)
    save_sparse_chunk(sparse_data,i,folder)
    print("Saved chunk:",i)
    del df
    del mask
    del df_c
    del df_cs
    del j1
    del j2
    del j3
    del A
    del sparse_data
    gc.collect()

def sparse_train_fold(i):
    global train_folds_folder
    
    sparse(i,train_folds_folder)
    
def sparse_test_fold(i):
    global test_folds_folder
    
    sparse(i,test_folds_folder)

In [None]:
%%time
for chunk in get_chunks(range(0,nfolds), 8):
    %job [sparse_train_fold(index) for index in log_progress(chunk, every=1)]
    
for thread in jobs.running:
    thread.join()

## Collections folds to one

In [None]:
%%time
import gc
from scipy.sparse import coo_matrix, hstack, vstack

sparse_matrix = load_sparse_chunk(0,train_folds_folder)

for i in range(1,nfolds):
    print(i)
    next_chunk = load_sparse_chunk(i,train_folds_folder)
    print("next_chunk.shape",next_chunk.shape)
    sparse_matrix = vstack([sparse_matrix, next_chunk])
    print("sparse_matrix.shape",sparse_matrix.shape)

scipy.sparse.save_npz("train_sparse_matrix_before_scale.npz", sparse_matrix)

In [None]:
sparse_matrix.shape

# Test DataSet

In [None]:
%%time
for chunk in get_chunks(range(0,nfolds), 8):
    %job [parse_test_fold(index) for index in log_progress(chunk, every=1)]    
    
for thread in jobs.running:
    thread.join()

In [None]:
%%time
for chunk in get_chunks(range(0,nfolds), 8):
    %job [sparse_test_fold(index) for index in log_progress(chunk, every=1)]
    
for thread in jobs.running:
    thread.join()

In [None]:
%%time
import gc
from scipy.sparse import coo_matrix, hstack, vstack

test_sparse_matrix = load_sparse_chunk(0,test_folds_folder)

for i in range(1,nfolds):
    print(i)
    next_chunk = load_sparse_chunk(i,test_folds_folder)
    test_sparse_matrix = vstack([test_sparse_matrix, next_chunk])

In [None]:
scipy.sparse.save_npz("test_sparse_matrix_before_scale.npz", test_sparse_matrix)

## Scalling

In [None]:
%%time
all_sparse_matrix = sparse_matrix
all_sparse_matrix = vstack([all_sparse_matrix, test_sparse_matrix])
scipy.sparse.save_npz("all_sparse_matrix_before_scale.npz", sparse_matrix)

In [None]:
all_sparse_matrix

In [None]:
%%time
from sklearn.preprocessing import Normalizer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MaxAbsScaler

scaler = StandardScaler(with_mean=False).fit(all_sparse_matrix)

In [None]:
Save(scaler,'scaler.pkl')

In [None]:
%%time
sparse_matrix = scaler.transform(sparse_matrix)

In [None]:
%%time
test_sparse_matrix = scaler.transform(test_sparse_matrix)

In [None]:
%%time
scipy.sparse.save_npz("train_sparse_matrix_after_scale.npz", sparse_matrix)
scipy.sparse.save_npz("test_sparse_matrix_after_scale.npz", test_sparse_matrix)

In [None]:
%%time
all_sparse_matrix = scaler.transform(all_sparse_matrix)

In [None]:
%%time
scipy.sparse.save_npz("all_sparse_matrix_after_scale.npz", all_sparse_matrix)

# SVD

In [None]:
all_sparse_matrix = scipy.sparse.load_npz("all_sparse_matrix_after_scale.npz")

In [None]:
svd = TruncatedSVD(
    n_components=n_components,
    n_iter=5,
    random_state=42,
#     algorithm = "arpack",
#     tol
)

In [None]:
%%time
svd = svd.fit(all_sparse_matrix)

In [None]:
%%time
Save(svd,svd_fn)

In [None]:
%%time
sparse_matrix = scipy.sparse.load_npz("train_sparse_matrix_after_scale.npz")
data = svd.transform(sparse_matrix)
Save(data, train_data_fn)

In [None]:
%%time
test_sparse_matrix = scipy.sparse.load_npz("test_sparse_matrix_after_scale.npz")
test_data = svd.transform(test_sparse_matrix)
Save(test_data, test_data_fn)