In [18]:
# portrait batch logic
import argparse
import logging
import pickle
import re
import os

import boto3
import numpy as np
import pandas as pd
from tqdm import tqdm

tqdm_notebook.pandas()

In [15]:
########################################
# 从s3同步数据
########################################
s3client = boto3.client('s3')


def sync_s3(file_name_list, s3_folder, local_folder):
    for f in file_name_list:
        print("file preparation: download src key {} to dst key {}".format(os.path.join(
            s3_folder, f), os.path.join(local_folder, f)))
        s3client.download_file(bucket, os.path.join(
            s3_folder, f), os.path.join(local_folder, f))


def write_to_s3(filename, bucket, key):
    print("upload s3://{}/{}".format(bucket, key))
    with open(filename, 'rb') as f:  # Read in binary mode
        # return s3client.upload_fileobj(f, bucket, key)
        return s3client.put_object(
            ACL='bucket-owner-full-control',
            Bucket=bucket,
            Key=key,
            Body=f
        )

def write_str_to_s3(content, bucket, key):
    print("write s3://{}/{}, content={}".format(bucket, key, content))
    s3client.put_object(Body=str(content).encode("utf8"), Bucket=bucket, Key=key, ACL='bucket-owner-full-control')

default_bucket = 'aws-gcr-rs-sol-workshop-ap-southeast-1-522244679887'
default_prefix = 'sample-data'
parser = argparse.ArgumentParser()
parser.add_argument('--bucket', type=str, default=default_bucket)
parser.add_argument('--prefix', type=str, default=default_prefix)
args, _ = parser.parse_known_args()
bucket = args.bucket
prefix = args.prefix

print("bucket={}".format(bucket))
print("prefix='{}'".format(prefix))

out_s3_path = "s3://{}/{}/feature/content/inverted-list".format(bucket, prefix)

local_folder = 'info'
if not os.path.exists(local_folder):
    os.makedirs(local_folder)
# 行为/物品数据同步
file_name_list = ['action.csv']
s3_folder = '{}/system/action-data'.format(prefix)
sync_s3(file_name_list, s3_folder, local_folder)
file_name_list = ['recall_config.pickle']
s3_folder = '{}/model/recall'.format(prefix)
sync_s3(file_name_list, s3_folder, local_folder)

bucket=aws-gcr-rs-sol-workshop-ap-southeast-1-522244679887
prefix='sample-data'
file preparation: download src key sample-data/system/action-data/action.csv to dst key info/action.csv
file preparation: download src key sample-data/system/item-data/item.csv to dst key info/item.csv


In [5]:
df_filter_action = pd.read_csv('info/action.csv',sep='_!_',names=['user_id','news_id','timestamp','action_type','action'])

  """Entry point for launching an IPython kernel.


In [36]:
df_filter_action.head()

Unnamed: 0,user_id,news_id,timestamp,action_type,action
0,52a23654-9dc3-11eb-a364-acde48001122,6552345461607367172,1618477588,1,0
1,52a23654-9dc3-11eb-a364-acde48001122,6552332581256299016,1618472565,1,0
2,52a23654-9dc3-11eb-a364-acde48001122,6552130363123040771,1618467016,1,0
3,52a23654-9dc3-11eb-a364-acde48001122,6475484594673025293,1618462187,1,1
4,52a238fc-9dc3-11eb-a364-acde48001122,6552277802022863374,1618468013,1,0


In [6]:
from sklearn import preprocessing
df_item_stats = df_filter_action[['news_id','action_type','action']]
df_item_stats = df_item_stats.groupby(['news_id','action_type']).sum()
df_item_stats = df_item_stats.reset_index()
df_item_stats['action'] = df_item_stats['action'] / df_item_stats['action'].abs().max() * 10.0
df_item_stats = df_item_stats.drop([0])
df_item_stats.head()

Unnamed: 0,news_id,action_type,action
1,6320151429650579970,1,3.333333
2,6418014704991469826,1,1.111111
3,6422082903882072321,1,0.0
4,6422169653342109954,1,2.222222
5,6423135627981619458,1,2.222222


In [53]:
pd_merge_result = pd.merge(df_filter_item, df_item_stats, on="news_id", how="left").drop(columns=['action_type'])
pd_merge_result = pd_merge_result.fillna(0)
row_has_NaN = pd_merge_result.isnull().any(axis=1)

In [57]:
pd_merge_result.drop(columns=['popularity']).rename(columns={"action":"popularity"})

Unnamed: 0,news_id,type_code,type,title,keywords,new,popularity
0,6552418723179790856,102,news_entertainment,谢娜三喜临门何炅送祝福吴昕送祝福只有沈梦辰不一样,"杜海涛,谢娜,何炅,沈梦辰,吴昕,快本",0,3.333333
1,6552390851157295629,102,news_entertainment,杨幂景甜徐冬冬唐嫣不好好穿衣却美的有趣又撩人,"杨幂,徐冬冬,背带裙,大唐荣耀,唐嫣,景甜",0,1.111111
2,6552309039697494532,103,news_sports,亚洲杯夺冠赔率日本伊朗领衔中国竟与泰国并列,"土库曼斯坦,乌兹别克斯坦,亚洲杯,赔率,小组赛",0,2.222222
3,6552452056043487748,103,news_sports,马夏尔要去切尔西可以商量不过穆里尼奥的要价是4000万加威廉,"威廉,曼联,穆里尼奥,布莱顿,马夏尔",0,4.444444
4,6552466727995703815,103,news_sports,昔日中超金靴半场独造6球虐爆辽足华夏送走他后悔吗,"阿洛,阿洛伊西奥,华夏幸福,埃尔纳内斯,穆里奇",0,1.111111
...,...,...,...,...,...,...,...
2655,6521968288589677060,100,news_story,李自成为何只在北京当了42天的皇帝看看他在北京42天都干了啥,"崇祯皇帝,李自成,山海关,吴三桂,陈圆圆",0,6.666667
2656,6549073874741363213,100,news_story,NT早唐中唐咋回事,"顶臀径,唐氏筛查,AFP,孕妈,染色体",0,4.444444
2657,6546844349055894023,100,news_story,兴唐119罗成是天下第七魏文通是天下第九罗成几招能获胜,"罗士信,魏文通,罗成,秦琼,丁彦平",0,5.555556
2658,6555209635756769800,114,stock,富士康拟发行约197亿股股票简称工业富联,"股票,中金公司,招股说明书,首次公开发行,富士康",0,1.111111


In [7]:
# prepare model for batch process
os.environ['GRAPH_BUCKET'] = 'sagemaker-us-east-1-002224604296'
os.environ['KG_DBPEDIA_KEY'] = 'recommender-system-data/model/sort/content/words/mapping/kg_dbpedia.txt'
os.environ['KG_ENTITY_KEY'] = 'recommender-system-data/model/sort/content/words/mapping/entities_dbpedia.dict'
os.environ['KG_RELATION_KEY'] = 'recommender-system-data/model/sort/content/words/mapping/relations_dbpedia.dict'
os.environ['KG_ENTITY_INDUSTRY_KEY'] = 'recommender-system-data/model/sort/content/words/mapping/entity_industry.txt'
os.environ['KG_VOCAB_KEY'] = 'recommender-system-data/model/sort/content/words/mapping/vocab.json'
os.environ['DATA_INPUT_KEY'] = ''
os.environ['TRAIN_OUTPUT_KEY'] = 'recommender-system-data/model/sort/content/kg/news/gw/'
kg_path = os.environ['GRAPH_BUCKET']
dbpedia_key = os.environ['KG_DBPEDIA_KEY']
entity_key = os.environ['KG_ENTITY_KEY']
relation_key = os.environ['KG_RELATION_KEY']
entity_industry_key = os.environ['KG_ENTITY_INDUSTRY_KEY']
vocab_key = os.environ['KG_VOCAB_KEY']
data_input_key = os.environ['DATA_INPUT_KEY']
train_output_key = os.environ['TRAIN_OUTPUT_KEY']

env = {
    'GRAPH_BUCKET': kg_path,
    'KG_DBPEDIA_KEY': dbpedia_key,
    'KG_ENTITY_KEY': entity_key,
    'KG_RELATION_KEY': relation_key,
    'KG_ENTITY_INDUSTRY_KEY': entity_industry_key,
    'KG_VOCAB_KEY': vocab_key,
    'DATA_INPUT_KEY': data_input_key,
    'TRAIN_OUTPUT_KEY': train_output_key
}
graph = kg.Kg(env)  # Where we keep the model when it's loaded
model = encoding.encoding(graph, env)

loading vocabulary file /home/ec2-user/.fastNLP/fasthan/fasthan_base/vocab.txt
Load pre-trained BERT parameters from file /home/ec2-user/.fastNLP/fasthan/fasthan_base/model.bin.


In [34]:
len(df_filter_item)

2660

In [8]:
# generate dict_id_keywords for tfidf
dict_keywords_id = {}
for row in df_filter_item.iterrows():
    item_row = row[1]
    program_id = str(item_row['news_id'])
    for kw in item_row['keywords'].split(','):
        if kw not in dict_keywords_id.keys():
            dict_keywords_id[kw] = [program_id]
            continue
        current_list = dict_keywords_id[kw]
        current_list.append(program_id)
        dict_keywords_id[kw].append(program_id)
n_keyword_whole = len(dict_keywords_id)

In [16]:
def get_tfidf(category_property):
    if not category_property or str(category_property).lower() in ['nan', 'nr', '']:
        return [None]
    if not category_property:
        return [None]
    value = [item.strip() for item in category_property.split(',')]
    keywords_tfidf = {}
    for keyword in value:
        current_score = 1 / len(value)*math.log(n_keyword_whole / len(dict_keywords_id[keyword]))
        keywords_tfidf[keyword] = current_score
    return keywords_tfidf
        
def get_category(category_property):
    if not category_property or str(category_property).lower() in ['nan', 'nr', '']:
        return [None]
    if not category_property:
        return [None]
    return [item.strip().lower() for item in category_property.split(',')]
            
def get_single_item(item):
    if not item or str(item).lower().strip() in ['nan', 'nr', '']:
        return [None]
    return [str(item).lower().strip()]

def get_entities(title):
    return model[title]

def single_dict(raw_dict, feat, item_id):
    if feat not in raw_dict.keys():
        raw_dict[feat] = [item_id]
    else:
        current_list = raw_dict[feat]
        current_list.append(item_id)
        raw_dict[feat] = current_list

def list_dict(raw_dict, feat_list, item_id):
    for feat in feat_list:
        single_dict(raw_dict, feat, item_id)

def update_popularity(item_df, action_df):
    pd_merge_result = pd.merge(item_df, action_df, on="news_id", how="left").drop(columns=['action_type'])
    pd_merge_result = pd_merge_result.fillna(0)
    df_update = pd_merge_result.drop(columns=['popularity']).rename(columns={"action":"popularity"})
    return df_update
        
def sort_by_score(df):
    logging.info("sort_by_score() enter, df.columns: {}".format(df.columns))
    df['popularity'].fillna(0, inplace=True)

    df['popularity_log'] = np.log1p(df['popularity'])
    popularity_log_max = df['popularity_log'].max()
    popularity_log_min = df['popularity_log'].min()

    df['popularity_scaled'] = ((df['popularity_log'] - popularity_log_min) / (
            popularity_log_max - popularity_log_min)) * 10

    df_sorted = df.sort_values(by='popularity_scaled', ascending=False)
    
    df_sorted = df_sorted.drop(
        ['popularity_log', 'popularity_scaled'], axis=1)

    logging.info("sort_by_score() return, df.columns: {}".format(df_sorted.columns))
    return df_sorted

def get_bucket_key_from_s3_path(s3_path):
    m = re.match(r"s3://(.*?)/(.*)", s3_path)
    return m.group(1), m.group(2)

def gen_pickle_files(df, action_df):
    df_update = update_popularity(df, action_df)
    df_sort = sort_by_score(df_update)
    
    news_id_news_property_dict = {}
    news_type_news_ids_dict = {}
    news_keywords_news_ids_dict = {}
    news_entities_news_ids_dict = {}
    news_words_news_ids_dict = {}
    
    for row in df_sort.iterrows():
        item_row = row[1]
        program_id = str(item_row['news_id'])
        current_entities = get_entities(item_row['title'])[0]
        current_words = get_entities(item_row['title'])[1]
        program_dict = {
            'title': get_single_item(item_row['title']),
            'type': get_single_item(item_row['type']),
            'keywords': get_category(item_row['keywords']),
            'tfidf': get_tfidf(item_row['keywords']),
            'entities': current_entities,
            'words': current_words
        }
        news_id_news_property_dict[program_id] = program_dict
        list_dict(news_type_news_ids_dict, program_dict['type'], program_id)
        list_dict(news_keywords_news_ids_dict, program_dict['keywords'], program_id)
        list_dict(news_entities_news_ids_dict, program_dict['entities'], program_id)
        list_dict(news_words_news_ids_dict, program_dict['words'], program_id)

    result_dict = {
        'news_id_news_property_dict': news_id_news_property_dict,
        'news_type_news_ids_dict': news_type_news_ids_dict,
        'news_keywords_news_ids_dict': news_keywords_news_ids_dict,
        'news_entities_news_ids_dict': news_entities_news_ids_dict,
        'news_words_news_ids_dict': news_words_news_ids_dict
    }
    return result_dict

In [14]:
rd = gen_pickle_files(df_filter_item, df_item_stats)

In [19]:
bucket, out_prefix = get_bucket_key_from_s3_path(out_s3_path)
for dict_name, dict_val in rd.items():
    file_name = f'{dict_name}.pickle'
    # print("pickle =>", file_name)
    out_file = open(file_name, 'wb')
    pickle.dump(dict_val, out_file)
    out_file.close()
    # s3_url = S3Uploader.upload(file_name, out_s3_path)
    s3_url = write_to_s3(file_name, bucket, f'{out_prefix}/{file_name}')
    logging.info("write {}".format(s3_url))

upload s3://aws-gcr-rs-sol-workshop-ap-southeast-1-522244679887/sample-data/feature/content/inverted-list/news_id_news_property_dict.pickle
upload s3://aws-gcr-rs-sol-workshop-ap-southeast-1-522244679887/sample-data/feature/content/inverted-list/news_type_news_ids_dict.pickle
upload s3://aws-gcr-rs-sol-workshop-ap-southeast-1-522244679887/sample-data/feature/content/inverted-list/news_keywords_news_ids_dict.pickle
upload s3://aws-gcr-rs-sol-workshop-ap-southeast-1-522244679887/sample-data/feature/content/inverted-list/news_entities_news_ids_dict.pickle
upload s3://aws-gcr-rs-sol-workshop-ap-southeast-1-522244679887/sample-data/feature/content/inverted-list/news_words_news_ids_dict.pickle


In [11]:
n = 0
for k, v in rd['news_id_news_property_dict'].items():
    print("k {} v {}".format(k,v))
    if n > 10:
        break
    n = n + 1

k 6552418723179790856 v {'title': ['谢娜三喜临门何炅送祝福吴昕送祝福只有沈梦辰不一样'], 'type': ['news_entertainment'], 'keywords': ['杜海涛', '谢娜', '何炅', '沈梦辰', '吴昕', '快本'], 'tfidf': {'杜海涛': 1.1915032285682567, '谢娜': 0.8605173146234215, '何炅': 0.9546056150797302, '沈梦辰': 1.3327195386327908, '吴昕': 1.1915032285682567, '快本': 1.1161723746110805}, 'entities': [40191, 0, 46990, 1871, 5802, 162743, 1871, 5802, 315, 390701, 28, 302, 0, 0, 0, 0], 'words': [559632, 0, 613175, 0, 0, 754092, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}
k 6552390851157295629 v {'title': ['杨幂景甜徐冬冬唐嫣不好好穿衣却美的有趣又撩人'], 'type': ['news_entertainment'], 'keywords': ['杨幂', '徐冬冬', '背带裙', '大唐荣耀', '唐嫣', '景甜'], 'tfidf': {'杨幂': 1.1161723746110805, '徐冬冬': 1.3327195386327908, '背带裙': 1.5158215867441425, '大唐荣耀': 1.5158215867441425, '唐嫣': 1.3327195386327908, '景甜': 1.3327195386327908}, 'entities': [20585, 130577, 193876, 71718, 28, 2798, 20784, 382, 727, 2, 5876, 121, 67692, 0, 0, 0], 'words': [0, 0, 359872, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}
k 6552309039697494532 v {'ti