## Objective


**코끼리 냉장고에 집어넣기 : 실시간 추천엔진을 노트북에서 돌게 만들어보자** 라는 주제로 발표된 추천 엔진 관련 발표는 분산 환경인 하둡이나 스파크를 쓰지 않고도, 노트북 수준으로도 충분히  대용량의 추천 엔진을 디자인할 수 있음을 보여주는 자료입니다. Jaccard Similarity을 활용한 Collaborative Filtering을 구현해보도록 하겠습니다.


***caution***
> [Deview 2015 하용호님 발표](https://www.slideshare.net/deview/261-52784785)의 발표 내용을 코드로 구현한 Script입니다. 


### 필요 모듈 가져오기

conda 환경인 경우, 추가적으로 아래 모듈들을 설치해주셔야 합니다.

````shell
pip install datasketch # Minhash Package
pip install redis # Redis DB Interface package 
pip install python-snappy # String Compression Package
````


In [1]:
%matplotlib inline

import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from tensorflow.keras.utils import get_file
tqdm.pandas()

### 데이터 가져오기 

In [2]:
ROOT_URL = "https://craftsangjae.s3.ap-northeast-2.amazonaws.com/data/"

# 데이터 가져오기
play_path = get_file("lastfm_play.csv",
                     ROOT_URL+"lastfm_play.csv")
artist_path = get_file("lastfm_artist.csv",
                       ROOT_URL+"lastfm_artist.csv")
user_path = get_file("lastfm_user.csv",
                     ROOT_URL+"lastfm_user.csv")

play_df = pd.read_csv(play_path)
artist_df = pd.read_csv(artist_path)
user_df = pd.read_csv(user_path)

## 사전 설명

### 1. Jaccard Similarity

Jaacard Similarity는 **두 집합 간 유사도**를 구하는 수식으로, 주로 구매 유무, 페이지 뷰 유무 등의 행동 정보를 통해 유사도를 구할 때 이용합니다.


$$
J(A, B) = \frac{|A \cap B|}{|A \cup B|}
$$



In [3]:
def jaccard_similarity(A,B):
    """
    두 집합 A,B에 대한 유사도
    """
    return len(A & B) / len(A | B)

예시 ) 세 가지 아이템(A,B,C) 이 존재하고 7명의 유저가(1,2,3,4,5,6,7)이 존재할 때, 각 아이템 별 구매한 유저들은 아래처럼 존재하고 있는 상황.

$
A = \{ 1, 2, 4, 7\} \\
B = \{ 2, 3, 5, 6\} \\
C = \{ 3, 4, 5\}
$

In [4]:
A = {1,2,4,7}
B = {2,3,5,7}
C = {3,4,5}

In [5]:
sim = jaccard_similarity(A,B)
print(f"J(A,B)의 유사도 : {sim:.3%}")

sim = jaccard_similarity(A,C)
print(f"J(A,C)의 유사도 : {sim:.3%}")

sim = jaccard_similarity(B,C)
print(f"J(B,C)의 유사도 : {sim:.3%}")

J(A,B)의 유사도 : 33.333%
J(A,C)의 유사도 : 16.667%
J(B,C)의 유사도 : 40.000%


### 2. MinHash

<img src="https://www.researchgate.net/publication/304265533/figure/fig4/AS:391339092004880@1470313918577/Overview-of-the-MinHash-bottom-sketch-strategy-for-estimating-the-Jaccard-index-First.png" width="300" > 

Minhash는 기본적으로 집합을 Hash하는 알고리즘입니다. Hash는 임의의 길이의 데이터(Key Values)를 고정된 길이의 데이터(Hash Values)로 맵핑합니다. MinHash는 하나의 집합에 대해 고정된 갯수의 Hash Values을 생성합니다.

이 때 일반적인 Hash와 다르게 하나의 독특한 성질(local sensitive hashing)이 있는데, 두 집합이 유사할수록 해당 Hash Values가 비슷하게 생성됩니다.

---- 

*** CF) MinHASH 살펴보기 ***

In [6]:
from datasketch import MinHash

# Hash하고자 하는 집합
set_A = {"A","B","C"}

# 생성하는 Hash의 갯수
minhash = MinHash(num_perm=4)

for value in set_A:
    # 원소 별로 하나씩 minhash에 적용
    minhash.update(str(value).encode('utf8'))
    
minhash.hashvalues

array([ 432770662, 2866125906,  383765683,  257361572], dtype=uint64)

위의 과정을 함수로 나타내면 아래와 같습니다.

In [7]:
def get_hash(target_set, sig_size=128):
    minhash = MinHash(sig_size)
    for value in target_set:
        minhash.update(str(value).encode('utf8'))
    return minhash.hashvalues

MinHash Function은 서로 다른 Hash Function들(위에서는 4개)으로 구성되어 있습니다. MinHash의 알고리즘을 보기 위해 간단한 예제를 만들어보도록 하겠습니다.

In [8]:
hash_a = get_hash({'A'}, 4)
hash_a

array([2155541700, 3497910862, 1404536498,  257361572], dtype=uint64)

In [9]:
hash_b = get_hash({'B'}, 4)
hash_b

array([1660848309, 2866125906,  383765683,  690308850], dtype=uint64)

In [10]:
hash_c = get_hash({'C'}, 4)
hash_c

array([ 432770662, 3839306110,  701789493, 2250697747], dtype=uint64)

MinHash는 각 원소 별로 Signature(여기서는 4개)을 구한 후, 각 Signature 중 가장 작은 값을 저장하는 방식입니다. 가장 작은 값을 저장한다 해서 MinHash라고 불립니다.

In [11]:
hash_abc = get_hash({"A","B", "C"},4)
hash_abc

array([ 432770662, 2866125906,  383765683,  257361572], dtype=uint64)

In [12]:
# 각 시그니처 별 최소 값(Min)이 해당 집합 Signature
np.maximum(hash_a, hash_b, hash_c)

array([2155541700, 3497910862, 1404536498,  690308850], dtype=uint64)

#### 특성 1 : 원소가 중복되면, 동일한 결과 반환한다. 

집합의 특성과 동일하게 이미 Minhash에 포함되었다면 MinHash의 값은 동일하게 나옵니다.

In [13]:
minhash.update("A".encode('utf8'))

minhash.hashvalues

array([ 432770662, 2866125906,  383765683,  257361572], dtype=uint64)

#### 특성 2: 순서에 영향을 받지 않는다.

그리고 집합과 동일하게, 원소를 update하는 순서가 달라지더라도 동일한 결과를 반환합니다.

In [14]:
minhash = MinHash()

for value in ["A","B","C","D"]:
    minhash.update(str(value).encode('utf8'))
    
minhash.hashvalues

array([ 187542028,  206934951,  383765683,  257361572,   14019373,
         98127798,  510146389,  158207789, 2211499638,  236986188,
       1430055357, 1069935458,  622631458,  859502047,  304814259,
       2930336844,   82639309, 1462000340, 1259992472, 1462270518,
         35851626,   62567127,  669040041,  734884339,  640828744,
       1638357194,  104131353,  338442154,  826472273,  251592307,
       2872577173, 1624068580, 1915339881, 1075083221,  145452920,
        141861766,  565557948,  109692850, 1037588332,  232800860,
         71174338,  904892082,  126924591,    6559914, 1344550122,
       1751405721,  136141014,  469736690,  718739130, 1591066330,
       2073693511,  156225272,  172995981, 1829169708,   48017838,
       1191568394,   59197654,   49810303,   94627355,  345970473,
       1306477605,  502945878,   23067506,   30293773, 1446449111,
       1446167980,  666285169,  522965222,   99415839,   14356784,
         84889254,  952242489, 1614147919,  515649169, 1677435

In [15]:
minhash = MinHash()

for value in ["D","C","B","A"]:
    minhash.update(str(value).encode('utf8'))
    
minhash.hashvalues

array([ 187542028,  206934951,  383765683,  257361572,   14019373,
         98127798,  510146389,  158207789, 2211499638,  236986188,
       1430055357, 1069935458,  622631458,  859502047,  304814259,
       2930336844,   82639309, 1462000340, 1259992472, 1462270518,
         35851626,   62567127,  669040041,  734884339,  640828744,
       1638357194,  104131353,  338442154,  826472273,  251592307,
       2872577173, 1624068580, 1915339881, 1075083221,  145452920,
        141861766,  565557948,  109692850, 1037588332,  232800860,
         71174338,  904892082,  126924591,    6559914, 1344550122,
       1751405721,  136141014,  469736690,  718739130, 1591066330,
       2073693511,  156225272,  172995981, 1829169708,   48017838,
       1191568394,   59197654,   49810303,   94627355,  345970473,
       1306477605,  502945878,   23067506,   30293773, 1446449111,
       1446167980,  666285169,  522965222,   99415839,   14356784,
         84889254,  952242489, 1614147919,  515649169, 1677435

#### 특성 3 : 집합이 비슷하면, Hash 값도 비슷하게 나온다.(핵심 특징)

Minhash의 가장 중요한 특성 중 하나로, 집합 간의 IOU 값과 Hash 값의 IOU가 비슷하게 나옵니다.

In [16]:
set_A = {"A","B","C","D","E","F","G","H"}
set_B = {"D","E","F","G","H","I","J","K"}

#### 두 집합의 IOU 값 구하기 

In [17]:
intersection = len(set_A & set_B)
union = len(set_A | set_B)
iou = intersection / union 
iou

0.45454545454545453

#### 두 집합의 MinHash 구하기

In [18]:
minhash_A = get_hash(set_A)
minhash_B = get_hash(set_B)

In [19]:
minhash_iou = np.mean(minhash_A == minhash_B)
minhash_iou

0.5

MinHash의 signature size가 커질수록 집합 간의 IOU 값과 Hash 값의 IOU가 더 비슷해 집니다.

In [20]:
sig_size = 256

minhash_A = get_hash(set_A, sig_size)
minhash_B = get_hash(set_B, sig_size)

np.mean(minhash_A == minhash_B)

0.44921875

---


## 대규모 Memory Based Collaborative Filtering의 어려운 점

현업에서 요구되는 응답 시간은 보통 300ms 이내를 목표로 합니다. 이것보다 늦을 경우, 웹페이지가 느리다고 느끼기 때문에 고객에게 나쁜 서비스 경험을 제공할 수 있습니다. 그래서 추천 시스템을 디자인할 때에는 "얼마나 단시간 내에", "얼마나 많은" 요청을 처리할 수 있는가가 핵심 이슈가 됩니다.

#### 1. 연산량이 지나치게 많음

일반적인 협업 필터링은 전체 아이템 간의 유사도를 동시에 계산합니다. $O(N^2)$의 관계로 아이템 수가 10배가 늘어나면, 유사도 연산은 100배가 늘어나게 됩니다. lastFM의 경우 아이템에 해당하는 아티스트의 수가 160,110명에 불과했지만, 유통업체 같은 경우 아이템 수가(SKU) 일반적으로 100만개가 훨씬 넘습니다.

100만개만 되더라도 각 아이템 간 유사도를 구하게 되면, 일반적인 컴퓨터의 램으로는 계산이 어려운 수준이고 이 규모가 되면 그 때부터 Hadoop과 같은 분산처리에서 다루어야 합니다. 

#### 2. 실시간으로 반영이 어려움

그리고 위와 같은 수준의 연산량은 매번 계산하기가 어렵습니다. 그렇기 때문에 보통 주기적으로 하루 단위 혹은 1시간 단위 등 연산을 진행하고, Item Similarity Matrix을 업데이트 하는 방식으로 진행합니다. 뉴스 피드 추천과 같은 컨텐츠 추천에서는 실시간성이 매우 핵심인데, 실시간으로 추천이 갱신되지 못하는 기존의 방식은 추천 시스템으로 적용하기 어렵습니다.

### 해결방법 : MinHash를 이용하자!

연산이 오래 걸리는 것은 바로 아이템끼리의 유사도를 구하기 위해 집합 연산을 수행해야 하기 때문입니다. 그러지 말고, MinHash를 이용하면 비싼 집합 연산 대신 몇개의 숫자가 일치하는지 계산하는 것으로 대신할 수 있습니다. 

거기에 Database의 Secondary Index를 이용하면, 비교할 대상을 확 줄일 수 있으므로 굉장히 효율적으로 계산해낼 수 있습니다.

## MinHash와 In-Memory Cache DB을 활용한 추천 엔진 구현하기


### Item 별 Click Stream 구성하기

Click Stream은 이전 시간에 배운 장바구니랑 동일하다고 생각하면 됩니다.

In [21]:
play_stream = (
    play_df               # 청취 데이터를
    .groupby('artist_id') # artist을 기준으로
    ['user_id']           # user_id를 모아
    .progress_apply(set)  # 하나의 집합으로 만들어 주세요
)

100%|██████████| 160110/160110 [00:24<00:00, 6494.16it/s] 


### MinHash Hashvalues로 바꾸기

각 play의 MinHash값을 구해보도록 하겠습니다.

In [22]:
minhash_per_item = (
    play_stream
    .progress_apply(get_hash)
)
minhash_per_item

100%|██████████| 160110/160110 [05:17<00:00, 504.80it/s] 


artist_id
0         [15272515, 7929136, 54786193, 220850801, 14846...
1         [219544, 472353, 153487, 136833, 212995, 34638...
2         [2316182, 2669925, 4108700, 1561830, 17398608,...
3         [1977307, 1054261, 1553539, 571953, 7801169, 8...
4         [1652888, 4097654, 3743117, 8838460, 3607721, ...
                                ...                        
160105    [951799114, 1854879239, 4247449365, 1547854251...
160106    [2423266424, 3583945328, 1697077480, 395307164...
160107    [720630150, 180770797, 1768760589, 804380883, ...
160108    [1741782383, 1286326773, 4150332639, 167529337...
160109    [1741782383, 1286326773, 4150332639, 167529337...
Name: user_id, Length: 160110, dtype: object

In [23]:
minhv_df = pd.DataFrame(np.stack(minhash_per_item.values))
minhv_df.index = minhash_per_item.index
columns = [f"sig{col}" for col in minhv_df.columns]
minhv_df.columns = columns
minhv_df

Unnamed: 0_level_0,sig0,sig1,sig2,sig3,sig4,sig5,sig6,sig7,sig8,sig9,...,sig118,sig119,sig120,sig121,sig122,sig123,sig124,sig125,sig126,sig127
artist_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,15272515,7929136,54786193,220850801,148469233,129193264,151833698,32190803,72030142,106014141,...,285663242,20262139,12990350,21532253,185151218,159232678,34536932,61756701,78849441,119144360
1,219544,472353,153487,136833,212995,346387,505720,208617,218104,593208,...,419089,623455,868879,106014,130336,1134767,232626,409169,1772688,138745
2,2316182,2669925,4108700,1561830,17398608,5668259,4294243,3828143,20816126,3376388,...,741346,2162120,1574153,697069,18495062,2420681,7216669,9287660,2476826,17341546
3,1977307,1054261,1553539,571953,7801169,836069,1598983,1727387,28286,1692707,...,654609,1126684,1999946,3790200,328002,6616527,2228328,302693,40856,190736
4,1652888,4097654,3743117,8838460,3607721,2329176,5720695,8343277,5818621,11115887,...,3837885,1399469,65564,3750797,960596,3244396,4194281,5287100,1590393,15457
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
160105,951799114,1854879239,4247449365,1547854251,1160565990,3668936329,4156069434,1981523146,991820527,1062032107,...,3366869862,3666061203,529114427,596484597,44481688,3482473300,48484541,3787114399,747956424,3997977448
160106,2423266424,3583945328,1697077480,3953071647,611347776,2423315376,317309556,2976015654,3413969749,3676028536,...,2129998970,978934029,196219302,1312992217,2210018945,3903786975,2853574539,592227410,3611773069,4120859720
160107,720630150,180770797,1768760589,804380883,1830534683,3668193745,3824152446,1903889119,848275811,1196171276,...,2949029734,2160684325,1722904350,2439634735,1448679535,2106267287,2063784359,4246900980,51876737,445005217
160108,1741782383,1286326773,4150332639,1675293374,197421572,1176580331,3191575451,3469730157,1493787956,548116222,...,1626671209,71124821,2120339757,1706043572,728158693,3795006693,3082018583,2259003803,1661831016,3959399716


### Secondary Index로 구성하여 Redis에 저장하기


Redis에 Secondary Index 방식으로 접근하면 빠르게 처리할 수 있습니다. 각 Signature의 Value를 Key 값으로 두고, 해당 Value를 가지고 있는 아이템의 리스트를 집합으로 두면 됩니다. Naive하게 String으로 할 경우 Database의 용량을 지나치게 많이 먹을 수 있기 때문에, 이를 줄이기 위해 `snappy`를 이용해 string의 길이를 줄여줍니다. 좀더 빠르게 처리하기 위해 multiprocessing을 활용하였습니다. 8core macbook pro환경에서는 대략 1시간정도 소요되었습니다. 

***caution***

현재 노트북이 돌고 있는 서버에 `redis`가 동작하고 있어야 합니다. 아닌 경우 Docker를 통해 간단히 실행시킬 수 있습니다.

````shell
docker run --name redis -p 6379:6379 -d --rm redis
````

In [38]:
from redis import Redis
import snappy

def etl_worker(inputs):
    """
    각 signature 별로 Key-Value 형태로 Redis에 ETL하는 메소드
    """
    
    sig_name, signature_series = inputs
    
    db = Redis('localhost', port=6379)   
    print(f"start {sig_name} processing")
    for sig_value, grouped in (
        signature_series.groupby(signature_series)):
        # Key로 만들기
        key_string = "{}-{}".format(sig_name, sig_value)
        # Value를 String으로 만들기
        value_string = snappy.compress(
            str(grouped.index.values.tolist()))
        db.set(key_string, value_string)

In [None]:
from multiprocessing import Pool, cpu_count

pool = Pool(cpu_count())
pool.map(etl_worker, minhv_df.iteritems())

## 추천 시스템 동작하기

실시간 추천이 가능한지를 살펴보도록 하겠습니다. 실시간 추천이 되기 위해서는 요청이 들어왔을 때 얼마나 빠르게 처리할 수 있는지와 새로 데이터가 들어왔을 때 얼마나 빠르게 업데이트가 되는지를 보도록 하겠습니다.

#### (1) 아이템 추천하기

In [52]:
# 마돈나를 들은 사람에게 추천
print(artist_df[artist_df.artist_name=='madonna'])
target_item = minhv_df.loc[353]

     artist_id artist_name
353        353     madonna


In [65]:
%%timeit -n 3 -r 3
# set Query
querys = [f'{k}-{v}' for k, v in target_item.items()]

# 퀴리를 통해 redis에서 가져오기
db = Redis('localhost', port=6379)   
intersected_ids = np.concatenate(
    [json.loads(snappy.decompress(row).decode('utf8')) for row in db.mget(querys)])

# 아이템 count하기
items, counts = np.unique(intersected_ids, return_counts=True)   

artist_names = [artist_df.loc[item,"artist_name"] for item in items]
result = (
    pd.Series(counts,index=artist_names)
    .sort_values(ascending=False)
    .iloc[:5])

31.9 ms ± 3.37 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)


In [66]:
result

madonna            128
michael jackson     28
kylie minogue       24
britney spears      24
rihanna             22
dtype: int64

### (2) 정보 갱신하기

#### 예제 상황 
> 유저(241)번이 마돈나(353)을 보았을 때

In [96]:
user_id = 241
artist_id = 353
old_minhash = minhv_df.loc[artist_id]

In [99]:
# 유저 241번의 minhash value
new_minhash = get_hash([user_id])

MinHash를 갱신하는 것은 추가할 MinHash와 기존 MinHash값을 서로 비교해서 각 Signature의 최소값으로 갱신해주면 됩니다.

In [100]:
updated_minhash = np.minimum(old_minhash, new_minhash)

#### Secondary Index 업데이트 하기

old signature value에서 artist를 빼주고, new signature value에서는 artist를 추가해주면 끝입니다

In [118]:
%%timeit -n 3 -r 3

# 지난 secondary index 지우기
old_kv = old_minhash[updated_minhash!=old_minhash]
old_kv

for k, v in old_kv.items():
    old_index_list = json.loads(snappy.decompress(db.get(f"{k}-{v}")))
    
    if artist_id not in old_index_list:
        continue
    old_index_list.remove(artist_id)
    
    db.set(f"{k}-{v}", snappy.compress(str(old_index_list)))

# 업데이트된 secondary index 추가하기
new_kv = updated_minhash[updated_minhash!=old_minhash]

for k, v in new_kv.items():
    new_index_list = json.loads(snappy.decompress(db.get(f"{k}-{v}")))
    
    if artist_id in new_index_list:
        continue
    new_index_list.append(artist_id)
    new_index_list = sorted(new_index_list)
    
    db.set(f"{k}-{v}", snappy.compress(str(new_index_list)))

4.64 ms ± 258 µs per loop (mean ± std. dev. of 3 runs, 3 loops each)
