# Import

In [1]:
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import numpy as np
import scipy.sparse
import yaml
import json
import joblib
import time

import implicit

from typing import Dict

import json

import warnings
warnings.filterwarnings("ignore")

In [2]:
config_path = '../config/params.yml'
config = yaml.load(open(config_path), Loader=yaml.FullLoader)

preproc = config['preprocessing']
training = config['train']
evaluate = config['evaluate']

In [3]:
test = pd.read_parquet(evaluate['predict_path'])

# Preprocessing

In [4]:
def data_merge(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    """
    Объединяет два датасета по 'user_id' и делает начальную обработку
    """
    data = pd.merge(data_1, data_2[['user_id', 'age']], on='user_id')
    data = data[data.cpe_model_os_type != 'Apple iOS']
    return data
    

def filling_pass(data: pd.DataFrame, list_non: list) -> pd.DataFrame:
    """
    Заполняет пропуски значений в датасете
    :param data: датасет
    :param list_non: список с признаками в которых есть пропуски
    :return: датасет
    """
    for i in list_non:
        if i == 'price':
            data[i].fillna(data[i].median(), inplace=True)
        elif i == 'age':
            data[i].fillna(data[i].mode().iloc[0], inplace=True)
            data.loc[data[i] == 0, i] = data[i].mode().iloc[0]
    return data


def replace_values(data: pd.DataFrame, map_change_columns: dict) -> pd.DataFrame:
    """
    Заменяет значения в датасете
    :param data: датасет
    :param map_change_columns: словарь с признаками и значениями
    :return: датасет
    """
    data['month'] = pd.to_datetime(data.date).dt.month
    return data.replace(map_change_columns)


def get_bins(data: pd.DataFrame, first_val: int,
             second_val: int) -> pd.DataFrame:
    """
    Генерирует бины для разных признаков
    :param data: датасет
    :param first_val: первый порог значения для разбиения на бины
    :param second_val: второй порог значения для разбиения на бины
    :return: датасет
    """
    result = ("small" if data <= first_val else "medium" if 
                  first_val < data <= second_val else "large")
    return result


def data_drop(data: pd.DataFrame, drop_columns: list) -> pd.DataFrame:
    """
    Удаляет признаки
    :param data: датасет
    :param drop_columns: список с признаками
    :return: датасет
    """
    return data.drop(columns=drop_columns, axis=1)


def transform_types(data: pd.DataFrame, 
                    change_type_columns: dict) -> pd.DataFrame:
    """
    Преобразазует признаки в заданный тип данных
    :param data: датасет
    :param change_type_columns: словарь с признаками и типами данных
    :return:
    """
    return data.astype(change_type_columns, errors="raise")


def data_aggregate(data: pd.DataFrame, list_select, list_group) -> pa.Table:
    '''
    Преобразует в формат pa.Table, группирует и аггрегирует данные
    :param data: датасет
    :param list_select: используемые признаки из датасета
    :param list_group: группируемые признаки
    :return: pa.Table
    '''
    fin_df_pq = pa.Table.from_pandas(data)    
    data_agg = fin_df_pq.select(list_select).group_by(list_group).aggregate([
                          ('month', 'count_distinct'),
                          ('city_name', 'count_distinct'),
                          ('cpe_model_name', 'count_distinct'),
                          ('bins_price', 'count_distinct'),
                          ('part_of_day', 'count_distinct'),
                          ('bins_age', 'count_distinct')])
    return data_agg


def gen_mat(data_agg: pa.Table, 
            usr_dict: dict,
            url_dict: dict,
            dat: str, 
            user: str, 
            url: str) -> scipy.sparse.coo_matrix:
    """
    Генерирует разряженные матрицы
    """
    values = np.array(data_agg.select([dat]).to_pandas()[[dat]])
    rows = np.array(data_agg.select([user]).to_pandas()[user].map(usr_dict))
    cols = np.array(data_agg.select([url]).to_pandas()[url].map(url_dict))
    mat = scipy.sparse.coo_matrix((values.reshape((len(values))), (rows, cols)), 
                                      shape=((rows.max() + 1), 
                                             (cols.max() + 1).astype('int64')))
    return mat

In [5]:
def pipeline_preprocess(data: pd.DataFrame, **kwargs):
    """
    Пайплайн по предобработке данных
    :param data: датасет
    :return: датасет
    """
    #data merge
    data = data_merge(data_1=data, 
                      data_2=pd.read_parquet(kwargs['target_train_path']))
        
    # filling pass
    filling_pass(data=data, list_non=kwargs['list_non'])
    
    # replace values
    replace_values(data=data, map_change_columns=kwargs['map_change_columns'])
    
    # get bins
    for key in kwargs["map_bins_columns"].keys():
        if key == 'day':
            data['day'] = pd.to_datetime(data.date).dt.day
            data[f"bins_{key}"] = data[key].apply(lambda x: get_bins(x,
            first_val=kwargs["map_bins_columns"][key][0],
            second_val=kwargs["map_bins_columns"][key][1]))
        else:
            data[f"bins_{key}"] = data[key].apply(lambda x: get_bins(x,
            first_val=kwargs["map_bins_columns"][key][0],
            second_val=kwargs["map_bins_columns"][key][1]))
    
    # data drop
    data_drop(data=data, drop_columns=kwargs['drop_columns'])
    
    # transform types
    transform_types(data=data, 
                    change_type_columns=kwargs['change_type_columns_test'])
    
    # data agg
    data_agg = data_aggregate(data=data, list_select=kwargs['list_select'], 
                                         list_group=kwargs['list_group'])
    
    # создает словари url и user
    url_set = set(data_agg.select(['url_host']).to_pandas()['url_host'])
    url_dict = {url: idurl for url, idurl in zip(url_set, range(len(url_set)))}
    usr_set = set(data_agg.select(['user_id']).to_pandas()['user_id'])
    usr_dict = {usr: user_id for usr, user_id in zip(usr_set, range(len(usr_set)))}
    
    # gen mat
    mat_1 = gen_mat(data_agg=data_agg, 
                    url_dict=url_dict,
                    usr_dict=usr_dict,
                    dat='month_count_distinct', 
                    user='user_id', 
                    url='url_host')
    mat_2 = gen_mat(data_agg=data_agg,
                    url_dict=url_dict,
                    usr_dict=usr_dict,
                    dat='city_name_count_distinct', 
                    user='user_id', 
                    url='url_host')
    mat_3 = gen_mat(data_agg=data_agg, 
                    url_dict=url_dict,
                    usr_dict=usr_dict,
                    dat='cpe_model_name_count_distinct', 
                    user='user_id', 
                    url='url_host')
    mat_4 = gen_mat(data_agg=data_agg, 
                    url_dict=url_dict,
                    usr_dict=usr_dict,
                    dat='bins_price_count_distinct', 
                    user='user_id', 
                    url='url_host')
    mat_5 = gen_mat(data_agg=data_agg, 
                    url_dict=url_dict,
                    usr_dict=usr_dict,
                    dat='part_of_day_count_distinct', 
                    user='user_id', 
                    url='url_host')
    mat_6 = gen_mat(data_agg=data_agg, 
                    url_dict=url_dict,
                    usr_dict=usr_dict,
                    dat='bins_age_count_distinct', 
                    user='user_id', 
                    url='url_host')
    
    fin_mat = scipy.sparse.hstack((mat_1, 
                                   mat_2, 
                                   mat_3, 
                                   mat_4, 
                                   mat_5, 
                                   mat_6)).tocsr()
    
    als = implicit.als.AlternatingLeastSquares(factors=kwargs['factors'], 
                                           iterations=kwargs['iterations'], 
                                           use_gpu = False, 
                                           calculate_training_loss = False, 
                                           regularization=kwargs['regularization'],
                                           random_state=kwargs['random_state'])
    als.fit(fin_mat)
    
    u_factors = als.user_factors 
    inv_usr_map = {v: k for k, v in usr_dict.items()}
    usr_emb = pd.DataFrame(u_factors)
    usr_emb['user_id'] = usr_emb.index.map(inv_usr_map)
    df_test = pd.read_parquet(kwargs['target_train_path'])[['user_id', 
                                                            'age']].merge(usr_emb, 
                                                                          how ='inner', 
                                                                          on =['user_id'])
    df_test = df_test.dropna()
    return df_test

In [6]:
%%time
data_proc_test = pipeline_preprocess(data=test, **preproc)

  0%|          | 0/30 [00:00<?, ?it/s]

CPU times: user 5min 44s, sys: 27.7 s, total: 6min 12s
Wall time: 4min 53s


In [7]:
data_proc_test.head()

Unnamed: 0,user_id,age,0,1,2,3,4,5,6,7,...,40,41,42,43,44,45,46,47,48,49
0,275794,48.0,-0.00375,0.0069,0.030588,-0.014779,0.00334,-0.005936,0.020942,0.00566,...,-0.014734,-0.003334,0.013242,0.001186,0.011115,-0.008305,-0.00061,0.001853,0.006481,0.011192
1,371831,35.0,-0.024247,0.049213,0.060941,-0.002362,-0.099888,-0.020128,0.030257,-0.016099,...,0.029359,0.060279,-0.008487,-0.061379,0.108479,-0.039925,-0.003538,0.060662,-0.089627,0.0444
2,217720,21.0,-0.042138,0.010041,0.013134,0.029202,0.038673,-0.007476,0.003475,0.002297,...,-0.047063,-0.008949,0.040115,0.028249,-0.001972,0.034759,-0.037605,0.049185,-0.094859,0.067573
3,141895,36.0,-0.030126,0.04395,0.005127,0.041449,-0.026528,0.012999,-0.065308,0.028571,...,0.065487,0.015463,0.003454,-0.019044,0.064235,0.101885,-0.049907,-0.001156,-0.039707,-0.001821
4,217849,23.0,-0.014955,0.011674,0.013098,-0.020741,0.014403,0.010744,0.000356,0.009054,...,0.00317,0.001921,0.021446,-0.003161,-0.000814,-0.004824,0.006515,0.002903,-0.003962,0.022025


# Evaluate

In [8]:
model = joblib.load(training['model_path'])
data_proc_test['predict'] = model.predict(data_proc_test)

In [9]:
data_proc_test

Unnamed: 0,user_id,age,0,1,2,3,4,5,6,7,...,41,42,43,44,45,46,47,48,49,predict
0,275794,48.0,-0.003750,0.006900,0.030588,-0.014779,0.003340,-0.005936,0.020942,0.005660,...,-0.003334,0.013242,0.001186,0.011115,-0.008305,-0.000610,0.001853,0.006481,0.011192,1
1,371831,35.0,-0.024247,0.049213,0.060941,-0.002362,-0.099888,-0.020128,0.030257,-0.016099,...,0.060279,-0.008487,-0.061379,0.108479,-0.039925,-0.003538,0.060662,-0.089627,0.044400,1
2,217720,21.0,-0.042138,0.010041,0.013134,0.029202,0.038673,-0.007476,0.003475,0.002297,...,-0.008949,0.040115,0.028249,-0.001972,0.034759,-0.037605,0.049185,-0.094859,0.067573,0
3,141895,36.0,-0.030126,0.043950,0.005127,0.041449,-0.026528,0.012999,-0.065308,0.028571,...,0.015463,0.003454,-0.019044,0.064235,0.101885,-0.049907,-0.001156,-0.039707,-0.001821,0
4,217849,23.0,-0.014955,0.011674,0.013098,-0.020741,0.014403,0.010744,0.000356,0.009054,...,0.001921,0.021446,-0.003161,-0.000814,-0.004824,0.006515,0.002903,-0.003962,0.022025,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
26975,356729,26.0,0.047143,0.028410,0.001622,-0.010751,-0.055639,0.021016,-0.056829,0.043890,...,0.010118,0.050792,-0.059568,0.030982,-0.003968,-0.017902,0.034922,-0.021780,0.048098,0
26976,201342,51.0,-0.008526,0.012417,0.008638,-0.019365,0.011153,0.015383,-0.003041,-0.003095,...,0.007583,0.015067,0.011141,0.003426,0.005603,0.006766,0.015665,-0.000513,0.007337,1
26977,281829,36.0,-0.009123,-0.014656,0.030546,-0.076098,0.003767,0.021084,-0.027419,0.025447,...,0.043551,0.042533,-0.002924,-0.002461,-0.026235,0.002960,0.020184,-0.016288,-0.001266,0
26978,240576,60.0,-0.003797,-0.013931,0.015614,-0.014723,0.017445,0.029291,-0.017486,0.018279,...,0.027837,0.046474,-0.016248,0.010719,-0.010762,0.033089,-0.001553,-0.023560,0.031598,1
