diff --git a/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt b/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt index c2a15bdb3b17b..c2127cafba6a5 100644 --- a/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt +++ b/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt @@ -8,3 +8,4 @@ endforeach() add_subdirectory(fit_a_line) add_subdirectory(recognize_digits) +add_subdirectory(recommender_system) diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/CMakeLists.txt b/python/paddle/fluid/tests/book/high-level-api/recommender_system/CMakeLists.txt new file mode 100644 index 0000000000000..673c965b662a0 --- /dev/null +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/CMakeLists.txt @@ -0,0 +1,7 @@ +file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") +string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") + +# default test +foreach(src ${TEST_OPS}) + py_test(${src} SRCS ${src}.py) +endforeach() diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py new file mode 100644 index 0000000000000..5d809937cfba6 --- /dev/null +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py @@ -0,0 +1,308 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math +import sys +import os +import numpy as np +import paddle +import paddle.fluid as fluid +import paddle.fluid.framework as framework +import paddle.fluid.layers as layers +import paddle.fluid.nets as nets +from functools import partial + +IS_SPARSE = True +USE_GPU = False +BATCH_SIZE = 256 + + +def get_usr_combined_features(): + # FIXME(dzh) : old API integer_value(10) may have range check. + # currently we don't have user configurated check. + + USR_DICT_SIZE = paddle.dataset.movielens.max_user_id() + 1 + + uid = layers.data(name='user_id', shape=[1], dtype='int64') + + usr_emb = layers.embedding( + input=uid, + dtype='float32', + size=[USR_DICT_SIZE, 32], + param_attr='user_table', + is_sparse=IS_SPARSE) + + usr_fc = layers.fc(input=usr_emb, size=32) + + USR_GENDER_DICT_SIZE = 2 + + usr_gender_id = layers.data(name='gender_id', shape=[1], dtype='int64') + + usr_gender_emb = layers.embedding( + input=usr_gender_id, + size=[USR_GENDER_DICT_SIZE, 16], + param_attr='gender_table', + is_sparse=IS_SPARSE) + + usr_gender_fc = layers.fc(input=usr_gender_emb, size=16) + + USR_AGE_DICT_SIZE = len(paddle.dataset.movielens.age_table) + usr_age_id = layers.data(name='age_id', shape=[1], dtype="int64") + + usr_age_emb = layers.embedding( + input=usr_age_id, + size=[USR_AGE_DICT_SIZE, 16], + is_sparse=IS_SPARSE, + param_attr='age_table') + + usr_age_fc = layers.fc(input=usr_age_emb, size=16) + + USR_JOB_DICT_SIZE = paddle.dataset.movielens.max_job_id() + 1 + usr_job_id = layers.data(name='job_id', shape=[1], dtype="int64") + + usr_job_emb = layers.embedding( + input=usr_job_id, + size=[USR_JOB_DICT_SIZE, 16], + param_attr='job_table', + is_sparse=IS_SPARSE) + + usr_job_fc = layers.fc(input=usr_job_emb, size=16) + + concat_embed = layers.concat( + input=[usr_fc, usr_gender_fc, usr_age_fc, usr_job_fc], axis=1) + + usr_combined_features = layers.fc(input=concat_embed, size=200, act="tanh") + + return usr_combined_features + + +def get_mov_combined_features(): + + MOV_DICT_SIZE = paddle.dataset.movielens.max_movie_id() + 1 + + mov_id = layers.data(name='movie_id', shape=[1], dtype='int64') + + mov_emb = layers.embedding( + input=mov_id, + dtype='float32', + size=[MOV_DICT_SIZE, 32], + param_attr='movie_table', + is_sparse=IS_SPARSE) + + mov_fc = layers.fc(input=mov_emb, size=32) + + CATEGORY_DICT_SIZE = len(paddle.dataset.movielens.movie_categories()) + + category_id = layers.data( + name='category_id', shape=[1], dtype='int64', lod_level=1) + + mov_categories_emb = layers.embedding( + input=category_id, size=[CATEGORY_DICT_SIZE, 32], is_sparse=IS_SPARSE) + + mov_categories_hidden = layers.sequence_pool( + input=mov_categories_emb, pool_type="sum") + + MOV_TITLE_DICT_SIZE = len(paddle.dataset.movielens.get_movie_title_dict()) + + mov_title_id = layers.data( + name='movie_title', shape=[1], dtype='int64', lod_level=1) + + mov_title_emb = layers.embedding( + input=mov_title_id, size=[MOV_TITLE_DICT_SIZE, 32], is_sparse=IS_SPARSE) + + mov_title_conv = nets.sequence_conv_pool( + input=mov_title_emb, + num_filters=32, + filter_size=3, + act="tanh", + pool_type="sum") + + concat_embed = layers.concat( + input=[mov_fc, mov_categories_hidden, mov_title_conv], axis=1) + + # FIXME(dzh) : need tanh operator + mov_combined_features = layers.fc(input=concat_embed, size=200, act="tanh") + + return mov_combined_features + + +def inference_program(): + usr_combined_features = get_usr_combined_features() + mov_combined_features = get_mov_combined_features() + + inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features) + scale_infer = layers.scale(x=inference, scale=5.0) + + return scale_infer + + +def train_program(): + + scale_infer = inference_program() + + label = layers.data(name='score', shape=[1], dtype='float32') + square_cost = layers.square_error_cost(input=scale_infer, label=label) + avg_cost = layers.mean(square_cost) + + return [avg_cost, scale_infer] + + +def func_feed(feeding, place, data): + feed_tensors = {} + for (key, idx) in feeding.iteritems(): + tensor = fluid.LoDTensor() + if key != "category_id" and key != "movie_title": + if key == "score": + numpy_data = np.array(map(lambda x: x[idx], data)).astype( + "float32") + else: + numpy_data = np.array(map(lambda x: x[idx], data)).astype( + "int64") + else: + numpy_data = map(lambda x: np.array(x[idx]).astype("int64"), data) + lod_info = [len(item) for item in numpy_data] + offset = 0 + lod = [offset] + for item in lod_info: + offset += item + lod.append(offset) + numpy_data = np.concatenate(numpy_data, axis=0) + tensor.set_lod([lod]) + + numpy_data = numpy_data.reshape([numpy_data.shape[0], 1]) + tensor.set(numpy_data, place) + feed_tensors[key] = tensor + return feed_tensors + + +def train(use_cuda, train_program, save_path): + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + optimizer = fluid.optimizer.SGD(learning_rate=0.2) + feeding_map = { + 'user_id': 0, + 'gender_id': 1, + 'age_id': 2, + 'job_id': 3, + 'movie_id': 4, + 'category_id': 5, + 'movie_title': 6, + 'score': 7 + } + + trainer = fluid.Trainer( + train_func=train_program, place=place, optimizer=optimizer) + + feed_order = [ + 'user_id', 'gender_id', 'age_id', 'job_id', 'movie_id', 'category_id', + 'movie_title', 'score' + ] + + def event_handler(event): + if isinstance(event, fluid.EndEpochEvent): + test_reader = paddle.batch( + paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) + avg_cost_set = trainer.test( + reader=test_reader, + feed_order=feed_order, + data_feed_handler=partial(func_feed, feeding_map, place)) + + # get avg cost + avg_cost = np.array(avg_cost_set).mean() + + print("avg_cost: %s" % avg_cost) + + if float(avg_cost) < 3: # Smaller value to increase CI speed + trainer.save_params(save_path) + else: + print('BatchID {0}, Test Loss {1:0.2}'.format(event.epoch + 1, + float(avg_cost))) + if math.isnan(float(avg_cost)): + sys.exit("got NaN loss, training failed.") + + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.movielens.train(), buf_size=8192), + batch_size=BATCH_SIZE) + + trainer.train( + num_epochs=1, + event_handler=event_handler, + reader=train_reader, + feed_order=[ + 'user_id', 'gender_id', 'age_id', 'job_id', 'movie_id', + 'category_id', 'movie_title', 'score' + ], + data_feed_handler=partial(func_feed, feeding_map, place)) + + +def infer(use_cuda, inference_program, save_path): + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + inferencer = fluid.Inferencer( + inference_program, param_path=save_path, place=place) + + def create_lod_tensor(data, lod=None): + tensor = fluid.LoDTensor() + if lod is None: + # Tensor, the shape is [batch_size, 1] + index = 0 + lod_0 = [index] + for l in range(len(data)): + index += 1 + lod_0.append(index) + lod = [lod_0] + tensor.set_lod(lod) + + flattened_data = np.concatenate(data, axis=0).astype("int64") + flattened_data = flattened_data.reshape([len(flattened_data), 1]) + tensor.set(flattened_data, place) + return tensor + + # Generate a random input for inference + user_id = create_lod_tensor([[1]]) + gender_id = create_lod_tensor([[1]]) + age_id = create_lod_tensor([[0]]) + job_id = create_lod_tensor([[10]]) + movie_id = create_lod_tensor([[783]]) + category_id = create_lod_tensor([[10], [8], [9]], [[0, 3]]) + movie_title = create_lod_tensor([[1069], [4140], [2923], [710], [988]], + [[0, 5]]) + + results = inferencer.infer( + { + 'user_id': user_id, + 'gender_id': gender_id, + 'age_id': age_id, + 'job_id': job_id, + 'movie_id': movie_id, + 'category_id': category_id, + 'movie_title': movie_title + }, + return_numpy=False) + + print("infer results: ", np.array(results[0])) + + +def main(use_cuda): + if use_cuda and not fluid.core.is_compiled_with_cuda(): + return + save_path = "recommender_system.inference.model" + train(use_cuda=use_cuda, train_program=train_program, save_path=save_path) + infer( + use_cuda=use_cuda, + inference_program=inference_program, + save_path=save_path) + + +if __name__ == '__main__': + main(USE_GPU) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 7da123dd92ed9..61bc4735e09e1 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -217,7 +217,13 @@ def stop(self): """ self.__stop = True - def train(self, num_epochs, event_handler, reader=None, feed_order=None): + def train(self, + num_epochs, + event_handler, + reader, + feed_order, + parallel=False, + data_feed_handler=None): """ Train the model. @@ -227,6 +233,8 @@ def train(self, num_epochs, event_handler, reader=None, feed_order=None): reader: feed_order: Feeding order of reader. None will following the defining order in program + parallel: + data_feed_handler: Function to be called when data is fed. Returns: @@ -238,13 +246,14 @@ def train(self, num_epochs, event_handler, reader=None, feed_order=None): exe.run() return if self.parallel: + #TODO(sidgoyal78) add `data_feed_handler` to parallel executor self._train_by_parallel_executor(num_epochs, event_handler, reader, feed_order) else: self._train_by_executor(num_epochs, event_handler, reader, - feed_order) + feed_order, data_feed_handler) - def test(self, reader, feed_order): + def test(self, reader, feed_order, data_feed_handler=None): """ Test the model on given test data @@ -252,10 +261,11 @@ def test(self, reader, feed_order): reader: The reader that yields test data. feed_order: Feeding order of reader. None will following the defining order in program + data_feed_handler: Function that will be called when data is fed """ - return self._test_by_executor(reader, feed_order, - self.train_func_outputs) + return self._test_by_executor( + reader, feed_order, self.train_func_outputs, data_feed_handler) def save_params(self, param_path): # reference: save_persistables in io.py @@ -271,7 +281,8 @@ def _prog_and_scope_guard(self): with executor.scope_guard(self.scope): yield - def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): + def _train_by_executor(self, num_epochs, event_handler, reader, feed_order, + data_feed_handler): """ Train by Executor and single device. @@ -280,7 +291,7 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): event_handler: reader: feed_order: - + data_feed_handler: Returns: """ @@ -290,9 +301,15 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): feed_list=feed_var_list, place=self.place) exe = executor.Executor(self.place) reader = feeder.decorate_reader(reader, multi_devices=False) - self._train_by_any_executor(event_handler, exe, num_epochs, reader) - - def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): + self._train_by_any_executor(event_handler, exe, num_epochs, reader, + data_feed_handler) + + def _train_by_any_executor(self, + event_handler, + exe, + num_epochs, + reader, + data_feed_handler=None): for epoch_id in range(num_epochs): event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): @@ -301,7 +318,8 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): begin_event = BeginStepEvent(epoch_id, step_id) event_handler(begin_event) if begin_event.fetch_metrics: - metrics = exe.run(feed=data, + metrics = exe.run(feed=data_feed_handler(data) + if data_feed_handler else data, fetch_list=[ var.name for var in self.train_func_outputs @@ -311,17 +329,20 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): event_handler(EndStepEvent(epoch_id, step_id, metrics)) event_handler(EndEpochEvent(epoch_id)) - def _test_by_executor(self, reader, feed_order, fetch_list): + def _test_by_executor(self, reader, feed_order, fetch_list, + data_feed_handler): with executor.scope_guard(self.scope): feed_var_list = build_feed_var_list(self.test_program, feed_order) - feeder = data_feeder.DataFeeder( - feed_list=feed_var_list, place=self.place) + if data_feed_handler == None: + feeder = data_feeder.DataFeeder( + feed_list=feed_var_list, place=self.place) exe = executor.Executor(self.place) accumulated = len(fetch_list) * [0] count = 0 for data in reader(): outs = exe.run(program=self.test_program, - feed=feeder.feed(data), + feed=data_feed_handler(data) + if data_feed_handler else feeder.feed(data), fetch_list=fetch_list) accumulated = [x[0] + x[1][0] for x in zip(accumulated, outs)] count += 1