In [1]:
import argparse
import time
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from scipy import sparse
import os
import pandas as pd
from scipy import sparse
import numpy as np
from typing import Tuple

In [2]:
SEED = 777
VERBOSE = False

In [3]:
torch.manual_seed(SEED)

#만약 GPU가 사용가능한 환경이라면 GPU를 사용
if torch.cuda.is_available():
    use_cuda = True

device = torch.device("cuda" if use_cuda else "cpu")
device

device(type='cuda')

In [4]:
print("Load Movielens dataset")
# Load Data
DATA_DIR = '/opt/ml/input/data/train'
raw_data = pd.read_csv(os.path.join(DATA_DIR, 'train_ratings.csv'), header=0)

Load Movielens dataset


In [5]:
_min_cnt = float('inf')
for usr_id, tp in raw_data.groupby('user', as_index=False):
    usr_id, tp['item']
    if not isinstance(usr_id, int) and not isinstance(tp, pd.DataFrame):
        print('except')
        break
    _min_cnt = min(tp.shape[0], _min_cnt)

In [6]:
_min_cnt

16

In [7]:
raw_data.shape[0]

5154471

# 테스트셋 분리

In [8]:
def generate_general_train_test_set(test_plays: pd.DataFrame, n_all=10, n_seq=2) -> Tuple[pd.DataFrame, pd.DataFrame]:
    np.random.seed(SEED)
    trains, labels = [], []
    for usr_id, tp in test_plays.groupby('user', as_index=False):
        _n_all = min(tp.shape[0]//4, n_all)
        _n_seq = min(_n_all, n_seq)
        _n_static = _n_all - _n_seq
        _n_all = _n_static + _n_seq

        _idxs = np.random.permutation(tp.shape[0]-_n_seq)[:_n_static]
        _mask = tp.index.isin(tp.index[_idxs])
        for i in range(_n_seq):
            _mask[-i-1] = True
        if VERBOSE:
         if _n_all != 10:
            print('_n_all:', _n_all)
            print(usr_id, _idxs)
            print(_n_static, _n_seq)

        trains.append(tp[~_mask])
        labels.append(tp[_mask])
        
    train_df = pd.concat(trains)
    label_df = pd.concat(labels)
    return train_df, label_df

In [9]:
train_df, label_df = generate_general_train_test_set(raw_data, n_all=10, n_seq=2)

In [10]:
assert raw_data.shape[0] == train_df.shape[0] + label_df.shape[0]

In [11]:
label_df.head(12)

Unnamed: 0,user,item,time
61,11,150,1230785343
89,11,60037,1230787639
131,11,57368,1230788571
134,11,1748,1230788594
257,11,6996,1230856754
275,11,364,1230858919
301,11,3000,1230859482
351,11,69526,1251170492
374,11,7153,1294796132
375,11,4226,1294796159


In [12]:
label_df['user'].unique()

array([    11,     14,     18, ..., 138486, 138492, 138493])

# recall계산 속도 테스트

In [13]:
test_submission = pd.read_csv("general_test_submission.csv")

In [57]:
test_submission[test_submission['user'] == 11]['item']

0     1680
1     1610
2     2683
3     4370
4    80463
5    88125
6     4741
7     1252
8      915
9     1237
Name: item, dtype: int64

## for loop

In [58]:
label_df[label_df['user'] == 11]['item'].isin(test_submission[test_submission['user'] == 11]['item']).sum()

0

In [59]:
from tqdm import tqdm

In [60]:
%%time
recall_sum = 0
for user in tqdm(label_df['user'].unique()):
    preds = label_df[label_df['user'] == user]['item']
    labels = test_submission[test_submission['user'] == user]['item']

    recall_sum += preds.isin(labels).sum() / labels.shape[0]

100%|██████████| 31360/31360 [00:42<00:00, 737.54it/s]

CPU times: user 42.5 s, sys: 80 ms, total: 42.6 s
Wall time: 42.5 s





In [61]:
recall_sum/label_df['user'].nunique()

0.09025510204079744

## async

In [62]:
import asyncio

In [63]:
async def _worker(user):
    preds = label_df[label_df['user'] == user]['item']
    labels = test_submission[test_submission['user'] == user]['item']

    return preds.isin(labels).sum() / labels.shape[0]

In [64]:
async def get_recall():
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(_worker(user)) for user in label_df['user'].unique()]
    result = await asyncio.gather(*tasks)
    print(sum(result) / label_df['user'].nunique())
    return sum(result) / label_df['user'].nunique()

In [65]:
start = int(time.time())
await get_recall()
print("***run time(sec) :", int(time.time()) - start)

0.09025510204079744
***run time(sec) : 43


## multiprocessing

In [66]:
from multiprocessing import Pool

In [67]:
def _worker(user):
    preds = label_df[label_df['user'] == user]['item']
    labels = test_submission[test_submission['user'] == user]['item']

    return preds.isin(labels).sum() / labels.shape[0]

In [68]:
def get_recall():
    with Pool(os.cpu_count()) as p:
        result = p.map(_worker, label_df['user'].unique())
    return sum(result) / label_df['user'].nunique()

In [69]:
%%time
get_recall()

CPU times: user 104 ms, sys: 172 ms, total: 276 ms
Wall time: 5.66 s


0.09025510204079744

In [97]:
def _worker(user_df):
        user, submission_df = user_df
        preds = label_df[label_df['user'] == user]['item']
        labels = submission_df[submission_df['user'] == user]['item']

        return preds.isin(labels).sum() / labels.shape[0]

In [98]:
def get_recall(submission_df):
    with Pool(os.cpu_count()) as p:
        users = label_df['user'].unique()
        result = p.map(_worker, zip(users, [submission_df]*len(users)) )
    
    return sum(result) / label_df['user'].nunique()

# 모델별 성능 테스트

In [70]:
test_submission_ep1 = pd.read_csv("general_test_submission_ep1.csv")

In [71]:
_cnt = 0
for user in tqdm(label_df['user'].unique()):
    _cnt += label_df[label_df['user'] == user]['item'].isin(test_submission_ep1[test_submission_ep1['user'] == user]['item']).sum()

100%|██████████| 31360/31360 [00:41<00:00, 752.22it/s]


In [72]:
_cnt/test_submission.shape[0]

0.07685905612244898

In [73]:
all_test_submission = pd.read_csv("g_test_submission.csv")

In [74]:
_cnt = 0
for user in tqdm(label_df['user'].unique()):
    _cnt += label_df[label_df['user'] == user]['item'].isin(all_test_submission[all_test_submission['user'] == user]['item']).sum()

100%|██████████| 31360/31360 [00:41<00:00, 752.16it/s]


In [75]:
_cnt/test_submission.shape[0]

0.11298469387755102

In [99]:
get_recall(all_test_submission)

0.1129846938775256