# Author information:
Fan Shengzhe, Shanghaijiaotong University, Shanghai, China  
Email: fanshengzhe@sjtu.edu.cn

# 1.读取数据

## 1.1 基本数据的读取

In [None]:
# Mount Google Drive at /content/drive/ (Colab only) so the project files are reachable.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
import os

# Absolute path into the Drive mounted at /content/drive/ above. The previous
# relative 'drive/My Drive/...' only resolved from the notebook's starting
# directory, so re-running this cell after the first chdir raised
# FileNotFoundError.
project_dir = '/content/drive/My Drive/reco/agri-machine-reco'
os.chdir(project_dir)

In [None]:
# When True, the recall cells below print hit-rate / MRR via metrics_recall().
metric_recall = True

In [None]:
import pandas as pd
from functools import partial
import os
import tqdm
import collections
import math
import json
import random
import gc

data_dir = './cache'
save_dir = './cache'

# makedirs(exist_ok=True) is idempotent (no exists()/mkdir() race) and also
# creates any missing parent directories.
os.makedirs(save_dir, exist_ok=True)

# Base tables: interaction log with context, plus user / item / consumer side info.
log_table = pd.read_csv(os.path.join(data_dir, 'ctx_info.csv'))
user_info = pd.read_csv(os.path.join(data_dir, 'user_info.csv'))
item_info = pd.read_csv(os.path.join(data_dir, 'item_info.csv'))
consumer_info = pd.read_csv(os.path.join(data_dir, 'consumer_info.csv'))

## 1.2 定义多路召回字典

In [None]:
# Multi-channel recall registry: one slot per recall channel. Each slot is
# later filled with that channel's {user_id: ranked (item_id, score) pairs}.
user_multi_recall_dict = {
  channel: {} for channel in ('vanilla_itemcf_recall', 'binetwork_recall')
}

# 2.前置准备工作

## 2.1 读取拆分后的数据
将用户的点击log拆分为：
* 历史行为序列
* 最后一次正向动作交互的物品

In [None]:
# Pre-split logs: each user's history events, and their held-out last positive interaction.
hist_click_df, last_click_df = (
  pd.read_csv(os.path.join(data_dir, csv_name))
  for csv_name in ('hist_click_df.csv', 'last_click_df.csv')
)

## 2.2 召回评估函数

In [None]:
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=50, step=10):
  """Print (and return) hit rate and MRR of recall lists against held-out items.

  Args:
    user_recall_items_dict: {user_id: [(item_id, score), ...]}, ranked best first.
    trn_last_click_df: DataFrame with 'user_id' and 'item_id' columns; each
      row's item is the ground-truth target for that user.
    topk: largest cut-off evaluated.
    step: cut-offs evaluated are step, 2*step, ... up to topk.

  Returns:
    {k: (hit_rate, mrr)} for every evaluated cut-off k (empty when the
    ground-truth frame is empty).

  Both metrics are divided by the number of ROWS in trn_last_click_df, so
  users without recall results count as misses. Recall users that have no
  ground-truth row are skipped.
  """
  last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['item_id']))
  user_num = len(trn_last_click_df)
  results = {}
  if user_num == 0:
    print('metrics_recall: empty ground-truth frame, nothing to evaluate')
    return results

  for k in range(step, topk + 1, step):
    hit_num = 0
    mrr = 0
    for user, item_list in user_recall_items_dict.items():
      # Explicit skip instead of a bare `except:` that hid every other error.
      if user not in last_click_item_dict:
        continue
      target = last_click_item_dict[user]
      tmp_recall_items = [x[0] for x in item_list[:k]]
      if target in tmp_recall_items:
        hit_num += 1
        mrr += 1 / (tmp_recall_items.index(target) + 1)

    hit_rate = round(hit_num * 1.0 / user_num, 5)
    mrr = round(mrr * 1.0 / user_num, 5)
    print(' topk: ', k, ' : ', 'hit_rate: ', hit_rate, 'mrr:', mrr, 'user_num : ', user_num)
    results[k] = (hit_rate, mrr)
  return results

## 2.2 召回倒排表

### 2.2.1 i侧 i: item_info倒排表

In [None]:
def get_item_info_dict(item_info_df):
  """Build per-feature lookup tables keyed by item id.

  Returns {feature_column: {item_id: value}} for every column of
  item_info_df other than 'item_id', in column order.
  """
  feature_cols = list(item_info_df.columns)
  feature_cols.remove('item_id')  # raises ValueError when the id column is absent
  item_ids = item_info_df['item_id']
  return {col: dict(zip(item_ids, item_info_df[col])) for col in feature_cols}

In [None]:
# {feature: {item_id: value}} lookup tables built from item_info.csv.
item_info_dict = get_item_info_dict(item_info)

### 2.2.2 u侧 u: user_info倒排表

In [None]:
def get_user_info_dict(user_info_df):
  """Build per-feature lookup tables keyed by user id.

  Returns {feature_column: {user_id: value}} for every column of
  user_info_df other than 'user_id', in column order.
  """
  feature_cols = list(user_info_df.columns)
  feature_cols.remove('user_id')  # raises ValueError when the id column is absent
  user_ids = user_info_df['user_id']
  return {col: dict(zip(user_ids, user_info_df[col])) for col in feature_cols}

In [None]:
# {feature: {user_id: value}} lookup tables built from user_info.csv.
user_info_dict = get_user_info_dict(user_info)

### 2.2.3 ctx相关 u: (i, ctx)倒排表
计算u2u相似度使用

In [None]:
# {user1: [(item1, *ctx1), (item2, *ctx2), ...], ...}
def get_user_item_ctx(log_table):
  """Group the interaction log by user into time-ordered (item_id, *ctx) tuples.

  Each tuple is (item_id, <every other column except user_id, in column order>),
  and each user's list is ordered by 'event_time'.

  Args:
    log_table: DataFrame with at least 'user_id', 'item_id' and 'event_time'.

  Returns:
    {user_id: [(item_id, ctx...), ...]}; an empty dict for an empty log.
  """
  if log_table.empty:
    return {}
  # Stable sort: events that share a timestamp keep their original log order.
  log_table = log_table.sort_values('event_time', kind='mergesort')
  fea_name_list = list(log_table.columns)
  fea_name_list.remove('item_id')
  fea_name_list.remove('user_id')
  tuple_cols = ['item_id'] + fea_name_list

  def make_item_ctx_list(group):
    return list(zip(*(group[col] for col in tuple_cols)))

  # Selecting the columns explicitly keeps the grouping column out of each
  # group frame (avoids pandas' apply-on-grouping-columns deprecation).
  grouped = log_table.groupby('user_id')[tuple_cols].apply(make_item_ctx_list)
  return dict(zip(grouped.index, grouped))

In [None]:
# Time-ordered (item_id, *ctx) history per user, built from the history split only.
user_item_ctx_dict = get_user_item_ctx(hist_click_df)

### 2.2.4 ctx相关i: (u, ctx)倒排表
计算i2i相似度使用

In [None]:
# {item1: [(user1, *ctx1), (user2, *ctx2), ...], ...}
def get_item_user_ctx(log_table):
  """Group the interaction log by item into time-ordered (user_id, *ctx) tuples.

  Each tuple is (user_id, <every other column except item_id, in column order>),
  and each item's list is ordered by 'event_time'.

  Args:
    log_table: DataFrame with at least 'user_id', 'item_id' and 'event_time'.

  Returns:
    {item_id: [(user_id, ctx...), ...]}; an empty dict for an empty log.
  """
  if log_table.empty:
    return {}
  # Stable sort: events that share a timestamp keep their original log order.
  log_table = log_table.sort_values('event_time', kind='mergesort')
  fea_name_list = list(log_table.columns)
  fea_name_list.remove('item_id')
  fea_name_list.remove('user_id')
  tuple_cols = ['user_id'] + fea_name_list

  def make_user_ctx_list(group):
    return list(zip(*(group[col] for col in tuple_cols)))

  # Selecting the columns explicitly keeps the grouping column out of each
  # group frame (avoids pandas' apply-on-grouping-columns deprecation).
  grouped = log_table.groupby('item_id')[tuple_cols].apply(make_user_ctx_list)
  return dict(zip(grouped.index, grouped))

In [None]:
# Time-ordered (user_id, *ctx) interactions per item, built from the history split only.
item_user_ctx_dict = get_item_user_ctx(hist_click_df)

### 2.2.5 hot item列表

In [None]:
# 获取热度最高的item
def get_topk_hot_item(log_table, k, rule='i_active_score'):
  hot_item_df = log_table[['item_id', rule]]
  hot_item_df = hot_item_df.sort_values(by=[rule], ascending=[False])
  topk_click = hot_item_df['item_id'].index[:k]
  return topk_click

In [None]:
# 50 hottest items by item_info's default ranking column; used to pad short recall lists.
topk_hot_item = get_topk_hot_item(item_info, k=50)

### 2.2.6 u: filter_item倒排表

In [None]:
# {user1: [item1, item2, ...], ...}
def get_filter_dict(filter_df):
  """Map each user_id to the list of item_ids that user has in filter_df.

  Items keep the row order of filter_df, and duplicates are kept.
  """
  items_by_user = filter_df.groupby('user_id')['item_id'].apply(list)
  return {user_id: items for user_id, items in items_by_user.items()}

In [None]:
# Items never to recall for a user: every (user, item, session) they removed or
# dealt in their history, except keys that match their held-out last click
# (presumably so the evaluation target itself is never filtered out).
key_cols = ['user_id', 'item_id', 'user_session']
remove_df = hist_click_df[hist_click_df['event_type']=='remove_intent']
deal_df = hist_click_df[hist_click_df['event_type']=='deal']
# Dedupe, then anti-join. The previous
# concat([remove, deal, last, last]).drop_duplicates(keep=False) trick also
# discarded keys that appeared more than once within remove/deal (a repeated
# remove, or remove + deal on the same item in one session). Those items were
# then never filtered.
candidate_df = pd.concat([remove_df, deal_df]).drop_duplicates(key_cols)
held_out_keys = last_click_df[key_cols].drop_duplicates()
filter_df = candidate_df.merge(held_out_keys, on=key_cols, how='left', indicator=True)
filter_df = filter_df[filter_df['_merge'] == 'left_only'].drop(columns='_merge')
filter_dict = get_filter_dict(filter_df)

## 2.3 多任务加速器

In [None]:
import multitasking
import signal

In [None]:
# One task slot per core as reported by multitasking.config['CPU_CORES'].
max_threads = multitasking.config['CPU_CORES']
multitasking.set_max_threads(max_threads)
# Run tasks as separate processes rather than threads.
multitasking.set_engine('process')
# Send SIGINT (Ctrl-C / kernel interrupt) to multitasking.killall so running
# tasks are torn down too. signal.signal returns the previous handler (the
# default KeyboardInterrupt handler shown in the cell output).
# NOTE(review): this replaces KeyboardInterrupt for the rest of the session;
# confirm killall's exact behaviour in the multitasking docs.
signal.signal(signal.SIGINT, multitasking.killall)

<function _signal.default_int_handler>

In [None]:
# Scratch directory where each recall worker writes worker_<id>.json before the results are merged.
multitask_cache_dir = os.path.join(save_dir, 'multitask_cache')

# 3.召回

## 3.1 定义召回函数

### 3.1.1 i2i recall core

In [None]:
# basic i2i
@multitasking.task
def i2i_rec_core(user_id_list, user_item_ctx_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, filter_dict, worker_id):
  """Item-to-item recall for one shard of users; the result is written to disk.

  Decorated with @multitasking.task, so it runs asynchronously. The caller
  waits with multitasking.wait_for_tasks() and reads the output from
  <multitask_cache_dir>/worker_<worker_id>.json.

  Args:
    user_id_list: users handled by this worker.
    user_item_ctx_dict: {user_id: [(item_id, *ctx), ...]} interaction history.
    i2i_sim: {item_i: {item_j: similarity}}.
    sim_item_topk: most-similar neighbours taken per history item.
    recall_item_num: length of each user's final recall list.
    item_topk_click: popular item ids used to pad short lists (score 0).
    filter_dict: {user_id: [item_id, ...]} items never recalled for that user.
    worker_id: shard id, used in the output file name.
  """
  recall_items_dict = {}
  for user_id in tqdm.tqdm(user_id_list):
    user_hist_items = user_item_ctx_dict[user_id]
    # Users without a filter entry simply have nothing to exclude.
    user_filter_items = set(filter_dict.get(user_id, ()))

    item_rank = {}
    for item_ctx in user_hist_items:
      i = item_ctx[0]  # the first element of every history tuple is the item id
      for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
        if j in user_filter_items:
          continue
        # Always key by a builtin int: json.dump rejects numpy-int keys, and
        # one item must not be split across int / numpy-int keys.
        j = int(j)
        item_rank[j] = item_rank.get(j, 0) + wij

    # Too few candidates: pad with popular items that are not ranked yet.
    if len(item_rank) < recall_item_num:
      for item in item_topk_click:
        item = int(item)
        # Membership on the dict keys. The previous `in item_rank.items()`
        # was never true, so padding overwrote real scores with 0.
        if item in item_rank:
          continue
        item_rank[item] = 0  # hot items all get score 0
        if len(item_rank) == recall_item_num:
          break

    item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
    recall_items_dict[int(user_id)] = item_rank

  os.makedirs(multitask_cache_dir, exist_ok=True)
  with open(os.path.join(multitask_cache_dir, f'worker_{worker_id}.json'), 'w', encoding='utf-8') as f:
    json.dump(recall_items_dict, f, indent=2, sort_keys=True, ensure_ascii=False)  # multi-line output

In [None]:
def multitask_i2i_recall(all_users, user_item_ctx_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, filter_dict, num_workers):
  """Run i2i recall for every user in parallel and merge the worker outputs.

  Users are shuffled to balance load and split into `num_workers` contiguous
  shards. Each shard goes to an i2i_rec_core task, which writes
  worker_<id>.json into multitask_cache_dir. After all tasks finish, the files
  are merged and the cache directory is emptied.

  Args:
    all_users: iterable of user ids to recall for (left unmodified).
    user_item_ctx_dict, i2i_sim, sim_item_topk, recall_item_num,
    item_topk_click, filter_dict: forwarded unchanged to i2i_rec_core.
    num_workers: number of shards / tasks (values below 1 are treated as 1).

  Returns:
    {user_id (int): [[item_id, score], ...]}. The pairs come back as
    2-element lists because they are round-tripped through JSON.
  """
  def clear_cache_dir():
    # Delete every file below multitask_cache_dir; directories are kept.
    for path, _, file_list in os.walk(multitask_cache_dir):
      for file_name in file_list:
        os.remove(os.path.join(path, file_name))

  def int_keys(ordered_pairs):
    # JSON object keys are always strings; turn numeric ones back into ints.
    result = {}
    for key, value in ordered_pairs:
      try:
        key = int(key)
      except ValueError:
        pass
      result[key] = value
    return result

  # Shuffle a private copy; the original shuffled the caller's array in place.
  all_users = list(all_users)
  random.shuffle(all_users)
  n_split = max(1, int(num_workers))
  worker_len = math.ceil(len(all_users) / n_split)

  # Drop leftovers of a previous (possibly interrupted) run before merging.
  clear_cache_dir()

  for worker_id in range(n_split):
    part_users = all_users[worker_id * worker_len: (worker_id + 1) * worker_len]
    i2i_rec_core(part_users, user_item_ctx_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, filter_dict, worker_id)

  multitasking.wait_for_tasks()
  gc.collect()
  print('merging multi-worker results...')

  recall_items_dict = {}
  for path, _, file_list in os.walk(multitask_cache_dir):
    for file_name in tqdm.tqdm(file_list):
      with open(os.path.join(path, file_name), 'r', encoding="utf-8") as f:
        recall_items_dict.update(json.load(f, object_pairs_hook=int_keys))

  clear_cache_dir()

  # A crashed worker leaves no file behind; surface that instead of silently
  # returning fewer users.
  missing = len(all_users) - len(recall_items_dict)
  if missing > 0:
    print(f'WARNING: {missing} users missing from the merged recall results')
  return recall_items_dict

## 3.2 经典协同过滤

### 3.2.1 itemCF sim

In [None]:
def itemcf_sim(user_item_ctx_dict):
  """Item-item co-occurrence similarity (ItemCF with cosine-style normalisation).

  For every user, each ordered pair of distinct items (i, j) from their
  history adds 1 to the raw co-count c(i, j). The final score is
      c(i, j) / sqrt(n_i * n_j)
  where n_i is the number of history entries for item i over all users.
  Repeated interactions are counted every time.

  Args:
    user_item_ctx_dict: {user_id: [(item_id, *ctx), ...]}. Only item_id (the
      first tuple element) is read, so the context tuple may have any width.

  Returns:
    {item_i: {item_j: similarity}}.
  """
  i2i_sim = {}
  item_cnt = collections.defaultdict(int)
  for item_ctx_list in tqdm.tqdm(user_item_ctx_dict.values()):
    items = [item_ctx[0] for item_ctx in item_ctx_list]
    for i in items:
      item_cnt[i] += 1
      related = i2i_sim.setdefault(i, {})
      for j in items:
        if i == j:
          continue
        related[j] = related.get(j, 0) + 1

  # Normalise in place. Only values are reassigned (no keys added or removed),
  # so updating while iterating is safe. The old shallow .copy() shared these
  # inner dicts anyway.
  for i, related_items in i2i_sim.items():
    for j, wij in related_items.items():
      related_items[j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])

  return i2i_sim

In [None]:
# ItemCF similarity {item_i: {item_j: score}} computed from the users' history lists.
i2i_sim = itemcf_sim(user_item_ctx_dict)

100%|██████████| 54533/54533 [00:10<00:00, 5297.54it/s]


In [None]:
sim_item_topk = 200
recall_item_num = 500

# Every user with at least one history event gets an itemCF recall list.
all_users = hist_click_df['user_id'].unique()

itemcf_recall_items_dict = multitask_i2i_recall(
  all_users=all_users,
  user_item_ctx_dict=user_item_ctx_dict,
  i2i_sim=i2i_sim,
  sim_item_topk=sim_item_topk,
  recall_item_num=recall_item_num,
  item_topk_click=topk_hot_item,
  filter_dict=filter_dict,
  num_workers=max_threads,
)
user_multi_recall_dict['vanilla_itemcf_recall'] = itemcf_recall_items_dict

if metric_recall:
  metrics_recall(
    user_recall_items_dict=user_multi_recall_dict['vanilla_itemcf_recall'],
    trn_last_click_df=last_click_df,
    topk=recall_item_num,
  )

100%|██████████| 13634/13634 [00:47<00:00, 287.06it/s]
100%|██████████| 13631/13631 [00:48<00:00, 280.41it/s]
100%|██████████| 13634/13634 [00:48<00:00, 278.86it/s]
100%|██████████| 13634/13634 [00:52<00:00, 260.17it/s]


merging multi-worker results...


100%|██████████| 4/4 [00:18<00:00,  4.68s/it]


 topk:  10  :  hit_rate:  0.04398 mrr: 0.01794 user_num :  54345
 topk:  20  :  hit_rate:  0.06087 mrr: 0.0191 user_num :  54345
 topk:  30  :  hit_rate:  0.07209 mrr: 0.01955 user_num :  54345
 topk:  40  :  hit_rate:  0.08089 mrr: 0.0198 user_num :  54345
 topk:  50  :  hit_rate:  0.08816 mrr: 0.01996 user_num :  54345
 topk:  60  :  hit_rate:  0.09403 mrr: 0.02007 user_num :  54345
 topk:  70  :  hit_rate:  0.09909 mrr: 0.02014 user_num :  54345
 topk:  80  :  hit_rate:  0.10325 mrr: 0.0202 user_num :  54345
 topk:  90  :  hit_rate:  0.1068 mrr: 0.02024 user_num :  54345
 topk:  100  :  hit_rate:  0.11064 mrr: 0.02028 user_num :  54345
 topk:  110  :  hit_rate:  0.11383 mrr: 0.02031 user_num :  54345
 topk:  120  :  hit_rate:  0.1167 mrr: 0.02034 user_num :  54345
 topk:  130  :  hit_rate:  0.11926 mrr: 0.02036 user_num :  54345
 topk:  140  :  hit_rate:  0.12148 mrr: 0.02037 user_num :  54345
 topk:  150  :  hit_rate:  0.12327 mrr: 0.02038 user_num :  54345
 topk:  160  :  hit_rate

## 3.3 Bi-network召回

In [None]:
def get_binetwork_sim(user_item_ctx_dict, item_user_ctx_dict):
  """Item-item similarity propagated over the user-item bipartite graph.

  For every interaction (user u, item i) and every history entry j of u:
      sim[i][j] += 1 / (log(deg(i) + 1) * log(deg(u) + 1))
  where deg(i) = len(item_user_ctx_dict[i]) and
  deg(u) = len(user_item_ctx_dict[u]); both count repeated interactions.
  There is no i != j check, so an item can be similar to itself.

  Args:
    user_item_ctx_dict: {user_id: [(item_id, *ctx), ...]}.
    item_user_ctx_dict: {item_id: [(user_id, *ctx), ...]}.
      Only the first element of each tuple is read, so the context width is free.

  Returns:
    {item_i: {item_j: similarity}}.
  """
  sim_dict = {}
  for i, user_ctx in tqdm.tqdm(item_user_ctx_dict.items()):
    related = sim_dict.setdefault(i, {})
    log_item_deg = math.log(len(user_ctx) + 1)
    for user_ctx_tuple in user_ctx:
      user = user_ctx_tuple[0]
      user_hist = user_item_ctx_dict[user]
      # Identical for every j reached through this user, so computed once.
      weight = 1 / (log_item_deg * math.log(len(user_hist) + 1))
      for item_ctx in user_hist:
        j = item_ctx[0]
        related[j] = related.get(j, 0) + weight

  return sim_dict

In [None]:
# Bi-network similarity {item_i: {item_j: score}}, built from both history inverted indexes.
binetwork_sim_dict = get_binetwork_sim(user_item_ctx_dict, item_user_ctx_dict)

100%|██████████| 32316/32316 [00:20<00:00, 1547.02it/s] 


In [None]:
sim_item_topk = 200
recall_item_num = 500

# Every user with at least one history event gets a bi-network recall list.
all_users = hist_click_df['user_id'].unique()

binetwork_recall_items_dict = multitask_i2i_recall(
  all_users=all_users,
  user_item_ctx_dict=user_item_ctx_dict,
  i2i_sim=binetwork_sim_dict,
  sim_item_topk=sim_item_topk,
  recall_item_num=recall_item_num,
  item_topk_click=topk_hot_item,
  filter_dict=filter_dict,
  num_workers=max_threads,
)
user_multi_recall_dict['binetwork_recall'] = binetwork_recall_items_dict

if metric_recall:
  metrics_recall(
    user_recall_items_dict=user_multi_recall_dict['binetwork_recall'],
    trn_last_click_df=last_click_df,
    topk=recall_item_num,
  )

100%|██████████| 13634/13634 [00:45<00:00, 301.90it/s]
100%|██████████| 13634/13634 [00:47<00:00, 288.66it/s]
100%|██████████| 13634/13634 [00:48<00:00, 280.49it/s]
100%|██████████| 13631/13631 [00:49<00:00, 275.27it/s]


merging multi-worker results...


100%|██████████| 4/4 [00:18<00:00,  4.70s/it]


 topk:  10  :  hit_rate:  0.8163 mrr: 0.76237 user_num :  54345
 topk:  20  :  hit_rate:  0.83213 mrr: 0.76348 user_num :  54345
 topk:  30  :  hit_rate:  0.84052 mrr: 0.76381 user_num :  54345
 topk:  40  :  hit_rate:  0.84665 mrr: 0.76399 user_num :  54345
 topk:  50  :  hit_rate:  0.85143 mrr: 0.7641 user_num :  54345
 topk:  60  :  hit_rate:  0.85544 mrr: 0.76417 user_num :  54345
 topk:  70  :  hit_rate:  0.85835 mrr: 0.76421 user_num :  54345
 topk:  80  :  hit_rate:  0.86085 mrr: 0.76425 user_num :  54345
 topk:  90  :  hit_rate:  0.86308 mrr: 0.76427 user_num :  54345
 topk:  100  :  hit_rate:  0.86514 mrr: 0.76429 user_num :  54345
 topk:  110  :  hit_rate:  0.86676 mrr: 0.76431 user_num :  54345
 topk:  120  :  hit_rate:  0.86856 mrr: 0.76432 user_num :  54345
 topk:  130  :  hit_rate:  0.87018 mrr: 0.76434 user_num :  54345
 topk:  140  :  hit_rate:  0.87163 mrr: 0.76435 user_num :  54345
 topk:  150  :  hit_rate:  0.87307 mrr: 0.76436 user_num :  54345
 topk:  160  :  hit_r

# 4.多路召回合并

In [None]:
def merge_recall_results(user_multi_recall_dict, weight_dict=None, topk=25):
  """Fuse several recall channels into one ranked list per user.

  Per channel and per user, scores are min-max normalised to [0, 1] and
  multiplied by the channel weight. The weighted scores are summed across
  channels, and each user keeps the `topk` highest fused scores.

  Args:
    user_multi_recall_dict: {channel: {user_id: [(item_id, score), ...]}}.
      Not modified (the old version overwrote it with normalised scores).
    weight_dict: {channel: weight}. None gives every channel weight 1; a
      channel missing from a non-None dict raises KeyError.
    topk: number of items kept per user.

  Returns:
    {user_id: [(item_id, fused_score), ...]} sorted by score, highest first.
  """
  def norm_user_recall_items_sim(item_list):
    # Min-max normalise one user's scores into a NEW list. It no longer relies
    # on the input being sorted, and a single-item list follows the same rule
    # as longer lists (it used to be returned raw, un-normalised).
    if not item_list:
      return []
    scores = [score for _, score in item_list]
    min_sim, max_sim = min(scores), max(scores)
    if max_sim <= 0:
      # No positive evidence (e.g. only score-0 hot-item padding).
      return [(item, 0.0) for item, _ in item_list]
    if max_sim == min_sim:
      return [(item, 1.0) for item, _ in item_list]
    span = max_sim - min_sim
    return [(item, 1.0 * (score - min_sim) / span) for item, score in item_list]

  final_recall_items_dict = {}
  print('Recall Results Merging...')
  for method, user_recall_items in tqdm.tqdm(user_multi_recall_dict.items()):
    recall_method_weight = 1 if weight_dict is None else weight_dict[method]

    # Weighted sum of per-user normalised scores across channels.
    for user_id, sorted_item_list in user_recall_items.items():
      user_scores = final_recall_items_dict.setdefault(user_id, {})
      for item, score in norm_user_recall_items_sim(sorted_item_list):
        user_scores[item] = user_scores.get(item, 0) + recall_method_weight * score

  # Keep only the top-`topk` fused items per user.
  return {
    user: sorted(item_scores.items(), key=lambda x: x[1], reverse=True)[:topk]
    for user, item_scores in final_recall_items_dict.items()
  }

In [None]:
# Per-channel fusion weights for merge_recall_results. itemCF gets 0, so only
# bi-network scores move the fused ranking; itemCF items still enter the
# candidate pool (with a contribution of 0).
weight_dict = {'vanilla_itemcf_recall': 0.00,
        'binetwork_recall':1.00}

In [None]:
# Fuse the channels, keep 500 items per user, then evaluate the fused lists.
final_recall_items_dict = merge_recall_results(user_multi_recall_dict, weight_dict, topk=500)
if metric_recall:
  metrics_recall(final_recall_items_dict, last_click_df, topk=recall_item_num)

Recall Results Merging...


100%|██████████| 2/2 [00:22<00:00, 11.39s/it]


 topk:  10  :  hit_rate:  0.81632 mrr: 0.76238 user_num :  54345
 topk:  20  :  hit_rate:  0.83202 mrr: 0.76347 user_num :  54345
 topk:  30  :  hit_rate:  0.84046 mrr: 0.76381 user_num :  54345
 topk:  40  :  hit_rate:  0.84659 mrr: 0.76399 user_num :  54345
 topk:  50  :  hit_rate:  0.85139 mrr: 0.76409 user_num :  54345
 topk:  60  :  hit_rate:  0.85537 mrr: 0.76417 user_num :  54345
 topk:  70  :  hit_rate:  0.85829 mrr: 0.76421 user_num :  54345
 topk:  80  :  hit_rate:  0.8608 mrr: 0.76424 user_num :  54345
 topk:  90  :  hit_rate:  0.86302 mrr: 0.76427 user_num :  54345
 topk:  100  :  hit_rate:  0.8651 mrr: 0.76429 user_num :  54345
 topk:  110  :  hit_rate:  0.86672 mrr: 0.76431 user_num :  54345
 topk:  120  :  hit_rate:  0.86856 mrr: 0.76432 user_num :  54345
 topk:  130  :  hit_rate:  0.87016 mrr: 0.76434 user_num :  54345
 topk:  140  :  hit_rate:  0.8716 mrr: 0.76435 user_num :  54345
 topk:  150  :  hit_rate:  0.87301 mrr: 0.76436 user_num :  54345
 topk:  160  :  hit_ra

In [None]:
# Remove any previous export. This is redundant in practice: opening the file
# with mode 'w' in the next cell truncates an existing file anyway.
if os.path.exists(os.path.join(save_dir, 'recall_items_dict.json')):
  os.remove(os.path.join(save_dir, 'recall_items_dict.json'))

In [None]:
# Persist the fused recall lists. JSON object keys are always strings, so user
# ids are written as strings and must be converted back to int when loaded.
with open(os.path.join(save_dir, 'recall_items_dict.json'), 'w', encoding='utf-8') as f:
  json.dump(final_recall_items_dict, f, indent=2, sort_keys=True, ensure_ascii=False) # write as multi-line output