# Проект "Предсказание отклика абонента на подключение услуги"
_Черненко А.Е._

# Подготовка тренировочных и тестовых данных с наборами признаков по профилю потребления

Импорт библиотек

In [1]:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, progress

Запуск клиента Dask.

Это предоствит панель мониторинга (Dashboard), которая полезна для получения информации о вычислениях.
Ссылка на панель мониторинга в ячейке вывода.

In [2]:
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:61651  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 12.76 GB


В качестве исходных данных доступна информация об отклике абонентов на предложение подключения одной из услуг __`data_train.csv`__, __`data_test.csv`__. Каждому пользователю может быть сделано несколько предложений в разное время, каждое из которых он может или принять, или отклонить.

Отдельным набором данных представлен нормализованный анонимизированный набор признаков __`features.csv`__, характеризующий профиль потребления абонента. Эти данные привязаны к определенному времени, поскольку профиль абонента может меняться с течением времени.

Итого, в качестве входных данных будут представлены:
* __data_train.csv__: 
    * `id`,
    * `vas_id`,
    * `buy_time`,
    * `target`;
    
    
* __features.csv__: 
    * `id`, 
    * `<feature_list>`;

и тестовый набор:
* __data_test.csv__:
    * `id`,
    * `vas_id`,
    * `buy_time`;

где:
* `target` - целевая переменная, где 1 означает подключение услуги, 0 - абонент не подключил услугу соответственно,
* `buy_time` - время покупки, представлено в формате timestamp,
* `id` - идентификатор абонента,
* `vas_id` - подключаемая услуга.

Загрузим исходные .csv-файлы в `dask.dataframe` и сразу переименуем столбцы индексов и `buy_time` для их различия между данными откликов и данными профилей потребления

In [3]:
# мапер для переименования индексов и buy_time 
def mapper(name):
    return {'Unnamed: 0': f'index_{name}', 
            'buy_time': f'buy_time_{name}'}


dd_features = dd.read_csv('c://mfds/features.csv',sep='\t').rename(columns=mapper('feat'))
dd_train = dd.read_csv('data_train.csv').rename(columns=mapper('vas'))
dd_test = dd.read_csv('data_test.csv').rename(columns=mapper('vas'))

In [4]:
LEN_TRAIN = len(dd_train.index)
print(f'Наблюдений в "data_train.csv": {LEN_TRAIN}')

Наблюдений в "data_train.csv": 831653


In [5]:
LEN_TEST = len(dd_test.index)
print(f'Наблюдений в "data_test.csv": {LEN_TEST}')

Наблюдений в "data_test.csv": 71231


Необходимо все наблюдения из `data_train.csv` и `data_test.csv` дополнить соответствующими признаками из `features.csv`. 

Подробно рассмотрим алгоритм получения признаков из профиля потребления для тренировочной выборки, а затем повторим аналогичные дествия для тестовой выборки.

Проведем слияние тренировочной выборки `dd_train` и базы данных профилей потребления `dd_features`.
Слияние проведем методом пересечения (`how='inner'`) по столбцу `id`. Данный способ подходит нам с допущением, что в `features.csv` имеются данные для всех `id`, с которыми мы будем работать в тренировочной и тестовой выборках.

In [6]:
merged_train = dd.merge(dd_train, dd_features, on=['id'], how='inner')

Получим результат слияния в виде `pandas.DataFrame`

In [7]:
pd_train = merged_train.compute()

In [8]:
pd_train.head(3)

Unnamed: 0,index_vas,id,vas_id,buy_time_vas,target,index_feat,buy_time_feat,0,1,2,...,243,244,245,246,247,248,249,250,251,252
0,140,4130548,2.0,1544389200,0.0,8832,1548018000,11.700029,17.790888,4.429214,...,-943.373846,-598.770792,-25.996269,-22.630448,-220.747724,-14.832889,-0.694428,-12.175933,-0.45614,1.0
1,842,540997,1.0,1541365200,0.0,11897,1545598800,-96.799971,-69.199112,-108.200786,...,-977.373846,-613.770792,-25.996269,-37.630448,-306.747724,-24.832889,0.305572,-12.175933,-0.45614,0.0
2,843,540997,4.0,1542574800,1.0,11897,1545598800,-96.799971,-69.199112,-108.200786,...,-977.373846,-613.770792,-25.996269,-37.630448,-306.747724,-24.832889,0.305572,-12.175933,-0.45614,0.0


Наблюдений в полученном датафрейме

In [9]:
pd_train.shape[0]

860052

Видно, что после слияния датафрейм имеет больше наблюдей, чем исходный. Это связано с тем, что для некоторых клиентов в данных профиля потребления имеются несколько записей, зафиксированных в разное время. Для таких случаев произошло дублирование некоторых наблюдений из `data_train.csv` с разными записями признков из `features.csv`. Избавляться от дубликатов будем с учетом актуальности по временни. То есть среди дубликатов будем оставлять только те наблдения, у которых разница между временными штампами отклика на услугу (`'buy_time_vas'`) и фиксации записи по признакам профиля потребления (`'buy_time_feat'`) минимальна. Введем новый столбец `'time_delta'`, который будет отражать указанную разницу во времени.

In [10]:
pd_train['time_delta'] = abs(pd_train['buy_time_vas'] - pd_train['buy_time_feat'])

Отсортируем наблюдения в порядке возрастания `'time_delta'`

In [11]:
pd_train.sort_values(['time_delta'], inplace=True, ignore_index=True)

Далее удалим по порядку все дубликаты, оставляя лишь те, которые встречаются в датафрейме первыми (`keep='first'`). Благодаря сортировке и данному действию, среди дубликатов останутся лишь те наблюдения, которые имеют минимальное значение `time_delta`. Дублированными строками считаются те, которые имеют одинаковое значение `index_vas`.

In [12]:
pd_train.drop_duplicates(['index_vas'], keep='first', inplace=True)

Отсортируем датафрейм по возрастанию `index_vas`.

In [13]:
pd_train.sort_values(['index_vas'], inplace=True, ignore_index=True)

Проверим нарушилась ли индексация `index_vas` путем сравнения со сброшенным на предыдущем этапе индеском датафрейма.

In [14]:
(pd_train.index == pd_train['index_vas']).all()

True

Индексы совпадают, индексация не нарушена.

Сравним количество наблюдений полученного датафрейма с `data_train.csv`

In [15]:
print(f'Наблюдений   в   исходном  "data_train.csv": {LEN_TRAIN}')
print(f'Наблюдений в новом тренирвоочном датафрейме: {pd_train.shape[0]}')

Наблюдений   в   исходном  "data_train.csv": 831653
Наблюдений в новом тренирвоочном датафрейме: 831653


#### Вывод: 
Индексация и количество наблюдений после извлечения и добавления признаков потребления __НЕ НАРУШЕНЫ__.

Удалим лишние стобцы с индексами и признаком `buy_time` из `features.csv`. Признак `time_delta` оставим, так как он несет в себе информацию об актуальности признаков профиля потребления на момент отклика клиента на услугу. И сохраним датафрейм в `data_feat_train.pkl`.

In [16]:
pd_train.drop(['index_vas','index_feat', 'buy_time_feat'], axis=1).astype('float32').to_pickle('data_f32_train.pkl')

Повторим алгоритм для тестовой выборки.

In [17]:
merged_test = dd.merge(dd_test, dd_features, on=['id'], how='inner')
pd_test = merged_test.compute()
pd_test['time_delta'] = abs(pd_test['buy_time_vas'] - pd_test['buy_time_feat'])
pd_test.sort_values(['time_delta'], inplace=True, ignore_index=True)
pd_test.drop_duplicates(['index_vas'], keep='first', inplace=True)
pd_test.sort_values(['index_vas'], inplace=True, ignore_index=True)

Индексация в норме:

In [18]:
(pd_test.index == pd_test['index_vas']).all()

True

Количество наблюдений сохранено:

In [19]:
print(f'Наблюдений в исходном  "data_test.csv": {LEN_TEST}')
print(f'Наблюдений в новом тестовом датафрейме: {pd_test.shape[0]}')

Наблюдений в исходном  "data_test.csv": 71231
Наблюдений в новом тестовом датафрейме: 71231


Сохраним датафрейм в `data_feat_test.pkl`.

In [20]:
pd_test.drop(['index_vas','index_feat'], axis=1).astype('float32').to_pickle('data_f32_test.pkl')

tornado.application - ERROR - Uncaught exception GET /status/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='127.0.0.1:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "c:\users\aech\appdata\local\programs\python\python37\lib\site-packages\tornado\websocket.py", line 546, in _run_callback
    result = callback(*args, **kwargs)
  File "c:\users\aech\appdata\local\programs\python\python37\lib\site-packages\bokeh\server\views\ws.py", line 135, in open
    raise ProtocolError("Token is expired.")
bokeh.protocol.exceptions.ProtocolError: Token is expired.
