In [1]:
# Shell escape: recursively delete everything under the local info/ directory
# (the directory later cells download action.csv into and write pickles to).
!rm -r info/*

In [2]:
import argparse
import logging
import pickle
import re
import os

import boto3
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, MinMaxScaler

# S3 client created at import time.
# NOTE(review): no visible cell references `s3`; later cells use `s3client` instead.
s3 = boto3.client('s3')

# Configure the root logger: INFO level, timestamped records tagged with the
# emitting file name and line number.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d:%H:%M:%S',
    format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
)

In [3]:
def prepare_df(basic_s3_files, expend_s3_files):
    """Load all basic and expand item files and left-join them on ``program_id``.

    Args:
        basic_s3_files: iterable of paths passed one by one to ``process_basic_file``.
        expend_s3_files: iterable of paths passed one by one to ``process_expend_file``.

    Returns:
        A DataFrame with one row per distinct ``program_id`` found in the basic
        files, carrying the expand columns where a matching ``program_id`` exists.
    """
    # Expand files are read first, stacked, and de-duplicated (first occurrence wins).
    expand_frames = [process_expend_file(path) for path in expend_s3_files]
    expand_data_df = pd.concat(expand_frames, axis=0, ignore_index=True)
    expand_data_df = expand_data_df.drop_duplicates('program_id')

    # Same treatment for the basic files.
    basic_frames = [process_basic_file(path) for path in basic_s3_files]
    basic_df = pd.concat(basic_frames, axis=0, ignore_index=True)
    basic_df = basic_df.drop_duplicates('program_id')

    # Left join keeps every basic row even when no expand row matches.
    df_merged = basic_df.merge(expand_data_df, on='program_id', how='left')
    logging.info("df_merged len: {}".format(len(df_merged)))
    return df_merged

def process_expend_file(expend_s3_path):
    """Read every sheet of an expand workbook and return the renamed columns.

    Args:
        expend_s3_path: path/URL of the Excel workbook handed to ``pd.read_excel``.

    Returns:
        A DataFrame stacking all sheets (fresh 0..n-1 index) restricted to the
        seven selected source columns, renamed to ``program_id``, ``director``,
        ``actor``, ``popularity``, ``ticket_num``, ``score`` and ``level``.

    Raises:
        KeyError: if a sheet lacks one of the selected source columns.
    """
    logging.info("process_expend_file() enter, expend_s3_path:{}".format(expend_s3_path))

    # Source header -> output column name; insertion order is the output column order.
    column_mapping = {
        '节目id': 'program_id',
        '导演': 'director',
        '演员': 'actor',
        '受欢迎度': 'popularity',
        '票数': 'ticket_num',
        '评分': 'score',
        '等级': 'level',
    }
    source_columns = list(column_mapping)

    # sheet_name=None makes read_excel return a dict of {sheet name: DataFrame}.
    sheets = pd.read_excel(expend_s3_path, sheet_name=None)

    per_sheet_frames = []
    for name, sheet_df in sheets.items():
        logging.info("now process sheet: {}".format(name))
        per_sheet_frames.append(sheet_df[source_columns])

    stacked = pd.concat(per_sheet_frames, axis=0, ignore_index=True)
    logging.info("process_expend_file() return dataFrame, len: {}".format(len(stacked)))

    renamed = stacked.rename(columns=column_mapping)
    logging.info("expand df columns={}".format(renamed.columns))
    return renamed

def process_basic_file(basic_s3_path):
    """Read one basic item CSV into a DataFrame.

    Args:
        basic_s3_path: path/URL of the CSV file handed to ``pd.read_csv``.

    Returns:
        The DataFrame produced by ``pd.read_csv`` with default options.
    """
    # Log the path being read (the previous message formatted the function
    # object itself, which printed "<function process_basic_file at 0x...>").
    logging.info("process_basic_file() enter, basic_s3_path:{}".format(basic_s3_path))
    df = pd.read_csv(basic_s3_path)
    logging.info("basic df columns={}".format(df.columns))
    return df

def get_actor(actor_str):
    """Parse a raw actor field into a list of normalized actor names.

    The value is converted to text, single quotes, double quotes and square
    brackets are removed, and the remainder is split on commas. Each name is
    stripped and lower-cased. Note that quote removal also drops apostrophes
    inside names.

    Args:
        actor_str: raw cell value; may be a string, ``None``, NaN or another scalar.

    Returns:
        A list of lower-cased names, or ``[None]`` when the value is falsy, is one
        of the placeholders ``'nan'`` / ``'nr'`` / ``''`` (case-insensitive,
        surrounding whitespace ignored), or contains no non-empty name.
    """
    if not actor_str:
        return [None]
    # Coerce to text so non-string scalars do not make re.sub raise TypeError.
    text = str(actor_str).strip()
    if text.lower() in ('nan', 'nr', ''):
        return [None]
    text = re.sub(r"['\"\[\]]", '', text)
    names = [part.strip().lower() for part in text.split(',')]
    # Drop empty fragments left by '[]', doubled or trailing commas.
    names = [name for name in names if name]
    return names or [None]


def get_category(category_property):
    """Parse a comma-separated category field into a list of normalized labels.

    Args:
        category_property: raw cell value; may be a string, ``None``, NaN or
            another scalar.

    Returns:
        A list of stripped, lower-cased labels, or ``[None]`` when the value is
        falsy, is one of the placeholders ``'nan'`` / ``'nr'`` / ``''``
        (case-insensitive, surrounding whitespace ignored), or contains no
        non-empty label.
    """
    if not category_property:
        return [None]
    # Coerce to text so non-string scalars do not fail on .split().
    text = str(category_property).strip()
    if text.lower() in ('nan', 'nr', ''):
        return [None]
    labels = [part.strip().lower() for part in text.split(',')]
    # Drop empty fragments left by doubled or trailing commas.
    labels = [label for label in labels if label]
    return labels or [None]


def get_single_item(item):
    """Wrap a scalar field in a one-element list after normalizing it.

    Args:
        item: raw cell value of any type.

    Returns:
        ``[text]`` where ``text`` is ``str(item)`` lower-cased and stripped, or
        ``[None]`` when ``item`` is falsy or normalizes to ``'nan'``, ``'nr'``
        or the empty string.
    """
    # Falsy values are treated like an empty string; str() is only applied to truthy input.
    normalized = str(item).lower().strip() if item else ''
    if normalized in ('nan', 'nr', ''):
        return [None]
    return [normalized]


def get_expend_file_from_basic(basic_s3_path):
    """Derive the expand workbook path from a basic CSV path.

    Rewrites ``/basic/<digits>.csv`` to ``/expand/<digits>.xlsx``.

    Args:
        basic_s3_path: path containing a ``/basic/<digits>.csv`` segment.

    Returns:
        The rewritten path, or ``basic_s3_path`` unchanged when the pattern
        does not occur.
    """
    # The dot is escaped so only a literal ".csv" extension matches.
    return re.sub(r"/basic/(\d+)\.csv", r"/expand/\1.xlsx", basic_s3_path)


def get_segment(basic_s3_path):
    """Extract the numeric file stem from a ``/basic/<digits>.csv`` path.

    Args:
        basic_s3_path: path expected to contain ``/basic/<digits>.csv``.

    Returns:
        The digits as a string when the pattern is found; the integer ``0``
        otherwise (the mixed return type is kept for existing callers).
    """
    # The dot is escaped so only a literal ".csv" extension matches.
    m = re.search(r"/basic/(\d+)\.csv", basic_s3_path)
    if m:
        return m.group(1)
    return 0

In [4]:
# Defaults used when --bucket / --mk-region are not supplied.
default_bucket = 'sagemaker-us-east-1-002224604296'
default_mk_region = '1'
level_1 = 'recommender-system-film-mk'

parser = argparse.ArgumentParser()
parser.add_argument('--bucket', type=str, default=default_bucket)
parser.add_argument('--mk-region', type=str, default=default_mk_region)

# parse_known_args() returns unrecognised arguments separately instead of failing on them.
args, _ = parser.parse_known_args()
logging.info('Received arguments {}'.format(args))
bucket, mk_region = args.bucket, args.mk_region

out_s3_path = f's3://{bucket}/{level_1}/{mk_region}/feature/content/inverted-list'
input_s3 = f's3://{bucket}/{level_1}/{mk_region}/system/item-data'

# Single-region alternative, kept for reference:
#   basic_s3_files = [f'{input_s3}/basic/{mk_region}.csv']
#   expend_s3_files = [f'{input_s3}/expand/{mk_region}.xlsx']

# Build one basic CSV path and one expand workbook path per listed region.
all_mk_regions = "1,1271,1347,1686,1770,1940,2434,47,86"
basic_s3_files = []
expend_s3_files = []
for mk_rg in all_mk_regions.split(","):
    item_data_root = f's3://{bucket}/{level_1}/{mk_rg}/system/item-data'
    basic_s3_files.append(f'{item_data_root}/basic/{mk_rg}.csv')
    expend_s3_files.append(f'{item_data_root}/expand/{mk_rg}.xlsx')

logging.info(f"basic_s3_files={basic_s3_files}")
logging.info(f"expend_s3_files={expend_s3_files}")
logging.info(f"out_s3_path={out_s3_path}")

2021-03-18:05:04:07,219 INFO     [<ipython-input-4-157836270fd3>:10] Received arguments Namespace(bucket='sagemaker-us-east-1-002224604296', mk_region='1')
2021-03-18:05:04:07,221 INFO     [<ipython-input-4-157836270fd3>:27] basic_s3_files=['s3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1/system/item-data/basic/1.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1271/system/item-data/basic/1271.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1347/system/item-data/basic/1347.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1686/system/item-data/basic/1686.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1770/system/item-data/basic/1770.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1940/system/item-data/basic/1940.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/2434/system/item-data/basic/2434.csv', 's3://sagemaker-us-east-1-0022

In [5]:
########################################
# Sync data from S3
########################################
def sync_s3(file_name_list, s3_folder, local_folder):
    """Download each named file from an S3 folder into a local folder.

    Uses the module-level ``s3client`` and ``bucket``, which must be defined
    before this function is called.

    Args:
        file_name_list: iterable of file names relative to ``s3_folder``.
        s3_folder: S3 key prefix the files live under.
        local_folder: local directory the files are written to.
    """
    # Local import: S3 keys always use '/' as the separator, independent of the
    # host OS, so the key is joined with posixpath rather than os.path.
    import posixpath

    for file_name in file_name_list:
        src_key = posixpath.join(s3_folder, file_name)
        dst_path = os.path.join(local_folder, file_name)
        print("file preparation: download src key {} to dst key {}".format(src_key, dst_path))
        s3client.download_file(bucket, src_key, dst_path)
        
def write_to_s3(filename, bucket, key):
    """Upload a local file to S3 through the module-level ``s3client``.

    Args:
        filename: path of the local file to upload.
        bucket: destination bucket name.
        key: destination object key.

    Returns:
        Whatever ``s3client.upload_fileobj`` returns.
    """
    # Open in binary mode so the bytes are handed over unmodified.
    with open(filename, 'rb') as payload:
        result = s3client.upload_fileobj(payload, bucket, key)
    return result

# Defaults used when --bucket / --mk-region are not supplied.
default_bucket = 'sagemaker-us-east-1-002224604296'
default_mk_region = '1'
level_1 = 'recommender-system-film-mk'

parser = argparse.ArgumentParser()
parser.add_argument('--bucket', type=str, default=default_bucket)
parser.add_argument('--mk-region', type=str, default=default_mk_region)

# parse_known_args() returns unrecognised arguments separately instead of failing on them.
args, _ = parser.parse_known_args()
bucket = args.bucket
mk_region = args.mk_region

# Key prefix shared by every object read or written for this region.
prefix = f"{level_1}/{mk_region}"

print("bucket={}".format(bucket))
print("prefix='{}'".format(prefix))

# Client used by sync_s3() and write_to_s3().
s3client = boto3.client('s3')
local_folder = 'info'
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(local_folder, exist_ok=True)

# Load the user behaviour (action) data.
file_name_list = ['action.csv']
s3_folder = '{}/system/user-data/clean/latest'.format(prefix)
sync_s3(file_name_list, s3_folder, local_folder)

bucket=sagemaker-us-east-1-002224604296
prefix='recommender-system-film-mk/1'
file preparation: download src key recommender-system-film-mk/1/system/user-data/clean/latest/action.csv to dst key info/action.csv


In [6]:
# Reuse the path lists built earlier under the names used in the log message below.
basic_s3_path, expend_s3_path = basic_s3_files, expend_s3_files

logging.info(
    "gen_pick_files(), \nbasic_s3_path={}, \nexpend_s3_path={}, \nout_s3_path={}".format(
        basic_s3_path, expend_s3_path, out_s3_path
    )
)

# Merged item table: basic rows left-joined with the expand columns.
df = prepare_df(basic_s3_path, expend_s3_path)

2021-03-18:05:04:07,772 INFO     [<ipython-input-6-5cbf4cfdaf50>:3] gen_pick_files(), 
basic_s3_path=['s3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1/system/item-data/basic/1.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1271/system/item-data/basic/1271.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1347/system/item-data/basic/1347.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1686/system/item-data/basic/1686.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1770/system/item-data/basic/1770.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1940/system/item-data/basic/1940.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/2434/system/item-data/basic/2434.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/47/system/item-data/basic/47.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/86

2021-03-18:05:04:12,131 INFO     [<ipython-input-3-898828d3bb92>:19] now process sheet: 3D Movie
2021-03-18:05:04:12,140 INFO     [<ipython-input-3-898828d3bb92>:23] process_expend_file() return dataFrame, len: 1243
2021-03-18:05:04:12,143 INFO     [<ipython-input-3-898828d3bb92>:31] expand df columns=Index(['program_id', 'director', 'actor', 'popularity', 'ticket_num', 'score',
       'level'],
      dtype='object')
2021-03-18:05:04:12,144 INFO     [<ipython-input-3-898828d3bb92>:13] process_expend_file() enter, expend_s3_path:s3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/47/system/item-data/expand/47.xlsx
2021-03-18:05:04:13,213 INFO     [<ipython-input-3-898828d3bb92>:19] now process sheet: Movie
2021-03-18:05:04:13,217 INFO     [<ipython-input-3-898828d3bb92>:23] process_expend_file() return dataFrame, len: 4004
2021-03-18:05:04:13,219 INFO     [<ipython-input-3-898828d3bb92>:31] expand df columns=Index(['program_id', 'director', 'actor', 'popularity', 'ticket_nu

In [7]:
# Build {'row_<n>': [program_id, director, level, year, actor, category, language]}
# with every property except program_id wrapped in a list by the get_* helpers.
movie_id_movie_property_data = {}
row_cnt = 0
for _, item_row in df.iterrows():
    movie_id_movie_property_data[f'row_{row_cnt}'] = [
        str(item_row['program_id']),
        get_single_item(item_row['director']),
        get_single_item(item_row['level']),
        get_single_item(item_row['release_year']),
        get_actor(item_row['actor']),
        get_category(item_row['category_property']),
        get_single_item(item_row['language']),
    ]
    row_cnt += 1

In [8]:
# Column labels for the per-program property lists, in the order they were appended.
# NOTE(review): 'actegory' looks like a typo for 'category'; it is left as-is
# because renaming the column would change the frame's schema.
property_columns = ['programId', 'director', 'level', 'year', 'actor', 'actegory', 'language']

# One row per dict entry; the 'row_<n>' keys are discarded in favour of a 0..n-1 index.
raw_data_pddf = (
    pd.DataFrame
    .from_dict(movie_id_movie_property_data, orient='index', columns=property_columns)
    .reset_index(drop=True)
)
raw_data_pddf.head()

Unnamed: 0,programId,director,level,year,actor,actegory,language
0,1689713,[brea grant],[None],[2020],"[angela bettis, david arquette, chloe farnwort...","[comedy, horror, thriller]",[english]
1,1689714,[michael boyle],[None],[2020],"[annie newton, drew petriello, katee shean, sp...","[comedy, drama]",[english]
2,1689715,[spike lee],[None],[2020],"[david byrne, chris giarmo, angie swan, jacque...","[documentary, music, musical]",[english]
3,1689716,[kara holden],[pg-13],[2020],"[steffan argus, sabrina carpenter, madison ise...","[drama, music]",[english]
4,1689717,[rza],[15],[2020],"[terrence howard, wesley snipes, t.i., eiza go...","[action, crime, drama]",[english]


In [9]:
# !!! The encoding map should really be generated from user registration data.
# The action file is read as tab-separated text.
action_csv_path = 'info/action.csv'
data_mk = pd.read_csv(action_csv_path, sep='\t')
data_mk.head()

Unnamed: 0,label,userid,programId,programType,action,timeStamp,title,genres
0,1,13396,1690115,1,2,1608198657,jungleland,drama
1,1,13396,1690115,1,2,1608198696,jungleland,drama
2,1,170811011119,1690115,1,2,1610225454,jungleland,drama
3,1,170811012694,1690115,1,2,1612996593,jungleland,drama
4,1,24384,1690115,1,2,1614031172,jungleland,drama


In [10]:
# NOTE: this binds a second name to the same DataFrame (no copy is made), so a
# column assigned through sample_data_pddf is also visible on raw_data_pddf.
sample_data_pddf = raw_data_pddf

In [11]:
# Generate label encodings (sparse-feature ids) for items and users.
# The encoder's codes are shifted by one before being stored as `encode_id`.
# The same encoder object is refit for users, so afterwards `lbe` only knows
# the user ids.
lbe = LabelEncoder()

item_codes = lbe.fit_transform(sample_data_pddf['programId'])
sample_data_pddf['encode_id'] = item_codes + 1

user_codes = lbe.fit_transform(data_mk['userid'])
data_mk['encode_id'] = user_codes + 1

In [12]:
# Construct the raw-id <-> encoded-id mapping dictionaries.
# Raw ids are stored as strings, encoded ids as plain ints.
raw_user_id_list = [str(raw_id) for raw_id in data_mk['userid'].values]
code_user_id_list = [int(code) for code in data_mk['encode_id'].values]
raw_embed_user_id_dict = {raw_id: code for raw_id, code in zip(raw_user_id_list, code_user_id_list)}
embed_raw_user_id_dict = {code: raw_id for code, raw_id in zip(code_user_id_list, raw_user_id_list)}

raw_item_id_list = [str(raw_id) for raw_id in sample_data_pddf['programId'].values]
code_item_id_list = [int(code) for code in sample_data_pddf['encode_id'].values]
raw_embed_item_id_dict = {raw_id: code for raw_id, code in zip(raw_item_id_list, code_item_id_list)}
embed_raw_item_id_dict = {code: raw_id for code, raw_id in zip(code_item_id_list, raw_item_id_list)}

In [13]:
# Pickle each id-mapping dictionary to the local info/ folder, then upload it
# to "<prefix>/feature/action/<file name>" in the bucket.
mappings_to_export = [
    ('info/raw_embed_user_mapping.pickle', raw_embed_user_id_dict),
    ('info/embed_raw_user_mapping.pickle', embed_raw_user_id_dict),
    ('info/raw_embed_item_mapping.pickle', raw_embed_item_id_dict),
    ('info/embed_raw_item_mapping.pickle', embed_raw_item_id_dict),
]

for file_name, mapping in mappings_to_export:
    # `with` closes the file even if pickling fails, and guarantees the data is
    # flushed to disk before the upload reads it back.
    with open(file_name, 'wb') as output_file:
        pickle.dump(mapping, output_file)
    write_to_s3(file_name, bucket, "{}/feature/action/{}".format(prefix, file_name.split('/')[-1]))

In [14]:
# Shell escapes: delete everything under the local info/ directory, then run
# add-item-user-batch.py with the kernel's `python`.
# NOTE(review): judging by its log output the script appears to repeat the same
# steps as this notebook — confirm.
!rm -r info/*
!python add-item-user-batch.py

2021-03-18:05:07:45,584 INFO     [add-item-user-batch.py:27] Received arguments Namespace(bucket='sagemaker-us-east-1-002224604296', mk_region='1')
2021-03-18:05:07:45,584 INFO     [add-item-user-batch.py:44] basic_s3_files=['s3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1/system/item-data/basic/1.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1271/system/item-data/basic/1271.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1347/system/item-data/basic/1347.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1686/system/item-data/basic/1686.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1770/system/item-data/basic/1770.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1940/system/item-data/basic/1940.csv', 's3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/2434/system/item-data/basic/2434.csv', 's3://sagemaker-us-east-1-002224604296/recomme

2021-03-18:05:07:49,810 INFO     [add-item-user-batch.py:107] now process sheet: Movie
2021-03-18:05:07:49,812 INFO     [add-item-user-batch.py:111] process_expend_file() return dataFrame, len: 245
2021-03-18:05:07:49,813 INFO     [add-item-user-batch.py:119] expand df columns=Index(['program_id', 'director', 'actor', 'popularity', 'ticket_num', 'score',
       'level'],
      dtype='object')
2021-03-18:05:07:49,813 INFO     [add-item-user-batch.py:101] process_expend_file() enter, expend_s3_path:s3://sagemaker-us-east-1-002224604296/recommender-system-film-mk/1940/system/item-data/expand/1940.xlsx
2021-03-18:05:07:49,913 INFO     [add-item-user-batch.py:107] now process sheet: New Movie
2021-03-18:05:07:49,914 INFO     [add-item-user-batch.py:107] now process sheet: Movie
2021-03-18:05:07:49,916 INFO     [add-item-user-batch.py:107] now process sheet: 3D Movie
2021-03-18:05:07:49,920 INFO     [add-item-user-batch.py:111] process_expend_file() return dataFrame, len: 11
2021-03-18:05:07

2021-03-18:05:07:53,695 INFO     [add-item-user-batch.py:125] basic df columns=Index(['program_id', 'program_type', 'program_name', 'sum_series',
       'new_series', 'createtime', 'series_type', 'star_level', 'release_year',
       'hdtv', 'play_level', 'category_property', 'language',
       'original_country', 'description', 'picture_url', 'length',
       'category_name'],
      dtype='object')
2021-03-18:05:07:53,695 INFO     [add-item-user-batch.py:123] process_basic_file() enter, process_basic_file:<function process_basic_file at 0x7f092c2800e0>
2021-03-18:05:07:53,956 INFO     [add-item-user-batch.py:125] basic df columns=Index(['program_id', 'program_type', 'program_name', 'sum_series',
       'new_series', 'createtime', 'series_type', 'star_level', 'release_year',
       'hdtv', 'play_level', 'category_property', 'language',
       'original_country', 'description', 'picture_url', 'length',
       'category_name'],
      dtype='object')
2021-03-18:05:07:54,32 INFO     [add-it