In [7]:
import datetime
import json
import os
import pickle
import requests
import urllib
from itertools import groupby
from operator import itemgetter
from typing import Dict, Tuple
from multiprocessing import Pool

import numpy as np
import pandas as pd
import sqlalchemy as sa
from pymongo import MongoClient
from scipy.sparse import save_npz, load_npz, csr_matrix
from tqdm import tqdm

os.environ['MKL_NUM_THREADS'] = '1'
DT = datetime.datetime.now().strftime('%Y-%m-%d')

# Загружаем csv

`user_id` - результат рандома
`item_id` - результат рандома

In [8]:
user_item_views_df = pd.read_csv('/usr/share/data_store/user_item_views.zip', compression='zip')
print(user_item_views_df.shape)
user_item_views_df.head(3)

(1975696, 4)


Unnamed: 0,user_id,item_id,show_timestamp,show_duration
0,912948920,1587935070,1119307,323
1,1882728205,1466874188,1115796,1428
2,382105433,276839040,1116585,921


Для трансформации в csr создаём индексы

In [9]:
unique_users = user_item_views_df.user_id.unique()
unique_items = user_item_views_df.item_id.unique()
item_to_id = {j: i for i, j in enumerate(unique_items)}
id_to_item = {j: i for i, j in item_to_id.items()}
user_to_id = {j: i for i, j in enumerate(unique_users)}
print('Индекс создан: %d строк %d столбцов' % (len(user_to_id), len(item_to_id)))

Индекс создан: 168756 строк 9991 столбцов


In [10]:
len(user_to_id)

168756

## Трансформация в разреженная матрица

Для каждого пользователя оставляем top-20 последних просмотров

In [11]:
%%time
HISTORY_TOP = 20
user_item_views_df['rank'] = (
    user_item_views_df
    .groupby(by=['user_id'])['show_timestamp']
    .rank(method='first', ascending=False)
)
ui_slim_df = user_item_views_df[user_item_views_df['rank'] < HISTORY_TOP][['user_id', 'item_id']]
num_rows = len(user_to_id)
num_cols = len(item_to_id)
entries = np.ones(ui_slim_df.shape[0])
rows = tuple(user_to_id[i] for i in ui_slim_df.user_id.values)
cols = tuple(item_to_id[i] for i in ui_slim_df.item_id.values)

train_set_csr = csr_matrix(
    (entries, (rows, cols)),
    shape=(num_rows, num_cols),
    dtype=np.float32
)
train_set_csr
save_npz(f'train_set_{DT}.npz', train_set_csr)
print('Данные сохранены в %s' % f'train_set_{DT}.npz')

Данные сохранены в train_set_2020-12-07.npz
CPU times: user 6.37 s, sys: 136 ms, total: 6.51 s
Wall time: 6.55 s


In [12]:
train_set_csr

<168756x9991 sparse matrix of type '<class 'numpy.float32'>'
	with 1433407 stored elements in Compressed Sparse Row format>

# Обучение модели

In [13]:
from implicit.als import AlternatingLeastSquares

implict_als_params = {'factors': 4, 'iterations': 1}

model = AlternatingLeastSquares(**implict_als_params)

# транспонируем обязательно!
model.fit(train_set_csr.T.tocsr())



HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=1.0), HTML(value='')))




## Пример работы модели

In [14]:
def id_to_content_df(ids: np.array, content_df: pd.DataFrame, id_to_item):
    items = tuple(id_to_item[i] for i in ids)
    result_df = content_catalog[
        content_catalog.item_id.isin(items)
    ]
    return result_df

content_catalog = pd.read_csv('/usr/share/data_store/content_catalog.zip', compression='zip')

random_history = train_set_csr[
    np.random.randint(low=0, high=train_set_csr.shape[0])
]

id_to_content_df(random_history.nonzero()[1], content_catalog, id_to_item)

Unnamed: 0,item_id,title
3916,622870895,Защитник
4319,955291410,Последствия
8392,770977699,Роковое влечение
13349,1939981791,Выбор
25273,1245146184,Неистовый
25990,1496630533,Лицо со шрамом
27442,157249585,Капоне. Лицо со шрамом


Проверим, что рекоммендует модель

In [15]:
recommends = model.recommend(
            userid = 0,
            user_items=random_history,
            N=10,
            filter_already_liked_items=True,
            recalculate_user=True
)
id_to_content_df([rec[0] for rec in recommends], content_catalog, id_to_item)

Unnamed: 0,item_id,title
4098,984316348,Гарри Поттер и Принц-полукровка
7951,1619286035,Гарри Поттер и Кубок огня
10411,1776502261,Гарри Поттер и философский камень
10508,401051817,Гарри Поттер и Орден Феникса
16205,1687588185,Щенячий патруль: Мегащенки
26561,1729092206,Гарри Поттер и узник Азкабана
27855,1694481414,Гарри Поттер и Дары Смерти: Часть I
28796,538560695,Разговорник
28958,1376675893,Зверополис
30121,231135289,"Гудбай, Америка"


# Валидация модели

In [17]:
with open('/usr/share/data_store/ground_truth_dataset.pkl', 'rb') as f:
    ground_truth_dataset = pickle.load(f)
with open('/usr/share/data_store/test_dataset.pkl', 'rb') as f:
    test_dataset = pickle.load(f)
print(len(test_dataset), len(ground_truth_dataset))

13163 13163


In [18]:
# test_dataset[1000]

In [19]:
# ground_truth_dataset[1000]

In [20]:
def get_als_action_history_vector(item_to_id: Dict[int, int], action_history, binary=True) -> np.ndarray:
    """Получить историю действий для ALS

    :param item_to_id: справочник контента ALS
    :return:
    """
    als_action_history_vector = np.zeros(len(item_to_id), dtype=int)
    for iid, item_attr in action_history.items():
        if iid in item_to_id.keys():
            if binary:
                als_action_history_vector[item_to_id[iid]] = 1
            else:
                als_action_history_vector[item_to_id[iid]] = item_attr
    return als_action_history_vector

def vectorize_action_history(action_history):
    res = get_als_action_history_vector(item_to_id, action_history)
    return res

with Pool(2) as p:
    test_dataset_vectors = p.map(vectorize_action_history, test_dataset)
    ground_truth_dataset_vectors = p.map(vectorize_action_history, ground_truth_dataset)
print(len(test_dataset_vectors))

13163


Готовим данные для мультипроцессинга - объединяем в один массив историю пользователя и валидационные просмотры

In [21]:
train_valid_pairs = []
for test_user_id in range(len(test_dataset_vectors)):
    train_valid_pairs.append((
        csr_matrix(test_dataset_vectors[test_user_id]),
        ground_truth_dataset_vectors[test_user_id].nonzero()[0]
    ))

In [22]:
%%time

N = 40
testing_model = model

def top_n_recommends(watch_history):
    top_n_result = testing_model.recommend(
            userid = 0,
            user_items=watch_history[0],
            N=N,
            filter_already_liked_items=True,
            recalculate_user=True
    )
    hit = 0
    if len(watch_history[1]) > 0 and np.intersect1d(watch_history[1], top_n_result).size > 0:
        hit = 1
    return hit

with Pool(2) as p:
    hits = p.map(top_n_recommends, train_valid_pairs)
print(sum(hits)/len(hits))

0.18354478462356605
CPU times: user 483 ms, sys: 194 ms, total: 677 ms
Wall time: 34.9 s


# Бейзлайны

top 100 популярного

In [23]:
%%time

N = 40
content_popularity = np.asarray(train_set_csr.sum(axis=0)).reshape(-1)
top_100_popular_items = np.argsort(-content_popularity)[:100]

def top_n_recommends(watch_history):
    top_n_result = top_100_popular_items[:N]
    hit = 0
    if len(watch_history[1]) > 0 and np.intersect1d(watch_history[1], top_n_result).size > 0:
        hit = 1
    return hit

with Pool(5) as p:
    hits = p.map(top_n_recommends, train_valid_pairs)
print(sum(hits)/len(hits))

0.18438046038137204
CPU times: user 486 ms, sys: 336 ms, total: 823 ms
Wall time: 1.42 s


Рандом

In [24]:
%%time

N = 40
content_popularity = np.asarray(train_set_csr.sum(axis=0)).reshape(-1)
all_content = np.array(list(id_to_item.keys()))

def top_n_recommends(watch_history):
    top_n_result = np.random.choice(all_content, size=N, replace=True)
    hit = 0
    if len(watch_history[1]) > 0 and np.intersect1d(watch_history[1], top_n_result).size > 0:
        hit = 1
    return hit

with Pool(5) as p:
    hits = p.map(top_n_recommends, train_valid_pairs)
print(sum(hits)/len(hits))

0.0076730228671275545
CPU times: user 503 ms, sys: 345 ms, total: 848 ms
Wall time: 1.66 s


Пример с обучением модели

In [25]:
implict_als_params = {'factors': 20, 'iterations': 30}
model = AlternatingLeastSquares(**implict_als_params)
# транспонируем обязательно!
model.fit(train_set_csr.T.tocsr())

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=30.0), HTML(value='')))




In [26]:
%%time

N = 50
testing_model = model

def top_n_recommends(watch_history):
    top_n_result = testing_model.recommend(
            userid = 0,
            user_items=watch_history[0],
            N=N,
            filter_already_liked_items=True,
            recalculate_user=True
    )
    hit = 0
    if len(watch_history[1]) > 0 and np.intersect1d(watch_history[1], top_n_result).size > 0:
        hit = 1
    return hit

with Pool(3) as p:
    hits = p.map(top_n_recommends, train_valid_pairs)
print(sum(hits)/len(hits))

0.25518498822456886
CPU times: user 761 ms, sys: 220 ms, total: 981 ms
Wall time: 47.2 s


Проверяем рекомендации на обученной модели

In [27]:
recommends = model.recommend(
            userid = 0,
            user_items=random_history,
            N=10,
            filter_already_liked_items=True,
            recalculate_user=True
)
id_to_content_df([rec[0] for rec in recommends], content_catalog, id_to_item)

Unnamed: 0,item_id,title
622,39175006,Призраки войны
2002,438216250,Киллер по вызову
7722,1724154138,Шахматист
8198,1402254622,Лето 84
10898,1528576868,Морские паразиты
25044,1687494578,Легендарное ограбление
26996,1782092814,Самый пьяный округ в мире
27855,1694481414,Гарри Поттер и Дары Смерти: Часть I
28064,1129147321,Волк с Уолл-стрит
30562,702331912,Дурная слава


# Загружаем JSON

Нужно распаковать архив и подготовить его для загрузки в Mongo

In [30]:
import tarfile
with tarfile.open('/usr/share/data_store/json_views.tar.gz', 'r') as json_tar:
    json_tar.extractall('/usr/share/data_store')

Открываем файл на чтение

In [31]:
for line in open('/usr/share/data_store/json_views.json'):
    print(line)
    break

{"value": 1950, "date": 949614752, "validation": 0, "item_id": 930420160, "user_id": 399644822}



Подключаемся к Mongo

In [46]:
client = MongoClient('mongo_host', 27017)
db = client['watch_history_db']
collection = db['watch_history']
print(collection)

Collection(Database(MongoClient(host=['mongo_host:27017'], document_class=dict, tz_aware=False, connect=True), 'watch_history_db'), 'watch_history')


In [45]:
%%time

for line in open('/usr/share/data_store/json_views.json'):
    json_line = eval(line)
    collection.insert_one(json_line)

# client.close()

CPU times: user 1min 30s, sys: 8.94 s, total: 1min 39s
Wall time: 2min 15s


Чтобы проверить, что добавление запусей прошло успешно запустите подключение к Mongo

```shell
python3 upstart.py -s mongo
```

Далее переключимся в схему данных
```shell
use watch_history_db
```

И отобразим несколько записей из загруженного датасета

```shell
db.watch_history.find().limit(10)
```

Плстроим выборку пользователей

In [48]:
distinct_users = collection.distinct("user_id")
print(distinct_users[:10])

[399644822, 968040702, 428847820, 425435267, 605494934, 309946896, 565376993, 613615730, 963328481, 428290214]
