In [1]:
%matplotlib inline
import os, sys, numpy as np, pandas as pd, tensorflow as tf, re, codecs, seaborn as sns, json, time, csv, datetime as dt
import pickle, collections, random, math, numbers, scipy.sparse as sp, matplotlib.pyplot as plt, scipy.sparse as sp

from pprint import pprint
from tensorflow.contrib.training.python.training.hparam import HParams

def reload(mName):
    """Force a fresh import of the module named ``mName`` and return it.

    The cached entry in ``sys.modules`` (if any) is discarded first, so
    ``importlib.import_module`` executes the module's source again. Only the
    named module is evicted; modules it imports stay cached.
    """
    from importlib import import_module

    try:
        del sys.modules[mName]
    except KeyError:
        # Not imported yet: nothing to evict.
        pass
    return import_module(mName)


from collections import deque, defaultdict, OrderedDict
from sklearn.preprocessing import MinMaxScaler, LabelEncoder, minmax_scale
from matplotlib import pyplot as plt
plt.style.use('ggplot')

# Module search path: put the project root on sys.path so the `recomm`
# package loaded through reload() can be imported.
# NOTE(review): `ctx` is a hard-coded absolute path on the author's machine;
# the commented-out line below derives it from the working directory instead.
# ctx = os.path.abspath('..').replace('\\', '/')
ctx = 'D:/Python/notebook/restful'
cps = [ctx]
# Prepend every entry that is not already on sys.path. The list is built only
# for its side effect, hence the throwaway name `_`.
_ = [sys.path.insert(0, cp) for cp in cps if cp not in sys.path]

# Directory of the raw datasets: <ctx>/repo/data
datapath = '/'.join([ctx, 'repo', 'data'])

# Seed for numpy's global RNG (set below); the tf.data shuffles later in the
# notebook also read this global.
seed = 88
utils = reload('recomm.trainer.utils.utils')
np.set_printoptions(precision=4, suppress=True, linewidth=100)
np.random.seed(seed)

  from ._conv import register_converters as _register_converters


## Simple Data Preprocess

In [None]:
import datetime as dt

# Load the ratings and render the numeric timestamp as a readable string.
# NOTE(review): datetime.fromtimestamp() converts using the local timezone of
# the machine running the notebook, so the strings differ between machines.
ratings = pd.read_csv("{}/ml-latest-small/ratings.csv".format(datapath))
ratings['timestamp'] = ratings.timestamp.map(dt.datetime.fromtimestamp).map(str)
# Keep the original score, then binarise the label: 1 when rating >= 4, else 0.
ratings['ori_rating'] = ratings['rating']
ratings['rating'] = (ratings.rating >= 4).astype(int)
tr, te = utils.split_by_ratio(ratings)

# Attach each movie's mean original rating; movies without any rating keep NaN
# because of the left join.
movies = pd.read_csv("{}/ml-latest-small/movies.csv".format(datapath))
avg_rt = ratings.groupby("movieId", as_index=False).ori_rating.mean().rename(index=str, columns={'ori_rating': 'avg_rating'})
movies = movies.merge(avg_rt, how='left', on='movieId')
# movies.avg_rating.fillna(ratings.rating.mean())
# Year = the last parenthesised run of digits in the title, None when there is
# none. The pattern is a raw string so that "\(", "\s" and "\d" are not parsed
# as (invalid) string escapes; the regex itself is unchanged.
movies["year"] = movies.title.str.findall(r"\(\s*(\d+)\s*\)").map(lambda lst: int(lst[-1]) if len(lst) else None)
# movies["year"] = minmax_scale(movies.year.fillna(movies.year.median()))

In [None]:
def preprocess(data, movie_trans, train_hist=None, is_train=True):
    """Build one model-input row per rating in ``data``.

    ``data`` is left-joined with ``movie_trans`` on ``movieId`` and processed
    user by user, each user's rows ordered by ``rating`` descending. Every
    output row describes one candidate movie plus the user's "query history",
    a comma-joined string of movie ids:

    * ``is_train=True``: all other movies of the same user in ``data``
      (everything except the candidate itself).
    * ``is_train=False``: the user's movies in ``train_hist`` minus the
      candidate; users absent from ``train_hist`` get an empty history.

    Args:
        data: DataFrame with at least ``userId``, ``movieId``, ``rating`` and
            ``timestamp`` columns.
        movie_trans: DataFrame keyed by ``movieId`` that supplies ``genres``,
            ``avg_rating`` and ``year``.
        train_hist: DataFrame with ``userId`` and ``movieId`` columns;
            required when ``is_train`` is False.
        is_train: selects which of the two history definitions is used.

    Returns:
        DataFrame with columns ``user_id, query_movie_ids, genres, avg_rating,
        year, candidate_movie_id, timestamp, rating``.

    Raises:
        ValueError: if ``is_train`` is False and ``train_hist`` is None.
    """
    if not is_train and train_hist is None:
        raise ValueError("train_hist is required when is_train=False")

    columns = ["user_id", "query_movie_ids",
               "genres", "avg_rating", "year", "candidate_movie_id",
               "timestamp",
               "rating"]
    data = data.merge(movie_trans, how="left", on="movieId")

    def list2str(lst):
        return ','.join(map(str, lst))

    if not is_train:
        # One pass over the training split instead of one filter per user.
        hist_by_user = {uid: set(ids.tolist())
                        for uid, ids in train_hist.groupby("userId").movieId}

    queue = []
    for u, df in data.groupby("userId"):
        df = df.sort_values("rating", ascending=False)
        # Both lookups are constant for the whole user, so they are computed
        # once here rather than once per row.
        if is_train:
            movie_ids = df.movieId.tolist()
        else:
            tr_hist = hist_by_user.get(u, set())
        for i, (_, r) in enumerate(df.iterrows()):
            if is_train:
                # Leave-one-out: every movie of this user except row i.
                query_hist = movie_ids[:i] + movie_ids[i + 1:]
            else:
                query_hist = list(tr_hist - {int(r.movieId)})
            queue.append([int(r.userId), list2str(query_hist), r.genres, r.avg_rating,
                          r.year, int(r.movieId), r.timestamp, r.rating])
    return pd.DataFrame(queue, columns=columns)
    
# Build the training rows and write them as a header-less CSV.
tr_merged = preprocess(tr, movies)
tr_merged.to_csv('./tr.raw.movielens.csv', index=False, header=None)

# Test rows take their query history from the training split `tr`.
te_merged = preprocess(te, movies, tr, is_train=False)
te_merged.to_csv('./te.raw.movielens.csv', index=False, header=None)
# Merge both splits into a single file
merged = pd.concat([tr_merged, te_merged], ignore_index=True)
merged.to_csv('./merged_movielens.csv', index=False, header=None)
merged.head()

<br/>
<br/>
<br/>
## Cmd Submit Training

In [None]:
# Submit the `trainer` package as Cloud ML Engine job recomm_movielens_15.
# Everything after the bare `--` is passed to trainer.ctrl, not to gcloud.
# NOTE(review): the job name and the working directory are hard-coded.
!cd D:/Python/notebook/recomm_prod && \
gcloud ml-engine jobs submit training recomm_movielens_15 \
    --job-dir gs://recomm-job/foo/model \
    --runtime-version 1.4 \
    --module-name trainer.ctrl \
    --package-path trainer \
    --region asia-east1 \
    --config config.yaml \
    -- \
    --method train \
    --conf-path gs://recomm-job/foo/data/user_supplied/movielens.yaml

In [None]:
# Show the state of the job submitted in the previous cell (same hard-coded name).
!gcloud ml-engine jobs describe recomm_movielens_15

<br/>
<br/>
<br/>
## Cloud Transform Data

In [None]:
# Re-import the project modules so edits on disk are picked up; reload()
# evicts each module from sys.modules before importing it again.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Call gen_data with the GCS-hosted config.
# NOTE(review): what gen_data reads and writes is defined in recomm.trainer.ctrl,
# which is not visible here.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = dict(conf_path='gs://movielens-foo/user_supplied/movielens.yaml')
ctrl.gen_data(params)

## Cloud View Schema

In [134]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Load the schema for the GCS-hosted config. `loader` is a notebook-wide name
# that later cells read (e.g. loader.schema in the Dataset section).
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml'}
loader = ctrl.load_schema(params)

# Display the schema object's attributes as a dict.
vars(loader.schema)

2018-03-13 17:31:40,947 - Loader - INFO [line:365] - try to unserialize from gs://recomm-job/foo-bar/movielens_recommendation/data/parsed.yaml


{'col_states_': OrderedDict([('query_movie_ids',
               CatgMapper(allow_null=True, default=None, is_multi=True,
                     name='query_movie_ids', sep=',', vocabs=None, vocabs_path=None)),
              ('genres',
               CatgMapper(allow_null=True, default=None, is_multi=True, name='genres',
                     sep='|', vocabs=None, vocabs_path=None)),
              ('avg_rating', NumericMapper(default=None, name='avg_rating')),
              ('year', NumericMapper(default=None, name='year')),
              ('candidate_movie_id',
               CatgMapper(allow_null=True, default=None, is_multi=False,
                     name='candidate_movie_id', sep=None, vocabs=None, vocabs_path=None)),
              ('rating',
               CatgMapper(allow_null=False, default=None, is_multi=False, name='rating',
                     sep=None, vocabs=None, vocabs_path=None))]),
 'conf_': {'columns': [{'id': 'user_id', 'm_dtype': 'catg'},
   {'id': 'query_movie_ids',
  

<br/>
<br/>
<br/>
## Cloud Ml-Engine Training

In [16]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Submit a training job through the controller. The recorded output below
# shows the gcloud command line it ran.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml',
          'runtime_version': '1.4'}
ret = ctrl.train_submit(params)
# `job_id` is read by the "Describe Job States" cell, so that cell only works
# after this one has run in the same kernel.
job_id = ret.get('job_id')
print( ret.get('response') )

cd D:\Python\notebook\restful\recomm &&             gcloud ml-engine jobs submit training foo_bar_movielens_recommendation_20180313104856810910                 --job-dir gs://recomm-job/foo-bar/movielens_recommendation/model                 --module-name trainer.ctrl                 --package-path trainer                 --region asia-east1                 --config config.yaml                 --runtime-version 1.4                 --                 --train-steps 1000                 --method train                 --conf-path gs://movielens-foo/user_supplied/movielens.yaml                 --job-id foo_bar_movielens_recommendation_20180313104856810910
jobId: foo_bar_movielens_recommendation_20180313104856810910
state: QUEUED
  for chunk in iter(lambda: fp.read(4096), ''):
Job [foo_bar_movielens_recommendation_20180313104856810910] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe foo_bar_movielens_re

<br/>
<br/>
<br/>
## Describe Job States

In [17]:
from oauth2client.client import GoogleCredentials
from googleapiclient import discovery

# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Query the state of the job submitted by the training cell.
# NOTE(review): depends on the global `job_id` set there; on a fresh kernel
# this cell raises NameError unless the training cell ran first.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml', 'job_id': job_id}
ctrl.describe(params)

2018-03-13 10:49:29,258 - googleapiclient.discovery - INFO [line:274] - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/ml/v1/rest
2018-03-13 10:49:31,118 - googleapiclient.discovery - INFO [line:868] - URL being requested: GET https://ml.googleapis.com/v1/projects/training-recommendation-engine/jobs/foo_bar_movielens_recommendation_20180313104856810910?alt=json
2018-03-13 10:49:31,119 - oauth2client.transport - INFO [line:151] - Attempting refresh to obtain initial access_token
2018-03-13 10:49:31,153 - oauth2client.client - INFO [line:795] - Refreshing access_token
2018-03-13 10:49:33,369 - Ctrl - INFO [line:207] - foo-bar: describe take time 0:00:04.298561


{'createTime': '2018-03-13T02:49:02Z',
 'jobId': 'foo_bar_movielens_recommendation_20180313104856810910',
 'state': 'PREPARING',
 'trainingInput': {'args': ['--train-steps',
   '1000',
   '--method',
   'train',
   '--conf-path',
   'gs://movielens-foo/user_supplied/movielens.yaml',
   '--job-id',
   'foo_bar_movielens_recommendation_20180313104856810910'],
  'jobDir': 'gs://recomm-job/foo-bar/movielens_recommendation/model',
  'packageUris': ['gs://recomm-job/foo-bar/movielens_recommendation/model/packages/bf35e3c8bf0dc0b0dc14b04cc39463b2bd01a92fc664a452921911a2af66c201/trainer-0.1.tar.gz'],
  'pythonModule': 'trainer.ctrl',
  'pythonVersion': '3.5',
  'region': 'asia-east1',
  'runtimeVersion': '1.4'},
 'trainingOutput': {}}

## Deploy

In [None]:
from oauth2client.client import GoogleCredentials
from googleapiclient import discovery

# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Call deploy for the given config via the view layer and display the result.
# NOTE(review): what gets deployed is decided inside recomm.views, which is
# not visible here.
views = reload('recomm.views').ViewRecomm.instance
params = {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml'}
ret = views.deploy(params)
ret

## Get Information From Deployed Model

In [None]:
from oauth2client.client import GoogleCredentials
from googleapiclient import discovery

# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Fetch and display the model information for this config.
views = reload('recomm.views').ViewRecomm.instance
params = {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml'}
ret = views.model_info(params)
ret

## Restful predict

In [43]:
# Rebuild `ratings` and `movies` as in the preprocessing section, without the
# train/test split.
# NOTE(review): datetime.fromtimestamp() converts using the local timezone.
ratings = pd.read_csv("{}/ml-latest-small/ratings.csv".format(datapath))
ratings['timestamp'] = ratings.timestamp.map(dt.datetime.fromtimestamp).map(str)
ratings['ori_rating'] = ratings['rating']
ratings['rating'] = (ratings.rating >= 4).astype(int)

movies = pd.read_csv("{}/ml-latest-small/movies.csv".format(datapath))
avg_rt = ratings.groupby("movieId", as_index=False).ori_rating.mean().rename(index=str, columns={'ori_rating': 'avg_rating'})
movies = movies.merge(avg_rt, how='left', on='movieId')
# Raw string so that "\(", "\s" and "\d" are not parsed as (invalid) string
# escapes; the regex itself is unchanged.
movies["year"] = movies.title.str.findall(r"\(\s*(\d+)\s*\)").map(lambda lst: int(lst[-1]) if len(lst) else None)

# Load the locally stored schema and use its raw column names as the header
# of the header-less merged CSV.
# NOTE(review): both paths are absolute paths on the author's machine.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'D:/Python/notebook/restful/repo/user_supplied/movielens.local.yaml', 'is_local': True}
loader = ctrl.load_schema(params)
merged = pd.read_csv('D:/Python/notebook/restful/repo/user_supplied/raws/merged_movielens.csv', names=loader.schema.raw_cols)
merged.head()

2018-03-13 14:41:53,669 - Loader - INFO [line:360] - try to unserialize from D:\Python\notebook\restful\repo/foo-bar/movielens_recommendation/data/parsed.yaml


Unnamed: 0,user_id,query_movie_ids,genres,avg_rating,year,candidate_movie_id,timestamp,rating
0,1,"1953,2105,31,1029,1061,1129,1263,1287,1293,133...",Drama,4.26087,1989.0,1172,2009-12-14 10:53:25,1
1,1,"1172,2105,31,1029,1061,1129,1263,1287,1293,133...",Action|Crime|Thriller,4.021739,1971.0,1953,2009-12-14 10:53:11,1
2,1,"1172,1953,31,1029,1061,1129,1263,1287,1293,133...",Action|Adventure|Sci-Fi,3.478723,1982.0,2105,2009-12-14 10:52:19,1
3,1,"1172,1953,2105,1029,1061,1129,1263,1287,1293,1...",Drama,3.178571,1995.0,31,2009-12-14 10:52:24,0
4,1,"1172,1953,2105,31,1061,1129,1263,1287,1293,133...",Animation|Children|Drama|Musical,3.702381,1941.0,1029,2009-12-14 10:52:59,0


In [113]:
# one user vs all items
# def restful_data(user_ids, num=5):
#     data = {
#         'query_movie_ids': merged.query('user_id in {}'.format(user_ids)).groupby('user_id').query_movie_ids.max().tolist(),
#     }
#     items = movies.rename(index=str, columns={"movieId": "candidate_movie_id"}).drop('title', 1)
#     items.loc[:, 'candidate_movie_id'] = items.candidate_movie_id.astype(str)
#     # reduce to 5 records
#     items = items[:num].to_dict('list')
#     data.update(items)
#     return data
def restful_data(user_ids, num=10):
    """Build prediction inputs: the first ``num`` movies as candidates for one user.

    Reads the notebook globals ``merged`` (preprocessed rows) and ``movies``.

    For each requested user the lexicographically greatest
    ``query_movie_ids`` string found in ``merged`` is taken (``.max()`` on a
    string column). Only the history of the first user in the grouped result
    (the smallest matching user id, since groupby sorts its keys) is used,
    and it is attached to every candidate row.

    Args:
        user_ids: iterable of user ids to look up in ``merged.user_id``.
        num: number of leading rows of ``movies`` to return as candidates.

    Returns:
        list of dicts, one per candidate, with keys ``candidate_movie_id``
        (as str), ``genres``, ``avg_rating``, ``year`` and ``query_movie_ids``.

    Raises:
        IndexError: if none of ``user_ids`` occurs in ``merged``.
    """
    # isin() replaces a string-formatted DataFrame.query expression.
    matched = merged[merged.user_id.isin(user_ids)]
    query_movie_ids = matched.groupby('user_id').query_movie_ids.max().tolist()
    if not query_movie_ids:
        raise IndexError('no rows in `merged` for user_ids={!r}'.format(user_ids))

    # Slice first so the column work only touches the rows that are returned.
    # `axis=1` is passed by keyword: the positional form was removed in pandas 2.0.
    items = (movies.rename(index=str, columns={"movieId": "candidate_movie_id"})
                   .drop('title', axis=1)
                   .iloc[:num])
    items = items.assign(candidate_movie_id=items.candidate_movie_id.astype(str),
                         query_movie_ids=query_movie_ids[0])
    return items.to_dict('records')

pd.DataFrame(restful_data((22,)))

Unnamed: 0,avg_rating,candidate_movie_id,genres,query_movie_ids,year
0,3.87247,1,Adventure|Animation|Children|Comedy|Fantasy,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
1,3.401869,2,Adventure|Children|Fantasy,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
2,3.161017,3,Comedy|Romance,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
3,2.384615,4,Comedy|Drama|Romance,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
4,3.267857,5,Comedy,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
5,3.884615,6,Action|Crime|Thriller,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
6,3.283019,7,Comedy|Romance,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
7,3.8,8,Adventure|Children,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
8,3.15,9,Action,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0
9,3.45082,10,Action|Adventure|Thriller,"32,1884,1580,1527,1387,1377,1376,1375,1372,135...",1995.0


## Restful Call

In [132]:
# Candidate rows for user 22, reversed in place so the last candidate comes first.
inputs = restful_data((22,))
inputs.reverse()
inputs

[{'avg_rating': 3.4508196721311477,
  'candidate_movie_id': '10',
  'genres': 'Action|Adventure|Thriller',
  'query_movie_ids': '32,1884,1580,1527,1387,1377,1376,1375,1372,1356,1339,1291,1270,1255,1240,1215,1210,1200,2174,2288,2291,3081,4011,3868,3793,3535,3527,3355,3213,3033,2459,2987,2959,2858,2762,2712,2571,2542,1198,1799,457,648,555,480,551,589,541,592,593,253,260,163,296,858,1089,2881,235,2990,2953,231,3052,208,2985,1148,2763,3147,267,2723,2717,2716,2710,2701,2700,2683,2672,3082,3300,3176,3751,4015,47,3999,3996,3994,3977,48,3826,3809,70,3697,173,3623,3578,153,158,3438,3408,3354,2617,3285,3253,2657,356,2616,588,1693,1682,1645,1641,1625,1608,1544,1391,552,586,1374,2605,1371,1320,784,1263,785,1080,1214,1208,1201,1097,1721,1722,1769,44,315,355,2502,1101,2431,2402,2340,2301,442,2232,2115,2081,2023,2011,2006,1997,1923,1917,1909,485,1876,4027',
  'year': 1995.0},
 {'avg_rating': 3.15,
  'candidate_movie_id': '9',
  'genres': 'Action',
  'query_movie_ids': '32,1884,1580,1527,1387,1377,137

In [133]:
import requests
# job_id = 'foo_bar_movielens_recommendation_20180313142623533104'
# Same reversed candidate list as in the previous cell.
inputs = restful_data((22,))
inputs.reverse()
# POST to the locally running service; the candidates travel as a JSON string
# in the `json_data` form field.
# NOTE(review): there is no timeout and no HTTP status check, so a non-JSON
# error page would surface as a JSON decode error on the line after the call.
resp = requests.post(
    'http://127.0.0.1:8000/api/recomm/online_predict/',
    data={'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml',
          'json_data': json.dumps(inputs)}
)
r = json.loads(resp.text)
r

{'err_cde': '00',
 'response': [0.3389380872249603,
  0.49546846747398376,
  0.29588234424591064,
  0.471061110496521,
  0.18395668268203735,
  0.4615522027015686,
  0.8539339900016785,
  0.5304296612739563,
  0.36504778265953064,
  0.21630224585533142]}

In [None]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Call predict on the view object directly, bypassing HTTP. Here `json_data`
# is the list of dicts itself, not a JSON string as in the requests.post cell.
views = reload('recomm.views').ViewRecomm.instance
params = {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml',
          'json_data': restful_data((22,))}
ret = views.predict(params)
ret.get('response')

## Local Load Saved Model

In [None]:
from tensorflow.contrib import predictor

# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
reload('recomm.trainer.reco_mf_dnn_est')
service = reload('recomm.trainer.service')

# Pass the raw candidate rows through the project's transform step.
data = restful_data((22,))
views = reload('recomm.views').ViewRecomm.instance
params = {'conf_path': '../data/foo/user_supplied/movielens.local.yaml', 'is_local': True, 'json_data': data}
data_for_model = views.transform(params).get('response')

# Remove `query_movie_ids` before calling the predictor.
# NOTE(review): presumably the exported signature does not accept that key -
# confirm against the export code. The export directory is a hard-coded,
# timestamped path.
del data_for_model['query_movie_ids']
pred_fn = predictor.from_saved_model('../repo/foo-bar/movielens_recommendation/model/export/export_foo-bar/1520581839')
pred_fn(data_for_model)

## Inspect Saved Model

In [None]:
# Load the SavedModel tagged SERVING into a session and print the name of
# every node in the loaded graph.
with tf.Session() as sess:
    tf.saved_model.loader.load(sess, 
        [tf.saved_model.tag_constants.SERVING],
        export_dir='../repo/foo-bar/movielens_recommendation/model/export/export_foo-bar/1520581839')
    for n in sess.graph.as_graph_def().node:
        print(n.name)

## Dataset Test

In [None]:
# model = result.get('response')
# vars(model)
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': '../data/foo/user_supplied/movielens.local.yaml', 
          'is_local': True,
          'runtime_version': '1.4',
          'train_steps': 600}

# Start from an empty default graph, build the model's input function over
# the training file and evaluate a single batch of features.
tf.reset_default_graph()
model, p = ctrl.get_model(params)
train_input = model.input_fn2([p.train_file], n_epoch=1, n_batch=5)
features, labels = train_input()
print('\nfeatures: ', features)
with tf.Session() as sess:
    tf.global_variables_initializer().run()
    print( sess.run(features) )

<br/>
<br/>
<br/>
## Local Transform

In [3]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Local counterpart of the "Cloud Transform Data" cell: same gen_data call,
# with a local config file and is_local=True.
# NOTE(review): the config path is an absolute path on the author's machine.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'D:/Python/notebook/restful/repo/user_supplied/movielens.local.yaml', 'is_local': True}
ctrl.gen_data(params)

2018-03-13 11:23:05,663 - Loader - INFO [line:368] - try to parse D:/Python/notebook/restful/repo/user_supplied/movielens.local.yaml (user supplied) ...
2018-03-13 11:23:06,102 - CatgMapper - INFO [line:319] - [query_movie_ids] fetch vocab [D:/Python/notebook/restful/repo/user_supplied/item.vocab] 
2018-03-13 11:23:06,111 - CatgMapper - INFO [line:319] - [genres] fetch vocab [D:/Python/notebook/restful/repo/user_supplied/genres.vocab] 
2018-03-13 11:23:06,132 - CatgMapper - INFO [line:319] - [candidate_movie_id] fetch vocab [D:/Python/notebook/restful/repo/user_supplied/item.vocab] 
2018-03-13 11:23:08,506 - Loader - INFO [line:381] - try to transform ['D:/Python/notebook/restful/repo/user_supplied/raws/merged_movielens.csv'] ... 
2018-03-13 11:23:54,164 - Loader - INFO [line:440] - [D:/Python/notebook/restful/repo/user_supplied/raws/merged_movielens.csv]: process take time 0:00:44.931433


<br/>
<br/>
<br/>
## Local View Schema

In [3]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Deserialize the parsed schema straight from its YAML file and display the
# schema object's attributes.
with flex.io('D:/Python/notebook/restful/repo/foo-bar/movielens_recommendation/data/parsed.yaml').as_reader() as f:
    schema = flex.Schema.unserialize(f.stream)
vars(schema)

{'col_states_': OrderedDict([('query_movie_ids',
               CatgMapper(allow_null=True, default=None, is_multi=True,
                     name='query_movie_ids', sep=',', vocabs=None, vocabs_path=None)),
              ('genres',
               CatgMapper(allow_null=True, default=None, is_multi=True, name='genres',
                     sep='|', vocabs=None, vocabs_path=None)),
              ('avg_rating', NumericMapper(default=None, name='avg_rating')),
              ('year', NumericMapper(default=None, name='year')),
              ('candidate_movie_id',
               CatgMapper(allow_null=True, default=None, is_multi=False,
                     name='candidate_movie_id', sep=None, vocabs=None, vocabs_path=None)),
              ('rating',
               CatgMapper(allow_null=False, default=None, is_multi=False, name='rating',
                     sep=None, vocabs=None, vocabs_path=None))]),
 'conf_': {'columns': [{'id': 'user_id', 'm_dtype': 'catg'},
   {'id': 'query_movie_ids',
  

<br/>
<br/>
<br/>
## Local Training

In [138]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
est = reload('recomm.trainer.reco_mf_dnn_est')
flex = reload('recomm.trainer.utils.flex')
service = reload('recomm.trainer.service')

# Run training locally for 600 steps.
# NOTE(review): the recorded output of this cell ends in
# "ValueError: The two structures don't have the same sequence length", so
# the saved run did not complete.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'D:/Python/notebook/restful/repo/user_supplied/movielens.local.yaml', 
          'is_local': True,
          'runtime_version': '1.4',
          'train_steps': 600}
result = ctrl.train(params)

2018-03-13 17:52:58,932 - Ctrl - INFO [line:87] - received params: {'train_steps': 600, 'is_local': True, 'runtime_version': '1.4', 'conf_path': 'D:/Python/notebook/restful/repo/user_supplied/movielens.local.yaml'}
2018-03-13 17:52:58,935 - Ctrl - INFO [line:93] - do local training
2018-03-13 17:52:58,960 - Ctrl - INFO [line:113] - foo-bar: try to unserialize D:\Python\notebook\restful\repo/foo-bar/movielens_recommendation/data/parsed.yaml
2018-03-13 17:53:00,499 - Service - INFO [line:48] - received params: {'deploy_path': 'D:\\Python\\notebook\\restful\\repo/foo-bar/movielens_recommendation/data/deploy.yaml', 'train_steps': 600, 'parsed_conf_path': 'D:\\Python\\notebook\\restful\\repo/foo-bar/movielens_recommendation/data/parsed.yaml', 'eval_name': 'foo-bar', 'raw_dir': 'D:/Python/notebook/restful/repo/user_supplied/raws', 'data_dir': 'D:\\Python\\notebook\\restful\\repo/foo-bar/movielens_recommendation/data', 'model_id': 'movielens_recommendation', 'save_every_steps': None, 'train_f

ValueError: The two structures don't have the same sequence length. Input structure has length 6, while shallow structure has length 8.

## Local Get Model and Predict

In [13]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
est = reload('recomm.trainer.reco_mf_dnn_est')
service = reload('recomm.trainer.service')

# Call est_predict locally on the candidate rows for user 22.
# NOTE(review): relies on restful_data and its globals (`merged`, `movies`)
# having been defined by earlier cells.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'D:/Python/notebook/restful/repo/user_supplied/movielens.local.yaml', 
          'is_local': True, 'json_data': restful_data((22,))}
r = ctrl.est_predict(params)

INFO:tensorflow:Using config: {'_master': '', '_global_id_in_cluster': 0, '_service': None, '_task_id': 0, '_keep_checkpoint_every_n_hours': 10000, '_task_type': 'worker', '_is_chief': True, '_save_checkpoints_secs': 600, '_keep_checkpoint_max': 5, '_session_config': None, '_model_dir': 'D:\\Python\\notebook\\restful\\repo/foo-bar/movielens_recommendation/model', '_save_summary_steps': 100, '_log_step_count_steps': 300, '_tf_random_seed': 88, '_num_worker_replicas': 1, '_num_ps_replicas': 0, '_save_checkpoints_steps': None, '_evaluation_master': '', '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x0000028580401C18>}
2018-03-13 10:41:28,131 - tensorflow - INFO [line:116] - Using config: {'_master': '', '_global_id_in_cluster': 0, '_service': None, '_task_id': 0, '_keep_checkpoint_every_n_hours': 10000, '_task_type': 'worker', '_is_chief': True, '_save_checkpoints_secs': 600, '_keep_checkpoint_max': 5, '_session_config': None, '_model_dir': 'D:\\Python\\not

## Test

In [158]:
# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
env = reload('recomm.trainer.env')
flex = reload('recomm.trainer.utils.flex')
est = reload('recomm.trainer.reco_mf_dnn_est')
service = reload('recomm.trainer.service')

# Call the controller's test hook, then build and invoke an input function
# over a local validation file.
# NOTE(review): the recorded output ends in the same "two structures don't
# have the same sequence length" ValueError as the local-training cell.
ctrl = reload('recomm.trainer.ctrl').Ctrl.instance
params = {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml'} # 'json_data': restful_data((22,))
r = ctrl.test(params)
r.input_fn(['D:/Python/notebook/restful/repo/foo-bar/movielens_recommendation/data/data.vl'])() 

2018-03-13 18:13:13,810 - Ctrl - INFO [line:299] - test req: {'conf_path': 'gs://movielens-foo/user_supplied/movielens.yaml'}
conf_path             gs://movielens-foo/user_supplied/movielens.yaml
pid                                                           foo-bar
raw_dir                         gs://movielens-foo/user_supplied/raws
model_id                                     movielens_recommendation
runtime_version                                                   1.4
repo                 gs://recomm-job/foo-bar/movielens_recommendation
job_dir             gs://recomm-job/foo-bar/movielens_recommendati...
data_dir            gs://recomm-job/foo-bar/movielens_recommendati...
deploy_path         gs://recomm-job/foo-bar/movielens_recommendati...
parsed_conf_path    gs://recomm-job/foo-bar/movielens_recommendati...
train_file          gs://recomm-job/foo-bar/movielens_recommendati...
valid_file          gs://recomm-job/foo-bar/movielens_recommendati...
export_name                       

ValueError: The two structures don't have the same sequence length. Input structure has length 6, while shallow structure has length 8.

### Update movielens.yaml on GCS

In [None]:
import shutil
from google.cloud.storage.blob import Blob
from io import BytesIO

# Re-import the project modules so edits on disk are picked up.
utils = reload('recomm.trainer.utils.utils')
flex = reload('recomm.trainer.utils.flex')
env = reload('recomm.trainer.env')

# Copy the local config over the GCS copy: read everything from `r` and write
# it to `w`.
# NOTE(review): the "Local View Schema" cell opens files with
# flex.io(...).as_reader(); here flex.io(...) is used as a context manager
# directly - confirm that both forms are supported by flex.
with flex.io('../data/foo/user_supplied/movielens.yaml') as r, \
    flex.io('gs://movielens-foo/user_supplied/movielens.yaml') as w:
    w.write(r.read())

# stream = BytesIO(open('../data/foo/user_supplied/movielens.yaml', mode='rb').read())
# utils.gcs_blob('gs://movielens-foo/user_supplied/movielens.yaml').upload_from_file(stream)

<br/>
<br/>
<br/>
## Dataset

In [None]:
def make_datasets(fpath_ary, schema, n_batch=128, n_epoch=1):
    """Build a one-shot tf.data CSV pipeline and return ``(features, labels)``.

    Each text line is decoded as CSV into an ``OrderedDict`` keyed by
    ``schema.cols``. Multi-valued categorical columns are split on ``','``
    and converted to a flat int32 vector. Batches are padded when at least
    one such column exists, otherwise a plain batch is used.

    Args:
        fpath_ary: list of CSV file paths for ``tf.data.TextLineDataset``.
        schema: project schema object; this function reads ``df_conf_``,
            ``cols``, ``label`` and the column-name constants ``TYPE``,
            ``M_DTYPE``, ``CATG`` and ``IS_MULTI``.
        n_batch: batch size; the shuffle buffer holds ``n_batch * 10`` batches.
        n_epoch: number of repetitions of the dataset.

    Returns:
        Tuple ``(features, labels)``: ``features`` is the dict of tensors
        from the iterator with the label column removed, ``labels`` is the
        tensor of column ``schema.label[0]``.

    Notes:
        * Reads the notebook global ``seed`` for shuffling.
        * ``shuffle`` is applied after batching, so whole batches are
          permuted rather than individual rows. This order is kept as-is.
        * NOTE(review): ``defaults`` and the padded shapes are derived from
          ``df_conf`` rows while the keys come from ``schema.cols``; they only
          line up if both have the same length and order - confirm.
    """
    def to_dense(sp_tensor):
        # Sparse string tokens -> dense -> numbers -> flat int32 vector.
        dense = tf.sparse_to_dense(sp_tensor.indices, sp_tensor.dense_shape, sp_tensor.values, '')
        return tf.reshape(tf.to_int32(tf.string_to_number(dense)), [-1])

    def parse_csv(value):
        data = tf.decode_csv(value, record_defaults=defaults)
        features = OrderedDict(zip(cols, data))
        for col in multi_cols:
            features[col] = to_dense(tf.string_split([features[col]], ','))
            # features['{}_lens'.format(col)] = tf.size(features[col])
        return features

    df_conf = schema.df_conf_.query('{}.notnull()'.format(schema.TYPE))
    cols = schema.cols

    # Rows of the config that describe multi-valued categorical columns.
    has_multi = (df_conf[schema.M_DTYPE] == schema.CATG) & (df_conf[schema.IS_MULTI] == True)
    multi_cols = df_conf[has_multi].id.values

    # decode_csv defaults: '' for multi-valued categorical columns, 0 for
    # single-valued ones, and an empty default for everything else.
    defaults = []
    for _, r in df_conf.iterrows():
        if r[schema.M_DTYPE] == schema.CATG:
            defaults.append([''] if r[schema.IS_MULTI] else [0])
        else:
            defaults.append([])

    dataset = tf.data.TextLineDataset(fpath_ary)
    dataset = dataset.map(parse_csv, num_parallel_calls=4)
    if has_multi.any():
        # Variable-length vectors need padding; scalars use the shape [].
        padded_shapes = OrderedDict(zip(cols, tuple([None] if e else [] for e in has_multi)))
        dataset = dataset.padded_batch(n_batch, padded_shapes)
    else:
        dataset = dataset.batch(n_batch)
    dataset = dataset.shuffle(n_batch * 10, seed=seed).repeat(n_epoch)

    features = dataset.make_one_shot_iterator().get_next()
    # Pop the label explicitly instead of relying on tuple evaluation order.
    labels = features.pop(schema.label[0])
    return features, labels
                                
# Smoke test: build the pipeline in a private graph and run it until the
# monitored session reports that it should stop.
# NOTE(review): relies on the global `loader` set by one of the schema cells.
# The former `ctx = []` and `query_lens` assignments were removed: neither was
# used, and `ctx = []` overwrote the notebook-wide `ctx` path from the setup cell.
# tf.reset_default_graph()
with tf.Graph().as_default():
    inputs = make_datasets(['./movielens.tr'], loader.schema, n_batch=30)
    with tf.train.MonitoredTrainingSession() as sess:
        while not sess.should_stop():
            sess.run(inputs)
            # print( sess.run(inputs) )

## Feature Columns with tf.feature_column.input_layer

In [None]:
# Draw 1000 standard-normal samples, rescale them with minmax_scale and plot
# their histogram.
a = pd.Series(minmax_scale(np.random.normal(0, 1, size=1000)))
a.hist(bins=50)

In [None]:
%%time
tf.reset_default_graph()
with tf.Graph().as_default():
    # Two feature columns: an 8-dimensional embedding of the hashed user id
    # and the numeric average rating.
    user_id = tf.feature_column.categorical_column_with_hash_bucket('user_id', hash_bucket_size=1000, dtype=tf.int32)
    user_id = tf.feature_column.embedding_column(user_id, dimension=8)
    avg_rating = tf.feature_column.numeric_column('avg_rating')
    columns = [user_id, avg_rating]

    # NOTE(review): this definition rebinds the notebook-level name
    # `make_datasets` with a different signature, so the earlier "Dataset"
    # cell must be re-run before its driver works again.
    def make_datasets(fpath_ary):
        """Return the next-batch tensors of a 7-column CSV pipeline (batches of 3)."""
        cols = ['user_id', 'query_movie_ids', 'genres', 'avg_rating', 'year', 'candidate_movie_id', 'rating']
        defaults = [[0], [''], [''], [], [], [0], []]

        def parse_csv(value):
            data = tf.decode_csv(value, record_defaults=defaults)
            features = OrderedDict(zip(cols, data))
            # print(features)
            return features

        dataset = tf.data.TextLineDataset(fpath_ary)
        # Batching happens before shuffling, so whole batches are permuted.
        dataset = (dataset.map(parse_csv, num_parallel_calls=4)
                          .batch(3)
                          # .padded_batch(3, OrderedDict(zip(cols, ([], [None], [None], [], [], [], []))))
                          .shuffle(10, seed=seed)
                          .repeat(1)
                  )
        return dataset.make_one_shot_iterator().get_next()

    inputs = make_datasets(['./te_processed.batch.csv'])
    inputs = tf.feature_column.input_layer(inputs, columns)
    # features = tf.parse_example(serialized_example, features=tf.feature_column.make_parse_example_spec(columns))
    # The former `ctx = []` was removed: it was unused and overwrote the
    # notebook-wide `ctx` path from the setup cell.
    with tf.train.MonitoredTrainingSession() as sess:
        while not sess.should_stop():
            print(sess.run(inputs))

### Make Example

In [None]:
%%time
# Per-column metadata for the TFRecord conversion; all five lists are aligned
# by position with `cols`.
cols = ['user_id', 'query_movie_ids', 'genres', 'avg_rating', 'year', 'candidate_movie_id', 'rating']
# True for the columns that hold several comma-separated ids per row.
is_multi = [False, True, True, False, False, False, False]
# dtypes handed to pd.read_csv for each column.
pd_dtypes = [int, str, str, float, float, int, float]
# Name of the tf.train.Feature value field each column is written to.
types = ['int64_list', 'int64_list', 'int64_list', 'float_list', 'float_list', 'int64_list', 'float_list']
# TensorFlow dtypes used when the records are parsed back.
tf_types = [tf.int64, tf.int64, tf.int64, tf.float32, tf.float32, tf.int64, tf.float32]
def persist_example(fpath, tfpath):
    """Convert a header-less CSV into a TFRecord file of ``tf.train.Example``.

    The CSV is read in chunks of 1000 rows using the notebook globals
    ``cols`` and ``pd_dtypes``. The ``query_movie_ids`` and ``genres`` cells
    are parsed from comma-separated text into lists of ints. Each row becomes
    one Example whose features are filled according to ``types`` and
    ``is_multi``.

    Args:
        fpath: path of the input CSV (columns in the order of ``cols``).
        tfpath: path of the TFRecord file to write.
    """
    def _to_int_list(raw):
        # read_csv yields NaN (a float) for an empty field even with
        # dtype=str. Treat it, and the empty string, as "no ids" instead of
        # failing on .split / int('').
        if not isinstance(raw, str) or not raw:
            return []
        # A concrete list rather than a lazy map object, so the cell can be
        # inspected or iterated more than once.
        return [int(tok) for tok in raw.split(',')]

    with tf.python_io.TFRecordWriter(tfpath) as w:
        for chunk in pd.read_csv(fpath, names=cols, dtype=dict(zip(cols, pd_dtypes)), chunksize=1000):
            chunk['query_movie_ids'] = chunk.query_movie_ids.map(_to_int_list)
            chunk['genres'] = chunk.genres.map(_to_int_list)

            for _, r in chunk.iterrows():
                ex = tf.train.Example()
                for multi, col, tpe in zip(is_multi, cols, types):
                    # ex.features.feature[col].int64_list / float_list / bytes_list
                    values = getattr(ex.features.feature[col], tpe).value
                    if multi:
                        values.extend(r[col])   # multivalent column: list of ids
                    else:
                        values.append(r[col])   # scalar column
                w.write(ex.SerializePartialToString())

persist_example('./te_processed.csv', './data.tfrecord')

In [None]:
def decode_example(ser_example):
    """Parse one serialized `tf.train.Example` into a tuple of tensors ordered like `cols`.

    The records are written as plain `tf.train.Example` protos, so they are parsed with
    `tf.parse_single_example`. A `SequenceExample` parser would look for the multivalent
    columns in `feature_lists`, which an `Example` does not carry.

    Args:
        ser_example: scalar string tensor holding one serialized `tf.train.Example`.

    Returns:
        A tuple with one tensor per entry of `cols`, in the same order. Scalar columns
        have shape `[]`; 'query_movie_ids' and 'genres' are variable-length with shape
        `[None]`, ready for `padded_batch`.
    """
    multivalent = ('query_movie_ids', 'genres')
    feature_spec = {}
    for col, tf_tpe in zip(cols, tf_types):
        if col in multivalent:
            # Variable-length dense feature; `allow_missing=True` is what lets the
            # Example parser accept a varying number of values per record.
            feature_spec[col] = tf.FixedLenSequenceFeature([], tf_tpe, allow_missing=True)
        else:
            feature_spec[col] = tf.FixedLenFeature([], tf_tpe)

    feature_dict = tf.parse_single_example(ser_example, features=feature_spec)
    # Return in column order so the result lines up with the caller's padded_shapes.
    return tuple(feature_dict[c] for c in cols)

tf.reset_default_graph()
with tf.Graph().as_default():
    # Read the TFRecords, decode each record, and batch 10 at a time. The padded shapes
    # follow the column order: scalars are `[]`, the two multivalent columns `[None]`.
    dataset = (
        tf.data.TFRecordDataset(['./data.tfrecord'])
        .map(decode_example)
        .padded_batch(10, padded_shapes=([], [None], [None], [], [], [], []))
    )
    # (a plain `dataset.batch(3)` was tried here as an alternative)
    iters = dataset.make_one_shot_iterator()
    r = iters.get_next()
    with tf.Session() as sess:
        for init_op in (tf.global_variables_initializer(), tf.tables_initializer()):
            sess.run(init_op)
        # Show the first batch only.
        print( sess.run(r) )

## Traditional parse_example
1. tf.train.Coordinator + tf.train.start_queue_runners

In [None]:
from tensorflow.python.framework import sparse_tensor
import re

def to_sparse(dense):
    """Convert a dense tensor into a `tf.SparseTensor` holding only its non-zero entries.

    Args:
        dense: a numeric tensor; entries equal to 0 are treated as missing.

    Returns:
        A `tf.SparseTensor` with the indices and values of the non-zero entries and the
        same dense shape as `dense`.
    """
    idx = tf.where(tf.not_equal(dense, 0))
    # Use the runtime shape rather than the static `get_shape()`: a static shape with any
    # unknown dimension (e.g. a variable batch size) cannot be converted to a dense_shape.
    dense_shape = tf.shape(dense, out_type=tf.int64)
    return tf.SparseTensor(idx, tf.gather_nd(dense, idx), dense_shape)

def make_example(val):
    """Wrap a sequence of integers in a `tf.train.Example`.

    The same values are stored as an int64 list under both the 'query_movie_ids' and
    the 'genres' feature keys.

    Args:
        val: iterable of integers.

    Returns:
        The populated `tf.train.Example`.
    """
    def _int64_feature():
        return tf.train.Feature(int64_list=tf.train.Int64List(value=val))

    feature_map = {key: _int64_feature() for key in ('query_movie_ids', 'genres')}
    return tf.train.Example(features=tf.train.Features(feature=feature_map))

tf.reset_default_graph()
with tf.Graph().as_default():

    filename = "tmp.tfrecords"
    # The file doubles as a cache: it is only written when missing, so delete it by hand
    # to regenerate the records.
    if not os.path.exists(filename):
        # os.remove(filename)
        with tf.python_io.TFRecordWriter(filename) as writer:
            for idx, r in teProcessed.head().iterrows():
                # One record per (row, column) pair; `make_example` stores the parsed ids
                # of that column under both feature keys.
                for col in ('query_movie_ids', 'genres'):
                    val = list(map(int, re.split(r',\s*', r[col])))
                    ex = make_example(val)
                    writer.write(ex.SerializeToString())

    # Queue-based reader: file name queue -> record reader -> batch of serialized examples.
    reader = tf.TFRecordReader()
    filename_queue = tf.train.string_input_producer([filename], num_epochs=1)
    _, serialized_example = reader.read(filename_queue)

    batch = tf.train.batch(tensors=[serialized_example], batch_size=1)
    # Both features are variable-length, so they are parsed into SparseTensors.
    features = {
        'query_movie_ids': tf.VarLenFeature(tf.int64),
        'genres': tf.VarLenFeature(tf.int64)
    }
    data = tf.parse_example(batch, features)
    query_movie_ids = data['query_movie_ids']
    # Only used by the commented-out embedding lookup below.
    embbedding = tf.Variable(tf.glorot_uniform_initializer()([9125]), dtype=tf.float32)
    # Prints the symbolic dense_shape tensor, not its evaluated value.
    print(query_movie_ids.dense_shape)
    # r = tf.layers.dense(query_movie_ids, 10)
    # emb_query = tf.nn.embedding_lookup_sparse([embbedding], query_movie_ids, None, combiner='sqrtn')
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        # `num_epochs` on the input producer is tracked in a local variable.
        tf.local_variables_initializer().run()
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord, sess=sess)
        try:
            print(sess.run(data))
        except tf.errors.OutOfRangeError as e:
            coord.request_stop(e)
        finally:
            # Always stop and join the queue-runner threads, even on failure.
            coord.request_stop()
            coord.join(threads)
    

## Test

In [None]:
# Scratch cell for quick TensorFlow experiments. The only live code prints a constant's
# symbolic tensor and runs the (empty) variable initializer; the commented lines are
# earlier experiments with embedding lookup, element-wise product, tf.size and sequence_mask.
tf.reset_default_graph()
with tf.Graph().as_default():
    # a = tf.constant([[0, 1], [0, 2]])
    # w = tf.constant([[1, 1, 1],
    #                  [2, 2, 2],
    #                  [3, 3, 3]])
    # w1 = tf.constant([[1, 1, 1],
    #                   [2, 2, 2],
    #                   [3, 3, 3]])
    # c = tf.nn.embedding_lookup(w, a)
    print(tf.constant([1, 2, 3]))
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        # print(tf.size(tf.constant([1, 2, 3])).eval())
        # print((w * w1).eval())
        # print(c.eval())
        # print(tf.sequence_mask(tf.constant([[1], [2], [3]])).eval())