In [1]:
%matplotlib inline
import os, sys, numpy as np, pandas as pd, tensorflow as tf, re, codecs, seaborn as sns, json, time, csv, datetime as dt
import pickle, collections, random, math, numbers, scipy.sparse as sp, matplotlib.pyplot as plt, scipy.sparse as sp
from pprint import pprint

def reload(mName):
    import importlib
    if mName in sys.modules:
        del sys.modules[mName]
    return importlib.import_module(mName)


from collections import deque, defaultdict, OrderedDict
from sklearn.preprocessing import MinMaxScaler, LabelEncoder, minmax_scale
from matplotlib import pyplot as plt
plt.style.use('ggplot')

# classpath
ctx = os.path.abspath(os.path.abspath('..')).replace('\\', '/')
cps = [ctx]
_ = [sys.path.insert(0, cp) for cp in cps if cp not in sys.path]

# data path
datapath = '/'.join([ctx, 'data'])

seed = 88
utils = reload('utils.utils')
np.set_printoptions(precision=4, suppress=True, linewidth=100)
np.random.seed(seed)

  from ._conv import register_converters as _register_converters


# tf.decode_csv + tf.data.TextLineDataset

## Simple Data Preprocess

In [2]:
import datetime as dt

ratings = pd.read_csv("{}/ml-latest-small/ratings.csv".format(datapath))
ratings['timestamp'] = ratings.timestamp.map(dt.datetime.fromtimestamp).map(str)
ratings['ori_rating'] = ratings['rating']
ratings['rating'] = (ratings.rating >= 4).astype(int)
tr, te = utils.split_by_ratio(ratings)

movies = pd.read_csv("{}/ml-latest-small/movies.csv".format(datapath))
avg_rt = ratings.groupby("movieId", as_index=False).ori_rating.mean().rename(index=str, columns={'ori_rating': 'avg_rating'})
movies = movies.merge(avg_rt, how='left', on='movieId')
# movies.avg_rating.fillna(ratings.rating.mean())
movies["year"] = movies.title.str.findall("\(\s*(\d+)\s*\)").map(lambda lst: int(lst[-1]) if len(lst) else None)
# movies["year"] = minmax_scale(movies.year.fillna(movies.year.median()))

In [3]:
def preprocess(data, movie_trans, train_hist=None, is_train=True):
    queue = []
    data = data.merge(movie_trans, how="left", on="movieId")
    columns=["user_id", "query_movie_ids",
             "genres", "avg_rating", "year", "candidate_movie_id",
             "timestamp",
             "rating"]
    
    list2str = lambda lst: ','.join(map(str, lst))
    for u, df in data.groupby("userId"):
        df = df.sort_values("rating", ascending=False)
        if not is_train:
            user_movies_hist = train_hist.query("userId == {}".format(u)).movieId
        for i, (_, r) in enumerate(df.iterrows()):
            if is_train:
                query_hist = df.movieId[:i].tolist() + df.movieId[i + 1:].tolist()
                query_hist = list2str(query_hist)
                queue.append([int(r.userId), query_hist, r.genres, r.avg_rating, r.year, int(r.movieId), r.timestamp, r.rating])
            else:
                tr_hist = set(user_movies_hist.tolist())
                query_hist = list(tr_hist - set([int(r.movieId)]))
                query_hist = list2str(query_hist)
                queue.append([int(r.userId), query_hist, r.genres, r.avg_rating, r.year, int(r.movieId), r.timestamp, r.rating])
    return pd.DataFrame(queue, columns=columns)
    
tr_merged = preprocess(tr, movies)
tr_merged.to_csv('./tr.raw.movielens.csv', index=False, header=None)

te_merged = preprocess(te, movies, tr, is_train=False)
te_merged.to_csv('./te.raw.movielens.csv', index=False, header=None)
# 合併成一個檔案
merged = pd.concat([tr_merged, te_merged], ignore_index=True)
merged.to_csv('./merged_movielens.csv', index=False, header=None)
merged.head()

Unnamed: 0,user_id,query_movie_ids,genres,avg_rating,year,candidate_movie_id,timestamp,rating
0,1,"1953,2105,31,1029,1061,1129,1263,1287,1293,133...",Drama,4.26087,1989.0,1172,2009-12-14 10:53:25,1
1,1,"1172,2105,31,1029,1061,1129,1263,1287,1293,133...",Action|Crime|Thriller,4.021739,1971.0,1953,2009-12-14 10:53:11,1
2,1,"1172,1953,31,1029,1061,1129,1263,1287,1293,133...",Action|Adventure|Sci-Fi,3.478723,1982.0,2105,2009-12-14 10:52:19,1
3,1,"1172,1953,2105,1029,1061,1129,1263,1287,1293,1...",Drama,3.178571,1995.0,31,2009-12-14 10:52:24,0
4,1,"1172,1953,2105,31,1061,1129,1263,1287,1293,133...",Animation|Children|Drama|Musical,3.702381,1941.0,1029,2009-12-14 10:52:59,0


<br/>
<br/>
<br/>
## Transform Training Data

In [70]:
%%time
from tensorflow.contrib.training.python.training.hparam import HParams
utils = reload('utils.utils')
env = reload('env')
reload('service')
reco = reload('reco_mf_dnn.reco_mf_dnn_flex_shema')
ctrl = reload('ctrl').Ctrl.instance
# dir(ctrl.service)
hparam = HParams(
    conf_path='{}/foo/user_supplied/movielens.yaml'.format(datapath))

print(ctrl.gen_data(hparam))

try to parse D:/Python/notebook/recomm_prod/data/foo/user_supplied/movielens.yaml (user supplied) ...
try to transform ['D:/Python/notebook/recomm_prod/data/foo/user_supplied/raws\\merged_movielens.csv'] ... 
[D:/Python/notebook/recomm_prod/data/foo/user_supplied/raws\merged_movielens.csv]: process take time 0:01:01.056793
2018-02-12 17:15:00,294 - foo - INFO - foo: gen_data take time 0:01:04.555471
{'err_cde': 0}
Wall time: 1min 4s


## Train

In [None]:
from tensorflow.contrib.training.python.training.hparam import HParams

utils = reload('utils.utils')
env = reload('env')
reco = reload('reco_mf_dnn.reco_mf_dnn_flex_shema')
reload('service')
ctrl = reload('ctrl').Ctrl.instance
# dir(ctrl.service)
hparam = HParams(
    conf_path='{}/foo/user_supplied/movielens.yaml'.format(datapath))

ctrl.train(hparam)

2018-02-12 18:04:33,685 - ctrl - INFO - foo: try to unserialize D:/Python/notebook/recomm_prod/repo\foo\data\parsed_movielens.yaml
clear checkpoint directory D:/Python/notebook/recomm_prod/repo\foo\model
INFO:tensorflow:Using config: {'_model_dir': 'D:/Python/notebook/recomm_prod/repo\\foo\\model', '_num_ps_replicas': 0, '_session_config': None, '_task_type': 'worker', '_task_id': 0, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x000002092ACBDCF8>, '_service': None, '_log_step_count_steps': 300, '_save_summary_steps': 100, '_keep_checkpoint_max': 5, '_master': '', '_is_chief': True, '_save_checkpoints_steps': 232, '_num_worker_replicas': 1, '_tf_random_seed': 88, '_save_checkpoints_secs': None, '_keep_checkpoint_every_n_hours': 10000}
2018-02-12 18:04:35,690 - tensorflow - INFO - Using config: {'_model_dir': 'D:/Python/notebook/recomm_prod/repo\\foo\\model', '_num_ps_replicas': 0, '_session_config': None, '_task_type': 'worker', '_task_id': 0, '_cluster

In [27]:
try:
    raise Exception('xxx')
except Exception as e:
    print(repr(e))

Exception('xxx',)


In [47]:
tr = pd.read_csv('D:/Python/notebook/recomm_prod/repo/foo/data/data.tr', names=schema.cols)
vl = pd.read_csv('D:/Python/notebook/recomm_prod/repo/foo/data/data.vl', names=schema.cols)

In [49]:
tr.head()

Unnamed: 0,query_movie_ids,genres,avg_rating,year,candidate_movie_id,rating
0,"1516,1666,31,834,860,907,1018,1042,1048,1084,1...",9,0.835749,0.763158,932,0
1,"932,1666,31,834,860,907,1018,1042,1048,1084,10...",2718,0.782609,0.605263,1516,0
2,"932,1516,31,834,860,907,1018,1042,1048,1084,10...",2317,0.661939,0.701754,1666,0
3,"932,1516,1666,834,860,907,1018,1042,1048,1084,...",9,0.595238,0.815789,31,1
4,"932,1516,1666,31,860,907,1018,1042,1048,1084,1...",45914,0.71164,0.342105,834,1


In [None]:
from tensorflow.contrib.training.python.training.hparam import HParams
utils = reload('utils.utils')
reco = reload('reco_mf_dnn.reco_mf_dnn_flex_shema')

model_dir='./model/reco_mf_dnn'
n_movies = len(schema.col_states_['candidate_movie_id'].classes_)
n_genres = len(schema.col_states_['genres'].classes_)

reco_mf_dnn = reload('reco_mf_dnn.reco_mf_dnn_flex_shema')
model = reco_mf_dnn.ModelMfDNN(schema, n_movies, n_genres, model_dir, hparam)
train_fn = model.input_fn(filenames=[p.train_files], n_batch=p.batch_size)
valid_fn = model.input_fn(filenames=[p.train_files], n_batch=p.batch_size, shuffle=False)
model.fit(train_fn, valid_fn, reset=False)

In [6]:
tr = pd.read_csv('./movielens.tr', names=schema.cols)
vl = pd.read_csv('./movielens.vl', names=schema.cols)

In [7]:
tr.head()

Unnamed: 0,query_movie_ids,genres,avg_rating,year,candidate_movie_id,rating
0,"1516,1666,31,834,860,907,1018,1042,1048,1084,1...",9,0.835749,0.763158,932,0
1,"932,1666,31,834,860,907,1018,1042,1048,1084,10...",2718,0.782609,0.605263,1516,0
2,"932,1516,31,834,860,907,1018,1042,1048,1084,10...",2317,0.661939,0.701754,1666,0
3,"932,1516,1666,834,860,907,1018,1042,1048,1084,...",9,0.595238,0.815789,31,1
4,"932,1516,1666,31,860,907,1018,1042,1048,1084,1...",45914,0.71164,0.342105,834,1


In [None]:
reco_mf_dnn = reload('reco_mf_dnn.reco_mf_dnn_flex_shema')
hparam = HParams()
hparam.train_files = 'movielens.tr'
hparam.eval_files = 'movielens.vl'
hparam.eval_name = 'movielens_eval'
hparam.export_name = 'movielens_export'
hparam.batch_size = 128
hparam.n_epoch = 10
hparam.dim = 32
hparam.learning_rate = 0.001

model = reco_mf_dnn.ModelMfDNN(schema, n_movies, n_genres, model_dir, hparam)
tf.reset_default_graph()
with tf.Graph().as_default():
    features, labels = model.input_fn(filenames=['./movielens.tr'], n_epoch=1, n_batch=500, shuffle=True)()
    c = 0
    with tf.train.MonitoredTrainingSession() as sess:
        while not sess.should_stop():
            _, = sess.run([labels])
            c += len(_)
            print(len(_))
            pass
print('total:', c)

In [4]:
%%bash
cd D:/Python/notebook/tensorflow_estimator/census-demo
TRAIN_FILE=data/adult.data.csv
EVAL_FILE=data/adult.test.csv
OUTPUT_DIR=model/census
python -m trainer.task --train-files $TRAIN_FILE \
                       --eval-files $EVAL_FILE \
                       --job-dir $OUTPUT_DIR \
                       --train-steps 1000 \
                       --eval-steps 100

model dir model/census


  from ._conv import register_converters as _register_converters
INFO:tensorflow:Using config: {'_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x000001F871BA2860>, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_is_chief': True, '_model_dir': 'model/census', '_master': '', '_num_ps_replicas': 0, '_task_id': 0, '_save_summary_steps': 100, '_task_type': 'worker', '_session_config': None, '_num_worker_replicas': 1, '_tf_random_seed': None, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_keep_checkpoint_max': 5}
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 600 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Restoring parameters from model/census\model.ckpt-1
INFO:tensorflow:Saving checkpoints for 2 into model/census\mode

In [41]:
%%bash

where python

D:\Python\Anaconda3\envs\py3_5\python.exe


In [35]:
%%bash
cd D:/Python/notebook/recomm_prod
TRAIN_FILE=trainer/data/movielens.tr
EVAL_FILE=trainer/data/movielens.vl
OUTPUT_DIR=trainer/model/reco_mf_dnn

gcloud ml-engine local train \
    --module-name trainer.task \
    --job-dir $OUTPUT_DIR
    -- \
    --train-files $TRAIN_DATA \
    --eval-files $EVAL_DATA \
    --train-steps 1000 \
    --eval-steps 100 \

/cygdrive/d/google-cloud-sdk/bin/gcloud: line 113: exec: D:\Python\Anaconda3\envs\py2_7\python.exe: not found
bash: line 10: --: command not found


In [None]:
def make_datasets(fpath_ary, schema, n_batch=128, n_epoch=1):
    def to_dense(sp):
        dense = tf.sparse_to_dense(sp.indices, sp.dense_shape, sp.values, '')
        return tf.reshape(tf.to_int32(tf.string_to_number(dense)), [-1])

    def to_sparse(dense):
        idx = tf.where(tf.not_equal(dense, 0))
        return tf.SparseTensor(indices=idx, dense_shape=dense.get_shape(), values=tf.gather_nd(dense, idx))

    def parse_csv(value):
        data = tf.decode_csv(value, record_defaults=defaults)
        features = OrderedDict(zip(cols, data))
        multi_cols = df_conf.query("{} == '{}' and {} == True".format(schema.M_DTYPE, schema.CATG, schema.IS_MULTI)).id.values
        for col in multi_cols:
            features[col] = tf.string_split([features[col]], ',')
            features[col] = to_dense(features[col])
            # features['{}_lens'.format(col)] = tf.size(features[col])
        return features 
    
    df_conf = schema.df_conf_.query('{}.notnull()'.format(schema.TYPE))
    cols = schema.cols
    defaults = []
    for _, r in df_conf.iterrows():
        if r[schema.M_DTYPE] == schema.CATG:
            defaults.append([''] if r[schema.IS_MULTI] else [0])
        else:
            defaults.append([])
    dataset = tf.data.TextLineDataset(fpath_ary)
    dataset = dataset.map(parse_csv, num_parallel_calls=4)
    has_multi = (df_conf[schema.M_DTYPE] == schema.CATG) & (df_conf[schema.IS_MULTI] == True)
    if sum(has_multi):
        multi_cols = df_conf[has_multi].id.values
        dataset = dataset.padded_batch(n_batch, OrderedDict( zip(cols, tuple([None] if e else [] for e in has_multi))) )
    else:
        dataset = dataset.batch(n_batch)
    dataset = dataset.shuffle(n_batch * 10, seed=seed).repeat(n_epoch)
    features = dataset.make_one_shot_iterator().get_next()
    return features, features.pop(schema.label[0])
                                
# tf.reset_default_graph()
with tf.Graph().as_default():
    inputs = make_datasets(['./movielens.tr'], loader.schema, n_batch=30)
    query_lens = tf.sequence_mask([1, 2, 3])
    ctx = []
    with tf.train.MonitoredTrainingSession() as sess:
        while not sess.should_stop():
            _, = sess.run([inputs])
            # print( sess.run(inputs) )
            pass

## Feature Columns with tf.feature_column.input_layer

In [None]:
a = pd.Series(minmax_scale(np.random.normal(0, 1, size=1000)))
a.hist(bins=50)

In [None]:
%%time
tf.reset_default_graph()
with tf.Graph().as_default():
    user_id = tf.feature_column.categorical_column_with_hash_bucket('user_id', hash_bucket_size=1000, dtype=tf.int32)
    user_id = tf.feature_column.embedding_column(user_id, dimension=8)
    avg_rating = tf.feature_column.numeric_column('avg_rating')
    columns = [user_id, avg_rating]
    
    def make_datasets(fpath_ary):
        cols = ['user_id', 'query_movie_ids', 'genres', 'avg_rating', 'year', 'candidate_movie_id', 'rating']
        defaults = [[0], [''], [''], [], [], [0], []]

        def parse_csv(value):
            data = tf.decode_csv(value, record_defaults=defaults)
            features = OrderedDict(zip(cols, data))
            # print(features)
            return features
        
        dataset = tf.data.TextLineDataset(fpath_ary)
        dataset = (dataset.map(parse_csv, num_parallel_calls=4)
                          .batch(3)
                          # .padded_batch(3, OrderedDict(zip(cols, ([], [None], [None], [], [], [], []))))
                          .shuffle(10, seed=seed)
                          .repeat(1)
                  )
        return dataset.make_one_shot_iterator().get_next()
    
    inputs = make_datasets(['./te_processed.batch.csv'])
    inputs = tf.feature_column.input_layer(inputs, columns)
    # features = tf.parse_example(serialized_example, features=tf.feature_column.make_parse_example_spec(columns))
    ctx = []
    with tf.train.MonitoredTrainingSession() as sess:
        while not sess.should_stop():
            print(sess.run(inputs))

### Make Example

In [None]:
%%time
cols = ['user_id', 'query_movie_ids', 'genres', 'avg_rating', 'year', 'candidate_movie_id', 'rating']
is_multi = [False, True, True, False, False, False, False]
pd_dtypes = [int, str, str, float, float, int, float]
types = ['int64_list', 'int64_list', 'int64_list', 'float_list', 'float_list', 'int64_list', 'float_list']
tf_types = [tf.int64, tf.int64, tf.int64, tf.float32, tf.float32, tf.int64, tf.float32]
def persist_example(fpath, tfpath):
    with tf.python_io.TFRecordWriter(tfpath) as w:
        for chunk in pd.read_csv(fpath, names=cols, dtype=dict(zip(cols, pd_dtypes)), chunksize=1000):
            chunk['query_movie_ids'] = chunk.query_movie_ids.map(lambda r: map(int, r.split(',')))
            chunk['genres'] = chunk.genres.map(lambda r: map(int, r.split(',')))
            
            for idx, r in chunk.iterrows():
                ex = tf.train.Example()
                for multi, col, tpe in zip(is_multi, cols, types):
                    val = r[col]
                    # ex.features.feature[col].int64_list or float_list or bytes_list
                    feat_type = getattr(ex.features.feature[col], tpe)
                    # extend function for multivalent columns, otherwise append
                    append_or_extend = 'append' if not multi else 'extend'                    
                    getattr(feat_type.value, append_or_extend)(val)
                w.write(ex.SerializePartialToString())

persist_example('./te_processed.csv', './data.tfrecord')

In [None]:
def decode_example(ser_example):
    # queue = tf.train.string_input_producer([fpath], num_epochs=1)
    # _, ser_example = tf.TFRecordReader().read(queue)
    # ser_example = tf.train.batch([ser_example], batch_size=10)
    ctx_features = {col: tf.FixedLenFeature([], tf_tpe)
                    for col, tf_tpe in zip(cols, tf_types) if col not in ('query_movie_ids', 'genres')}
    seq_features = {col: tf.FixedLenSequenceFeature([], tf_tpe) 
                    for col, tf_tpe in [('query_movie_ids', tf.int64), ('genres', tf.int64)]}
    context_dict, sequence_dict = tf.parse_single_sequence_example(ser_example, 
                                                                   context_features=ctx_features, 
                                                                   sequence_features=seq_features)
    # for col, tpe in zip(cols, tf_types):
    #     val = feature_dict[col]
    #     feature_dict[col] = tf.sparse_to_dense(val.indices, val.dense_shape, val.values, name=col)
    feature_dict = {}
    feature_dict.update(context_dict)
    feature_dict.update(sequence_dict)
    ret = OrderedDict()
    for c in cols:
        ret[c] = feature_dict[c]
    return tuple(ret.values())

tf.reset_default_graph()
with tf.Graph().as_default():
    dataset = tf.data.TFRecordDataset(['./data.tfrecord'])
    dataset = dataset.map(decode_example).padded_batch(10, padded_shapes=([], [None], [None], [], [], [], []))
    # dataset = dataset.batch(3)
    iters = dataset.make_one_shot_iterator()
    r = iters.get_next()
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        sess.run(tf.tables_initializer())
        print( sess.run(r) )

## Traditional parse_example
1. tf.train.Coordinator + tf.train.start_queue_runners

In [None]:
from tensorflow.python.framework import sparse_tensor
import re

def to_sparse(dense):
    idx = tf.where(tf.not_equal(dense, 0))
    return tf.SparseTensor(idx, tf.gather_nd(dense, idx), dense.get_shape())

def make_example(val):
    example = tf.train.Example(features=tf.train.Features(
        feature = {
            'query_movie_ids': tf.train.Feature(int64_list=tf.train.Int64List(value=val)),
            'genres': tf.train.Feature(int64_list=tf.train.Int64List(value=val))
        }
    ))
    return example

tf.reset_default_graph()
with tf.Graph().as_default():
    
    filename = "tmp.tfrecords"
    if not os.path.exists(filename):
        # os.remove(filename)
        writer = tf.python_io.TFRecordWriter(filename)
        with writer:
            for idx, r in teProcessed.head().iterrows():
                for col in ('query_movie_ids', 'genres'):
                    val = list(map(int, re.split(',\s*', r[col])))
                    ex = make_example(val)
                    writer.write(ex.SerializeToString())

    reader = tf.TFRecordReader()
    filename_queue = tf.train.string_input_producer(["tmp.tfrecords"], num_epochs=1)
    _, serialized_example = reader.read(filename_queue)

    batch = tf.train.batch(tensors=[serialized_example], batch_size=1)
    features = {
        'query_movie_ids': tf.VarLenFeature(tf.int64),
        'genres': tf.VarLenFeature(tf.int64)
    }
    data = tf.parse_example(batch, features)
    query_movie_ids = data['query_movie_ids']
    embbedding = tf.Variable(tf.glorot_uniform_initializer()([9125]), dtype=tf.float32)
    print(query_movie_ids.dense_shape)
    # r = tf.layers.dense(query_movie_ids, 10)
    # emb_query = tf.nn.embedding_lookup_sparse([embbedding], query_movie_ids, None, combiner='sqrtn')
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        tf.local_variables_initializer().run()
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord, sess=sess)
        try:
            print(sess.run(data))
            pass
        except tf.errors.OutOfRangeError as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)
    

## Test

In [None]:
tf.reset_default_graph()
with tf.Graph().as_default():
    labels = tf.constant(np.ones([10, 8]))
    pred = tf.concat([tf.Variable(tf.ones(shape=[1, 8]), trainable=False), tf.Variable(tf.truncated_normal([9, 8]))], 0)
    loss = tf.losses.mean_squared_error(predictions=pred, labels=labels)
    train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss)
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        print(pred.eval())
        for i in range(1000):
            sess.run([train_op])
        print()
        print(pred.eval())

In [None]:
tf.zeros