# Импорт библиотек

In [267]:
import pandas as pd
import numpy as np
import math

import yaml
import joblib

import warnings

warnings.filterwarnings('ignore')

In [219]:
config_path = '../config/parameters.yaml'
config = yaml.load(open(config_path), Loader=yaml.FullLoader)

In [217]:
config

{'preprocessing': {'raw_data_extension': '.parquet',
  'change_col_types': {'region_name': 'category',
   'city_name': 'category',
   'cpe_manufacturer_name': 'category',
   'cpe_model_name': 'category',
   'url_host': 'category',
   'cpe_type_cd': 'category',
   'cpe_model_os_type': 'category',
   'date': 'datetime64[ns]',
   'price': 'float32',
   'part_of_day': 'category',
   'request_cnt': 'int8',
   'user_id': 'int32'},
  'columns_fill_na': {'price': -999},
  'replace_values': {'cpe_model_os_type': {'Apple iOS': 'iOS'},
   'cpe_manufacturer_name': {'Huawei Device Company Limited': 'Huawei',
    'Motorola Mobility LLC, a Lenovo Company': 'Motorola',
    'Sony Mobile Communications Inc.': 'Sony',
    'Highscreen Limited': 'Highscreen',
    'Realme Chongqing Mobile Telecommunications Corp Ltd': 'Realme',
    'Realme Mobile Telecommunications (Shenzhen) Co Ltd': 'Realme'}},
  'columns_save_min_max': ['region_cnt',
   'city_cnt',
   'url_host_cnt',
   'part_of_day_morning',
   'part_of

# Загрузка данных

In [220]:
config['evaluate']

{'submit_data': '../data/check/submit_data.csv'}

In [221]:
# загрузим сырые данные для предсказания
submit_data = pd.read_csv(config['evaluate']['submit_data'])

In [222]:
submit_data.shape

(11225731, 12)

In [223]:
submit_data.head()

Unnamed: 0,user_id,region_name,city_name,cpe_manufacturer_name,cpe_model_name,url_host,cpe_type_cd,cpe_model_os_type,price,date,part_of_day,request_cnt
0,211594,Ростовская область,Сальск,Xiaomi,Poco X3 Pro,googleads.g.doubleclick.net,smartphone,Android,23876.0,2021-06-22,evening,1
1,211594,Ростовская область,Сальск,Xiaomi,Poco X3 Pro,unblock.mts.ru,smartphone,Android,23876.0,2021-06-22,day,2
2,211594,Ростовская область,Сальск,Xiaomi,Poco X3 Pro,unblock.mts.ru,smartphone,Android,23876.0,2021-06-22,evening,3
3,211594,Ростовская область,Сальск,Xiaomi,Poco X3 Pro,online.sberbank.ru,smartphone,Android,23876.0,2021-06-23,morning,1
4,341343,Москва,Москва,Samsung,Galaxy S8 Dual,mp.weixin.qq.com,smartphone,Android,39750.0,2021-06-24,day,1


# Preprocessing

In [268]:
# функции для предобработки сырых данных
def check_columns(data: pd.DataFrame, columns_types: dict) -> None:
    """
    Проверка соответствия колонок
    :param data: датасет
    :param columns_types: словарь с признаками и типами
    """
    column_sequence = columns_types.keys()

    assert set(column_sequence) == set(data.columns), "Признаки не совпадают"


def change_cols_type(data: pd.DataFrame, col_types_dict: dict) -> pd.DataFrame:
    """
    Изменение типа столбцов на заданные
    :param data: датафрейм
    :param col_types_dict: словарь с признаками и типами данных
    :return: датафрейм
    """
    return data.astype(col_types_dict)


def fill_na_values(data: pd.DataFrame, fill_na_val: dict) -> pd.DataFrame:
    """
    Заполнение пропусков заданными значениями
    :param data: датафрейм
    :param fill_na_val: словарь с названиями признаков и значением, которым нужно заполнить пропуки
    :return: датафрейм
    """
    return data.fillna(fill_na_val)


def replace_model_mistakes(data: pd.DataFrame,
                           replace_val: dict) -> pd.DataFrame:
    """
    Функция исправляет неточности в данных
    :param data: датафрейм с данными
    :param replace_val: словарь с признаками и значениями
    :return: датафрейм
    """
    return data.replace(replace_val)


def replace_nokia_type(data: pd.DataFrame) -> pd.DataFrame:
    """
    Функция исправляет неточности в данных, заменяет
    :param data: датафрейм с данными
    :return: датафрейм с исправленными неточностями
    """
    data['cpe_type_cd'] = np.where((data['cpe_manufacturer_name'] == 'Nokia') &
                                   (data['cpe_model_name'] == '3 Dual'),
                                   'plain', data['cpe_type_cd'])
    return data

In [269]:
# функции для генерации признаков из сырых данных
def get_data_part_day(data: pd.DataFrame) -> pd.DataFrame:
    """
    Функция аггрегирует данные по user_id и возвращает долю и кол-во визитов в разное время суток
    :param data: датафрейм с данными
    :return: аггрегированный датафрейм
    """
    df_part_day = pd.get_dummies(data[['user_id', 'part_of_day']])

    # кол-во визитов пользователя в разные части дня
    df_part_day = (df_part_day.groupby('user_id', as_index=False).agg({
        'part_of_day_day':
        'sum',
        'part_of_day_evening':
        'sum',
        'part_of_day_morning':
        'sum',
        'part_of_day_night':
        'sum'
    }))

    # общее кол-во визитов пользователя
    df_part_day['sum_visits'] = (df_part_day.part_of_day_day +
                                 df_part_day.part_of_day_evening +
                                 df_part_day.part_of_day_morning +
                                 df_part_day.part_of_day_night)

    # доля визитов в разные части дня
    df_part_day['day_pct'] = df_part_day.part_of_day_day / \
        df_part_day.sum_visits
    df_part_day['evening_pct'] = df_part_day.part_of_day_evening / \
        df_part_day.sum_visits
    df_part_day['morning_pct'] = df_part_day.part_of_day_morning / \
        df_part_day.sum_visits
    df_part_day['night_pct'] = df_part_day.part_of_day_night / \
        df_part_day.sum_visits
    return df_part_day


def get_data_days(data: pd.DataFrame) -> pd.DataFrame:
    """
    Функция аггрегирует данные по user_id и возвращает следующие признаки:
    - act_days - кол-во дат активности пользователя
    - request_cnt - кол-во запросов пользователя
    - avg_req_per_day - среднее кол-во запросов пользователя
    - period_days - кол-во дней между первым и последним визитом пользователя
    - act_days_pct - доля дней, когда пользователь совершал визит

    :param data: датафрейм с данными
    :return: аггрегированный датафрейм
    """
    # кол-во дней с визитами
    df_active_days = (data.groupby('user_id', as_index=False).agg({
        'date':
        'nunique',
        'request_cnt':
        'sum'
    }).rename(columns={'date': 'act_days'}))

    # среднее кол-во запросов в дни визита
    df_active_days['avg_req_per_day'] = (df_active_days.request_cnt /
                                         df_active_days.act_days)

    # первая и последняя дата визита
    df_dates_period = (data.groupby('user_id', as_index=False).agg(
        {'date': ['max', 'min']}))

    # кол-во дней между первым и последним заходом
    df_dates_period['period_days'] = (df_dates_period['date'].iloc[:, 0] -
                                      df_dates_period['date'].iloc[:, 1])
    df_dates_period['period_days'] = df_dates_period.period_days.dt.days + 1
    df_dates_period = df_dates_period.drop('date', axis=1)

    df_dates_period.columns = df_dates_period.columns.droplevel(1)

    df_days = (df_active_days.merge(df_dates_period, on='user_id'))
    # доля дней, когда пользователь совершал визит
    df_days['act_days_pct'] = df_days.act_days / df_days.period_days

    return df_days


def get_user_model_price(data: pd.DataFrame) -> pd.DataFrame:
    """
    Функция аггрегирует данные по user_id и возвращает следующие признаки:
    - cpe_type_cd - тип устройства
    - cpe_model_os_type - операционная система устройства
    - cpe_manufacturer_name - производитель устройства
    - price - цена устройства пользователя

    :param data: датафрейм с данными
    :return: аггрегированный датафрейм
    """
    df_model = data.groupby('user_id', as_index=False).agg({
        'cpe_type_cd':
        pd.Series.mode,
        'cpe_manufacturer_name':
        pd.Series.mode,
        'cpe_model_os_type':
        pd.Series.mode,
        'price':
        'mean'
    })
    return df_model.fillna(-999)


def get_user_city_cnt(data: pd.DataFrame) -> pd.DataFrame:
    """
    Функция аггрегирует данные по user_id и возвращает следующие признаки:
    - region_cnt - кол-во уникальных регионов, из которых был совершен визит
    - city_cnt - кол-во уникальных городов, из которых был совершен визит

    :param data: датафрейм с данными
    :return: аггрегированный датафрейм
    """
    df_city_cnt = data.groupby('user_id', as_index=False) \
        .agg({'region_name': 'nunique',
              'city_name': 'nunique'}) \
        .rename(columns={'region_name': 'region_cnt',
                         'city_name': 'city_cnt'})
    return df_city_cnt


def get_user_url_cnt(data: pd.DataFrame) -> pd.DataFrame:
    """
    Функция аггрегирует данные по user_id и возвращает следующие признаки:
    - url_host_cnt - кол-во уникальных ссылок, с которых был совершен визит

    :param data: датафрейм с данными
    :return: аггрегированный датафрейм
    """
    df_url_cnt = data.groupby('user_id', as_index=False) \
        .agg({'url_host': 'nunique'}) \
        .rename(columns={'url_host': 'url_host_cnt'})
    return df_url_cnt

In [270]:
# функции для предобработки аггрегированных данных
def save_unique_train_data(data: pd.DataFrame, 
                           columns_save_unique: list,
                           columns_save_min_max: list,
                           unique_values_path: str) -> None:
    """
    Сохранение словаря с признаками и уникальными значениями
    :param data: датасет
    :param columns_save_unique: признаки, для которых нужно сохранить уникальные значения
    :param columns_save_min_max: признаки, для которых нужно сохранить только мин и макс
    :param unique_values_path: путь до файла со словарем
    :return: None
    """
    unique_df = data[columns_save_unique]
    min_max_df = data[columns_save_min_max]

    # создаем словарь с уникальными значениями для вывода в UI
    dict_unique = {
        key: unique_df[key].unique().tolist()
        for key in unique_df.columns
    }

    # создаем словарь со значениями min, max
    dict_min_max = {
        key: [
            math.floor(min_max_df[key].min())
            if min_max_df[key].min() > 0 else 0,
            math.ceil(min_max_df[key].max())
        ]
        for key in min_max_df.columns
    }

    # объединяем словари
    dict_unique.update(dict_min_max)

    with open(unique_values_path, "w") as file:
        json.dump(dict_unique, file)

In [271]:
def pipeline_raw_preprocessing(data: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    """
    Функция обрабатывает сырые данные
    :param data: датасет с сырыми данными
    :param cfg: словарь с данными из конфигурационного файла
    :return: датасет
    """
    # проверка на соответствие признаков
    check_columns(data, cfg['preprocessing']['change_col_types'])
    # преобразование типов
    data = change_cols_type(data, cfg['preprocessing']['change_col_types'])
    # заполнение пропусков
    data = fill_na_values(data, cfg['preprocessing']['columns_fill_na'])
    # замена ошибок в данных
    data = replace_model_mistakes(data, cfg['preprocessing']['replace_values'])
    data = replace_nokia_type(data)
    return data


def pipeline_feature_generation(data: pd.DataFrame) -> pd.DataFrame:
    """
    Функция аггрегирует сырые данные и создает необходимые признаки
    """
    # кол-во визитов пользователя в разное время суток
    data_part_day = get_data_part_day(data)
    # кол-во активных дней и среднее кол-во запросов в сутки
    data_days = get_data_days(data)
    # данные об устройстве
    data_user_model = get_user_model_price(data)
    # кол-во регионов и городов
    data_city_cnt = get_user_city_cnt(data)
    # кол-во уникальных url
    data_url_cnt = get_user_url_cnt(data)

    # объединяем все в один датафрейм
    df = (data_part_day
          .merge(data_days, how='left', on='user_id')
          .merge(data_user_model, how='left', on='user_id')
          .merge(data_city_cnt, how='left', on='user_id')
          .merge(data_url_cnt, how='left', on='user_id'))
    return df

In [272]:
# итоговый пайплайн
def pipeline_preprocessing(data: pd.DataFrame,
                           cfg: dict,
                           flag_raw: bool = False,
                           flag_train: bool = False) -> pd.DataFrame:
    """
    Пайплайн для предобработки данных
    :param data: датасет
    :param cfg: словарь с конфигурационными данными
    :param flag_raw: если True - данные сырые и их нужно предобработать
    :param flag_train: если True - нужно соединить признаки с таргетом и сохранить уникальные значения
    :return: датасет
    """
    # если данные сырые
    if flag_raw:
        # обработка сырых данных
        data = pipeline_raw_preprocessing(data, cfg)
        # генерация признаков
        data = pipeline_feature_generation(data)

    # проверка столбцов аггрегированных данных
    check_columns(data, cfg['preprocessing']['agg_columns_type'])
    # изменение типов колонок на нужные
    data = change_cols_type(data, cfg['preprocessing']['agg_columns_type'])

    # если данные для тренировки
    if flag_train:
        # сохраним уникальные значения
        save_unique_train_data(
            data=data,
            columns_save_unique=cfg['preprocessing']['columns_save_unique'],
            columns_save_min_max=cfg['preprocessing']['columns_save_min_max'],
            unique_values_path=cfg['preprocessing']['unique_values_path'])

        # загрузим таргет и удалим пропуски
        targets = pd.read_parquet(cfg['train']['target_data_path'])
        targets = targets[['user_id', cfg['train']['target']]]
        targets = targets[targets[cfg['train']['target']] != 'NA'].dropna()
        # объединим признаки и таргет
        data = data.merge(targets, on='user_id')
        # изменим тип колонки для таргета
        data = change_cols_type(data, cfg['train']['target_type'])

    return data

In [273]:
# получим обработанные данные из сырых
df_preproc_test = pipeline_preprocessing(submit_data,
                                         config,
                                         flag_raw=True,
                                         flag_train=False)

In [274]:
df_preproc_test.head()

Unnamed: 0,user_id,part_of_day_day,part_of_day_evening,part_of_day_morning,part_of_day_night,sum_visits,day_pct,evening_pct,morning_pct,night_pct,...,avg_req_per_day,period_days,act_days_pct,cpe_type_cd,cpe_manufacturer_name,cpe_model_os_type,price,region_cnt,city_cnt,url_host_cnt
0,27,808,885,635,342,2670,0.302622,0.331461,0.237828,0.12809,...,66.074623,72,0.930556,smartphone,Xiaomi,Android,12990.0,1,2,209
1,83,421,292,132,318,1163,0.361995,0.251075,0.1135,0.273431,...,66.269234,26,1.0,smartphone,Apple,iOS,36840.0,1,1,131
2,100,651,120,704,14,1489,0.437206,0.080591,0.472801,0.009402,...,103.709679,34,0.911765,smartphone,Xiaomi,Android,21990.0,1,1,145
3,115,244,334,82,165,825,0.295758,0.404848,0.099394,0.2,...,41.799999,40,0.875,smartphone,Apple,iOS,28132.998047,1,1,119
4,171,274,115,189,35,613,0.446982,0.187602,0.30832,0.057096,...,21.0,53,0.886792,smartphone,Xiaomi,Android,20206.0,1,5,59


In [256]:
df_preproc_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14368 entries, 0 to 14367
Data columns (total 22 columns):
 #   Column                 Non-Null Count  Dtype   
---  ------                 --------------  -----   
 0   user_id                14368 non-null  int32   
 1   part_of_day_day        14368 non-null  int16   
 2   part_of_day_evening    14368 non-null  int16   
 3   part_of_day_morning    14368 non-null  int16   
 4   part_of_day_night      14368 non-null  int16   
 5   sum_visits             14368 non-null  int16   
 6   day_pct                14368 non-null  float32 
 7   evening_pct            14368 non-null  float32 
 8   morning_pct            14368 non-null  float32 
 9   night_pct              14368 non-null  float32 
 10  act_days               14368 non-null  int16   
 11  request_cnt            14368 non-null  int32   
 12  avg_req_per_day        14368 non-null  float32 
 13  period_days            14368 non-null  int16   
 14  act_days_pct           14368 non-null 

In [208]:
df_preproc_test = df_preproc_test.drop(config['train']['columns_to_drop'],
                                       axis=1)

# Evaluate

In [215]:
config['train']['model_path']

'../models/model_clf.joblib'

In [200]:
# загрузим обученную модель
model = joblib.load(config['train']['model_path'])

In [212]:
# получим предсказание нашей модели
df_preproc_test['predict'] = model.predict(df_preproc_test)

In [214]:
df_preproc_test.head()

Unnamed: 0,part_of_day_day,part_of_day_evening,part_of_day_morning,part_of_day_night,sum_visits,day_pct,evening_pct,morning_pct,night_pct,act_days,...,period_days,act_days_pct,cpe_type_cd,cpe_manufacturer_name,cpe_model_os_type,price,region_cnt,city_cnt,url_host_cnt,predict
0,808,885,635,342,2670,0.302622,0.331461,0.237828,0.12809,67,...,72,0.930556,smartphone,Xiaomi,Android,12990.0,1,2,209,0
1,421,292,132,318,1163,0.361995,0.251075,0.1135,0.273431,26,...,26,1.0,smartphone,Apple,iOS,36840.0,1,1,131,0
2,651,120,704,14,1489,0.437206,0.080591,0.472801,0.009402,31,...,34,0.911765,smartphone,Xiaomi,Android,21990.0,1,1,145,1
3,244,334,82,165,825,0.295758,0.404848,0.099394,0.2,35,...,40,0.875,smartphone,Apple,iOS,28132.998047,1,1,119,0
4,274,115,189,35,613,0.446982,0.187602,0.30832,0.057096,47,...,53,0.886792,smartphone,Xiaomi,Android,20206.0,1,5,59,1
